Whamcloud - gitweb
LU-9611 lod: allow -1 for default stripe count/offset
[fs/lustre-release.git] / lustre / ptlrpc / nrs_tbf.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2013 DataDirect Networks, Inc.
24  *
25  * Copyright (c) 2014, 2016, Intel Corporation.
26  */
27 /*
28  * lustre/ptlrpc/nrs_tbf.c
29  *
30  * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
31  *
32  */
33
34 #ifdef HAVE_SERVER_SUPPORT
35
36 /**
37  * \addtogoup nrs
38  * @{
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #include <obd_support.h>
43 #include <obd_class.h>
44 #include <libcfs/libcfs.h>
45 #include "ptlrpc_internal.h"
46
47 /**
48  * \name tbf
49  *
50  * Token Bucket Filter over client NIDs
51  *
52  * @{
53  */
54
55 #define NRS_POL_NAME_TBF        "tbf"
56
57 static int tbf_jobid_cache_size = 8192;
58 module_param(tbf_jobid_cache_size, int, 0644);
59 MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");
60
61 static int tbf_rate = 10000;
62 module_param(tbf_rate, int, 0644);
63 MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");
64
65 static int tbf_depth = 3;
66 module_param(tbf_depth, int, 0644);
67 MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
68
69 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
70 {
71         struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
72                                                  th_timer);
73         struct ptlrpc_nrs   *nrs = head->th_res.res_policy->pol_nrs;
74         struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
75
76         nrs->nrs_throttling = 0;
77         wake_up(&svcpt->scp_waitq);
78
79         return HRTIMER_NORESTART;
80 }
81
82 #define NRS_TBF_DEFAULT_RULE "default"
83
84 static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule)
85 {
86         LASSERT(atomic_read(&rule->tr_ref) == 0);
87         LASSERT(list_empty(&rule->tr_cli_list));
88         LASSERT(list_empty(&rule->tr_linkage));
89
90         rule->tr_head->th_ops->o_rule_fini(rule);
91         OBD_FREE_PTR(rule);
92 }
93
94 /**
95  * Decreases the rule's usage reference count, and stops the rule in case it
96  * was already stopping and have no more outstanding usage references (which
97  * indicates it has no more queued or started requests, and can be safely
98  * stopped).
99  */
100 static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule)
101 {
102         if (atomic_dec_and_test(&rule->tr_ref))
103                 nrs_tbf_rule_fini(rule);
104 }
105
106 /**
107  * Increases the rule's usage reference count.
108  */
109 static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule)
110 {
111         atomic_inc(&rule->tr_ref);
112 }
113
114 static void
115 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
116 {
117         LASSERT(!list_empty(&cli->tc_linkage));
118         LASSERT(cli->tc_rule);
119         spin_lock(&cli->tc_rule->tr_rule_lock);
120         list_del_init(&cli->tc_linkage);
121         spin_unlock(&cli->tc_rule->tr_rule_lock);
122         nrs_tbf_rule_put(cli->tc_rule);
123         cli->tc_rule = NULL;
124 }
125
126 static void
127 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
128                         struct nrs_tbf_client *cli)
129
130 {
131         struct nrs_tbf_rule *rule = cli->tc_rule;
132
133         cli->tc_rpc_rate = rule->tr_rpc_rate;
134         cli->tc_nsecs = rule->tr_nsecs;
135         cli->tc_depth = rule->tr_depth;
136         cli->tc_ntoken = rule->tr_depth;
137         cli->tc_check_time = ktime_to_ns(ktime_get());
138         cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
139         cli->tc_rule_generation = rule->tr_generation;
140
141         if (cli->tc_in_heap)
142                 cfs_binheap_relocate(head->th_binheap,
143                                      &cli->tc_node);
144 }
145
146 static void
147 nrs_tbf_cli_reset(struct nrs_tbf_head *head,
148                   struct nrs_tbf_rule *rule,
149                   struct nrs_tbf_client *cli)
150 {
151         spin_lock(&cli->tc_rule_lock);
152         if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
153                 LASSERT(rule != cli->tc_rule);
154                 nrs_tbf_cli_rule_put(cli);
155         }
156         LASSERT(cli->tc_rule == NULL);
157         LASSERT(list_empty(&cli->tc_linkage));
158         /* Rule's ref is added before called */
159         cli->tc_rule = rule;
160         spin_lock(&rule->tr_rule_lock);
161         list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
162         spin_unlock(&rule->tr_rule_lock);
163         spin_unlock(&cli->tc_rule_lock);
164         nrs_tbf_cli_reset_value(head, cli);
165 }
166
167 static int
168 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
169 {
170         return rule->tr_head->th_ops->o_rule_dump(rule, m);
171 }
172
173 static int
174 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
175 {
176         struct nrs_tbf_rule *rule;
177         int rc = 0;
178
179         LASSERT(head != NULL);
180         spin_lock(&head->th_rule_lock);
181         /* List the rules from newest to oldest */
182         list_for_each_entry(rule, &head->th_list, tr_linkage) {
183                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
184                 rc = nrs_tbf_rule_dump(rule, m);
185                 if (rc) {
186                         rc = -ENOSPC;
187                         break;
188                 }
189         }
190         spin_unlock(&head->th_rule_lock);
191
192         return rc;
193 }
194
195 static struct nrs_tbf_rule *
196 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
197                          const char *name)
198 {
199         struct nrs_tbf_rule *rule;
200
201         LASSERT(head != NULL);
202         list_for_each_entry(rule, &head->th_list, tr_linkage) {
203                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
204                 if (strcmp(rule->tr_name, name) == 0) {
205                         nrs_tbf_rule_get(rule);
206                         return rule;
207                 }
208         }
209         return NULL;
210 }
211
212 static struct nrs_tbf_rule *
213 nrs_tbf_rule_find(struct nrs_tbf_head *head,
214                   const char *name)
215 {
216         struct nrs_tbf_rule *rule;
217
218         LASSERT(head != NULL);
219         spin_lock(&head->th_rule_lock);
220         rule = nrs_tbf_rule_find_nolock(head, name);
221         spin_unlock(&head->th_rule_lock);
222         return rule;
223 }
224
225 static struct nrs_tbf_rule *
226 nrs_tbf_rule_match(struct nrs_tbf_head *head,
227                    struct nrs_tbf_client *cli)
228 {
229         struct nrs_tbf_rule *rule = NULL;
230         struct nrs_tbf_rule *tmp_rule;
231
232         spin_lock(&head->th_rule_lock);
233         /* Match the newest rule in the list */
234         list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
235                 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
236                 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
237                         rule = tmp_rule;
238                         break;
239                 }
240         }
241
242         if (rule == NULL)
243                 rule = head->th_rule;
244
245         nrs_tbf_rule_get(rule);
246         spin_unlock(&head->th_rule_lock);
247         return rule;
248 }
249
250 static void
251 nrs_tbf_cli_init(struct nrs_tbf_head *head,
252                  struct nrs_tbf_client *cli,
253                  struct ptlrpc_request *req)
254 {
255         struct nrs_tbf_rule *rule;
256
257         memset(cli, 0, sizeof(*cli));
258         cli->tc_in_heap = false;
259         head->th_ops->o_cli_init(cli, req);
260         INIT_LIST_HEAD(&cli->tc_list);
261         INIT_LIST_HEAD(&cli->tc_linkage);
262         spin_lock_init(&cli->tc_rule_lock);
263         atomic_set(&cli->tc_ref, 1);
264         rule = nrs_tbf_rule_match(head, cli);
265         nrs_tbf_cli_reset(head, rule, cli);
266 }
267
268 static void
269 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
270 {
271         LASSERT(list_empty(&cli->tc_list));
272         LASSERT(!cli->tc_in_heap);
273         LASSERT(atomic_read(&cli->tc_ref) == 0);
274         spin_lock(&cli->tc_rule_lock);
275         nrs_tbf_cli_rule_put(cli);
276         spin_unlock(&cli->tc_rule_lock);
277         OBD_FREE_PTR(cli);
278 }
279
280 static int
281 nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
282                    struct nrs_tbf_head *head,
283                    struct nrs_tbf_cmd *start)
284 {
285         struct nrs_tbf_rule     *rule;
286         struct nrs_tbf_rule     *tmp_rule;
287         struct nrs_tbf_rule     *next_rule;
288         char                    *next_name = start->u.tc_start.ts_next_name;
289         int                      rc;
290
291         rule = nrs_tbf_rule_find(head, start->tc_name);
292         if (rule) {
293                 nrs_tbf_rule_put(rule);
294                 return -EEXIST;
295         }
296
297         OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
298         if (rule == NULL)
299                 return -ENOMEM;
300
301         memcpy(rule->tr_name, start->tc_name, strlen(start->tc_name));
302         rule->tr_rpc_rate = start->u.tc_start.ts_rpc_rate;
303         rule->tr_nsecs = NSEC_PER_SEC;
304         do_div(rule->tr_nsecs, rule->tr_rpc_rate);
305         rule->tr_depth = tbf_depth;
306         atomic_set(&rule->tr_ref, 1);
307         INIT_LIST_HEAD(&rule->tr_cli_list);
308         INIT_LIST_HEAD(&rule->tr_nids);
309         INIT_LIST_HEAD(&rule->tr_linkage);
310         spin_lock_init(&rule->tr_rule_lock);
311         rule->tr_head = head;
312
313         rc = head->th_ops->o_rule_init(policy, rule, start);
314         if (rc) {
315                 OBD_FREE_PTR(rule);
316                 return rc;
317         }
318
319         /* Add as the newest rule */
320         spin_lock(&head->th_rule_lock);
321         tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
322         if (tmp_rule) {
323                 spin_unlock(&head->th_rule_lock);
324                 nrs_tbf_rule_put(tmp_rule);
325                 nrs_tbf_rule_put(rule);
326                 return -EEXIST;
327         }
328
329         if (next_name) {
330                 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
331                 if (!next_rule) {
332                         spin_unlock(&head->th_rule_lock);
333                         nrs_tbf_rule_put(rule);
334                         return -ENOENT;
335                 }
336
337                 list_add(&rule->tr_linkage, next_rule->tr_linkage.prev);
338                 nrs_tbf_rule_put(next_rule);
339         } else {
340                 /* Add on the top of the rule list */
341                 list_add(&rule->tr_linkage, &head->th_list);
342         }
343         spin_unlock(&head->th_rule_lock);
344         atomic_inc(&head->th_rule_sequence);
345         if (start->u.tc_start.ts_rule_flags & NTRS_DEFAULT) {
346                 rule->tr_flags |= NTRS_DEFAULT;
347                 LASSERT(head->th_rule == NULL);
348                 head->th_rule = rule;
349         }
350
351         CDEBUG(D_RPCTRACE, "TBF starts rule@%p rate %llu gen %llu\n",
352                rule, rule->tr_rpc_rate, rule->tr_generation);
353
354         return 0;
355 }
356
357 /**
358  * Change the rank of a rule in the rule list
359  *
360  * The matched rule will be moved to the position right before another
361  * given rule.
362  *
363  * \param[in] policy    the policy instance
364  * \param[in] head      the TBF policy instance
365  * \param[in] name      the rule name to be moved
366  * \param[in] next_name the rule name before which the matched rule will be
367  *                      moved
368  *
369  */
370 static int
371 nrs_tbf_rule_change_rank(struct ptlrpc_nrs_policy *policy,
372                          struct nrs_tbf_head *head,
373                          char *name,
374                          char *next_name)
375 {
376         struct nrs_tbf_rule     *rule = NULL;
377         struct nrs_tbf_rule     *next_rule = NULL;
378         int                      rc = 0;
379
380         LASSERT(head != NULL);
381
382         spin_lock(&head->th_rule_lock);
383         rule = nrs_tbf_rule_find_nolock(head, name);
384         if (!rule)
385                 GOTO(out, rc = -ENOENT);
386
387         if (strcmp(name, next_name) == 0)
388                 GOTO(out_put, rc);
389
390         next_rule = nrs_tbf_rule_find_nolock(head, next_name);
391         if (!next_rule)
392                 GOTO(out_put, rc = -ENOENT);
393
394         list_move(&rule->tr_linkage, next_rule->tr_linkage.prev);
395         nrs_tbf_rule_put(next_rule);
396 out_put:
397         nrs_tbf_rule_put(rule);
398 out:
399         spin_unlock(&head->th_rule_lock);
400         return rc;
401 }
402
403 static int
404 nrs_tbf_rule_change_rate(struct ptlrpc_nrs_policy *policy,
405                          struct nrs_tbf_head *head,
406                          char *name,
407                          __u64 rate)
408 {
409         struct nrs_tbf_rule *rule;
410
411         assert_spin_locked(&policy->pol_nrs->nrs_lock);
412
413         rule = nrs_tbf_rule_find(head, name);
414         if (rule == NULL)
415                 return -ENOENT;
416
417         rule->tr_rpc_rate = rate;
418         rule->tr_nsecs = NSEC_PER_SEC;
419         do_div(rule->tr_nsecs, rule->tr_rpc_rate);
420         rule->tr_generation++;
421         nrs_tbf_rule_put(rule);
422
423         return 0;
424 }
425
426 static int
427 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
428                     struct nrs_tbf_head *head,
429                     struct nrs_tbf_cmd *change)
430 {
431         __u64    rate = change->u.tc_change.tc_rpc_rate;
432         char    *next_name = change->u.tc_change.tc_next_name;
433         int      rc;
434
435         if (rate != 0) {
436                 rc = nrs_tbf_rule_change_rate(policy, head, change->tc_name,
437                                               rate);
438                 if (rc)
439                         return rc;
440         }
441
442         if (next_name) {
443                 rc = nrs_tbf_rule_change_rank(policy, head, change->tc_name,
444                                               next_name);
445                 if (rc)
446                         return rc;
447         }
448
449         return 0;
450 }
451
452 static int
453 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
454                   struct nrs_tbf_head *head,
455                   struct nrs_tbf_cmd *stop)
456 {
457         struct nrs_tbf_rule *rule;
458
459         assert_spin_locked(&policy->pol_nrs->nrs_lock);
460
461         if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
462                 return -EPERM;
463
464         rule = nrs_tbf_rule_find(head, stop->tc_name);
465         if (rule == NULL)
466                 return -ENOENT;
467
468         list_del_init(&rule->tr_linkage);
469         rule->tr_flags |= NTRS_STOPPING;
470         nrs_tbf_rule_put(rule);
471         nrs_tbf_rule_put(rule);
472
473         return 0;
474 }
475
476 static int
477 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
478                 struct nrs_tbf_head *head,
479                 struct nrs_tbf_cmd *cmd)
480 {
481         int rc;
482
483         assert_spin_locked(&policy->pol_nrs->nrs_lock);
484
485         switch (cmd->tc_cmd) {
486         case NRS_CTL_TBF_START_RULE:
487                 if (cmd->u.tc_start.ts_valid_type != head->th_type_flag)
488                         return -EINVAL;
489
490                 spin_unlock(&policy->pol_nrs->nrs_lock);
491                 rc = nrs_tbf_rule_start(policy, head, cmd);
492                 spin_lock(&policy->pol_nrs->nrs_lock);
493                 return rc;
494         case NRS_CTL_TBF_CHANGE_RULE:
495                 rc = nrs_tbf_rule_change(policy, head, cmd);
496                 return rc;
497         case NRS_CTL_TBF_STOP_RULE:
498                 rc = nrs_tbf_rule_stop(policy, head, cmd);
499                 /* Take it as a success, if not exists at all */
500                 return rc == -ENOENT ? 0 : rc;
501         default:
502                 return -EFAULT;
503         }
504 }
505
506 /**
507  * Binary heap predicate.
508  *
509  * \param[in] e1 the first binheap node to compare
510  * \param[in] e2 the second binheap node to compare
511  *
512  * \retval 0 e1 > e2
513  * \retval 1 e1 < e2
514  */
515 static int
516 tbf_cli_compare(struct cfs_binheap_node *e1, struct cfs_binheap_node *e2)
517 {
518         struct nrs_tbf_client *cli1;
519         struct nrs_tbf_client *cli2;
520
521         cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
522         cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
523
524         if (cli1->tc_check_time + cli1->tc_nsecs <
525             cli2->tc_check_time + cli2->tc_nsecs)
526                 return 1;
527         else if (cli1->tc_check_time + cli1->tc_nsecs >
528                  cli2->tc_check_time + cli2->tc_nsecs)
529                 return 0;
530
531         if (cli1->tc_check_time < cli2->tc_check_time)
532                 return 1;
533         else if (cli1->tc_check_time > cli2->tc_check_time)
534                 return 0;
535
536         /* Maybe need more comparasion, e.g. request number in the rules */
537         return 1;
538 }
539
540 /**
541  * TBF binary heap operations
542  */
543 static struct cfs_binheap_ops nrs_tbf_heap_ops = {
544         .hop_enter      = NULL,
545         .hop_exit       = NULL,
546         .hop_compare    = tbf_cli_compare,
547 };
548
549 static unsigned nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key,
550                                   unsigned mask)
551 {
552         return cfs_hash_djb2_hash(key, strlen(key), mask);
553 }
554
555 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
556 {
557         struct nrs_tbf_client *cli = hlist_entry(hnode,
558                                                      struct nrs_tbf_client,
559                                                      tc_hnode);
560
561         return (strcmp(cli->tc_jobid, key) == 0);
562 }
563
564 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
565 {
566         struct nrs_tbf_client *cli = hlist_entry(hnode,
567                                                      struct nrs_tbf_client,
568                                                      tc_hnode);
569
570         return cli->tc_jobid;
571 }
572
573 static void *nrs_tbf_jobid_hop_object(struct hlist_node *hnode)
574 {
575         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
576 }
577
578 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
579 {
580         struct nrs_tbf_client *cli = hlist_entry(hnode,
581                                                      struct nrs_tbf_client,
582                                                      tc_hnode);
583
584         atomic_inc(&cli->tc_ref);
585 }
586
587 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
588 {
589         struct nrs_tbf_client *cli = hlist_entry(hnode,
590                                                      struct nrs_tbf_client,
591                                                      tc_hnode);
592
593         atomic_dec(&cli->tc_ref);
594 }
595
596 static void
597 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
598
599 {
600         struct nrs_tbf_client *cli = hlist_entry(hnode,
601                                                  struct nrs_tbf_client,
602                                                  tc_hnode);
603
604         LASSERT(atomic_read(&cli->tc_ref) == 0);
605         nrs_tbf_cli_fini(cli);
606 }
607
608 static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
609         .hs_hash        = nrs_tbf_jobid_hop_hash,
610         .hs_keycmp      = nrs_tbf_jobid_hop_keycmp,
611         .hs_key         = nrs_tbf_jobid_hop_key,
612         .hs_object      = nrs_tbf_jobid_hop_object,
613         .hs_get         = nrs_tbf_jobid_hop_get,
614         .hs_put         = nrs_tbf_jobid_hop_put,
615         .hs_put_locked  = nrs_tbf_jobid_hop_put,
616         .hs_exit        = nrs_tbf_jobid_hop_exit,
617 };
618
619 #define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
620                                   CFS_HASH_NO_ITEMREF | \
621                                   CFS_HASH_DEPTH)
622
623 static struct nrs_tbf_client *
624 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
625                           struct cfs_hash_bd *bd,
626                           const char *jobid)
627 {
628         struct hlist_node *hnode;
629         struct nrs_tbf_client *cli;
630
631         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)jobid);
632         if (hnode == NULL)
633                 return NULL;
634
635         cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode);
636         if (!list_empty(&cli->tc_lru))
637                 list_del_init(&cli->tc_lru);
638         return cli;
639 }
640
641 #define NRS_TBF_JOBID_NULL ""
642
643 static struct nrs_tbf_client *
644 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
645                        struct ptlrpc_request *req)
646 {
647         const char              *jobid;
648         struct nrs_tbf_client   *cli;
649         struct cfs_hash         *hs = head->th_cli_hash;
650         struct cfs_hash_bd               bd;
651
652         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
653         if (jobid == NULL)
654                 jobid = NRS_TBF_JOBID_NULL;
655         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
656         cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
657         cfs_hash_bd_unlock(hs, &bd, 1);
658
659         return cli;
660 }
661
662 static struct nrs_tbf_client *
663 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
664                           struct nrs_tbf_client *cli)
665 {
666         const char              *jobid;
667         struct nrs_tbf_client   *ret;
668         struct cfs_hash         *hs = head->th_cli_hash;
669         struct cfs_hash_bd               bd;
670
671         jobid = cli->tc_jobid;
672         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
673         ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
674         if (ret == NULL) {
675                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
676                 ret = cli;
677         }
678         cfs_hash_bd_unlock(hs, &bd, 1);
679
680         return ret;
681 }
682
683 static void
684 nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
685                       struct nrs_tbf_client *cli)
686 {
687         struct cfs_hash_bd               bd;
688         struct cfs_hash         *hs = head->th_cli_hash;
689         struct nrs_tbf_bucket   *bkt;
690         int                      hw;
691         struct list_head        zombies;
692
693         INIT_LIST_HEAD(&zombies);
694         cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
695         bkt = cfs_hash_bd_extra_get(hs, &bd);
696         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
697                 return;
698         LASSERT(list_empty(&cli->tc_lru));
699         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
700
701         /*
702          * Check and purge the LRU, there is at least one client in the LRU.
703          */
704         hw = tbf_jobid_cache_size >>
705              (hs->hs_cur_bits - hs->hs_bkt_bits);
706         while (cfs_hash_bd_count_get(&bd) > hw) {
707                 if (unlikely(list_empty(&bkt->ntb_lru)))
708                         break;
709                 cli = list_entry(bkt->ntb_lru.next,
710                                      struct nrs_tbf_client,
711                                      tc_lru);
712                 LASSERT(atomic_read(&cli->tc_ref) == 0);
713                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
714                 list_move(&cli->tc_lru, &zombies);
715         }
716         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
717
718         while (!list_empty(&zombies)) {
719                 cli = container_of0(zombies.next,
720                                     struct nrs_tbf_client, tc_lru);
721                 list_del_init(&cli->tc_lru);
722                 nrs_tbf_cli_fini(cli);
723         }
724 }
725
726 static void
727 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
728                        struct ptlrpc_request *req)
729 {
730         char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
731
732         if (jobid == NULL)
733                 jobid = NRS_TBF_JOBID_NULL;
734         LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
735         INIT_LIST_HEAD(&cli->tc_lru);
736         memcpy(cli->tc_jobid, jobid, strlen(jobid));
737 }
738
739 static int nrs_tbf_jobid_hash_order(void)
740 {
741         int bits;
742
743         for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
744                 ;
745
746         return bits;
747 }
748
749 #define NRS_TBF_JOBID_BKT_BITS 10
750
751 static int
752 nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
753                       struct nrs_tbf_head *head)
754 {
755         struct nrs_tbf_cmd       start;
756         struct nrs_tbf_bucket   *bkt;
757         int                      bits;
758         int                      i;
759         int                      rc;
760         struct cfs_hash_bd       bd;
761
762         bits = nrs_tbf_jobid_hash_order();
763         if (bits < NRS_TBF_JOBID_BKT_BITS)
764                 bits = NRS_TBF_JOBID_BKT_BITS;
765         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
766                                             bits,
767                                             bits,
768                                             NRS_TBF_JOBID_BKT_BITS,
769                                             sizeof(*bkt),
770                                             0,
771                                             0,
772                                             &nrs_tbf_jobid_hash_ops,
773                                             NRS_TBF_JOBID_HASH_FLAGS);
774         if (head->th_cli_hash == NULL)
775                 return -ENOMEM;
776
777         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
778                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
779                 INIT_LIST_HEAD(&bkt->ntb_lru);
780         }
781
782         memset(&start, 0, sizeof(start));
783         start.u.tc_start.ts_jobids_str = "*";
784
785         start.u.tc_start.ts_rpc_rate = tbf_rate;
786         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
787         start.tc_name = NRS_TBF_DEFAULT_RULE;
788         INIT_LIST_HEAD(&start.u.tc_start.ts_jobids);
789         rc = nrs_tbf_rule_start(policy, head, &start);
790         if (rc) {
791                 cfs_hash_putref(head->th_cli_hash);
792                 head->th_cli_hash = NULL;
793         }
794
795         return rc;
796 }
797
798 /**
799  * Frees jobid of \a list.
800  *
801  */
802 static void
803 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
804 {
805         struct nrs_tbf_jobid *jobid, *n;
806
807         list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
808                 OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1);
809                 list_del(&jobid->tj_linkage);
810                 OBD_FREE(jobid, sizeof(struct nrs_tbf_jobid));
811         }
812 }
813
814 static int
815 nrs_tbf_jobid_list_add(struct cfs_lstr *id, struct list_head *jobid_list)
816 {
817         struct nrs_tbf_jobid *jobid;
818         struct cfs_lstr res;
819         int rc;
820
821         OBD_ALLOC(jobid, sizeof(struct nrs_tbf_jobid));
822         if (jobid == NULL)
823                 return -ENOMEM;
824
825         OBD_ALLOC(jobid->tj_id, id->ls_len + 1);
826         if (jobid->tj_id == NULL) {
827                 OBD_FREE(jobid, sizeof(struct nrs_tbf_jobid));
828                 return -ENOMEM;
829         }
830
831         memcpy(jobid->tj_id, id->ls_str, id->ls_len);
832         rc = cfs_gettok(id, '*', &res);
833         if (rc == 0)
834                 jobid->tj_match_flag = NRS_TBF_MATCH_FULL;
835         else
836                 jobid->tj_match_flag = NRS_TBF_MATCH_WILDCARD;
837
838         list_add_tail(&jobid->tj_linkage, jobid_list);
839         return 0;
840 }
841
842 static bool
843 cfs_match_wildcard(const char *pattern, const char *content)
844 {
845         if (*pattern == '\0' && *content == '\0')
846                 return true;
847
848         if (*pattern == '*' && *(pattern + 1) != '\0' && *content == '\0')
849                 return false;
850
851         while (*pattern == *content) {
852                 pattern++;
853                 content++;
854                 if (*pattern == '\0' && *content == '\0')
855                         return true;
856
857                 if (*pattern == '*' && *(pattern + 1) != '\0' &&
858                     *content == '\0')
859                         return false;
860         }
861
862         if (*pattern == '*')
863                 return (cfs_match_wildcard(pattern + 1, content) ||
864                         cfs_match_wildcard(pattern, content + 1));
865
866         return false;
867 }
868
869 static inline bool
870 nrs_tbf_jobid_match(const struct nrs_tbf_jobid *jobid, const char *id)
871 {
872         if (jobid->tj_match_flag == NRS_TBF_MATCH_FULL)
873                 return strcmp(jobid->tj_id, id) == 0;
874
875         if (jobid->tj_match_flag == NRS_TBF_MATCH_WILDCARD)
876                 return cfs_match_wildcard(jobid->tj_id, id);
877
878         return false;
879 }
880
881 static int
882 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
883 {
884         struct nrs_tbf_jobid *jobid;
885
886         list_for_each_entry(jobid, jobid_list, tj_linkage) {
887                 if (nrs_tbf_jobid_match(jobid, id))
888                         return 1;
889         }
890         return 0;
891 }
892
893 static int
894 nrs_tbf_jobid_list_parse(char *str, int len, struct list_head *jobid_list)
895 {
896         struct cfs_lstr src;
897         struct cfs_lstr res;
898         int rc = 0;
899         ENTRY;
900
901         src.ls_str = str;
902         src.ls_len = len;
903         INIT_LIST_HEAD(jobid_list);
904         while (src.ls_str) {
905                 rc = cfs_gettok(&src, ' ', &res);
906                 if (rc == 0) {
907                         rc = -EINVAL;
908                         break;
909                 }
910                 rc = nrs_tbf_jobid_list_add(&res, jobid_list);
911                 if (rc)
912                         break;
913         }
914         if (rc)
915                 nrs_tbf_jobid_list_free(jobid_list);
916         RETURN(rc);
917 }
918
919 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
920 {
921         if (!list_empty(&cmd->u.tc_start.ts_jobids))
922                 nrs_tbf_jobid_list_free(&cmd->u.tc_start.ts_jobids);
923         if (cmd->u.tc_start.ts_jobids_str)
924                 OBD_FREE(cmd->u.tc_start.ts_jobids_str,
925                          strlen(cmd->u.tc_start.ts_jobids_str) + 1);
926 }
927
928 static int nrs_tbf_check_id_value(struct cfs_lstr *src, char *key)
929 {
930         struct cfs_lstr res;
931         int keylen = strlen(key);
932         int rc;
933
934         rc = cfs_gettok(src, '=', &res);
935         if (rc == 0 || res.ls_len != keylen ||
936             strncmp(res.ls_str, key, keylen) != 0 ||
937             src->ls_len <= 2 || src->ls_str[0] != '{' ||
938             src->ls_str[src->ls_len - 1] != '}')
939                 return -EINVAL;
940
941         /* Skip '{' and '}' */
942         src->ls_str++;
943         src->ls_len -= 2;
944         return 0;
945 }
946
947 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, char *id)
948 {
949         struct cfs_lstr src;
950         int rc;
951
952         src.ls_str = id;
953         src.ls_len = strlen(id);
954         rc = nrs_tbf_check_id_value(&src, "jobid");
955         if (rc)
956                 return rc;
957
958         OBD_ALLOC(cmd->u.tc_start.ts_jobids_str, src.ls_len + 1);
959         if (cmd->u.tc_start.ts_jobids_str == NULL)
960                 return -ENOMEM;
961
962         memcpy(cmd->u.tc_start.ts_jobids_str, src.ls_str, src.ls_len);
963
964         /* parse jobid list */
965         rc = nrs_tbf_jobid_list_parse(cmd->u.tc_start.ts_jobids_str,
966                                       strlen(cmd->u.tc_start.ts_jobids_str),
967                                       &cmd->u.tc_start.ts_jobids);
968         if (rc)
969                 nrs_tbf_jobid_cmd_fini(cmd);
970
971         return rc;
972 }
973
974 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
975                                    struct nrs_tbf_rule *rule,
976                                    struct nrs_tbf_cmd *start)
977 {
978         int rc = 0;
979
980         LASSERT(start->u.tc_start.ts_jobids_str);
981         OBD_ALLOC(rule->tr_jobids_str,
982                   strlen(start->u.tc_start.ts_jobids_str) + 1);
983         if (rule->tr_jobids_str == NULL)
984                 return -ENOMEM;
985
986         memcpy(rule->tr_jobids_str,
987                start->u.tc_start.ts_jobids_str,
988                strlen(start->u.tc_start.ts_jobids_str));
989
990         INIT_LIST_HEAD(&rule->tr_jobids);
991         if (!list_empty(&start->u.tc_start.ts_jobids)) {
992                 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
993                                               strlen(rule->tr_jobids_str),
994                                               &rule->tr_jobids);
995                 if (rc)
996                         CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
997         }
998         if (rc)
999                 OBD_FREE(rule->tr_jobids_str,
1000                          strlen(start->u.tc_start.ts_jobids_str) + 1);
1001         return rc;
1002 }
1003
1004 static int
1005 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1006 {
1007         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1008                    rule->tr_jobids_str, rule->tr_rpc_rate,
1009                    atomic_read(&rule->tr_ref) - 1);
1010         return 0;
1011 }
1012
1013 static int
1014 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
1015                          struct nrs_tbf_client *cli)
1016 {
1017         return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
1018 }
1019
1020 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
1021 {
1022         if (!list_empty(&rule->tr_jobids))
1023                 nrs_tbf_jobid_list_free(&rule->tr_jobids);
1024         LASSERT(rule->tr_jobids_str != NULL);
1025         OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1);
1026 }
1027
1028 static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
1029         .o_name = NRS_TBF_TYPE_JOBID,
1030         .o_startup = nrs_tbf_jobid_startup,
1031         .o_cli_find = nrs_tbf_jobid_cli_find,
1032         .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
1033         .o_cli_put = nrs_tbf_jobid_cli_put,
1034         .o_cli_init = nrs_tbf_jobid_cli_init,
1035         .o_rule_init = nrs_tbf_jobid_rule_init,
1036         .o_rule_dump = nrs_tbf_jobid_rule_dump,
1037         .o_rule_match = nrs_tbf_jobid_rule_match,
1038         .o_rule_fini = nrs_tbf_jobid_rule_fini,
1039 };
1040
1041 /**
1042  * libcfs_hash operations for nrs_tbf_net::cn_cli_hash
1043  *
1044  * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
1045  * nrs_tbf_client objects.
1046  */
1047 #define NRS_TBF_NID_BKT_BITS    8
1048 #define NRS_TBF_NID_BITS        16
1049
1050 static unsigned nrs_tbf_nid_hop_hash(struct cfs_hash *hs, const void *key,
1051                                   unsigned mask)
1052 {
1053         return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
1054 }
1055
1056 static int nrs_tbf_nid_hop_keycmp(const void *key, struct hlist_node *hnode)
1057 {
1058         lnet_nid_t            *nid = (lnet_nid_t *)key;
1059         struct nrs_tbf_client *cli = hlist_entry(hnode,
1060                                                      struct nrs_tbf_client,
1061                                                      tc_hnode);
1062
1063         return *nid == cli->tc_nid;
1064 }
1065
1066 static void *nrs_tbf_nid_hop_key(struct hlist_node *hnode)
1067 {
1068         struct nrs_tbf_client *cli = hlist_entry(hnode,
1069                                                      struct nrs_tbf_client,
1070                                                      tc_hnode);
1071
1072         return &cli->tc_nid;
1073 }
1074
1075 static void *nrs_tbf_nid_hop_object(struct hlist_node *hnode)
1076 {
1077         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
1078 }
1079
1080 static void nrs_tbf_nid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1081 {
1082         struct nrs_tbf_client *cli = hlist_entry(hnode,
1083                                                      struct nrs_tbf_client,
1084                                                      tc_hnode);
1085
1086         atomic_inc(&cli->tc_ref);
1087 }
1088
1089 static void nrs_tbf_nid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1090 {
1091         struct nrs_tbf_client *cli = hlist_entry(hnode,
1092                                                      struct nrs_tbf_client,
1093                                                      tc_hnode);
1094
1095         atomic_dec(&cli->tc_ref);
1096 }
1097
1098 static void nrs_tbf_nid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1099 {
1100         struct nrs_tbf_client *cli = hlist_entry(hnode,
1101                                                      struct nrs_tbf_client,
1102                                                      tc_hnode);
1103
1104         LASSERTF(atomic_read(&cli->tc_ref) == 0,
1105                  "Busy TBF object from client with NID %s, with %d refs\n",
1106                  libcfs_nid2str(cli->tc_nid), atomic_read(&cli->tc_ref));
1107
1108         nrs_tbf_cli_fini(cli);
1109 }
1110
1111 static struct cfs_hash_ops nrs_tbf_nid_hash_ops = {
1112         .hs_hash        = nrs_tbf_nid_hop_hash,
1113         .hs_keycmp      = nrs_tbf_nid_hop_keycmp,
1114         .hs_key         = nrs_tbf_nid_hop_key,
1115         .hs_object      = nrs_tbf_nid_hop_object,
1116         .hs_get         = nrs_tbf_nid_hop_get,
1117         .hs_put         = nrs_tbf_nid_hop_put,
1118         .hs_put_locked  = nrs_tbf_nid_hop_put,
1119         .hs_exit        = nrs_tbf_nid_hop_exit,
1120 };
1121
1122 static struct nrs_tbf_client *
1123 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
1124                      struct ptlrpc_request *req)
1125 {
1126         return cfs_hash_lookup(head->th_cli_hash, &req->rq_peer.nid);
1127 }
1128
1129 static struct nrs_tbf_client *
1130 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
1131                         struct nrs_tbf_client *cli)
1132 {
1133         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid,
1134                                        &cli->tc_hnode);
1135 }
1136
1137 static void
1138 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
1139                       struct nrs_tbf_client *cli)
1140 {
1141         cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
1142 }
1143
1144 static int
1145 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
1146                     struct nrs_tbf_head *head)
1147 {
1148         struct nrs_tbf_cmd      start;
1149         int rc;
1150
1151         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1152                                             NRS_TBF_NID_BITS,
1153                                             NRS_TBF_NID_BITS,
1154                                             NRS_TBF_NID_BKT_BITS, 0,
1155                                             CFS_HASH_MIN_THETA,
1156                                             CFS_HASH_MAX_THETA,
1157                                             &nrs_tbf_nid_hash_ops,
1158                                             CFS_HASH_RW_BKTLOCK);
1159         if (head->th_cli_hash == NULL)
1160                 return -ENOMEM;
1161
1162         memset(&start, 0, sizeof(start));
1163         start.u.tc_start.ts_nids_str = "*";
1164
1165         start.u.tc_start.ts_rpc_rate = tbf_rate;
1166         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1167         start.tc_name = NRS_TBF_DEFAULT_RULE;
1168         INIT_LIST_HEAD(&start.u.tc_start.ts_nids);
1169         rc = nrs_tbf_rule_start(policy, head, &start);
1170         if (rc) {
1171                 cfs_hash_putref(head->th_cli_hash);
1172                 head->th_cli_hash = NULL;
1173         }
1174
1175         return rc;
1176 }
1177
1178 static void
1179 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1180                              struct ptlrpc_request *req)
1181 {
1182         cli->tc_nid = req->rq_peer.nid;
1183 }
1184
1185 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1186                                  struct nrs_tbf_rule *rule,
1187                                  struct nrs_tbf_cmd *start)
1188 {
1189         LASSERT(start->u.tc_start.ts_nids_str);
1190         OBD_ALLOC(rule->tr_nids_str,
1191                   strlen(start->u.tc_start.ts_nids_str) + 1);
1192         if (rule->tr_nids_str == NULL)
1193                 return -ENOMEM;
1194
1195         memcpy(rule->tr_nids_str,
1196                start->u.tc_start.ts_nids_str,
1197                strlen(start->u.tc_start.ts_nids_str));
1198
1199         INIT_LIST_HEAD(&rule->tr_nids);
1200         if (!list_empty(&start->u.tc_start.ts_nids)) {
1201                 if (cfs_parse_nidlist(rule->tr_nids_str,
1202                                       strlen(rule->tr_nids_str),
1203                                       &rule->tr_nids) <= 0) {
1204                         CERROR("nids {%s} illegal\n",
1205                                rule->tr_nids_str);
1206                         OBD_FREE(rule->tr_nids_str,
1207                                  strlen(start->u.tc_start.ts_nids_str) + 1);
1208                         return -EINVAL;
1209                 }
1210         }
1211         return 0;
1212 }
1213
1214 static int
1215 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1216 {
1217         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1218                    rule->tr_nids_str, rule->tr_rpc_rate,
1219                    atomic_read(&rule->tr_ref) - 1);
1220         return 0;
1221 }
1222
1223 static int
1224 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1225                        struct nrs_tbf_client *cli)
1226 {
1227         return cfs_match_nid(cli->tc_nid, &rule->tr_nids);
1228 }
1229
1230 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1231 {
1232         if (!list_empty(&rule->tr_nids))
1233                 cfs_free_nidlist(&rule->tr_nids);
1234         LASSERT(rule->tr_nids_str != NULL);
1235         OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1);
1236 }
1237
1238 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1239 {
1240         if (!list_empty(&cmd->u.tc_start.ts_nids))
1241                 cfs_free_nidlist(&cmd->u.tc_start.ts_nids);
1242         if (cmd->u.tc_start.ts_nids_str)
1243                 OBD_FREE(cmd->u.tc_start.ts_nids_str,
1244                          strlen(cmd->u.tc_start.ts_nids_str) + 1);
1245 }
1246
1247 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, char *id)
1248 {
1249         struct cfs_lstr src;
1250         int rc;
1251
1252         src.ls_str = id;
1253         src.ls_len = strlen(id);
1254         rc = nrs_tbf_check_id_value(&src, "nid");
1255         if (rc)
1256                 return rc;
1257
1258         OBD_ALLOC(cmd->u.tc_start.ts_nids_str, src.ls_len + 1);
1259         if (cmd->u.tc_start.ts_nids_str == NULL)
1260                 return -ENOMEM;
1261
1262         memcpy(cmd->u.tc_start.ts_nids_str, src.ls_str, src.ls_len);
1263
1264         /* parse NID list */
1265         if (cfs_parse_nidlist(cmd->u.tc_start.ts_nids_str,
1266                               strlen(cmd->u.tc_start.ts_nids_str),
1267                               &cmd->u.tc_start.ts_nids) <= 0) {
1268                 nrs_tbf_nid_cmd_fini(cmd);
1269                 return -EINVAL;
1270         }
1271
1272         return 0;
1273 }
1274
1275 static struct nrs_tbf_ops nrs_tbf_nid_ops = {
1276         .o_name = NRS_TBF_TYPE_NID,
1277         .o_startup = nrs_tbf_nid_startup,
1278         .o_cli_find = nrs_tbf_nid_cli_find,
1279         .o_cli_findadd = nrs_tbf_nid_cli_findadd,
1280         .o_cli_put = nrs_tbf_nid_cli_put,
1281         .o_cli_init = nrs_tbf_nid_cli_init,
1282         .o_rule_init = nrs_tbf_nid_rule_init,
1283         .o_rule_dump = nrs_tbf_nid_rule_dump,
1284         .o_rule_match = nrs_tbf_nid_rule_match,
1285         .o_rule_fini = nrs_tbf_nid_rule_fini,
1286 };
1287
1288 static unsigned nrs_tbf_hop_hash(struct cfs_hash *hs, const void *key,
1289                                  unsigned mask)
1290 {
1291         return cfs_hash_djb2_hash(key, strlen(key), mask);
1292 }
1293
1294 static int nrs_tbf_hop_keycmp(const void *key, struct hlist_node *hnode)
1295 {
1296         struct nrs_tbf_client *cli = hlist_entry(hnode,
1297                                                  struct nrs_tbf_client,
1298                                                  tc_hnode);
1299
1300         return (strcmp(cli->tc_key, key) == 0);
1301 }
1302
1303 static void *nrs_tbf_hop_key(struct hlist_node *hnode)
1304 {
1305         struct nrs_tbf_client *cli = hlist_entry(hnode,
1306                                                  struct nrs_tbf_client,
1307                                                  tc_hnode);
1308         return cli->tc_key;
1309 }
1310
1311 static void *nrs_tbf_hop_object(struct hlist_node *hnode)
1312 {
1313         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
1314 }
1315
1316 static void nrs_tbf_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1317 {
1318         struct nrs_tbf_client *cli = hlist_entry(hnode,
1319                                                  struct nrs_tbf_client,
1320                                                  tc_hnode);
1321
1322         atomic_inc(&cli->tc_ref);
1323 }
1324
1325 static void nrs_tbf_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1326 {
1327         struct nrs_tbf_client *cli = hlist_entry(hnode,
1328                                                  struct nrs_tbf_client,
1329                                                  tc_hnode);
1330
1331         atomic_dec(&cli->tc_ref);
1332 }
1333
1334 static void nrs_tbf_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1335
1336 {
1337         struct nrs_tbf_client *cli = hlist_entry(hnode,
1338                                                  struct nrs_tbf_client,
1339                                                  tc_hnode);
1340
1341         LASSERT(atomic_read(&cli->tc_ref) == 0);
1342         nrs_tbf_cli_fini(cli);
1343 }
1344
1345 static struct cfs_hash_ops nrs_tbf_hash_ops = {
1346         .hs_hash        = nrs_tbf_hop_hash,
1347         .hs_keycmp      = nrs_tbf_hop_keycmp,
1348         .hs_key         = nrs_tbf_hop_key,
1349         .hs_object      = nrs_tbf_hop_object,
1350         .hs_get         = nrs_tbf_hop_get,
1351         .hs_put         = nrs_tbf_hop_put,
1352         .hs_put_locked  = nrs_tbf_hop_put,
1353         .hs_exit        = nrs_tbf_hop_exit,
1354 };
1355
1356 #define NRS_TBF_GENERIC_BKT_BITS        10
1357 #define NRS_TBF_GENERIC_HASH_FLAGS      (CFS_HASH_SPIN_BKTLOCK | \
1358                                         CFS_HASH_NO_ITEMREF | \
1359                                         CFS_HASH_DEPTH)
1360
1361 static int
1362 nrs_tbf_startup(struct ptlrpc_nrs_policy *policy, struct nrs_tbf_head *head)
1363 {
1364         struct nrs_tbf_cmd       start;
1365         struct nrs_tbf_bucket   *bkt;
1366         int                      bits;
1367         int                      i;
1368         int                      rc;
1369         struct cfs_hash_bd       bd;
1370
1371         bits = nrs_tbf_jobid_hash_order();
1372         if (bits < NRS_TBF_GENERIC_BKT_BITS)
1373                 bits = NRS_TBF_GENERIC_BKT_BITS;
1374         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1375                                             bits, bits,
1376                                             NRS_TBF_GENERIC_BKT_BITS,
1377                                             sizeof(*bkt), 0, 0,
1378                                             &nrs_tbf_hash_ops,
1379                                             NRS_TBF_GENERIC_HASH_FLAGS);
1380         if (head->th_cli_hash == NULL)
1381                 return -ENOMEM;
1382
1383         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
1384                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
1385                 INIT_LIST_HEAD(&bkt->ntb_lru);
1386         }
1387
1388         memset(&start, 0, sizeof(start));
1389         start.u.tc_start.ts_conds_str = "*";
1390
1391         start.u.tc_start.ts_rpc_rate = tbf_rate;
1392         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1393         start.tc_name = NRS_TBF_DEFAULT_RULE;
1394         INIT_LIST_HEAD(&start.u.tc_start.ts_conds);
1395         rc = nrs_tbf_rule_start(policy, head, &start);
1396         if (rc)
1397                 cfs_hash_putref(head->th_cli_hash);
1398
1399         return rc;
1400 }
1401
1402 static struct nrs_tbf_client *
1403 nrs_tbf_cli_hash_lookup(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1404                         const char *key)
1405 {
1406         struct hlist_node *hnode;
1407         struct nrs_tbf_client *cli;
1408
1409         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)key);
1410         if (hnode == NULL)
1411                 return NULL;
1412
1413         cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode);
1414         if (!list_empty(&cli->tc_lru))
1415                 list_del_init(&cli->tc_lru);
1416         return cli;
1417 }
1418
1419 static struct nrs_tbf_client *
1420 nrs_tbf_cli_find(struct nrs_tbf_head *head, struct ptlrpc_request *req)
1421 {
1422         struct nrs_tbf_client *cli;
1423         struct cfs_hash *hs = head->th_cli_hash;
1424         struct cfs_hash_bd bd;
1425         char keystr[NRS_TBF_KEY_LEN] = { '\0' };
1426         const char *jobid;
1427         __u32 opc;
1428
1429         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1430         if (jobid == NULL)
1431                 jobid = NRS_TBF_JOBID_NULL;
1432         opc = lustre_msg_get_opc(req->rq_reqmsg);
1433         snprintf(keystr, sizeof(keystr), "%s_%s_%d", jobid,
1434                  libcfs_nid2str(req->rq_peer.nid), opc);
1435         LASSERT(strlen(keystr) < NRS_TBF_KEY_LEN);
1436         cfs_hash_bd_get_and_lock(hs, (void *)keystr, &bd, 1);
1437         cli = nrs_tbf_cli_hash_lookup(hs, &bd, keystr);
1438         cfs_hash_bd_unlock(hs, &bd, 1);
1439
1440         return cli;
1441 }
1442
1443 static struct nrs_tbf_client *
1444 nrs_tbf_cli_findadd(struct nrs_tbf_head *head,
1445                     struct nrs_tbf_client *cli)
1446 {
1447         const char              *key;
1448         struct nrs_tbf_client   *ret;
1449         struct cfs_hash         *hs = head->th_cli_hash;
1450         struct cfs_hash_bd       bd;
1451
1452         key = cli->tc_key;
1453         cfs_hash_bd_get_and_lock(hs, (void *)key, &bd, 1);
1454         ret = nrs_tbf_cli_hash_lookup(hs, &bd, key);
1455         if (ret == NULL) {
1456                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
1457                 ret = cli;
1458         }
1459         cfs_hash_bd_unlock(hs, &bd, 1);
1460
1461         return ret;
1462 }
1463
1464 static void
1465 nrs_tbf_cli_put(struct nrs_tbf_head *head, struct nrs_tbf_client *cli)
1466 {
1467         struct cfs_hash_bd       bd;
1468         struct cfs_hash         *hs = head->th_cli_hash;
1469         struct nrs_tbf_bucket   *bkt;
1470         int                      hw;
1471         struct list_head         zombies;
1472
1473         INIT_LIST_HEAD(&zombies);
1474         cfs_hash_bd_get(hs, &cli->tc_key, &bd);
1475         bkt = cfs_hash_bd_extra_get(hs, &bd);
1476         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
1477                 return;
1478         LASSERT(list_empty(&cli->tc_lru));
1479         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
1480
1481         /**
1482          * Check and purge the LRU, there is at least one client in the LRU.
1483          */
1484         hw = tbf_jobid_cache_size >> (hs->hs_cur_bits - hs->hs_bkt_bits);
1485         while (cfs_hash_bd_count_get(&bd) > hw) {
1486                 if (unlikely(list_empty(&bkt->ntb_lru)))
1487                         break;
1488                 cli = list_entry(bkt->ntb_lru.next,
1489                                  struct nrs_tbf_client,
1490                                  tc_lru);
1491                 LASSERT(atomic_read(&cli->tc_ref) == 0);
1492                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
1493                 list_move(&cli->tc_lru, &zombies);
1494         }
1495         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
1496
1497         while (!list_empty(&zombies)) {
1498                 cli = container_of0(zombies.next,
1499                                     struct nrs_tbf_client, tc_lru);
1500                 list_del_init(&cli->tc_lru);
1501                 nrs_tbf_cli_fini(cli);
1502         }
1503 }
1504
1505 static void
1506 nrs_tbf_generic_cli_init(struct nrs_tbf_client *cli,
1507                          struct ptlrpc_request *req)
1508 {
1509         char keystr[NRS_TBF_KEY_LEN];
1510         const char *jobid;
1511         __u32 opc;
1512
1513         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1514         if (jobid == NULL)
1515                 jobid = NRS_TBF_JOBID_NULL;
1516         opc = lustre_msg_get_opc(req->rq_reqmsg);
1517         snprintf(keystr, sizeof(keystr), "%s_%s_%d", jobid,
1518                  libcfs_nid2str(req->rq_peer.nid), opc);
1519
1520         LASSERT(strlen(keystr) < NRS_TBF_KEY_LEN);
1521         INIT_LIST_HEAD(&cli->tc_lru);
1522         memcpy(cli->tc_key, keystr, strlen(keystr));
1523         memcpy(cli->tc_jobid, jobid, strlen(jobid));
1524         cli->tc_nid = req->rq_peer.nid;
1525         cli->tc_opcode = opc;
1526 }
1527
1528 static void
1529 nrs_tbf_expression_free(struct nrs_tbf_expression *expr)
1530 {
1531         LASSERT(expr->te_field >= NRS_TBF_FIELD_NID &&
1532                 expr->te_field < NRS_TBF_FIELD_MAX);
1533         switch (expr->te_field) {
1534         case NRS_TBF_FIELD_NID:
1535                 cfs_free_nidlist(&expr->te_cond);
1536                 break;
1537         case NRS_TBF_FIELD_JOBID:
1538                 nrs_tbf_jobid_list_free(&expr->te_cond);
1539                 break;
1540         case NRS_TBF_FIELD_OPCODE:
1541                 CFS_FREE_BITMAP(expr->te_opcodes);
1542                 break;
1543         default:
1544                 LBUG();
1545         }
1546         OBD_FREE_PTR(expr);
1547 }
1548
1549 static void
1550 nrs_tbf_conjunction_free(struct nrs_tbf_conjunction *conjunction)
1551 {
1552         struct nrs_tbf_expression *expression;
1553         struct nrs_tbf_expression *n;
1554
1555         LASSERT(list_empty(&conjunction->tc_linkage));
1556         list_for_each_entry_safe(expression, n,
1557                                  &conjunction->tc_expressions,
1558                                  te_linkage) {
1559                 list_del_init(&expression->te_linkage);
1560                 nrs_tbf_expression_free(expression);
1561         }
1562         OBD_FREE_PTR(conjunction);
1563 }
1564
1565 static void
1566 nrs_tbf_conds_free(struct list_head *cond_list)
1567 {
1568         struct nrs_tbf_conjunction *conjunction;
1569         struct nrs_tbf_conjunction *n;
1570
1571         list_for_each_entry_safe(conjunction, n, cond_list, tc_linkage) {
1572                 list_del_init(&conjunction->tc_linkage);
1573                 nrs_tbf_conjunction_free(conjunction);
1574         }
1575 }
1576
1577 static void
1578 nrs_tbf_generic_cmd_fini(struct nrs_tbf_cmd *cmd)
1579 {
1580         if (!list_empty(&cmd->u.tc_start.ts_conds))
1581                 nrs_tbf_conds_free(&cmd->u.tc_start.ts_conds);
1582         if (cmd->u.tc_start.ts_conds_str)
1583                 OBD_FREE(cmd->u.tc_start.ts_conds_str,
1584                          strlen(cmd->u.tc_start.ts_conds_str) + 1);
1585 }
1586
1587 #define NRS_TBF_DISJUNCTION_DELIM       (',')
1588 #define NRS_TBF_CONJUNCTION_DELIM       ('&')
1589 #define NRS_TBF_EXPRESSION_DELIM        ('=')
1590
1591 static inline bool
1592 nrs_tbf_check_field(struct cfs_lstr *field, char *str)
1593 {
1594         int len = strlen(str);
1595
1596         return (field->ls_len == len &&
1597                 strncmp(field->ls_str, str, len) == 0);
1598 }
1599
1600 static int
1601 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr);
1602
1603 static int
1604 nrs_tbf_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
1605 {
1606         struct nrs_tbf_expression *expr;
1607         struct cfs_lstr field;
1608         int rc = 0;
1609
1610         OBD_ALLOC(expr, sizeof(struct nrs_tbf_expression));
1611         if (expr == NULL)
1612                 return -ENOMEM;
1613
1614         rc = cfs_gettok(src, NRS_TBF_EXPRESSION_DELIM, &field);
1615         if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
1616             src->ls_str[src->ls_len - 1] != '}')
1617                 GOTO(out, rc = -EINVAL);
1618
1619         /* Skip '{' and '}' */
1620         src->ls_str++;
1621         src->ls_len -= 2;
1622
1623         if (nrs_tbf_check_field(&field, "nid")) {
1624                 if (cfs_parse_nidlist(src->ls_str,
1625                                       src->ls_len,
1626                                       &expr->te_cond) <= 0)
1627                         GOTO(out, rc = -EINVAL);
1628                 expr->te_field = NRS_TBF_FIELD_NID;
1629         } else if (nrs_tbf_check_field(&field, "jobid")) {
1630                 if (nrs_tbf_jobid_list_parse(src->ls_str,
1631                                              src->ls_len,
1632                                              &expr->te_cond) < 0)
1633                         GOTO(out, rc = -EINVAL);
1634                 expr->te_field = NRS_TBF_FIELD_JOBID;
1635         } else if (nrs_tbf_check_field(&field, "opcode")) {
1636                 if (nrs_tbf_opcode_list_parse(src->ls_str,
1637                                               src->ls_len,
1638                                               &expr->te_opcodes) < 0)
1639                         GOTO(out, rc = -EINVAL);
1640                 expr->te_field = NRS_TBF_FIELD_OPCODE;
1641         } else
1642                 GOTO(out, rc = -EINVAL);
1643
1644         list_add_tail(&expr->te_linkage, cond_list);
1645         return 0;
1646 out:
1647         OBD_FREE_PTR(expr);
1648         return rc;
1649 }
1650
1651 static int
1652 nrs_tbf_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
1653 {
1654         struct nrs_tbf_conjunction *conjunction;
1655         struct cfs_lstr expr;
1656         int rc = 0;
1657
1658         OBD_ALLOC(conjunction, sizeof(struct nrs_tbf_conjunction));
1659         if (conjunction == NULL)
1660                 return -ENOMEM;
1661
1662         INIT_LIST_HEAD(&conjunction->tc_expressions);
1663         list_add_tail(&conjunction->tc_linkage, cond_list);
1664
1665         while (src->ls_str) {
1666                 rc = cfs_gettok(src, NRS_TBF_CONJUNCTION_DELIM, &expr);
1667                 if (rc == 0) {
1668                         rc = -EINVAL;
1669                         break;
1670                 }
1671                 rc = nrs_tbf_expression_parse(&expr,
1672                                               &conjunction->tc_expressions);
1673                 if (rc)
1674                         break;
1675         }
1676         return rc;
1677 }
1678
1679 static int
1680 nrs_tbf_conds_parse(char *str, int len, struct list_head *cond_list)
1681 {
1682         struct cfs_lstr src;
1683         struct cfs_lstr res;
1684         int rc = 0;
1685
1686         src.ls_str = str;
1687         src.ls_len = len;
1688         INIT_LIST_HEAD(cond_list);
1689         while (src.ls_str) {
1690                 rc = cfs_gettok(&src, NRS_TBF_DISJUNCTION_DELIM, &res);
1691                 if (rc == 0) {
1692                         rc = -EINVAL;
1693                         break;
1694                 }
1695                 rc = nrs_tbf_conjunction_parse(&res, cond_list);
1696                 if (rc)
1697                         break;
1698         }
1699         return rc;
1700 }
1701
1702 static int
1703 nrs_tbf_generic_parse(struct nrs_tbf_cmd *cmd, const char *id)
1704 {
1705         int rc;
1706
1707         OBD_ALLOC(cmd->u.tc_start.ts_conds_str, strlen(id) + 1);
1708         if (cmd->u.tc_start.ts_conds_str == NULL)
1709                 return -ENOMEM;
1710
1711         memcpy(cmd->u.tc_start.ts_conds_str, id, strlen(id));
1712
1713         /* Parse hybird NID and JOBID conditions */
1714         rc = nrs_tbf_conds_parse(cmd->u.tc_start.ts_conds_str,
1715                                  strlen(cmd->u.tc_start.ts_conds_str),
1716                                  &cmd->u.tc_start.ts_conds);
1717         if (rc)
1718                 nrs_tbf_generic_cmd_fini(cmd);
1719
1720         return rc;
1721 }
1722
1723 static int
1724 nrs_tbf_expression_match(struct nrs_tbf_expression *expr,
1725                          struct nrs_tbf_rule *rule,
1726                          struct nrs_tbf_client *cli)
1727 {
1728         switch (expr->te_field) {
1729         case NRS_TBF_FIELD_NID:
1730                 return cfs_match_nid(cli->tc_nid, &expr->te_cond);
1731         case NRS_TBF_FIELD_JOBID:
1732                 return nrs_tbf_jobid_list_match(&expr->te_cond, cli->tc_jobid);
1733         case NRS_TBF_FIELD_OPCODE:
1734                 return cfs_bitmap_check(expr->te_opcodes, cli->tc_opcode);
1735         default:
1736                 return 0;
1737         }
1738 }
1739
1740 static int
1741 nrs_tbf_conjunction_match(struct nrs_tbf_conjunction *conjunction,
1742                           struct nrs_tbf_rule *rule,
1743                           struct nrs_tbf_client *cli)
1744 {
1745         struct nrs_tbf_expression *expr;
1746         int matched;
1747
1748         list_for_each_entry(expr, &conjunction->tc_expressions, te_linkage) {
1749                 matched = nrs_tbf_expression_match(expr, rule, cli);
1750                 if (!matched)
1751                         return 0;
1752         }
1753
1754         return 1;
1755 }
1756
1757 static int
1758 nrs_tbf_cond_match(struct nrs_tbf_rule *rule, struct nrs_tbf_client *cli)
1759 {
1760         struct nrs_tbf_conjunction *conjunction;
1761         int matched;
1762
1763         list_for_each_entry(conjunction, &rule->tr_conds, tc_linkage) {
1764                 matched = nrs_tbf_conjunction_match(conjunction, rule, cli);
1765                 if (matched)
1766                         return 1;
1767         }
1768
1769         return 0;
1770 }
1771
1772 static void
1773 nrs_tbf_generic_rule_fini(struct nrs_tbf_rule *rule)
1774 {
1775         if (!list_empty(&rule->tr_conds))
1776                 nrs_tbf_conds_free(&rule->tr_conds);
1777         LASSERT(rule->tr_conds_str != NULL);
1778         OBD_FREE(rule->tr_conds_str, strlen(rule->tr_conds_str) + 1);
1779 }
1780
1781 static int
1782 nrs_tbf_rule_init(struct ptlrpc_nrs_policy *policy,
1783                   struct nrs_tbf_rule *rule, struct nrs_tbf_cmd *start)
1784 {
1785         int rc = 0;
1786
1787         LASSERT(start->u.tc_start.ts_conds_str);
1788         OBD_ALLOC(rule->tr_conds_str,
1789                   strlen(start->u.tc_start.ts_conds_str) + 1);
1790         if (rule->tr_conds_str == NULL)
1791                 return -ENOMEM;
1792
1793         memcpy(rule->tr_conds_str,
1794                start->u.tc_start.ts_conds_str,
1795                strlen(start->u.tc_start.ts_conds_str));
1796
1797         INIT_LIST_HEAD(&rule->tr_conds);
1798         if (!list_empty(&start->u.tc_start.ts_conds)) {
1799                 rc = nrs_tbf_conds_parse(rule->tr_conds_str,
1800                                          strlen(rule->tr_conds_str),
1801                                          &rule->tr_conds);
1802         }
1803         if (rc)
1804                 nrs_tbf_generic_rule_fini(rule);
1805
1806         return rc;
1807 }
1808
1809 static int
1810 nrs_tbf_generic_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1811 {
1812         seq_printf(m, "%s %s %llu, ref %d\n", rule->tr_name,
1813                    rule->tr_conds_str, rule->tr_rpc_rate,
1814                    atomic_read(&rule->tr_ref) - 1);
1815         return 0;
1816 }
1817
1818 static int
1819 nrs_tbf_generic_rule_match(struct nrs_tbf_rule *rule,
1820                            struct nrs_tbf_client *cli)
1821 {
1822         return nrs_tbf_cond_match(rule, cli);
1823 }
1824
1825 static struct nrs_tbf_ops nrs_tbf_generic_ops = {
1826         .o_name = NRS_TBF_TYPE_GENERIC,
1827         .o_startup = nrs_tbf_startup,
1828         .o_cli_find = nrs_tbf_cli_find,
1829         .o_cli_findadd = nrs_tbf_cli_findadd,
1830         .o_cli_put = nrs_tbf_cli_put,
1831         .o_cli_init = nrs_tbf_generic_cli_init,
1832         .o_rule_init = nrs_tbf_rule_init,
1833         .o_rule_dump = nrs_tbf_generic_rule_dump,
1834         .o_rule_match = nrs_tbf_generic_rule_match,
1835         .o_rule_fini = nrs_tbf_generic_rule_fini,
1836 };
1837
1838 static void nrs_tbf_opcode_rule_fini(struct nrs_tbf_rule *rule)
1839 {
1840         if (rule->tr_opcodes != NULL)
1841                 CFS_FREE_BITMAP(rule->tr_opcodes);
1842
1843         LASSERT(rule->tr_opcodes_str != NULL);
1844         OBD_FREE(rule->tr_opcodes_str, strlen(rule->tr_opcodes_str) + 1);
1845 }
1846
1847 static unsigned nrs_tbf_opcode_hop_hash(struct cfs_hash *hs, const void *key,
1848                                         unsigned mask)
1849 {
1850         return cfs_hash_djb2_hash(key, sizeof(__u32), mask);
1851 }
1852
1853 static int nrs_tbf_opcode_hop_keycmp(const void *key, struct hlist_node *hnode)
1854 {
1855         const __u32     *opc = key;
1856         struct nrs_tbf_client *cli = hlist_entry(hnode,
1857                                                  struct nrs_tbf_client,
1858                                                  tc_hnode);
1859
1860         return *opc == cli->tc_opcode;
1861 }
1862
1863 static void *nrs_tbf_opcode_hop_key(struct hlist_node *hnode)
1864 {
1865         struct nrs_tbf_client *cli = hlist_entry(hnode,
1866                                                  struct nrs_tbf_client,
1867                                                  tc_hnode);
1868
1869         return &cli->tc_opcode;
1870 }
1871
1872 static void *nrs_tbf_opcode_hop_object(struct hlist_node *hnode)
1873 {
1874         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
1875 }
1876
1877 static void nrs_tbf_opcode_hop_get(struct cfs_hash *hs,
1878                                    struct hlist_node *hnode)
1879 {
1880         struct nrs_tbf_client *cli = hlist_entry(hnode,
1881                                                  struct nrs_tbf_client,
1882                                                  tc_hnode);
1883
1884         atomic_inc(&cli->tc_ref);
1885 }
1886
1887 static void nrs_tbf_opcode_hop_put(struct cfs_hash *hs,
1888                                    struct hlist_node *hnode)
1889 {
1890         struct nrs_tbf_client *cli = hlist_entry(hnode,
1891                                                  struct nrs_tbf_client,
1892                                                  tc_hnode);
1893
1894         atomic_dec(&cli->tc_ref);
1895 }
1896
1897 static void nrs_tbf_opcode_hop_exit(struct cfs_hash *hs,
1898                                     struct hlist_node *hnode)
1899 {
1900         struct nrs_tbf_client *cli = hlist_entry(hnode,
1901                                                  struct nrs_tbf_client,
1902                                                  tc_hnode);
1903
1904         LASSERTF(atomic_read(&cli->tc_ref) == 0,
1905                  "Busy TBF object from client with opcode %s, with %d refs\n",
1906                  ll_opcode2str(cli->tc_opcode),
1907                  atomic_read(&cli->tc_ref));
1908
1909         nrs_tbf_cli_fini(cli);
1910 }
1911 static struct cfs_hash_ops nrs_tbf_opcode_hash_ops = {
1912         .hs_hash        = nrs_tbf_opcode_hop_hash,
1913         .hs_keycmp      = nrs_tbf_opcode_hop_keycmp,
1914         .hs_key         = nrs_tbf_opcode_hop_key,
1915         .hs_object      = nrs_tbf_opcode_hop_object,
1916         .hs_get         = nrs_tbf_opcode_hop_get,
1917         .hs_put         = nrs_tbf_opcode_hop_put,
1918         .hs_put_locked  = nrs_tbf_opcode_hop_put,
1919         .hs_exit        = nrs_tbf_opcode_hop_exit,
1920 };
1921
1922 static int
1923 nrs_tbf_opcode_startup(struct ptlrpc_nrs_policy *policy,
1924                     struct nrs_tbf_head *head)
1925 {
1926         struct nrs_tbf_cmd      start = { 0 };
1927         int rc;
1928
1929         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1930                                             NRS_TBF_NID_BITS,
1931                                             NRS_TBF_NID_BITS,
1932                                             NRS_TBF_NID_BKT_BITS, 0,
1933                                             CFS_HASH_MIN_THETA,
1934                                             CFS_HASH_MAX_THETA,
1935                                             &nrs_tbf_opcode_hash_ops,
1936                                             CFS_HASH_RW_BKTLOCK);
1937         if (head->th_cli_hash == NULL)
1938                 return -ENOMEM;
1939
1940         start.u.tc_start.ts_opcodes = NULL;
1941         start.u.tc_start.ts_opcodes_str = "*";
1942
1943         start.u.tc_start.ts_rpc_rate = tbf_rate;
1944         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1945         start.tc_name = NRS_TBF_DEFAULT_RULE;
1946         rc = nrs_tbf_rule_start(policy, head, &start);
1947
1948         return rc;
1949 }
1950
1951 static struct nrs_tbf_client *
1952 nrs_tbf_opcode_cli_find(struct nrs_tbf_head *head,
1953                         struct ptlrpc_request *req)
1954 {
1955         __u32 opc;
1956
1957         opc = lustre_msg_get_opc(req->rq_reqmsg);
1958         return cfs_hash_lookup(head->th_cli_hash, &opc);
1959 }
1960
1961 static struct nrs_tbf_client *
1962 nrs_tbf_opcode_cli_findadd(struct nrs_tbf_head *head,
1963                            struct nrs_tbf_client *cli)
1964 {
1965         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_opcode,
1966                                        &cli->tc_hnode);
1967 }
1968
1969 static void
1970 nrs_tbf_opcode_cli_init(struct nrs_tbf_client *cli,
1971                         struct ptlrpc_request *req)
1972 {
1973         cli->tc_opcode = lustre_msg_get_opc(req->rq_reqmsg);
1974 }
1975
1976 #define MAX_OPCODE_LEN  32
1977 static int
1978 nrs_tbf_opcode_set_bit(const struct cfs_lstr *id, struct cfs_bitmap *opcodes)
1979 {
1980         int     op = 0;
1981         char    opcode_str[MAX_OPCODE_LEN];
1982
1983         if (id->ls_len + 1 > MAX_OPCODE_LEN)
1984                 return -EINVAL;
1985
1986         memcpy(opcode_str, id->ls_str, id->ls_len);
1987         opcode_str[id->ls_len] = '\0';
1988
1989         op = ll_str2opcode(opcode_str);
1990         if (op < 0)
1991                 return -EINVAL;
1992
1993         cfs_bitmap_set(opcodes, op);
1994         return 0;
1995 }
1996
1997 static int
1998 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr)
1999 {
2000         struct cfs_bitmap *opcodes;
2001         struct cfs_lstr src;
2002         struct cfs_lstr res;
2003         int rc = 0;
2004         ENTRY;
2005
2006         opcodes = CFS_ALLOCATE_BITMAP(LUSTRE_MAX_OPCODES);
2007         if (opcodes == NULL)
2008                 return -ENOMEM;
2009
2010         src.ls_str = str;
2011         src.ls_len = len;
2012         while (src.ls_str) {
2013                 rc = cfs_gettok(&src, ' ', &res);
2014                 if (rc == 0) {
2015                         rc = -EINVAL;
2016                         break;
2017                 }
2018                 rc = nrs_tbf_opcode_set_bit(&res, opcodes);
2019                 if (rc)
2020                         break;
2021         }
2022
2023         if (rc == 0)
2024                 *bitmaptr = opcodes;
2025         else
2026                 CFS_FREE_BITMAP(opcodes);
2027
2028         RETURN(rc);
2029 }
2030
2031 static void nrs_tbf_opcode_cmd_fini(struct nrs_tbf_cmd *cmd)
2032 {
2033         if (cmd->u.tc_start.ts_opcodes)
2034                 CFS_FREE_BITMAP(cmd->u.tc_start.ts_opcodes);
2035
2036         if (cmd->u.tc_start.ts_opcodes_str)
2037                 OBD_FREE(cmd->u.tc_start.ts_opcodes_str,
2038                          strlen(cmd->u.tc_start.ts_opcodes_str) + 1);
2039
2040 }
2041
2042 static int nrs_tbf_opcode_parse(struct nrs_tbf_cmd *cmd, char *id)
2043 {
2044         struct cfs_lstr src;
2045         int rc;
2046
2047         src.ls_str = id;
2048         src.ls_len = strlen(id);
2049         rc = nrs_tbf_check_id_value(&src, "opcode");
2050         if (rc)
2051                 return rc;
2052
2053         OBD_ALLOC(cmd->u.tc_start.ts_opcodes_str, src.ls_len + 1);
2054         if (cmd->u.tc_start.ts_opcodes_str == NULL)
2055                 return -ENOMEM;
2056
2057         memcpy(cmd->u.tc_start.ts_opcodes_str, src.ls_str, src.ls_len);
2058
2059         /* parse opcode list */
2060         rc = nrs_tbf_opcode_list_parse(cmd->u.tc_start.ts_opcodes_str,
2061                                        strlen(cmd->u.tc_start.ts_opcodes_str),
2062                                        &cmd->u.tc_start.ts_opcodes);
2063         if (rc)
2064                 nrs_tbf_opcode_cmd_fini(cmd);
2065
2066         return rc;
2067 }
2068
2069 static int
2070 nrs_tbf_opcode_rule_match(struct nrs_tbf_rule *rule,
2071                           struct nrs_tbf_client *cli)
2072 {
2073         if (rule->tr_opcodes == NULL)
2074                 return 0;
2075
2076         return cfs_bitmap_check(rule->tr_opcodes, cli->tc_opcode);
2077 }
2078
2079 static int nrs_tbf_opcode_rule_init(struct ptlrpc_nrs_policy *policy,
2080                                     struct nrs_tbf_rule *rule,
2081                                     struct nrs_tbf_cmd *start)
2082 {
2083         int rc = 0;
2084
2085         LASSERT(start->u.tc_start.ts_opcodes_str != NULL);
2086         OBD_ALLOC(rule->tr_opcodes_str,
2087                   strlen(start->u.tc_start.ts_opcodes_str) + 1);
2088         if (rule->tr_opcodes_str == NULL)
2089                 return -ENOMEM;
2090
2091         strncpy(rule->tr_opcodes_str, start->u.tc_start.ts_opcodes_str,
2092                 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2093
2094         /* Default rule '*' */
2095         if (start->u.tc_start.ts_opcodes == NULL)
2096                 return 0;
2097
2098         rc = nrs_tbf_opcode_list_parse(rule->tr_opcodes_str,
2099                                        strlen(rule->tr_opcodes_str),
2100                                        &rule->tr_opcodes);
2101         if (rc)
2102                 OBD_FREE(rule->tr_opcodes_str,
2103                          strlen(start->u.tc_start.ts_opcodes_str) + 1);
2104
2105         return rc;
2106 }
2107
2108 static int
2109 nrs_tbf_opcode_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2110 {
2111         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2112                    rule->tr_opcodes_str, rule->tr_rpc_rate,
2113                    atomic_read(&rule->tr_ref) - 1);
2114         return 0;
2115 }
2116
2117
2118 struct nrs_tbf_ops nrs_tbf_opcode_ops = {
2119         .o_name = NRS_TBF_TYPE_OPCODE,
2120         .o_startup = nrs_tbf_opcode_startup,
2121         .o_cli_find = nrs_tbf_opcode_cli_find,
2122         .o_cli_findadd = nrs_tbf_opcode_cli_findadd,
2123         .o_cli_put = nrs_tbf_nid_cli_put,
2124         .o_cli_init = nrs_tbf_opcode_cli_init,
2125         .o_rule_init = nrs_tbf_opcode_rule_init,
2126         .o_rule_dump = nrs_tbf_opcode_rule_dump,
2127         .o_rule_match = nrs_tbf_opcode_rule_match,
2128         .o_rule_fini = nrs_tbf_opcode_rule_fini,
2129 };
2130
2131 static struct nrs_tbf_type nrs_tbf_types[] = {
2132         {
2133                 .ntt_name = NRS_TBF_TYPE_JOBID,
2134                 .ntt_flag = NRS_TBF_FLAG_JOBID,
2135                 .ntt_ops = &nrs_tbf_jobid_ops,
2136         },
2137         {
2138                 .ntt_name = NRS_TBF_TYPE_NID,
2139                 .ntt_flag = NRS_TBF_FLAG_NID,
2140                 .ntt_ops = &nrs_tbf_nid_ops,
2141         },
2142         {
2143                 .ntt_name = NRS_TBF_TYPE_OPCODE,
2144                 .ntt_flag = NRS_TBF_FLAG_OPCODE,
2145                 .ntt_ops = &nrs_tbf_opcode_ops,
2146         },
2147         {
2148                 .ntt_name = NRS_TBF_TYPE_GENERIC,
2149                 .ntt_flag = NRS_TBF_FLAG_GENERIC,
2150                 .ntt_ops = &nrs_tbf_generic_ops,
2151         },
2152 };
2153
2154 /**
2155  * Is called before the policy transitions into
2156  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
2157  * policy-specific private data structure.
2158  *
2159  * \param[in] policy The policy to start
2160  *
2161  * \retval -ENOMEM OOM error
2162  * \retval  0      success
2163  *
2164  * \see nrs_policy_register()
2165  * \see nrs_policy_ctl()
2166  */
2167 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
2168 {
2169         struct nrs_tbf_head     *head;
2170         struct nrs_tbf_ops      *ops;
2171         __u32                    type;
2172         char                    *name;
2173         int found = 0;
2174         int i;
2175         int rc = 0;
2176
2177         if (arg == NULL)
2178                 name = NRS_TBF_TYPE_GENERIC;
2179         else if (strlen(arg) < NRS_TBF_TYPE_MAX_LEN)
2180                 name = arg;
2181         else
2182                 GOTO(out, rc = -EINVAL);
2183
2184         for (i = 0; i < ARRAY_SIZE(nrs_tbf_types); i++) {
2185                 if (strcmp(name, nrs_tbf_types[i].ntt_name) == 0) {
2186                         ops = nrs_tbf_types[i].ntt_ops;
2187                         type = nrs_tbf_types[i].ntt_flag;
2188                         found = 1;
2189                         break;
2190                 }
2191         }
2192         if (found == 0)
2193                 GOTO(out, rc = -ENOTSUPP);
2194
2195         OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
2196         if (head == NULL)
2197                 GOTO(out, rc = -ENOMEM);
2198
2199         memcpy(head->th_type, name, strlen(name));
2200         head->th_type[strlen(name)] = '\0';
2201         head->th_ops = ops;
2202         head->th_type_flag = type;
2203
2204         head->th_binheap = cfs_binheap_create(&nrs_tbf_heap_ops,
2205                                               CBH_FLAG_ATOMIC_GROW, 4096, NULL,
2206                                               nrs_pol2cptab(policy),
2207                                               nrs_pol2cptid(policy));
2208         if (head->th_binheap == NULL)
2209                 GOTO(out_free_head, rc = -ENOMEM);
2210
2211         atomic_set(&head->th_rule_sequence, 0);
2212         spin_lock_init(&head->th_rule_lock);
2213         INIT_LIST_HEAD(&head->th_list);
2214         hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
2215         head->th_timer.function = nrs_tbf_timer_cb;
2216         rc = head->th_ops->o_startup(policy, head);
2217         if (rc)
2218                 GOTO(out_free_heap, rc);
2219
2220         policy->pol_private = head;
2221         return 0;
2222 out_free_heap:
2223         cfs_binheap_destroy(head->th_binheap);
2224 out_free_head:
2225         OBD_FREE_PTR(head);
2226 out:
2227         return rc;
2228 }
2229
2230 /**
2231  * Is called before the policy transitions into
2232  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
2233  * private data structure.
2234  *
2235  * \param[in] policy The policy to stop
2236  *
2237  * \see nrs_policy_stop0()
2238  */
2239 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
2240 {
2241         struct nrs_tbf_head *head = policy->pol_private;
2242         struct ptlrpc_nrs *nrs = policy->pol_nrs;
2243         struct nrs_tbf_rule *rule, *n;
2244
2245         LASSERT(head != NULL);
2246         LASSERT(head->th_cli_hash != NULL);
2247         hrtimer_cancel(&head->th_timer);
2248         /* Should cleanup hash first before free rules */
2249         cfs_hash_putref(head->th_cli_hash);
2250         list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
2251                 list_del_init(&rule->tr_linkage);
2252                 nrs_tbf_rule_put(rule);
2253         }
2254         LASSERT(list_empty(&head->th_list));
2255         LASSERT(head->th_binheap != NULL);
2256         LASSERT(cfs_binheap_is_empty(head->th_binheap));
2257         cfs_binheap_destroy(head->th_binheap);
2258         OBD_FREE_PTR(head);
2259         nrs->nrs_throttling = 0;
2260         wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
2261 }
2262
2263 /**
2264  * Performs a policy-specific ctl function on TBF policy instances; similar
2265  * to ioctl.
2266  *
2267  * \param[in]     policy the policy instance
2268  * \param[in]     opc    the opcode
2269  * \param[in,out] arg    used for passing parameters and information
2270  *
2271  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2272  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2273  *
2274  * \retval 0   operation carried out successfully
2275  * \retval -ve error
2276  */
2277 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
2278                        enum ptlrpc_nrs_ctl opc,
2279                        void *arg)
2280 {
2281         int rc = 0;
2282         ENTRY;
2283
2284         assert_spin_locked(&policy->pol_nrs->nrs_lock);
2285
2286         switch ((enum nrs_ctl_tbf)opc) {
2287         default:
2288                 RETURN(-EINVAL);
2289
2290         /**
2291          * Read RPC rate size of a policy instance.
2292          */
2293         case NRS_CTL_TBF_RD_RULE: {
2294                 struct nrs_tbf_head *head = policy->pol_private;
2295                 struct seq_file *m = (struct seq_file *) arg;
2296                 struct ptlrpc_service_part *svcpt;
2297
2298                 svcpt = policy->pol_nrs->nrs_svcpt;
2299                 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
2300
2301                 rc = nrs_tbf_rule_dump_all(head, m);
2302                 }
2303                 break;
2304
2305         /**
2306          * Write RPC rate of a policy instance.
2307          */
2308         case NRS_CTL_TBF_WR_RULE: {
2309                 struct nrs_tbf_head *head = policy->pol_private;
2310                 struct nrs_tbf_cmd *cmd;
2311
2312                 cmd = (struct nrs_tbf_cmd *)arg;
2313                 rc = nrs_tbf_command(policy,
2314                                      head,
2315                                      cmd);
2316                 }
2317                 break;
2318         /**
2319          * Read the TBF policy type of a policy instance.
2320          */
2321         case NRS_CTL_TBF_RD_TYPE_FLAG: {
2322                 struct nrs_tbf_head *head = policy->pol_private;
2323
2324                 *(__u32 *)arg = head->th_type_flag;
2325                 }
2326                 break;
2327         }
2328
2329         RETURN(rc);
2330 }
2331
2332 /**
2333  * Is called for obtaining a TBF policy resource.
2334  *
2335  * \param[in]  policy     The policy on which the request is being asked for
2336  * \param[in]  nrq        The request for which resources are being taken
2337  * \param[in]  parent     Parent resource, unused in this policy
2338  * \param[out] resp       Resources references are placed in this array
2339  * \param[in]  moving_req Signifies limited caller context; unused in this
2340  *                        policy
2341  *
2342  *
2343  * \see nrs_resource_get_safe()
2344  */
2345 static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
2346                            struct ptlrpc_nrs_request *nrq,
2347                            const struct ptlrpc_nrs_resource *parent,
2348                            struct ptlrpc_nrs_resource **resp,
2349                            bool moving_req)
2350 {
2351         struct nrs_tbf_head   *head;
2352         struct nrs_tbf_client *cli;
2353         struct nrs_tbf_client *tmp;
2354         struct ptlrpc_request *req;
2355
2356         if (parent == NULL) {
2357                 *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
2358                 return 0;
2359         }
2360
2361         head = container_of(parent, struct nrs_tbf_head, th_res);
2362         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
2363         cli = head->th_ops->o_cli_find(head, req);
2364         if (cli != NULL) {
2365                 spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2366                 LASSERT(cli->tc_rule);
2367                 if (cli->tc_rule_sequence !=
2368                     atomic_read(&head->th_rule_sequence) ||
2369                     cli->tc_rule->tr_flags & NTRS_STOPPING) {
2370                         struct nrs_tbf_rule *rule;
2371
2372                         CDEBUG(D_RPCTRACE,
2373                                "TBF class@%p rate %llu sequence %d, "
2374                                "rule flags %d, head sequence %d\n",
2375                                cli, cli->tc_rpc_rate,
2376                                cli->tc_rule_sequence,
2377                                cli->tc_rule->tr_flags,
2378                                atomic_read(&head->th_rule_sequence));
2379                         rule = nrs_tbf_rule_match(head, cli);
2380                         if (rule != cli->tc_rule) {
2381                                 nrs_tbf_cli_reset(head, rule, cli);
2382                         } else {
2383                                 if (cli->tc_rule_generation != rule->tr_generation)
2384                                         nrs_tbf_cli_reset_value(head, cli);
2385                                 nrs_tbf_rule_put(rule);
2386                         }
2387                 } else if (cli->tc_rule_generation !=
2388                            cli->tc_rule->tr_generation) {
2389                         nrs_tbf_cli_reset_value(head, cli);
2390                 }
2391                 spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2392                 goto out;
2393         }
2394
2395         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
2396                           sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
2397         if (cli == NULL)
2398                 return -ENOMEM;
2399
2400         nrs_tbf_cli_init(head, cli, req);
2401         tmp = head->th_ops->o_cli_findadd(head, cli);
2402         if (tmp != cli) {
2403                 atomic_dec(&cli->tc_ref);
2404                 nrs_tbf_cli_fini(cli);
2405                 cli = tmp;
2406         }
2407 out:
2408         *resp = &cli->tc_res;
2409
2410         return 1;
2411 }
2412
2413 /**
2414  * Called when releasing references to the resource hierachy obtained for a
2415  * request for scheduling using the TBF policy.
2416  *
2417  * \param[in] policy   the policy the resource belongs to
2418  * \param[in] res      the resource to be released
2419  */
2420 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
2421                             const struct ptlrpc_nrs_resource *res)
2422 {
2423         struct nrs_tbf_head   *head;
2424         struct nrs_tbf_client *cli;
2425
2426         /**
2427          * Do nothing for freeing parent, nrs_tbf_net resources
2428          */
2429         if (res->res_parent == NULL)
2430                 return;
2431
2432         cli = container_of(res, struct nrs_tbf_client, tc_res);
2433         head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
2434
2435         head->th_ops->o_cli_put(head, cli);
2436 }
2437
2438 /**
2439  * Called when getting a request from the TBF policy for handling, or just
2440  * peeking; removes the request from the policy when it is to be handled.
2441  *
2442  * \param[in] policy The policy
2443  * \param[in] peek   When set, signifies that we just want to examine the
2444  *                   request, and not handle it, so the request is not removed
2445  *                   from the policy.
2446  * \param[in] force  Force the policy to return a request; unused in this
2447  *                   policy
2448  *
2449  * \retval The request to be handled; this is the next request in the TBF
2450  *         rule
2451  *
2452  * \see ptlrpc_nrs_req_get_nolock()
2453  * \see nrs_request_get()
2454  */
2455 static
2456 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
2457                                            bool peek, bool force)
2458 {
2459         struct nrs_tbf_head       *head = policy->pol_private;
2460         struct ptlrpc_nrs_request *nrq = NULL;
2461         struct nrs_tbf_client     *cli;
2462         struct cfs_binheap_node   *node;
2463
2464         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2465
2466         if (!peek && policy->pol_nrs->nrs_throttling)
2467                 return NULL;
2468
2469         node = cfs_binheap_root(head->th_binheap);
2470         if (unlikely(node == NULL))
2471                 return NULL;
2472
2473         cli = container_of(node, struct nrs_tbf_client, tc_node);
2474         LASSERT(cli->tc_in_heap);
2475         if (peek) {
2476                 nrq = list_entry(cli->tc_list.next,
2477                                      struct ptlrpc_nrs_request,
2478                                      nr_u.tbf.tr_list);
2479         } else {
2480                 __u64 now = ktime_to_ns(ktime_get());
2481                 __u64 passed;
2482                 __u64 ntoken;
2483                 __u64 deadline;
2484
2485                 deadline = cli->tc_check_time +
2486                           cli->tc_nsecs;
2487                 LASSERT(now >= cli->tc_check_time);
2488                 passed = now - cli->tc_check_time;
2489                 ntoken = passed * cli->tc_rpc_rate;
2490                 do_div(ntoken, NSEC_PER_SEC);
2491                 ntoken += cli->tc_ntoken;
2492                 if (ntoken > cli->tc_depth)
2493                         ntoken = cli->tc_depth;
2494                 if (ntoken > 0) {
2495                         struct ptlrpc_request *req;
2496                         nrq = list_entry(cli->tc_list.next,
2497                                              struct ptlrpc_nrs_request,
2498                                              nr_u.tbf.tr_list);
2499                         req = container_of(nrq,
2500                                            struct ptlrpc_request,
2501                                            rq_nrq);
2502                         ntoken--;
2503                         cli->tc_ntoken = ntoken;
2504                         cli->tc_check_time = now;
2505                         list_del_init(&nrq->nr_u.tbf.tr_list);
2506                         if (list_empty(&cli->tc_list)) {
2507                                 cfs_binheap_remove(head->th_binheap,
2508                                                    &cli->tc_node);
2509                                 cli->tc_in_heap = false;
2510                         } else {
2511                                 cfs_binheap_relocate(head->th_binheap,
2512                                                      &cli->tc_node);
2513                         }
2514                         CDEBUG(D_RPCTRACE,
2515                                "TBF dequeues: class@%p rate %llu gen %llu "
2516                                "token %llu, rule@%p rate %llu gen %llu\n",
2517                                cli, cli->tc_rpc_rate,
2518                                cli->tc_rule_generation, cli->tc_ntoken,
2519                                cli->tc_rule, cli->tc_rule->tr_rpc_rate,
2520                                cli->tc_rule->tr_generation);
2521                 } else {
2522                         ktime_t time;
2523
2524                         policy->pol_nrs->nrs_throttling = 1;
2525                         head->th_deadline = deadline;
2526                         time = ktime_set(0, 0);
2527                         time = ktime_add_ns(time, deadline);
2528                         hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
2529                 }
2530         }
2531
2532         return nrq;
2533 }
2534
2535 /**
2536  * Adds request \a nrq to \a policy's list of queued requests
2537  *
2538  * \param[in] policy The policy
2539  * \param[in] nrq    The request to add
2540  *
2541  * \retval 0 success; nrs_request_enqueue() assumes this function will always
2542  *                    succeed
2543  */
2544 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
2545                            struct ptlrpc_nrs_request *nrq)
2546 {
2547         struct nrs_tbf_head   *head;
2548         struct nrs_tbf_client *cli;
2549         int                    rc = 0;
2550
2551         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2552
2553         cli = container_of(nrs_request_resource(nrq),
2554                            struct nrs_tbf_client, tc_res);
2555         head = container_of(nrs_request_resource(nrq)->res_parent,
2556                             struct nrs_tbf_head, th_res);
2557         if (list_empty(&cli->tc_list)) {
2558                 LASSERT(!cli->tc_in_heap);
2559                 rc = cfs_binheap_insert(head->th_binheap, &cli->tc_node);
2560                 if (rc == 0) {
2561                         cli->tc_in_heap = true;
2562                         nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
2563                         list_add_tail(&nrq->nr_u.tbf.tr_list,
2564                                           &cli->tc_list);
2565                         if (policy->pol_nrs->nrs_throttling) {
2566                                 __u64 deadline = cli->tc_check_time +
2567                                                  cli->tc_nsecs;
2568                                 if ((head->th_deadline > deadline) &&
2569                                     (hrtimer_try_to_cancel(&head->th_timer)
2570                                      >= 0)) {
2571                                         ktime_t time;
2572                                         head->th_deadline = deadline;
2573                                         time = ktime_set(0, 0);
2574                                         time = ktime_add_ns(time, deadline);
2575                                         hrtimer_start(&head->th_timer, time,
2576                                                       HRTIMER_MODE_ABS);
2577                                 }
2578                         }
2579                 }
2580         } else {
2581                 LASSERT(cli->tc_in_heap);
2582                 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
2583                 list_add_tail(&nrq->nr_u.tbf.tr_list,
2584                                   &cli->tc_list);
2585         }
2586
2587         if (rc == 0)
2588                 CDEBUG(D_RPCTRACE,
2589                        "TBF enqueues: class@%p rate %llu gen %llu "
2590                        "token %llu, rule@%p rate %llu gen %llu\n",
2591                        cli, cli->tc_rpc_rate,
2592                        cli->tc_rule_generation, cli->tc_ntoken,
2593                        cli->tc_rule, cli->tc_rule->tr_rpc_rate,
2594                        cli->tc_rule->tr_generation);
2595
2596         return rc;
2597 }
2598
2599 /**
2600  * Removes request \a nrq from \a policy's list of queued requests.
2601  *
2602  * \param[in] policy The policy
2603  * \param[in] nrq    The request to remove
2604  */
2605 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
2606                              struct ptlrpc_nrs_request *nrq)
2607 {
2608         struct nrs_tbf_head   *head;
2609         struct nrs_tbf_client *cli;
2610
2611         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2612
2613         cli = container_of(nrs_request_resource(nrq),
2614                            struct nrs_tbf_client, tc_res);
2615         head = container_of(nrs_request_resource(nrq)->res_parent,
2616                             struct nrs_tbf_head, th_res);
2617
2618         LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
2619         list_del_init(&nrq->nr_u.tbf.tr_list);
2620         if (list_empty(&cli->tc_list)) {
2621                 cfs_binheap_remove(head->th_binheap,
2622                                    &cli->tc_node);
2623                 cli->tc_in_heap = false;
2624         } else {
2625                 cfs_binheap_relocate(head->th_binheap,
2626                                      &cli->tc_node);
2627         }
2628 }
2629
2630 /**
2631  * Prints a debug statement right before the request \a nrq stops being
2632  * handled.
2633  *
2634  * \param[in] policy The policy handling the request
2635  * \param[in] nrq    The request being handled
2636  *
2637  * \see ptlrpc_server_finish_request()
2638  * \see ptlrpc_nrs_req_stop_nolock()
2639  */
2640 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
2641                               struct ptlrpc_nrs_request *nrq)
2642 {
2643         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
2644                                                   rq_nrq);
2645
2646         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2647
2648         CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: %llu\n",
2649                policy->pol_desc->pd_name, libcfs_id2str(req->rq_peer),
2650                nrq->nr_u.tbf.tr_sequence);
2651 }
2652
2653 #ifdef CONFIG_PROC_FS
2654
2655 /**
2656  * lprocfs interface
2657  */
2658
2659 /**
2660  * The maximum RPC rate.
2661  */
2662 #define LPROCFS_NRS_RATE_MAX            65535
2663
2664 static int
2665 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
2666 {
2667         struct ptlrpc_service       *svc = m->private;
2668         int                          rc;
2669
2670         seq_printf(m, "regular_requests:\n");
2671         /**
2672          * Perform two separate calls to this as only one of the NRS heads'
2673          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
2674          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
2675          */
2676         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
2677                                        NRS_POL_NAME_TBF,
2678                                        NRS_CTL_TBF_RD_RULE,
2679                                        false, m);
2680         if (rc == 0) {
2681                 /**
2682                  * -ENOSPC means buf in the parameter m is overflow, return 0
2683                  * here to let upper layer function seq_read alloc a larger
2684                  * memory area and do this process again.
2685                  */
2686         } else if (rc == -ENOSPC) {
2687                 return 0;
2688
2689                 /**
2690                  * Ignore -ENODEV as the regular NRS head's policy may be in the
2691                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
2692                  */
2693         } else if (rc != -ENODEV) {
2694                 return rc;
2695         }
2696
2697         if (!nrs_svc_has_hp(svc))
2698                 goto no_hp;
2699
2700         seq_printf(m, "high_priority_requests:\n");
2701         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
2702                                        NRS_POL_NAME_TBF,
2703                                        NRS_CTL_TBF_RD_RULE,
2704                                        false, m);
2705         if (rc == 0) {
2706                 /**
2707                  * -ENOSPC means buf in the parameter m is overflow, return 0
2708                  * here to let upper layer function seq_read alloc a larger
2709                  * memory area and do this process again.
2710                  */
2711         } else if (rc == -ENOSPC) {
2712                 return 0;
2713         }
2714
2715 no_hp:
2716
2717         return rc;
2718 }
2719
2720 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char *token)
2721 {
2722         int rc;
2723
2724         switch (cmd->u.tc_start.ts_valid_type) {
2725         case NRS_TBF_FLAG_JOBID:
2726                 rc = nrs_tbf_jobid_parse(cmd, token);
2727                 break;
2728         case NRS_TBF_FLAG_NID:
2729                 rc = nrs_tbf_nid_parse(cmd, token);
2730                 break;
2731         case NRS_TBF_FLAG_OPCODE:
2732                 rc = nrs_tbf_opcode_parse(cmd, token);
2733                 break;
2734         case NRS_TBF_FLAG_GENERIC:
2735                 rc = nrs_tbf_generic_parse(cmd, token);
2736                 break;
2737         default:
2738                 RETURN(-EINVAL);
2739         }
2740
2741         return rc;
2742 }
2743
2744 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
2745 {
2746         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
2747                 if (cmd->u.tc_start.ts_valid_type == NRS_TBF_FLAG_JOBID)
2748                         nrs_tbf_jobid_cmd_fini(cmd);
2749                 else if (cmd->u.tc_start.ts_valid_type == NRS_TBF_FLAG_NID)
2750                         nrs_tbf_nid_cmd_fini(cmd);
2751                 else if (cmd->u.tc_start.ts_valid_type == NRS_TBF_FLAG_OPCODE)
2752                         nrs_tbf_opcode_cmd_fini(cmd);
2753                 else if (cmd->u.tc_start.ts_valid_type == NRS_TBF_FLAG_GENERIC)
2754                         nrs_tbf_generic_cmd_fini(cmd);
2755         }
2756 }
2757
2758 static bool name_is_valid(const char *name)
2759 {
2760         int i;
2761
2762         for (i = 0; i < strlen(name); i++) {
2763                 if ((!isalnum(name[i])) &&
2764                     (name[i] != '_'))
2765                         return false;
2766         }
2767         return true;
2768 }
2769
2770 static int
2771 nrs_tbf_parse_value_pair(struct nrs_tbf_cmd *cmd, char *buffer)
2772 {
2773         char    *key;
2774         char    *val;
2775         int      rc;
2776         __u64    rate;
2777
2778         val = buffer;
2779         key = strsep(&val, "=");
2780         if (val == NULL || strlen(val) == 0)
2781                 return -EINVAL;
2782
2783         /* Key of the value pair */
2784         if (strcmp(key, "rate") == 0) {
2785                 rc = kstrtoull(val, 10, &rate);
2786                 if (rc)
2787                         return rc;
2788
2789                 if (rate <= 0 || rate >= LPROCFS_NRS_RATE_MAX)
2790                         return -EINVAL;
2791
2792                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
2793                         cmd->u.tc_start.ts_rpc_rate = rate;
2794                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
2795                         cmd->u.tc_change.tc_rpc_rate = rate;
2796                 else
2797                         return -EINVAL;
2798         }  else if (strcmp(key, "rank") == 0) {
2799                 if (!name_is_valid(val))
2800                         return -EINVAL;
2801
2802                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
2803                         cmd->u.tc_start.ts_next_name = val;
2804                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
2805                         cmd->u.tc_change.tc_next_name = val;
2806                 else
2807                         return -EINVAL;
2808         } else {
2809                 return -EINVAL;
2810         }
2811         return 0;
2812 }
2813
2814 static int
2815 nrs_tbf_parse_value_pairs(struct nrs_tbf_cmd *cmd, char *buffer)
2816 {
2817         char    *val;
2818         char    *token;
2819         int      rc;
2820
2821         val = buffer;
2822         while (val != NULL && strlen(val) != 0) {
2823                 token = strsep(&val, " ");
2824                 rc = nrs_tbf_parse_value_pair(cmd, token);
2825                 if (rc)
2826                         return rc;
2827         }
2828
2829         switch (cmd->tc_cmd) {
2830         case NRS_CTL_TBF_START_RULE:
2831                 if (cmd->u.tc_start.ts_rpc_rate == 0)
2832                         cmd->u.tc_start.ts_rpc_rate = tbf_rate;
2833                 break;
2834         case NRS_CTL_TBF_CHANGE_RULE:
2835                 if (cmd->u.tc_change.tc_rpc_rate == 0 &&
2836                     cmd->u.tc_change.tc_next_name == NULL)
2837                         return -EINVAL;
2838                 break;
2839         case NRS_CTL_TBF_STOP_RULE:
2840                 break;
2841         default:
2842                 return -EINVAL;
2843         }
2844         return 0;
2845 }
2846
2847 static struct nrs_tbf_cmd *
2848 nrs_tbf_parse_cmd(char *buffer, unsigned long count, __u32 type_flag)
2849 {
2850         static struct nrs_tbf_cmd       *cmd;
2851         char                            *token;
2852         char                            *val;
2853         int                              rc = 0;
2854
2855         OBD_ALLOC_PTR(cmd);
2856         if (cmd == NULL)
2857                 GOTO(out, rc = -ENOMEM);
2858         memset(cmd, 0, sizeof(*cmd));
2859
2860         val = buffer;
2861         token = strsep(&val, " ");
2862         if (val == NULL || strlen(val) == 0)
2863                 GOTO(out_free_cmd, rc = -EINVAL);
2864
2865         /* Type of the command */
2866         if (strcmp(token, "start") == 0) {
2867                 cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
2868                 cmd->u.tc_start.ts_valid_type = type_flag;
2869         } else if (strcmp(token, "stop") == 0)
2870                 cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
2871         else if (strcmp(token, "change") == 0)
2872                 cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RULE;
2873         else
2874                 GOTO(out_free_cmd, rc = -EINVAL);
2875
2876         /* Name of the rule */
2877         token = strsep(&val, " ");
2878         if ((val == NULL && cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE) ||
2879             !name_is_valid(token))
2880                 GOTO(out_free_cmd, rc = -EINVAL);
2881         cmd->tc_name = token;
2882
2883         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
2884                 /* List of ID */
2885                 LASSERT(val);
2886                 token = val;
2887                 val = strrchr(token, '}');
2888                 if (!val)
2889                         GOTO(out_free_cmd, rc = -EINVAL);
2890
2891                 /* Skip '}' */
2892                 val++;
2893                 if (*val == '\0') {
2894                         val = NULL;
2895                 } else if (*val == ' ') {
2896                         *val = '\0';
2897                         val++;
2898                 } else
2899                         GOTO(out_free_cmd, rc = -EINVAL);
2900
2901                 rc = nrs_tbf_id_parse(cmd, token);
2902                 if (rc)
2903                         GOTO(out_free_cmd, rc);
2904         }
2905
2906         rc = nrs_tbf_parse_value_pairs(cmd, val);
2907         if (rc)
2908                 GOTO(out_cmd_fini, rc = -EINVAL);
2909         goto out;
2910 out_cmd_fini:
2911         nrs_tbf_cmd_fini(cmd);
2912 out_free_cmd:
2913         OBD_FREE_PTR(cmd);
2914 out:
2915         if (rc)
2916                 cmd = ERR_PTR(rc);
2917         return cmd;
2918 }
2919
2920 /**
2921  * Get the TBF policy type (nid, jobid, etc) preset by
2922  * proc entry 'nrs_policies' for command buffer parsing.
2923  *
2924  * \param[in] svc the PTLRPC service
2925  * \param[in] queue the NRS queue type
2926  *
2927  * \retval the preset TBF policy type flag
2928  */
2929 static __u32
2930 nrs_tbf_type_flag(struct ptlrpc_service *svc, enum ptlrpc_nrs_queue_type queue)
2931 {
2932         __u32   type;
2933         int     rc;
2934
2935         rc = ptlrpc_nrs_policy_control(svc, queue,
2936                                        NRS_POL_NAME_TBF,
2937                                        NRS_CTL_TBF_RD_TYPE_FLAG,
2938                                        true, &type);
2939         if (rc != 0)
2940                 type = NRS_TBF_FLAG_INVALID;
2941
2942         return type;
2943 }
2944
2945 extern struct nrs_core nrs_core;
2946 #define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
2947 static ssize_t
2948 ptlrpc_lprocfs_nrs_tbf_rule_seq_write(struct file *file,
2949                                       const char __user *buffer,
2950                                       size_t count, loff_t *off)
2951 {
2952         struct seq_file           *m = file->private_data;
2953         struct ptlrpc_service     *svc = m->private;
2954         char                      *kernbuf;
2955         char                      *val;
2956         int                        rc;
2957         static struct nrs_tbf_cmd *cmd;
2958         enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
2959         unsigned long              length;
2960         char                      *token;
2961
2962         OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
2963         if (kernbuf == NULL)
2964                 GOTO(out, rc = -ENOMEM);
2965
2966         if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
2967                 GOTO(out_free_kernbuff, rc = -EINVAL);
2968
2969         if (copy_from_user(kernbuf, buffer, count))
2970                 GOTO(out_free_kernbuff, rc = -EFAULT);
2971
2972         val = kernbuf;
2973         token = strsep(&val, " ");
2974         if (val == NULL)
2975                 GOTO(out_free_kernbuff, rc = -EINVAL);
2976
2977         if (strcmp(token, "reg") == 0) {
2978                 queue = PTLRPC_NRS_QUEUE_REG;
2979         } else if (strcmp(token, "hp") == 0) {
2980                 queue = PTLRPC_NRS_QUEUE_HP;
2981         } else {
2982                 kernbuf[strlen(token)] = ' ';
2983                 val = kernbuf;
2984         }
2985         length = strlen(val);
2986
2987         if (length == 0)
2988                 GOTO(out_free_kernbuff, rc = -EINVAL);
2989
2990         if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
2991                 GOTO(out_free_kernbuff, rc = -ENODEV);
2992         else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
2993                 queue = PTLRPC_NRS_QUEUE_REG;
2994
2995         cmd = nrs_tbf_parse_cmd(val, length, nrs_tbf_type_flag(svc, queue));
2996         if (IS_ERR(cmd))
2997                 GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
2998
2999         /**
3000          * Serialize NRS core lprocfs operations with policy registration/
3001          * unregistration.
3002          */
3003         mutex_lock(&nrs_core.nrs_mutex);
3004         rc = ptlrpc_nrs_policy_control(svc, queue,
3005                                        NRS_POL_NAME_TBF,
3006                                        NRS_CTL_TBF_WR_RULE,
3007                                        false, cmd);
3008         mutex_unlock(&nrs_core.nrs_mutex);
3009
3010         nrs_tbf_cmd_fini(cmd);
3011         OBD_FREE_PTR(cmd);
3012 out_free_kernbuff:
3013         OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3014 out:
3015         return rc ? rc : count;
3016 }
3017 LPROC_SEQ_FOPS(ptlrpc_lprocfs_nrs_tbf_rule);
3018
3019 /**
3020  * Initializes a TBF policy's lprocfs interface for service \a svc
3021  *
3022  * \param[in] svc the service
3023  *
3024  * \retval 0    success
3025  * \retval != 0 error
3026  */
3027 static int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
3028 {
3029         struct lprocfs_vars nrs_tbf_lprocfs_vars[] = {
3030                 { .name         = "nrs_tbf_rule",
3031                   .fops         = &ptlrpc_lprocfs_nrs_tbf_rule_fops,
3032                   .data = svc },
3033                 { NULL }
3034         };
3035
3036         if (svc->srv_procroot == NULL)
3037                 return 0;
3038
3039         return lprocfs_add_vars(svc->srv_procroot, nrs_tbf_lprocfs_vars, NULL);
3040 }
3041
3042 /**
3043  * Cleans up a TBF policy's lprocfs interface for service \a svc
3044  *
3045  * \param[in] svc the service
3046  */
3047 static void nrs_tbf_lprocfs_fini(struct ptlrpc_service *svc)
3048 {
3049         if (svc->srv_procroot == NULL)
3050                 return;
3051
3052         lprocfs_remove_proc_entry("nrs_tbf_rule", svc->srv_procroot);
3053 }
3054
3055 #endif /* CONFIG_PROC_FS */
3056
3057 /**
3058  * TBF policy operations
3059  */
3060 static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
3061         .op_policy_start        = nrs_tbf_start,
3062         .op_policy_stop         = nrs_tbf_stop,
3063         .op_policy_ctl          = nrs_tbf_ctl,
3064         .op_res_get             = nrs_tbf_res_get,
3065         .op_res_put             = nrs_tbf_res_put,
3066         .op_req_get             = nrs_tbf_req_get,
3067         .op_req_enqueue         = nrs_tbf_req_add,
3068         .op_req_dequeue         = nrs_tbf_req_del,
3069         .op_req_stop            = nrs_tbf_req_stop,
3070 #ifdef CONFIG_PROC_FS
3071         .op_lprocfs_init        = nrs_tbf_lprocfs_init,
3072         .op_lprocfs_fini        = nrs_tbf_lprocfs_fini,
3073 #endif
3074 };
3075
3076 /**
3077  * TBF policy configuration
3078  */
3079 struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
3080         .nc_name                = NRS_POL_NAME_TBF,
3081         .nc_ops                 = &nrs_tbf_ops,
3082         .nc_compat              = nrs_policy_compat_all,
3083 };
3084
3085 /** @} tbf */
3086
3087 /** @} nrs */
3088
3089 #endif /* HAVE_SERVER_SUPPORT */