Whamcloud - gitweb
LU-9228 nrs: TBF realtime policies under congestion
[fs/lustre-release.git] / lustre / ptlrpc / nrs_tbf.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2013 DataDirect Networks, Inc.
24  *
25  * Copyright (c) 2014, 2016, Intel Corporation.
26  */
27 /*
28  * lustre/ptlrpc/nrs_tbf.c
29  *
30  * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
31  *
32  */
33
34 #ifdef HAVE_SERVER_SUPPORT
35
36 /**
 * \addtogroup nrs
38  * @{
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #include <obd_support.h>
43 #include <obd_class.h>
44 #include <libcfs/libcfs.h>
45 #include "ptlrpc_internal.h"
46
47 /**
48  * \name tbf
49  *
50  * Token Bucket Filter over client NIDs
51  *
52  * @{
53  */
54
55 #define NRS_POL_NAME_TBF        "tbf"
56
57 static int tbf_jobid_cache_size = 8192;
58 module_param(tbf_jobid_cache_size, int, 0644);
59 MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");
60
61 static int tbf_rate = 10000;
62 module_param(tbf_rate, int, 0644);
63 MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");
64
65 static int tbf_depth = 3;
66 module_param(tbf_depth, int, 0644);
67 MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
68
69 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
70 {
71         struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
72                                                  th_timer);
73         struct ptlrpc_nrs   *nrs = head->th_res.res_policy->pol_nrs;
74         struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
75
76         nrs->nrs_throttling = 0;
77         wake_up(&svcpt->scp_waitq);
78
79         return HRTIMER_NORESTART;
80 }
81
82 #define NRS_TBF_DEFAULT_RULE "default"
83
84 static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule)
85 {
86         LASSERT(atomic_read(&rule->tr_ref) == 0);
87         LASSERT(list_empty(&rule->tr_cli_list));
88         LASSERT(list_empty(&rule->tr_linkage));
89
90         rule->tr_head->th_ops->o_rule_fini(rule);
91         OBD_FREE_PTR(rule);
92 }
93
94 /**
95  * Decreases the rule's usage reference count, and stops the rule in case it
96  * was already stopping and have no more outstanding usage references (which
97  * indicates it has no more queued or started requests, and can be safely
98  * stopped).
99  */
100 static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule)
101 {
102         if (atomic_dec_and_test(&rule->tr_ref))
103                 nrs_tbf_rule_fini(rule);
104 }
105
106 /**
107  * Increases the rule's usage reference count.
108  */
109 static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule)
110 {
111         atomic_inc(&rule->tr_ref);
112 }
113
114 static void
115 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
116 {
117         LASSERT(!list_empty(&cli->tc_linkage));
118         LASSERT(cli->tc_rule);
119         spin_lock(&cli->tc_rule->tr_rule_lock);
120         list_del_init(&cli->tc_linkage);
121         spin_unlock(&cli->tc_rule->tr_rule_lock);
122         nrs_tbf_rule_put(cli->tc_rule);
123         cli->tc_rule = NULL;
124 }
125
126 static void
127 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
128                         struct nrs_tbf_client *cli)
129
130 {
131         struct nrs_tbf_rule *rule = cli->tc_rule;
132
133         cli->tc_rpc_rate = rule->tr_rpc_rate;
134         cli->tc_nsecs = rule->tr_nsecs;
135         cli->tc_depth = rule->tr_depth;
136         cli->tc_ntoken = rule->tr_depth;
137         cli->tc_check_time = ktime_to_ns(ktime_get());
138         cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
139         cli->tc_rule_generation = rule->tr_generation;
140
141         if (cli->tc_in_heap)
142                 cfs_binheap_relocate(head->th_binheap,
143                                      &cli->tc_node);
144 }
145
146 static void
147 nrs_tbf_cli_reset(struct nrs_tbf_head *head,
148                   struct nrs_tbf_rule *rule,
149                   struct nrs_tbf_client *cli)
150 {
151         spin_lock(&cli->tc_rule_lock);
152         if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
153                 LASSERT(rule != cli->tc_rule);
154                 nrs_tbf_cli_rule_put(cli);
155         }
156         LASSERT(cli->tc_rule == NULL);
157         LASSERT(list_empty(&cli->tc_linkage));
158         /* Rule's ref is added before called */
159         cli->tc_rule = rule;
160         spin_lock(&rule->tr_rule_lock);
161         list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
162         spin_unlock(&rule->tr_rule_lock);
163         spin_unlock(&cli->tc_rule_lock);
164         nrs_tbf_cli_reset_value(head, cli);
165 }
166
167 static int
168 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
169 {
170         return rule->tr_head->th_ops->o_rule_dump(rule, m);
171 }
172
173 static int
174 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
175 {
176         struct nrs_tbf_rule *rule;
177         int rc = 0;
178
179         LASSERT(head != NULL);
180         spin_lock(&head->th_rule_lock);
181         /* List the rules from newest to oldest */
182         list_for_each_entry(rule, &head->th_list, tr_linkage) {
183                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
184                 rc = nrs_tbf_rule_dump(rule, m);
185                 if (rc) {
186                         rc = -ENOSPC;
187                         break;
188                 }
189         }
190         spin_unlock(&head->th_rule_lock);
191
192         return rc;
193 }
194
195 static struct nrs_tbf_rule *
196 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
197                          const char *name)
198 {
199         struct nrs_tbf_rule *rule;
200
201         LASSERT(head != NULL);
202         list_for_each_entry(rule, &head->th_list, tr_linkage) {
203                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
204                 if (strcmp(rule->tr_name, name) == 0) {
205                         nrs_tbf_rule_get(rule);
206                         return rule;
207                 }
208         }
209         return NULL;
210 }
211
212 static struct nrs_tbf_rule *
213 nrs_tbf_rule_find(struct nrs_tbf_head *head,
214                   const char *name)
215 {
216         struct nrs_tbf_rule *rule;
217
218         LASSERT(head != NULL);
219         spin_lock(&head->th_rule_lock);
220         rule = nrs_tbf_rule_find_nolock(head, name);
221         spin_unlock(&head->th_rule_lock);
222         return rule;
223 }
224
225 static struct nrs_tbf_rule *
226 nrs_tbf_rule_match(struct nrs_tbf_head *head,
227                    struct nrs_tbf_client *cli)
228 {
229         struct nrs_tbf_rule *rule = NULL;
230         struct nrs_tbf_rule *tmp_rule;
231
232         spin_lock(&head->th_rule_lock);
233         /* Match the newest rule in the list */
234         list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
235                 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
236                 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
237                         rule = tmp_rule;
238                         break;
239                 }
240         }
241
242         if (rule == NULL)
243                 rule = head->th_rule;
244
245         nrs_tbf_rule_get(rule);
246         spin_unlock(&head->th_rule_lock);
247         return rule;
248 }
249
250 static void
251 nrs_tbf_cli_init(struct nrs_tbf_head *head,
252                  struct nrs_tbf_client *cli,
253                  struct ptlrpc_request *req)
254 {
255         struct nrs_tbf_rule *rule;
256
257         memset(cli, 0, sizeof(*cli));
258         cli->tc_in_heap = false;
259         head->th_ops->o_cli_init(cli, req);
260         INIT_LIST_HEAD(&cli->tc_list);
261         INIT_LIST_HEAD(&cli->tc_linkage);
262         spin_lock_init(&cli->tc_rule_lock);
263         atomic_set(&cli->tc_ref, 1);
264         rule = nrs_tbf_rule_match(head, cli);
265         nrs_tbf_cli_reset(head, rule, cli);
266 }
267
268 static void
269 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
270 {
271         LASSERT(list_empty(&cli->tc_list));
272         LASSERT(!cli->tc_in_heap);
273         LASSERT(atomic_read(&cli->tc_ref) == 0);
274         spin_lock(&cli->tc_rule_lock);
275         nrs_tbf_cli_rule_put(cli);
276         spin_unlock(&cli->tc_rule_lock);
277         OBD_FREE_PTR(cli);
278 }
279
280 static int
281 nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
282                    struct nrs_tbf_head *head,
283                    struct nrs_tbf_cmd *start)
284 {
285         struct nrs_tbf_rule     *rule;
286         struct nrs_tbf_rule     *tmp_rule;
287         struct nrs_tbf_rule     *next_rule;
288         char                    *next_name = start->u.tc_start.ts_next_name;
289         int                      rc;
290
291         rule = nrs_tbf_rule_find(head, start->tc_name);
292         if (rule) {
293                 nrs_tbf_rule_put(rule);
294                 return -EEXIST;
295         }
296
297         OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
298         if (rule == NULL)
299                 return -ENOMEM;
300
301         memcpy(rule->tr_name, start->tc_name, strlen(start->tc_name));
302         rule->tr_rpc_rate = start->u.tc_start.ts_rpc_rate;
303         rule->tr_flags = start->u.tc_start.ts_rule_flags;
304         rule->tr_nsecs = NSEC_PER_SEC;
305         do_div(rule->tr_nsecs, rule->tr_rpc_rate);
306         rule->tr_depth = tbf_depth;
307         atomic_set(&rule->tr_ref, 1);
308         INIT_LIST_HEAD(&rule->tr_cli_list);
309         INIT_LIST_HEAD(&rule->tr_nids);
310         INIT_LIST_HEAD(&rule->tr_linkage);
311         spin_lock_init(&rule->tr_rule_lock);
312         rule->tr_head = head;
313
314         rc = head->th_ops->o_rule_init(policy, rule, start);
315         if (rc) {
316                 OBD_FREE_PTR(rule);
317                 return rc;
318         }
319
320         /* Add as the newest rule */
321         spin_lock(&head->th_rule_lock);
322         tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
323         if (tmp_rule) {
324                 spin_unlock(&head->th_rule_lock);
325                 nrs_tbf_rule_put(tmp_rule);
326                 nrs_tbf_rule_put(rule);
327                 return -EEXIST;
328         }
329
330         if (next_name) {
331                 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
332                 if (!next_rule) {
333                         spin_unlock(&head->th_rule_lock);
334                         nrs_tbf_rule_put(rule);
335                         return -ENOENT;
336                 }
337
338                 list_add(&rule->tr_linkage, next_rule->tr_linkage.prev);
339                 nrs_tbf_rule_put(next_rule);
340         } else {
341                 /* Add on the top of the rule list */
342                 list_add(&rule->tr_linkage, &head->th_list);
343         }
344         spin_unlock(&head->th_rule_lock);
345         atomic_inc(&head->th_rule_sequence);
346         if (start->u.tc_start.ts_rule_flags & NTRS_DEFAULT) {
347                 rule->tr_flags |= NTRS_DEFAULT;
348                 LASSERT(head->th_rule == NULL);
349                 head->th_rule = rule;
350         }
351
352         CDEBUG(D_RPCTRACE, "TBF starts rule@%p rate %llu gen %llu\n",
353                rule, rule->tr_rpc_rate, rule->tr_generation);
354
355         return 0;
356 }
357
358 /**
359  * Change the rank of a rule in the rule list
360  *
361  * The matched rule will be moved to the position right before another
362  * given rule.
363  *
364  * \param[in] policy    the policy instance
365  * \param[in] head      the TBF policy instance
366  * \param[in] name      the rule name to be moved
367  * \param[in] next_name the rule name before which the matched rule will be
368  *                      moved
369  *
370  */
371 static int
372 nrs_tbf_rule_change_rank(struct ptlrpc_nrs_policy *policy,
373                          struct nrs_tbf_head *head,
374                          char *name,
375                          char *next_name)
376 {
377         struct nrs_tbf_rule     *rule = NULL;
378         struct nrs_tbf_rule     *next_rule = NULL;
379         int                      rc = 0;
380
381         LASSERT(head != NULL);
382
383         spin_lock(&head->th_rule_lock);
384         rule = nrs_tbf_rule_find_nolock(head, name);
385         if (!rule)
386                 GOTO(out, rc = -ENOENT);
387
388         if (strcmp(name, next_name) == 0)
389                 GOTO(out_put, rc);
390
391         next_rule = nrs_tbf_rule_find_nolock(head, next_name);
392         if (!next_rule)
393                 GOTO(out_put, rc = -ENOENT);
394
395         list_move(&rule->tr_linkage, next_rule->tr_linkage.prev);
396         nrs_tbf_rule_put(next_rule);
397 out_put:
398         nrs_tbf_rule_put(rule);
399 out:
400         spin_unlock(&head->th_rule_lock);
401         return rc;
402 }
403
404 static int
405 nrs_tbf_rule_change_rate(struct ptlrpc_nrs_policy *policy,
406                          struct nrs_tbf_head *head,
407                          char *name,
408                          __u64 rate)
409 {
410         struct nrs_tbf_rule *rule;
411
412         assert_spin_locked(&policy->pol_nrs->nrs_lock);
413
414         rule = nrs_tbf_rule_find(head, name);
415         if (rule == NULL)
416                 return -ENOENT;
417
418         rule->tr_rpc_rate = rate;
419         rule->tr_nsecs = NSEC_PER_SEC;
420         do_div(rule->tr_nsecs, rule->tr_rpc_rate);
421         rule->tr_generation++;
422         nrs_tbf_rule_put(rule);
423
424         return 0;
425 }
426
427 static int
428 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
429                     struct nrs_tbf_head *head,
430                     struct nrs_tbf_cmd *change)
431 {
432         __u64    rate = change->u.tc_change.tc_rpc_rate;
433         char    *next_name = change->u.tc_change.tc_next_name;
434         int      rc;
435
436         if (rate != 0) {
437                 rc = nrs_tbf_rule_change_rate(policy, head, change->tc_name,
438                                               rate);
439                 if (rc)
440                         return rc;
441         }
442
443         if (next_name) {
444                 rc = nrs_tbf_rule_change_rank(policy, head, change->tc_name,
445                                               next_name);
446                 if (rc)
447                         return rc;
448         }
449
450         return 0;
451 }
452
453 static int
454 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
455                   struct nrs_tbf_head *head,
456                   struct nrs_tbf_cmd *stop)
457 {
458         struct nrs_tbf_rule *rule;
459
460         assert_spin_locked(&policy->pol_nrs->nrs_lock);
461
462         if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
463                 return -EPERM;
464
465         rule = nrs_tbf_rule_find(head, stop->tc_name);
466         if (rule == NULL)
467                 return -ENOENT;
468
469         list_del_init(&rule->tr_linkage);
470         rule->tr_flags |= NTRS_STOPPING;
471         nrs_tbf_rule_put(rule);
472         nrs_tbf_rule_put(rule);
473
474         return 0;
475 }
476
477 static int
478 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
479                 struct nrs_tbf_head *head,
480                 struct nrs_tbf_cmd *cmd)
481 {
482         int rc;
483
484         assert_spin_locked(&policy->pol_nrs->nrs_lock);
485
486         switch (cmd->tc_cmd) {
487         case NRS_CTL_TBF_START_RULE:
488                 if (cmd->u.tc_start.ts_valid_type != head->th_type_flag)
489                         return -EINVAL;
490
491                 spin_unlock(&policy->pol_nrs->nrs_lock);
492                 rc = nrs_tbf_rule_start(policy, head, cmd);
493                 spin_lock(&policy->pol_nrs->nrs_lock);
494                 return rc;
495         case NRS_CTL_TBF_CHANGE_RULE:
496                 rc = nrs_tbf_rule_change(policy, head, cmd);
497                 return rc;
498         case NRS_CTL_TBF_STOP_RULE:
499                 rc = nrs_tbf_rule_stop(policy, head, cmd);
500                 /* Take it as a success, if not exists at all */
501                 return rc == -ENOENT ? 0 : rc;
502         default:
503                 return -EFAULT;
504         }
505 }
506
507 /**
508  * Binary heap predicate.
509  *
510  * \param[in] e1 the first binheap node to compare
511  * \param[in] e2 the second binheap node to compare
512  *
513  * \retval 0 e1 > e2
514  * \retval 1 e1 < e2
515  */
516 static int
517 tbf_cli_compare(struct cfs_binheap_node *e1, struct cfs_binheap_node *e2)
518 {
519         struct nrs_tbf_client *cli1;
520         struct nrs_tbf_client *cli2;
521
522         cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
523         cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
524
525         if (cli1->tc_deadline < cli2->tc_deadline)
526                 return 1;
527         else if (cli1->tc_deadline > cli2->tc_deadline)
528                 return 0;
529
530         if (cli1->tc_check_time < cli2->tc_check_time)
531                 return 1;
532         else if (cli1->tc_check_time > cli2->tc_check_time)
533                 return 0;
534
535         /* Maybe need more comparasion, e.g. request number in the rules */
536         return 1;
537 }
538
539 /**
540  * TBF binary heap operations
541  */
542 static struct cfs_binheap_ops nrs_tbf_heap_ops = {
543         .hop_enter      = NULL,
544         .hop_exit       = NULL,
545         .hop_compare    = tbf_cli_compare,
546 };
547
/**
 * Hash callback of the jobid hash: djb2 hash over the NUL-terminated jobid
 * string passed as \a key.
 */
