Whamcloud - gitweb
LU-8066 ldlm: move all remaining files from procfs to debugfs
[fs/lustre-release.git] / lustre / ptlrpc / nrs_tbf.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2013 DataDirect Networks, Inc.
24  *
25  * Copyright (c) 2014, 2016, Intel Corporation.
26  */
27 /*
28  * lustre/ptlrpc/nrs_tbf.c
29  *
30  * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
31  *
32  */
33
34 #ifdef HAVE_SERVER_SUPPORT
35
36 /**
37  * \addtogroup nrs
38  * @{
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #include <obd_support.h>
43 #include <obd_class.h>
44 #include <libcfs/libcfs.h>
45 #include <lustre_req_layout.h>
46 #include "ptlrpc_internal.h"
47
48 /**
49  * \name tbf
50  *
51  * Token Bucket Filter over client NIDs
52  *
53  * @{
54  */
55
56 #define NRS_POL_NAME_TBF        "tbf"
57
58 static int tbf_jobid_cache_size = 8192;
59 module_param(tbf_jobid_cache_size, int, 0644);
60 MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");
61
62 static int tbf_rate = 10000;
63 module_param(tbf_rate, int, 0644);
64 MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");
65
66 static int tbf_depth = 3;
67 module_param(tbf_depth, int, 0644);
68 MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
69
70 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
71 {
72         struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
73                                                  th_timer);
74         struct ptlrpc_nrs   *nrs = head->th_res.res_policy->pol_nrs;
75         struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
76
77         nrs->nrs_throttling = 0;
78         wake_up(&svcpt->scp_waitq);
79
80         return HRTIMER_NORESTART;
81 }
82
83 #define NRS_TBF_DEFAULT_RULE "default"
84
85 static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule)
86 {
87         LASSERT(atomic_read(&rule->tr_ref) == 0);
88         LASSERT(list_empty(&rule->tr_cli_list));
89         LASSERT(list_empty(&rule->tr_linkage));
90
91         rule->tr_head->th_ops->o_rule_fini(rule);
92         OBD_FREE_PTR(rule);
93 }
94
95 /**
96  * Decreases the rule's usage reference count, and stops the rule in case it
97  * was already stopping and have no more outstanding usage references (which
98  * indicates it has no more queued or started requests, and can be safely
99  * stopped).
100  */
101 static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule)
102 {
103         if (atomic_dec_and_test(&rule->tr_ref))
104                 nrs_tbf_rule_fini(rule);
105 }
106
107 /**
108  * Increases the rule's usage reference count.
109  */
110 static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule)
111 {
112         atomic_inc(&rule->tr_ref);
113 }
114
115 static void
116 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
117 {
118         LASSERT(!list_empty(&cli->tc_linkage));
119         LASSERT(cli->tc_rule);
120         spin_lock(&cli->tc_rule->tr_rule_lock);
121         list_del_init(&cli->tc_linkage);
122         spin_unlock(&cli->tc_rule->tr_rule_lock);
123         nrs_tbf_rule_put(cli->tc_rule);
124         cli->tc_rule = NULL;
125 }
126
127 static void
128 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
129                         struct nrs_tbf_client *cli)
130
131 {
132         struct nrs_tbf_rule *rule = cli->tc_rule;
133
134         cli->tc_rpc_rate = rule->tr_rpc_rate;
135         cli->tc_nsecs = rule->tr_nsecs;
136         cli->tc_depth = rule->tr_depth;
137         cli->tc_ntoken = rule->tr_depth;
138         cli->tc_check_time = ktime_to_ns(ktime_get());
139         cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
140         cli->tc_rule_generation = rule->tr_generation;
141
142         if (cli->tc_in_heap)
143                 cfs_binheap_relocate(head->th_binheap,
144                                      &cli->tc_node);
145 }
146
147 static void
148 nrs_tbf_cli_reset(struct nrs_tbf_head *head,
149                   struct nrs_tbf_rule *rule,
150                   struct nrs_tbf_client *cli)
151 {
152         spin_lock(&cli->tc_rule_lock);
153         if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
154                 LASSERT(rule != cli->tc_rule);
155                 nrs_tbf_cli_rule_put(cli);
156         }
157         LASSERT(cli->tc_rule == NULL);
158         LASSERT(list_empty(&cli->tc_linkage));
159         /* Rule's ref is added before called */
160         cli->tc_rule = rule;
161         spin_lock(&rule->tr_rule_lock);
162         list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
163         spin_unlock(&rule->tr_rule_lock);
164         spin_unlock(&cli->tc_rule_lock);
165         nrs_tbf_cli_reset_value(head, cli);
166 }
167
168 static int
169 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
170 {
171         return rule->tr_head->th_ops->o_rule_dump(rule, m);
172 }
173
174 static int
175 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
176 {
177         struct nrs_tbf_rule *rule;
178         int rc = 0;
179
180         LASSERT(head != NULL);
181         spin_lock(&head->th_rule_lock);
182         /* List the rules from newest to oldest */
183         list_for_each_entry(rule, &head->th_list, tr_linkage) {
184                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
185                 rc = nrs_tbf_rule_dump(rule, m);
186                 if (rc) {
187                         rc = -ENOSPC;
188                         break;
189                 }
190         }
191         spin_unlock(&head->th_rule_lock);
192
193         return rc;
194 }
195
196 static struct nrs_tbf_rule *
197 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
198                          const char *name)
199 {
200         struct nrs_tbf_rule *rule;
201
202         LASSERT(head != NULL);
203         list_for_each_entry(rule, &head->th_list, tr_linkage) {
204                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
205                 if (strcmp(rule->tr_name, name) == 0) {
206                         nrs_tbf_rule_get(rule);
207                         return rule;
208                 }
209         }
210         return NULL;
211 }
212
213 static struct nrs_tbf_rule *
214 nrs_tbf_rule_find(struct nrs_tbf_head *head,
215                   const char *name)
216 {
217         struct nrs_tbf_rule *rule;
218
219         LASSERT(head != NULL);
220         spin_lock(&head->th_rule_lock);
221         rule = nrs_tbf_rule_find_nolock(head, name);
222         spin_unlock(&head->th_rule_lock);
223         return rule;
224 }
225
226 static struct nrs_tbf_rule *
227 nrs_tbf_rule_match(struct nrs_tbf_head *head,
228                    struct nrs_tbf_client *cli)
229 {
230         struct nrs_tbf_rule *rule = NULL;
231         struct nrs_tbf_rule *tmp_rule;
232
233         spin_lock(&head->th_rule_lock);
234         /* Match the newest rule in the list */
235         list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
236                 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
237                 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
238                         rule = tmp_rule;
239                         break;
240                 }
241         }
242
243         if (rule == NULL)
244                 rule = head->th_rule;
245
246         nrs_tbf_rule_get(rule);
247         spin_unlock(&head->th_rule_lock);
248         return rule;
249 }
250
251 static void
252 nrs_tbf_cli_init(struct nrs_tbf_head *head,
253                  struct nrs_tbf_client *cli,
254                  struct ptlrpc_request *req)
255 {
256         struct nrs_tbf_rule *rule;
257
258         memset(cli, 0, sizeof(*cli));
259         cli->tc_in_heap = false;
260         head->th_ops->o_cli_init(cli, req);
261         INIT_LIST_HEAD(&cli->tc_list);
262         INIT_LIST_HEAD(&cli->tc_linkage);
263         spin_lock_init(&cli->tc_rule_lock);
264         atomic_set(&cli->tc_ref, 1);
265         rule = nrs_tbf_rule_match(head, cli);
266         nrs_tbf_cli_reset(head, rule, cli);
267 }
268
269 static void
270 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
271 {
272         LASSERT(list_empty(&cli->tc_list));
273         LASSERT(!cli->tc_in_heap);
274         LASSERT(atomic_read(&cli->tc_ref) == 0);
275         spin_lock(&cli->tc_rule_lock);
276         nrs_tbf_cli_rule_put(cli);
277         spin_unlock(&cli->tc_rule_lock);
278         OBD_FREE_PTR(cli);
279 }
280
281 static int
282 nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
283                    struct nrs_tbf_head *head,
284                    struct nrs_tbf_cmd *start)
285 {
286         struct nrs_tbf_rule     *rule;
287         struct nrs_tbf_rule     *tmp_rule;
288         struct nrs_tbf_rule     *next_rule;
289         char                    *next_name = start->u.tc_start.ts_next_name;
290         int                      rc;
291
292         rule = nrs_tbf_rule_find(head, start->tc_name);
293         if (rule) {
294                 nrs_tbf_rule_put(rule);
295                 return -EEXIST;
296         }
297
298         OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
299         if (rule == NULL)
300                 return -ENOMEM;
301
302         memcpy(rule->tr_name, start->tc_name, strlen(start->tc_name));
303         rule->tr_rpc_rate = start->u.tc_start.ts_rpc_rate;
304         rule->tr_flags = start->u.tc_start.ts_rule_flags;
305         rule->tr_nsecs = NSEC_PER_SEC;
306         do_div(rule->tr_nsecs, rule->tr_rpc_rate);
307         rule->tr_depth = tbf_depth;
308         atomic_set(&rule->tr_ref, 1);
309         INIT_LIST_HEAD(&rule->tr_cli_list);
310         INIT_LIST_HEAD(&rule->tr_nids);
311         INIT_LIST_HEAD(&rule->tr_linkage);
312         spin_lock_init(&rule->tr_rule_lock);
313         rule->tr_head = head;
314
315         rc = head->th_ops->o_rule_init(policy, rule, start);
316         if (rc) {
317                 OBD_FREE_PTR(rule);
318                 return rc;
319         }
320
321         /* Add as the newest rule */
322         spin_lock(&head->th_rule_lock);
323         tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
324         if (tmp_rule) {
325                 spin_unlock(&head->th_rule_lock);
326                 nrs_tbf_rule_put(tmp_rule);
327                 nrs_tbf_rule_put(rule);
328                 return -EEXIST;
329         }
330
331         if (next_name) {
332                 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
333                 if (!next_rule) {
334                         spin_unlock(&head->th_rule_lock);
335                         nrs_tbf_rule_put(rule);
336                         return -ENOENT;
337                 }
338
339                 list_add(&rule->tr_linkage, next_rule->tr_linkage.prev);
340                 nrs_tbf_rule_put(next_rule);
341         } else {
342                 /* Add on the top of the rule list */
343                 list_add(&rule->tr_linkage, &head->th_list);
344         }
345         spin_unlock(&head->th_rule_lock);
346         atomic_inc(&head->th_rule_sequence);
347         if (start->u.tc_start.ts_rule_flags & NTRS_DEFAULT) {
348                 rule->tr_flags |= NTRS_DEFAULT;
349                 LASSERT(head->th_rule == NULL);
350                 head->th_rule = rule;
351         }
352
353         CDEBUG(D_RPCTRACE, "TBF starts rule@%p rate %llu gen %llu\n",
354                rule, rule->tr_rpc_rate, rule->tr_generation);
355
356         return 0;
357 }
358
359 /**
360  * Change the rank of a rule in the rule list
361  *
362  * The matched rule will be moved to the position right before another
363  * given rule.
364  *
365  * \param[in] policy    the policy instance
366  * \param[in] head      the TBF policy instance
367  * \param[in] name      the rule name to be moved
368  * \param[in] next_name the rule name before which the matched rule will be
369  *                      moved
370  *
371  */
372 static int
373 nrs_tbf_rule_change_rank(struct ptlrpc_nrs_policy *policy,
374                          struct nrs_tbf_head *head,
375                          char *name,
376                          char *next_name)
377 {
378         struct nrs_tbf_rule     *rule = NULL;
379         struct nrs_tbf_rule     *next_rule = NULL;
380         int                      rc = 0;
381
382         LASSERT(head != NULL);
383
384         spin_lock(&head->th_rule_lock);
385         rule = nrs_tbf_rule_find_nolock(head, name);
386         if (!rule)
387                 GOTO(out, rc = -ENOENT);
388
389         if (strcmp(name, next_name) == 0)
390                 GOTO(out_put, rc);
391
392         next_rule = nrs_tbf_rule_find_nolock(head, next_name);
393         if (!next_rule)
394                 GOTO(out_put, rc = -ENOENT);
395
396         list_move(&rule->tr_linkage, next_rule->tr_linkage.prev);
397         nrs_tbf_rule_put(next_rule);
398 out_put:
399         nrs_tbf_rule_put(rule);
400 out:
401         spin_unlock(&head->th_rule_lock);
402         return rc;
403 }
404
405 static int
406 nrs_tbf_rule_change_rate(struct ptlrpc_nrs_policy *policy,
407                          struct nrs_tbf_head *head,
408                          char *name,
409                          __u64 rate)
410 {
411         struct nrs_tbf_rule *rule;
412
413         assert_spin_locked(&policy->pol_nrs->nrs_lock);
414
415         rule = nrs_tbf_rule_find(head, name);
416         if (rule == NULL)
417                 return -ENOENT;
418
419         rule->tr_rpc_rate = rate;
420         rule->tr_nsecs = NSEC_PER_SEC;
421         do_div(rule->tr_nsecs, rule->tr_rpc_rate);
422         rule->tr_generation++;
423         nrs_tbf_rule_put(rule);
424
425         return 0;
426 }
427
428 static int
429 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
430                     struct nrs_tbf_head *head,
431                     struct nrs_tbf_cmd *change)
432 {
433         __u64    rate = change->u.tc_change.tc_rpc_rate;
434         char    *next_name = change->u.tc_change.tc_next_name;
435         int      rc;
436
437         if (rate != 0) {
438                 rc = nrs_tbf_rule_change_rate(policy, head, change->tc_name,
439                                               rate);
440                 if (rc)
441                         return rc;
442         }
443
444         if (next_name) {
445                 rc = nrs_tbf_rule_change_rank(policy, head, change->tc_name,
446                                               next_name);
447                 if (rc)
448                         return rc;
449         }
450
451         return 0;
452 }
453
454 static int
455 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
456                   struct nrs_tbf_head *head,
457                   struct nrs_tbf_cmd *stop)
458 {
459         struct nrs_tbf_rule *rule;
460
461         assert_spin_locked(&policy->pol_nrs->nrs_lock);
462
463         if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
464                 return -EPERM;
465
466         rule = nrs_tbf_rule_find(head, stop->tc_name);
467         if (rule == NULL)
468                 return -ENOENT;
469
470         list_del_init(&rule->tr_linkage);
471         rule->tr_flags |= NTRS_STOPPING;
472         nrs_tbf_rule_put(rule);
473         nrs_tbf_rule_put(rule);
474
475         return 0;
476 }
477
478 static int
479 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
480                 struct nrs_tbf_head *head,
481                 struct nrs_tbf_cmd *cmd)
482 {
483         int rc;
484
485         assert_spin_locked(&policy->pol_nrs->nrs_lock);
486
487         switch (cmd->tc_cmd) {
488         case NRS_CTL_TBF_START_RULE:
489                 if (cmd->u.tc_start.ts_valid_type != head->th_type_flag)
490                         return -EINVAL;
491
492                 spin_unlock(&policy->pol_nrs->nrs_lock);
493                 rc = nrs_tbf_rule_start(policy, head, cmd);
494                 spin_lock(&policy->pol_nrs->nrs_lock);
495                 return rc;
496         case NRS_CTL_TBF_CHANGE_RULE:
497                 rc = nrs_tbf_rule_change(policy, head, cmd);
498                 return rc;
499         case NRS_CTL_TBF_STOP_RULE:
500                 rc = nrs_tbf_rule_stop(policy, head, cmd);
501                 /* Take it as a success, if not exists at all */
502                 return rc == -ENOENT ? 0 : rc;
503         default:
504                 return -EFAULT;
505         }
506 }
507
508 /**
509  * Binary heap predicate.
510  *
511  * \param[in] e1 the first binheap node to compare
512  * \param[in] e2 the second binheap node to compare
513  *
514  * \retval 0 e1 > e2
515  * \retval 1 e1 < e2
516  */
517 static int
518 tbf_cli_compare(struct cfs_binheap_node *e1, struct cfs_binheap_node *e2)
519 {
520         struct nrs_tbf_client *cli1;
521         struct nrs_tbf_client *cli2;
522
523         cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
524         cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
525
526         if (cli1->tc_deadline < cli2->tc_deadline)
527                 return 1;
528         else if (cli1->tc_deadline > cli2->tc_deadline)
529                 return 0;
530
531         if (cli1->tc_check_time < cli2->tc_check_time)
532                 return 1;
533         else if (cli1->tc_check_time > cli2->tc_check_time)
534                 return 0;
535
536         /* Maybe need more comparasion, e.g. request number in the rules */
537         return 1;
538 }
539
540 /**
541  * TBF binary heap operations
542  */
543 static struct cfs_binheap_ops nrs_tbf_heap_ops = {
544         .hop_enter      = NULL,
545         .hop_exit       = NULL,
546         .hop_compare    = tbf_cli_compare,
547 };
548
/**
 * Hash function of the jobid client hash: djb2 over the NUL-terminated
 * jobid string passed as \a key.
 */
