Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / ptlrpc / nrs_tbf.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2013 DataDirect Networks, Inc.
24  *
25  * Copyright (c) 2014, 2016, Intel Corporation.
26  */
27 /*
28  * lustre/ptlrpc/nrs_tbf.c
29  *
30  * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
31  *
32  */
33
34 /**
35  * \addtogoup nrs
36  * @{
37  */
38
39 #define DEBUG_SUBSYSTEM S_RPC
40 #include <obd_support.h>
41 #include <obd_class.h>
42 #include <libcfs/libcfs.h>
43 #include <lustre_req_layout.h>
44 #include "ptlrpc_internal.h"
45
46 /**
47  * \name tbf
48  *
49  * Token Bucket Filter over client NIDs
50  *
51  * @{
52  */
53
54 #define NRS_POL_NAME_TBF        "tbf"
55
56 static int tbf_jobid_cache_size = 8192;
57 module_param(tbf_jobid_cache_size, int, 0644);
58 MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");
59
60 static int tbf_rate = 10000;
61 module_param(tbf_rate, int, 0644);
62 MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");
63
64 static int tbf_depth = 3;
65 module_param(tbf_depth, int, 0644);
66 MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
67
68 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
69 {
70         struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
71                                                  th_timer);
72         struct ptlrpc_nrs   *nrs = head->th_res.res_policy->pol_nrs;
73         struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
74
75         nrs->nrs_throttling = 0;
76         wake_up(&svcpt->scp_waitq);
77
78         return HRTIMER_NORESTART;
79 }
80
81 #define NRS_TBF_DEFAULT_RULE "default"
82
83 static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule)
84 {
85         LASSERT(atomic_read(&rule->tr_ref) == 0);
86         LASSERT(list_empty(&rule->tr_cli_list));
87         LASSERT(list_empty(&rule->tr_linkage));
88
89         rule->tr_head->th_ops->o_rule_fini(rule);
90         OBD_FREE_PTR(rule);
91 }
92
93 /**
94  * Decreases the rule's usage reference count, and stops the rule in case it
95  * was already stopping and have no more outstanding usage references (which
96  * indicates it has no more queued or started requests, and can be safely
97  * stopped).
98  */
99 static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule)
100 {
101         if (atomic_dec_and_test(&rule->tr_ref))
102                 nrs_tbf_rule_fini(rule);
103 }
104
105 /**
106  * Increases the rule's usage reference count.
107  */
108 static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule)
109 {
110         atomic_inc(&rule->tr_ref);
111 }
112
113 static void
114 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
115 {
116         LASSERT(!list_empty(&cli->tc_linkage));
117         LASSERT(cli->tc_rule);
118         spin_lock(&cli->tc_rule->tr_rule_lock);
119         list_del_init(&cli->tc_linkage);
120         spin_unlock(&cli->tc_rule->tr_rule_lock);
121         nrs_tbf_rule_put(cli->tc_rule);
122         cli->tc_rule = NULL;
123 }
124
125 static void
126 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
127                         struct nrs_tbf_client *cli)
128
129 {
130         struct nrs_tbf_rule *rule = cli->tc_rule;
131
132         cli->tc_rpc_rate = rule->tr_rpc_rate;
133         cli->tc_nsecs = rule->tr_nsecs_per_rpc;
134         cli->tc_nsecs_resid = 0;
135         cli->tc_depth = rule->tr_depth;
136         cli->tc_ntoken = rule->tr_depth;
137         cli->tc_check_time = ktime_to_ns(ktime_get());
138         cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
139         cli->tc_rule_generation = rule->tr_generation;
140
141         if (cli->tc_in_heap)
142                 binheap_relocate(head->th_binheap,
143                                  &cli->tc_node);
144 }
145
146 static void
147 nrs_tbf_cli_reset(struct nrs_tbf_head *head,
148                   struct nrs_tbf_rule *rule,
149                   struct nrs_tbf_client *cli)
150 {
151         spin_lock(&cli->tc_rule_lock);
152         if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
153                 LASSERT(rule != cli->tc_rule);
154                 nrs_tbf_cli_rule_put(cli);
155         }
156         LASSERT(cli->tc_rule == NULL);
157         LASSERT(list_empty(&cli->tc_linkage));
158         /* Rule's ref is added before called */
159         cli->tc_rule = rule;
160         spin_lock(&rule->tr_rule_lock);
161         list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
162         spin_unlock(&rule->tr_rule_lock);
163         spin_unlock(&cli->tc_rule_lock);
164         nrs_tbf_cli_reset_value(head, cli);
165 }
166
167 static int
168 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
169 {
170         return rule->tr_head->th_ops->o_rule_dump(rule, m);
171 }
172
173 static int
174 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
175 {
176         struct nrs_tbf_rule *rule;
177         int rc = 0;
178
179         LASSERT(head != NULL);
180         spin_lock(&head->th_rule_lock);
181         /* List the rules from newest to oldest */
182         list_for_each_entry(rule, &head->th_list, tr_linkage) {
183                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
184                 rc = nrs_tbf_rule_dump(rule, m);
185                 if (rc) {
186                         rc = -ENOSPC;
187                         break;
188                 }
189         }
190         spin_unlock(&head->th_rule_lock);
191
192         return rc;
193 }
194
195 static struct nrs_tbf_rule *
196 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
197                          const char *name)
198 {
199         struct nrs_tbf_rule *rule;
200
201         LASSERT(head != NULL);
202         list_for_each_entry(rule, &head->th_list, tr_linkage) {
203                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
204                 if (strcmp(rule->tr_name, name) == 0) {
205                         nrs_tbf_rule_get(rule);
206                         return rule;
207                 }
208         }
209         return NULL;
210 }
211
212 static struct nrs_tbf_rule *
213 nrs_tbf_rule_find(struct nrs_tbf_head *head,
214                   const char *name)
215 {
216         struct nrs_tbf_rule *rule;
217
218         LASSERT(head != NULL);
219         spin_lock(&head->th_rule_lock);
220         rule = nrs_tbf_rule_find_nolock(head, name);
221         spin_unlock(&head->th_rule_lock);
222         return rule;
223 }
224
225 static struct nrs_tbf_rule *
226 nrs_tbf_rule_match(struct nrs_tbf_head *head,
227                    struct nrs_tbf_client *cli)
228 {
229         struct nrs_tbf_rule *rule = NULL;
230         struct nrs_tbf_rule *tmp_rule;
231
232         spin_lock(&head->th_rule_lock);
233         /* Match the newest rule in the list */
234         list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
235                 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
236                 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
237                         rule = tmp_rule;
238                         break;
239                 }
240         }
241
242         if (rule == NULL)
243                 rule = head->th_rule;
244
245         nrs_tbf_rule_get(rule);
246         spin_unlock(&head->th_rule_lock);
247         return rule;
248 }
249
250 static void
251 nrs_tbf_cli_init(struct nrs_tbf_head *head,
252                  struct nrs_tbf_client *cli,
253                  struct ptlrpc_request *req)
254 {
255         struct nrs_tbf_rule *rule;
256
257         memset(cli, 0, sizeof(*cli));
258         cli->tc_in_heap = false;
259         head->th_ops->o_cli_init(cli, req);
260         INIT_LIST_HEAD(&cli->tc_list);
261         INIT_LIST_HEAD(&cli->tc_linkage);
262         spin_lock_init(&cli->tc_rule_lock);
263         refcount_set(&cli->tc_ref, 1);
264         rule = nrs_tbf_rule_match(head, cli);
265         nrs_tbf_cli_reset(head, rule, cli);
266 }
267
268 static void
269 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
270 {
271         LASSERT(list_empty(&cli->tc_list));
272         LASSERT(!cli->tc_in_heap);
273         spin_lock(&cli->tc_rule_lock);
274         nrs_tbf_cli_rule_put(cli);
275         spin_unlock(&cli->tc_rule_lock);
276         OBD_FREE_PTR(cli);
277 }
278
279 static int
280 nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
281                    struct nrs_tbf_head *head,
282                    struct nrs_tbf_cmd *start)
283 {
284         struct nrs_tbf_rule     *rule;
285         struct nrs_tbf_rule     *tmp_rule;
286         struct nrs_tbf_rule     *next_rule;
287         char                    *next_name = start->u.tc_start.ts_next_name;
288         int                      rc;
289
290         rule = nrs_tbf_rule_find(head, start->tc_name);
291         if (rule) {
292                 nrs_tbf_rule_put(rule);
293                 return -EEXIST;
294         }
295
296         OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
297         if (rule == NULL)
298                 return -ENOMEM;
299
300         strscpy(rule->tr_name, start->tc_name, sizeof(rule->tr_name));
301         rule->tr_rpc_rate = start->u.tc_start.ts_rpc_rate;
302         rule->tr_flags = start->u.tc_start.ts_rule_flags;
303         rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
304         rule->tr_depth = tbf_depth;
305         atomic_set(&rule->tr_ref, 1);
306         INIT_LIST_HEAD(&rule->tr_cli_list);
307         INIT_LIST_HEAD(&rule->tr_nids);
308         INIT_LIST_HEAD(&rule->tr_linkage);
309         spin_lock_init(&rule->tr_rule_lock);
310         rule->tr_head = head;
311
312         rc = head->th_ops->o_rule_init(policy, rule, start);
313         if (rc) {
314                 OBD_FREE_PTR(rule);
315                 return rc;
316         }
317
318         /* Add as the newest rule */
319         spin_lock(&head->th_rule_lock);
320         tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
321         if (tmp_rule) {
322                 spin_unlock(&head->th_rule_lock);
323                 nrs_tbf_rule_put(tmp_rule);
324                 nrs_tbf_rule_put(rule);
325                 return -EEXIST;
326         }
327
328         if (next_name) {
329                 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
330                 if (!next_rule) {
331                         spin_unlock(&head->th_rule_lock);
332                         nrs_tbf_rule_put(rule);
333                         return -ENOENT;
334                 }
335
336                 list_add(&rule->tr_linkage, next_rule->tr_linkage.prev);
337                 nrs_tbf_rule_put(next_rule);
338         } else {
339                 /* Add on the top of the rule list */
340                 list_add(&rule->tr_linkage, &head->th_list);
341         }
342         spin_unlock(&head->th_rule_lock);
343         atomic_inc(&head->th_rule_sequence);
344         if (start->u.tc_start.ts_rule_flags & NTRS_DEFAULT) {
345                 rule->tr_flags |= NTRS_DEFAULT;
346                 LASSERT(head->th_rule == NULL);
347                 head->th_rule = rule;
348         }
349
350         CDEBUG(D_RPCTRACE, "TBF starts rule@%p rate %llu gen %llu\n",
351                rule, rule->tr_rpc_rate, rule->tr_generation);
352
353         return 0;
354 }
355
356 /**
357  * Change the rank of a rule in the rule list
358  *
359  * The matched rule will be moved to the position right before another
360  * given rule.
361  *
362  * \param[in] policy    the policy instance
363  * \param[in] head      the TBF policy instance
364  * \param[in] name      the rule name to be moved
365  * \param[in] next_name the rule name before which the matched rule will be
366  *                      moved
367  *
368  */
369 static int
370 nrs_tbf_rule_change_rank(struct ptlrpc_nrs_policy *policy,
371                          struct nrs_tbf_head *head,
372                          char *name,
373                          char *next_name)
374 {
375         struct nrs_tbf_rule     *rule = NULL;
376         struct nrs_tbf_rule     *next_rule = NULL;
377         int                      rc = 0;
378
379         LASSERT(head != NULL);
380
381         spin_lock(&head->th_rule_lock);
382         rule = nrs_tbf_rule_find_nolock(head, name);
383         if (!rule)
384                 GOTO(out, rc = -ENOENT);
385
386         if (strcmp(name, next_name) == 0)
387                 GOTO(out_put, rc);
388
389         next_rule = nrs_tbf_rule_find_nolock(head, next_name);
390         if (!next_rule)
391                 GOTO(out_put, rc = -ENOENT);
392
393         /* rules may be adjacent in same list, so list_move() isn't safe here */
394         list_move_tail(&rule->tr_linkage, &next_rule->tr_linkage);
395         nrs_tbf_rule_put(next_rule);
396 out_put:
397         nrs_tbf_rule_put(rule);
398 out:
399         spin_unlock(&head->th_rule_lock);
400         return rc;
401 }
402
403 static int
404 nrs_tbf_rule_change_rate(struct ptlrpc_nrs_policy *policy,
405                          struct nrs_tbf_head *head,
406                          char *name,
407                          __u64 rate)
408 {
409         struct nrs_tbf_rule *rule;
410
411         assert_spin_locked(&policy->pol_nrs->nrs_lock);
412
413         rule = nrs_tbf_rule_find(head, name);
414         if (rule == NULL)
415                 return -ENOENT;
416
417         rule->tr_rpc_rate = rate;
418         rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
419         rule->tr_generation++;
420         nrs_tbf_rule_put(rule);
421
422         return 0;
423 }
424
425 static int
426 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
427                     struct nrs_tbf_head *head,
428                     struct nrs_tbf_cmd *change)
429 {
430         __u64    rate = change->u.tc_change.tc_rpc_rate;
431         char    *next_name = change->u.tc_change.tc_next_name;
432         int      rc;
433
434         if (rate != 0) {
435                 rc = nrs_tbf_rule_change_rate(policy, head, change->tc_name,
436                                               rate);
437                 if (rc)
438                         return rc;
439         }
440
441         if (next_name) {
442                 rc = nrs_tbf_rule_change_rank(policy, head, change->tc_name,
443                                               next_name);
444                 if (rc)
445                         return rc;
446         }
447
448         return 0;
449 }
450
451 static int
452 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
453                   struct nrs_tbf_head *head,
454                   struct nrs_tbf_cmd *stop)
455 {
456         struct nrs_tbf_rule *rule;
457
458         assert_spin_locked(&policy->pol_nrs->nrs_lock);
459
460         if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
461                 return -EPERM;
462
463         rule = nrs_tbf_rule_find(head, stop->tc_name);
464         if (rule == NULL)
465                 return -ENOENT;
466
467         list_del_init(&rule->tr_linkage);
468         rule->tr_flags |= NTRS_STOPPING;
469         nrs_tbf_rule_put(rule);
470         nrs_tbf_rule_put(rule);
471
472         return 0;
473 }
474
475 static int
476 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
477                 struct nrs_tbf_head *head,
478                 struct nrs_tbf_cmd *cmd)
479 {
480         int rc;
481
482         assert_spin_locked(&policy->pol_nrs->nrs_lock);
483
484         switch (cmd->tc_cmd) {
485         case NRS_CTL_TBF_START_RULE:
486                 if (cmd->u.tc_start.ts_valid_type != head->th_type_flag)
487                         return -EINVAL;
488
489                 spin_unlock(&policy->pol_nrs->nrs_lock);
490                 rc = nrs_tbf_rule_start(policy, head, cmd);
491                 spin_lock(&policy->pol_nrs->nrs_lock);
492                 return rc;
493         case NRS_CTL_TBF_CHANGE_RULE:
494                 rc = nrs_tbf_rule_change(policy, head, cmd);
495                 return rc;
496         case NRS_CTL_TBF_STOP_RULE:
497                 rc = nrs_tbf_rule_stop(policy, head, cmd);
498                 /* Take it as a success, if not exists at all */
499                 return rc == -ENOENT ? 0 : rc;
500         default:
501                 return -EFAULT;
502         }
503 }
504
505 /**
506  * Binary heap predicate.
507  *
508  * \param[in] e1 the first binheap node to compare
509  * \param[in] e2 the second binheap node to compare
510  *
511  * \retval 0 e1 > e2
512  * \retval 1 e1 < e2
513  */
514 static int
515 tbf_cli_compare(struct binheap_node *e1, struct binheap_node *e2)
516 {
517         struct nrs_tbf_client *cli1;
518         struct nrs_tbf_client *cli2;
519
520         cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
521         cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
522
523         if (cli1->tc_deadline < cli2->tc_deadline)
524                 return 1;
525         else if (cli1->tc_deadline > cli2->tc_deadline)
526                 return 0;
527
528         if (cli1->tc_check_time < cli2->tc_check_time)
529                 return 1;
530         else if (cli1->tc_check_time > cli2->tc_check_time)
531                 return 0;
532
533         /* Maybe need more comparasion, e.g. request number in the rules */
534         return 1;
535 }
536
537 /**
538  * TBF binary heap operations
539  */
540 static struct binheap_ops nrs_tbf_heap_ops = {
541         .hop_enter      = NULL,
542         .hop_exit       = NULL,
543         .hop_compare    = tbf_cli_compare,
544 };
545
546 static unsigned int
547 nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key,
548                        const unsigned int bits)
549 {
550         return cfs_hash_djb2_hash(key, strlen(key), bits);
551 }
552
553 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
554 {
555         struct nrs_tbf_client *cli = hlist_entry(hnode,
556                                                      struct nrs_tbf_client,
557                                                      tc_hnode);
558
559         return (strcmp(cli->tc_jobid, key) == 0);
560 }
561
562 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
563 {
564         struct nrs_tbf_client *cli = hlist_entry(hnode,
565                                                      struct nrs_tbf_client,
566                                                      tc_hnode);
567
568         return cli->tc_jobid;
569 }
570
571 static void *nrs_tbf_hop_object(struct hlist_node *hnode)
572 {
573         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
574 }
575
576 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
577 {
578         struct nrs_tbf_client *cli = hlist_entry(hnode,
579                                                      struct nrs_tbf_client,
580                                                      tc_hnode);
581
582         refcount_inc(&cli->tc_ref);
583 }
584
585 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
586 {
587         struct nrs_tbf_client *cli = hlist_entry(hnode,
588                                                      struct nrs_tbf_client,
589                                                      tc_hnode);
590
591         refcount_dec(&cli->tc_ref);
592 }
593
594 static void
595 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
596
597 {
598         struct nrs_tbf_client *cli = hlist_entry(hnode,
599                                                  struct nrs_tbf_client,
600                                                  tc_hnode);
601
602         nrs_tbf_cli_fini(cli);
603 }
604
605 static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
606         .hs_hash        = nrs_tbf_jobid_hop_hash,
607         .hs_keycmp      = nrs_tbf_jobid_hop_keycmp,
608         .hs_key         = nrs_tbf_jobid_hop_key,
609         .hs_object      = nrs_tbf_hop_object,
610         .hs_get         = nrs_tbf_jobid_hop_get,
611         .hs_put         = nrs_tbf_jobid_hop_put,
612         .hs_put_locked  = nrs_tbf_jobid_hop_put,
613         .hs_exit        = nrs_tbf_jobid_hop_exit,
614 };
615
616 #define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
617                                   CFS_HASH_NO_ITEMREF | \
618                                   CFS_HASH_DEPTH)
619
620 static struct nrs_tbf_client *
621 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
622                           struct cfs_hash_bd *bd,
623                           const char *jobid)
624 {
625         struct hlist_node *hnode;
626         struct nrs_tbf_client *cli;
627
628         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)jobid);
629         if (hnode == NULL)
630                 return NULL;
631
632         cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
633         if (!list_empty(&cli->tc_lru))
634                 list_del_init(&cli->tc_lru);
635         return cli;
636 }
637
638 #define NRS_TBF_JOBID_NULL ""
639
640 static struct nrs_tbf_client *
641 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
642                        struct ptlrpc_request *req)
643 {
644         const char              *jobid;
645         struct nrs_tbf_client   *cli;
646         struct cfs_hash         *hs = head->th_cli_hash;
647         struct cfs_hash_bd               bd;
648
649         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
650         if (jobid == NULL)
651                 jobid = NRS_TBF_JOBID_NULL;
652         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
653         cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
654         cfs_hash_bd_unlock(hs, &bd, 1);
655
656         return cli;
657 }
658
659 static struct nrs_tbf_client *
660 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
661                           struct nrs_tbf_client *cli)
662 {
663         const char              *jobid;
664         struct nrs_tbf_client   *ret;
665         struct cfs_hash         *hs = head->th_cli_hash;
666         struct cfs_hash_bd               bd;
667
668         jobid = cli->tc_jobid;
669         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
670         ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
671         if (ret == NULL) {
672                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
673                 ret = cli;
674         }
675         cfs_hash_bd_unlock(hs, &bd, 1);
676
677         return ret;
678 }
679
680 static void
681 nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
682                       struct nrs_tbf_client *cli)
683 {
684         struct cfs_hash_bd               bd;
685         struct cfs_hash         *hs = head->th_cli_hash;
686         struct nrs_tbf_bucket   *bkt;
687         int                      hw;
688         LIST_HEAD(zombies);
689
690         cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
691         bkt = cfs_hash_bd_extra_get(hs, &bd);
692         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
693                 return;
694         LASSERT(list_empty(&cli->tc_lru));
695         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
696
697         /*
698          * Check and purge the LRU, there is at least one client in the LRU.
699          */
700         hw = tbf_jobid_cache_size >>
701              (hs->hs_cur_bits - hs->hs_bkt_bits);
702         while (cfs_hash_bd_count_get(&bd) > hw) {
703                 if (unlikely(list_empty(&bkt->ntb_lru)))
704                         break;
705                 cli = list_first_entry(&bkt->ntb_lru,
706                                        struct nrs_tbf_client,
707                                        tc_lru);
708                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
709                 list_move(&cli->tc_lru, &zombies);
710         }
711         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
712
713         while (!list_empty(&zombies)) {
714                 cli = container_of(zombies.next,
715                                    struct nrs_tbf_client, tc_lru);
716                 list_del_init(&cli->tc_lru);
717                 nrs_tbf_cli_fini(cli);
718         }
719 }
720
721 static void
722 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
723                        struct ptlrpc_request *req)
724 {
725         char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
726
727         if (jobid == NULL)
728                 jobid = NRS_TBF_JOBID_NULL;
729         LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
730         INIT_LIST_HEAD(&cli->tc_lru);
731         memcpy(cli->tc_jobid, jobid, strlen(jobid));
732 }
733
734 static int nrs_tbf_jobid_hash_order(void)
735 {
736         int bits;
737
738         for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
739                 ;
740
741         return bits;
742 }
743
744 #define NRS_TBF_JOBID_BKT_BITS 10
745
746 static int
747 nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
748                       struct nrs_tbf_head *head)
749 {
750         struct nrs_tbf_cmd       start;
751         struct nrs_tbf_bucket   *bkt;
752         int                      bits;
753         int                      i;
754         int                      rc;
755         struct cfs_hash_bd       bd;
756
757         bits = nrs_tbf_jobid_hash_order();
758         if (bits < NRS_TBF_JOBID_BKT_BITS)
759                 bits = NRS_TBF_JOBID_BKT_BITS;
760         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
761                                             bits,
762                                             bits,
763                                             NRS_TBF_JOBID_BKT_BITS,
764                                             sizeof(*bkt),
765                                             0,
766                                             0,
767                                             &nrs_tbf_jobid_hash_ops,
768                                             NRS_TBF_JOBID_HASH_FLAGS);
769         if (head->th_cli_hash == NULL)
770                 return -ENOMEM;
771
772         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
773                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
774                 INIT_LIST_HEAD(&bkt->ntb_lru);
775         }
776
777         memset(&start, 0, sizeof(start));
778         start.u.tc_start.ts_jobids_str = "*";
779
780         start.u.tc_start.ts_rpc_rate = tbf_rate;
781         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
782         start.tc_name = NRS_TBF_DEFAULT_RULE;
783         INIT_LIST_HEAD(&start.u.tc_start.ts_jobids);
784         rc = nrs_tbf_rule_start(policy, head, &start);
785         if (rc) {
786                 cfs_hash_putref(head->th_cli_hash);
787                 head->th_cli_hash = NULL;
788         }
789
790         return rc;
791 }
792
793 /**
794  * Frees jobid of \a list.
795  *
796  */
797 static void
798 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
799 {
800         struct nrs_tbf_jobid *jobid, *n;
801
802         list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
803                 OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1);
804                 list_del(&jobid->tj_linkage);
805                 OBD_FREE_PTR(jobid);
806         }
807 }
808
809 static int
810 nrs_tbf_jobid_list_add(char *id, struct list_head *jobid_list)
811 {
812         struct nrs_tbf_jobid *jobid;
813         char *ptr;
814
815         OBD_ALLOC_PTR(jobid);
816         if (jobid == NULL)
817                 return -ENOMEM;
818
819         OBD_ALLOC(jobid->tj_id, strlen(id) + 1);
820         if (jobid->tj_id == NULL) {
821                 OBD_FREE_PTR(jobid);
822                 return -ENOMEM;
823         }
824
825         strcpy(jobid->tj_id, id);
826         ptr = strchr(id, '*');
827         if (ptr == NULL)
828                 jobid->tj_match_flag = NRS_TBF_MATCH_FULL;
829         else
830                 jobid->tj_match_flag = NRS_TBF_MATCH_WILDCARD;
831
832         list_add_tail(&jobid->tj_linkage, jobid_list);
833         return 0;
834 }
835
836 static bool
837 cfs_match_wildcard(const char *pattern, const char *content)
838 {
839         if (*pattern == '\0' && *content == '\0')
840                 return true;
841
842         if (*pattern == '*' && *(pattern + 1) != '\0' && *content == '\0')
843                 return false;
844
845         while (*pattern == *content) {
846                 pattern++;
847                 content++;
848                 if (*pattern == '\0' && *content == '\0')
849                         return true;
850
851                 if (*pattern == '*' && *(pattern + 1) != '\0' &&
852                     *content == '\0')
853                         return false;
854         }
855
856         if (*pattern == '*')
857                 return (cfs_match_wildcard(pattern + 1, content) ||
858                         cfs_match_wildcard(pattern, content + 1));
859
860         return false;
861 }
862
863 static inline bool
864 nrs_tbf_jobid_match(const struct nrs_tbf_jobid *jobid, const char *id)
865 {
866         if (jobid->tj_match_flag == NRS_TBF_MATCH_FULL)
867                 return strcmp(jobid->tj_id, id) == 0;
868
869         if (jobid->tj_match_flag == NRS_TBF_MATCH_WILDCARD)
870                 return cfs_match_wildcard(jobid->tj_id, id);
871
872         return false;
873 }
874
875 static int
876 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
877 {
878         struct nrs_tbf_jobid *jobid;
879
880         list_for_each_entry(jobid, jobid_list, tj_linkage) {
881                 if (nrs_tbf_jobid_match(jobid, id))
882                         return 1;
883         }
884         return 0;
885 }
886
887 static int
888 nrs_tbf_jobid_list_parse(char *orig, struct list_head *jobid_list)
889 {
890         char *str, *copy;
891         int rc = 0;
892         ENTRY;
893
894         copy = kstrdup(orig, GFP_KERNEL);
895         if (!copy)
896                 return -ENOMEM;
897         str = copy;
898         INIT_LIST_HEAD(jobid_list);
899         while (str && rc == 0) {
900                 char *tok = strsep(&str, " ");
901
902                 if (*tok)
903                         rc = nrs_tbf_jobid_list_add(tok, jobid_list);
904         }
905         if (list_empty(jobid_list))
906                 rc = -EINVAL;
907         if (rc)
908                 nrs_tbf_jobid_list_free(jobid_list);
909         kfree(copy);
910         RETURN(rc);
911 }
912
913 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
914 {
915         if (!list_empty(&cmd->u.tc_start.ts_jobids))
916                 nrs_tbf_jobid_list_free(&cmd->u.tc_start.ts_jobids);
917         if (cmd->u.tc_start.ts_jobids_str)
918                 OBD_FREE(cmd->u.tc_start.ts_jobids_str,
919                          strlen(cmd->u.tc_start.ts_jobids_str) + 1);
920 }
921
922 static int nrs_tbf_check_id_value(char **strp, char *key)
923 {
924         char *str = *strp;
925         char *tok;
926         int len;
927
928         tok = strim(strsep(&str, "="));
929         if (!*tok || !str)
930                 /* No LHS or no '=' */
931                 return -EINVAL;
932         str = strim(str);
933         len = strlen(str);
934         if (strcmp(tok, key) != 0 ||
935             str[0] != '{' || str[len-1] != '}')
936                 /* Wrong key, or RHS missing {} */
937                 return -EINVAL;
938
939         /* Skip '{' and '}' */
940         str[len-1] = '\0';
941         str += 1;
942         *strp = str;
943         return 0;
944 }
945
946 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, char *id)
947 {
948         int rc;
949
950         rc = nrs_tbf_check_id_value(&id, "jobid");
951         if (rc)
952                 return rc;
953
954         OBD_ALLOC(cmd->u.tc_start.ts_jobids_str, strlen(id) + 1);
955         if (cmd->u.tc_start.ts_jobids_str == NULL)
956                 return -ENOMEM;
957
958         strcpy(cmd->u.tc_start.ts_jobids_str, id);
959
960         /* parse jobid list */
961         rc = nrs_tbf_jobid_list_parse(cmd->u.tc_start.ts_jobids_str,
962                                       &cmd->u.tc_start.ts_jobids);
963         if (rc)
964                 nrs_tbf_jobid_cmd_fini(cmd);
965
966         return rc;
967 }
968
969 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
970                                    struct nrs_tbf_rule *rule,
971                                    struct nrs_tbf_cmd *start)
972 {
973         int rc = 0;
974
975         LASSERT(start->u.tc_start.ts_jobids_str);
976         OBD_ALLOC(rule->tr_jobids_str,
977                   strlen(start->u.tc_start.ts_jobids_str) + 1);
978         if (rule->tr_jobids_str == NULL)
979                 return -ENOMEM;
980
981         memcpy(rule->tr_jobids_str,
982                start->u.tc_start.ts_jobids_str,
983                strlen(start->u.tc_start.ts_jobids_str));
984
985         INIT_LIST_HEAD(&rule->tr_jobids);
986         if (!list_empty(&start->u.tc_start.ts_jobids)) {
987                 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
988                                               &rule->tr_jobids);
989                 if (rc)
990                         CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
991         }
992         if (rc)
993                 OBD_FREE(rule->tr_jobids_str,
994                          strlen(start->u.tc_start.ts_jobids_str) + 1);
995         return rc;
996 }
997
998 static int
999 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1000 {
1001         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1002                    rule->tr_jobids_str, rule->tr_rpc_rate,
1003                    atomic_read(&rule->tr_ref) - 1);
1004         return 0;
1005 }
1006
1007 static int
1008 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
1009                          struct nrs_tbf_client *cli)
1010 {
1011         return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
1012 }
1013
1014 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
1015 {
1016         if (!list_empty(&rule->tr_jobids))
1017                 nrs_tbf_jobid_list_free(&rule->tr_jobids);
1018         LASSERT(rule->tr_jobids_str != NULL);
1019         OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1);
1020 }
1021
1022 static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
1023         .o_name = NRS_TBF_TYPE_JOBID,
1024         .o_startup = nrs_tbf_jobid_startup,
1025         .o_cli_find = nrs_tbf_jobid_cli_find,
1026         .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
1027         .o_cli_put = nrs_tbf_jobid_cli_put,
1028         .o_cli_init = nrs_tbf_jobid_cli_init,
1029         .o_rule_init = nrs_tbf_jobid_rule_init,
1030         .o_rule_dump = nrs_tbf_jobid_rule_dump,
1031         .o_rule_match = nrs_tbf_jobid_rule_match,
1032         .o_rule_fini = nrs_tbf_jobid_rule_fini,
1033 };
1034
1035 /**
1036  * libcfs_hash operations for nrs_tbf_net::cn_cli_hash
1037  *
1038  * This uses ptlrpc_request::rq_peer.nid (as nid4) as its key, in order to hash
1039  * nrs_tbf_client objects.
1040  */
1041 #define NRS_TBF_NID_BKT_BITS    8
1042 #define NRS_TBF_NID_BITS        16
1043
1044 static unsigned int
1045 nrs_tbf_nid_hop_hash(struct cfs_hash *hs, const void *key,
1046                      const unsigned int bits)
1047 {
1048         return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), bits);
1049 }
1050
1051 static int nrs_tbf_nid_hop_keycmp(const void *key, struct hlist_node *hnode)
1052 {
1053         const struct lnet_nid *nid = key;
1054         struct nrs_tbf_client *cli = hlist_entry(hnode,
1055                                                  struct nrs_tbf_client,
1056                                                  tc_hnode);
1057
1058         return nid_same(nid, &cli->tc_nid);
1059 }
1060
1061 static void *nrs_tbf_nid_hop_key(struct hlist_node *hnode)
1062 {
1063         struct nrs_tbf_client *cli = hlist_entry(hnode,
1064                                                  struct nrs_tbf_client,
1065                                                  tc_hnode);
1066
1067         return &cli->tc_nid;
1068 }
1069
1070 static void nrs_tbf_nid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1071 {
1072         struct nrs_tbf_client *cli = hlist_entry(hnode,
1073                                                  struct nrs_tbf_client,
1074                                                  tc_hnode);
1075
1076         refcount_inc(&cli->tc_ref);
1077 }
1078
1079 static void nrs_tbf_nid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1080 {
1081         struct nrs_tbf_client *cli = hlist_entry(hnode,
1082                                                  struct nrs_tbf_client,
1083                                                  tc_hnode);
1084
1085         refcount_dec(&cli->tc_ref);
1086 }
1087
1088 static void nrs_tbf_nid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1089 {
1090         struct nrs_tbf_client *cli = hlist_entry(hnode,
1091                                                  struct nrs_tbf_client,
1092                                                  tc_hnode);
1093
1094         CDEBUG(D_RPCTRACE,
1095                "Busy TBF object from client with NID %s, with %d refs\n",
1096                libcfs_nidstr(&cli->tc_nid), refcount_read(&cli->tc_ref));
1097
1098         nrs_tbf_cli_fini(cli);
1099 }
1100
1101 static struct cfs_hash_ops nrs_tbf_nid_hash_ops = {
1102         .hs_hash        = nrs_tbf_nid_hop_hash,
1103         .hs_keycmp      = nrs_tbf_nid_hop_keycmp,
1104         .hs_key         = nrs_tbf_nid_hop_key,
1105         .hs_object      = nrs_tbf_hop_object,
1106         .hs_get         = nrs_tbf_nid_hop_get,
1107         .hs_put         = nrs_tbf_nid_hop_put,
1108         .hs_put_locked  = nrs_tbf_nid_hop_put,
1109         .hs_exit        = nrs_tbf_nid_hop_exit,
1110 };
1111
1112 static struct nrs_tbf_client *
1113 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
1114                      struct ptlrpc_request *req)
1115 {
1116         lnet_nid_t nid4 = lnet_nid_to_nid4(&req->rq_peer.nid);
1117
1118         return cfs_hash_lookup(head->th_cli_hash, &nid4);
1119 }
1120
1121 static struct nrs_tbf_client *
1122 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
1123                         struct nrs_tbf_client *cli)
1124 {
1125         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid,
1126                                        &cli->tc_hnode);
1127 }
1128
1129 static void
1130 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
1131                       struct nrs_tbf_client *cli)
1132 {
1133         cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
1134 }
1135
1136 static int
1137 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
1138                     struct nrs_tbf_head *head)
1139 {
1140         struct nrs_tbf_cmd      start;
1141         int rc;
1142
1143         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1144                                             NRS_TBF_NID_BITS,
1145                                             NRS_TBF_NID_BITS,
1146                                             NRS_TBF_NID_BKT_BITS, 0,
1147                                             CFS_HASH_MIN_THETA,
1148                                             CFS_HASH_MAX_THETA,
1149                                             &nrs_tbf_nid_hash_ops,
1150                                             CFS_HASH_RW_BKTLOCK);
1151         if (head->th_cli_hash == NULL)
1152                 return -ENOMEM;
1153
1154         memset(&start, 0, sizeof(start));
1155         start.u.tc_start.ts_nids_str = "*";
1156
1157         start.u.tc_start.ts_rpc_rate = tbf_rate;
1158         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1159         start.tc_name = NRS_TBF_DEFAULT_RULE;
1160         INIT_LIST_HEAD(&start.u.tc_start.ts_nids);
1161         rc = nrs_tbf_rule_start(policy, head, &start);
1162         if (rc) {
1163                 cfs_hash_putref(head->th_cli_hash);
1164                 head->th_cli_hash = NULL;
1165         }
1166
1167         return rc;
1168 }
1169
1170 static void
1171 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1172                              struct ptlrpc_request *req)
1173 {
1174         cli->tc_nid = req->rq_peer.nid;
1175 }
1176
1177 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1178                                  struct nrs_tbf_rule *rule,
1179                                  struct nrs_tbf_cmd *start)
1180 {
1181         LASSERT(start->u.tc_start.ts_nids_str);
1182         OBD_ALLOC(rule->tr_nids_str,
1183                   strlen(start->u.tc_start.ts_nids_str) + 1);
1184         if (rule->tr_nids_str == NULL)
1185                 return -ENOMEM;
1186
1187         memcpy(rule->tr_nids_str,
1188                start->u.tc_start.ts_nids_str,
1189                strlen(start->u.tc_start.ts_nids_str));
1190
1191         INIT_LIST_HEAD(&rule->tr_nids);
1192         if (!list_empty(&start->u.tc_start.ts_nids)) {
1193                 if (cfs_parse_nidlist(rule->tr_nids_str,
1194                                       &rule->tr_nids) < 0) {
1195                         CERROR("nids {%s} illegal\n",
1196                                rule->tr_nids_str);
1197                         OBD_FREE(rule->tr_nids_str,
1198                                  strlen(start->u.tc_start.ts_nids_str) + 1);
1199                         return -EINVAL;
1200                 }
1201         }
1202         return 0;
1203 }
1204
1205 static int
1206 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1207 {
1208         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1209                    rule->tr_nids_str, rule->tr_rpc_rate,
1210                    atomic_read(&rule->tr_ref) - 1);
1211         return 0;
1212 }
1213
1214 static int
1215 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1216                        struct nrs_tbf_client *cli)
1217 {
1218         return cfs_match_nid(&cli->tc_nid, &rule->tr_nids);
1219 }
1220
1221 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1222 {
1223         if (!list_empty(&rule->tr_nids))
1224                 cfs_free_nidlist(&rule->tr_nids);
1225         LASSERT(rule->tr_nids_str != NULL);
1226         OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1);
1227 }
1228
1229 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1230 {
1231         if (!list_empty(&cmd->u.tc_start.ts_nids))
1232                 cfs_free_nidlist(&cmd->u.tc_start.ts_nids);
1233         if (cmd->u.tc_start.ts_nids_str)
1234                 OBD_FREE(cmd->u.tc_start.ts_nids_str,
1235                          strlen(cmd->u.tc_start.ts_nids_str) + 1);
1236 }
1237
1238 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, char *id)
1239 {
1240         int rc;
1241
1242         rc = nrs_tbf_check_id_value(&id, "nid");
1243         if (rc)
1244                 return rc;
1245
1246         OBD_ALLOC(cmd->u.tc_start.ts_nids_str, strlen(id) + 1);
1247         if (cmd->u.tc_start.ts_nids_str == NULL)
1248                 return -ENOMEM;
1249
1250         strcpy(cmd->u.tc_start.ts_nids_str, id);
1251
1252         /* parse NID list */
1253         if (cfs_parse_nidlist(cmd->u.tc_start.ts_nids_str,
1254                               &cmd->u.tc_start.ts_nids) < 0) {
1255                 nrs_tbf_nid_cmd_fini(cmd);
1256                 return -EINVAL;
1257         }
1258
1259         return 0;
1260 }
1261
1262 static struct nrs_tbf_ops nrs_tbf_nid_ops = {
1263         .o_name = NRS_TBF_TYPE_NID,
1264         .o_startup = nrs_tbf_nid_startup,
1265         .o_cli_find = nrs_tbf_nid_cli_find,
1266         .o_cli_findadd = nrs_tbf_nid_cli_findadd,
1267         .o_cli_put = nrs_tbf_nid_cli_put,
1268         .o_cli_init = nrs_tbf_nid_cli_init,
1269         .o_rule_init = nrs_tbf_nid_rule_init,
1270         .o_rule_dump = nrs_tbf_nid_rule_dump,
1271         .o_rule_match = nrs_tbf_nid_rule_match,
1272         .o_rule_fini = nrs_tbf_nid_rule_fini,
1273 };
1274
1275 static unsigned int
1276 nrs_tbf_hop_hash(struct cfs_hash *hs, const void *key,
1277                  const unsigned int bits)
1278 {
1279         return cfs_hash_djb2_hash(key, strlen(key), bits);
1280 }
1281
1282 static int nrs_tbf_hop_keycmp(const void *key, struct hlist_node *hnode)
1283 {
1284         struct nrs_tbf_client *cli = hlist_entry(hnode,
1285                                                  struct nrs_tbf_client,
1286                                                  tc_hnode);
1287
1288         return (strcmp(cli->tc_key, key) == 0);
1289 }
1290
1291 static void *nrs_tbf_hop_key(struct hlist_node *hnode)
1292 {
1293         struct nrs_tbf_client *cli = hlist_entry(hnode,
1294                                                  struct nrs_tbf_client,
1295                                                  tc_hnode);
1296         return cli->tc_key;
1297 }
1298
1299 static void nrs_tbf_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1300 {
1301         struct nrs_tbf_client *cli = hlist_entry(hnode,
1302                                                  struct nrs_tbf_client,
1303                                                  tc_hnode);
1304
1305         refcount_inc(&cli->tc_ref);
1306 }
1307
1308 static void nrs_tbf_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1309 {
1310         struct nrs_tbf_client *cli = hlist_entry(hnode,
1311                                                  struct nrs_tbf_client,
1312                                                  tc_hnode);
1313
1314         refcount_dec(&cli->tc_ref);
1315 }
1316
1317 static void nrs_tbf_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1318
1319 {
1320         struct nrs_tbf_client *cli = hlist_entry(hnode,
1321                                                  struct nrs_tbf_client,
1322                                                  tc_hnode);
1323
1324         nrs_tbf_cli_fini(cli);
1325 }
1326
1327 static struct cfs_hash_ops nrs_tbf_hash_ops = {
1328         .hs_hash        = nrs_tbf_hop_hash,
1329         .hs_keycmp      = nrs_tbf_hop_keycmp,
1330         .hs_key         = nrs_tbf_hop_key,
1331         .hs_object      = nrs_tbf_hop_object,
1332         .hs_get         = nrs_tbf_hop_get,
1333         .hs_put         = nrs_tbf_hop_put,
1334         .hs_put_locked  = nrs_tbf_hop_put,
1335         .hs_exit        = nrs_tbf_hop_exit,
1336 };
1337
1338 #define NRS_TBF_GENERIC_BKT_BITS        10
1339 #define NRS_TBF_GENERIC_HASH_FLAGS      (CFS_HASH_SPIN_BKTLOCK | \
1340                                         CFS_HASH_NO_ITEMREF | \
1341                                         CFS_HASH_DEPTH)
1342
1343 static int
1344 nrs_tbf_startup(struct ptlrpc_nrs_policy *policy, struct nrs_tbf_head *head)
1345 {
1346         struct nrs_tbf_cmd       start;
1347         struct nrs_tbf_bucket   *bkt;
1348         int                      bits;
1349         int                      i;
1350         int                      rc;
1351         struct cfs_hash_bd       bd;
1352
1353         bits = nrs_tbf_jobid_hash_order();
1354         if (bits < NRS_TBF_GENERIC_BKT_BITS)
1355                 bits = NRS_TBF_GENERIC_BKT_BITS;
1356         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1357                                             bits, bits,
1358                                             NRS_TBF_GENERIC_BKT_BITS,
1359                                             sizeof(*bkt), 0, 0,
1360                                             &nrs_tbf_hash_ops,
1361                                             NRS_TBF_GENERIC_HASH_FLAGS);
1362         if (head->th_cli_hash == NULL)
1363                 return -ENOMEM;
1364
1365         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
1366                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
1367                 INIT_LIST_HEAD(&bkt->ntb_lru);
1368         }
1369
1370         memset(&start, 0, sizeof(start));
1371         start.u.tc_start.ts_conds_str = "*";
1372
1373         start.u.tc_start.ts_rpc_rate = tbf_rate;
1374         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1375         start.tc_name = NRS_TBF_DEFAULT_RULE;
1376         INIT_LIST_HEAD(&start.u.tc_start.ts_conds);
1377         rc = nrs_tbf_rule_start(policy, head, &start);
1378         if (rc)
1379                 cfs_hash_putref(head->th_cli_hash);
1380
1381         return rc;
1382 }
1383
1384 static struct nrs_tbf_client *
1385 nrs_tbf_cli_hash_lookup(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1386                         const char *key)
1387 {
1388         struct hlist_node *hnode;
1389         struct nrs_tbf_client *cli;
1390
1391         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)key);
1392         if (hnode == NULL)
1393                 return NULL;
1394
1395         cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
1396         if (!list_empty(&cli->tc_lru))
1397                 list_del_init(&cli->tc_lru);
1398         return cli;
1399 }
1400
1401 /**
1402  * ONLY opcode presented in this function will be checked in
1403  * nrs_tbf_id_cli_set(). That means, we can add or remove an
1404  * opcode to enable or disable requests handled in nrs_tbf
1405  */
1406 static struct req_format *req_fmt(__u32 opcode)
1407 {
1408         switch (opcode) {
1409         case OST_GETATTR:
1410                 return &RQF_OST_GETATTR;
1411         case OST_SETATTR:
1412                 return &RQF_OST_SETATTR;
1413         case OST_READ:
1414                 return &RQF_OST_BRW_READ;
1415         case OST_WRITE:
1416                 return &RQF_OST_BRW_WRITE;
1417         /* FIXME: OST_CREATE and OST_DESTROY comes from MDS
1418          * in most case. Should they be removed? */
1419         case OST_CREATE:
1420                 return &RQF_OST_CREATE;
1421         case OST_DESTROY:
1422                 return &RQF_OST_DESTROY;
1423         case OST_PUNCH:
1424                 return &RQF_OST_PUNCH;
1425         case OST_SYNC:
1426                 return &RQF_OST_SYNC;
1427         case OST_LADVISE:
1428                 return &RQF_OST_LADVISE;
1429         case MDS_GETATTR:
1430                 return &RQF_MDS_GETATTR;
1431         case MDS_GETATTR_NAME:
1432                 return &RQF_MDS_GETATTR_NAME;
1433         /* close is skipped to avoid LDLM cancel slowness */
1434 #if 0
1435         case MDS_CLOSE:
1436                 return &RQF_MDS_CLOSE;
1437 #endif
1438         case MDS_REINT:
1439                 return &RQF_MDS_REINT;
1440         case MDS_READPAGE:
1441                 return &RQF_MDS_READPAGE;
1442         case MDS_GET_ROOT:
1443                 return &RQF_MDS_GET_ROOT;
1444         case MDS_STATFS:
1445                 return &RQF_MDS_STATFS;
1446         case MDS_SYNC:
1447                 return &RQF_MDS_SYNC;
1448         case MDS_QUOTACTL:
1449                 return &RQF_MDS_QUOTACTL;
1450         case MDS_GETXATTR:
1451                 return &RQF_MDS_GETXATTR;
1452         case MDS_GET_INFO:
1453                 return &RQF_MDS_GET_INFO;
1454         /* HSM op is skipped */
1455 #if 0 
1456         case MDS_HSM_STATE_GET:
1457                 return &RQF_MDS_HSM_STATE_GET;
1458         case MDS_HSM_STATE_SET:
1459                 return &RQF_MDS_HSM_STATE_SET;
1460         case MDS_HSM_ACTION:
1461                 return &RQF_MDS_HSM_ACTION;
1462         case MDS_HSM_CT_REGISTER:
1463                 return &RQF_MDS_HSM_CT_REGISTER;
1464         case MDS_HSM_CT_UNREGISTER:
1465                 return &RQF_MDS_HSM_CT_UNREGISTER;
1466 #endif
1467         case MDS_SWAP_LAYOUTS:
1468                 return &RQF_MDS_SWAP_LAYOUTS;
1469         case LDLM_ENQUEUE:
1470                 return &RQF_LDLM_ENQUEUE;
1471         default:
1472                 return NULL;
1473         }
1474 }
1475
1476 static struct req_format *intent_req_fmt(__u32 it_opc)
1477 {
1478         if (it_opc & (IT_OPEN | IT_CREAT))
1479                 return &RQF_LDLM_INTENT_OPEN;
1480         else if (it_opc & (IT_GETATTR | IT_LOOKUP))
1481                 return &RQF_LDLM_INTENT_GETATTR;
1482         else if (it_opc & IT_GETXATTR)
1483                 return &RQF_LDLM_INTENT_GETXATTR;
1484         else if (it_opc & (IT_GLIMPSE | IT_BRW))
1485                 return &RQF_LDLM_INTENT;
1486         else
1487                 return NULL;
1488 }
1489
1490 static int ost_tbf_id_cli_set(struct ptlrpc_request *req,
1491                               struct tbf_id *id)
1492 {
1493         struct ost_body *body;
1494
1495         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1496         if (body != NULL) {
1497                 id->ti_uid = body->oa.o_uid;
1498                 id->ti_gid = body->oa.o_gid;
1499                 return 0;
1500         }
1501
1502         return -EINVAL;
1503 }
1504
1505 static void unpack_ugid_from_mdt_body(struct ptlrpc_request *req,
1506                                       struct tbf_id *id)
1507 {
1508         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
1509                                                     &RMF_MDT_BODY);
1510         LASSERT(b != NULL);
1511
1512         /* TODO: nodemaping feature converts {ug}id from individual
1513          * clients to the actual ones of the file system. Some work
1514          * may be needed to fix this. */
1515         id->ti_uid = b->mbo_uid;
1516         id->ti_gid = b->mbo_gid;
1517 }
1518
1519 static void unpack_ugid_from_mdt_rec_reint(struct ptlrpc_request *req,
1520                                            struct tbf_id *id)
1521 {
1522         struct mdt_rec_reint *rec;
1523
1524         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
1525         LASSERT(rec != NULL);
1526
1527         /* use the fs{ug}id as {ug}id of the process */
1528         id->ti_uid = rec->rr_fsuid;
1529         id->ti_gid = rec->rr_fsgid;
1530 }
1531
1532 static int mdt_tbf_id_cli_set(struct ptlrpc_request *req,
1533                               struct tbf_id *id)
1534 {
1535         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1536         int rc = 0;
1537
1538         switch (opc) {
1539         case MDS_GETATTR:
1540         case MDS_GETATTR_NAME:
1541         case MDS_GET_ROOT:
1542         case MDS_READPAGE:
1543         case MDS_SYNC:
1544         case MDS_GETXATTR:
1545         case MDS_HSM_STATE_GET ... MDS_SWAP_LAYOUTS:
1546                 unpack_ugid_from_mdt_body(req, id);
1547                 break;
1548         case MDS_CLOSE:
1549         case MDS_REINT:
1550                 unpack_ugid_from_mdt_rec_reint(req, id);
1551                 break;
1552         default:
1553                 rc = -EINVAL;
1554                 break;
1555         }
1556         return rc;
1557 }
1558
1559 static int ldlm_tbf_id_cli_set(struct ptlrpc_request *req,
1560                               struct tbf_id *id)
1561 {
1562         struct ldlm_intent *lit;
1563         struct req_format *fmt;
1564
1565         if (req->rq_reqmsg->lm_bufcount <= DLM_INTENT_IT_OFF)
1566                 return -EINVAL;
1567
1568         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_BASIC);
1569         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
1570         if (lit == NULL)
1571                 return -EINVAL;
1572
1573         fmt = intent_req_fmt(lit->opc);
1574         if (fmt == NULL)
1575                 return -EINVAL;
1576
1577         req_capsule_extend(&req->rq_pill, fmt);
1578
1579         if (lit->opc & (IT_GETXATTR | IT_GETATTR | IT_LOOKUP))
1580                 unpack_ugid_from_mdt_body(req, id);
1581         else if (lit->opc & (IT_OPEN | IT_OPEN | IT_GLIMPSE | IT_BRW))
1582                 unpack_ugid_from_mdt_rec_reint(req, id);
1583         else
1584                 return -EINVAL;
1585         return 0;
1586 }
1587
1588 static int nrs_tbf_id_cli_set(struct ptlrpc_request *req, struct tbf_id *id,
1589                               enum nrs_tbf_flag ti_type)
1590 {
1591         u32 opc;
1592         struct req_format *fmt;
1593         const struct req_format *old_fmt;
1594         int rc;
1595
1596         memset(id, 0, sizeof(struct tbf_id));
1597         id->ti_type = ti_type;
1598
1599         rc = lustre_msg_get_uid_gid(req->rq_reqmsg, &id->ti_uid, &id->ti_gid);
1600         if (!rc && id->ti_uid != (u32) -1 && id->ti_gid != (u32) -1)
1601                 return 0;
1602
1603         /* client req doesn't have uid/gid pack in ptlrpc_body
1604          * --> fallback to the old method
1605          */
1606         opc = lustre_msg_get_opc(req->rq_reqmsg);
1607         fmt = req_fmt(opc);
1608         if (fmt == NULL)
1609                 return -EINVAL;
1610
1611         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1612         old_fmt = req->rq_pill.rc_fmt;
1613         if (old_fmt == NULL)
1614                 req_capsule_set(&req->rq_pill, fmt);
1615
1616         if (opc < OST_LAST_OPC)
1617                 rc = ost_tbf_id_cli_set(req, id);
1618         else if (opc >= MDS_FIRST_OPC && opc < MDS_LAST_OPC)
1619                 rc = mdt_tbf_id_cli_set(req, id);
1620         else if (opc == LDLM_ENQUEUE)
1621                 rc = ldlm_tbf_id_cli_set(req, id);
1622         else
1623                 rc = -EINVAL;
1624
1625         /* restore it to the original state */
1626         if (req->rq_pill.rc_fmt != old_fmt)
1627                 req->rq_pill.rc_fmt = old_fmt;
1628         return rc;
1629 }
1630
1631 static inline void nrs_tbf_cli_gen_key(struct nrs_tbf_client *cli,
1632                                        struct ptlrpc_request *req,
1633                                        char *keystr, size_t keystr_sz)
1634 {
1635         const char *jobid;
1636         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1637         struct tbf_id id;
1638
1639         nrs_tbf_id_cli_set(req, &id, NRS_TBF_FLAG_UID | NRS_TBF_FLAG_GID);
1640         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1641         if (jobid == NULL)
1642                 jobid = NRS_TBF_JOBID_NULL;
1643
1644         snprintf(keystr, keystr_sz, "%s_%s_%d_%u_%u", jobid,
1645                  libcfs_nidstr(&req->rq_peer.nid), opc, id.ti_uid,
1646                  id.ti_gid);
1647
1648         if (cli) {
1649                 INIT_LIST_HEAD(&cli->tc_lru);
1650                 strscpy(cli->tc_key, keystr, sizeof(cli->tc_key));
1651                 strscpy(cli->tc_jobid, jobid, sizeof(cli->tc_jobid));
1652                 cli->tc_nid = req->rq_peer.nid;
1653                 cli->tc_opcode = opc;
1654                 cli->tc_id = id;
1655         }
1656 }
1657
1658 static struct nrs_tbf_client *
1659 nrs_tbf_cli_find(struct nrs_tbf_head *head, struct ptlrpc_request *req)
1660 {
1661         struct nrs_tbf_client *cli;
1662         struct cfs_hash *hs = head->th_cli_hash;
1663         struct cfs_hash_bd bd;
1664         char keystr[NRS_TBF_KEY_LEN];
1665
1666         nrs_tbf_cli_gen_key(NULL, req, keystr, sizeof(keystr));
1667         cfs_hash_bd_get_and_lock(hs, (void *)keystr, &bd, 1);
1668         cli = nrs_tbf_cli_hash_lookup(hs, &bd, keystr);
1669         cfs_hash_bd_unlock(hs, &bd, 1);
1670
1671         return cli;
1672 }
1673
1674 static struct nrs_tbf_client *
1675 nrs_tbf_cli_findadd(struct nrs_tbf_head *head,
1676                     struct nrs_tbf_client *cli)
1677 {
1678         const char              *key;
1679         struct nrs_tbf_client   *ret;
1680         struct cfs_hash         *hs = head->th_cli_hash;
1681         struct cfs_hash_bd       bd;
1682
1683         key = cli->tc_key;
1684         cfs_hash_bd_get_and_lock(hs, (void *)key, &bd, 1);
1685         ret = nrs_tbf_cli_hash_lookup(hs, &bd, key);
1686         if (ret == NULL) {
1687                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
1688                 ret = cli;
1689         }
1690         cfs_hash_bd_unlock(hs, &bd, 1);
1691
1692         return ret;
1693 }
1694
1695 static void
1696 nrs_tbf_cli_put(struct nrs_tbf_head *head, struct nrs_tbf_client *cli)
1697 {
1698         struct cfs_hash_bd       bd;
1699         struct cfs_hash         *hs = head->th_cli_hash;
1700         struct nrs_tbf_bucket   *bkt;
1701         int                      hw;
1702         LIST_HEAD(zombies);
1703
1704         cfs_hash_bd_get(hs, &cli->tc_key, &bd);
1705         bkt = cfs_hash_bd_extra_get(hs, &bd);
1706         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
1707                 return;
1708         LASSERT(list_empty(&cli->tc_lru));
1709         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
1710
1711         /**
1712          * Check and purge the LRU, there is at least one client in the LRU.
1713          */
1714         hw = tbf_jobid_cache_size >> (hs->hs_cur_bits - hs->hs_bkt_bits);
1715         while (cfs_hash_bd_count_get(&bd) > hw) {
1716                 if (unlikely(list_empty(&bkt->ntb_lru)))
1717                         break;
1718                 cli = list_first_entry(&bkt->ntb_lru,
1719                                        struct nrs_tbf_client,
1720                                        tc_lru);
1721                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
1722                 list_move(&cli->tc_lru, &zombies);
1723         }
1724         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
1725
1726         while (!list_empty(&zombies)) {
1727                 cli = container_of(zombies.next,
1728                                    struct nrs_tbf_client, tc_lru);
1729                 list_del_init(&cli->tc_lru);
1730                 nrs_tbf_cli_fini(cli);
1731         }
1732 }
1733
1734 static void
1735 nrs_tbf_generic_cli_init(struct nrs_tbf_client *cli,
1736                          struct ptlrpc_request *req)
1737 {
1738         char keystr[NRS_TBF_KEY_LEN];
1739
1740         nrs_tbf_cli_gen_key(cli, req, keystr, sizeof(keystr));
1741 }
1742
1743 static void
1744 nrs_tbf_id_list_free(struct list_head *uid_list)
1745 {
1746         struct nrs_tbf_id *nti_id, *n;
1747
1748         list_for_each_entry_safe(nti_id, n, uid_list, nti_linkage) {
1749                 list_del_init(&nti_id->nti_linkage);
1750                 OBD_FREE_PTR(nti_id);
1751         }
1752 }
1753
1754 static void
1755 nrs_tbf_expression_free(struct nrs_tbf_expression *expr)
1756 {
1757         LASSERT(expr->te_field >= NRS_TBF_FIELD_NID &&
1758                 expr->te_field < NRS_TBF_FIELD_MAX);
1759         switch (expr->te_field) {
1760         case NRS_TBF_FIELD_NID:
1761                 cfs_free_nidlist(&expr->te_cond);
1762                 break;
1763         case NRS_TBF_FIELD_JOBID:
1764                 nrs_tbf_jobid_list_free(&expr->te_cond);
1765                 break;
1766         case NRS_TBF_FIELD_OPCODE:
1767                 bitmap_free(expr->te_opcodes);
1768                 break;
1769         case NRS_TBF_FIELD_UID:
1770         case NRS_TBF_FIELD_GID:
1771                 nrs_tbf_id_list_free(&expr->te_cond);
1772                 break;
1773         default:
1774                 LBUG();
1775         }
1776         OBD_FREE_PTR(expr);
1777 }
1778
1779 static void
1780 nrs_tbf_conjunction_free(struct nrs_tbf_conjunction *conjunction)
1781 {
1782         struct nrs_tbf_expression *expression;
1783         struct nrs_tbf_expression *n;
1784
1785         LASSERT(list_empty(&conjunction->tc_linkage));
1786         list_for_each_entry_safe(expression, n,
1787                                  &conjunction->tc_expressions,
1788                                  te_linkage) {
1789                 list_del_init(&expression->te_linkage);
1790                 nrs_tbf_expression_free(expression);
1791         }
1792         OBD_FREE_PTR(conjunction);
1793 }
1794
1795 static void
1796 nrs_tbf_conds_free(struct list_head *cond_list)
1797 {
1798         struct nrs_tbf_conjunction *conjunction;
1799         struct nrs_tbf_conjunction *n;
1800
1801         list_for_each_entry_safe(conjunction, n, cond_list, tc_linkage) {
1802                 list_del_init(&conjunction->tc_linkage);
1803                 nrs_tbf_conjunction_free(conjunction);
1804         }
1805 }
1806
1807 static void
1808 nrs_tbf_generic_cmd_fini(struct nrs_tbf_cmd *cmd)
1809 {
1810         if (!list_empty(&cmd->u.tc_start.ts_conds))
1811                 nrs_tbf_conds_free(&cmd->u.tc_start.ts_conds);
1812         if (cmd->u.tc_start.ts_conds_str)
1813                 OBD_FREE(cmd->u.tc_start.ts_conds_str,
1814                          strlen(cmd->u.tc_start.ts_conds_str) + 1);
1815 }
1816
1817 #define NRS_TBF_DISJUNCTION_DELIM       (",")
1818 #define NRS_TBF_CONJUNCTION_DELIM       ("&")
1819 #define NRS_TBF_EXPRESSION_DELIM        ("=")
1820
1821 static int
1822 nrs_tbf_opcode_list_parse(char *str, unsigned long **bitmaptr);
1823 static int
1824 nrs_tbf_id_list_parse(char *str, struct list_head *id_list,
1825                       enum nrs_tbf_flag tif);
1826
1827 static int
1828 nrs_tbf_expression_parse(char *str, struct list_head *cond_list)
1829 {
1830         struct nrs_tbf_expression *expr;
1831         char *field;
1832         int rc = 0;
1833         int len;
1834
1835         OBD_ALLOC_PTR(expr);
1836         if (expr == NULL)
1837                 return -ENOMEM;
1838
1839         field = strim(strsep(&str, NRS_TBF_EXPRESSION_DELIM));
1840         if (!*field || !str)
1841                 /* No LHS or no '=' sign */
1842                 GOTO(out, rc = -EINVAL);
1843         str = strim(str);
1844         len = strlen(str);
1845         if (len < 2 || str[0] != '{' || str[len-1] != '}')
1846                 /* No {} around RHS */
1847                 GOTO(out, rc = -EINVAL);
1848
1849         /* Skip '{' and '}' */
1850         str[len-1] = '\0';
1851         str += 1;
1852         len -= 2;
1853
1854         if (strcmp(field, "nid") == 0) {
1855                 if (cfs_parse_nidlist(str, &expr->te_cond) < 0)
1856                         GOTO(out, rc = -EINVAL);
1857                 expr->te_field = NRS_TBF_FIELD_NID;
1858         } else if (strcmp(field, "jobid") == 0) {
1859                 if (nrs_tbf_jobid_list_parse(str, &expr->te_cond) < 0)
1860                         GOTO(out, rc = -EINVAL);
1861                 expr->te_field = NRS_TBF_FIELD_JOBID;
1862         } else if (strcmp(field, "opcode") == 0) {
1863                 if (nrs_tbf_opcode_list_parse(str, &expr->te_opcodes) < 0)
1864                         GOTO(out, rc = -EINVAL);
1865                 expr->te_field = NRS_TBF_FIELD_OPCODE;
1866         } else if (strcmp(field, "uid") == 0) {
1867                 if (nrs_tbf_id_list_parse(str, &expr->te_cond,
1868                                           NRS_TBF_FLAG_UID) < 0)
1869                         GOTO(out, rc = -EINVAL);
1870                 expr->te_field = NRS_TBF_FIELD_UID;
1871         } else if (strcmp(field, "gid") == 0) {
1872                 if (nrs_tbf_id_list_parse(str, &expr->te_cond,
1873                                           NRS_TBF_FLAG_GID) < 0)
1874                         GOTO(out, rc = -EINVAL);
1875                 expr->te_field = NRS_TBF_FIELD_GID;
1876         } else {
1877                 GOTO(out, rc = -EINVAL);
1878         }
1879
1880         list_add_tail(&expr->te_linkage, cond_list);
1881         return 0;
1882 out:
1883         OBD_FREE_PTR(expr);
1884         return rc;
1885 }
1886
1887 static int
1888 nrs_tbf_conjunction_parse(char *str, struct list_head *cond_list)
1889 {
1890         struct nrs_tbf_conjunction *conjunction;
1891         int rc = 0;
1892
1893         OBD_ALLOC_PTR(conjunction);
1894         if (conjunction == NULL)
1895                 return -ENOMEM;
1896
1897         INIT_LIST_HEAD(&conjunction->tc_expressions);
1898         list_add_tail(&conjunction->tc_linkage, cond_list);
1899
1900         while (str && !rc) {
1901                 char *expr = strsep(&str, NRS_TBF_CONJUNCTION_DELIM);
1902
1903                 rc = nrs_tbf_expression_parse(expr,
1904                                               &conjunction->tc_expressions);
1905         }
1906         return rc;
1907 }
1908
1909 static int
1910 nrs_tbf_conds_parse(char *orig, struct list_head *cond_list)
1911 {
1912         char *str;
1913         int rc = 0;
1914
1915         orig = kstrdup(orig, GFP_KERNEL);
1916         if (!orig)
1917                 return -ENOMEM;
1918         str = orig;
1919
1920         INIT_LIST_HEAD(cond_list);
1921         while (str && !rc) {
1922                 char *term = strsep(&str, NRS_TBF_DISJUNCTION_DELIM);
1923
1924                 rc = nrs_tbf_conjunction_parse(term, cond_list);
1925         }
1926         kfree(orig);
1927
1928         return rc;
1929 }
1930
1931 static int
1932 nrs_tbf_generic_parse(struct nrs_tbf_cmd *cmd, const char *id)
1933 {
1934         int rc;
1935
1936         OBD_ALLOC(cmd->u.tc_start.ts_conds_str, strlen(id) + 1);
1937         if (cmd->u.tc_start.ts_conds_str == NULL)
1938                 return -ENOMEM;
1939
1940         memcpy(cmd->u.tc_start.ts_conds_str, id, strlen(id));
1941
1942         /* Parse hybird NID and JOBID conditions */
1943         rc = nrs_tbf_conds_parse(cmd->u.tc_start.ts_conds_str,
1944                                  &cmd->u.tc_start.ts_conds);
1945         if (rc)
1946                 nrs_tbf_generic_cmd_fini(cmd);
1947
1948         return rc;
1949 }
1950
1951 static int
1952 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id);
1953
1954 static int
1955 nrs_tbf_expression_match(struct nrs_tbf_expression *expr,
1956                          struct nrs_tbf_rule *rule,
1957                          struct nrs_tbf_client *cli)
1958 {
1959         switch (expr->te_field) {
1960         case NRS_TBF_FIELD_NID:
1961                 return cfs_match_nid(&cli->tc_nid, &expr->te_cond);
1962         case NRS_TBF_FIELD_JOBID:
1963                 return nrs_tbf_jobid_list_match(&expr->te_cond, cli->tc_jobid);
1964         case NRS_TBF_FIELD_OPCODE:
1965                 return test_bit(cli->tc_opcode, expr->te_opcodes);
1966         case NRS_TBF_FIELD_UID:
1967         case NRS_TBF_FIELD_GID:
1968                 return nrs_tbf_id_list_match(&expr->te_cond, cli->tc_id);
1969         default:
1970                 return 0;
1971         }
1972 }
1973
1974 static int
1975 nrs_tbf_conjunction_match(struct nrs_tbf_conjunction *conjunction,
1976                           struct nrs_tbf_rule *rule,
1977                           struct nrs_tbf_client *cli)
1978 {
1979         struct nrs_tbf_expression *expr;
1980         int matched;
1981
1982         list_for_each_entry(expr, &conjunction->tc_expressions, te_linkage) {
1983                 matched = nrs_tbf_expression_match(expr, rule, cli);
1984                 if (!matched)
1985                         return 0;
1986         }
1987
1988         return 1;
1989 }
1990
1991 static int
1992 nrs_tbf_cond_match(struct nrs_tbf_rule *rule, struct nrs_tbf_client *cli)
1993 {
1994         struct nrs_tbf_conjunction *conjunction;
1995         int matched;
1996
1997         list_for_each_entry(conjunction, &rule->tr_conds, tc_linkage) {
1998                 matched = nrs_tbf_conjunction_match(conjunction, rule, cli);
1999                 if (matched)
2000                         return 1;
2001         }
2002
2003         return 0;
2004 }
2005
2006 static void
2007 nrs_tbf_generic_rule_fini(struct nrs_tbf_rule *rule)
2008 {
2009         if (!list_empty(&rule->tr_conds))
2010                 nrs_tbf_conds_free(&rule->tr_conds);
2011         LASSERT(rule->tr_conds_str != NULL);
2012         OBD_FREE(rule->tr_conds_str, strlen(rule->tr_conds_str) + 1);
2013 }
2014
2015 static int
2016 nrs_tbf_rule_init(struct ptlrpc_nrs_policy *policy,
2017                   struct nrs_tbf_rule *rule, struct nrs_tbf_cmd *start)
2018 {
2019         int rc = 0;
2020
2021         LASSERT(start->u.tc_start.ts_conds_str);
2022         OBD_ALLOC(rule->tr_conds_str,
2023                   strlen(start->u.tc_start.ts_conds_str) + 1);
2024         if (rule->tr_conds_str == NULL)
2025                 return -ENOMEM;
2026
2027         memcpy(rule->tr_conds_str,
2028                start->u.tc_start.ts_conds_str,
2029                strlen(start->u.tc_start.ts_conds_str));
2030
2031         INIT_LIST_HEAD(&rule->tr_conds);
2032         if (!list_empty(&start->u.tc_start.ts_conds)) {
2033                 rc = nrs_tbf_conds_parse(rule->tr_conds_str,
2034                                          &rule->tr_conds);
2035         }
2036         if (rc)
2037                 nrs_tbf_generic_rule_fini(rule);
2038
2039         return rc;
2040 }
2041
2042 static int
2043 nrs_tbf_generic_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2044 {
2045         seq_printf(m, "%s %s %llu, ref %d\n", rule->tr_name,
2046                    rule->tr_conds_str, rule->tr_rpc_rate,
2047                    atomic_read(&rule->tr_ref) - 1);
2048         return 0;
2049 }
2050
2051 static int
2052 nrs_tbf_generic_rule_match(struct nrs_tbf_rule *rule,
2053                            struct nrs_tbf_client *cli)
2054 {
2055         return nrs_tbf_cond_match(rule, cli);
2056 }
2057
2058 static struct nrs_tbf_ops nrs_tbf_generic_ops = {
2059         .o_name = NRS_TBF_TYPE_GENERIC,
2060         .o_startup = nrs_tbf_startup,
2061         .o_cli_find = nrs_tbf_cli_find,
2062         .o_cli_findadd = nrs_tbf_cli_findadd,
2063         .o_cli_put = nrs_tbf_cli_put,
2064         .o_cli_init = nrs_tbf_generic_cli_init,
2065         .o_rule_init = nrs_tbf_rule_init,
2066         .o_rule_dump = nrs_tbf_generic_rule_dump,
2067         .o_rule_match = nrs_tbf_generic_rule_match,
2068         .o_rule_fini = nrs_tbf_generic_rule_fini,
2069 };
2070
2071 static void nrs_tbf_opcode_rule_fini(struct nrs_tbf_rule *rule)
2072 {
2073         if (rule->tr_opcodes)
2074                 bitmap_free(rule->tr_opcodes);
2075
2076         LASSERT(rule->tr_opcodes_str != NULL);
2077         OBD_FREE(rule->tr_opcodes_str, strlen(rule->tr_opcodes_str) + 1);
2078 }
2079
2080 static unsigned int
2081 nrs_tbf_opcode_hop_hash(struct cfs_hash *hs, const void *key,
2082                         const unsigned int bits)
2083 {
2084         /* XXX did hash needs ? */
2085         return cfs_hash_djb2_hash(key, sizeof(__u32), bits);
2086 }
2087
2088 static int nrs_tbf_opcode_hop_keycmp(const void *key, struct hlist_node *hnode)
2089 {
2090         const __u32     *opc = key;
2091         struct nrs_tbf_client *cli = hlist_entry(hnode,
2092                                                  struct nrs_tbf_client,
2093                                                  tc_hnode);
2094
2095         return *opc == cli->tc_opcode;
2096 }
2097
2098 static void *nrs_tbf_opcode_hop_key(struct hlist_node *hnode)
2099 {
2100         struct nrs_tbf_client *cli = hlist_entry(hnode,
2101                                                  struct nrs_tbf_client,
2102                                                  tc_hnode);
2103
2104         return &cli->tc_opcode;
2105 }
2106
2107 static void nrs_tbf_opcode_hop_get(struct cfs_hash *hs,
2108                                    struct hlist_node *hnode)
2109 {
2110         struct nrs_tbf_client *cli = hlist_entry(hnode,
2111                                                  struct nrs_tbf_client,
2112                                                  tc_hnode);
2113
2114         refcount_inc(&cli->tc_ref);
2115 }
2116
2117 static void nrs_tbf_opcode_hop_put(struct cfs_hash *hs,
2118                                    struct hlist_node *hnode)
2119 {
2120         struct nrs_tbf_client *cli = hlist_entry(hnode,
2121                                                  struct nrs_tbf_client,
2122                                                  tc_hnode);
2123
2124         refcount_dec(&cli->tc_ref);
2125 }
2126
2127 static void nrs_tbf_opcode_hop_exit(struct cfs_hash *hs,
2128                                     struct hlist_node *hnode)
2129 {
2130         struct nrs_tbf_client *cli = hlist_entry(hnode,
2131                                                  struct nrs_tbf_client,
2132                                                  tc_hnode);
2133
2134         CDEBUG(D_RPCTRACE,
2135                "Busy TBF object from client with opcode %s, with %d refs\n",
2136                ll_opcode2str(cli->tc_opcode), refcount_read(&cli->tc_ref));
2137
2138         nrs_tbf_cli_fini(cli);
2139 }
2140 static struct cfs_hash_ops nrs_tbf_opcode_hash_ops = {
2141         .hs_hash        = nrs_tbf_opcode_hop_hash,
2142         .hs_keycmp      = nrs_tbf_opcode_hop_keycmp,
2143         .hs_key         = nrs_tbf_opcode_hop_key,
2144         .hs_object      = nrs_tbf_hop_object,
2145         .hs_get         = nrs_tbf_opcode_hop_get,
2146         .hs_put         = nrs_tbf_opcode_hop_put,
2147         .hs_put_locked  = nrs_tbf_opcode_hop_put,
2148         .hs_exit        = nrs_tbf_opcode_hop_exit,
2149 };
2150
2151 static int
2152 nrs_tbf_opcode_startup(struct ptlrpc_nrs_policy *policy,
2153                     struct nrs_tbf_head *head)
2154 {
2155         struct nrs_tbf_cmd      start = { 0 };
2156         int rc;
2157
2158         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
2159                                             NRS_TBF_NID_BITS,
2160                                             NRS_TBF_NID_BITS,
2161                                             NRS_TBF_NID_BKT_BITS, 0,
2162                                             CFS_HASH_MIN_THETA,
2163                                             CFS_HASH_MAX_THETA,
2164                                             &nrs_tbf_opcode_hash_ops,
2165                                             CFS_HASH_RW_BKTLOCK);
2166         if (head->th_cli_hash == NULL)
2167                 return -ENOMEM;
2168
2169         start.u.tc_start.ts_opcodes_str = "*";
2170
2171         start.u.tc_start.ts_rpc_rate = tbf_rate;
2172         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2173         start.tc_name = NRS_TBF_DEFAULT_RULE;
2174         rc = nrs_tbf_rule_start(policy, head, &start);
2175
2176         return rc;
2177 }
2178
2179 static struct nrs_tbf_client *
2180 nrs_tbf_opcode_cli_find(struct nrs_tbf_head *head,
2181                         struct ptlrpc_request *req)
2182 {
2183         __u32 opc;
2184
2185         opc = lustre_msg_get_opc(req->rq_reqmsg);
2186         return cfs_hash_lookup(head->th_cli_hash, &opc);
2187 }
2188
2189 static struct nrs_tbf_client *
2190 nrs_tbf_opcode_cli_findadd(struct nrs_tbf_head *head,
2191                            struct nrs_tbf_client *cli)
2192 {
2193         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_opcode,
2194                                        &cli->tc_hnode);
2195 }
2196
2197 static void
2198 nrs_tbf_opcode_cli_init(struct nrs_tbf_client *cli,
2199                         struct ptlrpc_request *req)
2200 {
2201         cli->tc_opcode = lustre_msg_get_opc(req->rq_reqmsg);
2202 }
2203
2204 #define MAX_OPCODE_LEN  32
2205 static int
2206 nrs_tbf_opcode_set_bit(char *id, unsigned long *opcodes)
2207 {
2208         int op;
2209
2210         op = ll_str2opcode(id);
2211         if (op < 0)
2212                 return -EINVAL;
2213
2214         set_bit(op, opcodes);
2215         return 0;
2216 }
2217
2218 static int
2219 nrs_tbf_opcode_list_parse(char *orig, unsigned long **bitmaptr)
2220 {
2221         unsigned long *opcodes;
2222         char *str;
2223         int cnt = 0;
2224         int rc = 0;
2225
2226         ENTRY;
2227         orig = kstrdup(orig, GFP_KERNEL);
2228         if (!orig)
2229                 return -ENOMEM;
2230         opcodes = bitmap_zalloc(LUSTRE_MAX_OPCODES, GFP_KERNEL);
2231         if (!opcodes) {
2232                 kfree(orig);
2233                 return -ENOMEM;
2234         }
2235         str = orig;
2236         while (str && rc == 0) {
2237                 char *tok = strsep(&str, " ");
2238
2239                 if (*tok) {
2240                         rc = nrs_tbf_opcode_set_bit(tok, opcodes);
2241                         cnt += 1;
2242                 }
2243         }
2244         if (cnt == 0)
2245                 rc = -EINVAL;
2246
2247         kfree(orig);
2248         if (rc == 0 && bitmaptr)
2249                 *bitmaptr = opcodes;
2250         else
2251                 bitmap_free(opcodes);
2252
2253         RETURN(rc);
2254 }
2255
2256 static void nrs_tbf_opcode_cmd_fini(struct nrs_tbf_cmd *cmd)
2257 {
2258         if (cmd->u.tc_start.ts_opcodes_str)
2259                 OBD_FREE(cmd->u.tc_start.ts_opcodes_str,
2260                          strlen(cmd->u.tc_start.ts_opcodes_str) + 1);
2261
2262 }
2263
2264 static int nrs_tbf_opcode_parse(struct nrs_tbf_cmd *cmd, char *id)
2265 {
2266         int rc;
2267
2268         rc = nrs_tbf_check_id_value(&id, "opcode");
2269         if (rc)
2270                 return rc;
2271
2272         OBD_ALLOC(cmd->u.tc_start.ts_opcodes_str, strlen(id) + 1);
2273         if (cmd->u.tc_start.ts_opcodes_str == NULL)
2274                 return -ENOMEM;
2275
2276         strcpy(cmd->u.tc_start.ts_opcodes_str, id);
2277
2278         /* parse opcode list */
2279         rc = nrs_tbf_opcode_list_parse(cmd->u.tc_start.ts_opcodes_str, NULL);
2280         if (rc)
2281                 nrs_tbf_opcode_cmd_fini(cmd);
2282
2283         return rc;
2284 }
2285
2286 static int
2287 nrs_tbf_opcode_rule_match(struct nrs_tbf_rule *rule,
2288                           struct nrs_tbf_client *cli)
2289 {
2290         if (rule->tr_opcodes == NULL)
2291                 return 0;
2292
2293         return test_bit(cli->tc_opcode, rule->tr_opcodes);
2294 }
2295
2296 static int nrs_tbf_opcode_rule_init(struct ptlrpc_nrs_policy *policy,
2297                                     struct nrs_tbf_rule *rule,
2298                                     struct nrs_tbf_cmd *start)
2299 {
2300         int rc = 0;
2301
2302         LASSERT(start->u.tc_start.ts_opcodes_str != NULL);
2303         OBD_ALLOC(rule->tr_opcodes_str,
2304                   strlen(start->u.tc_start.ts_opcodes_str) + 1);
2305         if (rule->tr_opcodes_str == NULL)
2306                 return -ENOMEM;
2307
2308         strncpy(rule->tr_opcodes_str, start->u.tc_start.ts_opcodes_str,
2309                 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2310
2311         /* Default rule '*' */
2312         if (strcmp(start->u.tc_start.ts_opcodes_str, "*") == 0)
2313                 return 0;
2314
2315         rc = nrs_tbf_opcode_list_parse(rule->tr_opcodes_str,
2316                                        &rule->tr_opcodes);
2317         if (rc)
2318                 OBD_FREE(rule->tr_opcodes_str,
2319                          strlen(start->u.tc_start.ts_opcodes_str) + 1);
2320
2321         return rc;
2322 }
2323
2324 static int
2325 nrs_tbf_opcode_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2326 {
2327         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2328                    rule->tr_opcodes_str, rule->tr_rpc_rate,
2329                    atomic_read(&rule->tr_ref) - 1);
2330         return 0;
2331 }
2332
2333
2334 struct nrs_tbf_ops nrs_tbf_opcode_ops = {
2335         .o_name = NRS_TBF_TYPE_OPCODE,
2336         .o_startup = nrs_tbf_opcode_startup,
2337         .o_cli_find = nrs_tbf_opcode_cli_find,
2338         .o_cli_findadd = nrs_tbf_opcode_cli_findadd,
2339         .o_cli_put = nrs_tbf_nid_cli_put,
2340         .o_cli_init = nrs_tbf_opcode_cli_init,
2341         .o_rule_init = nrs_tbf_opcode_rule_init,
2342         .o_rule_dump = nrs_tbf_opcode_rule_dump,
2343         .o_rule_match = nrs_tbf_opcode_rule_match,
2344         .o_rule_fini = nrs_tbf_opcode_rule_fini,
2345 };
2346
2347 static unsigned int
2348 nrs_tbf_id_hop_hash(struct cfs_hash *hs, const void *key,
2349                     const unsigned int bits)
2350 {
2351         return cfs_hash_djb2_hash(key, sizeof(struct tbf_id), bits);
2352 }
2353
2354 static int nrs_tbf_id_hop_keycmp(const void *key, struct hlist_node *hnode)
2355 {
2356         const struct tbf_id *opc = key;
2357         enum nrs_tbf_flag ntf;
2358         struct nrs_tbf_client *cli = hlist_entry(hnode, struct nrs_tbf_client,
2359                                                  tc_hnode);
2360         ntf = opc->ti_type & cli->tc_id.ti_type;
2361         if ((ntf & NRS_TBF_FLAG_UID) && opc->ti_uid != cli->tc_id.ti_uid)
2362                 return 0;
2363
2364         if ((ntf & NRS_TBF_FLAG_GID) && opc->ti_gid != cli->tc_id.ti_gid)
2365                 return 0;
2366
2367         return 1;
2368 }
2369
2370 static void *nrs_tbf_id_hop_key(struct hlist_node *hnode)
2371 {
2372         struct nrs_tbf_client *cli = hlist_entry(hnode,
2373                                                  struct nrs_tbf_client,
2374                                                  tc_hnode);
2375         return &cli->tc_id;
2376 }
2377
2378 static void nrs_tbf_id_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
2379 {
2380         struct nrs_tbf_client *cli = hlist_entry(hnode,
2381                                                  struct nrs_tbf_client,
2382                                                  tc_hnode);
2383
2384         refcount_inc(&cli->tc_ref);
2385 }
2386
2387 static void nrs_tbf_id_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
2388 {
2389         struct nrs_tbf_client *cli = hlist_entry(hnode,
2390                                                  struct nrs_tbf_client,
2391                                                  tc_hnode);
2392
2393         refcount_dec(&cli->tc_ref);
2394 }
2395
2396 static void
2397 nrs_tbf_id_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
2398
2399 {
2400         struct nrs_tbf_client *cli = hlist_entry(hnode,
2401                                                  struct nrs_tbf_client,
2402                                                  tc_hnode);
2403
2404         nrs_tbf_cli_fini(cli);
2405 }
2406
2407 static struct cfs_hash_ops nrs_tbf_id_hash_ops = {
2408         .hs_hash        = nrs_tbf_id_hop_hash,
2409         .hs_keycmp      = nrs_tbf_id_hop_keycmp,
2410         .hs_key         = nrs_tbf_id_hop_key,
2411         .hs_object      = nrs_tbf_hop_object,
2412         .hs_get         = nrs_tbf_id_hop_get,
2413         .hs_put         = nrs_tbf_id_hop_put,
2414         .hs_put_locked  = nrs_tbf_id_hop_put,
2415         .hs_exit        = nrs_tbf_id_hop_exit,
2416 };
2417
2418 static int
2419 nrs_tbf_id_startup(struct ptlrpc_nrs_policy *policy,
2420                    struct nrs_tbf_head *head)
2421 {
2422         struct nrs_tbf_cmd start;
2423         int rc;
2424
2425         head->th_cli_hash = cfs_hash_create("nrs_tbf_id_hash",
2426                                             NRS_TBF_NID_BITS,
2427                                             NRS_TBF_NID_BITS,
2428                                             NRS_TBF_NID_BKT_BITS, 0,
2429                                             CFS_HASH_MIN_THETA,
2430                                             CFS_HASH_MAX_THETA,
2431                                             &nrs_tbf_id_hash_ops,
2432                                             CFS_HASH_RW_BKTLOCK);
2433         if (head->th_cli_hash == NULL)
2434                 return -ENOMEM;
2435
2436         memset(&start, 0, sizeof(start));
2437         start.u.tc_start.ts_ids_str = "*";
2438         start.u.tc_start.ts_rpc_rate = tbf_rate;
2439         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2440         start.tc_name = NRS_TBF_DEFAULT_RULE;
2441         INIT_LIST_HEAD(&start.u.tc_start.ts_ids);
2442         rc = nrs_tbf_rule_start(policy, head, &start);
2443         if (rc) {
2444                 cfs_hash_putref(head->th_cli_hash);
2445                 head->th_cli_hash = NULL;
2446         }
2447
2448         return rc;
2449 }
2450
2451 static struct nrs_tbf_client *
2452 nrs_tbf_id_cli_find(struct nrs_tbf_head *head,
2453                     struct ptlrpc_request *req)
2454 {
2455         struct tbf_id id;
2456
2457         LASSERT(head->th_type_flag == NRS_TBF_FLAG_UID ||
2458                 head->th_type_flag == NRS_TBF_FLAG_GID);
2459
2460         nrs_tbf_id_cli_set(req, &id, head->th_type_flag);
2461         return cfs_hash_lookup(head->th_cli_hash, &id);
2462 }
2463
2464 static struct nrs_tbf_client *
2465 nrs_tbf_id_cli_findadd(struct nrs_tbf_head *head,
2466                        struct nrs_tbf_client *cli)
2467 {
2468         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_id,
2469                                        &cli->tc_hnode);
2470 }
2471
2472 static void
2473 nrs_tbf_uid_cli_init(struct nrs_tbf_client *cli,
2474                      struct ptlrpc_request *req)
2475 {
2476         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_UID);
2477 }
2478
2479 static void
2480 nrs_tbf_gid_cli_init(struct nrs_tbf_client *cli,
2481                      struct ptlrpc_request *req)
2482 {
2483         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_GID);
2484 }
2485
2486 static int
2487 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id)
2488 {
2489         struct nrs_tbf_id *nti_id;
2490         enum nrs_tbf_flag flag;
2491
2492         list_for_each_entry(nti_id, id_list, nti_linkage) {
2493                 flag = id.ti_type & nti_id->nti_id.ti_type;
2494                 if (!flag)
2495                         continue;
2496
2497                 if ((flag & NRS_TBF_FLAG_UID) &&
2498                     (id.ti_uid != nti_id->nti_id.ti_uid))
2499                         continue;
2500
2501                 if ((flag & NRS_TBF_FLAG_GID) &&
2502                     (id.ti_gid != nti_id->nti_id.ti_gid))
2503                         continue;
2504
2505                 return 1;
2506         }
2507         return 0;
2508 }
2509
2510 static int
2511 nrs_tbf_id_rule_match(struct nrs_tbf_rule *rule,
2512                       struct nrs_tbf_client *cli)
2513 {
2514         return nrs_tbf_id_list_match(&rule->tr_ids, cli->tc_id);
2515 }
2516
2517 static void nrs_tbf_id_cmd_fini(struct nrs_tbf_cmd *cmd)
2518 {
2519         nrs_tbf_id_list_free(&cmd->u.tc_start.ts_ids);
2520
2521         if (cmd->u.tc_start.ts_ids_str)
2522                 OBD_FREE(cmd->u.tc_start.ts_ids_str,
2523                          strlen(cmd->u.tc_start.ts_ids_str) + 1);
2524 }
2525
2526 static int
2527 nrs_tbf_id_list_parse(char *orig, struct list_head *id_list,
2528                       enum nrs_tbf_flag tif)
2529 {
2530         int rc = 0;
2531         unsigned long val;
2532         char *str;
2533         struct tbf_id id = { 0 };
2534         ENTRY;
2535
2536         if (tif != NRS_TBF_FLAG_UID && tif != NRS_TBF_FLAG_GID)
2537                 RETURN(-EINVAL);
2538
2539         orig = kstrdup(orig, GFP_KERNEL);
2540         if (!orig)
2541                 return -ENOMEM;
2542
2543         INIT_LIST_HEAD(id_list);
2544         for (str = orig; str ; ) {
2545                 struct nrs_tbf_id *nti_id;
2546                 char *tok;
2547
2548                 tok = strsep(&str, " ");
2549                 if (!*tok)
2550                         /* Empty token - leading, trailing, or
2551                          * multiple spaces in list
2552                          */
2553                         continue;
2554
2555                 id.ti_type = tif;
2556                 rc = kstrtoul(tok, 0, &val);
2557                 if (rc < 0)
2558                         GOTO(out, rc = -EINVAL);
2559                 if (tif == NRS_TBF_FLAG_UID)
2560                         id.ti_uid = val;
2561                 else
2562                         id.ti_gid = val;
2563
2564                 OBD_ALLOC_PTR(nti_id);
2565                 if (nti_id == NULL)
2566                         GOTO(out, rc = -ENOMEM);
2567
2568                 nti_id->nti_id = id;
2569                 list_add_tail(&nti_id->nti_linkage, id_list);
2570         }
2571         if (list_empty(id_list))
2572                 /* Only white space in the list */
2573                 GOTO(out, rc = -EINVAL);
2574 out:
2575         kfree(orig);
2576         if (rc)
2577                 nrs_tbf_id_list_free(id_list);
2578         RETURN(rc);
2579 }
2580
2581 static int nrs_tbf_ug_id_parse(struct nrs_tbf_cmd *cmd, char *id)
2582 {
2583         int rc;
2584         enum nrs_tbf_flag tif;
2585
2586         tif = cmd->u.tc_start.ts_valid_type;
2587
2588         rc = nrs_tbf_check_id_value(&id,
2589                                     tif == NRS_TBF_FLAG_UID ? "uid" : "gid");
2590         if (rc)
2591                 return rc;
2592
2593         OBD_ALLOC(cmd->u.tc_start.ts_ids_str, strlen(id) + 1);
2594         if (cmd->u.tc_start.ts_ids_str == NULL)
2595                 return -ENOMEM;
2596
2597         strcpy(cmd->u.tc_start.ts_ids_str, id);
2598
2599         rc = nrs_tbf_id_list_parse(cmd->u.tc_start.ts_ids_str,
2600                                    &cmd->u.tc_start.ts_ids, tif);
2601         if (rc)
2602                 nrs_tbf_id_cmd_fini(cmd);
2603
2604         return rc;
2605 }
2606
2607 static int
2608 nrs_tbf_id_rule_init(struct ptlrpc_nrs_policy *policy,
2609                      struct nrs_tbf_rule *rule,
2610                      struct nrs_tbf_cmd *start)
2611 {
2612         struct nrs_tbf_head *head = rule->tr_head;
2613         int rc = 0;
2614         enum nrs_tbf_flag tif = head->th_type_flag;
2615         int ids_len = strlen(start->u.tc_start.ts_ids_str) + 1;
2616
2617         LASSERT(start->u.tc_start.ts_ids_str);
2618         INIT_LIST_HEAD(&rule->tr_ids);
2619
2620         OBD_ALLOC(rule->tr_ids_str, ids_len);
2621         if (rule->tr_ids_str == NULL)
2622                 return -ENOMEM;
2623
2624         strscpy(rule->tr_ids_str, start->u.tc_start.ts_ids_str,
2625                 ids_len);
2626
2627         if (!list_empty(&start->u.tc_start.ts_ids)) {
2628                 rc = nrs_tbf_id_list_parse(rule->tr_ids_str,
2629                                            &rule->tr_ids, tif);
2630                 if (rc)
2631                         CERROR("%ss {%s} illegal\n",
2632                                tif == NRS_TBF_FLAG_UID ? "uid" : "gid",
2633                                rule->tr_ids_str);
2634         }
2635         if (rc) {
2636                 OBD_FREE(rule->tr_ids_str, ids_len);
2637                 rule->tr_ids_str = NULL;
2638         }
2639         return rc;
2640 }
2641
2642 static int
2643 nrs_tbf_id_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2644 {
2645         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2646                    rule->tr_ids_str, rule->tr_rpc_rate,
2647                    atomic_read(&rule->tr_ref) - 1);
2648         return 0;
2649 }
2650
2651 static void nrs_tbf_id_rule_fini(struct nrs_tbf_rule *rule)
2652 {
2653         nrs_tbf_id_list_free(&rule->tr_ids);
2654         if (rule->tr_ids_str != NULL)
2655                 OBD_FREE(rule->tr_ids_str, strlen(rule->tr_ids_str) + 1);
2656 }
2657
2658 struct nrs_tbf_ops nrs_tbf_uid_ops = {
2659         .o_name = NRS_TBF_TYPE_UID,
2660         .o_startup = nrs_tbf_id_startup,
2661         .o_cli_find = nrs_tbf_id_cli_find,
2662         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2663         .o_cli_put = nrs_tbf_nid_cli_put,
2664         .o_cli_init = nrs_tbf_uid_cli_init,
2665         .o_rule_init = nrs_tbf_id_rule_init,
2666         .o_rule_dump = nrs_tbf_id_rule_dump,
2667         .o_rule_match = nrs_tbf_id_rule_match,
2668         .o_rule_fini = nrs_tbf_id_rule_fini,
2669 };
2670
2671 struct nrs_tbf_ops nrs_tbf_gid_ops = {
2672         .o_name = NRS_TBF_TYPE_GID,
2673         .o_startup = nrs_tbf_id_startup,
2674         .o_cli_find = nrs_tbf_id_cli_find,
2675         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2676         .o_cli_put = nrs_tbf_nid_cli_put,
2677         .o_cli_init = nrs_tbf_gid_cli_init,
2678         .o_rule_init = nrs_tbf_id_rule_init,
2679         .o_rule_dump = nrs_tbf_id_rule_dump,
2680         .o_rule_match = nrs_tbf_id_rule_match,
2681         .o_rule_fini = nrs_tbf_id_rule_fini,
2682 };
2683
2684 static struct nrs_tbf_type nrs_tbf_types[] = {
2685         {
2686                 .ntt_name = NRS_TBF_TYPE_JOBID,
2687                 .ntt_flag = NRS_TBF_FLAG_JOBID,
2688                 .ntt_ops = &nrs_tbf_jobid_ops,
2689         },
2690         {
2691                 .ntt_name = NRS_TBF_TYPE_NID,
2692                 .ntt_flag = NRS_TBF_FLAG_NID,
2693                 .ntt_ops = &nrs_tbf_nid_ops,
2694         },
2695         {
2696                 .ntt_name = NRS_TBF_TYPE_OPCODE,
2697                 .ntt_flag = NRS_TBF_FLAG_OPCODE,
2698                 .ntt_ops = &nrs_tbf_opcode_ops,
2699         },
2700         {
2701                 .ntt_name = NRS_TBF_TYPE_GENERIC,
2702                 .ntt_flag = NRS_TBF_FLAG_GENERIC,
2703                 .ntt_ops = &nrs_tbf_generic_ops,
2704         },
2705         {
2706                 .ntt_name = NRS_TBF_TYPE_UID,
2707                 .ntt_flag = NRS_TBF_FLAG_UID,
2708                 .ntt_ops = &nrs_tbf_uid_ops,
2709         },
2710         {
2711                 .ntt_name = NRS_TBF_TYPE_GID,
2712                 .ntt_flag = NRS_TBF_FLAG_GID,
2713                 .ntt_ops = &nrs_tbf_gid_ops,
2714         },
2715 };
2716
2717 /**
2718  * Is called before the policy transitions into
2719  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
2720  * policy-specific private data structure.
2721  *
2722  * \param[in] policy The policy to start
2723  *
2724  * \retval -ENOMEM OOM error
2725  * \retval  0      success
2726  *
2727  * \see nrs_policy_register()
2728  * \see nrs_policy_ctl()
2729  */
2730 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
2731 {
2732         struct nrs_tbf_head     *head;
2733         struct nrs_tbf_ops      *ops;
2734         __u32                    type;
2735         char                    *name;
2736         int found = 0;
2737         int i;
2738         int rc = 0;
2739
2740         if (arg == NULL)
2741                 name = NRS_TBF_TYPE_GENERIC;
2742         else if (strlen(arg) < NRS_TBF_TYPE_MAX_LEN)
2743                 name = arg;
2744         else
2745                 GOTO(out, rc = -EINVAL);
2746
2747         for (i = 0; i < ARRAY_SIZE(nrs_tbf_types); i++) {
2748                 if (strcmp(name, nrs_tbf_types[i].ntt_name) == 0) {
2749                         ops = nrs_tbf_types[i].ntt_ops;
2750                         type = nrs_tbf_types[i].ntt_flag;
2751                         found = 1;
2752                         break;
2753                 }
2754         }
2755         if (found == 0)
2756                 GOTO(out, rc = -ENOTSUPP);
2757
2758         OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
2759         if (head == NULL)
2760                 GOTO(out, rc = -ENOMEM);
2761
2762         memcpy(head->th_type, name, strlen(name));
2763         head->th_type[strlen(name)] = '\0';
2764         head->th_ops = ops;
2765         head->th_type_flag = type;
2766
2767         head->th_binheap = binheap_create(&nrs_tbf_heap_ops,
2768                                           CBH_FLAG_ATOMIC_GROW, 4096, NULL,
2769                                           nrs_pol2cptab(policy),
2770                                           nrs_pol2cptid(policy));
2771         if (head->th_binheap == NULL)
2772                 GOTO(out_free_head, rc = -ENOMEM);
2773
2774         atomic_set(&head->th_rule_sequence, 0);
2775         spin_lock_init(&head->th_rule_lock);
2776         INIT_LIST_HEAD(&head->th_list);
2777         hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
2778         head->th_timer.function = nrs_tbf_timer_cb;
2779         rc = head->th_ops->o_startup(policy, head);
2780         if (rc)
2781                 GOTO(out_free_heap, rc);
2782
2783         policy->pol_private = head;
2784         return 0;
2785 out_free_heap:
2786         binheap_destroy(head->th_binheap);
2787 out_free_head:
2788         OBD_FREE_PTR(head);
2789 out:
2790         return rc;
2791 }
2792
2793 /**
2794  * Is called before the policy transitions into
2795  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
2796  * private data structure.
2797  *
2798  * \param[in] policy The policy to stop
2799  *
2800  * \see nrs_policy_stop0()
2801  */
2802 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
2803 {
2804         struct nrs_tbf_head *head = policy->pol_private;
2805         struct ptlrpc_nrs *nrs = policy->pol_nrs;
2806         struct nrs_tbf_rule *rule, *n;
2807
2808         LASSERT(head != NULL);
2809         LASSERT(head->th_cli_hash != NULL);
2810         hrtimer_cancel(&head->th_timer);
2811         /* Should cleanup hash first before free rules */
2812         cfs_hash_putref(head->th_cli_hash);
2813         list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
2814                 list_del_init(&rule->tr_linkage);
2815                 nrs_tbf_rule_put(rule);
2816         }
2817         LASSERT(list_empty(&head->th_list));
2818         LASSERT(head->th_binheap != NULL);
2819         LASSERT(binheap_is_empty(head->th_binheap));
2820         binheap_destroy(head->th_binheap);
2821         OBD_FREE_PTR(head);
2822         nrs->nrs_throttling = 0;
2823         wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
2824 }
2825
2826 /**
2827  * Performs a policy-specific ctl function on TBF policy instances; similar
2828  * to ioctl.
2829  *
2830  * \param[in]     policy the policy instance
2831  * \param[in]     opc    the opcode
2832  * \param[in,out] arg    used for passing parameters and information
2833  *
2834  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2835  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2836  *
2837  * \retval 0   operation carried out successfully
2838  * \retval -ve error
2839  */
2840 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
2841                        enum ptlrpc_nrs_ctl opc,
2842                        void *arg)
2843 {
2844         int rc = 0;
2845         ENTRY;
2846
2847         assert_spin_locked(&policy->pol_nrs->nrs_lock);
2848
2849         switch (opc) {
2850         default:
2851                 RETURN(-EINVAL);
2852
2853         /**
2854          * Read RPC rate size of a policy instance.
2855          */
2856         case NRS_CTL_TBF_RD_RULE: {
2857                 struct nrs_tbf_head *head = policy->pol_private;
2858                 struct seq_file *m = arg;
2859                 struct ptlrpc_service_part *svcpt;
2860
2861                 svcpt = policy->pol_nrs->nrs_svcpt;
2862                 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
2863
2864                 rc = nrs_tbf_rule_dump_all(head, m);
2865                 }
2866                 break;
2867
2868         /**
2869          * Write RPC rate of a policy instance.
2870          */
2871         case NRS_CTL_TBF_WR_RULE: {
2872                 struct nrs_tbf_head *head = policy->pol_private;
2873                 struct nrs_tbf_cmd *cmd;
2874
2875                 cmd = (struct nrs_tbf_cmd *)arg;
2876                 rc = nrs_tbf_command(policy,
2877                                      head,
2878                                      cmd);
2879                 }
2880                 break;
2881         /**
2882          * Read the TBF policy type of a policy instance.
2883          */
2884         case NRS_CTL_TBF_RD_TYPE_FLAG: {
2885                 struct nrs_tbf_head *head = policy->pol_private;
2886
2887                 *(__u32 *)arg = head->th_type_flag;
2888                 }
2889                 break;
2890         }
2891
2892         RETURN(rc);
2893 }
2894
2895 /**
2896  * Is called for obtaining a TBF policy resource.
2897  *
2898  * \param[in]  policy     The policy on which the request is being asked for
2899  * \param[in]  nrq        The request for which resources are being taken
2900  * \param[in]  parent     Parent resource, unused in this policy
2901  * \param[out] resp       Resources references are placed in this array
2902  * \param[in]  moving_req Signifies limited caller context; unused in this
2903  *                        policy
2904  *
2905  *
2906  * \see nrs_resource_get_safe()
2907  */
2908 static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
2909                            struct ptlrpc_nrs_request *nrq,
2910                            const struct ptlrpc_nrs_resource *parent,
2911                            struct ptlrpc_nrs_resource **resp,
2912                            bool moving_req)
2913 {
2914         struct nrs_tbf_head   *head;
2915         struct nrs_tbf_client *cli;
2916         struct nrs_tbf_client *tmp;
2917         struct ptlrpc_request *req;
2918
2919         if (parent == NULL) {
2920                 *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
2921                 return 0;
2922         }
2923
2924         head = container_of(parent, struct nrs_tbf_head, th_res);
2925         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
2926         cli = head->th_ops->o_cli_find(head, req);
2927         if (cli != NULL) {
2928                 spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2929                 LASSERT(cli->tc_rule);
2930                 if (cli->tc_rule_sequence !=
2931                     atomic_read(&head->th_rule_sequence) ||
2932                     cli->tc_rule->tr_flags & NTRS_STOPPING) {
2933                         struct nrs_tbf_rule *rule;
2934
2935                         CDEBUG(D_RPCTRACE,
2936                                "TBF class@%p rate %llu sequence %d, "
2937                                "rule flags %d, head sequence %d\n",
2938                                cli, cli->tc_rpc_rate,
2939                                cli->tc_rule_sequence,
2940                                cli->tc_rule->tr_flags,
2941                                atomic_read(&head->th_rule_sequence));
2942                         rule = nrs_tbf_rule_match(head, cli);
2943                         if (rule != cli->tc_rule) {
2944                                 nrs_tbf_cli_reset(head, rule, cli);
2945                         } else {
2946                                 if (cli->tc_rule_generation != rule->tr_generation)
2947                                         nrs_tbf_cli_reset_value(head, cli);
2948                                 nrs_tbf_rule_put(rule);
2949                         }
2950                 } else if (cli->tc_rule_generation !=
2951                            cli->tc_rule->tr_generation) {
2952                         nrs_tbf_cli_reset_value(head, cli);
2953                 }
2954                 spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2955                 goto out;
2956         }
2957
2958         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
2959                           sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
2960         if (cli == NULL)
2961                 return -ENOMEM;
2962
2963         nrs_tbf_cli_init(head, cli, req);
2964         tmp = head->th_ops->o_cli_findadd(head, cli);
2965         if (tmp != cli) {
2966                 refcount_dec(&cli->tc_ref);
2967                 nrs_tbf_cli_fini(cli);
2968                 cli = tmp;
2969         }
2970 out:
2971         *resp = &cli->tc_res;
2972
2973         return 1;
2974 }
2975
2976 /**
2977  * Called when releasing references to the resource hierachy obtained for a
2978  * request for scheduling using the TBF policy.
2979  *
2980  * \param[in] policy   the policy the resource belongs to
2981  * \param[in] res      the resource to be released
2982  */
2983 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
2984                             const struct ptlrpc_nrs_resource *res)
2985 {
2986         struct nrs_tbf_head   *head;
2987         struct nrs_tbf_client *cli;
2988
2989         /**
2990          * Do nothing for freeing parent, nrs_tbf_net resources
2991          */
2992         if (res->res_parent == NULL)
2993                 return;
2994
2995         cli = container_of(res, struct nrs_tbf_client, tc_res);
2996         head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
2997
2998         head->th_ops->o_cli_put(head, cli);
2999 }
3000
3001 /**
3002  * Called when getting a request from the TBF policy for handling, or just
3003  * peeking; removes the request from the policy when it is to be handled.
3004  *
3005  * \param[in] policy The policy
3006  * \param[in] peek   When set, signifies that we just want to examine the
3007  *                   request, and not handle it, so the request is not removed
3008  *                   from the policy.
3009  * \param[in] force  Force the policy to return a request
3010  *
3011  * \retval The request to be handled; this is the next request in the TBF
3012  *         rule
3013  *
3014  * \see ptlrpc_nrs_req_get_nolock()
3015  * \see nrs_request_get()
3016  */
3017 static
3018 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
3019                                            bool peek, bool force)
3020 {
3021         struct nrs_tbf_head       *head = policy->pol_private;
3022         struct ptlrpc_nrs_request *nrq = NULL;
3023         struct nrs_tbf_client     *cli;
3024         struct binheap_node       *node;
3025
3026         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3027
3028         if (likely(!peek && !force) && policy->pol_nrs->nrs_throttling)
3029                 return NULL;
3030
3031         node = binheap_root(head->th_binheap);
3032         if (unlikely(node == NULL))
3033                 return NULL;
3034
3035         cli = container_of(node, struct nrs_tbf_client, tc_node);
3036         LASSERT(cli->tc_in_heap);
3037         if (unlikely(peek)) {
3038                 nrq = list_first_entry(&cli->tc_list,
3039                                        struct ptlrpc_nrs_request,
3040                                        nr_u.tbf.tr_list);
3041         } else {
3042                 struct nrs_tbf_rule *rule = cli->tc_rule;
3043                 __u64 now = ktime_to_ns(ktime_get());
3044                 __u64 passed;
3045                 __u64 ntoken;
3046                 __u64 deadline;
3047                 __u64 old_resid = 0;
3048
3049                 deadline = cli->tc_check_time +
3050                           cli->tc_nsecs;
3051                 LASSERT(now >= cli->tc_check_time);
3052                 passed = now - cli->tc_check_time;
3053                 ntoken = passed * cli->tc_rpc_rate;
3054                 do_div(ntoken, NSEC_PER_SEC);
3055
3056                 ntoken += cli->tc_ntoken;
3057                 if (rule->tr_flags & NTRS_REALTIME) {
3058                         LASSERT(cli->tc_nsecs_resid < cli->tc_nsecs);
3059                         old_resid = cli->tc_nsecs_resid;
3060                         cli->tc_nsecs_resid += passed % cli->tc_nsecs;
3061                         if (cli->tc_nsecs_resid > cli->tc_nsecs) {
3062                                 ntoken++;
3063                                 cli->tc_nsecs_resid -= cli->tc_nsecs;
3064                         }
3065                 } else if (ntoken > cli->tc_depth)
3066                         ntoken = cli->tc_depth;
3067
3068                 /* give an extra token with force mode */
3069                 if (unlikely(force) && ntoken == 0)
3070                         ntoken = 1;
3071
3072                 if (ntoken > 0) {
3073                         nrq = list_first_entry(&cli->tc_list,
3074                                          struct ptlrpc_nrs_request,
3075                                          nr_u.tbf.tr_list);
3076                         ntoken--;
3077                         cli->tc_ntoken = ntoken;
3078                         cli->tc_check_time = now;
3079                         list_del_init(&nrq->nr_u.tbf.tr_list);
3080                         if (list_empty(&cli->tc_list)) {
3081                                 binheap_remove(head->th_binheap,
3082                                                &cli->tc_node);
3083                                 cli->tc_in_heap = false;
3084                         } else {
3085                                 if (!(rule->tr_flags & NTRS_REALTIME))
3086                                         cli->tc_deadline = now + cli->tc_nsecs;
3087                                 binheap_relocate(head->th_binheap,
3088                                                  &cli->tc_node);
3089                         }
3090                         CDEBUG(D_RPCTRACE,
3091                                "TBF dequeues: class@%p rate %llu gen %llu token %llu, rule@%p rate %llu gen %llu\n",
3092                                cli, cli->tc_rpc_rate,
3093                                cli->tc_rule_generation, cli->tc_ntoken,
3094                                cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3095                                cli->tc_rule->tr_generation);
3096                 } else {
3097                         ktime_t time;
3098
3099                         if (rule->tr_flags & NTRS_REALTIME) {
3100                                 cli->tc_deadline = deadline;
3101                                 cli->tc_nsecs_resid = old_resid;
3102                                 binheap_relocate(head->th_binheap,
3103                                                  &cli->tc_node);
3104                                 if (node != binheap_root(head->th_binheap))
3105                                         return nrs_tbf_req_get(policy,
3106                                                                peek, force);
3107                         }
3108                         policy->pol_nrs->nrs_throttling = 1;
3109                         head->th_deadline = deadline;
3110                         time = ktime_set(0, 0);
3111                         time = ktime_add_ns(time, deadline);
3112                         hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
3113                 }
3114         }
3115
3116         return nrq;
3117 }
3118
3119 /**
3120  * Adds request \a nrq to \a policy's list of queued requests
3121  *
3122  * \param[in] policy The policy
3123  * \param[in] nrq    The request to add
3124  *
3125  * \retval 0 success; nrs_request_enqueue() assumes this function will always
3126  *                    succeed
3127  */
3128 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
3129                            struct ptlrpc_nrs_request *nrq)
3130 {
3131         struct nrs_tbf_head   *head;
3132         struct nrs_tbf_client *cli;
3133         int                    rc = 0;
3134
3135         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3136
3137         cli = container_of(nrs_request_resource(nrq),
3138                            struct nrs_tbf_client, tc_res);
3139         head = container_of(nrs_request_resource(nrq)->res_parent,
3140                             struct nrs_tbf_head, th_res);
3141         if (list_empty(&cli->tc_list)) {
3142                 LASSERT(!cli->tc_in_heap);
3143                 cli->tc_deadline = cli->tc_check_time + cli->tc_nsecs;
3144                 rc = binheap_insert(head->th_binheap, &cli->tc_node);
3145                 if (rc == 0) {
3146                         cli->tc_in_heap = true;
3147                         nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3148                         list_add_tail(&nrq->nr_u.tbf.tr_list,
3149                                           &cli->tc_list);
3150                         if (policy->pol_nrs->nrs_throttling) {
3151                                 __u64 deadline = cli->tc_deadline;
3152                                 if ((head->th_deadline > deadline) &&
3153                                     (hrtimer_try_to_cancel(&head->th_timer)
3154                                      >= 0)) {
3155                                         ktime_t time;
3156                                         head->th_deadline = deadline;
3157                                         time = ktime_set(0, 0);
3158                                         time = ktime_add_ns(time, deadline);
3159                                         hrtimer_start(&head->th_timer, time,
3160                                                       HRTIMER_MODE_ABS);
3161                                 }
3162                         }
3163                 }
3164         } else {
3165                 LASSERT(cli->tc_in_heap);
3166                 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3167                 list_add_tail(&nrq->nr_u.tbf.tr_list,
3168                                   &cli->tc_list);
3169         }
3170
3171         if (rc == 0)
3172                 CDEBUG(D_RPCTRACE,
3173                        "TBF enqueues: class@%p rate %llu gen %llu token %llu, rule@%p rate %llu gen %llu\n",
3174                        cli, cli->tc_rpc_rate,
3175                        cli->tc_rule_generation, cli->tc_ntoken,
3176                        cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3177                        cli->tc_rule->tr_generation);
3178
3179         return rc;
3180 }
3181
3182 /**
3183  * Removes request \a nrq from \a policy's list of queued requests.
3184  *
3185  * \param[in] policy The policy
3186  * \param[in] nrq    The request to remove
3187  */
3188 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
3189                              struct ptlrpc_nrs_request *nrq)
3190 {
3191         struct nrs_tbf_head   *head;
3192         struct nrs_tbf_client *cli;
3193
3194         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3195
3196         cli = container_of(nrs_request_resource(nrq),
3197                            struct nrs_tbf_client, tc_res);
3198         head = container_of(nrs_request_resource(nrq)->res_parent,
3199                             struct nrs_tbf_head, th_res);
3200
3201         LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
3202         list_del_init(&nrq->nr_u.tbf.tr_list);
3203         if (list_empty(&cli->tc_list)) {
3204                 binheap_remove(head->th_binheap,
3205                                &cli->tc_node);
3206                 cli->tc_in_heap = false;
3207         } else {
3208                 binheap_relocate(head->th_binheap,
3209                                  &cli->tc_node);
3210         }
3211 }
3212
3213 /**
3214  * Prints a debug statement right before the request \a nrq stops being
3215  * handled.
3216  *
3217  * \param[in] policy The policy handling the request
3218  * \param[in] nrq    The request being handled
3219  *
3220  * \see ptlrpc_server_finish_request()
3221  * \see ptlrpc_nrs_req_stop_nolock()
3222  */
3223 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
3224                               struct ptlrpc_nrs_request *nrq)
3225 {
3226         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
3227                                                   rq_nrq);
3228
3229         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3230
3231         CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: %llu\n",
3232                policy->pol_desc->pd_name, libcfs_idstr(&req->rq_peer),
3233                nrq->nr_u.tbf.tr_sequence);
3234 }
3235
3236 /**
3237  * debugfs interface
3238  */
3239
3240 /**
3241  * The maximum RPC rate.
3242  */
3243 #define LPROCFS_NRS_RATE_MAX            1000000ULL      /* 1rpc/us */
3244
3245 static int
3246 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
3247 {
3248         struct ptlrpc_service       *svc = m->private;
3249         int                          rc;
3250
3251         seq_printf(m, "regular_requests:\n");
3252         /**
3253          * Perform two separate calls to this as only one of the NRS heads'
3254          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
3255          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
3256          */
3257         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
3258                                        NRS_POL_NAME_TBF,
3259                                        NRS_CTL_TBF_RD_RULE,
3260                                        false, m);
3261         if (rc == 0) {
3262                 /**
3263                  * -ENOSPC means buf in the parameter m is overflow, return 0
3264                  * here to let upper layer function seq_read alloc a larger
3265                  * memory area and do this process again.
3266                  */
3267         } else if (rc == -ENOSPC) {
3268                 return 0;
3269
3270                 /**
3271                  * Ignore -ENODEV as the regular NRS head's policy may be in the
3272                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
3273                  */
3274         } else if (rc != -ENODEV) {
3275                 return rc;
3276         }
3277
3278         if (!nrs_svc_has_hp(svc))
3279                 goto no_hp;
3280
3281         seq_printf(m, "high_priority_requests:\n");
3282         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
3283                                        NRS_POL_NAME_TBF,
3284                                        NRS_CTL_TBF_RD_RULE,
3285                                        false, m);
3286         if (rc == 0) {
3287                 /**
3288                  * -ENOSPC means buf in the parameter m is overflow, return 0
3289                  * here to let upper layer function seq_read alloc a larger
3290                  * memory area and do this process again.
3291                  */
3292         } else if (rc == -ENOSPC) {
3293                 return 0;
3294         }
3295
3296 no_hp:
3297
3298         return rc;
3299 }
3300
3301 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char *token)
3302 {
3303         int rc;
3304         ENTRY;
3305
3306         switch (cmd->u.tc_start.ts_valid_type) {
3307         case NRS_TBF_FLAG_JOBID:
3308                 rc = nrs_tbf_jobid_parse(cmd, token);
3309                 break;
3310         case NRS_TBF_FLAG_NID:
3311                 rc = nrs_tbf_nid_parse(cmd, token);
3312                 break;
3313         case NRS_TBF_FLAG_OPCODE:
3314                 rc = nrs_tbf_opcode_parse(cmd, token);
3315                 break;
3316         case NRS_TBF_FLAG_GENERIC:
3317                 rc = nrs_tbf_generic_parse(cmd, token);
3318                 break;
3319         case NRS_TBF_FLAG_UID:
3320         case NRS_TBF_FLAG_GID:
3321                 rc = nrs_tbf_ug_id_parse(cmd, token);
3322                 break;
3323         default:
3324                 RETURN(-EINVAL);
3325         }
3326
3327         RETURN(rc);
3328 }
3329
3330 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
3331 {
3332         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3333                 switch (cmd->u.tc_start.ts_valid_type) {
3334                 case NRS_TBF_FLAG_JOBID:
3335                         nrs_tbf_jobid_cmd_fini(cmd);
3336                         break;
3337                 case NRS_TBF_FLAG_NID:
3338                         nrs_tbf_nid_cmd_fini(cmd);
3339                         break;
3340                 case NRS_TBF_FLAG_OPCODE:
3341                         nrs_tbf_opcode_cmd_fini(cmd);
3342                         break;
3343                 case NRS_TBF_FLAG_GENERIC:
3344                         nrs_tbf_generic_cmd_fini(cmd);
3345                         break;
3346                 case NRS_TBF_FLAG_UID:
3347                 case NRS_TBF_FLAG_GID:
3348                         nrs_tbf_id_cmd_fini(cmd);
3349                         break;
3350                 default:
3351                         CWARN("unknown NRS_TBF_FLAGS:0x%x\n",
3352                               cmd->u.tc_start.ts_valid_type);
3353                 }
3354         }
3355 }
3356
3357 static int check_rule_name(const char *name)
3358 {
3359         int i;
3360
3361         if (name[0] == '\0')
3362                 return -EINVAL;
3363
3364         for (i = 0; name[i] != '\0' && i < MAX_TBF_NAME; i++) {
3365                 if (!isalnum(name[i]) && name[i] != '_')
3366                         return -EINVAL;
3367         }
3368
3369         if (i == MAX_TBF_NAME)
3370                 return -ENAMETOOLONG;
3371
3372         return 0;
3373 }
3374
3375 static int
3376 nrs_tbf_parse_value_pair(struct nrs_tbf_cmd *cmd, char *buffer)
3377 {
3378         char    *key;
3379         char    *val;
3380         int      rc;
3381         __u64    rate;
3382
3383         val = buffer;
3384         key = strsep(&val, "=");
3385         if (val == NULL || strlen(val) == 0)
3386                 return -EINVAL;
3387
3388         /* Key of the value pair */
3389         if (strcmp(key, "rate") == 0) {
3390                 rc = kstrtoull(val, 10, &rate);
3391                 if (rc)
3392                         return rc;
3393
3394                 if (rate <= 0 || rate >= LPROCFS_NRS_RATE_MAX)
3395                         return -EINVAL;
3396
3397                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3398                         cmd->u.tc_start.ts_rpc_rate = rate;
3399                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3400                         cmd->u.tc_change.tc_rpc_rate = rate;
3401                 else
3402                         return -EINVAL;
3403         }  else if (strcmp(key, "rank") == 0) {
3404                 rc = check_rule_name(val);
3405                 if (rc)
3406                         return rc;
3407
3408                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3409                         cmd->u.tc_start.ts_next_name = val;
3410                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3411                         cmd->u.tc_change.tc_next_name = val;
3412                 else
3413                         return -EINVAL;
3414         } else if (strcmp(key, "realtime") == 0) {
3415                 unsigned long realtime;
3416
3417                 rc = kstrtoul(val, 10, &realtime);
3418                 if (rc)
3419                         return rc;
3420
3421                 if (realtime > 0)
3422                         cmd->u.tc_start.ts_rule_flags |= NTRS_REALTIME;
3423         } else {
3424                 return -EINVAL;
3425         }
3426         return 0;
3427 }
3428
3429 static int
3430 nrs_tbf_parse_value_pairs(struct nrs_tbf_cmd *cmd, char *buffer)
3431 {
3432         char    *val;
3433         char    *token;
3434         int      rc;
3435
3436         val = buffer;
3437         while (val != NULL && strlen(val) != 0) {
3438                 token = strsep(&val, " ");
3439                 rc = nrs_tbf_parse_value_pair(cmd, token);
3440                 if (rc)
3441                         return rc;
3442         }
3443
3444         switch (cmd->tc_cmd) {
3445         case NRS_CTL_TBF_START_RULE:
3446                 if (cmd->u.tc_start.ts_rpc_rate == 0)
3447                         cmd->u.tc_start.ts_rpc_rate = tbf_rate;
3448                 break;
3449         case NRS_CTL_TBF_CHANGE_RULE:
3450                 if (cmd->u.tc_change.tc_rpc_rate == 0 &&
3451                     cmd->u.tc_change.tc_next_name == NULL)
3452                         return -EINVAL;
3453                 break;
3454         case NRS_CTL_TBF_STOP_RULE:
3455                 break;
3456         default:
3457                 return -EINVAL;
3458         }
3459         return 0;
3460 }
3461
3462 static struct nrs_tbf_cmd *
3463 nrs_tbf_parse_cmd(char *buffer, unsigned long count, __u32 type_flag)
3464 {
3465         struct nrs_tbf_cmd *cmd;
3466         char *token;
3467         char *val;
3468         int rc = 0;
3469
3470         OBD_ALLOC_PTR(cmd);
3471         if (cmd == NULL)
3472                 GOTO(out, rc = -ENOMEM);
3473         memset(cmd, 0, sizeof(*cmd));
3474
3475         val = buffer;
3476         token = strsep(&val, " ");
3477         if (val == NULL || strlen(val) == 0)
3478                 GOTO(out_free_cmd, rc = -EINVAL);
3479
3480         /* Type of the command */
3481         if (strcmp(token, "start") == 0) {
3482                 cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
3483                 cmd->u.tc_start.ts_valid_type = type_flag;
3484         } else if (strcmp(token, "stop") == 0)
3485                 cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
3486         else if (strcmp(token, "change") == 0)
3487                 cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RULE;
3488         else
3489                 GOTO(out_free_cmd, rc = -EINVAL);
3490
3491         /* Name of the rule */
3492         token = strsep(&val, " ");
3493         if ((val == NULL && cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE))
3494                 GOTO(out_free_cmd, rc = -EINVAL);
3495
3496         rc = check_rule_name(token);
3497         if (rc)
3498                 GOTO(out_free_cmd, rc);
3499
3500         cmd->tc_name = token;
3501
3502         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3503                 /* List of ID */
3504                 LASSERT(val);
3505                 token = val;
3506                 val = strrchr(token, '}');
3507                 if (!val)
3508                         GOTO(out_free_cmd, rc = -EINVAL);
3509
3510                 /* Skip '}' */
3511                 val++;
3512                 if (*val == '\0') {
3513                         val = NULL;
3514                 } else if (*val == ' ') {
3515                         *val = '\0';
3516                         val++;
3517                 } else
3518                         GOTO(out_free_cmd, rc = -EINVAL);
3519
3520                 rc = nrs_tbf_id_parse(cmd, token);
3521                 if (rc)
3522                         GOTO(out_free_cmd, rc);
3523         }
3524
3525         rc = nrs_tbf_parse_value_pairs(cmd, val);
3526         if (rc)
3527                 GOTO(out_cmd_fini, rc = -EINVAL);
3528         goto out;
3529 out_cmd_fini:
3530         nrs_tbf_cmd_fini(cmd);
3531 out_free_cmd:
3532         OBD_FREE_PTR(cmd);
3533 out:
3534         if (rc)
3535                 cmd = ERR_PTR(rc);
3536         return cmd;
3537 }
3538
3539 /**
3540  * Get the TBF policy type (nid, jobid, etc) preset by
3541  * proc entry 'nrs_policies' for command buffer parsing.
3542  *
3543  * \param[in] svc the PTLRPC service
3544  * \param[in] queue the NRS queue type
3545  *
3546  * \retval the preset TBF policy type flag
3547  */
3548 static __u32
3549 nrs_tbf_type_flag(struct ptlrpc_service *svc, enum ptlrpc_nrs_queue_type queue)
3550 {
3551         __u32   type;
3552         int     rc;
3553
3554         rc = ptlrpc_nrs_policy_control(svc, queue,
3555                                        NRS_POL_NAME_TBF,
3556                                        NRS_CTL_TBF_RD_TYPE_FLAG,
3557                                        true, &type);
3558         if (rc != 0)
3559                 type = NRS_TBF_FLAG_INVALID;
3560
3561         return type;
3562 }
3563
3564 #define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
3565 static ssize_t
3566 ptlrpc_lprocfs_nrs_tbf_rule_seq_write(struct file *file,
3567                                       const char __user *buffer,
3568                                       size_t count, loff_t *off)
3569 {
3570         struct seq_file *m = file->private_data;
3571         struct ptlrpc_service *svc = m->private;
3572         char *kernbuf;
3573         char *val;
3574         int rc;
3575         struct nrs_tbf_cmd *cmd;
3576         enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
3577         unsigned long length;
3578         char *token;
3579
3580         OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3581         if (kernbuf == NULL)
3582                 GOTO(out, rc = -ENOMEM);
3583
3584         if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
3585                 GOTO(out_free_kernbuff, rc = -EINVAL);
3586
3587         if (copy_from_user(kernbuf, buffer, count))
3588                 GOTO(out_free_kernbuff, rc = -EFAULT);
3589
3590         val = kernbuf;
3591         token = strsep(&val, " ");
3592         if (val == NULL)
3593                 GOTO(out_free_kernbuff, rc = -EINVAL);
3594
3595         if (strcmp(token, "reg") == 0) {
3596                 queue = PTLRPC_NRS_QUEUE_REG;
3597         } else if (strcmp(token, "hp") == 0) {
3598                 queue = PTLRPC_NRS_QUEUE_HP;
3599         } else {
3600                 kernbuf[strlen(token)] = ' ';
3601                 val = kernbuf;
3602         }
3603         length = strlen(val);
3604
3605         if (length == 0)
3606                 GOTO(out_free_kernbuff, rc = -EINVAL);
3607
3608         if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
3609                 GOTO(out_free_kernbuff, rc = -ENODEV);
3610         else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
3611                 queue = PTLRPC_NRS_QUEUE_REG;
3612
3613         cmd = nrs_tbf_parse_cmd(val, length, nrs_tbf_type_flag(svc, queue));
3614         if (IS_ERR(cmd))
3615                 GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
3616
3617         /**
3618          * Serialize NRS core lprocfs operations with policy registration/
3619          * unregistration.
3620          */
3621         mutex_lock(&nrs_core.nrs_mutex);
3622         rc = ptlrpc_nrs_policy_control(svc, queue,
3623                                        NRS_POL_NAME_TBF,
3624                                        NRS_CTL_TBF_WR_RULE,
3625                                        false, cmd);
3626         mutex_unlock(&nrs_core.nrs_mutex);
3627
3628         nrs_tbf_cmd_fini(cmd);
3629         OBD_FREE_PTR(cmd);
3630 out_free_kernbuff:
3631         OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3632 out:
3633         return rc ? rc : count;
3634 }
3635
3636 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_tbf_rule);
3637
3638 /**
3639  * Initializes a TBF policy's lprocfs interface for service \a svc
3640  *
3641  * \param[in] svc the service
3642  *
3643  * \retval 0    success
3644  * \retval != 0 error
3645  */
3646 static int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
3647 {
3648         struct ldebugfs_vars nrs_tbf_lprocfs_vars[] = {
3649                 { .name         = "nrs_tbf_rule",
3650                   .fops         = &ptlrpc_lprocfs_nrs_tbf_rule_fops,
3651                   .data = svc },
3652                 { NULL }
3653         };
3654
3655         if (!svc->srv_debugfs_entry)
3656                 return 0;
3657
3658         ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_tbf_lprocfs_vars, NULL);
3659
3660         return 0;
3661 }
3662
3663 /**
3664  * TBF policy operations
3665  */
3666 static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
3667         .op_policy_start        = nrs_tbf_start,
3668         .op_policy_stop         = nrs_tbf_stop,
3669         .op_policy_ctl          = nrs_tbf_ctl,
3670         .op_res_get             = nrs_tbf_res_get,
3671         .op_res_put             = nrs_tbf_res_put,
3672         .op_req_get             = nrs_tbf_req_get,
3673         .op_req_enqueue         = nrs_tbf_req_add,
3674         .op_req_dequeue         = nrs_tbf_req_del,
3675         .op_req_stop            = nrs_tbf_req_stop,
3676         .op_lprocfs_init        = nrs_tbf_lprocfs_init,
3677 };
3678
3679 /**
3680  * TBF policy configuration
3681  */
3682 struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
3683         .nc_name                = NRS_POL_NAME_TBF,
3684         .nc_ops                 = &nrs_tbf_ops,
3685         .nc_compat              = nrs_policy_compat_all,
3686 };
3687
3688 /** @} tbf */
3689
3690 /** @} nrs */