static unsigned nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key,
				       unsigned mask)
{
	const char *jobid = key;

	return cfs_hash_djb2_hash(jobid, strlen(jobid), mask);
}
553
554 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
555 {
556         struct nrs_tbf_client *cli = hlist_entry(hnode,
557                                                      struct nrs_tbf_client,
558                                                      tc_hnode);
559
560         return (strcmp(cli->tc_jobid, key) == 0);
561 }
562
563 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
564 {
565         struct nrs_tbf_client *cli = hlist_entry(hnode,
566                                                      struct nrs_tbf_client,
567                                                      tc_hnode);
568
569         return cli->tc_jobid;
570 }
571
572 static void *nrs_tbf_jobid_hop_object(struct hlist_node *hnode)
573 {
574         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
575 }
576
577 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
578 {
579         struct nrs_tbf_client *cli = hlist_entry(hnode,
580                                                      struct nrs_tbf_client,
581                                                      tc_hnode);
582
583         atomic_inc(&cli->tc_ref);
584 }
585
586 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
587 {
588         struct nrs_tbf_client *cli = hlist_entry(hnode,
589                                                      struct nrs_tbf_client,
590                                                      tc_hnode);
591
592         atomic_dec(&cli->tc_ref);
593 }
594
595 static void
596 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
597
598 {
599         struct nrs_tbf_client *cli = hlist_entry(hnode,
600                                                  struct nrs_tbf_client,
601                                                  tc_hnode);
602
603         LASSERT(atomic_read(&cli->tc_ref) == 0);
604         nrs_tbf_cli_fini(cli);
605 }
606
607 static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
608         .hs_hash        = nrs_tbf_jobid_hop_hash,
609         .hs_keycmp      = nrs_tbf_jobid_hop_keycmp,
610         .hs_key         = nrs_tbf_jobid_hop_key,
611         .hs_object      = nrs_tbf_jobid_hop_object,
612         .hs_get         = nrs_tbf_jobid_hop_get,
613         .hs_put         = nrs_tbf_jobid_hop_put,
614         .hs_put_locked  = nrs_tbf_jobid_hop_put,
615         .hs_exit        = nrs_tbf_jobid_hop_exit,
616 };
617
618 #define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
619                                   CFS_HASH_NO_ITEMREF | \
620                                   CFS_HASH_DEPTH)
621
622 static struct nrs_tbf_client *
623 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
624                           struct cfs_hash_bd *bd,
625                           const char *jobid)
626 {
627         struct hlist_node *hnode;
628         struct nrs_tbf_client *cli;
629
630         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)jobid);
631         if (hnode == NULL)
632                 return NULL;
633
634         cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode);
635         if (!list_empty(&cli->tc_lru))
636                 list_del_init(&cli->tc_lru);
637         return cli;
638 }
639
640 #define NRS_TBF_JOBID_NULL ""
641
642 static struct nrs_tbf_client *
643 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
644                        struct ptlrpc_request *req)
645 {
646         const char              *jobid;
647         struct nrs_tbf_client   *cli;
648         struct cfs_hash         *hs = head->th_cli_hash;
649         struct cfs_hash_bd               bd;
650
651         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
652         if (jobid == NULL)
653                 jobid = NRS_TBF_JOBID_NULL;
654         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
655         cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
656         cfs_hash_bd_unlock(hs, &bd, 1);
657
658         return cli;
659 }
660
661 static struct nrs_tbf_client *
662 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
663                           struct nrs_tbf_client *cli)
664 {
665         const char              *jobid;
666         struct nrs_tbf_client   *ret;
667         struct cfs_hash         *hs = head->th_cli_hash;
668         struct cfs_hash_bd               bd;
669
670         jobid = cli->tc_jobid;
671         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
672         ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
673         if (ret == NULL) {
674                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
675                 ret = cli;
676         }
677         cfs_hash_bd_unlock(hs, &bd, 1);
678
679         return ret;
680 }
681
682 static void
683 nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
684                       struct nrs_tbf_client *cli)
685 {
686         struct cfs_hash_bd               bd;
687         struct cfs_hash         *hs = head->th_cli_hash;
688         struct nrs_tbf_bucket   *bkt;
689         int                      hw;
690         struct list_head        zombies;
691
692         INIT_LIST_HEAD(&zombies);
693         cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
694         bkt = cfs_hash_bd_extra_get(hs, &bd);
695         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
696                 return;
697         LASSERT(list_empty(&cli->tc_lru));
698         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
699
700         /*
701          * Check and purge the LRU, there is at least one client in the LRU.
702          */
703         hw = tbf_jobid_cache_size >>
704              (hs->hs_cur_bits - hs->hs_bkt_bits);
705         while (cfs_hash_bd_count_get(&bd) > hw) {
706                 if (unlikely(list_empty(&bkt->ntb_lru)))
707                         break;
708                 cli = list_entry(bkt->ntb_lru.next,
709                                      struct nrs_tbf_client,
710                                      tc_lru);
711                 LASSERT(atomic_read(&cli->tc_ref) == 0);
712                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
713                 list_move(&cli->tc_lru, &zombies);
714         }
715         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
716
717         while (!list_empty(&zombies)) {
718                 cli = container_of0(zombies.next,
719                                     struct nrs_tbf_client, tc_lru);
720                 list_del_init(&cli->tc_lru);
721                 nrs_tbf_cli_fini(cli);
722         }
723 }
724
725 static void
726 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
727                        struct ptlrpc_request *req)
728 {
729         char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
730
731         if (jobid == NULL)
732                 jobid = NRS_TBF_JOBID_NULL;
733         LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
734         INIT_LIST_HEAD(&cli->tc_lru);
735         memcpy(cli->tc_jobid, jobid, strlen(jobid));
736 }
737
738 static int nrs_tbf_jobid_hash_order(void)
739 {
740         int bits;
741
742         for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
743                 ;
744
745         return bits;
746 }
747
748 #define NRS_TBF_JOBID_BKT_BITS 10
749
750 static int
751 nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
752                       struct nrs_tbf_head *head)
753 {
754         struct nrs_tbf_cmd       start;
755         struct nrs_tbf_bucket   *bkt;
756         int                      bits;
757         int                      i;
758         int                      rc;
759         struct cfs_hash_bd       bd;
760
761         bits = nrs_tbf_jobid_hash_order();
762         if (bits < NRS_TBF_JOBID_BKT_BITS)
763                 bits = NRS_TBF_JOBID_BKT_BITS;
764         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
765                                             bits,
766                                             bits,
767                                             NRS_TBF_JOBID_BKT_BITS,
768                                             sizeof(*bkt),
769                                             0,
770                                             0,
771                                             &nrs_tbf_jobid_hash_ops,
772                                             NRS_TBF_JOBID_HASH_FLAGS);
773         if (head->th_cli_hash == NULL)
774                 return -ENOMEM;
775
776         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
777                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
778                 INIT_LIST_HEAD(&bkt->ntb_lru);
779         }
780
781         memset(&start, 0, sizeof(start));
782         start.u.tc_start.ts_jobids_str = "*";
783
784         start.u.tc_start.ts_rpc_rate = tbf_rate;
785         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
786         start.tc_name = NRS_TBF_DEFAULT_RULE;
787         INIT_LIST_HEAD(&start.u.tc_start.ts_jobids);
788         rc = nrs_tbf_rule_start(policy, head, &start);
789         if (rc) {
790                 cfs_hash_putref(head->th_cli_hash);
791                 head->th_cli_hash = NULL;
792         }
793
794         return rc;
795 }
796
797 /**
798  * Frees jobid of \a list.
799  *
800  */
801 static void
802 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
803 {
804         struct nrs_tbf_jobid *jobid, *n;
805
806         list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
807                 OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1);
808                 list_del(&jobid->tj_linkage);
809                 OBD_FREE(jobid, sizeof(struct nrs_tbf_jobid));
810         }
811 }
812
813 static int
814 nrs_tbf_jobid_list_add(struct cfs_lstr *id, struct list_head *jobid_list)
815 {
816         struct nrs_tbf_jobid *jobid;
817         struct cfs_lstr res;
818         int rc;
819
820         OBD_ALLOC(jobid, sizeof(struct nrs_tbf_jobid));
821         if (jobid == NULL)
822                 return -ENOMEM;
823
824         OBD_ALLOC(jobid->tj_id, id->ls_len + 1);
825         if (jobid->tj_id == NULL) {
826                 OBD_FREE(jobid, sizeof(struct nrs_tbf_jobid));
827                 return -ENOMEM;
828         }
829
830         memcpy(jobid->tj_id, id->ls_str, id->ls_len);
831         rc = cfs_gettok(id, '*', &res);
832         if (rc == 0)
833                 jobid->tj_match_flag = NRS_TBF_MATCH_FULL;
834         else
835                 jobid->tj_match_flag = NRS_TBF_MATCH_WILDCARD;
836
837         list_add_tail(&jobid->tj_linkage, jobid_list);
838         return 0;
839 }
840
/**
 * Matches \a content against \a pattern.
 *
 * In the pattern, '*' matches any sequence of characters, including the
 * empty one; every other character matches only itself.  A '*' appearing in
 * \a content is an ordinary character.  The whole of \a content has to be
 * consumed for a match.
 *
 * The implementation is iterative: it remembers the most recent '*' and,
 * on a mismatch, lets that '*' absorb one more character of the content.
 * This bounds the work by strlen(pattern) * strlen(content) and uses no
 * recursion.
 *
 * \param[in] pattern NUL-terminated pattern
 * \param[in] content NUL-terminated string to test
 *
 * \retval true	 \a content matches \a pattern
 * \retval false otherwise
 */