static unsigned nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key,
				  unsigned mask)
{
	const char *jobid = key;

	return cfs_hash_djb2_hash(jobid, strlen(jobid), mask);
}
554
555 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
556 {
557         struct nrs_tbf_client *cli = hlist_entry(hnode,
558                                                      struct nrs_tbf_client,
559                                                      tc_hnode);
560
561         return (strcmp(cli->tc_jobid, key) == 0);
562 }
563
564 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
565 {
566         struct nrs_tbf_client *cli = hlist_entry(hnode,
567                                                      struct nrs_tbf_client,
568                                                      tc_hnode);
569
570         return cli->tc_jobid;
571 }
572
573 static void *nrs_tbf_hop_object(struct hlist_node *hnode)
574 {
575         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
576 }
577
578 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
579 {
580         struct nrs_tbf_client *cli = hlist_entry(hnode,
581                                                      struct nrs_tbf_client,
582                                                      tc_hnode);
583
584         atomic_inc(&cli->tc_ref);
585 }
586
587 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
588 {
589         struct nrs_tbf_client *cli = hlist_entry(hnode,
590                                                      struct nrs_tbf_client,
591                                                      tc_hnode);
592
593         atomic_dec(&cli->tc_ref);
594 }
595
596 static void
597 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
598
599 {
600         struct nrs_tbf_client *cli = hlist_entry(hnode,
601                                                  struct nrs_tbf_client,
602                                                  tc_hnode);
603
604         LASSERT(atomic_read(&cli->tc_ref) == 0);
605         nrs_tbf_cli_fini(cli);
606 }
607
608 static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
609         .hs_hash        = nrs_tbf_jobid_hop_hash,
610         .hs_keycmp      = nrs_tbf_jobid_hop_keycmp,
611         .hs_key         = nrs_tbf_jobid_hop_key,
612         .hs_object      = nrs_tbf_hop_object,
613         .hs_get         = nrs_tbf_jobid_hop_get,
614         .hs_put         = nrs_tbf_jobid_hop_put,
615         .hs_put_locked  = nrs_tbf_jobid_hop_put,
616         .hs_exit        = nrs_tbf_jobid_hop_exit,
617 };
618
619 #define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
620                                   CFS_HASH_NO_ITEMREF | \
621                                   CFS_HASH_DEPTH)
622
623 static struct nrs_tbf_client *
624 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
625                           struct cfs_hash_bd *bd,
626                           const char *jobid)
627 {
628         struct hlist_node *hnode;
629         struct nrs_tbf_client *cli;
630
631         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)jobid);
632         if (hnode == NULL)
633                 return NULL;
634
635         cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode);
636         if (!list_empty(&cli->tc_lru))
637                 list_del_init(&cli->tc_lru);
638         return cli;
639 }
640
641 #define NRS_TBF_JOBID_NULL ""
642
643 static struct nrs_tbf_client *
644 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
645                        struct ptlrpc_request *req)
646 {
647         const char              *jobid;
648         struct nrs_tbf_client   *cli;
649         struct cfs_hash         *hs = head->th_cli_hash;
650         struct cfs_hash_bd               bd;
651
652         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
653         if (jobid == NULL)
654                 jobid = NRS_TBF_JOBID_NULL;
655         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
656         cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
657         cfs_hash_bd_unlock(hs, &bd, 1);
658
659         return cli;
660 }
661
662 static struct nrs_tbf_client *
663 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
664                           struct nrs_tbf_client *cli)
665 {
666         const char              *jobid;
667         struct nrs_tbf_client   *ret;
668         struct cfs_hash         *hs = head->th_cli_hash;
669         struct cfs_hash_bd               bd;
670
671         jobid = cli->tc_jobid;
672         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
673         ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
674         if (ret == NULL) {
675                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
676                 ret = cli;
677         }
678         cfs_hash_bd_unlock(hs, &bd, 1);
679
680         return ret;
681 }
682
683 static void
684 nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
685                       struct nrs_tbf_client *cli)
686 {
687         struct cfs_hash_bd               bd;
688         struct cfs_hash         *hs = head->th_cli_hash;
689         struct nrs_tbf_bucket   *bkt;
690         int                      hw;
691         struct list_head        zombies;
692
693         INIT_LIST_HEAD(&zombies);
694         cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
695         bkt = cfs_hash_bd_extra_get(hs, &bd);
696         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
697                 return;
698         LASSERT(list_empty(&cli->tc_lru));
699         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
700
701         /*
702          * Check and purge the LRU, there is at least one client in the LRU.
703          */
704         hw = tbf_jobid_cache_size >>
705              (hs->hs_cur_bits - hs->hs_bkt_bits);
706         while (cfs_hash_bd_count_get(&bd) > hw) {
707                 if (unlikely(list_empty(&bkt->ntb_lru)))
708                         break;
709                 cli = list_entry(bkt->ntb_lru.next,
710                                      struct nrs_tbf_client,
711                                      tc_lru);
712                 LASSERT(atomic_read(&cli->tc_ref) == 0);
713                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
714                 list_move(&cli->tc_lru, &zombies);
715         }
716         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
717
718         while (!list_empty(&zombies)) {
719                 cli = container_of0(zombies.next,
720                                     struct nrs_tbf_client, tc_lru);
721                 list_del_init(&cli->tc_lru);
722                 nrs_tbf_cli_fini(cli);
723         }
724 }
725
726 static void
727 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
728                        struct ptlrpc_request *req)
729 {
730         char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
731
732         if (jobid == NULL)
733                 jobid = NRS_TBF_JOBID_NULL;
734         LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
735         INIT_LIST_HEAD(&cli->tc_lru);
736         memcpy(cli->tc_jobid, jobid, strlen(jobid));
737 }
738
739 static int nrs_tbf_jobid_hash_order(void)
740 {
741         int bits;
742
743         for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
744                 ;
745
746         return bits;
747 }
748
749 #define NRS_TBF_JOBID_BKT_BITS 10
750
751 static int
752 nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
753                       struct nrs_tbf_head *head)
754 {
755         struct nrs_tbf_cmd       start;
756         struct nrs_tbf_bucket   *bkt;
757         int                      bits;
758         int                      i;
759         int                      rc;
760         struct cfs_hash_bd       bd;
761
762         bits = nrs_tbf_jobid_hash_order();
763         if (bits < NRS_TBF_JOBID_BKT_BITS)
764                 bits = NRS_TBF_JOBID_BKT_BITS;
765         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
766                                             bits,
767                                             bits,
768                                             NRS_TBF_JOBID_BKT_BITS,
769                                             sizeof(*bkt),
770                                             0,
771                                             0,
772                                             &nrs_tbf_jobid_hash_ops,
773                                             NRS_TBF_JOBID_HASH_FLAGS);
774         if (head->th_cli_hash == NULL)
775                 return -ENOMEM;
776
777         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
778                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
779                 INIT_LIST_HEAD(&bkt->ntb_lru);
780         }
781
782         memset(&start, 0, sizeof(start));
783         start.u.tc_start.ts_jobids_str = "*";
784
785         start.u.tc_start.ts_rpc_rate = tbf_rate;
786         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
787         start.tc_name = NRS_TBF_DEFAULT_RULE;
788         INIT_LIST_HEAD(&start.u.tc_start.ts_jobids);
789         rc = nrs_tbf_rule_start(policy, head, &start);
790         if (rc) {
791                 cfs_hash_putref(head->th_cli_hash);
792                 head->th_cli_hash = NULL;
793         }
794
795         return rc;
796 }
797
798 /**
799  * Frees jobid of \a list.
800  *
801  */
802 static void
803 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
804 {
805         struct nrs_tbf_jobid *jobid, *n;
806
807         list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
808                 OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1);
809                 list_del(&jobid->tj_linkage);
810                 OBD_FREE(jobid, sizeof(struct nrs_tbf_jobid));
811         }
812 }
813
814 static int
815 nrs_tbf_jobid_list_add(struct cfs_lstr *id, struct list_head *jobid_list)
816 {
817         struct nrs_tbf_jobid *jobid;
818         char *ptr;
819
820         OBD_ALLOC(jobid, sizeof(struct nrs_tbf_jobid));
821         if (jobid == NULL)
822                 return -ENOMEM;
823
824         OBD_ALLOC(jobid->tj_id, id->ls_len + 1);
825         if (jobid->tj_id == NULL) {
826                 OBD_FREE(jobid, sizeof(struct nrs_tbf_jobid));
827                 return -ENOMEM;
828         }
829
830         memcpy(jobid->tj_id, id->ls_str, id->ls_len);
831         ptr = lprocfs_strnstr(id->ls_str, "*", id->ls_len);
832         if (ptr == NULL)
833                 jobid->tj_match_flag = NRS_TBF_MATCH_FULL;
834         else
835                 jobid->tj_match_flag = NRS_TBF_MATCH_WILDCARD;
836
837         list_add_tail(&jobid->tj_linkage, jobid_list);
838         return 0;
839 }
840
/**
 * Matches \a content against \a pattern, where '*' in the pattern stands
 * for any run of characters.
 *
 * Characters are consumed pairwise while they are equal; at a '*' the
 * function recurses, trying both "the star matches nothing more" and "the
 * star swallows one more character of the content".
 *
 * A '*' that is followed by further pattern characters never matches once
 * the content is exhausted, so e.g. "**" does not match the empty string.
 *
 * \param[in] pattern NUL-terminated pattern
 * \param[in] content NUL-terminated string to test
 *
 * \retval true  \a content matches \a pattern
 * \retval false otherwise
 */
