Whamcloud - gitweb
LU-6245 libcfs: replace CFS_MODULE_PARAM with linux kernel module api
[fs/lustre-release.git] / lustre / ptlrpc / nrs_tbf.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2013 DataDirect Networks, Inc.
24  *
25  * Copyright (c) 2014, Intel Corporation.
26  */
27 /*
28  * lustre/ptlrpc/nrs_tbf.c
29  *
30  * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
31  *
32  */
33
34 #ifdef HAVE_SERVER_SUPPORT
35
36 /**
37  * \addtogoup nrs
38  * @{
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #include <obd_support.h>
43 #include <obd_class.h>
44 #include <libcfs/libcfs.h>
45 #include "ptlrpc_internal.h"
46
47 /**
48  * \name tbf
49  *
50  * Token Bucket Filter over client NIDs
51  *
52  * @{
53  */
54
55 #define NRS_POL_NAME_TBF        "tbf"
56
57 static int tbf_jobid_cache_size = 8192;
58 module_param(tbf_jobid_cache_size, int, 0644);
59 MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");
60
61 static int tbf_rate = 10000;
62 module_param(tbf_rate, int, 0644);
63 MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");
64
65 static int tbf_depth = 3;
66 module_param(tbf_depth, int, 0644);
67 MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
68
69 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
70 {
71         struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
72                                                  th_timer);
73         struct ptlrpc_nrs   *nrs = head->th_res.res_policy->pol_nrs;
74         struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
75
76         nrs->nrs_throttling = 0;
77         wake_up(&svcpt->scp_waitq);
78
79         return HRTIMER_NORESTART;
80 }
81
82 #define NRS_TBF_DEFAULT_RULE "default"
83
84 static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule)
85 {
86         LASSERT(atomic_read(&rule->tr_ref) == 0);
87         LASSERT(list_empty(&rule->tr_cli_list));
88         LASSERT(list_empty(&rule->tr_linkage));
89
90         rule->tr_head->th_ops->o_rule_fini(rule);
91         OBD_FREE_PTR(rule);
92 }
93
94 /**
95  * Decreases the rule's usage reference count, and stops the rule in case it
96  * was already stopping and have no more outstanding usage references (which
97  * indicates it has no more queued or started requests, and can be safely
98  * stopped).
99  */
100 static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule)
101 {
102         if (atomic_dec_and_test(&rule->tr_ref))
103                 nrs_tbf_rule_fini(rule);
104 }
105
106 /**
107  * Increases the rule's usage reference count.
108  */
109 static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule)
110 {
111         atomic_inc(&rule->tr_ref);
112 }
113
114 static void
115 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
116 {
117         LASSERT(!list_empty(&cli->tc_linkage));
118         LASSERT(cli->tc_rule);
119         spin_lock(&cli->tc_rule->tr_rule_lock);
120         list_del_init(&cli->tc_linkage);
121         spin_unlock(&cli->tc_rule->tr_rule_lock);
122         nrs_tbf_rule_put(cli->tc_rule);
123         cli->tc_rule = NULL;
124 }
125
126 static void
127 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
128                         struct nrs_tbf_client *cli)
129
130 {
131         struct nrs_tbf_rule *rule = cli->tc_rule;
132
133         cli->tc_rpc_rate = rule->tr_rpc_rate;
134         cli->tc_nsecs = rule->tr_nsecs;
135         cli->tc_depth = rule->tr_depth;
136         cli->tc_ntoken = rule->tr_depth;
137         cli->tc_check_time = ktime_to_ns(ktime_get());
138         cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
139         cli->tc_rule_generation = rule->tr_generation;
140
141         if (cli->tc_in_heap)
142                 cfs_binheap_relocate(head->th_binheap,
143                                      &cli->tc_node);
144 }
145
146 static void
147 nrs_tbf_cli_reset(struct nrs_tbf_head *head,
148                   struct nrs_tbf_rule *rule,
149                   struct nrs_tbf_client *cli)
150 {
151         spin_lock(&cli->tc_rule_lock);
152         if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
153                 LASSERT(rule != cli->tc_rule);
154                 nrs_tbf_cli_rule_put(cli);
155         }
156         LASSERT(cli->tc_rule == NULL);
157         LASSERT(list_empty(&cli->tc_linkage));
158         /* Rule's ref is added before called */
159         cli->tc_rule = rule;
160         spin_lock(&rule->tr_rule_lock);
161         list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
162         spin_unlock(&rule->tr_rule_lock);
163         spin_unlock(&cli->tc_rule_lock);
164         nrs_tbf_cli_reset_value(head, cli);
165 }
166
167 static int
168 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
169 {
170         return rule->tr_head->th_ops->o_rule_dump(rule, m);
171 }
172
173 static int
174 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
175 {
176         struct nrs_tbf_rule *rule;
177         int rc = 0;
178
179         LASSERT(head != NULL);
180         spin_lock(&head->th_rule_lock);
181         /* List the rules from newest to oldest */
182         list_for_each_entry(rule, &head->th_list, tr_linkage) {
183                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
184                 rc = nrs_tbf_rule_dump(rule, m);
185                 if (rc) {
186                         rc = -ENOSPC;
187                         break;
188                 }
189         }
190         spin_unlock(&head->th_rule_lock);
191
192         return rc;
193 }
194
195 static struct nrs_tbf_rule *
196 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
197                          const char *name)
198 {
199         struct nrs_tbf_rule *rule;
200
201         LASSERT(head != NULL);
202         list_for_each_entry(rule, &head->th_list, tr_linkage) {
203                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
204                 if (strcmp(rule->tr_name, name) == 0) {
205                         nrs_tbf_rule_get(rule);
206                         return rule;
207                 }
208         }
209         return NULL;
210 }
211
212 static struct nrs_tbf_rule *
213 nrs_tbf_rule_find(struct nrs_tbf_head *head,
214                   const char *name)
215 {
216         struct nrs_tbf_rule *rule;
217
218         LASSERT(head != NULL);
219         spin_lock(&head->th_rule_lock);
220         rule = nrs_tbf_rule_find_nolock(head, name);
221         spin_unlock(&head->th_rule_lock);
222         return rule;
223 }
224
225 static struct nrs_tbf_rule *
226 nrs_tbf_rule_match(struct nrs_tbf_head *head,
227                    struct nrs_tbf_client *cli)
228 {
229         struct nrs_tbf_rule *rule = NULL;
230         struct nrs_tbf_rule *tmp_rule;
231
232         spin_lock(&head->th_rule_lock);
233         /* Match the newest rule in the list */
234         list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
235                 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
236                 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
237                         rule = tmp_rule;
238                         break;
239                 }
240         }
241
242         if (rule == NULL)
243                 rule = head->th_rule;
244
245         nrs_tbf_rule_get(rule);
246         spin_unlock(&head->th_rule_lock);
247         return rule;
248 }
249
250 static void
251 nrs_tbf_cli_init(struct nrs_tbf_head *head,
252                  struct nrs_tbf_client *cli,
253                  struct ptlrpc_request *req)
254 {
255         struct nrs_tbf_rule *rule;
256
257         cli->tc_in_heap = false;
258         head->th_ops->o_cli_init(cli, req);
259         INIT_LIST_HEAD(&cli->tc_list);
260         INIT_LIST_HEAD(&cli->tc_linkage);
261         spin_lock_init(&cli->tc_rule_lock);
262         atomic_set(&cli->tc_ref, 1);
263         rule = nrs_tbf_rule_match(head, cli);
264         nrs_tbf_cli_reset(head, rule, cli);
265 }
266
267 static void
268 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
269 {
270         LASSERT(list_empty(&cli->tc_list));
271         LASSERT(!cli->tc_in_heap);
272         LASSERT(atomic_read(&cli->tc_ref) == 0);
273         spin_lock(&cli->tc_rule_lock);
274         nrs_tbf_cli_rule_put(cli);
275         spin_unlock(&cli->tc_rule_lock);
276         OBD_FREE_PTR(cli);
277 }
278
279 static int
280 nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
281                    struct nrs_tbf_head *head,
282                    struct nrs_tbf_cmd *start)
283 {
284         struct nrs_tbf_rule *rule, *tmp_rule;
285         int rc;
286
287         rule = nrs_tbf_rule_find(head, start->tc_name);
288         if (rule) {
289                 nrs_tbf_rule_put(rule);
290                 return -EEXIST;
291         }
292
293         OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
294         if (rule == NULL)
295                 return -ENOMEM;
296
297         memcpy(rule->tr_name, start->tc_name, strlen(start->tc_name));
298         rule->tr_rpc_rate = start->tc_rpc_rate;
299         rule->tr_nsecs = NSEC_PER_SEC;
300         do_div(rule->tr_nsecs, rule->tr_rpc_rate);
301         rule->tr_depth = tbf_depth;
302         atomic_set(&rule->tr_ref, 1);
303         INIT_LIST_HEAD(&rule->tr_cli_list);
304         INIT_LIST_HEAD(&rule->tr_nids);
305         INIT_LIST_HEAD(&rule->tr_linkage);
306         spin_lock_init(&rule->tr_rule_lock);
307         rule->tr_head = head;
308
309         rc = head->th_ops->o_rule_init(policy, rule, start);
310         if (rc) {
311                 OBD_FREE_PTR(rule);
312                 return rc;
313         }
314
315         /* Add as the newest rule */
316         spin_lock(&head->th_rule_lock);
317         tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
318         if (tmp_rule) {
319                 spin_unlock(&head->th_rule_lock);
320                 nrs_tbf_rule_put(tmp_rule);
321                 nrs_tbf_rule_put(rule);
322                 return -EEXIST;
323         }
324         list_add(&rule->tr_linkage, &head->th_list);
325         spin_unlock(&head->th_rule_lock);
326         atomic_inc(&head->th_rule_sequence);
327         if (start->tc_rule_flags & NTRS_DEFAULT) {
328                 rule->tr_flags |= NTRS_DEFAULT;
329                 LASSERT(head->th_rule == NULL);
330                 head->th_rule = rule;
331         }
332
333         return 0;
334 }
335
336 static int
337 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
338                     struct nrs_tbf_head *head,
339                     struct nrs_tbf_cmd *change)
340 {
341         struct nrs_tbf_rule *rule;
342
343         assert_spin_locked(&policy->pol_nrs->nrs_lock);
344
345         rule = nrs_tbf_rule_find(head, change->tc_name);
346         if (rule == NULL)
347                 return -ENOENT;
348
349         rule->tr_rpc_rate = change->tc_rpc_rate;
350         rule->tr_nsecs = NSEC_PER_SEC;
351         do_div(rule->tr_nsecs, rule->tr_rpc_rate);
352         rule->tr_generation++;
353         nrs_tbf_rule_put(rule);
354
355         return 0;
356 }
357
358 static int
359 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
360                   struct nrs_tbf_head *head,
361                   struct nrs_tbf_cmd *stop)
362 {
363         struct nrs_tbf_rule *rule;
364
365         assert_spin_locked(&policy->pol_nrs->nrs_lock);
366
367         if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
368                 return -EPERM;
369
370         rule = nrs_tbf_rule_find(head, stop->tc_name);
371         if (rule == NULL)
372                 return -ENOENT;
373
374         list_del_init(&rule->tr_linkage);
375         rule->tr_flags |= NTRS_STOPPING;
376         nrs_tbf_rule_put(rule);
377         nrs_tbf_rule_put(rule);
378
379         return 0;
380 }
381
382 static int
383 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
384                 struct nrs_tbf_head *head,
385                 struct nrs_tbf_cmd *cmd)
386 {
387         int rc;
388
389         assert_spin_locked(&policy->pol_nrs->nrs_lock);
390
391         switch (cmd->tc_cmd) {
392         case NRS_CTL_TBF_START_RULE:
393                 if (!(cmd->tc_valid_types & head->th_type_flag))
394                         return -EINVAL;
395
396                 spin_unlock(&policy->pol_nrs->nrs_lock);
397                 rc = nrs_tbf_rule_start(policy, head, cmd);
398                 spin_lock(&policy->pol_nrs->nrs_lock);
399                 return rc;
400         case NRS_CTL_TBF_CHANGE_RATE:
401                 rc = nrs_tbf_rule_change(policy, head, cmd);
402                 return rc;
403         case NRS_CTL_TBF_STOP_RULE:
404                 rc = nrs_tbf_rule_stop(policy, head, cmd);
405                 /* Take it as a success, if not exists at all */
406                 return rc == -ENOENT ? 0 : rc;
407         default:
408                 return -EFAULT;
409         }
410 }
411
412 /**
413  * Binary heap predicate.
414  *
415  * \param[in] e1 the first binheap node to compare
416  * \param[in] e2 the second binheap node to compare
417  *
418  * \retval 0 e1 > e2
419  * \retval 1 e1 < e2
420  */
421 static int
422 tbf_cli_compare(struct cfs_binheap_node *e1, struct cfs_binheap_node *e2)
423 {
424         struct nrs_tbf_client *cli1;
425         struct nrs_tbf_client *cli2;
426
427         cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
428         cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
429
430         if (cli1->tc_check_time + cli1->tc_nsecs <
431             cli2->tc_check_time + cli2->tc_nsecs)
432                 return 1;
433         else if (cli1->tc_check_time + cli1->tc_nsecs >
434                  cli2->tc_check_time + cli2->tc_nsecs)
435                 return 0;
436
437         if (cli1->tc_check_time < cli2->tc_check_time)
438                 return 1;
439         else if (cli1->tc_check_time > cli2->tc_check_time)
440                 return 0;
441
442         /* Maybe need more comparasion, e.g. request number in the rules */
443         return 1;
444 }
445
446 /**
447  * TBF binary heap operations
448  */
449 static struct cfs_binheap_ops nrs_tbf_heap_ops = {
450         .hop_enter      = NULL,
451         .hop_exit       = NULL,
452         .hop_compare    = tbf_cli_compare,
453 };
454
455 static unsigned nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key,
456                                   unsigned mask)
457 {
458         return cfs_hash_djb2_hash(key, strlen(key), mask);
459 }
460
461 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
462 {
463         struct nrs_tbf_client *cli = hlist_entry(hnode,
464                                                      struct nrs_tbf_client,
465                                                      tc_hnode);
466
467         return (strcmp(cli->tc_jobid, key) == 0);
468 }
469
470 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
471 {
472         struct nrs_tbf_client *cli = hlist_entry(hnode,
473                                                      struct nrs_tbf_client,
474                                                      tc_hnode);
475
476         return cli->tc_jobid;
477 }
478
479 static void *nrs_tbf_jobid_hop_object(struct hlist_node *hnode)
480 {
481         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
482 }
483
484 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
485 {
486         struct nrs_tbf_client *cli = hlist_entry(hnode,
487                                                      struct nrs_tbf_client,
488                                                      tc_hnode);
489
490         atomic_inc(&cli->tc_ref);
491 }
492
493 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
494 {
495         struct nrs_tbf_client *cli = hlist_entry(hnode,
496                                                      struct nrs_tbf_client,
497                                                      tc_hnode);
498
499         atomic_dec(&cli->tc_ref);
500 }
501
502 static void
503 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
504
505 {
506         struct nrs_tbf_client *cli = hlist_entry(hnode,
507                                                  struct nrs_tbf_client,
508                                                  tc_hnode);
509
510         LASSERT(atomic_read(&cli->tc_ref) == 0);
511         nrs_tbf_cli_fini(cli);
512 }
513
514 static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
515         .hs_hash        = nrs_tbf_jobid_hop_hash,
516         .hs_keycmp      = nrs_tbf_jobid_hop_keycmp,
517         .hs_key         = nrs_tbf_jobid_hop_key,
518         .hs_object      = nrs_tbf_jobid_hop_object,
519         .hs_get         = nrs_tbf_jobid_hop_get,
520         .hs_put         = nrs_tbf_jobid_hop_put,
521         .hs_put_locked  = nrs_tbf_jobid_hop_put,
522         .hs_exit        = nrs_tbf_jobid_hop_exit,
523 };
524
525 #define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
526                                   CFS_HASH_NO_ITEMREF | \
527                                   CFS_HASH_DEPTH)
528
529 static struct nrs_tbf_client *
530 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
531                           struct cfs_hash_bd *bd,
532                           const char *jobid)
533 {
534         struct hlist_node *hnode;
535         struct nrs_tbf_client *cli;
536
537         /* cfs_hash_bd_peek_locked is a somehow "internal" function
538          * of cfs_hash, it doesn't add refcount on object. */
539         hnode = cfs_hash_bd_peek_locked(hs, bd, (void *)jobid);
540         if (hnode == NULL)
541                 return NULL;
542
543         cfs_hash_get(hs, hnode);
544         cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode);
545         if (!list_empty(&cli->tc_lru))
546                 list_del_init(&cli->tc_lru);
547         return cli;
548 }
549
550 #define NRS_TBF_JOBID_NULL ""
551
552 static struct nrs_tbf_client *
553 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
554                        struct ptlrpc_request *req)
555 {
556         const char              *jobid;
557         struct nrs_tbf_client   *cli;
558         struct cfs_hash         *hs = head->th_cli_hash;
559         struct cfs_hash_bd               bd;
560
561         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
562         if (jobid == NULL)
563                 jobid = NRS_TBF_JOBID_NULL;
564         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
565         cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
566         cfs_hash_bd_unlock(hs, &bd, 1);
567
568         return cli;
569 }
570
571 static struct nrs_tbf_client *
572 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
573                           struct nrs_tbf_client *cli)
574 {
575         const char              *jobid;
576         struct nrs_tbf_client   *ret;
577         struct cfs_hash         *hs = head->th_cli_hash;
578         struct cfs_hash_bd               bd;
579
580         jobid = cli->tc_jobid;
581         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
582         ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
583         if (ret == NULL) {
584                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
585                 ret = cli;
586         }
587         cfs_hash_bd_unlock(hs, &bd, 1);
588
589         return ret;
590 }
591
592 static void
593 nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
594                       struct nrs_tbf_client *cli)
595 {
596         struct cfs_hash_bd               bd;
597         struct cfs_hash         *hs = head->th_cli_hash;
598         struct nrs_tbf_bucket   *bkt;
599         int                      hw;
600         struct list_head        zombies;
601
602         INIT_LIST_HEAD(&zombies);
603         cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
604         bkt = cfs_hash_bd_extra_get(hs, &bd);
605         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
606                 return;
607         LASSERT(list_empty(&cli->tc_lru));
608         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
609
610         /*
611          * Check and purge the LRU, there is at least one client in the LRU.
612          */
613         hw = tbf_jobid_cache_size >>
614              (hs->hs_cur_bits - hs->hs_bkt_bits);
615         while (cfs_hash_bd_count_get(&bd) > hw) {
616                 if (unlikely(list_empty(&bkt->ntb_lru)))
617                         break;
618                 cli = list_entry(bkt->ntb_lru.next,
619                                      struct nrs_tbf_client,
620                                      tc_lru);
621                 LASSERT(atomic_read(&cli->tc_ref) == 0);
622                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
623                 list_move(&cli->tc_lru, &zombies);
624         }
625         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
626
627         while (!list_empty(&zombies)) {
628                 cli = container_of0(zombies.next,
629                                     struct nrs_tbf_client, tc_lru);
630                 list_del_init(&cli->tc_lru);
631                 nrs_tbf_cli_fini(cli);
632         }
633 }
634
635 static void
636 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
637                        struct ptlrpc_request *req)
638 {
639         char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
640
641         if (jobid == NULL)
642                 jobid = NRS_TBF_JOBID_NULL;
643         LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
644         INIT_LIST_HEAD(&cli->tc_lru);
645         memcpy(cli->tc_jobid, jobid, strlen(jobid));
646 }
647
648 static int nrs_tbf_jobid_hash_order(void)
649 {
650         int bits;
651
652         for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
653                 ;
654
655         return bits;
656 }
657
658 #define NRS_TBF_JOBID_BKT_BITS 10
659
660 static int
661 nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
662                       struct nrs_tbf_head *head)
663 {
664         struct nrs_tbf_cmd       start;
665         struct nrs_tbf_bucket   *bkt;
666         int                      bits;
667         int                      i;
668         int                      rc;
669         struct cfs_hash_bd       bd;
670
671         bits = nrs_tbf_jobid_hash_order();
672         if (bits < NRS_TBF_JOBID_BKT_BITS)
673                 bits = NRS_TBF_JOBID_BKT_BITS;
674         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
675                                             bits,
676                                             bits,
677                                             NRS_TBF_JOBID_BKT_BITS,
678                                             sizeof(*bkt),
679                                             0,
680                                             0,
681                                             &nrs_tbf_jobid_hash_ops,
682                                             NRS_TBF_JOBID_HASH_FLAGS);
683         if (head->th_cli_hash == NULL)
684                 return -ENOMEM;
685
686         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
687                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
688                 INIT_LIST_HEAD(&bkt->ntb_lru);
689         }
690
691         memset(&start, 0, sizeof(start));
692         start.tc_jobids_str = "*";
693
694         start.tc_rpc_rate = tbf_rate;
695         start.tc_rule_flags = NTRS_DEFAULT;
696         start.tc_name = NRS_TBF_DEFAULT_RULE;
697         INIT_LIST_HEAD(&start.tc_jobids);
698         rc = nrs_tbf_rule_start(policy, head, &start);
699
700         return rc;
701 }
702
703 /**
704  * Frees jobid of \a list.
705  *
706  */
707 static void
708 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
709 {
710         struct nrs_tbf_jobid *jobid, *n;
711
712         list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
713                 OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1);
714                 list_del(&jobid->tj_linkage);
715                 OBD_FREE(jobid, sizeof(struct nrs_tbf_jobid));
716         }
717 }
718
719 static int
720 nrs_tbf_jobid_list_add(const struct cfs_lstr *id, struct list_head *jobid_list)
721 {
722         struct nrs_tbf_jobid *jobid;
723
724         OBD_ALLOC(jobid, sizeof(struct nrs_tbf_jobid));
725         if (jobid == NULL)
726                 return -ENOMEM;
727
728         OBD_ALLOC(jobid->tj_id, id->ls_len + 1);
729         if (jobid->tj_id == NULL) {
730                 OBD_FREE(jobid, sizeof(struct nrs_tbf_jobid));
731                 return -ENOMEM;
732         }
733
734         memcpy(jobid->tj_id, id->ls_str, id->ls_len);
735         list_add_tail(&jobid->tj_linkage, jobid_list);
736         return 0;
737 }
738
739 static int
740 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
741 {
742         struct nrs_tbf_jobid *jobid;
743
744         list_for_each_entry(jobid, jobid_list, tj_linkage) {
745                 if (strcmp(id, jobid->tj_id) == 0)
746                         return 1;
747         }
748         return 0;
749 }
750
751 static int
752 nrs_tbf_jobid_list_parse(char *str, int len, struct list_head *jobid_list)
753 {
754         struct cfs_lstr src;
755         struct cfs_lstr res;
756         int rc = 0;
757         ENTRY;
758
759         src.ls_str = str;
760         src.ls_len = len;
761         INIT_LIST_HEAD(jobid_list);
762         while (src.ls_str) {
763                 rc = cfs_gettok(&src, ' ', &res);
764                 if (rc == 0) {
765                         rc = -EINVAL;
766                         break;
767                 }
768                 rc = nrs_tbf_jobid_list_add(&res, jobid_list);
769                 if (rc)
770                         break;
771         }
772         if (rc)
773                 nrs_tbf_jobid_list_free(jobid_list);
774         RETURN(rc);
775 }
776
777 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
778 {
779         if (!list_empty(&cmd->tc_jobids))
780                 nrs_tbf_jobid_list_free(&cmd->tc_jobids);
781         if (cmd->tc_jobids_str)
782                 OBD_FREE(cmd->tc_jobids_str, strlen(cmd->tc_jobids_str) + 1);
783 }
784
785 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, const char *id)
786 {
787         int rc;
788
789         OBD_ALLOC(cmd->tc_jobids_str, strlen(id) + 1);
790         if (cmd->tc_jobids_str == NULL)
791                 return -ENOMEM;
792
793         memcpy(cmd->tc_jobids_str, id, strlen(id));
794
795         /* parse jobid list */
796         rc = nrs_tbf_jobid_list_parse(cmd->tc_jobids_str,
797                                       strlen(cmd->tc_jobids_str),
798                                       &cmd->tc_jobids);
799         if (rc)
800                 nrs_tbf_jobid_cmd_fini(cmd);
801
802         return rc;
803 }
804
805 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
806                                    struct nrs_tbf_rule *rule,
807                                    struct nrs_tbf_cmd *start)
808 {
809         int rc = 0;
810
811         LASSERT(start->tc_jobids_str);
812         OBD_ALLOC(rule->tr_jobids_str,
813                   strlen(start->tc_jobids_str) + 1);
814         if (rule->tr_jobids_str == NULL)
815                 return -ENOMEM;
816
817         memcpy(rule->tr_jobids_str,
818                start->tc_jobids_str,
819                strlen(start->tc_jobids_str));
820
821         INIT_LIST_HEAD(&rule->tr_jobids);
822         if (!list_empty(&start->tc_jobids)) {
823                 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
824                                               strlen(rule->tr_jobids_str),
825                                               &rule->tr_jobids);
826                 if (rc)
827                         CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
828         }
829         if (rc)
830                 OBD_FREE(rule->tr_jobids_str,
831                          strlen(start->tc_jobids_str) + 1);
832         return rc;
833 }
834
835 static int
836 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
837 {
838         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
839                           rule->tr_jobids_str, rule->tr_rpc_rate,
840                           atomic_read(&rule->tr_ref) - 1);
841         return 0;
842 }
843
844 static int
845 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
846                          struct nrs_tbf_client *cli)
847 {
848         return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
849 }
850
851 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
852 {
853         if (!list_empty(&rule->tr_jobids))
854                 nrs_tbf_jobid_list_free(&rule->tr_jobids);
855         LASSERT(rule->tr_jobids_str != NULL);
856         OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1);
857 }
858
859 static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
860         .o_name = NRS_TBF_TYPE_JOBID,
861         .o_startup = nrs_tbf_jobid_startup,
862         .o_cli_find = nrs_tbf_jobid_cli_find,
863         .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
864         .o_cli_put = nrs_tbf_jobid_cli_put,
865         .o_cli_init = nrs_tbf_jobid_cli_init,
866         .o_rule_init = nrs_tbf_jobid_rule_init,
867         .o_rule_dump = nrs_tbf_jobid_rule_dump,
868         .o_rule_match = nrs_tbf_jobid_rule_match,
869         .o_rule_fini = nrs_tbf_jobid_rule_fini,
870 };
871
872 /**
873  * libcfs_hash operations for nrs_tbf_net::cn_cli_hash
874  *
875  * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
876  * nrs_tbf_client objects.
877  */
878 #define NRS_TBF_NID_BKT_BITS    8
879 #define NRS_TBF_NID_BITS        16
880
881 static unsigned nrs_tbf_nid_hop_hash(struct cfs_hash *hs, const void *key,
882                                   unsigned mask)
883 {
884         return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
885 }
886
887 static int nrs_tbf_nid_hop_keycmp(const void *key, struct hlist_node *hnode)
888 {
889         lnet_nid_t            *nid = (lnet_nid_t *)key;
890         struct nrs_tbf_client *cli = hlist_entry(hnode,
891                                                      struct nrs_tbf_client,
892                                                      tc_hnode);
893
894         return *nid == cli->tc_nid;
895 }
896
897 static void *nrs_tbf_nid_hop_key(struct hlist_node *hnode)
898 {
899         struct nrs_tbf_client *cli = hlist_entry(hnode,
900                                                      struct nrs_tbf_client,
901                                                      tc_hnode);
902
903         return &cli->tc_nid;
904 }
905
906 static void *nrs_tbf_nid_hop_object(struct hlist_node *hnode)
907 {
908         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
909 }
910
911 static void nrs_tbf_nid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
912 {
913         struct nrs_tbf_client *cli = hlist_entry(hnode,
914                                                      struct nrs_tbf_client,
915                                                      tc_hnode);
916
917         atomic_inc(&cli->tc_ref);
918 }
919
920 static void nrs_tbf_nid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
921 {
922         struct nrs_tbf_client *cli = hlist_entry(hnode,
923                                                      struct nrs_tbf_client,
924                                                      tc_hnode);
925
926         atomic_dec(&cli->tc_ref);
927 }
928
929 static void nrs_tbf_nid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
930 {
931         struct nrs_tbf_client *cli = hlist_entry(hnode,
932                                                      struct nrs_tbf_client,
933                                                      tc_hnode);
934
935         LASSERTF(atomic_read(&cli->tc_ref) == 0,
936                  "Busy TBF object from client with NID %s, with %d refs\n",
937                  libcfs_nid2str(cli->tc_nid), atomic_read(&cli->tc_ref));
938
939         nrs_tbf_cli_fini(cli);
940 }
941
942 static struct cfs_hash_ops nrs_tbf_nid_hash_ops = {
943         .hs_hash        = nrs_tbf_nid_hop_hash,
944         .hs_keycmp      = nrs_tbf_nid_hop_keycmp,
945         .hs_key         = nrs_tbf_nid_hop_key,
946         .hs_object      = nrs_tbf_nid_hop_object,
947         .hs_get         = nrs_tbf_nid_hop_get,
948         .hs_put         = nrs_tbf_nid_hop_put,
949         .hs_put_locked  = nrs_tbf_nid_hop_put,
950         .hs_exit        = nrs_tbf_nid_hop_exit,
951 };
952
953 static struct nrs_tbf_client *
954 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
955                      struct ptlrpc_request *req)
956 {
957         return cfs_hash_lookup(head->th_cli_hash, &req->rq_peer.nid);
958 }
959
960 static struct nrs_tbf_client *
961 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
962                         struct nrs_tbf_client *cli)
963 {
964         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid,
965                                        &cli->tc_hnode);
966 }
967
968 static void
969 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
970                       struct nrs_tbf_client *cli)
971 {
972         cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
973 }
974
975 static int
976 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
977                     struct nrs_tbf_head *head)
978 {
979         struct nrs_tbf_cmd      start;
980         int rc;
981
982         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
983                                             NRS_TBF_NID_BITS,
984                                             NRS_TBF_NID_BITS,
985                                             NRS_TBF_NID_BKT_BITS, 0,
986                                             CFS_HASH_MIN_THETA,
987                                             CFS_HASH_MAX_THETA,
988                                             &nrs_tbf_nid_hash_ops,
989                                             CFS_HASH_RW_BKTLOCK);
990         if (head->th_cli_hash == NULL)
991                 return -ENOMEM;
992
993         memset(&start, 0, sizeof(start));
994         start.tc_nids_str = "*";
995
996         start.tc_rpc_rate = tbf_rate;
997         start.tc_rule_flags = NTRS_DEFAULT;
998         start.tc_name = NRS_TBF_DEFAULT_RULE;
999         INIT_LIST_HEAD(&start.tc_nids);
1000         rc = nrs_tbf_rule_start(policy, head, &start);
1001
1002         return rc;
1003 }
1004
1005 static void
1006 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1007                              struct ptlrpc_request *req)
1008 {
1009         cli->tc_nid = req->rq_peer.nid;
1010 }
1011
1012 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1013                                  struct nrs_tbf_rule *rule,
1014                                  struct nrs_tbf_cmd *start)
1015 {
1016         LASSERT(start->tc_nids_str);
1017         OBD_ALLOC(rule->tr_nids_str,
1018                   strlen(start->tc_nids_str) + 1);
1019         if (rule->tr_nids_str == NULL)
1020                 return -ENOMEM;
1021
1022         memcpy(rule->tr_nids_str,
1023                start->tc_nids_str,
1024                strlen(start->tc_nids_str));
1025
1026         INIT_LIST_HEAD(&rule->tr_nids);
1027         if (!list_empty(&start->tc_nids)) {
1028                 if (cfs_parse_nidlist(rule->tr_nids_str,
1029                                       strlen(rule->tr_nids_str),
1030                                       &rule->tr_nids) <= 0) {
1031                         CERROR("nids {%s} illegal\n",
1032                                rule->tr_nids_str);
1033                         OBD_FREE(rule->tr_nids_str,
1034                                  strlen(start->tc_nids_str) + 1);
1035                         return -EINVAL;
1036                 }
1037         }
1038         return 0;
1039 }
1040
1041 static int
1042 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1043 {
1044         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1045                           rule->tr_nids_str, rule->tr_rpc_rate,
1046                           atomic_read(&rule->tr_ref) - 1);
1047         return 0;
1048 }
1049
1050 static int
1051 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1052                        struct nrs_tbf_client *cli)
1053 {
1054         return cfs_match_nid(cli->tc_nid, &rule->tr_nids);
1055 }
1056
1057 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1058 {
1059         if (!list_empty(&rule->tr_nids))
1060                 cfs_free_nidlist(&rule->tr_nids);
1061         LASSERT(rule->tr_nids_str != NULL);
1062         OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1);
1063 }
1064
1065 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1066 {
1067         if (!list_empty(&cmd->tc_nids))
1068                 cfs_free_nidlist(&cmd->tc_nids);
1069         if (cmd->tc_nids_str)
1070                 OBD_FREE(cmd->tc_nids_str, strlen(cmd->tc_nids_str) + 1);
1071 }
1072
1073 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, const char *id)
1074 {
1075         OBD_ALLOC(cmd->tc_nids_str, strlen(id) + 1);
1076         if (cmd->tc_nids_str == NULL)
1077                 return -ENOMEM;
1078
1079         memcpy(cmd->tc_nids_str, id, strlen(id));
1080
1081         /* parse NID list */
1082         if (cfs_parse_nidlist(cmd->tc_nids_str,
1083                               strlen(cmd->tc_nids_str),
1084                               &cmd->tc_nids) <= 0) {
1085                 nrs_tbf_nid_cmd_fini(cmd);
1086                 return -EINVAL;
1087         }
1088
1089         return 0;
1090 }
1091
1092 static struct nrs_tbf_ops nrs_tbf_nid_ops = {
1093         .o_name = NRS_TBF_TYPE_NID,
1094         .o_startup = nrs_tbf_nid_startup,
1095         .o_cli_find = nrs_tbf_nid_cli_find,
1096         .o_cli_findadd = nrs_tbf_nid_cli_findadd,
1097         .o_cli_put = nrs_tbf_nid_cli_put,
1098         .o_cli_init = nrs_tbf_nid_cli_init,
1099         .o_rule_init = nrs_tbf_nid_rule_init,
1100         .o_rule_dump = nrs_tbf_nid_rule_dump,
1101         .o_rule_match = nrs_tbf_nid_rule_match,
1102         .o_rule_fini = nrs_tbf_nid_rule_fini,
1103 };
1104
1105 /**
1106  * Is called before the policy transitions into
1107  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
1108  * policy-specific private data structure.
1109  *
1110  * \param[in] policy The policy to start
1111  *
1112  * \retval -ENOMEM OOM error
1113  * \retval  0      success
1114  *
1115  * \see nrs_policy_register()
1116  * \see nrs_policy_ctl()
1117  */
1118 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
1119 {
1120         struct nrs_tbf_head     *head;
1121         struct nrs_tbf_ops      *ops;
1122         __u32                    type;
1123         int rc = 0;
1124
1125         if (arg == NULL || strlen(arg) > NRS_TBF_TYPE_MAX_LEN)
1126                 GOTO(out, rc = -EINVAL);
1127
1128         if (strcmp(arg, NRS_TBF_TYPE_NID) == 0) {
1129                 ops = &nrs_tbf_nid_ops;
1130                 type = NRS_TBF_FLAG_NID;
1131         } else if (strcmp(arg, NRS_TBF_TYPE_JOBID) == 0) {
1132                 ops = &nrs_tbf_jobid_ops;
1133                 type = NRS_TBF_FLAG_JOBID;
1134         } else
1135                 GOTO(out, rc = -ENOTSUPP);
1136
1137         OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
1138         if (head == NULL)
1139                 GOTO(out, rc = -ENOMEM);
1140
1141         memcpy(head->th_type, arg, strlen(arg));
1142         head->th_type[strlen(arg)] = '\0';
1143         head->th_ops = ops;
1144         head->th_type_flag = type;
1145
1146         head->th_binheap = cfs_binheap_create(&nrs_tbf_heap_ops,
1147                                               CBH_FLAG_ATOMIC_GROW, 4096, NULL,
1148                                               nrs_pol2cptab(policy),
1149                                               nrs_pol2cptid(policy));
1150         if (head->th_binheap == NULL)
1151                 GOTO(out_free_head, rc = -ENOMEM);
1152
1153         atomic_set(&head->th_rule_sequence, 0);
1154         spin_lock_init(&head->th_rule_lock);
1155         INIT_LIST_HEAD(&head->th_list);
1156         hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
1157         head->th_timer.function = nrs_tbf_timer_cb;
1158         rc = head->th_ops->o_startup(policy, head);
1159         if (rc)
1160                 GOTO(out_free_heap, rc);
1161
1162         policy->pol_private = head;
1163         return 0;
1164 out_free_heap:
1165         cfs_binheap_destroy(head->th_binheap);
1166 out_free_head:
1167         OBD_FREE_PTR(head);
1168 out:
1169         return rc;
1170 }
1171
1172 /**
1173  * Is called before the policy transitions into
1174  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
1175  * private data structure.
1176  *
1177  * \param[in] policy The policy to stop
1178  *
1179  * \see nrs_policy_stop0()
1180  */
1181 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
1182 {
1183         struct nrs_tbf_head *head = policy->pol_private;
1184         struct ptlrpc_nrs *nrs = policy->pol_nrs;
1185         struct nrs_tbf_rule *rule, *n;
1186
1187         LASSERT(head != NULL);
1188         LASSERT(head->th_cli_hash != NULL);
1189         hrtimer_cancel(&head->th_timer);
1190         /* Should cleanup hash first before free rules */
1191         cfs_hash_putref(head->th_cli_hash);
1192         list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
1193                 list_del_init(&rule->tr_linkage);
1194                 nrs_tbf_rule_put(rule);
1195         }
1196         LASSERT(list_empty(&head->th_list));
1197         LASSERT(head->th_binheap != NULL);
1198         LASSERT(cfs_binheap_is_empty(head->th_binheap));
1199         cfs_binheap_destroy(head->th_binheap);
1200         OBD_FREE_PTR(head);
1201         nrs->nrs_throttling = 0;
1202         wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
1203 }
1204
1205 /**
1206  * Performs a policy-specific ctl function on TBF policy instances; similar
1207  * to ioctl.
1208  *
1209  * \param[in]     policy the policy instance
1210  * \param[in]     opc    the opcode
1211  * \param[in,out] arg    used for passing parameters and information
1212  *
1213  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
1214  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
1215  *
1216  * \retval 0   operation carried out successfully
1217  * \retval -ve error
1218  */
1219 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
1220                        enum ptlrpc_nrs_ctl opc,
1221                        void *arg)
1222 {
1223         int rc = 0;
1224         ENTRY;
1225
1226         assert_spin_locked(&policy->pol_nrs->nrs_lock);
1227
1228         switch ((enum nrs_ctl_tbf)opc) {
1229         default:
1230                 RETURN(-EINVAL);
1231
1232         /**
1233          * Read RPC rate size of a policy instance.
1234          */
1235         case NRS_CTL_TBF_RD_RULE: {
1236                 struct nrs_tbf_head *head = policy->pol_private;
1237                 struct seq_file *m = (struct seq_file *) arg;
1238                 struct ptlrpc_service_part *svcpt;
1239
1240                 svcpt = policy->pol_nrs->nrs_svcpt;
1241                 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
1242
1243                 rc = nrs_tbf_rule_dump_all(head, m);
1244                 }
1245                 break;
1246
1247         /**
1248          * Write RPC rate of a policy instance.
1249          */
1250         case NRS_CTL_TBF_WR_RULE: {
1251                 struct nrs_tbf_head *head = policy->pol_private;
1252                 struct nrs_tbf_cmd *cmd;
1253
1254                 cmd = (struct nrs_tbf_cmd *)arg;
1255                 rc = nrs_tbf_command(policy,
1256                                      head,
1257                                      cmd);
1258                 }
1259                 break;
1260         }
1261
1262         RETURN(rc);
1263 }
1264
1265 /**
1266  * Is called for obtaining a TBF policy resource.
1267  *
1268  * \param[in]  policy     The policy on which the request is being asked for
1269  * \param[in]  nrq        The request for which resources are being taken
1270  * \param[in]  parent     Parent resource, unused in this policy
1271  * \param[out] resp       Resources references are placed in this array
1272  * \param[in]  moving_req Signifies limited caller context; unused in this
1273  *                        policy
1274  *
1275  *
1276  * \see nrs_resource_get_safe()
1277  */
1278 static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
1279                            struct ptlrpc_nrs_request *nrq,
1280                            const struct ptlrpc_nrs_resource *parent,
1281                            struct ptlrpc_nrs_resource **resp,
1282                            bool moving_req)
1283 {
1284         struct nrs_tbf_head   *head;
1285         struct nrs_tbf_client *cli;
1286         struct nrs_tbf_client *tmp;
1287         struct ptlrpc_request *req;
1288
1289         if (parent == NULL) {
1290                 *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
1291                 return 0;
1292         }
1293
1294         head = container_of(parent, struct nrs_tbf_head, th_res);
1295         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
1296         cli = head->th_ops->o_cli_find(head, req);
1297         if (cli != NULL) {
1298                 spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
1299                 LASSERT(cli->tc_rule);
1300                 if (cli->tc_rule_sequence !=
1301                     atomic_read(&head->th_rule_sequence) ||
1302                     cli->tc_rule->tr_flags & NTRS_STOPPING) {
1303                         struct nrs_tbf_rule *rule;
1304
1305                         rule = nrs_tbf_rule_match(head, cli);
1306                         if (rule != cli->tc_rule)
1307                                 nrs_tbf_cli_reset(head, rule, cli);
1308                         else
1309                                 nrs_tbf_rule_put(rule);
1310                 } else if (cli->tc_rule_generation !=
1311                            cli->tc_rule->tr_generation) {
1312                         nrs_tbf_cli_reset_value(head, cli);
1313                 }
1314                 spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
1315                 goto out;
1316         }
1317
1318         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
1319                           sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
1320         if (cli == NULL)
1321                 return -ENOMEM;
1322         nrs_tbf_cli_init(head, cli, req);
1323         tmp = head->th_ops->o_cli_findadd(head, cli);
1324         if (tmp != cli) {
1325                 atomic_dec(&cli->tc_ref);
1326                 nrs_tbf_cli_fini(cli);
1327                 cli = tmp;
1328         }
1329 out:
1330         *resp = &cli->tc_res;
1331
1332         return 1;
1333 }
1334
1335 /**
1336  * Called when releasing references to the resource hierachy obtained for a
1337  * request for scheduling using the TBF policy.
1338  *
1339  * \param[in] policy   the policy the resource belongs to
1340  * \param[in] res      the resource to be released
1341  */
1342 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
1343                             const struct ptlrpc_nrs_resource *res)
1344 {
1345         struct nrs_tbf_head   *head;
1346         struct nrs_tbf_client *cli;
1347
1348         /**
1349          * Do nothing for freeing parent, nrs_tbf_net resources
1350          */
1351         if (res->res_parent == NULL)
1352                 return;
1353
1354         cli = container_of(res, struct nrs_tbf_client, tc_res);
1355         head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
1356
1357         head->th_ops->o_cli_put(head, cli);
1358 }
1359
1360 /**
1361  * Called when getting a request from the TBF policy for handling, or just
1362  * peeking; removes the request from the policy when it is to be handled.
1363  *
1364  * \param[in] policy The policy
1365  * \param[in] peek   When set, signifies that we just want to examine the
1366  *                   request, and not handle it, so the request is not removed
1367  *                   from the policy.
1368  * \param[in] force  Force the policy to return a request; unused in this
1369  *                   policy
1370  *
1371  * \retval The request to be handled; this is the next request in the TBF
1372  *         rule
1373  *
1374  * \see ptlrpc_nrs_req_get_nolock()
1375  * \see nrs_request_get()
1376  */
1377 static
1378 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
1379                                            bool peek, bool force)
1380 {
1381         struct nrs_tbf_head       *head = policy->pol_private;
1382         struct ptlrpc_nrs_request *nrq = NULL;
1383         struct nrs_tbf_client     *cli;
1384         struct cfs_binheap_node   *node;
1385
1386         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
1387
1388         if (!peek && policy->pol_nrs->nrs_throttling)
1389                 return NULL;
1390
1391         node = cfs_binheap_root(head->th_binheap);
1392         if (unlikely(node == NULL))
1393                 return NULL;
1394
1395         cli = container_of(node, struct nrs_tbf_client, tc_node);
1396         LASSERT(cli->tc_in_heap);
1397         if (peek) {
1398                 nrq = list_entry(cli->tc_list.next,
1399                                      struct ptlrpc_nrs_request,
1400                                      nr_u.tbf.tr_list);
1401         } else {
1402                 __u64 now = ktime_to_ns(ktime_get());
1403                 __u64 passed;
1404                 __u64 ntoken;
1405                 __u64 deadline;
1406
1407                 deadline = cli->tc_check_time +
1408                           cli->tc_nsecs;
1409                 LASSERT(now >= cli->tc_check_time);
1410                 passed = now - cli->tc_check_time;
1411                 ntoken = passed * cli->tc_rpc_rate;
1412                 do_div(ntoken, NSEC_PER_SEC);
1413                 ntoken += cli->tc_ntoken;
1414                 if (ntoken > cli->tc_depth)
1415                         ntoken = cli->tc_depth;
1416                 if (ntoken > 0) {
1417                         struct ptlrpc_request *req;
1418                         nrq = list_entry(cli->tc_list.next,
1419                                              struct ptlrpc_nrs_request,
1420                                              nr_u.tbf.tr_list);
1421                         req = container_of(nrq,
1422                                            struct ptlrpc_request,
1423                                            rq_nrq);
1424                         ntoken--;
1425                         cli->tc_ntoken = ntoken;
1426                         cli->tc_check_time = now;
1427                         list_del_init(&nrq->nr_u.tbf.tr_list);
1428                         if (list_empty(&cli->tc_list)) {
1429                                 cfs_binheap_remove(head->th_binheap,
1430                                                    &cli->tc_node);
1431                                 cli->tc_in_heap = false;
1432                         } else {
1433                                 cfs_binheap_relocate(head->th_binheap,
1434                                                      &cli->tc_node);
1435                         }
1436                         CDEBUG(D_RPCTRACE,
1437                                "NRS start %s request from %s, "
1438                                "seq: "LPU64"\n",
1439                                policy->pol_desc->pd_name,
1440                                libcfs_id2str(req->rq_peer),
1441                                nrq->nr_u.tbf.tr_sequence);
1442                 } else {
1443                         ktime_t time;
1444
1445                         policy->pol_nrs->nrs_throttling = 1;
1446                         head->th_deadline = deadline;
1447                         time = ktime_set(0, 0);
1448                         time = ktime_add_ns(time, deadline);
1449                         hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
1450                 }
1451         }
1452
1453         return nrq;
1454 }
1455
1456 /**
1457  * Adds request \a nrq to \a policy's list of queued requests
1458  *
1459  * \param[in] policy The policy
1460  * \param[in] nrq    The request to add
1461  *
1462  * \retval 0 success; nrs_request_enqueue() assumes this function will always
1463  *                    succeed
1464  */
1465 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
1466                            struct ptlrpc_nrs_request *nrq)
1467 {
1468         struct nrs_tbf_head   *head;
1469         struct nrs_tbf_client *cli;
1470         int                    rc = 0;
1471
1472         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
1473
1474         cli = container_of(nrs_request_resource(nrq),
1475                            struct nrs_tbf_client, tc_res);
1476         head = container_of(nrs_request_resource(nrq)->res_parent,
1477                             struct nrs_tbf_head, th_res);
1478         if (list_empty(&cli->tc_list)) {
1479                 LASSERT(!cli->tc_in_heap);
1480                 rc = cfs_binheap_insert(head->th_binheap, &cli->tc_node);
1481                 if (rc == 0) {
1482                         cli->tc_in_heap = true;
1483                         nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
1484                         list_add_tail(&nrq->nr_u.tbf.tr_list,
1485                                           &cli->tc_list);
1486                         if (policy->pol_nrs->nrs_throttling) {
1487                                 __u64 deadline = cli->tc_check_time +
1488                                                  cli->tc_nsecs;
1489                                 if ((head->th_deadline > deadline) &&
1490                                     (hrtimer_try_to_cancel(&head->th_timer)
1491                                      >= 0)) {
1492                                         ktime_t time;
1493                                         head->th_deadline = deadline;
1494                                         time = ktime_set(0, 0);
1495                                         time = ktime_add_ns(time, deadline);
1496                                         hrtimer_start(&head->th_timer, time,
1497                                                       HRTIMER_MODE_ABS);
1498                                 }
1499                         }
1500                 }
1501         } else {
1502                 LASSERT(cli->tc_in_heap);
1503                 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
1504                 list_add_tail(&nrq->nr_u.tbf.tr_list,
1505                                   &cli->tc_list);
1506         }
1507         return rc;
1508 }
1509
1510 /**
1511  * Removes request \a nrq from \a policy's list of queued requests.
1512  *
1513  * \param[in] policy The policy
1514  * \param[in] nrq    The request to remove
1515  */
1516 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
1517                              struct ptlrpc_nrs_request *nrq)
1518 {
1519         struct nrs_tbf_head   *head;
1520         struct nrs_tbf_client *cli;
1521
1522         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
1523
1524         cli = container_of(nrs_request_resource(nrq),
1525                            struct nrs_tbf_client, tc_res);
1526         head = container_of(nrs_request_resource(nrq)->res_parent,
1527                             struct nrs_tbf_head, th_res);
1528
1529         LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
1530         list_del_init(&nrq->nr_u.tbf.tr_list);
1531         if (list_empty(&cli->tc_list)) {
1532                 cfs_binheap_remove(head->th_binheap,
1533                                    &cli->tc_node);
1534                 cli->tc_in_heap = false;
1535         } else {
1536                 cfs_binheap_relocate(head->th_binheap,
1537                                      &cli->tc_node);
1538         }
1539 }
1540
1541 /**
1542  * Prints a debug statement right before the request \a nrq stops being
1543  * handled.
1544  *
1545  * \param[in] policy The policy handling the request
1546  * \param[in] nrq    The request being handled
1547  *
1548  * \see ptlrpc_server_finish_request()
1549  * \see ptlrpc_nrs_req_stop_nolock()
1550  */
1551 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
1552                               struct ptlrpc_nrs_request *nrq)
1553 {
1554         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
1555                                                   rq_nrq);
1556
1557         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
1558
1559         CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: "LPU64"\n",
1560                policy->pol_desc->pd_name, libcfs_id2str(req->rq_peer),
1561                nrq->nr_u.tbf.tr_sequence);
1562 }
1563
1564 #ifdef CONFIG_PROC_FS
1565
1566 /**
1567  * lprocfs interface
1568  */
1569
1570 /**
1571  * The maximum RPC rate.
1572  */
1573 #define LPROCFS_NRS_RATE_MAX            65535
1574
1575 static int
1576 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
1577 {
1578         struct ptlrpc_service       *svc = m->private;
1579         int                          rc;
1580
1581         seq_printf(m, "regular_requests:\n");
1582         /**
1583          * Perform two separate calls to this as only one of the NRS heads'
1584          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
1585          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
1586          */
1587         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
1588                                        NRS_POL_NAME_TBF,
1589                                        NRS_CTL_TBF_RD_RULE,
1590                                        false, m);
1591         if (rc == 0) {
1592                 /**
1593                  * -ENOSPC means buf in the parameter m is overflow, return 0
1594                  * here to let upper layer function seq_read alloc a larger
1595                  * memory area and do this process again.
1596                  */
1597         } else if (rc == -ENOSPC) {
1598                 return 0;
1599
1600                 /**
1601                  * Ignore -ENODEV as the regular NRS head's policy may be in the
1602                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
1603                  */
1604         } else if (rc != -ENODEV) {
1605                 return rc;
1606         }
1607
1608         if (!nrs_svc_has_hp(svc))
1609                 goto no_hp;
1610
1611         seq_printf(m, "high_priority_requests:\n");
1612         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
1613                                        NRS_POL_NAME_TBF,
1614                                        NRS_CTL_TBF_RD_RULE,
1615                                        false, m);
1616         if (rc == 0) {
1617                 /**
1618                  * -ENOSPC means buf in the parameter m is overflow, return 0
1619                  * here to let upper layer function seq_read alloc a larger
1620                  * memory area and do this process again.
1621                  */
1622         } else if (rc == -ENOSPC) {
1623                 return 0;
1624         }
1625
1626 no_hp:
1627
1628         return rc;
1629 }
1630
1631 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char **val)
1632 {
1633         int rc;
1634         char *token;
1635
1636         token = strsep(val, "}");
1637         if (*val == NULL)
1638                 GOTO(out, rc = -EINVAL);
1639
1640         if (strlen(token) <= 1 ||
1641             token[0] != '{')
1642                 GOTO(out, rc = -EINVAL);
1643         /* Skip '{' */
1644         token++;
1645
1646         /* Should be followed by ' ' or nothing */
1647         if ((*val)[0] == '\0')
1648                 *val = NULL;
1649         else if ((*val)[0] == ' ')
1650                 (*val)++;
1651         else
1652                 GOTO(out, rc = -EINVAL);
1653
1654         rc = nrs_tbf_jobid_parse(cmd, token);
1655         if (!rc)
1656                 cmd->tc_valid_types |= NRS_TBF_FLAG_JOBID;
1657
1658         rc = nrs_tbf_nid_parse(cmd, token);
1659         if (!rc)
1660                 cmd->tc_valid_types |= NRS_TBF_FLAG_NID;
1661
1662         if (!cmd->tc_valid_types)
1663                 rc = -EINVAL;
1664         else
1665                 rc = 0;
1666 out:
1667         return rc;
1668 }
1669
1670
1671 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
1672 {
1673         if (cmd->tc_valid_types & NRS_TBF_FLAG_JOBID)
1674                 nrs_tbf_jobid_cmd_fini(cmd);
1675         if (cmd->tc_valid_types & NRS_TBF_FLAG_NID)
1676                 nrs_tbf_nid_cmd_fini(cmd);
1677 }
1678
1679 static struct nrs_tbf_cmd *
1680 nrs_tbf_parse_cmd(char *buffer, unsigned long count)
1681 {
1682         static struct nrs_tbf_cmd *cmd;
1683         char                      *token;
1684         char                      *val;
1685         int                        i;
1686         int                        rc = 0;
1687
1688         OBD_ALLOC_PTR(cmd);
1689         if (cmd == NULL)
1690                 GOTO(out, rc = -ENOMEM);
1691
1692         val = buffer;
1693         token = strsep(&val, " ");
1694         if (val == NULL || strlen(val) == 0)
1695                 GOTO(out_free_cmd, rc = -EINVAL);
1696
1697         /* Type of the command */
1698         if (strcmp(token, "start") == 0)
1699                 cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
1700         else if (strcmp(token, "stop") == 0)
1701                 cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
1702         else if (strcmp(token, "change") == 0)
1703                 cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RATE;
1704         else
1705                 GOTO(out_free_cmd, rc = -EINVAL);
1706
1707         /* Name of the rule */
1708         token = strsep(&val, " ");
1709         if (val == NULL) {
1710                 /**
1711                  * Stop comand only need name argument,
1712                  * But other commands need ID or rate argument.
1713                  */
1714                 if (cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE)
1715                         GOTO(out_free_cmd, rc = -EINVAL);
1716         }
1717
1718         for (i = 0; i < strlen(token); i++) {
1719                 if ((!isalnum(token[i])) &&
1720                     (token[i] != '_'))
1721                         GOTO(out_free_cmd, rc = -EINVAL);
1722         }
1723         cmd->tc_name = token;
1724
1725         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
1726                 /* List of ID */
1727                 LASSERT(val);
1728                 rc = nrs_tbf_id_parse(cmd, &val);
1729                 if (rc)
1730                         GOTO(out_free_cmd, rc);
1731         }
1732
1733         if (val != NULL) {
1734                 if (cmd->tc_cmd == NRS_CTL_TBF_STOP_RULE ||
1735                     strlen(val) == 0 || !isdigit(val[0]))
1736                         GOTO(out_free_nid, rc = -EINVAL);
1737
1738                 cmd->tc_rpc_rate = simple_strtoull(val, NULL, 10);
1739                 if (cmd->tc_rpc_rate <= 0 ||
1740                     cmd->tc_rpc_rate >= LPROCFS_NRS_RATE_MAX)
1741                         GOTO(out_free_nid, rc = -EINVAL);
1742         } else {
1743                 if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RATE)
1744                         GOTO(out_free_nid, rc = -EINVAL);
1745                 /* No RPC rate given */
1746                 cmd->tc_rpc_rate = tbf_rate;
1747         }
1748         goto out;
1749 out_free_nid:
1750         nrs_tbf_cmd_fini(cmd);
1751 out_free_cmd:
1752         OBD_FREE_PTR(cmd);
1753 out:
1754         if (rc)
1755                 cmd = ERR_PTR(rc);
1756         return cmd;
1757 }
1758
1759 extern struct nrs_core nrs_core;
1760 #define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
1761 static ssize_t
1762 ptlrpc_lprocfs_nrs_tbf_rule_seq_write(struct file *file,
1763                                       const char __user *buffer,
1764                                       size_t count, loff_t *off)
1765 {
1766         struct seq_file           *m = file->private_data;
1767         struct ptlrpc_service     *svc = m->private;
1768         char                      *kernbuf;
1769         char                      *val;
1770         int                        rc;
1771         static struct nrs_tbf_cmd *cmd;
1772         enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
1773         unsigned long              length;
1774         char                      *token;
1775
1776         OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
1777         if (kernbuf == NULL)
1778                 GOTO(out, rc = -ENOMEM);
1779
1780         if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
1781                 GOTO(out_free_kernbuff, rc = -EINVAL);
1782
1783         if (copy_from_user(kernbuf, buffer, count))
1784                 GOTO(out_free_kernbuff, rc = -EFAULT);
1785
1786         val = kernbuf;
1787         token = strsep(&val, " ");
1788         if (val == NULL)
1789                 GOTO(out_free_kernbuff, rc = -EINVAL);
1790
1791         if (strcmp(token, "reg") == 0) {
1792                 queue = PTLRPC_NRS_QUEUE_REG;
1793         } else if (strcmp(token, "hp") == 0) {
1794                 queue = PTLRPC_NRS_QUEUE_HP;
1795         } else {
1796                 kernbuf[strlen(token)] = ' ';
1797                 val = kernbuf;
1798         }
1799         length = strlen(val);
1800
1801         if (length == 0)
1802                 GOTO(out_free_kernbuff, rc = -EINVAL);
1803
1804         if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
1805                 GOTO(out_free_kernbuff, rc = -ENODEV);
1806         else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
1807                 queue = PTLRPC_NRS_QUEUE_REG;
1808
1809         cmd = nrs_tbf_parse_cmd(val, length);
1810         if (IS_ERR(cmd))
1811                 GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
1812
1813         /**
1814          * Serialize NRS core lprocfs operations with policy registration/
1815          * unregistration.
1816          */
1817         mutex_lock(&nrs_core.nrs_mutex);
1818         rc = ptlrpc_nrs_policy_control(svc, queue,
1819                                        NRS_POL_NAME_TBF,
1820                                        NRS_CTL_TBF_WR_RULE,
1821                                        false, cmd);
1822         mutex_unlock(&nrs_core.nrs_mutex);
1823
1824         nrs_tbf_cmd_fini(cmd);
1825         OBD_FREE_PTR(cmd);
1826 out_free_kernbuff:
1827         OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
1828 out:
1829         return rc ? rc : count;
1830 }
1831 LPROC_SEQ_FOPS(ptlrpc_lprocfs_nrs_tbf_rule);
1832
1833 /**
1834  * Initializes a TBF policy's lprocfs interface for service \a svc
1835  *
1836  * \param[in] svc the service
1837  *
1838  * \retval 0    success
1839  * \retval != 0 error
1840  */
1841 static int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
1842 {
1843         struct lprocfs_vars nrs_tbf_lprocfs_vars[] = {
1844                 { .name         = "nrs_tbf_rule",
1845                   .fops         = &ptlrpc_lprocfs_nrs_tbf_rule_fops,
1846                   .data = svc },
1847                 { NULL }
1848         };
1849
1850         if (svc->srv_procroot == NULL)
1851                 return 0;
1852
1853         return lprocfs_add_vars(svc->srv_procroot, nrs_tbf_lprocfs_vars, NULL);
1854 }
1855
1856 /**
1857  * Cleans up a TBF policy's lprocfs interface for service \a svc
1858  *
1859  * \param[in] svc the service
1860  */
1861 static void nrs_tbf_lprocfs_fini(struct ptlrpc_service *svc)
1862 {
1863         if (svc->srv_procroot == NULL)
1864                 return;
1865
1866         lprocfs_remove_proc_entry("nrs_tbf_rule", svc->srv_procroot);
1867 }
1868
1869 #endif /* CONFIG_PROC_FS */
1870
1871 /**
1872  * TBF policy operations
1873  */
1874 static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
1875         .op_policy_start        = nrs_tbf_start,
1876         .op_policy_stop         = nrs_tbf_stop,
1877         .op_policy_ctl          = nrs_tbf_ctl,
1878         .op_res_get             = nrs_tbf_res_get,
1879         .op_res_put             = nrs_tbf_res_put,
1880         .op_req_get             = nrs_tbf_req_get,
1881         .op_req_enqueue         = nrs_tbf_req_add,
1882         .op_req_dequeue         = nrs_tbf_req_del,
1883         .op_req_stop            = nrs_tbf_req_stop,
1884 #ifdef CONFIG_PROC_FS
1885         .op_lprocfs_init        = nrs_tbf_lprocfs_init,
1886         .op_lprocfs_fini        = nrs_tbf_lprocfs_fini,
1887 #endif
1888 };
1889
1890 /**
1891  * TBF policy configuration
1892  */
1893 struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
1894         .nc_name                = NRS_POL_NAME_TBF,
1895         .nc_ops                 = &nrs_tbf_ops,
1896         .nc_compat              = nrs_policy_compat_all,
1897 };
1898
1899 /** @} tbf */
1900
1901 /** @} nrs */
1902
1903 #endif /* HAVE_SERVER_SUPPORT */