static bool
cfs_match_wildcard(const char *pattern, const char *content)
{
	const char *star = NULL;	/* most recent '*' in the pattern */
	const char *resume = NULL;	/* content position that '*' covers to */

	while (*content != '\0') {
		if (*pattern == '*') {
			/* Start with the '*' matching nothing */
			star = pattern++;
			resume = content;
		} else if (*pattern == *content) {
			pattern++;
			content++;
		} else if (star != NULL) {
			/* Backtrack: the last '*' absorbs one more char */
			pattern = star + 1;
			content = ++resume;
		} else {
			return false;
		}
	}

	/* Trailing stars match the empty remainder */
	while (*pattern == '*')
		pattern++;

	return *pattern == '\0';
}
867
868 static inline bool
869 nrs_tbf_jobid_match(const struct nrs_tbf_jobid *jobid, const char *id)
870 {
871         if (jobid->tj_match_flag == NRS_TBF_MATCH_FULL)
872                 return strcmp(jobid->tj_id, id) == 0;
873
874         if (jobid->tj_match_flag == NRS_TBF_MATCH_WILDCARD)
875                 return cfs_match_wildcard(jobid->tj_id, id);
876
877         return false;
878 }
879
880 static int
881 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
882 {
883         struct nrs_tbf_jobid *jobid;
884
885         list_for_each_entry(jobid, jobid_list, tj_linkage) {
886                 if (nrs_tbf_jobid_match(jobid, id))
887                         return 1;
888         }
889         return 0;
890 }
891
892 static int
893 nrs_tbf_jobid_list_parse(char *str, int len, struct list_head *jobid_list)
894 {
895         struct cfs_lstr src;
896         struct cfs_lstr res;
897         int rc = 0;
898         ENTRY;
899
900         src.ls_str = str;
901         src.ls_len = len;
902         INIT_LIST_HEAD(jobid_list);
903         while (src.ls_str) {
904                 rc = cfs_gettok(&src, ' ', &res);
905                 if (rc == 0) {
906                         rc = -EINVAL;
907                         break;
908                 }
909                 rc = nrs_tbf_jobid_list_add(&res, jobid_list);
910                 if (rc)
911                         break;
912         }
913         if (rc)
914                 nrs_tbf_jobid_list_free(jobid_list);
915         RETURN(rc);
916 }
917
918 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
919 {
920         if (!list_empty(&cmd->u.tc_start.ts_jobids))
921                 nrs_tbf_jobid_list_free(&cmd->u.tc_start.ts_jobids);
922         if (cmd->u.tc_start.ts_jobids_str)
923                 OBD_FREE(cmd->u.tc_start.ts_jobids_str,
924                          strlen(cmd->u.tc_start.ts_jobids_str) + 1);
925 }
926
927 static int nrs_tbf_check_id_value(struct cfs_lstr *src, char *key)
928 {
929         struct cfs_lstr res;
930         int keylen = strlen(key);
931         int rc;
932
933         rc = cfs_gettok(src, '=', &res);
934         if (rc == 0 || res.ls_len != keylen ||
935             strncmp(res.ls_str, key, keylen) != 0 ||
936             src->ls_len <= 2 || src->ls_str[0] != '{' ||
937             src->ls_str[src->ls_len - 1] != '}')
938                 return -EINVAL;
939
940         /* Skip '{' and '}' */
941         src->ls_str++;
942         src->ls_len -= 2;
943         return 0;
944 }
945
946 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, char *id)
947 {
948         struct cfs_lstr src;
949         int rc;
950
951         src.ls_str = id;
952         src.ls_len = strlen(id);
953         rc = nrs_tbf_check_id_value(&src, "jobid");
954         if (rc)
955                 return rc;
956
957         OBD_ALLOC(cmd->u.tc_start.ts_jobids_str, src.ls_len + 1);
958         if (cmd->u.tc_start.ts_jobids_str == NULL)
959                 return -ENOMEM;
960
961         memcpy(cmd->u.tc_start.ts_jobids_str, src.ls_str, src.ls_len);
962
963         /* parse jobid list */
964         rc = nrs_tbf_jobid_list_parse(cmd->u.tc_start.ts_jobids_str,
965                                       strlen(cmd->u.tc_start.ts_jobids_str),
966                                       &cmd->u.tc_start.ts_jobids);
967         if (rc)
968                 nrs_tbf_jobid_cmd_fini(cmd);
969
970         return rc;
971 }
972
973 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
974                                    struct nrs_tbf_rule *rule,
975                                    struct nrs_tbf_cmd *start)
976 {
977         int rc = 0;
978
979         LASSERT(start->u.tc_start.ts_jobids_str);
980         OBD_ALLOC(rule->tr_jobids_str,
981                   strlen(start->u.tc_start.ts_jobids_str) + 1);
982         if (rule->tr_jobids_str == NULL)
983                 return -ENOMEM;
984
985         memcpy(rule->tr_jobids_str,
986                start->u.tc_start.ts_jobids_str,
987                strlen(start->u.tc_start.ts_jobids_str));
988
989         INIT_LIST_HEAD(&rule->tr_jobids);
990         if (!list_empty(&start->u.tc_start.ts_jobids)) {
991                 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
992                                               strlen(rule->tr_jobids_str),
993                                               &rule->tr_jobids);
994                 if (rc)
995                         CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
996         }
997         if (rc)
998                 OBD_FREE(rule->tr_jobids_str,
999                          strlen(start->u.tc_start.ts_jobids_str) + 1);
1000         return rc;
1001 }
1002
1003 static int
1004 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1005 {
1006         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1007                    rule->tr_jobids_str, rule->tr_rpc_rate,
1008                    atomic_read(&rule->tr_ref) - 1);
1009         return 0;
1010 }
1011
1012 static int
1013 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
1014                          struct nrs_tbf_client *cli)
1015 {
1016         return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
1017 }
1018
1019 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
1020 {
1021         if (!list_empty(&rule->tr_jobids))
1022                 nrs_tbf_jobid_list_free(&rule->tr_jobids);
1023         LASSERT(rule->tr_jobids_str != NULL);
1024         OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1);
1025 }
1026
1027 static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
1028         .o_name = NRS_TBF_TYPE_JOBID,
1029         .o_startup = nrs_tbf_jobid_startup,
1030         .o_cli_find = nrs_tbf_jobid_cli_find,
1031         .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
1032         .o_cli_put = nrs_tbf_jobid_cli_put,
1033         .o_cli_init = nrs_tbf_jobid_cli_init,
1034         .o_rule_init = nrs_tbf_jobid_rule_init,
1035         .o_rule_dump = nrs_tbf_jobid_rule_dump,
1036         .o_rule_match = nrs_tbf_jobid_rule_match,
1037         .o_rule_fini = nrs_tbf_jobid_rule_fini,
1038 };
1039
/**
 * libcfs_hash operations for nrs_tbf_head::th_cli_hash (NID flavour)
 *
 * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
 * nrs_tbf_client objects.
 */
1046 #define NRS_TBF_NID_BKT_BITS    8
1047 #define NRS_TBF_NID_BITS        16
1048
1049 static unsigned nrs_tbf_nid_hop_hash(struct cfs_hash *hs, const void *key,
1050                                   unsigned mask)
1051 {
1052         return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
1053 }
1054
1055 static int nrs_tbf_nid_hop_keycmp(const void *key, struct hlist_node *hnode)
1056 {
1057         lnet_nid_t            *nid = (lnet_nid_t *)key;
1058         struct nrs_tbf_client *cli = hlist_entry(hnode,
1059                                                      struct nrs_tbf_client,
1060                                                      tc_hnode);
1061
1062         return *nid == cli->tc_nid;
1063 }
1064
1065 static void *nrs_tbf_nid_hop_key(struct hlist_node *hnode)
1066 {
1067         struct nrs_tbf_client *cli = hlist_entry(hnode,
1068                                                      struct nrs_tbf_client,
1069                                                      tc_hnode);
1070
1071         return &cli->tc_nid;
1072 }
1073
1074 static void *nrs_tbf_nid_hop_object(struct hlist_node *hnode)
1075 {
1076         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
1077 }
1078
1079 static void nrs_tbf_nid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1080 {
1081         struct nrs_tbf_client *cli = hlist_entry(hnode,
1082                                                      struct nrs_tbf_client,
1083                                                      tc_hnode);
1084
1085         atomic_inc(&cli->tc_ref);
1086 }
1087
1088 static void nrs_tbf_nid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1089 {
1090         struct nrs_tbf_client *cli = hlist_entry(hnode,
1091                                                      struct nrs_tbf_client,
1092                                                      tc_hnode);
1093
1094         atomic_dec(&cli->tc_ref);
1095 }
1096
1097 static void nrs_tbf_nid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1098 {
1099         struct nrs_tbf_client *cli = hlist_entry(hnode,
1100                                                      struct nrs_tbf_client,
1101                                                      tc_hnode);
1102
1103         LASSERTF(atomic_read(&cli->tc_ref) == 0,
1104                  "Busy TBF object from client with NID %s, with %d refs\n",
1105                  libcfs_nid2str(cli->tc_nid), atomic_read(&cli->tc_ref));
1106
1107         nrs_tbf_cli_fini(cli);
1108 }
1109
1110 static struct cfs_hash_ops nrs_tbf_nid_hash_ops = {
1111         .hs_hash        = nrs_tbf_nid_hop_hash,
1112         .hs_keycmp      = nrs_tbf_nid_hop_keycmp,
1113         .hs_key         = nrs_tbf_nid_hop_key,
1114         .hs_object      = nrs_tbf_nid_hop_object,
1115         .hs_get         = nrs_tbf_nid_hop_get,
1116         .hs_put         = nrs_tbf_nid_hop_put,
1117         .hs_put_locked  = nrs_tbf_nid_hop_put,
1118         .hs_exit        = nrs_tbf_nid_hop_exit,
1119 };
1120
1121 static struct nrs_tbf_client *
1122 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
1123                      struct ptlrpc_request *req)
1124 {
1125         return cfs_hash_lookup(head->th_cli_hash, &req->rq_peer.nid);
1126 }
1127
1128 static struct nrs_tbf_client *
1129 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
1130                         struct nrs_tbf_client *cli)
1131 {
1132         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid,
1133                                        &cli->tc_hnode);
1134 }
1135
1136 static void
1137 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
1138                       struct nrs_tbf_client *cli)
1139 {
1140         cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
1141 }
1142
1143 static int
1144 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
1145                     struct nrs_tbf_head *head)
1146 {
1147         struct nrs_tbf_cmd      start;
1148         int rc;
1149
1150         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1151                                             NRS_TBF_NID_BITS,
1152                                             NRS_TBF_NID_BITS,
1153                                             NRS_TBF_NID_BKT_BITS, 0,
1154                                             CFS_HASH_MIN_THETA,
1155                                             CFS_HASH_MAX_THETA,
1156                                             &nrs_tbf_nid_hash_ops,
1157                                             CFS_HASH_RW_BKTLOCK);
1158         if (head->th_cli_hash == NULL)
1159                 return -ENOMEM;
1160
1161         memset(&start, 0, sizeof(start));
1162         start.u.tc_start.ts_nids_str = "*";
1163
1164         start.u.tc_start.ts_rpc_rate = tbf_rate;
1165         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1166         start.tc_name = NRS_TBF_DEFAULT_RULE;
1167         INIT_LIST_HEAD(&start.u.tc_start.ts_nids);
1168         rc = nrs_tbf_rule_start(policy, head, &start);
1169         if (rc) {
1170                 cfs_hash_putref(head->th_cli_hash);
1171                 head->th_cli_hash = NULL;
1172         }
1173
1174         return rc;
1175 }
1176
1177 static void
1178 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1179                              struct ptlrpc_request *req)
1180 {
1181         cli->tc_nid = req->rq_peer.nid;
1182 }
1183
1184 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1185                                  struct nrs_tbf_rule *rule,
1186                                  struct nrs_tbf_cmd *start)
1187 {
1188         LASSERT(start->u.tc_start.ts_nids_str);
1189         OBD_ALLOC(rule->tr_nids_str,
1190                   strlen(start->u.tc_start.ts_nids_str) + 1);
1191         if (rule->tr_nids_str == NULL)
1192                 return -ENOMEM;
1193
1194         memcpy(rule->tr_nids_str,
1195                start->u.tc_start.ts_nids_str,
1196                strlen(start->u.tc_start.ts_nids_str));
1197
1198         INIT_LIST_HEAD(&rule->tr_nids);
1199         if (!list_empty(&start->u.tc_start.ts_nids)) {
1200                 if (cfs_parse_nidlist(rule->tr_nids_str,
1201                                       strlen(rule->tr_nids_str),
1202                                       &rule->tr_nids) <= 0) {
1203                         CERROR("nids {%s} illegal\n",
1204                                rule->tr_nids_str);
1205                         OBD_FREE(rule->tr_nids_str,
1206                                  strlen(start->u.tc_start.ts_nids_str) + 1);
1207                         return -EINVAL;
1208                 }
1209         }
1210         return 0;
1211 }
1212
1213 static int
1214 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1215 {
1216         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1217                    rule->tr_nids_str, rule->tr_rpc_rate,
1218                    atomic_read(&rule->tr_ref) - 1);
1219         return 0;
1220 }
1221
1222 static int
1223 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1224                        struct nrs_tbf_client *cli)
1225 {
1226         return cfs_match_nid(cli->tc_nid, &rule->tr_nids);
1227 }
1228
1229 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1230 {
1231         if (!list_empty(&rule->tr_nids))
1232                 cfs_free_nidlist(&rule->tr_nids);
1233         LASSERT(rule->tr_nids_str != NULL);
1234         OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1);
1235 }
1236
1237 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1238 {
1239         if (!list_empty(&cmd->u.tc_start.ts_nids))
1240                 cfs_free_nidlist(&cmd->u.tc_start.ts_nids);
1241         if (cmd->u.tc_start.ts_nids_str)
1242                 OBD_FREE(cmd->u.tc_start.ts_nids_str,
1243                          strlen(cmd->u.tc_start.ts_nids_str) + 1);
1244 }
1245
1246 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, char *id)
1247 {
1248         struct cfs_lstr src;
1249         int rc;
1250
1251         src.ls_str = id;
1252         src.ls_len = strlen(id);
1253         rc = nrs_tbf_check_id_value(&src, "nid");
1254         if (rc)
1255                 return rc;
1256
1257         OBD_ALLOC(cmd->u.tc_start.ts_nids_str, src.ls_len + 1);
1258         if (cmd->u.tc_start.ts_nids_str == NULL)
1259                 return -ENOMEM;
1260
1261         memcpy(cmd->u.tc_start.ts_nids_str, src.ls_str, src.ls_len);
1262
1263         /* parse NID list */
1264         if (cfs_parse_nidlist(cmd->u.tc_start.ts_nids_str,
1265                               strlen(cmd->u.tc_start.ts_nids_str),
1266                               &cmd->u.tc_start.ts_nids) <= 0) {
1267                 nrs_tbf_nid_cmd_fini(cmd);
1268                 return -EINVAL;
1269         }
1270
1271         return 0;
1272 }
1273
1274 static struct nrs_tbf_ops nrs_tbf_nid_ops = {
1275         .o_name = NRS_TBF_TYPE_NID,
1276         .o_startup = nrs_tbf_nid_startup,
1277         .o_cli_find = nrs_tbf_nid_cli_find,
1278         .o_cli_findadd = nrs_tbf_nid_cli_findadd,
1279         .o_cli_put = nrs_tbf_nid_cli_put,
1280         .o_cli_init = nrs_tbf_nid_cli_init,
1281         .o_rule_init = nrs_tbf_nid_rule_init,
1282         .o_rule_dump = nrs_tbf_nid_rule_dump,
1283         .o_rule_match = nrs_tbf_nid_rule_match,
1284         .o_rule_fini = nrs_tbf_nid_rule_fini,
1285 };
1286
/**
 * Hash callback of the generic client table: djb2 hash over the
 * NUL-terminated string at \a key.
 *
 * \param[in] hs	hash table (unused)
 * \param[in] key	key string
 * \param[in] mask	mask handed through to cfs_hash_djb2_hash()
 *
 * \retval hash value as computed by cfs_hash_djb2_hash()
 */