static bool
cfs_match_wildcard(const char *pattern, const char *content)
{
	for (;;) {
		/* both strings used up together: match */
		if (*pattern == '\0' && *content == '\0')
			return true;

		/* content used up, but more than a trailing '*' is left */
		if (*pattern == '*' && pattern[1] != '\0' && *content == '\0')
			return false;

		if (*pattern != *content)
			break;

		pattern++;
		content++;
	}

	if (*pattern != '*')
		return false;

	return cfs_match_wildcard(pattern + 1, content) ||
	       cfs_match_wildcard(pattern, content + 1);
}
867
868 static inline bool
869 nrs_tbf_jobid_match(const struct nrs_tbf_jobid *jobid, const char *id)
870 {
871         if (jobid->tj_match_flag == NRS_TBF_MATCH_FULL)
872                 return strcmp(jobid->tj_id, id) == 0;
873
874         if (jobid->tj_match_flag == NRS_TBF_MATCH_WILDCARD)
875                 return cfs_match_wildcard(jobid->tj_id, id);
876
877         return false;
878 }
879
880 static int
881 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
882 {
883         struct nrs_tbf_jobid *jobid;
884
885         list_for_each_entry(jobid, jobid_list, tj_linkage) {
886                 if (nrs_tbf_jobid_match(jobid, id))
887                         return 1;
888         }
889         return 0;
890 }
891
892 static int
893 nrs_tbf_jobid_list_parse(char *str, int len, struct list_head *jobid_list)
894 {
895         struct cfs_lstr src;
896         struct cfs_lstr res;
897         int rc = 0;
898         ENTRY;
899
900         src.ls_str = str;
901         src.ls_len = len;
902         INIT_LIST_HEAD(jobid_list);
903         while (src.ls_str) {
904                 rc = cfs_gettok(&src, ' ', &res);
905                 if (rc == 0) {
906                         rc = -EINVAL;
907                         break;
908                 }
909                 rc = nrs_tbf_jobid_list_add(&res, jobid_list);
910                 if (rc)
911                         break;
912         }
913         if (rc)
914                 nrs_tbf_jobid_list_free(jobid_list);
915         RETURN(rc);
916 }
917
918 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
919 {
920         if (!list_empty(&cmd->u.tc_start.ts_jobids))
921                 nrs_tbf_jobid_list_free(&cmd->u.tc_start.ts_jobids);
922         if (cmd->u.tc_start.ts_jobids_str)
923                 OBD_FREE(cmd->u.tc_start.ts_jobids_str,
924                          strlen(cmd->u.tc_start.ts_jobids_str) + 1);
925 }
926
927 static int nrs_tbf_check_id_value(struct cfs_lstr *src, char *key)
928 {
929         struct cfs_lstr res;
930         int keylen = strlen(key);
931         int rc;
932
933         rc = cfs_gettok(src, '=', &res);
934         if (rc == 0 || res.ls_len != keylen ||
935             strncmp(res.ls_str, key, keylen) != 0 ||
936             src->ls_len <= 2 || src->ls_str[0] != '{' ||
937             src->ls_str[src->ls_len - 1] != '}')
938                 return -EINVAL;
939
940         /* Skip '{' and '}' */
941         src->ls_str++;
942         src->ls_len -= 2;
943         return 0;
944 }
945
946 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, char *id)
947 {
948         struct cfs_lstr src;
949         int rc;
950
951         src.ls_str = id;
952         src.ls_len = strlen(id);
953         rc = nrs_tbf_check_id_value(&src, "jobid");
954         if (rc)
955                 return rc;
956
957         OBD_ALLOC(cmd->u.tc_start.ts_jobids_str, src.ls_len + 1);
958         if (cmd->u.tc_start.ts_jobids_str == NULL)
959                 return -ENOMEM;
960
961         memcpy(cmd->u.tc_start.ts_jobids_str, src.ls_str, src.ls_len);
962
963         /* parse jobid list */
964         rc = nrs_tbf_jobid_list_parse(cmd->u.tc_start.ts_jobids_str,
965                                       strlen(cmd->u.tc_start.ts_jobids_str),
966                                       &cmd->u.tc_start.ts_jobids);
967         if (rc)
968                 nrs_tbf_jobid_cmd_fini(cmd);
969
970         return rc;
971 }
972
973 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
974                                    struct nrs_tbf_rule *rule,
975                                    struct nrs_tbf_cmd *start)
976 {
977         int rc = 0;
978
979         LASSERT(start->u.tc_start.ts_jobids_str);
980         OBD_ALLOC(rule->tr_jobids_str,
981                   strlen(start->u.tc_start.ts_jobids_str) + 1);
982         if (rule->tr_jobids_str == NULL)
983                 return -ENOMEM;
984
985         memcpy(rule->tr_jobids_str,
986                start->u.tc_start.ts_jobids_str,
987                strlen(start->u.tc_start.ts_jobids_str));
988
989         INIT_LIST_HEAD(&rule->tr_jobids);
990         if (!list_empty(&start->u.tc_start.ts_jobids)) {
991                 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
992                                               strlen(rule->tr_jobids_str),
993                                               &rule->tr_jobids);
994                 if (rc)
995                         CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
996         }
997         if (rc)
998                 OBD_FREE(rule->tr_jobids_str,
999                          strlen(start->u.tc_start.ts_jobids_str) + 1);
1000         return rc;
1001 }
1002
1003 static int
1004 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1005 {
1006         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1007                    rule->tr_jobids_str, rule->tr_rpc_rate,
1008                    atomic_read(&rule->tr_ref) - 1);
1009         return 0;
1010 }
1011
1012 static int
1013 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
1014                          struct nrs_tbf_client *cli)
1015 {
1016         return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
1017 }
1018
1019 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
1020 {
1021         if (!list_empty(&rule->tr_jobids))
1022                 nrs_tbf_jobid_list_free(&rule->tr_jobids);
1023         LASSERT(rule->tr_jobids_str != NULL);
1024         OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1);
1025 }
1026
1027 static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
1028         .o_name = NRS_TBF_TYPE_JOBID,
1029         .o_startup = nrs_tbf_jobid_startup,
1030         .o_cli_find = nrs_tbf_jobid_cli_find,
1031         .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
1032         .o_cli_put = nrs_tbf_jobid_cli_put,
1033         .o_cli_init = nrs_tbf_jobid_cli_init,
1034         .o_rule_init = nrs_tbf_jobid_rule_init,
1035         .o_rule_dump = nrs_tbf_jobid_rule_dump,
1036         .o_rule_match = nrs_tbf_jobid_rule_match,
1037         .o_rule_fini = nrs_tbf_jobid_rule_fini,
1038 };
1039
1040 /**
1041  * libcfs_hash operations for nrs_tbf_net::cn_cli_hash
1042  *
1043  * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
1044  * nrs_tbf_client objects.
1045  */
1046 #define NRS_TBF_NID_BKT_BITS    8
1047 #define NRS_TBF_NID_BITS        16
1048
1049 static unsigned nrs_tbf_nid_hop_hash(struct cfs_hash *hs, const void *key,
1050                                   unsigned mask)
1051 {
1052         return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
1053 }
1054
1055 static int nrs_tbf_nid_hop_keycmp(const void *key, struct hlist_node *hnode)
1056 {
1057         lnet_nid_t            *nid = (lnet_nid_t *)key;
1058         struct nrs_tbf_client *cli = hlist_entry(hnode,
1059                                                      struct nrs_tbf_client,
1060                                                      tc_hnode);
1061
1062         return *nid == cli->tc_nid;
1063 }
1064
1065 static void *nrs_tbf_nid_hop_key(struct hlist_node *hnode)
1066 {
1067         struct nrs_tbf_client *cli = hlist_entry(hnode,
1068                                                      struct nrs_tbf_client,
1069                                                      tc_hnode);
1070
1071         return &cli->tc_nid;
1072 }
1073
1074 static void nrs_tbf_nid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1075 {
1076         struct nrs_tbf_client *cli = hlist_entry(hnode,
1077                                                      struct nrs_tbf_client,
1078                                                      tc_hnode);
1079
1080         atomic_inc(&cli->tc_ref);
1081 }
1082
1083 static void nrs_tbf_nid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1084 {
1085         struct nrs_tbf_client *cli = hlist_entry(hnode,
1086                                                      struct nrs_tbf_client,
1087                                                      tc_hnode);
1088
1089         atomic_dec(&cli->tc_ref);
1090 }
1091
1092 static void nrs_tbf_nid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1093 {
1094         struct nrs_tbf_client *cli = hlist_entry(hnode,
1095                                                      struct nrs_tbf_client,
1096                                                      tc_hnode);
1097
1098         LASSERTF(atomic_read(&cli->tc_ref) == 0,
1099                  "Busy TBF object from client with NID %s, with %d refs\n",
1100                  libcfs_nid2str(cli->tc_nid), atomic_read(&cli->tc_ref));
1101
1102         nrs_tbf_cli_fini(cli);
1103 }
1104
1105 static struct cfs_hash_ops nrs_tbf_nid_hash_ops = {
1106         .hs_hash        = nrs_tbf_nid_hop_hash,
1107         .hs_keycmp      = nrs_tbf_nid_hop_keycmp,
1108         .hs_key         = nrs_tbf_nid_hop_key,
1109         .hs_object      = nrs_tbf_hop_object,
1110         .hs_get         = nrs_tbf_nid_hop_get,
1111         .hs_put         = nrs_tbf_nid_hop_put,
1112         .hs_put_locked  = nrs_tbf_nid_hop_put,
1113         .hs_exit        = nrs_tbf_nid_hop_exit,
1114 };
1115
1116 static struct nrs_tbf_client *
1117 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
1118                      struct ptlrpc_request *req)
1119 {
1120         return cfs_hash_lookup(head->th_cli_hash, &req->rq_peer.nid);
1121 }
1122
1123 static struct nrs_tbf_client *
1124 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
1125                         struct nrs_tbf_client *cli)
1126 {
1127         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid,
1128                                        &cli->tc_hnode);
1129 }
1130
1131 static void
1132 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
1133                       struct nrs_tbf_client *cli)
1134 {
1135         cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
1136 }
1137
1138 static int
1139 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
1140                     struct nrs_tbf_head *head)
1141 {
1142         struct nrs_tbf_cmd      start;
1143         int rc;
1144
1145         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1146                                             NRS_TBF_NID_BITS,
1147                                             NRS_TBF_NID_BITS,
1148                                             NRS_TBF_NID_BKT_BITS, 0,
1149                                             CFS_HASH_MIN_THETA,
1150                                             CFS_HASH_MAX_THETA,
1151                                             &nrs_tbf_nid_hash_ops,
1152                                             CFS_HASH_RW_BKTLOCK);
1153         if (head->th_cli_hash == NULL)
1154                 return -ENOMEM;
1155
1156         memset(&start, 0, sizeof(start));
1157         start.u.tc_start.ts_nids_str = "*";
1158
1159         start.u.tc_start.ts_rpc_rate = tbf_rate;
1160         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1161         start.tc_name = NRS_TBF_DEFAULT_RULE;
1162         INIT_LIST_HEAD(&start.u.tc_start.ts_nids);
1163         rc = nrs_tbf_rule_start(policy, head, &start);
1164         if (rc) {
1165                 cfs_hash_putref(head->th_cli_hash);
1166                 head->th_cli_hash = NULL;
1167         }
1168
1169         return rc;
1170 }
1171
1172 static void
1173 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1174                              struct ptlrpc_request *req)
1175 {
1176         cli->tc_nid = req->rq_peer.nid;
1177 }
1178
1179 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1180                                  struct nrs_tbf_rule *rule,
1181                                  struct nrs_tbf_cmd *start)
1182 {
1183         LASSERT(start->u.tc_start.ts_nids_str);
1184         OBD_ALLOC(rule->tr_nids_str,
1185                   strlen(start->u.tc_start.ts_nids_str) + 1);
1186         if (rule->tr_nids_str == NULL)
1187                 return -ENOMEM;
1188
1189         memcpy(rule->tr_nids_str,
1190                start->u.tc_start.ts_nids_str,
1191                strlen(start->u.tc_start.ts_nids_str));
1192
1193         INIT_LIST_HEAD(&rule->tr_nids);
1194         if (!list_empty(&start->u.tc_start.ts_nids)) {
1195                 if (cfs_parse_nidlist(rule->tr_nids_str,
1196                                       strlen(rule->tr_nids_str),
1197                                       &rule->tr_nids) <= 0) {
1198                         CERROR("nids {%s} illegal\n",
1199                                rule->tr_nids_str);
1200                         OBD_FREE(rule->tr_nids_str,
1201                                  strlen(start->u.tc_start.ts_nids_str) + 1);
1202                         return -EINVAL;
1203                 }
1204         }
1205         return 0;
1206 }
1207
1208 static int
1209 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1210 {
1211         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1212                    rule->tr_nids_str, rule->tr_rpc_rate,
1213                    atomic_read(&rule->tr_ref) - 1);
1214         return 0;
1215 }
1216
1217 static int
1218 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1219                        struct nrs_tbf_client *cli)
1220 {
1221         return cfs_match_nid(cli->tc_nid, &rule->tr_nids);
1222 }
1223
1224 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1225 {
1226         if (!list_empty(&rule->tr_nids))
1227                 cfs_free_nidlist(&rule->tr_nids);
1228         LASSERT(rule->tr_nids_str != NULL);
1229         OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1);
1230 }
1231
1232 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1233 {
1234         if (!list_empty(&cmd->u.tc_start.ts_nids))
1235                 cfs_free_nidlist(&cmd->u.tc_start.ts_nids);
1236         if (cmd->u.tc_start.ts_nids_str)
1237                 OBD_FREE(cmd->u.tc_start.ts_nids_str,
1238                          strlen(cmd->u.tc_start.ts_nids_str) + 1);
1239 }
1240
1241 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, char *id)
1242 {
1243         struct cfs_lstr src;
1244         int rc;
1245
1246         src.ls_str = id;
1247         src.ls_len = strlen(id);
1248         rc = nrs_tbf_check_id_value(&src, "nid");
1249         if (rc)
1250                 return rc;
1251
1252         OBD_ALLOC(cmd->u.tc_start.ts_nids_str, src.ls_len + 1);
1253         if (cmd->u.tc_start.ts_nids_str == NULL)
1254                 return -ENOMEM;
1255
1256         memcpy(cmd->u.tc_start.ts_nids_str, src.ls_str, src.ls_len);
1257
1258         /* parse NID list */
1259         if (cfs_parse_nidlist(cmd->u.tc_start.ts_nids_str,
1260                               strlen(cmd->u.tc_start.ts_nids_str),
1261                               &cmd->u.tc_start.ts_nids) <= 0) {
1262                 nrs_tbf_nid_cmd_fini(cmd);
1263                 return -EINVAL;
1264         }
1265
1266         return 0;
1267 }
1268
1269 static struct nrs_tbf_ops nrs_tbf_nid_ops = {
1270         .o_name = NRS_TBF_TYPE_NID,
1271         .o_startup = nrs_tbf_nid_startup,
1272         .o_cli_find = nrs_tbf_nid_cli_find,
1273         .o_cli_findadd = nrs_tbf_nid_cli_findadd,
1274         .o_cli_put = nrs_tbf_nid_cli_put,
1275         .o_cli_init = nrs_tbf_nid_cli_init,
1276         .o_rule_init = nrs_tbf_nid_rule_init,
1277         .o_rule_dump = nrs_tbf_nid_rule_dump,
1278         .o_rule_match = nrs_tbf_nid_rule_match,
1279         .o_rule_fini = nrs_tbf_nid_rule_fini,
1280 };
1281
/**
 * Hash callback for string keys: feeds the NUL-terminated string at \a key
 * (without its terminator) to cfs_hash_djb2_hash().
 */
static unsigned nrs_tbf_hop_hash(struct cfs_hash *hs, const void *key,
                                 unsigned mask)
{
        size_t keylen = strlen(key);

        return cfs_hash_djb2_hash(key, keylen, mask);
}
1287
1288 static int nrs_tbf_hop_keycmp(const void *key, struct hlist_node *hnode)
1289 {
1290         struct nrs_tbf_client *cli = hlist_entry(hnode,
1291                                                  struct nrs_tbf_client,
1292                                                  tc_hnode);
1293
1294         return (strcmp(cli->tc_key, key) == 0);
1295 }
1296
1297 static void *nrs_tbf_hop_key(struct hlist_node *hnode)
1298 {
1299         struct nrs_tbf_client *cli = hlist_entry(hnode,
1300                                                  struct nrs_tbf_client,
1301                                                  tc_hnode);
1302         return cli->tc_key;
1303 }
1304
1305 static void nrs_tbf_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1306 {
1307         struct nrs_tbf_client *cli = hlist_entry(hnode,
1308                                                  struct nrs_tbf_client,
1309                                                  tc_hnode);
1310
1311         atomic_inc(&cli->tc_ref);
1312 }
1313
1314 static void nrs_tbf_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1315 {
1316         struct nrs_tbf_client *cli = hlist_entry(hnode,
1317                                                  struct nrs_tbf_client,
1318                                                  tc_hnode);
1319
1320         atomic_dec(&cli->tc_ref);
1321 }
1322
1323 static void nrs_tbf_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1324
1325 {
1326         struct nrs_tbf_client *cli = hlist_entry(hnode,
1327                                                  struct nrs_tbf_client,
1328                                                  tc_hnode);
1329
1330         LASSERT(atomic_read(&cli->tc_ref) == 0);
1331         nrs_tbf_cli_fini(cli);
1332 }
1333
1334 static struct cfs_hash_ops nrs_tbf_hash_ops = {
1335         .hs_hash        = nrs_tbf_hop_hash,
1336         .hs_keycmp      = nrs_tbf_hop_keycmp,
1337         .hs_key         = nrs_tbf_hop_key,
1338         .hs_object      = nrs_tbf_hop_object,
1339         .hs_get         = nrs_tbf_hop_get,
1340         .hs_put         = nrs_tbf_hop_put,
1341         .hs_put_locked  = nrs_tbf_hop_put,
1342         .hs_exit        = nrs_tbf_hop_exit,
1343 };
1344
1345 #define NRS_TBF_GENERIC_BKT_BITS        10
1346 #define NRS_TBF_GENERIC_HASH_FLAGS      (CFS_HASH_SPIN_BKTLOCK | \
1347                                         CFS_HASH_NO_ITEMREF | \
1348                                         CFS_HASH_DEPTH)
1349
1350 static int
1351 nrs_tbf_startup(struct ptlrpc_nrs_policy *policy, struct nrs_tbf_head *head)
1352 {
1353         struct nrs_tbf_cmd       start;
1354         struct nrs_tbf_bucket   *bkt;
1355         int                      bits;
1356         int                      i;
1357         int                      rc;
1358         struct cfs_hash_bd       bd;
1359
1360         bits = nrs_tbf_jobid_hash_order();
1361         if (bits < NRS_TBF_GENERIC_BKT_BITS)
1362                 bits = NRS_TBF_GENERIC_BKT_BITS;
1363         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1364                                             bits, bits,
1365                                             NRS_TBF_GENERIC_BKT_BITS,
1366                                             sizeof(*bkt), 0, 0,
1367                                             &nrs_tbf_hash_ops,
1368                                             NRS_TBF_GENERIC_HASH_FLAGS);
1369         if (head->th_cli_hash == NULL)
1370                 return -ENOMEM;
1371
1372         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
1373                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
1374                 INIT_LIST_HEAD(&bkt->ntb_lru);
1375         }
1376
1377         memset(&start, 0, sizeof(start));
1378         start.u.tc_start.ts_conds_str = "*";
1379
1380         start.u.tc_start.ts_rpc_rate = tbf_rate;
1381         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1382         start.tc_name = NRS_TBF_DEFAULT_RULE;
1383         INIT_LIST_HEAD(&start.u.tc_start.ts_conds);
1384         rc = nrs_tbf_rule_start(policy, head, &start);
1385         if (rc)
1386                 cfs_hash_putref(head->th_cli_hash);
1387
1388         return rc;
1389 }
1390
1391 static struct nrs_tbf_client *
1392 nrs_tbf_cli_hash_lookup(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1393                         const char *key)
1394 {
1395         struct hlist_node *hnode;
1396         struct nrs_tbf_client *cli;
1397
1398         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)key);
1399         if (hnode == NULL)
1400                 return NULL;
1401
1402         cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode);
1403         if (!list_empty(&cli->tc_lru))
1404                 list_del_init(&cli->tc_lru);
1405         return cli;
1406 }
1407
1408 /**
1409  * ONLY opcode presented in this function will be checked in
1410  * nrs_tbf_id_cli_set(). That means, we can add or remove an
1411  * opcode to enable or disable requests handled in nrs_tbf
1412  */
1413 static struct req_format *req_fmt(__u32 opcode)
1414 {
1415         switch (opcode) {
1416         case OST_GETATTR:
1417                 return &RQF_OST_GETATTR;
1418         case OST_SETATTR:
1419                 return &RQF_OST_SETATTR;
1420         case OST_READ:
1421                 return &RQF_OST_BRW_READ;
1422         case OST_WRITE:
1423                 return &RQF_OST_BRW_WRITE;
1424         /* FIXME: OST_CREATE and OST_DESTROY comes from MDS
1425          * in most case. Should they be removed? */
1426         case OST_CREATE:
1427                 return &RQF_OST_CREATE;
1428         case OST_DESTROY:
1429                 return &RQF_OST_DESTROY;
1430         case OST_PUNCH:
1431                 return &RQF_OST_PUNCH;
1432         case OST_SYNC:
1433                 return &RQF_OST_SYNC;
1434         case OST_LADVISE:
1435                 return &RQF_OST_LADVISE;
1436         case MDS_GETATTR:
1437                 return &RQF_MDS_GETATTR;
1438         case MDS_GETATTR_NAME:
1439                 return &RQF_MDS_GETATTR_NAME;
1440         /* close is skipped to avoid LDLM cancel slowness */
1441 #if 0
1442         case MDS_CLOSE:
1443                 return &RQF_MDS_CLOSE;
1444 #endif
1445         case MDS_REINT:
1446                 return &RQF_MDS_REINT;
1447         case MDS_READPAGE:
1448                 return &RQF_MDS_READPAGE;
1449         case MDS_GET_ROOT:
1450                 return &RQF_MDS_GET_ROOT;
1451         case MDS_STATFS:
1452                 return &RQF_MDS_STATFS;
1453         case MDS_SYNC:
1454                 return &RQF_MDS_SYNC;
1455         case MDS_QUOTACTL:
1456                 return &RQF_MDS_QUOTACTL;
1457         case MDS_GETXATTR:
1458                 return &RQF_MDS_GETXATTR;
1459         case MDS_GET_INFO:
1460                 return &RQF_MDS_GET_INFO;
1461         /* HSM op is skipped */
1462 #if 0 
1463         case MDS_HSM_STATE_GET:
1464                 return &RQF_MDS_HSM_STATE_GET;
1465         case MDS_HSM_STATE_SET:
1466                 return &RQF_MDS_HSM_STATE_SET;
1467         case MDS_HSM_ACTION:
1468                 return &RQF_MDS_HSM_ACTION;
1469         case MDS_HSM_CT_REGISTER:
1470                 return &RQF_MDS_HSM_CT_REGISTER;
1471         case MDS_HSM_CT_UNREGISTER:
1472                 return &RQF_MDS_HSM_CT_UNREGISTER;
1473 #endif
1474         case MDS_SWAP_LAYOUTS:
1475                 return &RQF_MDS_SWAP_LAYOUTS;
1476         case LDLM_ENQUEUE:
1477                 return &RQF_LDLM_ENQUEUE;
1478         default:
1479                 return NULL;
1480         }
1481 }
1482
1483 static struct req_format *intent_req_fmt(__u32 it_opc)
1484 {
1485         if (it_opc & (IT_OPEN | IT_CREAT))
1486                 return &RQF_LDLM_INTENT_OPEN;
1487         else if (it_opc & (IT_GETATTR | IT_LOOKUP))
1488                 return &RQF_LDLM_INTENT_GETATTR;
1489         else if (it_opc & IT_UNLINK)
1490                 return &RQF_LDLM_INTENT_UNLINK;
1491         else if (it_opc & IT_GETXATTR)
1492                 return &RQF_LDLM_INTENT_GETXATTR;
1493         else if (it_opc & (IT_GLIMPSE | IT_BRW))
1494                 return &RQF_LDLM_INTENT;
1495         else
1496                 return NULL;
1497 }
1498
1499 static int ost_tbf_id_cli_set(struct ptlrpc_request *req,
1500                               struct tbf_id *id)
1501 {
1502         struct ost_body *body;
1503
1504         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1505         if (body != NULL) {
1506                 id->ti_uid = body->oa.o_uid;
1507                 id->ti_gid = body->oa.o_gid;
1508                 return 0;
1509         }
1510
1511         return -EINVAL;
1512 }
1513
1514 static void unpack_ugid_from_mdt_body(struct ptlrpc_request *req,
1515                                       struct tbf_id *id)
1516 {
1517         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
1518                                                     &RMF_MDT_BODY);
1519         LASSERT(b != NULL);
1520
1521         /* TODO: nodemaping feature converts {ug}id from individual
1522          * clients to the actual ones of the file system. Some work
1523          * may be needed to fix this. */
1524         id->ti_uid = b->mbo_uid;
1525         id->ti_gid = b->mbo_gid;
1526 }
1527
1528 static void unpack_ugid_from_mdt_rec_reint(struct ptlrpc_request *req,
1529                                            struct tbf_id *id)
1530 {
1531         struct mdt_rec_reint *rec;
1532
1533         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
1534         LASSERT(rec != NULL);
1535
1536         /* use the fs{ug}id as {ug}id of the process */
1537         id->ti_uid = rec->rr_fsuid;
1538         id->ti_gid = rec->rr_fsgid;
1539 }
1540
1541 static int mdt_tbf_id_cli_set(struct ptlrpc_request *req,
1542                               struct tbf_id *id)
1543 {
1544         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1545         int rc = 0;
1546
1547         switch (opc) {
1548         case MDS_GETATTR:
1549         case MDS_GETATTR_NAME:
1550         case MDS_GET_ROOT:
1551         case MDS_READPAGE:
1552         case MDS_SYNC:
1553         case MDS_GETXATTR:
1554         case MDS_HSM_STATE_GET ... MDS_SWAP_LAYOUTS:
1555                 unpack_ugid_from_mdt_body(req, id);
1556                 break;
1557         case MDS_CLOSE:
1558         case MDS_REINT:
1559                 unpack_ugid_from_mdt_rec_reint(req, id);
1560                 break;
1561         default:
1562                 rc = -EINVAL;
1563                 break;
1564         }
1565         return rc;
1566 }
1567
1568 static int ldlm_tbf_id_cli_set(struct ptlrpc_request *req,
1569                               struct tbf_id *id)
1570 {
1571         struct ldlm_intent *lit;
1572         struct req_format *fmt;
1573
1574         if (req->rq_reqmsg->lm_bufcount <= DLM_INTENT_IT_OFF)
1575                 return -EINVAL;
1576
1577         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_BASIC);
1578         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
1579         if (lit == NULL)
1580                 return -EINVAL;
1581
1582         fmt = intent_req_fmt(lit->opc);
1583         if (fmt == NULL)
1584                 return -EINVAL;
1585
1586         req_capsule_extend(&req->rq_pill, fmt);
1587
1588         if (lit->opc & (IT_GETXATTR | IT_GETATTR | IT_LOOKUP))
1589                 unpack_ugid_from_mdt_body(req, id);
1590         else if (lit->opc & (IT_OPEN | IT_OPEN | IT_GLIMPSE | IT_BRW))
1591                 unpack_ugid_from_mdt_rec_reint(req, id);
1592         else
1593                 return -EINVAL;
1594         return 0;
1595 }
1596
1597 static int nrs_tbf_id_cli_set(struct ptlrpc_request *req, struct tbf_id *id,
1598                               enum nrs_tbf_flag ti_type)
1599 {
1600         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1601         struct req_format *fmt = req_fmt(opc);
1602         bool fmt_unset = false;
1603         int rc;
1604
1605         memset(id, 0, sizeof(struct tbf_id));
1606         id->ti_type = ti_type;
1607
1608         if (fmt == NULL)
1609                 return -EINVAL;
1610         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1611         if (req->rq_pill.rc_fmt == NULL) {
1612                 req_capsule_set(&req->rq_pill, fmt);
1613                 fmt_unset = true;
1614         }
1615
1616         if (opc < OST_LAST_OPC)
1617                 rc = ost_tbf_id_cli_set(req, id);
1618         else if (opc >= MDS_FIRST_OPC && opc < MDS_LAST_OPC)
1619                 rc = mdt_tbf_id_cli_set(req, id);
1620         else if (opc == LDLM_ENQUEUE)
1621                 rc = ldlm_tbf_id_cli_set(req, id);
1622         else
1623                 rc = -EINVAL;
1624
1625         /* restore it to the initialized state */
1626         if (fmt_unset)
1627                 req->rq_pill.rc_fmt = NULL;
1628         return rc;
1629 }
1630
1631 static inline void nrs_tbf_cli_gen_key(struct nrs_tbf_client *cli,
1632                                        struct ptlrpc_request *req,
1633                                        char *keystr, size_t keystr_sz)
1634 {
1635         const char *jobid;
1636         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1637         struct tbf_id id;
1638
1639         nrs_tbf_id_cli_set(req, &id, NRS_TBF_FLAG_UID | NRS_TBF_FLAG_GID);
1640         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1641         if (jobid == NULL)
1642                 jobid = NRS_TBF_JOBID_NULL;
1643
1644         snprintf(keystr, keystr_sz, "%s_%s_%d_%u_%u", jobid,
1645                  libcfs_nid2str(req->rq_peer.nid), opc, id.ti_uid,
1646                  id.ti_gid);
1647
1648         if (cli) {
1649                 INIT_LIST_HEAD(&cli->tc_lru);
1650                 strlcpy(cli->tc_key, keystr, sizeof(cli->tc_key));
1651                 strlcpy(cli->tc_jobid, jobid, sizeof(cli->tc_jobid));
1652                 cli->tc_nid = req->rq_peer.nid;
1653                 cli->tc_opcode = opc;
1654                 cli->tc_id = id;
1655         }
1656 }
1657
1658 static struct nrs_tbf_client *
1659 nrs_tbf_cli_find(struct nrs_tbf_head *head, struct ptlrpc_request *req)
1660 {
1661         struct nrs_tbf_client *cli;
1662         struct cfs_hash *hs = head->th_cli_hash;
1663         struct cfs_hash_bd bd;
1664         char keystr[NRS_TBF_KEY_LEN];
1665
1666         nrs_tbf_cli_gen_key(NULL, req, keystr, sizeof(keystr));
1667         cfs_hash_bd_get_and_lock(hs, (void *)keystr, &bd, 1);
1668         cli = nrs_tbf_cli_hash_lookup(hs, &bd, keystr);
1669         cfs_hash_bd_unlock(hs, &bd, 1);
1670
1671         return cli;
1672 }
1673
1674 static struct nrs_tbf_client *
1675 nrs_tbf_cli_findadd(struct nrs_tbf_head *head,
1676                     struct nrs_tbf_client *cli)
1677 {
1678         const char              *key;
1679         struct nrs_tbf_client   *ret;
1680         struct cfs_hash         *hs = head->th_cli_hash;
1681         struct cfs_hash_bd       bd;
1682
1683         key = cli->tc_key;
1684         cfs_hash_bd_get_and_lock(hs, (void *)key, &bd, 1);
1685         ret = nrs_tbf_cli_hash_lookup(hs, &bd, key);
1686         if (ret == NULL) {
1687                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
1688                 ret = cli;
1689         }
1690         cfs_hash_bd_unlock(hs, &bd, 1);
1691
1692         return ret;
1693 }
1694
1695 static void
1696 nrs_tbf_cli_put(struct nrs_tbf_head *head, struct nrs_tbf_client *cli)
1697 {
1698         struct cfs_hash_bd       bd;
1699         struct cfs_hash         *hs = head->th_cli_hash;
1700         struct nrs_tbf_bucket   *bkt;
1701         int                      hw;
1702         struct list_head         zombies;
1703
1704         INIT_LIST_HEAD(&zombies);
1705         cfs_hash_bd_get(hs, &cli->tc_key, &bd);
1706         bkt = cfs_hash_bd_extra_get(hs, &bd);
1707         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
1708                 return;
1709         LASSERT(list_empty(&cli->tc_lru));
1710         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
1711
1712         /**
1713          * Check and purge the LRU, there is at least one client in the LRU.
1714          */
1715         hw = tbf_jobid_cache_size >> (hs->hs_cur_bits - hs->hs_bkt_bits);
1716         while (cfs_hash_bd_count_get(&bd) > hw) {
1717                 if (unlikely(list_empty(&bkt->ntb_lru)))
1718                         break;
1719                 cli = list_entry(bkt->ntb_lru.next,
1720                                  struct nrs_tbf_client,
1721                                  tc_lru);
1722                 LASSERT(atomic_read(&cli->tc_ref) == 0);
1723                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
1724                 list_move(&cli->tc_lru, &zombies);
1725         }
1726         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
1727
1728         while (!list_empty(&zombies)) {
1729                 cli = container_of0(zombies.next,
1730                                     struct nrs_tbf_client, tc_lru);
1731                 list_del_init(&cli->tc_lru);
1732                 nrs_tbf_cli_fini(cli);
1733         }
1734 }
1735
1736 static void
1737 nrs_tbf_generic_cli_init(struct nrs_tbf_client *cli,
1738                          struct ptlrpc_request *req)
1739 {
1740         char keystr[NRS_TBF_KEY_LEN];
1741
1742         nrs_tbf_cli_gen_key(cli, req, keystr, sizeof(keystr));
1743 }
1744
1745 static void
1746 nrs_tbf_id_list_free(struct list_head *uid_list)
1747 {
1748         struct nrs_tbf_id *nti_id, *n;
1749
1750         list_for_each_entry_safe(nti_id, n, uid_list, nti_linkage) {
1751                 list_del_init(&nti_id->nti_linkage);
1752                 OBD_FREE_PTR(nti_id);
1753         }
1754 }
1755
1756 static void
1757 nrs_tbf_expression_free(struct nrs_tbf_expression *expr)
1758 {
1759         LASSERT(expr->te_field >= NRS_TBF_FIELD_NID &&
1760                 expr->te_field < NRS_TBF_FIELD_MAX);
1761         switch (expr->te_field) {
1762         case NRS_TBF_FIELD_NID:
1763                 cfs_free_nidlist(&expr->te_cond);
1764                 break;
1765         case NRS_TBF_FIELD_JOBID:
1766                 nrs_tbf_jobid_list_free(&expr->te_cond);
1767                 break;
1768         case NRS_TBF_FIELD_OPCODE:
1769                 CFS_FREE_BITMAP(expr->te_opcodes);
1770                 break;
1771         case NRS_TBF_FIELD_UID:
1772         case NRS_TBF_FIELD_GID:
1773                 nrs_tbf_id_list_free(&expr->te_cond);
1774                 break;
1775         default:
1776                 LBUG();
1777         }
1778         OBD_FREE_PTR(expr);
1779 }
1780
1781 static void
1782 nrs_tbf_conjunction_free(struct nrs_tbf_conjunction *conjunction)
1783 {
1784         struct nrs_tbf_expression *expression;
1785         struct nrs_tbf_expression *n;
1786
1787         LASSERT(list_empty(&conjunction->tc_linkage));
1788         list_for_each_entry_safe(expression, n,
1789                                  &conjunction->tc_expressions,
1790                                  te_linkage) {
1791                 list_del_init(&expression->te_linkage);
1792                 nrs_tbf_expression_free(expression);
1793         }
1794         OBD_FREE_PTR(conjunction);
1795 }
1796
1797 static void
1798 nrs_tbf_conds_free(struct list_head *cond_list)
1799 {
1800         struct nrs_tbf_conjunction *conjunction;
1801         struct nrs_tbf_conjunction *n;
1802
1803         list_for_each_entry_safe(conjunction, n, cond_list, tc_linkage) {
1804                 list_del_init(&conjunction->tc_linkage);
1805                 nrs_tbf_conjunction_free(conjunction);
1806         }
1807 }
1808
1809 static void
1810 nrs_tbf_generic_cmd_fini(struct nrs_tbf_cmd *cmd)
1811 {
1812         if (!list_empty(&cmd->u.tc_start.ts_conds))
1813                 nrs_tbf_conds_free(&cmd->u.tc_start.ts_conds);
1814         if (cmd->u.tc_start.ts_conds_str)
1815                 OBD_FREE(cmd->u.tc_start.ts_conds_str,
1816                          strlen(cmd->u.tc_start.ts_conds_str) + 1);
1817 }
1818
/* Separators of the generic rule syntax: "a&b,c" means (a AND b) OR c. */
#define NRS_TBF_DISJUNCTION_DELIM	','	/* between conjunctions */
#define NRS_TBF_CONJUNCTION_DELIM	'&'	/* between expressions */
#define NRS_TBF_EXPRESSION_DELIM	'='	/* between field and value */
1822
1823 static inline bool
1824 nrs_tbf_check_field(struct cfs_lstr *field, char *str)
1825 {
1826         int len = strlen(str);
1827
1828         return (field->ls_len == len &&
1829                 strncmp(field->ls_str, str, len) == 0);
1830 }
1831
1832 static int
1833 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr);
1834 static int
1835 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
1836                       enum nrs_tbf_flag tif);
1837
1838 static int
1839 nrs_tbf_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
1840 {
1841         struct nrs_tbf_expression *expr;
1842         struct cfs_lstr field;
1843         int rc = 0;
1844
1845         OBD_ALLOC(expr, sizeof(struct nrs_tbf_expression));
1846         if (expr == NULL)
1847                 return -ENOMEM;
1848
1849         rc = cfs_gettok(src, NRS_TBF_EXPRESSION_DELIM, &field);
1850         if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
1851             src->ls_str[src->ls_len - 1] != '}')
1852                 GOTO(out, rc = -EINVAL);
1853
1854         /* Skip '{' and '}' */
1855         src->ls_str++;
1856         src->ls_len -= 2;
1857
1858         if (nrs_tbf_check_field(&field, "nid")) {
1859                 if (cfs_parse_nidlist(src->ls_str,
1860                                       src->ls_len,
1861                                       &expr->te_cond) <= 0)
1862                         GOTO(out, rc = -EINVAL);
1863                 expr->te_field = NRS_TBF_FIELD_NID;
1864         } else if (nrs_tbf_check_field(&field, "jobid")) {
1865                 if (nrs_tbf_jobid_list_parse(src->ls_str,
1866                                              src->ls_len,
1867                                              &expr->te_cond) < 0)
1868                         GOTO(out, rc = -EINVAL);
1869                 expr->te_field = NRS_TBF_FIELD_JOBID;
1870         } else if (nrs_tbf_check_field(&field, "opcode")) {
1871                 if (nrs_tbf_opcode_list_parse(src->ls_str,
1872                                               src->ls_len,
1873                                               &expr->te_opcodes) < 0)
1874                         GOTO(out, rc = -EINVAL);
1875                 expr->te_field = NRS_TBF_FIELD_OPCODE;
1876         } else if (nrs_tbf_check_field(&field, "uid")) {
1877                 if (nrs_tbf_id_list_parse(src->ls_str,
1878                                           src->ls_len,
1879                                           &expr->te_cond,
1880                                           NRS_TBF_FLAG_UID) < 0)
1881                         GOTO(out, rc = -EINVAL);
1882                 expr->te_field = NRS_TBF_FIELD_UID;
1883         } else if (nrs_tbf_check_field(&field, "gid")) {
1884                 if (nrs_tbf_id_list_parse(src->ls_str,
1885                                           src->ls_len,
1886                                           &expr->te_cond,
1887                                           NRS_TBF_FLAG_GID) < 0)
1888                         GOTO(out, rc = -EINVAL);
1889                 expr->te_field = NRS_TBF_FIELD_GID;
1890         } else {
1891                 GOTO(out, rc = -EINVAL);
1892         }
1893
1894         list_add_tail(&expr->te_linkage, cond_list);
1895         return 0;
1896 out:
1897         OBD_FREE_PTR(expr);
1898         return rc;
1899 }
1900
1901 static int
1902 nrs_tbf_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
1903 {
1904         struct nrs_tbf_conjunction *conjunction;
1905         struct cfs_lstr expr;
1906         int rc = 0;
1907
1908         OBD_ALLOC(conjunction, sizeof(struct nrs_tbf_conjunction));
1909         if (conjunction == NULL)
1910                 return -ENOMEM;
1911
1912         INIT_LIST_HEAD(&conjunction->tc_expressions);
1913         list_add_tail(&conjunction->tc_linkage, cond_list);
1914
1915         while (src->ls_str) {
1916                 rc = cfs_gettok(src, NRS_TBF_CONJUNCTION_DELIM, &expr);
1917                 if (rc == 0) {
1918                         rc = -EINVAL;
1919                         break;
1920                 }
1921                 rc = nrs_tbf_expression_parse(&expr,
1922                                               &conjunction->tc_expressions);
1923                 if (rc)
1924                         break;
1925         }
1926         return rc;
1927 }
1928
1929 static int
1930 nrs_tbf_conds_parse(char *str, int len, struct list_head *cond_list)
1931 {
1932         struct cfs_lstr src;
1933         struct cfs_lstr res;
1934         int rc = 0;
1935
1936         src.ls_str = str;
1937         src.ls_len = len;
1938         INIT_LIST_HEAD(cond_list);
1939         while (src.ls_str) {
1940                 rc = cfs_gettok(&src, NRS_TBF_DISJUNCTION_DELIM, &res);
1941                 if (rc == 0) {
1942                         rc = -EINVAL;
1943                         break;
1944                 }
1945                 rc = nrs_tbf_conjunction_parse(&res, cond_list);
1946                 if (rc)
1947                         break;
1948         }
1949         return rc;
1950 }
1951
1952 static int
1953 nrs_tbf_generic_parse(struct nrs_tbf_cmd *cmd, const char *id)
1954 {
1955         int rc;
1956
1957         OBD_ALLOC(cmd->u.tc_start.ts_conds_str, strlen(id) + 1);
1958         if (cmd->u.tc_start.ts_conds_str == NULL)
1959                 return -ENOMEM;
1960
1961         memcpy(cmd->u.tc_start.ts_conds_str, id, strlen(id));
1962
1963         /* Parse hybird NID and JOBID conditions */
1964         rc = nrs_tbf_conds_parse(cmd->u.tc_start.ts_conds_str,
1965                                  strlen(cmd->u.tc_start.ts_conds_str),
1966                                  &cmd->u.tc_start.ts_conds);
1967         if (rc)
1968                 nrs_tbf_generic_cmd_fini(cmd);
1969
1970         return rc;
1971 }
1972
1973 static int
1974 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id);
1975
1976 static int
1977 nrs_tbf_expression_match(struct nrs_tbf_expression *expr,
1978                          struct nrs_tbf_rule *rule,
1979                          struct nrs_tbf_client *cli)
1980 {
1981         switch (expr->te_field) {
1982         case NRS_TBF_FIELD_NID:
1983                 return cfs_match_nid(cli->tc_nid, &expr->te_cond);
1984         case NRS_TBF_FIELD_JOBID:
1985                 return nrs_tbf_jobid_list_match(&expr->te_cond, cli->tc_jobid);
1986         case NRS_TBF_FIELD_OPCODE:
1987                 return cfs_bitmap_check(expr->te_opcodes, cli->tc_opcode);
1988         case NRS_TBF_FIELD_UID:
1989         case NRS_TBF_FIELD_GID:
1990                 return nrs_tbf_id_list_match(&expr->te_cond, cli->tc_id);
1991         default:
1992                 return 0;
1993         }
1994 }
1995
1996 static int
1997 nrs_tbf_conjunction_match(struct nrs_tbf_conjunction *conjunction,
1998                           struct nrs_tbf_rule *rule,
1999                           struct nrs_tbf_client *cli)
2000 {
2001         struct nrs_tbf_expression *expr;
2002         int matched;
2003
2004         list_for_each_entry(expr, &conjunction->tc_expressions, te_linkage) {
2005                 matched = nrs_tbf_expression_match(expr, rule, cli);
2006                 if (!matched)
2007                         return 0;
2008         }
2009
2010         return 1;
2011 }
2012
2013 static int
2014 nrs_tbf_cond_match(struct nrs_tbf_rule *rule, struct nrs_tbf_client *cli)
2015 {
2016         struct nrs_tbf_conjunction *conjunction;
2017         int matched;
2018
2019         list_for_each_entry(conjunction, &rule->tr_conds, tc_linkage) {
2020                 matched = nrs_tbf_conjunction_match(conjunction, rule, cli);
2021                 if (matched)
2022                         return 1;
2023         }
2024
2025         return 0;
2026 }
2027
2028 static void
2029 nrs_tbf_generic_rule_fini(struct nrs_tbf_rule *rule)
2030 {
2031         if (!list_empty(&rule->tr_conds))
2032                 nrs_tbf_conds_free(&rule->tr_conds);
2033         LASSERT(rule->tr_conds_str != NULL);
2034         OBD_FREE(rule->tr_conds_str, strlen(rule->tr_conds_str) + 1);
2035 }
2036
2037 static int
2038 nrs_tbf_rule_init(struct ptlrpc_nrs_policy *policy,
2039                   struct nrs_tbf_rule *rule, struct nrs_tbf_cmd *start)
2040 {
2041         int rc = 0;
2042
2043         LASSERT(start->u.tc_start.ts_conds_str);
2044         OBD_ALLOC(rule->tr_conds_str,
2045                   strlen(start->u.tc_start.ts_conds_str) + 1);
2046         if (rule->tr_conds_str == NULL)
2047                 return -ENOMEM;
2048
2049         memcpy(rule->tr_conds_str,
2050                start->u.tc_start.ts_conds_str,
2051                strlen(start->u.tc_start.ts_conds_str));
2052
2053         INIT_LIST_HEAD(&rule->tr_conds);
2054         if (!list_empty(&start->u.tc_start.ts_conds)) {
2055                 rc = nrs_tbf_conds_parse(rule->tr_conds_str,
2056                                          strlen(rule->tr_conds_str),
2057                                          &rule->tr_conds);
2058         }
2059         if (rc)
2060                 nrs_tbf_generic_rule_fini(rule);
2061
2062         return rc;
2063 }
2064
2065 static int
2066 nrs_tbf_generic_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2067 {
2068         seq_printf(m, "%s %s %llu, ref %d\n", rule->tr_name,
2069                    rule->tr_conds_str, rule->tr_rpc_rate,
2070                    atomic_read(&rule->tr_ref) - 1);
2071         return 0;
2072 }
2073
/**
 * Rule-match hook of the generic policy; defers to nrs_tbf_cond_match().
 *
 * \param[in] rule	rule to test
 * \param[in] cli	client to test against
 *
 * \retval 1	the rule's condition matches \a cli
 * \retval 0	it does not
 */