static unsigned nrs_tbf_hop_hash(struct cfs_hash *hs, const void *key,
				 unsigned mask)
{
	const char *keystr = key;

	return cfs_hash_djb2_hash(key, strlen(keystr), mask);
}
1292
1293 static int nrs_tbf_hop_keycmp(const void *key, struct hlist_node *hnode)
1294 {
1295         struct nrs_tbf_client *cli = hlist_entry(hnode,
1296                                                  struct nrs_tbf_client,
1297                                                  tc_hnode);
1298
1299         return (strcmp(cli->tc_key, key) == 0);
1300 }
1301
1302 static void *nrs_tbf_hop_key(struct hlist_node *hnode)
1303 {
1304         struct nrs_tbf_client *cli = hlist_entry(hnode,
1305                                                  struct nrs_tbf_client,
1306                                                  tc_hnode);
1307         return cli->tc_key;
1308 }
1309
1310 static void *nrs_tbf_hop_object(struct hlist_node *hnode)
1311 {
1312         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
1313 }
1314
1315 static void nrs_tbf_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1316 {
1317         struct nrs_tbf_client *cli = hlist_entry(hnode,
1318                                                  struct nrs_tbf_client,
1319                                                  tc_hnode);
1320
1321         atomic_inc(&cli->tc_ref);
1322 }
1323
1324 static void nrs_tbf_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1325 {
1326         struct nrs_tbf_client *cli = hlist_entry(hnode,
1327                                                  struct nrs_tbf_client,
1328                                                  tc_hnode);
1329
1330         atomic_dec(&cli->tc_ref);
1331 }
1332
1333 static void nrs_tbf_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1334
1335 {
1336         struct nrs_tbf_client *cli = hlist_entry(hnode,
1337                                                  struct nrs_tbf_client,
1338                                                  tc_hnode);
1339
1340         LASSERT(atomic_read(&cli->tc_ref) == 0);
1341         nrs_tbf_cli_fini(cli);
1342 }
1343
1344 static struct cfs_hash_ops nrs_tbf_hash_ops = {
1345         .hs_hash        = nrs_tbf_hop_hash,
1346         .hs_keycmp      = nrs_tbf_hop_keycmp,
1347         .hs_key         = nrs_tbf_hop_key,
1348         .hs_object      = nrs_tbf_hop_object,
1349         .hs_get         = nrs_tbf_hop_get,
1350         .hs_put         = nrs_tbf_hop_put,
1351         .hs_put_locked  = nrs_tbf_hop_put,
1352         .hs_exit        = nrs_tbf_hop_exit,
1353 };
1354
1355 #define NRS_TBF_GENERIC_BKT_BITS        10
1356 #define NRS_TBF_GENERIC_HASH_FLAGS      (CFS_HASH_SPIN_BKTLOCK | \
1357                                         CFS_HASH_NO_ITEMREF | \
1358                                         CFS_HASH_DEPTH)
1359
1360 static int
1361 nrs_tbf_startup(struct ptlrpc_nrs_policy *policy, struct nrs_tbf_head *head)
1362 {
1363         struct nrs_tbf_cmd       start;
1364         struct nrs_tbf_bucket   *bkt;
1365         int                      bits;
1366         int                      i;
1367         int                      rc;
1368         struct cfs_hash_bd       bd;
1369
1370         bits = nrs_tbf_jobid_hash_order();
1371         if (bits < NRS_TBF_GENERIC_BKT_BITS)
1372                 bits = NRS_TBF_GENERIC_BKT_BITS;
1373         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1374                                             bits, bits,
1375                                             NRS_TBF_GENERIC_BKT_BITS,
1376                                             sizeof(*bkt), 0, 0,
1377                                             &nrs_tbf_hash_ops,
1378                                             NRS_TBF_GENERIC_HASH_FLAGS);
1379         if (head->th_cli_hash == NULL)
1380                 return -ENOMEM;
1381
1382         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
1383                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
1384                 INIT_LIST_HEAD(&bkt->ntb_lru);
1385         }
1386
1387         memset(&start, 0, sizeof(start));
1388         start.u.tc_start.ts_conds_str = "*";
1389
1390         start.u.tc_start.ts_rpc_rate = tbf_rate;
1391         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1392         start.tc_name = NRS_TBF_DEFAULT_RULE;
1393         INIT_LIST_HEAD(&start.u.tc_start.ts_conds);
1394         rc = nrs_tbf_rule_start(policy, head, &start);
1395         if (rc)
1396                 cfs_hash_putref(head->th_cli_hash);
1397
1398         return rc;
1399 }
1400
1401 static struct nrs_tbf_client *
1402 nrs_tbf_cli_hash_lookup(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1403                         const char *key)
1404 {
1405         struct hlist_node *hnode;
1406         struct nrs_tbf_client *cli;
1407
1408         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)key);
1409         if (hnode == NULL)
1410                 return NULL;
1411
1412         cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode);
1413         if (!list_empty(&cli->tc_lru))
1414                 list_del_init(&cli->tc_lru);
1415         return cli;
1416 }
1417
1418 static struct nrs_tbf_client *
1419 nrs_tbf_cli_find(struct nrs_tbf_head *head, struct ptlrpc_request *req)
1420 {
1421         struct nrs_tbf_client *cli;
1422         struct cfs_hash *hs = head->th_cli_hash;
1423         struct cfs_hash_bd bd;
1424         char keystr[NRS_TBF_KEY_LEN] = { '\0' };
1425         const char *jobid;
1426         __u32 opc;
1427
1428         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1429         if (jobid == NULL)
1430                 jobid = NRS_TBF_JOBID_NULL;
1431         opc = lustre_msg_get_opc(req->rq_reqmsg);
1432         snprintf(keystr, sizeof(keystr), "%s_%s_%d", jobid,
1433                  libcfs_nid2str(req->rq_peer.nid), opc);
1434         LASSERT(strlen(keystr) < NRS_TBF_KEY_LEN);
1435         cfs_hash_bd_get_and_lock(hs, (void *)keystr, &bd, 1);
1436         cli = nrs_tbf_cli_hash_lookup(hs, &bd, keystr);
1437         cfs_hash_bd_unlock(hs, &bd, 1);
1438
1439         return cli;
1440 }
1441
1442 static struct nrs_tbf_client *
1443 nrs_tbf_cli_findadd(struct nrs_tbf_head *head,
1444                     struct nrs_tbf_client *cli)
1445 {
1446         const char              *key;
1447         struct nrs_tbf_client   *ret;
1448         struct cfs_hash         *hs = head->th_cli_hash;
1449         struct cfs_hash_bd       bd;
1450
1451         key = cli->tc_key;
1452         cfs_hash_bd_get_and_lock(hs, (void *)key, &bd, 1);
1453         ret = nrs_tbf_cli_hash_lookup(hs, &bd, key);
1454         if (ret == NULL) {
1455                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
1456                 ret = cli;
1457         }
1458         cfs_hash_bd_unlock(hs, &bd, 1);
1459
1460         return ret;
1461 }
1462
1463 static void
1464 nrs_tbf_cli_put(struct nrs_tbf_head *head, struct nrs_tbf_client *cli)
1465 {
1466         struct cfs_hash_bd       bd;
1467         struct cfs_hash         *hs = head->th_cli_hash;
1468         struct nrs_tbf_bucket   *bkt;
1469         int                      hw;
1470         struct list_head         zombies;
1471
1472         INIT_LIST_HEAD(&zombies);
1473         cfs_hash_bd_get(hs, &cli->tc_key, &bd);
1474         bkt = cfs_hash_bd_extra_get(hs, &bd);
1475         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
1476                 return;
1477         LASSERT(list_empty(&cli->tc_lru));
1478         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
1479
1480         /**
1481          * Check and purge the LRU, there is at least one client in the LRU.
1482          */
1483         hw = tbf_jobid_cache_size >> (hs->hs_cur_bits - hs->hs_bkt_bits);
1484         while (cfs_hash_bd_count_get(&bd) > hw) {
1485                 if (unlikely(list_empty(&bkt->ntb_lru)))
1486                         break;
1487                 cli = list_entry(bkt->ntb_lru.next,
1488                                  struct nrs_tbf_client,
1489                                  tc_lru);
1490                 LASSERT(atomic_read(&cli->tc_ref) == 0);
1491                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
1492                 list_move(&cli->tc_lru, &zombies);
1493         }
1494         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
1495
1496         while (!list_empty(&zombies)) {
1497                 cli = container_of0(zombies.next,
1498                                     struct nrs_tbf_client, tc_lru);
1499                 list_del_init(&cli->tc_lru);
1500                 nrs_tbf_cli_fini(cli);
1501         }
1502 }
1503
1504 static void
1505 nrs_tbf_generic_cli_init(struct nrs_tbf_client *cli,
1506                          struct ptlrpc_request *req)
1507 {
1508         char keystr[NRS_TBF_KEY_LEN];
1509         const char *jobid;
1510         __u32 opc;
1511
1512         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1513         if (jobid == NULL)
1514                 jobid = NRS_TBF_JOBID_NULL;
1515         opc = lustre_msg_get_opc(req->rq_reqmsg);
1516         snprintf(keystr, sizeof(keystr), "%s_%s_%d", jobid,
1517                  libcfs_nid2str(req->rq_peer.nid), opc);
1518
1519         LASSERT(strlen(keystr) < NRS_TBF_KEY_LEN);
1520         INIT_LIST_HEAD(&cli->tc_lru);
1521         memcpy(cli->tc_key, keystr, strlen(keystr));
1522         memcpy(cli->tc_jobid, jobid, strlen(jobid));
1523         cli->tc_nid = req->rq_peer.nid;
1524         cli->tc_opcode = opc;
1525 }
1526
1527 static void
1528 nrs_tbf_expression_free(struct nrs_tbf_expression *expr)
1529 {
1530         LASSERT(expr->te_field >= NRS_TBF_FIELD_NID &&
1531                 expr->te_field < NRS_TBF_FIELD_MAX);
1532         switch (expr->te_field) {
1533         case NRS_TBF_FIELD_NID:
1534                 cfs_free_nidlist(&expr->te_cond);
1535                 break;
1536         case NRS_TBF_FIELD_JOBID:
1537                 nrs_tbf_jobid_list_free(&expr->te_cond);
1538                 break;
1539         case NRS_TBF_FIELD_OPCODE:
1540                 CFS_FREE_BITMAP(expr->te_opcodes);
1541                 break;
1542         default:
1543                 LBUG();
1544         }
1545         OBD_FREE_PTR(expr);
1546 }
1547
1548 static void
1549 nrs_tbf_conjunction_free(struct nrs_tbf_conjunction *conjunction)
1550 {
1551         struct nrs_tbf_expression *expression;
1552         struct nrs_tbf_expression *n;
1553
1554         LASSERT(list_empty(&conjunction->tc_linkage));
1555         list_for_each_entry_safe(expression, n,
1556                                  &conjunction->tc_expressions,
1557                                  te_linkage) {
1558                 list_del_init(&expression->te_linkage);
1559                 nrs_tbf_expression_free(expression);
1560         }
1561         OBD_FREE_PTR(conjunction);
1562 }
1563
1564 static void
1565 nrs_tbf_conds_free(struct list_head *cond_list)
1566 {
1567         struct nrs_tbf_conjunction *conjunction;
1568         struct nrs_tbf_conjunction *n;
1569
1570         list_for_each_entry_safe(conjunction, n, cond_list, tc_linkage) {
1571                 list_del_init(&conjunction->tc_linkage);
1572                 nrs_tbf_conjunction_free(conjunction);
1573         }
1574 }
1575
1576 static void
1577 nrs_tbf_generic_cmd_fini(struct nrs_tbf_cmd *cmd)
1578 {
1579         if (!list_empty(&cmd->u.tc_start.ts_conds))
1580                 nrs_tbf_conds_free(&cmd->u.tc_start.ts_conds);
1581         if (cmd->u.tc_start.ts_conds_str)
1582                 OBD_FREE(cmd->u.tc_start.ts_conds_str,
1583                          strlen(cmd->u.tc_start.ts_conds_str) + 1);
1584 }
1585
1586 #define NRS_TBF_DISJUNCTION_DELIM       (',')
1587 #define NRS_TBF_CONJUNCTION_DELIM       ('&')
1588 #define NRS_TBF_EXPRESSION_DELIM        ('=')
1589
1590 static inline bool
1591 nrs_tbf_check_field(struct cfs_lstr *field, char *str)
1592 {
1593         int len = strlen(str);
1594
1595         return (field->ls_len == len &&
1596                 strncmp(field->ls_str, str, len) == 0);
1597 }
1598
1599 static int
1600 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr);
1601
1602 static int
1603 nrs_tbf_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
1604 {
1605         struct nrs_tbf_expression *expr;
1606         struct cfs_lstr field;
1607         int rc = 0;
1608
1609         OBD_ALLOC(expr, sizeof(struct nrs_tbf_expression));
1610         if (expr == NULL)
1611                 return -ENOMEM;
1612
1613         rc = cfs_gettok(src, NRS_TBF_EXPRESSION_DELIM, &field);
1614         if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
1615             src->ls_str[src->ls_len - 1] != '}')
1616                 GOTO(out, rc = -EINVAL);
1617
1618         /* Skip '{' and '}' */
1619         src->ls_str++;
1620         src->ls_len -= 2;
1621
1622         if (nrs_tbf_check_field(&field, "nid")) {
1623                 if (cfs_parse_nidlist(src->ls_str,
1624                                       src->ls_len,
1625                                       &expr->te_cond) <= 0)
1626                         GOTO(out, rc = -EINVAL);
1627                 expr->te_field = NRS_TBF_FIELD_NID;
1628         } else if (nrs_tbf_check_field(&field, "jobid")) {
1629                 if (nrs_tbf_jobid_list_parse(src->ls_str,
1630                                              src->ls_len,
1631                                              &expr->te_cond) < 0)
1632                         GOTO(out, rc = -EINVAL);
1633                 expr->te_field = NRS_TBF_FIELD_JOBID;
1634         } else if (nrs_tbf_check_field(&field, "opcode")) {
1635                 if (nrs_tbf_opcode_list_parse(src->ls_str,
1636                                               src->ls_len,
1637                                               &expr->te_opcodes) < 0)
1638                         GOTO(out, rc = -EINVAL);
1639                 expr->te_field = NRS_TBF_FIELD_OPCODE;
1640         } else
1641                 GOTO(out, rc = -EINVAL);
1642
1643         list_add_tail(&expr->te_linkage, cond_list);
1644         return 0;
1645 out:
1646         OBD_FREE_PTR(expr);
1647         return rc;
1648 }
1649
1650 static int
1651 nrs_tbf_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
1652 {
1653         struct nrs_tbf_conjunction *conjunction;
1654         struct cfs_lstr expr;
1655         int rc = 0;
1656
1657         OBD_ALLOC(conjunction, sizeof(struct nrs_tbf_conjunction));
1658         if (conjunction == NULL)
1659                 return -ENOMEM;
1660
1661         INIT_LIST_HEAD(&conjunction->tc_expressions);
1662         list_add_tail(&conjunction->tc_linkage, cond_list);
1663
1664         while (src->ls_str) {
1665                 rc = cfs_gettok(src, NRS_TBF_CONJUNCTION_DELIM, &expr);
1666                 if (rc == 0) {
1667                         rc = -EINVAL;
1668                         break;
1669                 }
1670                 rc = nrs_tbf_expression_parse(&expr,
1671                                               &conjunction->tc_expressions);
1672                 if (rc)
1673                         break;
1674         }
1675         return rc;
1676 }
1677
1678 static int
1679 nrs_tbf_conds_parse(char *str, int len, struct list_head *cond_list)
1680 {
1681         struct cfs_lstr src;
1682         struct cfs_lstr res;
1683         int rc = 0;
1684
1685         src.ls_str = str;
1686         src.ls_len = len;
1687         INIT_LIST_HEAD(cond_list);
1688         while (src.ls_str) {
1689                 rc = cfs_gettok(&src, NRS_TBF_DISJUNCTION_DELIM, &res);
1690                 if (rc == 0) {
1691                         rc = -EINVAL;
1692                         break;
1693                 }
1694                 rc = nrs_tbf_conjunction_parse(&res, cond_list);
1695                 if (rc)
1696                         break;
1697         }
1698         return rc;
1699 }
1700
1701 static int
1702 nrs_tbf_generic_parse(struct nrs_tbf_cmd *cmd, const char *id)
1703 {
1704         int rc;
1705
1706         OBD_ALLOC(cmd->u.tc_start.ts_conds_str, strlen(id) + 1);
1707         if (cmd->u.tc_start.ts_conds_str == NULL)
1708                 return -ENOMEM;
1709
1710         memcpy(cmd->u.tc_start.ts_conds_str, id, strlen(id));
1711
1712         /* Parse hybird NID and JOBID conditions */
1713         rc = nrs_tbf_conds_parse(cmd->u.tc_start.ts_conds_str,
1714                                  strlen(cmd->u.tc_start.ts_conds_str),
1715                                  &cmd->u.tc_start.ts_conds);
1716         if (rc)
1717                 nrs_tbf_generic_cmd_fini(cmd);
1718
1719         return rc;
1720 }
1721
1722 static int
1723 nrs_tbf_expression_match(struct nrs_tbf_expression *expr,
1724                          struct nrs_tbf_rule *rule,
1725                          struct nrs_tbf_client *cli)
1726 {
1727         switch (expr->te_field) {
1728         case NRS_TBF_FIELD_NID:
1729                 return cfs_match_nid(cli->tc_nid, &expr->te_cond);
1730         case NRS_TBF_FIELD_JOBID:
1731                 return nrs_tbf_jobid_list_match(&expr->te_cond, cli->tc_jobid);
1732         case NRS_TBF_FIELD_OPCODE:
1733                 return cfs_bitmap_check(expr->te_opcodes, cli->tc_opcode);
1734         default:
1735                 return 0;
1736         }
1737 }
1738
1739 static int
1740 nrs_tbf_conjunction_match(struct nrs_tbf_conjunction *conjunction,
1741                           struct nrs_tbf_rule *rule,
1742                           struct nrs_tbf_client *cli)
1743 {
1744         struct nrs_tbf_expression *expr;
1745         int matched;
1746
1747         list_for_each_entry(expr, &conjunction->tc_expressions, te_linkage) {
1748                 matched = nrs_tbf_expression_match(expr, rule, cli);
1749                 if (!matched)
1750                         return 0;
1751         }
1752
1753         return 1;
1754 }
1755
1756 static int
1757 nrs_tbf_cond_match(struct nrs_tbf_rule *rule, struct nrs_tbf_client *cli)
1758 {
1759         struct nrs_tbf_conjunction *conjunction;
1760         int matched;
1761
1762         list_for_each_entry(conjunction, &rule->tr_conds, tc_linkage) {
1763                 matched = nrs_tbf_conjunction_match(conjunction, rule, cli);
1764                 if (matched)
1765                         return 1;
1766         }
1767
1768         return 0;
1769 }
1770
1771 static void
1772 nrs_tbf_generic_rule_fini(struct nrs_tbf_rule *rule)
1773 {
1774         if (!list_empty(&rule->tr_conds))
1775                 nrs_tbf_conds_free(&rule->tr_conds);
1776         LASSERT(rule->tr_conds_str != NULL);
1777         OBD_FREE(rule->tr_conds_str, strlen(rule->tr_conds_str) + 1);
1778 }
1779
1780 static int
1781 nrs_tbf_rule_init(struct ptlrpc_nrs_policy *policy,
1782                   struct nrs_tbf_rule *rule, struct nrs_tbf_cmd *start)
1783 {
1784         int rc = 0;
1785
1786         LASSERT(start->u.tc_start.ts_conds_str);
1787         OBD_ALLOC(rule->tr_conds_str,
1788                   strlen(start->u.tc_start.ts_conds_str) + 1);
1789         if (rule->tr_conds_str == NULL)
1790                 return -ENOMEM;
1791
1792         memcpy(rule->tr_conds_str,
1793                start->u.tc_start.ts_conds_str,
1794                strlen(start->u.tc_start.ts_conds_str));
1795
1796         INIT_LIST_HEAD(&rule->tr_conds);
1797         if (!list_empty(&start->u.tc_start.ts_conds)) {
1798                 rc = nrs_tbf_conds_parse(rule->tr_conds_str,
1799                                          strlen(rule->tr_conds_str),
1800                                          &rule->tr_conds);
1801         }
1802         if (rc)
1803                 nrs_tbf_generic_rule_fini(rule);
1804
1805         return rc;
1806 }
1807
1808 static int
1809 nrs_tbf_generic_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1810 {
1811         seq_printf(m, "%s %s %llu, ref %d\n", rule->tr_name,
1812                    rule->tr_conds_str, rule->tr_rpc_rate,
1813                    atomic_read(&rule->tr_ref) - 1);
1814         return 0;
1815 }
1816
/**
 * nrs_tbf_ops::o_rule_match method of the generic TBF type; delegates to
 * nrs_tbf_cond_match().
 *
 * \param[in] rule the rule to test
 * \param[in] cli  the client being classified
 *
 * \retval value returned by nrs_tbf_cond_match()
 */
static int
nrs_tbf_generic_rule_match(struct nrs_tbf_rule *rule,
			   struct nrs_tbf_client *cli)
{
	int matched = nrs_tbf_cond_match(rule, cli);

	return matched;
}
1823
1824 static struct nrs_tbf_ops nrs_tbf_generic_ops = {
1825         .o_name = NRS_TBF_TYPE_GENERIC,
1826         .o_startup = nrs_tbf_startup,
1827         .o_cli_find = nrs_tbf_cli_find,
1828         .o_cli_findadd = nrs_tbf_cli_findadd,
1829         .o_cli_put = nrs_tbf_cli_put,
1830         .o_cli_init = nrs_tbf_generic_cli_init,
1831         .o_rule_init = nrs_tbf_rule_init,
1832         .o_rule_dump = nrs_tbf_generic_rule_dump,
1833         .o_rule_match = nrs_tbf_generic_rule_match,
1834         .o_rule_fini = nrs_tbf_generic_rule_fini,
1835 };
1836
1837 static void nrs_tbf_opcode_rule_fini(struct nrs_tbf_rule *rule)
1838 {
1839         if (rule->tr_opcodes != NULL)
1840                 CFS_FREE_BITMAP(rule->tr_opcodes);
1841
1842         LASSERT(rule->tr_opcodes_str != NULL);
1843         OBD_FREE(rule->tr_opcodes_str, strlen(rule->tr_opcodes_str) + 1);
1844 }
1845
1846 static unsigned nrs_tbf_opcode_hop_hash(struct cfs_hash *hs, const void *key,
1847                                         unsigned mask)
1848 {
1849         return cfs_hash_djb2_hash(key, sizeof(__u32), mask);
1850 }
1851
1852 static int nrs_tbf_opcode_hop_keycmp(const void *key, struct hlist_node *hnode)
1853 {
1854         const __u32     *opc = key;
1855         struct nrs_tbf_client *cli = hlist_entry(hnode,
1856                                                  struct nrs_tbf_client,
1857                                                  tc_hnode);
1858
1859         return *opc == cli->tc_opcode;
1860 }
1861
1862 static void *nrs_tbf_opcode_hop_key(struct hlist_node *hnode)
1863 {
1864         struct nrs_tbf_client *cli = hlist_entry(hnode,
1865                                                  struct nrs_tbf_client,
1866                                                  tc_hnode);
1867
1868         return &cli->tc_opcode;
1869 }
1870
1871 static void *nrs_tbf_opcode_hop_object(struct hlist_node *hnode)
1872 {
1873         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
1874 }
1875
1876 static void nrs_tbf_opcode_hop_get(struct cfs_hash *hs,
1877                                    struct hlist_node *hnode)
1878 {
1879         struct nrs_tbf_client *cli = hlist_entry(hnode,
1880                                                  struct nrs_tbf_client,
1881                                                  tc_hnode);
1882
1883         atomic_inc(&cli->tc_ref);
1884 }
1885
1886 static void nrs_tbf_opcode_hop_put(struct cfs_hash *hs,
1887                                    struct hlist_node *hnode)
1888 {
1889         struct nrs_tbf_client *cli = hlist_entry(hnode,
1890                                                  struct nrs_tbf_client,
1891                                                  tc_hnode);
1892
1893         atomic_dec(&cli->tc_ref);
1894 }
1895
1896 static void nrs_tbf_opcode_hop_exit(struct cfs_hash *hs,
1897                                     struct hlist_node *hnode)
1898 {
1899         struct nrs_tbf_client *cli = hlist_entry(hnode,
1900                                                  struct nrs_tbf_client,
1901                                                  tc_hnode);
1902
1903         LASSERTF(atomic_read(&cli->tc_ref) == 0,
1904                  "Busy TBF object from client with opcode %s, with %d refs\n",
1905                  ll_opcode2str(cli->tc_opcode),
1906                  atomic_read(&cli->tc_ref));
1907
1908         nrs_tbf_cli_fini(cli);
1909 }
1910 static struct cfs_hash_ops nrs_tbf_opcode_hash_ops = {
1911         .hs_hash        = nrs_tbf_opcode_hop_hash,
1912         .hs_keycmp      = nrs_tbf_opcode_hop_keycmp,
1913         .hs_key         = nrs_tbf_opcode_hop_key,
1914         .hs_object      = nrs_tbf_opcode_hop_object,
1915         .hs_get         = nrs_tbf_opcode_hop_get,
1916         .hs_put         = nrs_tbf_opcode_hop_put,
1917         .hs_put_locked  = nrs_tbf_opcode_hop_put,
1918         .hs_exit        = nrs_tbf_opcode_hop_exit,
1919 };
1920
1921 static int
1922 nrs_tbf_opcode_startup(struct ptlrpc_nrs_policy *policy,
1923                     struct nrs_tbf_head *head)
1924 {
1925         struct nrs_tbf_cmd      start = { 0 };
1926         int rc;
1927
1928         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1929                                             NRS_TBF_NID_BITS,
1930                                             NRS_TBF_NID_BITS,
1931                                             NRS_TBF_NID_BKT_BITS, 0,
1932                                             CFS_HASH_MIN_THETA,
1933                                             CFS_HASH_MAX_THETA,
1934                                             &nrs_tbf_opcode_hash_ops,
1935                                             CFS_HASH_RW_BKTLOCK);
1936         if (head->th_cli_hash == NULL)
1937                 return -ENOMEM;
1938
1939         start.u.tc_start.ts_opcodes = NULL;
1940         start.u.tc_start.ts_opcodes_str = "*";
1941
1942         start.u.tc_start.ts_rpc_rate = tbf_rate;
1943         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1944         start.tc_name = NRS_TBF_DEFAULT_RULE;
1945         rc = nrs_tbf_rule_start(policy, head, &start);
1946
1947         return rc;
1948 }
1949
1950 static struct nrs_tbf_client *
1951 nrs_tbf_opcode_cli_find(struct nrs_tbf_head *head,
1952                         struct ptlrpc_request *req)
1953 {
1954         __u32 opc;
1955
1956         opc = lustre_msg_get_opc(req->rq_reqmsg);
1957         return cfs_hash_lookup(head->th_cli_hash, &opc);
1958 }
1959
1960 static struct nrs_tbf_client *
1961 nrs_tbf_opcode_cli_findadd(struct nrs_tbf_head *head,
1962                            struct nrs_tbf_client *cli)
1963 {
1964         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_opcode,
1965                                        &cli->tc_hnode);
1966 }
1967
1968 static void
1969 nrs_tbf_opcode_cli_init(struct nrs_tbf_client *cli,
1970                         struct ptlrpc_request *req)
1971 {
1972         cli->tc_opcode = lustre_msg_get_opc(req->rq_reqmsg);
1973 }
1974
1975 #define MAX_OPCODE_LEN  32
1976 static int
1977 nrs_tbf_opcode_set_bit(const struct cfs_lstr *id, struct cfs_bitmap *opcodes)
1978 {
1979         int     op = 0;
1980         char    opcode_str[MAX_OPCODE_LEN];
1981
1982         if (id->ls_len + 1 > MAX_OPCODE_LEN)
1983                 return -EINVAL;
1984
1985         memcpy(opcode_str, id->ls_str, id->ls_len);
1986         opcode_str[id->ls_len] = '\0';
1987
1988         op = ll_str2opcode(opcode_str);
1989         if (op < 0)
1990                 return -EINVAL;
1991
1992         cfs_bitmap_set(opcodes, op);
1993         return 0;
1994 }
1995
1996 static int
1997 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr)
1998 {
1999         struct cfs_bitmap *opcodes;
2000         struct cfs_lstr src;
2001         struct cfs_lstr res;
2002         int rc = 0;
2003         ENTRY;
2004
2005         opcodes = CFS_ALLOCATE_BITMAP(LUSTRE_MAX_OPCODES);
2006         if (opcodes == NULL)
2007                 return -ENOMEM;
2008
2009         src.ls_str = str;
2010         src.ls_len = len;
2011         while (src.ls_str) {
2012                 rc = cfs_gettok(&src, ' ', &res);
2013                 if (rc == 0) {
2014                         rc = -EINVAL;
2015                         break;
2016                 }
2017                 rc = nrs_tbf_opcode_set_bit(&res, opcodes);
2018                 if (rc)
2019                         break;
2020         }
2021
2022         if (rc == 0)
2023                 *bitmaptr = opcodes;
2024         else
2025                 CFS_FREE_BITMAP(opcodes);
2026
2027         RETURN(rc);
2028 }
2029
2030 static void nrs_tbf_opcode_cmd_fini(struct nrs_tbf_cmd *cmd)
2031 {
2032         if (cmd->u.tc_start.ts_opcodes)
2033                 CFS_FREE_BITMAP(cmd->u.tc_start.ts_opcodes);
2034
2035         if (cmd->u.tc_start.ts_opcodes_str)
2036                 OBD_FREE(cmd->u.tc_start.ts_opcodes_str,
2037                          strlen(cmd->u.tc_start.ts_opcodes_str) + 1);
2038
2039 }
2040
2041 static int nrs_tbf_opcode_parse(struct nrs_tbf_cmd *cmd, char *id)
2042 {
2043         struct cfs_lstr src;
2044         int rc;
2045
2046         src.ls_str = id;
2047         src.ls_len = strlen(id);
2048         rc = nrs_tbf_check_id_value(&src, "opcode");
2049         if (rc)
2050                 return rc;
2051
2052         OBD_ALLOC(cmd->u.tc_start.ts_opcodes_str, src.ls_len + 1);
2053         if (cmd->u.tc_start.ts_opcodes_str == NULL)
2054                 return -ENOMEM;
2055
2056         memcpy(cmd->u.tc_start.ts_opcodes_str, src.ls_str, src.ls_len);
2057
2058         /* parse opcode list */
2059         rc = nrs_tbf_opcode_list_parse(cmd->u.tc_start.ts_opcodes_str,
2060                                        strlen(cmd->u.tc_start.ts_opcodes_str),
2061                                        &cmd->u.tc_start.ts_opcodes);
2062         if (rc)
2063                 nrs_tbf_opcode_cmd_fini(cmd);
2064
2065         return rc;
2066 }
2067
2068 static int
2069 nrs_tbf_opcode_rule_match(struct nrs_tbf_rule *rule,
2070                           struct nrs_tbf_client *cli)
2071 {
2072         if (rule->tr_opcodes == NULL)
2073                 return 0;
2074
2075         return cfs_bitmap_check(rule->tr_opcodes, cli->tc_opcode);
2076 }
2077
2078 static int nrs_tbf_opcode_rule_init(struct ptlrpc_nrs_policy *policy,
2079                                     struct nrs_tbf_rule *rule,
2080                                     struct nrs_tbf_cmd *start)
2081 {
2082         int rc = 0;
2083
2084         LASSERT(start->u.tc_start.ts_opcodes_str != NULL);
2085         OBD_ALLOC(rule->tr_opcodes_str,
2086                   strlen(start->u.tc_start.ts_opcodes_str) + 1);
2087         if (rule->tr_opcodes_str == NULL)
2088                 return -ENOMEM;
2089
2090         strncpy(rule->tr_opcodes_str, start->u.tc_start.ts_opcodes_str,
2091                 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2092
2093         /* Default rule '*' */
2094         if (start->u.tc_start.ts_opcodes == NULL)
2095                 return 0;
2096
2097         rc = nrs_tbf_opcode_list_parse(rule->tr_opcodes_str,
2098                                        strlen(rule->tr_opcodes_str),
2099                                        &rule->tr_opcodes);
2100         if (rc)
2101                 OBD_FREE(rule->tr_opcodes_str,
2102                          strlen(start->u.tc_start.ts_opcodes_str) + 1);
2103
2104         return rc;
2105 }
2106
2107 static int
2108 nrs_tbf_opcode_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2109 {
2110         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2111                    rule->tr_opcodes_str, rule->tr_rpc_rate,
2112                    atomic_read(&rule->tr_ref) - 1);
2113         return 0;
2114 }
2115
2116
2117 struct nrs_tbf_ops nrs_tbf_opcode_ops = {
2118         .o_name = NRS_TBF_TYPE_OPCODE,
2119         .o_startup = nrs_tbf_opcode_startup,
2120         .o_cli_find = nrs_tbf_opcode_cli_find,
2121         .o_cli_findadd = nrs_tbf_opcode_cli_findadd,
2122         .o_cli_put = nrs_tbf_nid_cli_put,
2123         .o_cli_init = nrs_tbf_opcode_cli_init,
2124         .o_rule_init = nrs_tbf_opcode_rule_init,
2125         .o_rule_dump = nrs_tbf_opcode_rule_dump,
2126         .o_rule_match = nrs_tbf_opcode_rule_match,
2127         .o_rule_fini = nrs_tbf_opcode_rule_fini,
2128 };
2129
2130 static struct nrs_tbf_type nrs_tbf_types[] = {
2131         {
2132                 .ntt_name = NRS_TBF_TYPE_JOBID,
2133                 .ntt_flag = NRS_TBF_FLAG_JOBID,
2134                 .ntt_ops = &nrs_tbf_jobid_ops,
2135         },
2136         {
2137                 .ntt_name = NRS_TBF_TYPE_NID,
2138                 .ntt_flag = NRS_TBF_FLAG_NID,
2139                 .ntt_ops = &nrs_tbf_nid_ops,
2140         },
2141         {
2142                 .ntt_name = NRS_TBF_TYPE_OPCODE,
2143                 .ntt_flag = NRS_TBF_FLAG_OPCODE,
2144                 .ntt_ops = &nrs_tbf_opcode_ops,
2145         },
2146         {
2147                 .ntt_name = NRS_TBF_TYPE_GENERIC,
2148                 .ntt_flag = NRS_TBF_FLAG_GENERIC,
2149                 .ntt_ops = &nrs_tbf_generic_ops,
2150         },
2151 };
2152
2153 /**
2154  * Is called before the policy transitions into
2155  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
2156  * policy-specific private data structure.
2157  *
2158  * \param[in] policy The policy to start
2159  *
2160  * \retval -ENOMEM OOM error
2161  * \retval  0      success
2162  *
2163  * \see nrs_policy_register()
2164  * \see nrs_policy_ctl()
2165  */
2166 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
2167 {
2168         struct nrs_tbf_head     *head;
2169         struct nrs_tbf_ops      *ops;
2170         __u32                    type;
2171         char                    *name;
2172         int found = 0;
2173         int i;
2174         int rc = 0;
2175
2176         if (arg == NULL)
2177                 name = NRS_TBF_TYPE_GENERIC;
2178         else if (strlen(arg) < NRS_TBF_TYPE_MAX_LEN)
2179                 name = arg;
2180         else
2181                 GOTO(out, rc = -EINVAL);
2182
2183         for (i = 0; i < ARRAY_SIZE(nrs_tbf_types); i++) {
2184                 if (strcmp(name, nrs_tbf_types[i].ntt_name) == 0) {
2185                         ops = nrs_tbf_types[i].ntt_ops;
2186                         type = nrs_tbf_types[i].ntt_flag;
2187                         found = 1;
2188                         break;
2189                 }
2190         }
2191         if (found == 0)
2192                 GOTO(out, rc = -ENOTSUPP);
2193
2194         OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
2195         if (head == NULL)
2196                 GOTO(out, rc = -ENOMEM);
2197
2198         memcpy(head->th_type, name, strlen(name));
2199         head->th_type[strlen(name)] = '\0';
2200         head->th_ops = ops;
2201         head->th_type_flag = type;
2202
2203         head->th_binheap = cfs_binheap_create(&nrs_tbf_heap_ops,
2204                                               CBH_FLAG_ATOMIC_GROW, 4096, NULL,
2205                                               nrs_pol2cptab(policy),
2206                                               nrs_pol2cptid(policy));
2207         if (head->th_binheap == NULL)
2208                 GOTO(out_free_head, rc = -ENOMEM);
2209
2210         atomic_set(&head->th_rule_sequence, 0);
2211         spin_lock_init(&head->th_rule_lock);
2212         INIT_LIST_HEAD(&head->th_list);
2213         hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
2214         head->th_timer.function = nrs_tbf_timer_cb;
2215         rc = head->th_ops->o_startup(policy, head);
2216         if (rc)
2217                 GOTO(out_free_heap, rc);
2218
2219         policy->pol_private = head;
2220         return 0;
2221 out_free_heap:
2222         cfs_binheap_destroy(head->th_binheap);
2223 out_free_head:
2224         OBD_FREE_PTR(head);
2225 out:
2226         return rc;
2227 }
2228
2229 /**
2230  * Is called before the policy transitions into
2231  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
2232  * private data structure.
2233  *
2234  * \param[in] policy The policy to stop
2235  *
2236  * \see nrs_policy_stop0()
2237  */
2238 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
2239 {
2240         struct nrs_tbf_head *head = policy->pol_private;
2241         struct ptlrpc_nrs *nrs = policy->pol_nrs;
2242         struct nrs_tbf_rule *rule, *n;
2243
2244         LASSERT(head != NULL);
2245         LASSERT(head->th_cli_hash != NULL);
2246         hrtimer_cancel(&head->th_timer);
2247         /* Should cleanup hash first before free rules */
2248         cfs_hash_putref(head->th_cli_hash);
2249         list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
2250                 list_del_init(&rule->tr_linkage);
2251                 nrs_tbf_rule_put(rule);
2252         }
2253         LASSERT(list_empty(&head->th_list));
2254         LASSERT(head->th_binheap != NULL);
2255         LASSERT(cfs_binheap_is_empty(head->th_binheap));
2256         cfs_binheap_destroy(head->th_binheap);
2257         OBD_FREE_PTR(head);
2258         nrs->nrs_throttling = 0;
2259         wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
2260 }
2261
2262 /**
2263  * Performs a policy-specific ctl function on TBF policy instances; similar
2264  * to ioctl.
2265  *
2266  * \param[in]     policy the policy instance
2267  * \param[in]     opc    the opcode
2268  * \param[in,out] arg    used for passing parameters and information
2269  *
2270  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2271  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2272  *
2273  * \retval 0   operation carried out successfully
2274  * \retval -ve error
2275  */
2276 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
2277                        enum ptlrpc_nrs_ctl opc,
2278                        void *arg)
2279 {
2280         int rc = 0;
2281         ENTRY;
2282
2283         assert_spin_locked(&policy->pol_nrs->nrs_lock);
2284
2285         switch ((enum nrs_ctl_tbf)opc) {
2286         default:
2287                 RETURN(-EINVAL);
2288
2289         /**
2290          * Read RPC rate size of a policy instance.
2291          */
2292         case NRS_CTL_TBF_RD_RULE: {
2293                 struct nrs_tbf_head *head = policy->pol_private;
2294                 struct seq_file *m = (struct seq_file *) arg;
2295                 struct ptlrpc_service_part *svcpt;
2296
2297                 svcpt = policy->pol_nrs->nrs_svcpt;
2298                 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
2299
2300                 rc = nrs_tbf_rule_dump_all(head, m);
2301                 }
2302                 break;
2303
2304         /**
2305          * Write RPC rate of a policy instance.
2306          */
2307         case NRS_CTL_TBF_WR_RULE: {
2308                 struct nrs_tbf_head *head = policy->pol_private;
2309                 struct nrs_tbf_cmd *cmd;
2310
2311                 cmd = (struct nrs_tbf_cmd *)arg;
2312                 rc = nrs_tbf_command(policy,
2313                                      head,
2314                                      cmd);
2315                 }
2316                 break;
2317         /**
2318          * Read the TBF policy type of a policy instance.
2319          */
2320         case NRS_CTL_TBF_RD_TYPE_FLAG: {
2321                 struct nrs_tbf_head *head = policy->pol_private;
2322
2323                 *(__u32 *)arg = head->th_type_flag;
2324                 }
2325                 break;
2326         }
2327
2328         RETURN(rc);
2329 }
2330
2331 /**
2332  * Is called for obtaining a TBF policy resource.
2333  *
2334  * \param[in]  policy     The policy on which the request is being asked for
2335  * \param[in]  nrq        The request for which resources are being taken
2336  * \param[in]  parent     Parent resource, unused in this policy
2337  * \param[out] resp       Resources references are placed in this array
2338  * \param[in]  moving_req Signifies limited caller context; unused in this
2339  *                        policy
2340  *
2341  *
2342  * \see nrs_resource_get_safe()
2343  */
2344 static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
2345                            struct ptlrpc_nrs_request *nrq,
2346                            const struct ptlrpc_nrs_resource *parent,
2347                            struct ptlrpc_nrs_resource **resp,
2348                            bool moving_req)
2349 {
2350         struct nrs_tbf_head   *head;
2351         struct nrs_tbf_client *cli;
2352         struct nrs_tbf_client *tmp;
2353         struct ptlrpc_request *req;
2354
2355         if (parent == NULL) {
2356                 *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
2357                 return 0;
2358         }
2359
2360         head = container_of(parent, struct nrs_tbf_head, th_res);
2361         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
2362         cli = head->th_ops->o_cli_find(head, req);
2363         if (cli != NULL) {
2364                 spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2365                 LASSERT(cli->tc_rule);
2366                 if (cli->tc_rule_sequence !=
2367                     atomic_read(&head->th_rule_sequence) ||
2368                     cli->tc_rule->tr_flags & NTRS_STOPPING) {
2369                         struct nrs_tbf_rule *rule;
2370
2371                         CDEBUG(D_RPCTRACE,
2372                                "TBF class@%p rate %llu sequence %d, "
2373                                "rule flags %d, head sequence %d\n",
2374                                cli, cli->tc_rpc_rate,
2375                                cli->tc_rule_sequence,
2376                                cli->tc_rule->tr_flags,
2377                                atomic_read(&head->th_rule_sequence));
2378                         rule = nrs_tbf_rule_match(head, cli);
2379                         if (rule != cli->tc_rule) {
2380                                 nrs_tbf_cli_reset(head, rule, cli);
2381                         } else {
2382                                 if (cli->tc_rule_generation != rule->tr_generation)
2383                                         nrs_tbf_cli_reset_value(head, cli);
2384                                 nrs_tbf_rule_put(rule);
2385                         }
2386                 } else if (cli->tc_rule_generation !=
2387                            cli->tc_rule->tr_generation) {
2388                         nrs_tbf_cli_reset_value(head, cli);
2389                 }
2390                 spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2391                 goto out;
2392         }
2393
2394         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
2395                           sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
2396         if (cli == NULL)
2397                 return -ENOMEM;
2398
2399         nrs_tbf_cli_init(head, cli, req);
2400         tmp = head->th_ops->o_cli_findadd(head, cli);
2401         if (tmp != cli) {
2402                 atomic_dec(&cli->tc_ref);
2403                 nrs_tbf_cli_fini(cli);
2404                 cli = tmp;
2405         }
2406 out:
2407         *resp = &cli->tc_res;
2408
2409         return 1;
2410 }
2411
2412 /**
2413  * Called when releasing references to the resource hierachy obtained for a
2414  * request for scheduling using the TBF policy.
2415  *
2416  * \param[in] policy   the policy the resource belongs to
2417  * \param[in] res      the resource to be released
2418  */
2419 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
2420                             const struct ptlrpc_nrs_resource *res)
2421 {
2422         struct nrs_tbf_head   *head;
2423         struct nrs_tbf_client *cli;
2424
2425         /**
2426          * Do nothing for freeing parent, nrs_tbf_net resources
2427          */
2428         if (res->res_parent == NULL)
2429                 return;
2430
2431         cli = container_of(res, struct nrs_tbf_client, tc_res);
2432         head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
2433
2434         head->th_ops->o_cli_put(head, cli);
2435 }
2436
2437 /**
2438  * Called when getting a request from the TBF policy for handling, or just
2439  * peeking; removes the request from the policy when it is to be handled.
2440  *
2441  * \param[in] policy The policy
2442  * \param[in] peek   When set, signifies that we just want to examine the
2443  *                   request, and not handle it, so the request is not removed
2444  *                   from the policy.
2445  * \param[in] force  Force the policy to return a request; unused in this
2446  *                   policy
2447  *
2448  * \retval The request to be handled; this is the next request in the TBF
2449  *         rule
2450  *
2451  * \see ptlrpc_nrs_req_get_nolock()
2452  * \see nrs_request_get()
2453  */
2454 static
2455 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
2456                                            bool peek, bool force)
2457 {
2458         struct nrs_tbf_head       *head = policy->pol_private;
2459         struct ptlrpc_nrs_request *nrq = NULL;
2460         struct nrs_tbf_client     *cli;
2461         struct cfs_binheap_node   *node;
2462
2463         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2464
2465         if (!peek && policy->pol_nrs->nrs_throttling)
2466                 return NULL;
2467
2468         node = cfs_binheap_root(head->th_binheap);
2469         if (unlikely(node == NULL))
2470                 return NULL;
2471
2472         cli = container_of(node, struct nrs_tbf_client, tc_node);
2473         LASSERT(cli->tc_in_heap);
2474         if (peek) {
2475                 nrq = list_entry(cli->tc_list.next,
2476                                      struct ptlrpc_nrs_request,
2477                                      nr_u.tbf.tr_list);
2478         } else {
2479                 struct nrs_tbf_rule *rule = cli->tc_rule;
2480                 __u64 now = ktime_to_ns(ktime_get());
2481                 __u64 passed;
2482                 __u64 ntoken;
2483                 __u64 deadline;
2484                 __u64 old_resid = 0;
2485
2486                 deadline = cli->tc_check_time +
2487                           cli->tc_nsecs;
2488                 LASSERT(now >= cli->tc_check_time);
2489                 passed = now - cli->tc_check_time;
2490                 ntoken = passed * cli->tc_rpc_rate;
2491                 do_div(ntoken, NSEC_PER_SEC);
2492
2493                 ntoken += cli->tc_ntoken;
2494                 if (rule->tr_flags & NTRS_REALTIME) {
2495                         LASSERT(cli->tc_nsecs_resid < cli->tc_nsecs);
2496                         old_resid = cli->tc_nsecs_resid;
2497                         cli->tc_nsecs_resid += passed % cli->tc_nsecs;
2498                         if (cli->tc_nsecs_resid > cli->tc_nsecs) {
2499                                 ntoken++;
2500                                 cli->tc_nsecs_resid -= cli->tc_nsecs;
2501                         }
2502                 } else if (ntoken > cli->tc_depth)
2503                         ntoken = cli->tc_depth;
2504
2505                 if (ntoken > 0) {
2506                         struct ptlrpc_request *req;
2507                         nrq = list_entry(cli->tc_list.next,
2508                                              struct ptlrpc_nrs_request,
2509                                              nr_u.tbf.tr_list);
2510                         req = container_of(nrq,
2511                                            struct ptlrpc_request,
2512                                            rq_nrq);
2513                         ntoken--;
2514                         cli->tc_ntoken = ntoken;
2515                         cli->tc_check_time = now;
2516                         list_del_init(&nrq->nr_u.tbf.tr_list);
2517                         if (list_empty(&cli->tc_list)) {
2518                                 cfs_binheap_remove(head->th_binheap,
2519                                                    &cli->tc_node);
2520                                 cli->tc_in_heap = false;
2521                         } else {
2522                                 if (!(rule->tr_flags & NTRS_REALTIME))
2523                                         cli->tc_deadline = now + cli->tc_nsecs;
2524                                 cfs_binheap_relocate(head->th_binheap,
2525                                                      &cli->tc_node);
2526                         }
2527                         CDEBUG(D_RPCTRACE,
2528                                "TBF dequeues: class@%p rate %llu gen %llu "
2529                                "token %llu, rule@%p rate %llu gen %llu\n",
2530                                cli, cli->tc_rpc_rate,
2531                                cli->tc_rule_generation, cli->tc_ntoken,
2532                                cli->tc_rule, cli->tc_rule->tr_rpc_rate,
2533                                cli->tc_rule->tr_generation);
2534                 } else {
2535                         ktime_t time;
2536
2537                         if (rule->tr_flags & NTRS_REALTIME) {
2538                                 cli->tc_deadline = deadline;
2539                                 cli->tc_nsecs_resid = old_resid;
2540                                 cfs_binheap_relocate(head->th_binheap,
2541                                                      &cli->tc_node);
2542                                 if (node != cfs_binheap_root(head->th_binheap))
2543                                         return nrs_tbf_req_get(policy,
2544                                                                peek, force);
2545                         }
2546                         policy->pol_nrs->nrs_throttling = 1;
2547                         head->th_deadline = deadline;
2548                         time = ktime_set(0, 0);
2549                         time = ktime_add_ns(time, deadline);
2550                         hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
2551                 }
2552         }
2553
2554         return nrq;
2555 }
2556
2557 /**
2558  * Adds request \a nrq to \a policy's list of queued requests
2559  *
2560  * \param[in] policy The policy
2561  * \param[in] nrq    The request to add
2562  *
2563  * \retval 0 success; nrs_request_enqueue() assumes this function will always
2564  *                    succeed
2565  */
2566 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
2567                            struct ptlrpc_nrs_request *nrq)
2568 {
2569         struct nrs_tbf_head   *head;
2570         struct nrs_tbf_client *cli;
2571         int                    rc = 0;
2572
2573         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2574
2575         cli = container_of(nrs_request_resource(nrq),
2576                            struct nrs_tbf_client, tc_res);
2577         head = container_of(nrs_request_resource(nrq)->res_parent,
2578                             struct nrs_tbf_head, th_res);
2579         if (list_empty(&cli->tc_list)) {
2580                 LASSERT(!cli->tc_in_heap);
2581                 cli->tc_deadline = cli->tc_check_time + cli->tc_nsecs;
2582                 rc = cfs_binheap_insert(head->th_binheap, &cli->tc_node);
2583                 if (rc == 0) {
2584                         cli->tc_in_heap = true;
2585                         nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
2586                         list_add_tail(&nrq->nr_u.tbf.tr_list,
2587                                           &cli->tc_list);
2588                         if (policy->pol_nrs->nrs_throttling) {
2589                                 __u64 deadline = cli->tc_deadline;
2590                                 if ((head->th_deadline > deadline) &&
2591                                     (hrtimer_try_to_cancel(&head->th_timer)
2592                                      >= 0)) {
2593                                         ktime_t time;
2594                                         head->th_deadline = deadline;
2595                                         time = ktime_set(0, 0);
2596                                         time = ktime_add_ns(time, deadline);
2597                                         hrtimer_start(&head->th_timer, time,
2598                                                       HRTIMER_MODE_ABS);
2599                                 }
2600                         }
2601                 }
2602         } else {
2603                 LASSERT(cli->tc_in_heap);
2604                 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
2605                 list_add_tail(&nrq->nr_u.tbf.tr_list,
2606                                   &cli->tc_list);
2607         }
2608
2609         if (rc == 0)
2610                 CDEBUG(D_RPCTRACE,
2611                        "TBF enqueues: class@%p rate %llu gen %llu "
2612                        "token %llu, rule@%p rate %llu gen %llu\n",
2613                        cli, cli->tc_rpc_rate,
2614                        cli->tc_rule_generation, cli->tc_ntoken,
2615                        cli->tc_rule, cli->tc_rule->tr_rpc_rate,
2616                        cli->tc_rule->tr_generation);
2617
2618         return rc;
2619 }
2620
2621 /**
2622  * Removes request \a nrq from \a policy's list of queued requests.
2623  *
2624  * \param[in] policy The policy
2625  * \param[in] nrq    The request to remove
2626  */
2627 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
2628                              struct ptlrpc_nrs_request *nrq)
2629 {
2630         struct nrs_tbf_head   *head;
2631         struct nrs_tbf_client *cli;
2632
2633         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2634
2635         cli = container_of(nrs_request_resource(nrq),
2636                            struct nrs_tbf_client, tc_res);
2637         head = container_of(nrs_request_resource(nrq)->res_parent,
2638                             struct nrs_tbf_head, th_res);
2639
2640         LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
2641         list_del_init(&nrq->nr_u.tbf.tr_list);
2642         if (list_empty(&cli->tc_list)) {
2643                 cfs_binheap_remove(head->th_binheap,
2644                                    &cli->tc_node);
2645                 cli->tc_in_heap = false;
2646         } else {
2647                 cfs_binheap_relocate(head->th_binheap,
2648                                      &cli->tc_node);
2649         }
2650 }
2651
2652 /**
2653  * Prints a debug statement right before the request \a nrq stops being
2654  * handled.
2655  *
2656  * \param[in] policy The policy handling the request
2657  * \param[in] nrq    The request being handled
2658  *
2659  * \see ptlrpc_server_finish_request()
2660  * \see ptlrpc_nrs_req_stop_nolock()
2661  */
2662 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
2663                               struct ptlrpc_nrs_request *nrq)
2664 {
2665         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
2666                                                   rq_nrq);
2667
2668         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2669
2670         CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: %llu\n",
2671                policy->pol_desc->pd_name, libcfs_id2str(req->rq_peer),
2672                nrq->nr_u.tbf.tr_sequence);
2673 }
2674
2675 #ifdef CONFIG_PROC_FS
2676
2677 /**
2678  * lprocfs interface
2679  */
2680
2681 /**
2682  * The maximum RPC rate.
2683  */
2684 #define LPROCFS_NRS_RATE_MAX            65535
2685
2686 static int
2687 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
2688 {
2689         struct ptlrpc_service       *svc = m->private;
2690         int                          rc;
2691
2692         seq_printf(m, "regular_requests:\n");
2693         /**
2694          * Perform two separate calls to this as only one of the NRS heads'
2695          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
2696          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
2697          */
2698         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
2699                                        NRS_POL_NAME_TBF,
2700                                        NRS_CTL_TBF_RD_RULE,
2701                                        false, m);
2702         if (rc == 0) {
2703                 /**
2704                  * -ENOSPC means buf in the parameter m is overflow, return 0
2705                  * here to let upper layer function seq_read alloc a larger
2706                  * memory area and do this process again.
2707                  */
2708         } else if (rc == -ENOSPC) {
2709                 return 0;
2710
2711                 /**
2712                  * Ignore -ENODEV as the regular NRS head's policy may be in the
2713                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
2714                  */
2715         } else if (rc != -ENODEV) {
2716                 return rc;
2717         }
2718
2719         if (!nrs_svc_has_hp(svc))
2720                 goto no_hp;
2721
2722         seq_printf(m, "high_priority_requests:\n");
2723         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
2724                                        NRS_POL_NAME_TBF,
2725                                        NRS_CTL_TBF_RD_RULE,
2726                                        false, m);
2727         if (rc == 0) {
2728                 /**
2729                  * -ENOSPC means buf in the parameter m is overflow, return 0
2730                  * here to let upper layer function seq_read alloc a larger
2731                  * memory area and do this process again.
2732                  */
2733         } else if (rc == -ENOSPC) {
2734                 return 0;
2735         }
2736
2737 no_hp:
2738
2739         return rc;
2740 }
2741
2742 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char *token)
2743 {
2744         int rc;
2745
2746         switch (cmd->u.tc_start.ts_valid_type) {
2747         case NRS_TBF_FLAG_JOBID:
2748                 rc = nrs_tbf_jobid_parse(cmd, token);
2749                 break;
2750         case NRS_TBF_FLAG_NID:
2751                 rc = nrs_tbf_nid_parse(cmd, token);
2752                 break;
2753         case NRS_TBF_FLAG_OPCODE:
2754                 rc = nrs_tbf_opcode_parse(cmd, token);
2755                 break;
2756         case NRS_TBF_FLAG_GENERIC:
2757                 rc = nrs_tbf_generic_parse(cmd, token);
2758                 break;
2759         default:
2760                 RETURN(-EINVAL);
2761         }
2762
2763         return rc;
2764 }
2765
2766 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
2767 {
2768         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
2769                 if (cmd->u.tc_start.ts_valid_type == NRS_TBF_FLAG_JOBID)
2770                         nrs_tbf_jobid_cmd_fini(cmd);
2771                 else if (cmd->u.tc_start.ts_valid_type == NRS_TBF_FLAG_NID)
2772                         nrs_tbf_nid_cmd_fini(cmd);
2773                 else if (cmd->u.tc_start.ts_valid_type == NRS_TBF_FLAG_OPCODE)
2774                         nrs_tbf_opcode_cmd_fini(cmd);
2775                 else if (cmd->u.tc_start.ts_valid_type == NRS_TBF_FLAG_GENERIC)
2776                         nrs_tbf_generic_cmd_fini(cmd);
2777         }
2778 }
2779
/**
 * Checks that \a name consists only of alphanumeric characters and '_'.
 *
 * The string is walked once up to its terminating NUL. An empty string
 * contains no offending character and is therefore reported as valid.
 *
 * \param[in] name NUL-terminated string to check
 *
 * \retval true  every character is alphanumeric or '_'
 * \retval false at least one other character was found
 */