static int nrs_tbf_generic_rule_match(struct nrs_tbf_rule *rule,
				      struct nrs_tbf_client *cli)
{
	int matched = nrs_tbf_cond_match(rule, cli);

	return matched;
}
2080
2081 static struct nrs_tbf_ops nrs_tbf_generic_ops = {
2082         .o_name = NRS_TBF_TYPE_GENERIC,
2083         .o_startup = nrs_tbf_startup,
2084         .o_cli_find = nrs_tbf_cli_find,
2085         .o_cli_findadd = nrs_tbf_cli_findadd,
2086         .o_cli_put = nrs_tbf_cli_put,
2087         .o_cli_init = nrs_tbf_generic_cli_init,
2088         .o_rule_init = nrs_tbf_rule_init,
2089         .o_rule_dump = nrs_tbf_generic_rule_dump,
2090         .o_rule_match = nrs_tbf_generic_rule_match,
2091         .o_rule_fini = nrs_tbf_generic_rule_fini,
2092 };
2093
2094 static void nrs_tbf_opcode_rule_fini(struct nrs_tbf_rule *rule)
2095 {
2096         if (rule->tr_opcodes != NULL)
2097                 CFS_FREE_BITMAP(rule->tr_opcodes);
2098
2099         LASSERT(rule->tr_opcodes_str != NULL);
2100         OBD_FREE(rule->tr_opcodes_str, strlen(rule->tr_opcodes_str) + 1);
2101 }
2102
2103 static unsigned nrs_tbf_opcode_hop_hash(struct cfs_hash *hs, const void *key,
2104                                         unsigned mask)
2105 {
2106         return cfs_hash_djb2_hash(key, sizeof(__u32), mask);
2107 }
2108
2109 static int nrs_tbf_opcode_hop_keycmp(const void *key, struct hlist_node *hnode)
2110 {
2111         const __u32     *opc = key;
2112         struct nrs_tbf_client *cli = hlist_entry(hnode,
2113                                                  struct nrs_tbf_client,
2114                                                  tc_hnode);
2115
2116         return *opc == cli->tc_opcode;
2117 }
2118
2119 static void *nrs_tbf_opcode_hop_key(struct hlist_node *hnode)
2120 {
2121         struct nrs_tbf_client *cli = hlist_entry(hnode,
2122                                                  struct nrs_tbf_client,
2123                                                  tc_hnode);
2124
2125         return &cli->tc_opcode;
2126 }
2127
2128 static void nrs_tbf_opcode_hop_get(struct cfs_hash *hs,
2129                                    struct hlist_node *hnode)
2130 {
2131         struct nrs_tbf_client *cli = hlist_entry(hnode,
2132                                                  struct nrs_tbf_client,
2133                                                  tc_hnode);
2134
2135         atomic_inc(&cli->tc_ref);
2136 }
2137
2138 static void nrs_tbf_opcode_hop_put(struct cfs_hash *hs,
2139                                    struct hlist_node *hnode)
2140 {
2141         struct nrs_tbf_client *cli = hlist_entry(hnode,
2142                                                  struct nrs_tbf_client,
2143                                                  tc_hnode);
2144
2145         atomic_dec(&cli->tc_ref);
2146 }
2147
2148 static void nrs_tbf_opcode_hop_exit(struct cfs_hash *hs,
2149                                     struct hlist_node *hnode)
2150 {
2151         struct nrs_tbf_client *cli = hlist_entry(hnode,
2152                                                  struct nrs_tbf_client,
2153                                                  tc_hnode);
2154
2155         LASSERTF(atomic_read(&cli->tc_ref) == 0,
2156                  "Busy TBF object from client with opcode %s, with %d refs\n",
2157                  ll_opcode2str(cli->tc_opcode),
2158                  atomic_read(&cli->tc_ref));
2159
2160         nrs_tbf_cli_fini(cli);
2161 }
2162 static struct cfs_hash_ops nrs_tbf_opcode_hash_ops = {
2163         .hs_hash        = nrs_tbf_opcode_hop_hash,
2164         .hs_keycmp      = nrs_tbf_opcode_hop_keycmp,
2165         .hs_key         = nrs_tbf_opcode_hop_key,
2166         .hs_object      = nrs_tbf_hop_object,
2167         .hs_get         = nrs_tbf_opcode_hop_get,
2168         .hs_put         = nrs_tbf_opcode_hop_put,
2169         .hs_put_locked  = nrs_tbf_opcode_hop_put,
2170         .hs_exit        = nrs_tbf_opcode_hop_exit,
2171 };
2172
2173 static int
2174 nrs_tbf_opcode_startup(struct ptlrpc_nrs_policy *policy,
2175                     struct nrs_tbf_head *head)
2176 {
2177         struct nrs_tbf_cmd      start = { 0 };
2178         int rc;
2179
2180         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
2181                                             NRS_TBF_NID_BITS,
2182                                             NRS_TBF_NID_BITS,
2183                                             NRS_TBF_NID_BKT_BITS, 0,
2184                                             CFS_HASH_MIN_THETA,
2185                                             CFS_HASH_MAX_THETA,
2186                                             &nrs_tbf_opcode_hash_ops,
2187                                             CFS_HASH_RW_BKTLOCK);
2188         if (head->th_cli_hash == NULL)
2189                 return -ENOMEM;
2190
2191         start.u.tc_start.ts_opcodes = NULL;
2192         start.u.tc_start.ts_opcodes_str = "*";
2193
2194         start.u.tc_start.ts_rpc_rate = tbf_rate;
2195         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2196         start.tc_name = NRS_TBF_DEFAULT_RULE;
2197         rc = nrs_tbf_rule_start(policy, head, &start);
2198
2199         return rc;
2200 }
2201
2202 static struct nrs_tbf_client *
2203 nrs_tbf_opcode_cli_find(struct nrs_tbf_head *head,
2204                         struct ptlrpc_request *req)
2205 {
2206         __u32 opc;
2207
2208         opc = lustre_msg_get_opc(req->rq_reqmsg);
2209         return cfs_hash_lookup(head->th_cli_hash, &opc);
2210 }
2211
2212 static struct nrs_tbf_client *
2213 nrs_tbf_opcode_cli_findadd(struct nrs_tbf_head *head,
2214                            struct nrs_tbf_client *cli)
2215 {
2216         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_opcode,
2217                                        &cli->tc_hnode);
2218 }
2219
2220 static void
2221 nrs_tbf_opcode_cli_init(struct nrs_tbf_client *cli,
2222                         struct ptlrpc_request *req)
2223 {
2224         cli->tc_opcode = lustre_msg_get_opc(req->rq_reqmsg);
2225 }
2226
2227 #define MAX_OPCODE_LEN  32
2228 static int
2229 nrs_tbf_opcode_set_bit(const struct cfs_lstr *id, struct cfs_bitmap *opcodes)
2230 {
2231         int     op = 0;
2232         char    opcode_str[MAX_OPCODE_LEN];
2233
2234         if (id->ls_len + 1 > MAX_OPCODE_LEN)
2235                 return -EINVAL;
2236
2237         memcpy(opcode_str, id->ls_str, id->ls_len);
2238         opcode_str[id->ls_len] = '\0';
2239
2240         op = ll_str2opcode(opcode_str);
2241         if (op < 0)
2242                 return -EINVAL;
2243
2244         cfs_bitmap_set(opcodes, op);
2245         return 0;
2246 }
2247
2248 static int
2249 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr)
2250 {
2251         struct cfs_bitmap *opcodes;
2252         struct cfs_lstr src;
2253         struct cfs_lstr res;
2254         int rc = 0;
2255         ENTRY;
2256
2257         opcodes = CFS_ALLOCATE_BITMAP(LUSTRE_MAX_OPCODES);
2258         if (opcodes == NULL)
2259                 return -ENOMEM;
2260
2261         src.ls_str = str;
2262         src.ls_len = len;
2263         while (src.ls_str) {
2264                 rc = cfs_gettok(&src, ' ', &res);
2265                 if (rc == 0) {
2266                         rc = -EINVAL;
2267                         break;
2268                 }
2269                 rc = nrs_tbf_opcode_set_bit(&res, opcodes);
2270                 if (rc)
2271                         break;
2272         }
2273
2274         if (rc == 0)
2275                 *bitmaptr = opcodes;
2276         else
2277                 CFS_FREE_BITMAP(opcodes);
2278
2279         RETURN(rc);
2280 }
2281
2282 static void nrs_tbf_opcode_cmd_fini(struct nrs_tbf_cmd *cmd)
2283 {
2284         if (cmd->u.tc_start.ts_opcodes)
2285                 CFS_FREE_BITMAP(cmd->u.tc_start.ts_opcodes);
2286
2287         if (cmd->u.tc_start.ts_opcodes_str)
2288                 OBD_FREE(cmd->u.tc_start.ts_opcodes_str,
2289                          strlen(cmd->u.tc_start.ts_opcodes_str) + 1);
2290
2291 }
2292
2293 static int nrs_tbf_opcode_parse(struct nrs_tbf_cmd *cmd, char *id)
2294 {
2295         struct cfs_lstr src;
2296         int rc;
2297
2298         src.ls_str = id;
2299         src.ls_len = strlen(id);
2300         rc = nrs_tbf_check_id_value(&src, "opcode");
2301         if (rc)
2302                 return rc;
2303
2304         OBD_ALLOC(cmd->u.tc_start.ts_opcodes_str, src.ls_len + 1);
2305         if (cmd->u.tc_start.ts_opcodes_str == NULL)
2306                 return -ENOMEM;
2307
2308         memcpy(cmd->u.tc_start.ts_opcodes_str, src.ls_str, src.ls_len);
2309
2310         /* parse opcode list */
2311         rc = nrs_tbf_opcode_list_parse(cmd->u.tc_start.ts_opcodes_str,
2312                                        strlen(cmd->u.tc_start.ts_opcodes_str),
2313                                        &cmd->u.tc_start.ts_opcodes);
2314         if (rc)
2315                 nrs_tbf_opcode_cmd_fini(cmd);
2316
2317         return rc;
2318 }
2319
2320 static int
2321 nrs_tbf_opcode_rule_match(struct nrs_tbf_rule *rule,
2322                           struct nrs_tbf_client *cli)
2323 {
2324         if (rule->tr_opcodes == NULL)
2325                 return 0;
2326
2327         return cfs_bitmap_check(rule->tr_opcodes, cli->tc_opcode);
2328 }
2329
2330 static int nrs_tbf_opcode_rule_init(struct ptlrpc_nrs_policy *policy,
2331                                     struct nrs_tbf_rule *rule,
2332                                     struct nrs_tbf_cmd *start)
2333 {
2334         int rc = 0;
2335
2336         LASSERT(start->u.tc_start.ts_opcodes_str != NULL);
2337         OBD_ALLOC(rule->tr_opcodes_str,
2338                   strlen(start->u.tc_start.ts_opcodes_str) + 1);
2339         if (rule->tr_opcodes_str == NULL)
2340                 return -ENOMEM;
2341
2342         strncpy(rule->tr_opcodes_str, start->u.tc_start.ts_opcodes_str,
2343                 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2344
2345         /* Default rule '*' */
2346         if (start->u.tc_start.ts_opcodes == NULL)
2347                 return 0;
2348
2349         rc = nrs_tbf_opcode_list_parse(rule->tr_opcodes_str,
2350                                        strlen(rule->tr_opcodes_str),
2351                                        &rule->tr_opcodes);
2352         if (rc)
2353                 OBD_FREE(rule->tr_opcodes_str,
2354                          strlen(start->u.tc_start.ts_opcodes_str) + 1);
2355
2356         return rc;
2357 }
2358
2359 static int
2360 nrs_tbf_opcode_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2361 {
2362         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2363                    rule->tr_opcodes_str, rule->tr_rpc_rate,
2364                    atomic_read(&rule->tr_ref) - 1);
2365         return 0;
2366 }
2367
2368
2369 struct nrs_tbf_ops nrs_tbf_opcode_ops = {
2370         .o_name = NRS_TBF_TYPE_OPCODE,
2371         .o_startup = nrs_tbf_opcode_startup,
2372         .o_cli_find = nrs_tbf_opcode_cli_find,
2373         .o_cli_findadd = nrs_tbf_opcode_cli_findadd,
2374         .o_cli_put = nrs_tbf_nid_cli_put,
2375         .o_cli_init = nrs_tbf_opcode_cli_init,
2376         .o_rule_init = nrs_tbf_opcode_rule_init,
2377         .o_rule_dump = nrs_tbf_opcode_rule_dump,
2378         .o_rule_match = nrs_tbf_opcode_rule_match,
2379         .o_rule_fini = nrs_tbf_opcode_rule_fini,
2380 };
2381
2382 static unsigned nrs_tbf_id_hop_hash(struct cfs_hash *hs, const void *key,
2383                                     unsigned mask)
2384 {
2385         return cfs_hash_djb2_hash(key, sizeof(struct tbf_id), mask);
2386 }
2387
2388 static int nrs_tbf_id_hop_keycmp(const void *key, struct hlist_node *hnode)
2389 {
2390         const struct tbf_id *opc = key;
2391         enum nrs_tbf_flag ntf;
2392         struct nrs_tbf_client *cli = hlist_entry(hnode, struct nrs_tbf_client,
2393                                                  tc_hnode);
2394         ntf = opc->ti_type & cli->tc_id.ti_type;
2395         if ((ntf & NRS_TBF_FLAG_UID) && opc->ti_uid != cli->tc_id.ti_uid)
2396                 return 0;
2397
2398         if ((ntf & NRS_TBF_FLAG_GID) && opc->ti_gid != cli->tc_id.ti_gid)
2399                 return 0;
2400
2401         return 1;
2402 }
2403
2404 static void *nrs_tbf_id_hop_key(struct hlist_node *hnode)
2405 {
2406         struct nrs_tbf_client *cli = hlist_entry(hnode,
2407                                                  struct nrs_tbf_client,
2408                                                  tc_hnode);
2409         return &cli->tc_id;
2410 }
2411
2412 static void nrs_tbf_id_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
2413 {
2414         struct nrs_tbf_client *cli = hlist_entry(hnode,
2415                                                  struct nrs_tbf_client,
2416                                                  tc_hnode);
2417
2418         atomic_inc(&cli->tc_ref);
2419 }
2420
2421 static void nrs_tbf_id_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
2422 {
2423         struct nrs_tbf_client *cli = hlist_entry(hnode,
2424                                                  struct nrs_tbf_client,
2425                                                  tc_hnode);
2426
2427         atomic_dec(&cli->tc_ref);
2428 }
2429
2430 static void
2431 nrs_tbf_id_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
2432
2433 {
2434         struct nrs_tbf_client *cli = hlist_entry(hnode,
2435                                                  struct nrs_tbf_client,
2436                                                  tc_hnode);
2437
2438         LASSERT(atomic_read(&cli->tc_ref) == 0);
2439         nrs_tbf_cli_fini(cli);
2440 }
2441
2442 static struct cfs_hash_ops nrs_tbf_id_hash_ops = {
2443         .hs_hash        = nrs_tbf_id_hop_hash,
2444         .hs_keycmp      = nrs_tbf_id_hop_keycmp,
2445         .hs_key         = nrs_tbf_id_hop_key,
2446         .hs_object      = nrs_tbf_hop_object,
2447         .hs_get         = nrs_tbf_id_hop_get,
2448         .hs_put         = nrs_tbf_id_hop_put,
2449         .hs_put_locked  = nrs_tbf_id_hop_put,
2450         .hs_exit        = nrs_tbf_id_hop_exit,
2451 };
2452
2453 static int
2454 nrs_tbf_id_startup(struct ptlrpc_nrs_policy *policy,
2455                    struct nrs_tbf_head *head)
2456 {
2457         struct nrs_tbf_cmd start;
2458         int rc;
2459
2460         head->th_cli_hash = cfs_hash_create("nrs_tbf_id_hash",
2461                                             NRS_TBF_NID_BITS,
2462                                             NRS_TBF_NID_BITS,
2463                                             NRS_TBF_NID_BKT_BITS, 0,
2464                                             CFS_HASH_MIN_THETA,
2465                                             CFS_HASH_MAX_THETA,
2466                                             &nrs_tbf_id_hash_ops,
2467                                             CFS_HASH_RW_BKTLOCK);
2468         if (head->th_cli_hash == NULL)
2469                 return -ENOMEM;
2470
2471         memset(&start, 0, sizeof(start));
2472         start.u.tc_start.ts_ids_str = "*";
2473         start.u.tc_start.ts_rpc_rate = tbf_rate;
2474         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2475         start.tc_name = NRS_TBF_DEFAULT_RULE;
2476         INIT_LIST_HEAD(&start.u.tc_start.ts_ids);
2477         rc = nrs_tbf_rule_start(policy, head, &start);
2478         if (rc) {
2479                 cfs_hash_putref(head->th_cli_hash);
2480                 head->th_cli_hash = NULL;
2481         }
2482
2483         return rc;
2484 }
2485
2486 static struct nrs_tbf_client *
2487 nrs_tbf_id_cli_find(struct nrs_tbf_head *head,
2488                     struct ptlrpc_request *req)
2489 {
2490         struct tbf_id id;
2491
2492         LASSERT(head->th_type_flag == NRS_TBF_FLAG_UID ||
2493                 head->th_type_flag == NRS_TBF_FLAG_GID);
2494
2495         nrs_tbf_id_cli_set(req, &id, head->th_type_flag);
2496         return cfs_hash_lookup(head->th_cli_hash, &id);
2497 }
2498
2499 static struct nrs_tbf_client *
2500 nrs_tbf_id_cli_findadd(struct nrs_tbf_head *head,
2501                        struct nrs_tbf_client *cli)
2502 {
2503         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_id,
2504                                        &cli->tc_hnode);
2505 }
2506
2507 static void
2508 nrs_tbf_uid_cli_init(struct nrs_tbf_client *cli,
2509                      struct ptlrpc_request *req)
2510 {
2511         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_UID);
2512 }
2513
2514 static void
2515 nrs_tbf_gid_cli_init(struct nrs_tbf_client *cli,
2516                      struct ptlrpc_request *req)
2517 {
2518         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_GID);
2519 }
2520
2521 static int
2522 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id)
2523 {
2524         struct nrs_tbf_id *nti_id;
2525         enum nrs_tbf_flag flag;
2526
2527         list_for_each_entry(nti_id, id_list, nti_linkage) {
2528                 flag = id.ti_type & nti_id->nti_id.ti_type;
2529                 if (!flag)
2530                         continue;
2531
2532                 if ((flag & NRS_TBF_FLAG_UID) &&
2533                     (id.ti_uid != nti_id->nti_id.ti_uid))
2534                         continue;
2535
2536                 if ((flag & NRS_TBF_FLAG_GID) &&
2537                     (id.ti_gid != nti_id->nti_id.ti_gid))
2538                         continue;
2539
2540                 return 1;
2541         }
2542         return 0;
2543 }
2544
2545 static int
2546 nrs_tbf_id_rule_match(struct nrs_tbf_rule *rule,
2547                       struct nrs_tbf_client *cli)
2548 {
2549         return nrs_tbf_id_list_match(&rule->tr_ids, cli->tc_id);
2550 }
2551
2552 static void nrs_tbf_id_cmd_fini(struct nrs_tbf_cmd *cmd)
2553 {
2554         nrs_tbf_id_list_free(&cmd->u.tc_start.ts_ids);
2555
2556         if (cmd->u.tc_start.ts_ids_str)
2557                 OBD_FREE(cmd->u.tc_start.ts_ids_str,
2558                          strlen(cmd->u.tc_start.ts_ids_str) + 1);
2559 }
2560
2561 static int
2562 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
2563                       enum nrs_tbf_flag tif)
2564 {
2565         struct cfs_lstr src;
2566         struct cfs_lstr res;
2567         int rc = 0;
2568         struct tbf_id id = { 0 };
2569         ENTRY;
2570
2571         if (tif != NRS_TBF_FLAG_UID && tif != NRS_TBF_FLAG_GID)
2572                 RETURN(-EINVAL);
2573
2574         src.ls_str = str;
2575         src.ls_len = len;
2576         INIT_LIST_HEAD(id_list);
2577         while (src.ls_str) {
2578                 struct nrs_tbf_id *nti_id;
2579
2580                 if (cfs_gettok(&src, ' ', &res) == 0)
2581                         GOTO(out, rc = -EINVAL);
2582
2583                 id.ti_type = tif;
2584                 if (tif == NRS_TBF_FLAG_UID) {
2585                         if (!cfs_str2num_check(res.ls_str, res.ls_len,
2586                                                &id.ti_uid, 0, (u32)~0U))
2587                                 GOTO(out, rc = -EINVAL);
2588                 } else {
2589                         if (!cfs_str2num_check(res.ls_str, res.ls_len,
2590                                                &id.ti_gid, 0, (u32)~0U))
2591                                 GOTO(out, rc = -EINVAL);
2592                 }
2593
2594                 OBD_ALLOC_PTR(nti_id);
2595                 if (nti_id == NULL)
2596                         GOTO(out, rc = -ENOMEM);
2597
2598                 nti_id->nti_id = id;
2599                 list_add_tail(&nti_id->nti_linkage, id_list);
2600         }
2601 out:
2602         if (rc)
2603                 nrs_tbf_id_list_free(id_list);
2604         RETURN(rc);
2605 }
2606
2607 static int nrs_tbf_ug_id_parse(struct nrs_tbf_cmd *cmd, char *id)
2608 {
2609         struct cfs_lstr src;
2610         int rc;
2611         enum nrs_tbf_flag tif;
2612
2613         tif = cmd->u.tc_start.ts_valid_type;
2614
2615         src.ls_str = id;
2616         src.ls_len = strlen(id);
2617
2618         rc = nrs_tbf_check_id_value(&src,
2619                                     tif == NRS_TBF_FLAG_UID ? "uid" : "gid");
2620         if (rc)
2621                 return rc;
2622
2623         OBD_ALLOC(cmd->u.tc_start.ts_ids_str, src.ls_len + 1);
2624         if (cmd->u.tc_start.ts_ids_str == NULL)
2625                 return -ENOMEM;
2626
2627         strlcpy(cmd->u.tc_start.ts_ids_str, src.ls_str, src.ls_len + 1);
2628
2629         rc = nrs_tbf_id_list_parse(cmd->u.tc_start.ts_ids_str,
2630                                    strlen(cmd->u.tc_start.ts_ids_str),
2631                                    &cmd->u.tc_start.ts_ids, tif);
2632         if (rc)
2633                 nrs_tbf_id_cmd_fini(cmd);
2634
2635         return rc;
2636 }
2637
2638 static int
2639 nrs_tbf_id_rule_init(struct ptlrpc_nrs_policy *policy,
2640                      struct nrs_tbf_rule *rule,
2641                      struct nrs_tbf_cmd *start)
2642 {
2643         struct nrs_tbf_head *head = rule->tr_head;
2644         int rc = 0;
2645         enum nrs_tbf_flag tif = head->th_type_flag;
2646         int ids_len = strlen(start->u.tc_start.ts_ids_str) + 1;
2647
2648         LASSERT(start->u.tc_start.ts_ids_str);
2649         INIT_LIST_HEAD(&rule->tr_ids);
2650
2651         OBD_ALLOC(rule->tr_ids_str, ids_len);
2652         if (rule->tr_ids_str == NULL)
2653                 return -ENOMEM;
2654
2655         strlcpy(rule->tr_ids_str, start->u.tc_start.ts_ids_str,
2656                 ids_len);
2657
2658         if (!list_empty(&start->u.tc_start.ts_ids)) {
2659                 rc = nrs_tbf_id_list_parse(rule->tr_ids_str,
2660                                            strlen(rule->tr_ids_str),
2661                                            &rule->tr_ids, tif);
2662                 if (rc)
2663                         CERROR("%ss {%s} illegal\n",
2664                                tif == NRS_TBF_FLAG_UID ? "uid" : "gid",
2665                                rule->tr_ids_str);
2666         }
2667         if (rc) {
2668                 OBD_FREE(rule->tr_ids_str, ids_len);
2669                 rule->tr_ids_str = NULL;
2670         }
2671         return rc;
2672 }
2673
2674 static int
2675 nrs_tbf_id_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2676 {
2677         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2678                    rule->tr_ids_str, rule->tr_rpc_rate,
2679                    atomic_read(&rule->tr_ref) - 1);
2680         return 0;
2681 }
2682
2683 static void nrs_tbf_id_rule_fini(struct nrs_tbf_rule *rule)
2684 {
2685         nrs_tbf_id_list_free(&rule->tr_ids);
2686         if (rule->tr_ids_str != NULL)
2687                 OBD_FREE(rule->tr_ids_str, strlen(rule->tr_ids_str) + 1);
2688 }
2689
2690 struct nrs_tbf_ops nrs_tbf_uid_ops = {
2691         .o_name = NRS_TBF_TYPE_UID,
2692         .o_startup = nrs_tbf_id_startup,
2693         .o_cli_find = nrs_tbf_id_cli_find,
2694         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2695         .o_cli_put = nrs_tbf_nid_cli_put,
2696         .o_cli_init = nrs_tbf_uid_cli_init,
2697         .o_rule_init = nrs_tbf_id_rule_init,
2698         .o_rule_dump = nrs_tbf_id_rule_dump,
2699         .o_rule_match = nrs_tbf_id_rule_match,
2700         .o_rule_fini = nrs_tbf_id_rule_fini,
2701 };
2702
2703 struct nrs_tbf_ops nrs_tbf_gid_ops = {
2704         .o_name = NRS_TBF_TYPE_GID,
2705         .o_startup = nrs_tbf_id_startup,
2706         .o_cli_find = nrs_tbf_id_cli_find,
2707         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2708         .o_cli_put = nrs_tbf_nid_cli_put,
2709         .o_cli_init = nrs_tbf_gid_cli_init,
2710         .o_rule_init = nrs_tbf_id_rule_init,
2711         .o_rule_dump = nrs_tbf_id_rule_dump,
2712         .o_rule_match = nrs_tbf_id_rule_match,
2713         .o_rule_fini = nrs_tbf_id_rule_fini,
2714 };
2715
2716 static struct nrs_tbf_type nrs_tbf_types[] = {
2717         {
2718                 .ntt_name = NRS_TBF_TYPE_JOBID,
2719                 .ntt_flag = NRS_TBF_FLAG_JOBID,
2720                 .ntt_ops = &nrs_tbf_jobid_ops,
2721         },
2722         {
2723                 .ntt_name = NRS_TBF_TYPE_NID,
2724                 .ntt_flag = NRS_TBF_FLAG_NID,
2725                 .ntt_ops = &nrs_tbf_nid_ops,
2726         },
2727         {
2728                 .ntt_name = NRS_TBF_TYPE_OPCODE,
2729                 .ntt_flag = NRS_TBF_FLAG_OPCODE,
2730                 .ntt_ops = &nrs_tbf_opcode_ops,
2731         },
2732         {
2733                 .ntt_name = NRS_TBF_TYPE_GENERIC,
2734                 .ntt_flag = NRS_TBF_FLAG_GENERIC,
2735                 .ntt_ops = &nrs_tbf_generic_ops,
2736         },
2737         {
2738                 .ntt_name = NRS_TBF_TYPE_UID,
2739                 .ntt_flag = NRS_TBF_FLAG_UID,
2740                 .ntt_ops = &nrs_tbf_uid_ops,
2741         },
2742         {
2743                 .ntt_name = NRS_TBF_TYPE_GID,
2744                 .ntt_flag = NRS_TBF_FLAG_GID,
2745                 .ntt_ops = &nrs_tbf_gid_ops,
2746         },
2747 };
2748
2749 /**
2750  * Is called before the policy transitions into
2751  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
2752  * policy-specific private data structure.
2753  *
2754  * \param[in] policy The policy to start
2755  *
2756  * \retval -ENOMEM OOM error
2757  * \retval  0      success
2758  *
2759  * \see nrs_policy_register()
2760  * \see nrs_policy_ctl()
2761  */
2762 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
2763 {
2764         struct nrs_tbf_head     *head;
2765         struct nrs_tbf_ops      *ops;
2766         __u32                    type;
2767         char                    *name;
2768         int found = 0;
2769         int i;
2770         int rc = 0;
2771
2772         if (arg == NULL)
2773                 name = NRS_TBF_TYPE_GENERIC;
2774         else if (strlen(arg) < NRS_TBF_TYPE_MAX_LEN)
2775                 name = arg;
2776         else
2777                 GOTO(out, rc = -EINVAL);
2778
2779         for (i = 0; i < ARRAY_SIZE(nrs_tbf_types); i++) {
2780                 if (strcmp(name, nrs_tbf_types[i].ntt_name) == 0) {
2781                         ops = nrs_tbf_types[i].ntt_ops;
2782                         type = nrs_tbf_types[i].ntt_flag;
2783                         found = 1;
2784                         break;
2785                 }
2786         }
2787         if (found == 0)
2788                 GOTO(out, rc = -ENOTSUPP);
2789
2790         OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
2791         if (head == NULL)
2792                 GOTO(out, rc = -ENOMEM);
2793
2794         memcpy(head->th_type, name, strlen(name));
2795         head->th_type[strlen(name)] = '\0';
2796         head->th_ops = ops;
2797         head->th_type_flag = type;
2798
2799         head->th_binheap = cfs_binheap_create(&nrs_tbf_heap_ops,
2800                                               CBH_FLAG_ATOMIC_GROW, 4096, NULL,
2801                                               nrs_pol2cptab(policy),
2802                                               nrs_pol2cptid(policy));
2803         if (head->th_binheap == NULL)
2804                 GOTO(out_free_head, rc = -ENOMEM);
2805
2806         atomic_set(&head->th_rule_sequence, 0);
2807         spin_lock_init(&head->th_rule_lock);
2808         INIT_LIST_HEAD(&head->th_list);
2809         hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
2810         head->th_timer.function = nrs_tbf_timer_cb;
2811         rc = head->th_ops->o_startup(policy, head);
2812         if (rc)
2813                 GOTO(out_free_heap, rc);
2814
2815         policy->pol_private = head;
2816         return 0;
2817 out_free_heap:
2818         cfs_binheap_destroy(head->th_binheap);
2819 out_free_head:
2820         OBD_FREE_PTR(head);
2821 out:
2822         return rc;
2823 }
2824
2825 /**
2826  * Is called before the policy transitions into
2827  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
2828  * private data structure.
2829  *
2830  * \param[in] policy The policy to stop
2831  *
2832  * \see nrs_policy_stop0()
2833  */
2834 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
2835 {
2836         struct nrs_tbf_head *head = policy->pol_private;
2837         struct ptlrpc_nrs *nrs = policy->pol_nrs;
2838         struct nrs_tbf_rule *rule, *n;
2839
2840         LASSERT(head != NULL);
2841         LASSERT(head->th_cli_hash != NULL);
2842         hrtimer_cancel(&head->th_timer);
2843         /* Should cleanup hash first before free rules */
2844         cfs_hash_putref(head->th_cli_hash);
2845         list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
2846                 list_del_init(&rule->tr_linkage);
2847                 nrs_tbf_rule_put(rule);
2848         }
2849         LASSERT(list_empty(&head->th_list));
2850         LASSERT(head->th_binheap != NULL);
2851         LASSERT(cfs_binheap_is_empty(head->th_binheap));
2852         cfs_binheap_destroy(head->th_binheap);
2853         OBD_FREE_PTR(head);
2854         nrs->nrs_throttling = 0;
2855         wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
2856 }
2857
2858 /**
2859  * Performs a policy-specific ctl function on TBF policy instances; similar
2860  * to ioctl.
2861  *
2862  * \param[in]     policy the policy instance
2863  * \param[in]     opc    the opcode
2864  * \param[in,out] arg    used for passing parameters and information
2865  *
2866  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2867  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2868  *
2869  * \retval 0   operation carried out successfully
2870  * \retval -ve error
2871  */
2872 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
2873                        enum ptlrpc_nrs_ctl opc,
2874                        void *arg)
2875 {
2876         int rc = 0;
2877         ENTRY;
2878
2879         assert_spin_locked(&policy->pol_nrs->nrs_lock);
2880
2881         switch ((enum nrs_ctl_tbf)opc) {
2882         default:
2883                 RETURN(-EINVAL);
2884
2885         /**
2886          * Read RPC rate size of a policy instance.
2887          */
2888         case NRS_CTL_TBF_RD_RULE: {
2889                 struct nrs_tbf_head *head = policy->pol_private;
2890                 struct seq_file *m = (struct seq_file *) arg;
2891                 struct ptlrpc_service_part *svcpt;
2892
2893                 svcpt = policy->pol_nrs->nrs_svcpt;
2894                 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
2895
2896                 rc = nrs_tbf_rule_dump_all(head, m);
2897                 }
2898                 break;
2899
2900         /**
2901          * Write RPC rate of a policy instance.
2902          */
2903         case NRS_CTL_TBF_WR_RULE: {
2904                 struct nrs_tbf_head *head = policy->pol_private;
2905                 struct nrs_tbf_cmd *cmd;
2906
2907                 cmd = (struct nrs_tbf_cmd *)arg;
2908                 rc = nrs_tbf_command(policy,
2909                                      head,
2910                                      cmd);
2911                 }
2912                 break;
2913         /**
2914          * Read the TBF policy type of a policy instance.
2915          */
2916         case NRS_CTL_TBF_RD_TYPE_FLAG: {
2917                 struct nrs_tbf_head *head = policy->pol_private;
2918
2919                 *(__u32 *)arg = head->th_type_flag;
2920                 }
2921                 break;
2922         }
2923
2924         RETURN(rc);
2925 }
2926
2927 /**
2928  * Is called for obtaining a TBF policy resource.
2929  *
2930  * \param[in]  policy     The policy on which the request is being asked for
2931  * \param[in]  nrq        The request for which resources are being taken
2932  * \param[in]  parent     Parent resource, unused in this policy
2933  * \param[out] resp       Resources references are placed in this array
2934  * \param[in]  moving_req Signifies limited caller context; unused in this
2935  *                        policy
2936  *
2937  *
2938  * \see nrs_resource_get_safe()
2939  */
2940 static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
2941                            struct ptlrpc_nrs_request *nrq,
2942                            const struct ptlrpc_nrs_resource *parent,
2943                            struct ptlrpc_nrs_resource **resp,
2944                            bool moving_req)
2945 {
2946         struct nrs_tbf_head   *head;
2947         struct nrs_tbf_client *cli;
2948         struct nrs_tbf_client *tmp;
2949         struct ptlrpc_request *req;
2950
2951         if (parent == NULL) {
2952                 *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
2953                 return 0;
2954         }
2955
2956         head = container_of(parent, struct nrs_tbf_head, th_res);
2957         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
2958         cli = head->th_ops->o_cli_find(head, req);
2959         if (cli != NULL) {
2960                 spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2961                 LASSERT(cli->tc_rule);
2962                 if (cli->tc_rule_sequence !=
2963                     atomic_read(&head->th_rule_sequence) ||
2964                     cli->tc_rule->tr_flags & NTRS_STOPPING) {
2965                         struct nrs_tbf_rule *rule;
2966
2967                         CDEBUG(D_RPCTRACE,
2968                                "TBF class@%p rate %llu sequence %d, "
2969                                "rule flags %d, head sequence %d\n",
2970                                cli, cli->tc_rpc_rate,
2971                                cli->tc_rule_sequence,
2972                                cli->tc_rule->tr_flags,
2973                                atomic_read(&head->th_rule_sequence));
2974                         rule = nrs_tbf_rule_match(head, cli);
2975                         if (rule != cli->tc_rule) {
2976                                 nrs_tbf_cli_reset(head, rule, cli);
2977                         } else {
2978                                 if (cli->tc_rule_generation != rule->tr_generation)
2979                                         nrs_tbf_cli_reset_value(head, cli);
2980                                 nrs_tbf_rule_put(rule);
2981                         }
2982                 } else if (cli->tc_rule_generation !=
2983                            cli->tc_rule->tr_generation) {
2984                         nrs_tbf_cli_reset_value(head, cli);
2985                 }
2986                 spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2987                 goto out;
2988         }
2989
2990         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
2991                           sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
2992         if (cli == NULL)
2993                 return -ENOMEM;
2994
2995         nrs_tbf_cli_init(head, cli, req);
2996         tmp = head->th_ops->o_cli_findadd(head, cli);
2997         if (tmp != cli) {
2998                 atomic_dec(&cli->tc_ref);
2999                 nrs_tbf_cli_fini(cli);
3000                 cli = tmp;
3001         }
3002 out:
3003         *resp = &cli->tc_res;
3004
3005         return 1;
3006 }
3007
3008 /**
3009  * Called when releasing references to the resource hierachy obtained for a
3010  * request for scheduling using the TBF policy.
3011  *
3012  * \param[in] policy   the policy the resource belongs to
3013  * \param[in] res      the resource to be released
3014  */
3015 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
3016                             const struct ptlrpc_nrs_resource *res)
3017 {
3018         struct nrs_tbf_head   *head;
3019         struct nrs_tbf_client *cli;
3020
3021         /**
3022          * Do nothing for freeing parent, nrs_tbf_net resources
3023          */
3024         if (res->res_parent == NULL)
3025                 return;
3026
3027         cli = container_of(res, struct nrs_tbf_client, tc_res);
3028         head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
3029
3030         head->th_ops->o_cli_put(head, cli);
3031 }
3032
3033 /**
3034  * Called when getting a request from the TBF policy for handling, or just
3035  * peeking; removes the request from the policy when it is to be handled.
3036  *
3037  * \param[in] policy The policy
3038  * \param[in] peek   When set, signifies that we just want to examine the
3039  *                   request, and not handle it, so the request is not removed
3040  *                   from the policy.
3041  * \param[in] force  Force the policy to return a request; unused in this
3042  *                   policy
3043  *
3044  * \retval The request to be handled; this is the next request in the TBF
3045  *         rule
3046  *
3047  * \see ptlrpc_nrs_req_get_nolock()
3048  * \see nrs_request_get()
3049  */
3050 static
3051 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
3052                                            bool peek, bool force)
3053 {
3054         struct nrs_tbf_head       *head = policy->pol_private;
3055         struct ptlrpc_nrs_request *nrq = NULL;
3056         struct nrs_tbf_client     *cli;
3057         struct cfs_binheap_node   *node;
3058
3059         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3060
3061         if (!peek && policy->pol_nrs->nrs_throttling)
3062                 return NULL;
3063
3064         node = cfs_binheap_root(head->th_binheap);
3065         if (unlikely(node == NULL))
3066                 return NULL;
3067
3068         cli = container_of(node, struct nrs_tbf_client, tc_node);
3069         LASSERT(cli->tc_in_heap);
3070         if (peek) {
3071                 nrq = list_entry(cli->tc_list.next,
3072                                      struct ptlrpc_nrs_request,
3073                                      nr_u.tbf.tr_list);
3074         } else {
3075                 struct nrs_tbf_rule *rule = cli->tc_rule;
3076                 __u64 now = ktime_to_ns(ktime_get());
3077                 __u64 passed;
3078                 __u64 ntoken;
3079                 __u64 deadline;
3080                 __u64 old_resid = 0;
3081
3082                 deadline = cli->tc_check_time +
3083                           cli->tc_nsecs;
3084                 LASSERT(now >= cli->tc_check_time);
3085                 passed = now - cli->tc_check_time;
3086                 ntoken = passed * cli->tc_rpc_rate;
3087                 do_div(ntoken, NSEC_PER_SEC);
3088
3089                 ntoken += cli->tc_ntoken;
3090                 if (rule->tr_flags & NTRS_REALTIME) {
3091                         LASSERT(cli->tc_nsecs_resid < cli->tc_nsecs);
3092                         old_resid = cli->tc_nsecs_resid;
3093                         cli->tc_nsecs_resid += passed % cli->tc_nsecs;
3094                         if (cli->tc_nsecs_resid > cli->tc_nsecs) {
3095                                 ntoken++;
3096                                 cli->tc_nsecs_resid -= cli->tc_nsecs;
3097                         }
3098                 } else if (ntoken > cli->tc_depth)
3099                         ntoken = cli->tc_depth;
3100
3101                 if (ntoken > 0) {
3102                         struct ptlrpc_request *req;
3103                         nrq = list_entry(cli->tc_list.next,
3104                                              struct ptlrpc_nrs_request,
3105                                              nr_u.tbf.tr_list);
3106                         req = container_of(nrq,
3107                                            struct ptlrpc_request,
3108                                            rq_nrq);
3109                         ntoken--;
3110                         cli->tc_ntoken = ntoken;
3111                         cli->tc_check_time = now;
3112                         list_del_init(&nrq->nr_u.tbf.tr_list);
3113                         if (list_empty(&cli->tc_list)) {
3114                                 cfs_binheap_remove(head->th_binheap,
3115                                                    &cli->tc_node);
3116                                 cli->tc_in_heap = false;
3117                         } else {
3118                                 if (!(rule->tr_flags & NTRS_REALTIME))
3119                                         cli->tc_deadline = now + cli->tc_nsecs;
3120                                 cfs_binheap_relocate(head->th_binheap,
3121                                                      &cli->tc_node);
3122                         }
3123                         CDEBUG(D_RPCTRACE,
3124                                "TBF dequeues: class@%p rate %llu gen %llu "
3125                                "token %llu, rule@%p rate %llu gen %llu\n",
3126                                cli, cli->tc_rpc_rate,
3127                                cli->tc_rule_generation, cli->tc_ntoken,
3128                                cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3129                                cli->tc_rule->tr_generation);
3130                 } else {
3131                         ktime_t time;
3132
3133                         if (rule->tr_flags & NTRS_REALTIME) {
3134                                 cli->tc_deadline = deadline;
3135                                 cli->tc_nsecs_resid = old_resid;
3136                                 cfs_binheap_relocate(head->th_binheap,
3137                                                      &cli->tc_node);
3138                                 if (node != cfs_binheap_root(head->th_binheap))
3139                                         return nrs_tbf_req_get(policy,
3140                                                                peek, force);
3141                         }
3142                         policy->pol_nrs->nrs_throttling = 1;
3143                         head->th_deadline = deadline;
3144                         time = ktime_set(0, 0);
3145                         time = ktime_add_ns(time, deadline);
3146                         hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
3147                 }
3148         }
3149
3150         return nrq;
3151 }
3152
3153 /**
3154  * Adds request \a nrq to \a policy's list of queued requests
3155  *
3156  * \param[in] policy The policy
3157  * \param[in] nrq    The request to add
3158  *
3159  * \retval 0 success; nrs_request_enqueue() assumes this function will always
3160  *                    succeed
3161  */
3162 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
3163                            struct ptlrpc_nrs_request *nrq)
3164 {
3165         struct nrs_tbf_head   *head;
3166         struct nrs_tbf_client *cli;
3167         int                    rc = 0;
3168
3169         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3170
3171         cli = container_of(nrs_request_resource(nrq),
3172                            struct nrs_tbf_client, tc_res);
3173         head = container_of(nrs_request_resource(nrq)->res_parent,
3174                             struct nrs_tbf_head, th_res);
3175         if (list_empty(&cli->tc_list)) {
3176                 LASSERT(!cli->tc_in_heap);
3177                 cli->tc_deadline = cli->tc_check_time + cli->tc_nsecs;
3178                 rc = cfs_binheap_insert(head->th_binheap, &cli->tc_node);
3179                 if (rc == 0) {
3180                         cli->tc_in_heap = true;
3181                         nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3182                         list_add_tail(&nrq->nr_u.tbf.tr_list,
3183                                           &cli->tc_list);
3184                         if (policy->pol_nrs->nrs_throttling) {
3185                                 __u64 deadline = cli->tc_deadline;
3186                                 if ((head->th_deadline > deadline) &&
3187                                     (hrtimer_try_to_cancel(&head->th_timer)
3188                                      >= 0)) {
3189                                         ktime_t time;
3190                                         head->th_deadline = deadline;
3191                                         time = ktime_set(0, 0);
3192                                         time = ktime_add_ns(time, deadline);
3193                                         hrtimer_start(&head->th_timer, time,
3194                                                       HRTIMER_MODE_ABS);
3195                                 }
3196                         }
3197                 }
3198         } else {
3199                 LASSERT(cli->tc_in_heap);
3200                 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3201                 list_add_tail(&nrq->nr_u.tbf.tr_list,
3202                                   &cli->tc_list);
3203         }
3204
3205         if (rc == 0)
3206                 CDEBUG(D_RPCTRACE,
3207                        "TBF enqueues: class@%p rate %llu gen %llu "
3208                        "token %llu, rule@%p rate %llu gen %llu\n",
3209                        cli, cli->tc_rpc_rate,
3210                        cli->tc_rule_generation, cli->tc_ntoken,
3211                        cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3212                        cli->tc_rule->tr_generation);
3213
3214         return rc;
3215 }
3216
3217 /**
3218  * Removes request \a nrq from \a policy's list of queued requests.
3219  *
3220  * \param[in] policy The policy
3221  * \param[in] nrq    The request to remove
3222  */
3223 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
3224                              struct ptlrpc_nrs_request *nrq)
3225 {
3226         struct nrs_tbf_head   *head;
3227         struct nrs_tbf_client *cli;
3228
3229         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3230
3231         cli = container_of(nrs_request_resource(nrq),
3232                            struct nrs_tbf_client, tc_res);
3233         head = container_of(nrs_request_resource(nrq)->res_parent,
3234                             struct nrs_tbf_head, th_res);
3235
3236         LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
3237         list_del_init(&nrq->nr_u.tbf.tr_list);
3238         if (list_empty(&cli->tc_list)) {
3239                 cfs_binheap_remove(head->th_binheap,
3240                                    &cli->tc_node);
3241                 cli->tc_in_heap = false;
3242         } else {
3243                 cfs_binheap_relocate(head->th_binheap,
3244                                      &cli->tc_node);
3245         }
3246 }
3247
3248 /**
3249  * Prints a debug statement right before the request \a nrq stops being
3250  * handled.
3251  *
3252  * \param[in] policy The policy handling the request
3253  * \param[in] nrq    The request being handled
3254  *
3255  * \see ptlrpc_server_finish_request()
3256  * \see ptlrpc_nrs_req_stop_nolock()
3257  */
3258 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
3259                               struct ptlrpc_nrs_request *nrq)
3260 {
3261         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
3262                                                   rq_nrq);
3263
3264         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3265
3266         CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: %llu\n",
3267                policy->pol_desc->pd_name, libcfs_id2str(req->rq_peer),
3268                nrq->nr_u.tbf.tr_sequence);
3269 }
3270
3271 /**
3272  * debugfs interface
3273  */
3274
3275 /**
3276  * The maximum RPC rate.
3277  */
3278 #define LPROCFS_NRS_RATE_MAX            65535
3279
3280 static int
3281 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
3282 {
3283         struct ptlrpc_service       *svc = m->private;
3284         int                          rc;
3285
3286         seq_printf(m, "regular_requests:\n");
3287         /**
3288          * Perform two separate calls to this as only one of the NRS heads'
3289          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
3290          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
3291          */
3292         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
3293                                        NRS_POL_NAME_TBF,
3294                                        NRS_CTL_TBF_RD_RULE,
3295                                        false, m);
3296         if (rc == 0) {
3297                 /**
3298                  * -ENOSPC means buf in the parameter m is overflow, return 0
3299                  * here to let upper layer function seq_read alloc a larger
3300                  * memory area and do this process again.
3301                  */
3302         } else if (rc == -ENOSPC) {
3303                 return 0;
3304
3305                 /**
3306                  * Ignore -ENODEV as the regular NRS head's policy may be in the
3307                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
3308                  */
3309         } else if (rc != -ENODEV) {
3310                 return rc;
3311         }
3312
3313         if (!nrs_svc_has_hp(svc))
3314                 goto no_hp;
3315
3316         seq_printf(m, "high_priority_requests:\n");
3317         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
3318                                        NRS_POL_NAME_TBF,
3319                                        NRS_CTL_TBF_RD_RULE,
3320                                        false, m);
3321         if (rc == 0) {
3322                 /**
3323                  * -ENOSPC means buf in the parameter m is overflow, return 0
3324                  * here to let upper layer function seq_read alloc a larger
3325                  * memory area and do this process again.
3326                  */
3327         } else if (rc == -ENOSPC) {
3328                 return 0;
3329         }
3330
3331 no_hp:
3332
3333         return rc;
3334 }
3335
3336 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char *token)
3337 {
3338         int rc;
3339         ENTRY;
3340
3341         switch (cmd->u.tc_start.ts_valid_type) {
3342         case NRS_TBF_FLAG_JOBID:
3343                 rc = nrs_tbf_jobid_parse(cmd, token);
3344                 break;
3345         case NRS_TBF_FLAG_NID:
3346                 rc = nrs_tbf_nid_parse(cmd, token);
3347                 break;
3348         case NRS_TBF_FLAG_OPCODE:
3349                 rc = nrs_tbf_opcode_parse(cmd, token);
3350                 break;
3351         case NRS_TBF_FLAG_GENERIC:
3352                 rc = nrs_tbf_generic_parse(cmd, token);
3353                 break;
3354         case NRS_TBF_FLAG_UID:
3355         case NRS_TBF_FLAG_GID:
3356                 rc = nrs_tbf_ug_id_parse(cmd, token);
3357                 break;
3358         default:
3359                 RETURN(-EINVAL);
3360         }
3361
3362         RETURN(rc);
3363 }
3364
3365 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
3366 {
3367         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3368                 switch (cmd->u.tc_start.ts_valid_type) {
3369                 case NRS_TBF_FLAG_JOBID:
3370                         nrs_tbf_jobid_cmd_fini(cmd);
3371                         break;
3372                 case NRS_TBF_FLAG_NID:
3373                         nrs_tbf_nid_cmd_fini(cmd);
3374                         break;
3375                 case NRS_TBF_FLAG_OPCODE:
3376                         nrs_tbf_opcode_cmd_fini(cmd);
3377                         break;
3378                 case NRS_TBF_FLAG_GENERIC:
3379                         nrs_tbf_generic_cmd_fini(cmd);
3380                         break;
3381                 case NRS_TBF_FLAG_UID:
3382                 case NRS_TBF_FLAG_GID:
3383                         nrs_tbf_id_cmd_fini(cmd);
3384                         break;
3385                 default:
3386                         CWARN("unknown NRS_TBF_FLAGS:0x%x\n",
3387                               cmd->u.tc_start.ts_valid_type);
3388                 }
3389         }
3390 }
3391
/**
 * Checks that \a name consists only of alphanumeric characters and
 * underscores.
 *
 * The string is walked once up to its terminating NUL, rather than calling
 * strlen() again on every iteration.
 *
 * \param[in] name NUL-terminated string to check
 *
 * \retval true  every character is alphanumeric or '_' (this includes the
 *               empty string)
 * \retval false at least one other character was found
 */