static bool name_is_valid(const char *name)
{
	const char *p;

	for (p = name; *p != '\0'; p++) {
		/* Cast keeps bytes >= 0x80 in the range ctype accepts. */
		if (!isalnum((unsigned char)*p) && *p != '_')
			return false;
	}
	return true;
}
2791
2792 static int
2793 nrs_tbf_parse_value_pair(struct nrs_tbf_cmd *cmd, char *buffer)
2794 {
2795         char    *key;
2796         char    *val;
2797         int      rc;
2798         __u64    rate;
2799
2800         val = buffer;
2801         key = strsep(&val, "=");
2802         if (val == NULL || strlen(val) == 0)
2803                 return -EINVAL;
2804
2805         /* Key of the value pair */
2806         if (strcmp(key, "rate") == 0) {
2807                 rc = kstrtoull(val, 10, &rate);
2808                 if (rc)
2809                         return rc;
2810
2811                 if (rate <= 0 || rate >= LPROCFS_NRS_RATE_MAX)
2812                         return -EINVAL;
2813
2814                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
2815                         cmd->u.tc_start.ts_rpc_rate = rate;
2816                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
2817                         cmd->u.tc_change.tc_rpc_rate = rate;
2818                 else
2819                         return -EINVAL;
2820         }  else if (strcmp(key, "rank") == 0) {
2821                 if (!name_is_valid(val))
2822                         return -EINVAL;
2823
2824                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
2825                         cmd->u.tc_start.ts_next_name = val;
2826                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
2827                         cmd->u.tc_change.tc_next_name = val;
2828                 else
2829                         return -EINVAL;
2830         } else if (strcmp(key, "realtime") == 0) {
2831                 unsigned long realtime;
2832
2833                 rc = kstrtoul(val, 10, &realtime);
2834                 if (rc)
2835                         return rc;
2836
2837                 if (realtime > 0)
2838                         cmd->u.tc_start.ts_rule_flags |= NTRS_REALTIME;
2839         } else {
2840                 return -EINVAL;
2841         }
2842         return 0;
2843 }
2844
2845 static int
2846 nrs_tbf_parse_value_pairs(struct nrs_tbf_cmd *cmd, char *buffer)
2847 {
2848         char    *val;
2849         char    *token;
2850         int      rc;
2851
2852         val = buffer;
2853         while (val != NULL && strlen(val) != 0) {
2854                 token = strsep(&val, " ");
2855                 rc = nrs_tbf_parse_value_pair(cmd, token);
2856                 if (rc)
2857                         return rc;
2858         }
2859
2860         switch (cmd->tc_cmd) {
2861         case NRS_CTL_TBF_START_RULE:
2862                 if (cmd->u.tc_start.ts_rpc_rate == 0)
2863                         cmd->u.tc_start.ts_rpc_rate = tbf_rate;
2864                 break;
2865         case NRS_CTL_TBF_CHANGE_RULE:
2866                 if (cmd->u.tc_change.tc_rpc_rate == 0 &&
2867                     cmd->u.tc_change.tc_next_name == NULL)
2868                         return -EINVAL;
2869                 break;
2870         case NRS_CTL_TBF_STOP_RULE:
2871                 break;
2872         default:
2873                 return -EINVAL;
2874         }
2875         return 0;
2876 }
2877
2878 static struct nrs_tbf_cmd *
2879 nrs_tbf_parse_cmd(char *buffer, unsigned long count, __u32 type_flag)
2880 {
2881         static struct nrs_tbf_cmd       *cmd;
2882         char                            *token;
2883         char                            *val;
2884         int                              rc = 0;
2885
2886         OBD_ALLOC_PTR(cmd);
2887         if (cmd == NULL)
2888                 GOTO(out, rc = -ENOMEM);
2889         memset(cmd, 0, sizeof(*cmd));
2890
2891         val = buffer;
2892         token = strsep(&val, " ");
2893         if (val == NULL || strlen(val) == 0)
2894                 GOTO(out_free_cmd, rc = -EINVAL);
2895
2896         /* Type of the command */
2897         if (strcmp(token, "start") == 0) {
2898                 cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
2899                 cmd->u.tc_start.ts_valid_type = type_flag;
2900         } else if (strcmp(token, "stop") == 0)
2901                 cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
2902         else if (strcmp(token, "change") == 0)
2903                 cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RULE;
2904         else
2905                 GOTO(out_free_cmd, rc = -EINVAL);
2906
2907         /* Name of the rule */
2908         token = strsep(&val, " ");
2909         if ((val == NULL && cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE) ||
2910             !name_is_valid(token))
2911                 GOTO(out_free_cmd, rc = -EINVAL);
2912         cmd->tc_name = token;
2913
2914         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
2915                 /* List of ID */
2916                 LASSERT(val);
2917                 token = val;
2918                 val = strrchr(token, '}');
2919                 if (!val)
2920                         GOTO(out_free_cmd, rc = -EINVAL);
2921
2922                 /* Skip '}' */
2923                 val++;
2924                 if (*val == '\0') {
2925                         val = NULL;
2926                 } else if (*val == ' ') {
2927                         *val = '\0';
2928                         val++;
2929                 } else
2930                         GOTO(out_free_cmd, rc = -EINVAL);
2931
2932                 rc = nrs_tbf_id_parse(cmd, token);
2933                 if (rc)
2934                         GOTO(out_free_cmd, rc);
2935         }
2936
2937         rc = nrs_tbf_parse_value_pairs(cmd, val);
2938         if (rc)
2939                 GOTO(out_cmd_fini, rc = -EINVAL);
2940         goto out;
2941 out_cmd_fini:
2942         nrs_tbf_cmd_fini(cmd);
2943 out_free_cmd:
2944         OBD_FREE_PTR(cmd);
2945 out:
2946         if (rc)
2947                 cmd = ERR_PTR(rc);
2948         return cmd;
2949 }
2950
2951 /**
2952  * Get the TBF policy type (nid, jobid, etc) preset by
2953  * proc entry 'nrs_policies' for command buffer parsing.
2954  *
2955  * \param[in] svc the PTLRPC service
2956  * \param[in] queue the NRS queue type
2957  *
2958  * \retval the preset TBF policy type flag
2959  */
2960 static __u32
2961 nrs_tbf_type_flag(struct ptlrpc_service *svc, enum ptlrpc_nrs_queue_type queue)
2962 {
2963         __u32   type;
2964         int     rc;
2965
2966         rc = ptlrpc_nrs_policy_control(svc, queue,
2967                                        NRS_POL_NAME_TBF,
2968                                        NRS_CTL_TBF_RD_TYPE_FLAG,
2969                                        true, &type);
2970         if (rc != 0)
2971                 type = NRS_TBF_FLAG_INVALID;
2972
2973         return type;
2974 }
2975
2976 extern struct nrs_core nrs_core;
2977 #define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
2978 static ssize_t
2979 ptlrpc_lprocfs_nrs_tbf_rule_seq_write(struct file *file,
2980                                       const char __user *buffer,
2981                                       size_t count, loff_t *off)
2982 {
2983         struct seq_file           *m = file->private_data;
2984         struct ptlrpc_service     *svc = m->private;
2985         char                      *kernbuf;
2986         char                      *val;
2987         int                        rc;
2988         static struct nrs_tbf_cmd *cmd;
2989         enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
2990         unsigned long              length;
2991         char                      *token;
2992
2993         OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
2994         if (kernbuf == NULL)
2995                 GOTO(out, rc = -ENOMEM);
2996
2997         if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
2998                 GOTO(out_free_kernbuff, rc = -EINVAL);
2999
3000         if (copy_from_user(kernbuf, buffer, count))
3001                 GOTO(out_free_kernbuff, rc = -EFAULT);
3002
3003         val = kernbuf;
3004         token = strsep(&val, " ");
3005         if (val == NULL)
3006                 GOTO(out_free_kernbuff, rc = -EINVAL);
3007
3008         if (strcmp(token, "reg") == 0) {
3009                 queue = PTLRPC_NRS_QUEUE_REG;
3010         } else if (strcmp(token, "hp") == 0) {
3011                 queue = PTLRPC_NRS_QUEUE_HP;
3012         } else {
3013                 kernbuf[strlen(token)] = ' ';
3014                 val = kernbuf;
3015         }
3016         length = strlen(val);
3017
3018         if (length == 0)
3019                 GOTO(out_free_kernbuff, rc = -EINVAL);
3020
3021         if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
3022                 GOTO(out_free_kernbuff, rc = -ENODEV);
3023         else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
3024                 queue = PTLRPC_NRS_QUEUE_REG;
3025
3026         cmd = nrs_tbf_parse_cmd(val, length, nrs_tbf_type_flag(svc, queue));
3027         if (IS_ERR(cmd))
3028                 GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
3029
3030         /**
3031          * Serialize NRS core lprocfs operations with policy registration/
3032          * unregistration.
3033          */
3034         mutex_lock(&nrs_core.nrs_mutex);
3035         rc = ptlrpc_nrs_policy_control(svc, queue,
3036                                        NRS_POL_NAME_TBF,
3037                                        NRS_CTL_TBF_WR_RULE,
3038                                        false, cmd);
3039         mutex_unlock(&nrs_core.nrs_mutex);
3040
3041         nrs_tbf_cmd_fini(cmd);
3042         OBD_FREE_PTR(cmd);
3043 out_free_kernbuff:
3044         OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3045 out:
3046         return rc ? rc : count;
3047 }
3048 LPROC_SEQ_FOPS(ptlrpc_lprocfs_nrs_tbf_rule);
3049
3050 /**
3051  * Initializes a TBF policy's lprocfs interface for service \a svc
3052  *
3053  * \param[in] svc the service
3054  *
3055  * \retval 0    success
3056  * \retval != 0 error
3057  */
3058 static int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
3059 {
3060         struct lprocfs_vars nrs_tbf_lprocfs_vars[] = {
3061                 { .name         = "nrs_tbf_rule",
3062                   .fops         = &ptlrpc_lprocfs_nrs_tbf_rule_fops,
3063                   .data = svc },
3064                 { NULL }
3065         };
3066
3067         if (svc->srv_procroot == NULL)
3068                 return 0;
3069
3070         return lprocfs_add_vars(svc->srv_procroot, nrs_tbf_lprocfs_vars, NULL);
3071 }
3072
3073 /**
3074  * Cleans up a TBF policy's lprocfs interface for service \a svc
3075  *
3076  * \param[in] svc the service
3077  */
3078 static void nrs_tbf_lprocfs_fini(struct ptlrpc_service *svc)
3079 {
3080         if (svc->srv_procroot == NULL)
3081                 return;
3082
3083         lprocfs_remove_proc_entry("nrs_tbf_rule", svc->srv_procroot);
3084 }
3085
3086 #endif /* CONFIG_PROC_FS */
3087
3088 /**
3089  * TBF policy operations
3090  */
3091 static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
3092         .op_policy_start        = nrs_tbf_start,
3093         .op_policy_stop         = nrs_tbf_stop,
3094         .op_policy_ctl          = nrs_tbf_ctl,
3095         .op_res_get             = nrs_tbf_res_get,
3096         .op_res_put             = nrs_tbf_res_put,
3097         .op_req_get             = nrs_tbf_req_get,
3098         .op_req_enqueue         = nrs_tbf_req_add,
3099         .op_req_dequeue         = nrs_tbf_req_del,
3100         .op_req_stop            = nrs_tbf_req_stop,
3101 #ifdef CONFIG_PROC_FS
3102         .op_lprocfs_init        = nrs_tbf_lprocfs_init,
3103         .op_lprocfs_fini        = nrs_tbf_lprocfs_fini,
3104 #endif
3105 };
3106
3107 /**
3108  * TBF policy configuration
3109  */
3110 struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
3111         .nc_name                = NRS_POL_NAME_TBF,
3112         .nc_ops                 = &nrs_tbf_ops,
3113         .nc_compat              = nrs_policy_compat_all,
3114 };
3115
3116 /** @} tbf */
3117
3118 /** @} nrs */
3119
3120 #endif /* HAVE_SERVER_SUPPORT */