static bool name_is_valid(const char *name)
{
        const char *p;

        for (p = name; *p != '\0'; p++) {
                if (!isalnum((unsigned char)*p) && *p != '_')
                        return false;
        }
        return true;
}
3403
3404 static int
3405 nrs_tbf_parse_value_pair(struct nrs_tbf_cmd *cmd, char *buffer)
3406 {
3407         char    *key;
3408         char    *val;
3409         int      rc;
3410         __u64    rate;
3411
3412         val = buffer;
3413         key = strsep(&val, "=");
3414         if (val == NULL || strlen(val) == 0)
3415                 return -EINVAL;
3416
3417         /* Key of the value pair */
3418         if (strcmp(key, "rate") == 0) {
3419                 rc = kstrtoull(val, 10, &rate);
3420                 if (rc)
3421                         return rc;
3422
3423                 if (rate <= 0 || rate >= LPROCFS_NRS_RATE_MAX)
3424                         return -EINVAL;
3425
3426                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3427                         cmd->u.tc_start.ts_rpc_rate = rate;
3428                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3429                         cmd->u.tc_change.tc_rpc_rate = rate;
3430                 else
3431                         return -EINVAL;
3432         }  else if (strcmp(key, "rank") == 0) {
3433                 if (!name_is_valid(val))
3434                         return -EINVAL;
3435
3436                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3437                         cmd->u.tc_start.ts_next_name = val;
3438                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3439                         cmd->u.tc_change.tc_next_name = val;
3440                 else
3441                         return -EINVAL;
3442         } else if (strcmp(key, "realtime") == 0) {
3443                 unsigned long realtime;
3444
3445                 rc = kstrtoul(val, 10, &realtime);
3446                 if (rc)
3447                         return rc;
3448
3449                 if (realtime > 0)
3450                         cmd->u.tc_start.ts_rule_flags |= NTRS_REALTIME;
3451         } else {
3452                 return -EINVAL;
3453         }
3454         return 0;
3455 }
3456
3457 static int
3458 nrs_tbf_parse_value_pairs(struct nrs_tbf_cmd *cmd, char *buffer)
3459 {
3460         char    *val;
3461         char    *token;
3462         int      rc;
3463
3464         val = buffer;
3465         while (val != NULL && strlen(val) != 0) {
3466                 token = strsep(&val, " ");
3467                 rc = nrs_tbf_parse_value_pair(cmd, token);
3468                 if (rc)
3469                         return rc;
3470         }
3471
3472         switch (cmd->tc_cmd) {
3473         case NRS_CTL_TBF_START_RULE:
3474                 if (cmd->u.tc_start.ts_rpc_rate == 0)
3475                         cmd->u.tc_start.ts_rpc_rate = tbf_rate;
3476                 break;
3477         case NRS_CTL_TBF_CHANGE_RULE:
3478                 if (cmd->u.tc_change.tc_rpc_rate == 0 &&
3479                     cmd->u.tc_change.tc_next_name == NULL)
3480                         return -EINVAL;
3481                 break;
3482         case NRS_CTL_TBF_STOP_RULE:
3483                 break;
3484         default:
3485                 return -EINVAL;
3486         }
3487         return 0;
3488 }
3489
3490 static struct nrs_tbf_cmd *
3491 nrs_tbf_parse_cmd(char *buffer, unsigned long count, __u32 type_flag)
3492 {
3493         static struct nrs_tbf_cmd       *cmd;
3494         char                            *token;
3495         char                            *val;
3496         int                              rc = 0;
3497
3498         OBD_ALLOC_PTR(cmd);
3499         if (cmd == NULL)
3500                 GOTO(out, rc = -ENOMEM);
3501         memset(cmd, 0, sizeof(*cmd));
3502
3503         val = buffer;
3504         token = strsep(&val, " ");
3505         if (val == NULL || strlen(val) == 0)
3506                 GOTO(out_free_cmd, rc = -EINVAL);
3507
3508         /* Type of the command */
3509         if (strcmp(token, "start") == 0) {
3510                 cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
3511                 cmd->u.tc_start.ts_valid_type = type_flag;
3512         } else if (strcmp(token, "stop") == 0)
3513                 cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
3514         else if (strcmp(token, "change") == 0)
3515                 cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RULE;
3516         else
3517                 GOTO(out_free_cmd, rc = -EINVAL);
3518
3519         /* Name of the rule */
3520         token = strsep(&val, " ");
3521         if ((val == NULL && cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE) ||
3522             !name_is_valid(token))
3523                 GOTO(out_free_cmd, rc = -EINVAL);
3524         cmd->tc_name = token;
3525
3526         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3527                 /* List of ID */
3528                 LASSERT(val);
3529                 token = val;
3530                 val = strrchr(token, '}');
3531                 if (!val)
3532                         GOTO(out_free_cmd, rc = -EINVAL);
3533
3534                 /* Skip '}' */
3535                 val++;
3536                 if (*val == '\0') {
3537                         val = NULL;
3538                 } else if (*val == ' ') {
3539                         *val = '\0';
3540                         val++;
3541                 } else
3542                         GOTO(out_free_cmd, rc = -EINVAL);
3543
3544                 rc = nrs_tbf_id_parse(cmd, token);
3545                 if (rc)
3546                         GOTO(out_free_cmd, rc);
3547         }
3548
3549         rc = nrs_tbf_parse_value_pairs(cmd, val);
3550         if (rc)
3551                 GOTO(out_cmd_fini, rc = -EINVAL);
3552         goto out;
3553 out_cmd_fini:
3554         nrs_tbf_cmd_fini(cmd);
3555 out_free_cmd:
3556         OBD_FREE_PTR(cmd);
3557 out:
3558         if (rc)
3559                 cmd = ERR_PTR(rc);
3560         return cmd;
3561 }
3562
3563 /**
3564  * Get the TBF policy type (nid, jobid, etc) preset by
3565  * proc entry 'nrs_policies' for command buffer parsing.
3566  *
3567  * \param[in] svc the PTLRPC service
3568  * \param[in] queue the NRS queue type
3569  *
3570  * \retval the preset TBF policy type flag
3571  */
3572 static __u32
3573 nrs_tbf_type_flag(struct ptlrpc_service *svc, enum ptlrpc_nrs_queue_type queue)
3574 {
3575         __u32   type;
3576         int     rc;
3577
3578         rc = ptlrpc_nrs_policy_control(svc, queue,
3579                                        NRS_POL_NAME_TBF,
3580                                        NRS_CTL_TBF_RD_TYPE_FLAG,
3581                                        true, &type);
3582         if (rc != 0)
3583                 type = NRS_TBF_FLAG_INVALID;
3584
3585         return type;
3586 }
3587
3588 extern struct nrs_core nrs_core;
3589 #define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
3590 static ssize_t
3591 ptlrpc_lprocfs_nrs_tbf_rule_seq_write(struct file *file,
3592                                       const char __user *buffer,
3593                                       size_t count, loff_t *off)
3594 {
3595         struct seq_file           *m = file->private_data;
3596         struct ptlrpc_service     *svc = m->private;
3597         char                      *kernbuf;
3598         char                      *val;
3599         int                        rc;
3600         static struct nrs_tbf_cmd *cmd;
3601         enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
3602         unsigned long              length;
3603         char                      *token;
3604
3605         OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3606         if (kernbuf == NULL)
3607                 GOTO(out, rc = -ENOMEM);
3608
3609         if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
3610                 GOTO(out_free_kernbuff, rc = -EINVAL);
3611
3612         if (copy_from_user(kernbuf, buffer, count))
3613                 GOTO(out_free_kernbuff, rc = -EFAULT);
3614
3615         val = kernbuf;
3616         token = strsep(&val, " ");
3617         if (val == NULL)
3618                 GOTO(out_free_kernbuff, rc = -EINVAL);
3619
3620         if (strcmp(token, "reg") == 0) {
3621                 queue = PTLRPC_NRS_QUEUE_REG;
3622         } else if (strcmp(token, "hp") == 0) {
3623                 queue = PTLRPC_NRS_QUEUE_HP;
3624         } else {
3625                 kernbuf[strlen(token)] = ' ';
3626                 val = kernbuf;
3627         }
3628         length = strlen(val);
3629
3630         if (length == 0)
3631                 GOTO(out_free_kernbuff, rc = -EINVAL);
3632
3633         if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
3634                 GOTO(out_free_kernbuff, rc = -ENODEV);
3635         else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
3636                 queue = PTLRPC_NRS_QUEUE_REG;
3637
3638         cmd = nrs_tbf_parse_cmd(val, length, nrs_tbf_type_flag(svc, queue));
3639         if (IS_ERR(cmd))
3640                 GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
3641
3642         /**
3643          * Serialize NRS core lprocfs operations with policy registration/
3644          * unregistration.
3645          */
3646         mutex_lock(&nrs_core.nrs_mutex);
3647         rc = ptlrpc_nrs_policy_control(svc, queue,
3648                                        NRS_POL_NAME_TBF,
3649                                        NRS_CTL_TBF_WR_RULE,
3650                                        false, cmd);
3651         mutex_unlock(&nrs_core.nrs_mutex);
3652
3653         nrs_tbf_cmd_fini(cmd);
3654         OBD_FREE_PTR(cmd);
3655 out_free_kernbuff:
3656         OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3657 out:
3658         return rc ? rc : count;
3659 }
3660
3661 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_tbf_rule);
3662
3663 /**
3664  * Initializes a TBF policy's lprocfs interface for service \a svc
3665  *
3666  * \param[in] svc the service
3667  *
3668  * \retval 0    success
3669  * \retval != 0 error
3670  */
3671 static int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
3672 {
3673         struct lprocfs_vars nrs_tbf_lprocfs_vars[] = {
3674                 { .name         = "nrs_tbf_rule",
3675                   .fops         = &ptlrpc_lprocfs_nrs_tbf_rule_fops,
3676                   .data = svc },
3677                 { NULL }
3678         };
3679
3680         if (IS_ERR_OR_NULL(svc->srv_debugfs_entry))
3681                 return 0;
3682
3683         return ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_tbf_lprocfs_vars,
3684                                  NULL);
3685 }
3686
3687 /**
3688  * TBF policy operations
3689  */
3690 static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
3691         .op_policy_start        = nrs_tbf_start,
3692         .op_policy_stop         = nrs_tbf_stop,
3693         .op_policy_ctl          = nrs_tbf_ctl,
3694         .op_res_get             = nrs_tbf_res_get,
3695         .op_res_put             = nrs_tbf_res_put,
3696         .op_req_get             = nrs_tbf_req_get,
3697         .op_req_enqueue         = nrs_tbf_req_add,
3698         .op_req_dequeue         = nrs_tbf_req_del,
3699         .op_req_stop            = nrs_tbf_req_stop,
3700         .op_lprocfs_init        = nrs_tbf_lprocfs_init,
3701 };
3702
3703 /**
3704  * TBF policy configuration
3705  */
3706 struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
3707         .nc_name                = NRS_POL_NAME_TBF,
3708         .nc_ops                 = &nrs_tbf_ops,
3709         .nc_compat              = nrs_policy_compat_all,
3710 };
3711
3712 /** @} tbf */
3713
3714 /** @} nrs */
3715
3716 #endif /* HAVE_SERVER_SUPPORT */