Whamcloud - gitweb
LU-14366 mdt: lfs mkdir should return -EEXIST if exists
[fs/lustre-release.git] / lustre / ptlrpc / nrs_tbf.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2013 DataDirect Networks, Inc.
24  *
25  * Copyright (c) 2014, 2016, Intel Corporation.
26  */
27 /*
28  * lustre/ptlrpc/nrs_tbf.c
29  *
30  * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
31  *
32  */
33
34 /**
 * \addtogroup nrs
36  * @{
37  */
38
39 #define DEBUG_SUBSYSTEM S_RPC
40 #include <obd_support.h>
41 #include <obd_class.h>
42 #include <libcfs/libcfs.h>
43 #include <lustre_req_layout.h>
44 #include "ptlrpc_internal.h"
45
46 /**
47  * \name tbf
48  *
49  * Token Bucket Filter over client NIDs
50  *
51  * @{
52  */
53
54 #define NRS_POL_NAME_TBF        "tbf"
55
/*
 * Module parameter (mode 0644): size of the jobid cache, default 8192.
 * Used in this file to size the jobid client hash and to derive the
 * per-bucket LRU purge threshold.
 */
static int tbf_jobid_cache_size = 8192;
module_param(tbf_jobid_cache_size, int, 0644);
MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");

/*
 * Module parameter (mode 0644): RPC rate, in RPCs/s, given to the
 * "default" rule when a TBF policy instance starts up; default 10000.
 */
static int tbf_rate = 10000;
module_param(tbf_rate, int, 0644);
MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");

/*
 * Module parameter (mode 0644): bucket depth copied into every newly
 * started rule (tr_depth); default 3.
 */
static int tbf_depth = 3;
module_param(tbf_depth, int, 0644);
MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
67
68 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
69 {
70         struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
71                                                  th_timer);
72         struct ptlrpc_nrs   *nrs = head->th_res.res_policy->pol_nrs;
73         struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
74
75         nrs->nrs_throttling = 0;
76         wake_up(&svcpt->scp_waitq);
77
78         return HRTIMER_NORESTART;
79 }
80
81 #define NRS_TBF_DEFAULT_RULE "default"
82
83 static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule)
84 {
85         LASSERT(atomic_read(&rule->tr_ref) == 0);
86         LASSERT(list_empty(&rule->tr_cli_list));
87         LASSERT(list_empty(&rule->tr_linkage));
88
89         rule->tr_head->th_ops->o_rule_fini(rule);
90         OBD_FREE_PTR(rule);
91 }
92
93 /**
94  * Decreases the rule's usage reference count, and stops the rule in case it
95  * was already stopping and have no more outstanding usage references (which
96  * indicates it has no more queued or started requests, and can be safely
97  * stopped).
98  */
99 static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule)
100 {
101         if (atomic_dec_and_test(&rule->tr_ref))
102                 nrs_tbf_rule_fini(rule);
103 }
104
105 /**
106  * Increases the rule's usage reference count.
107  */
108 static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule)
109 {
110         atomic_inc(&rule->tr_ref);
111 }
112
113 static void
114 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
115 {
116         LASSERT(!list_empty(&cli->tc_linkage));
117         LASSERT(cli->tc_rule);
118         spin_lock(&cli->tc_rule->tr_rule_lock);
119         list_del_init(&cli->tc_linkage);
120         spin_unlock(&cli->tc_rule->tr_rule_lock);
121         nrs_tbf_rule_put(cli->tc_rule);
122         cli->tc_rule = NULL;
123 }
124
125 static void
126 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
127                         struct nrs_tbf_client *cli)
128
129 {
130         struct nrs_tbf_rule *rule = cli->tc_rule;
131
132         cli->tc_rpc_rate = rule->tr_rpc_rate;
133         cli->tc_nsecs = rule->tr_nsecs_per_rpc;
134         cli->tc_depth = rule->tr_depth;
135         cli->tc_ntoken = rule->tr_depth;
136         cli->tc_check_time = ktime_to_ns(ktime_get());
137         cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
138         cli->tc_rule_generation = rule->tr_generation;
139
140         if (cli->tc_in_heap)
141                 binheap_relocate(head->th_binheap,
142                                  &cli->tc_node);
143 }
144
145 static void
146 nrs_tbf_cli_reset(struct nrs_tbf_head *head,
147                   struct nrs_tbf_rule *rule,
148                   struct nrs_tbf_client *cli)
149 {
150         spin_lock(&cli->tc_rule_lock);
151         if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
152                 LASSERT(rule != cli->tc_rule);
153                 nrs_tbf_cli_rule_put(cli);
154         }
155         LASSERT(cli->tc_rule == NULL);
156         LASSERT(list_empty(&cli->tc_linkage));
157         /* Rule's ref is added before called */
158         cli->tc_rule = rule;
159         spin_lock(&rule->tr_rule_lock);
160         list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
161         spin_unlock(&rule->tr_rule_lock);
162         spin_unlock(&cli->tc_rule_lock);
163         nrs_tbf_cli_reset_value(head, cli);
164 }
165
166 static int
167 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
168 {
169         return rule->tr_head->th_ops->o_rule_dump(rule, m);
170 }
171
172 static int
173 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
174 {
175         struct nrs_tbf_rule *rule;
176         int rc = 0;
177
178         LASSERT(head != NULL);
179         spin_lock(&head->th_rule_lock);
180         /* List the rules from newest to oldest */
181         list_for_each_entry(rule, &head->th_list, tr_linkage) {
182                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
183                 rc = nrs_tbf_rule_dump(rule, m);
184                 if (rc) {
185                         rc = -ENOSPC;
186                         break;
187                 }
188         }
189         spin_unlock(&head->th_rule_lock);
190
191         return rc;
192 }
193
194 static struct nrs_tbf_rule *
195 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
196                          const char *name)
197 {
198         struct nrs_tbf_rule *rule;
199
200         LASSERT(head != NULL);
201         list_for_each_entry(rule, &head->th_list, tr_linkage) {
202                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
203                 if (strcmp(rule->tr_name, name) == 0) {
204                         nrs_tbf_rule_get(rule);
205                         return rule;
206                 }
207         }
208         return NULL;
209 }
210
211 static struct nrs_tbf_rule *
212 nrs_tbf_rule_find(struct nrs_tbf_head *head,
213                   const char *name)
214 {
215         struct nrs_tbf_rule *rule;
216
217         LASSERT(head != NULL);
218         spin_lock(&head->th_rule_lock);
219         rule = nrs_tbf_rule_find_nolock(head, name);
220         spin_unlock(&head->th_rule_lock);
221         return rule;
222 }
223
224 static struct nrs_tbf_rule *
225 nrs_tbf_rule_match(struct nrs_tbf_head *head,
226                    struct nrs_tbf_client *cli)
227 {
228         struct nrs_tbf_rule *rule = NULL;
229         struct nrs_tbf_rule *tmp_rule;
230
231         spin_lock(&head->th_rule_lock);
232         /* Match the newest rule in the list */
233         list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
234                 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
235                 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
236                         rule = tmp_rule;
237                         break;
238                 }
239         }
240
241         if (rule == NULL)
242                 rule = head->th_rule;
243
244         nrs_tbf_rule_get(rule);
245         spin_unlock(&head->th_rule_lock);
246         return rule;
247 }
248
249 static void
250 nrs_tbf_cli_init(struct nrs_tbf_head *head,
251                  struct nrs_tbf_client *cli,
252                  struct ptlrpc_request *req)
253 {
254         struct nrs_tbf_rule *rule;
255
256         memset(cli, 0, sizeof(*cli));
257         cli->tc_in_heap = false;
258         head->th_ops->o_cli_init(cli, req);
259         INIT_LIST_HEAD(&cli->tc_list);
260         INIT_LIST_HEAD(&cli->tc_linkage);
261         spin_lock_init(&cli->tc_rule_lock);
262         atomic_set(&cli->tc_ref, 1);
263         rule = nrs_tbf_rule_match(head, cli);
264         nrs_tbf_cli_reset(head, rule, cli);
265 }
266
267 static void
268 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
269 {
270         LASSERT(list_empty(&cli->tc_list));
271         LASSERT(!cli->tc_in_heap);
272         LASSERT(atomic_read(&cli->tc_ref) == 0);
273         spin_lock(&cli->tc_rule_lock);
274         nrs_tbf_cli_rule_put(cli);
275         spin_unlock(&cli->tc_rule_lock);
276         OBD_FREE_PTR(cli);
277 }
278
279 static int
280 nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
281                    struct nrs_tbf_head *head,
282                    struct nrs_tbf_cmd *start)
283 {
284         struct nrs_tbf_rule     *rule;
285         struct nrs_tbf_rule     *tmp_rule;
286         struct nrs_tbf_rule     *next_rule;
287         char                    *next_name = start->u.tc_start.ts_next_name;
288         int                      rc;
289
290         rule = nrs_tbf_rule_find(head, start->tc_name);
291         if (rule) {
292                 nrs_tbf_rule_put(rule);
293                 return -EEXIST;
294         }
295
296         OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
297         if (rule == NULL)
298                 return -ENOMEM;
299
300         memcpy(rule->tr_name, start->tc_name, strlen(start->tc_name));
301         rule->tr_rpc_rate = start->u.tc_start.ts_rpc_rate;
302         rule->tr_flags = start->u.tc_start.ts_rule_flags;
303         rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
304         rule->tr_depth = tbf_depth;
305         atomic_set(&rule->tr_ref, 1);
306         INIT_LIST_HEAD(&rule->tr_cli_list);
307         INIT_LIST_HEAD(&rule->tr_nids);
308         INIT_LIST_HEAD(&rule->tr_linkage);
309         spin_lock_init(&rule->tr_rule_lock);
310         rule->tr_head = head;
311
312         rc = head->th_ops->o_rule_init(policy, rule, start);
313         if (rc) {
314                 OBD_FREE_PTR(rule);
315                 return rc;
316         }
317
318         /* Add as the newest rule */
319         spin_lock(&head->th_rule_lock);
320         tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
321         if (tmp_rule) {
322                 spin_unlock(&head->th_rule_lock);
323                 nrs_tbf_rule_put(tmp_rule);
324                 nrs_tbf_rule_put(rule);
325                 return -EEXIST;
326         }
327
328         if (next_name) {
329                 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
330                 if (!next_rule) {
331                         spin_unlock(&head->th_rule_lock);
332                         nrs_tbf_rule_put(rule);
333                         return -ENOENT;
334                 }
335
336                 list_add(&rule->tr_linkage, next_rule->tr_linkage.prev);
337                 nrs_tbf_rule_put(next_rule);
338         } else {
339                 /* Add on the top of the rule list */
340                 list_add(&rule->tr_linkage, &head->th_list);
341         }
342         spin_unlock(&head->th_rule_lock);
343         atomic_inc(&head->th_rule_sequence);
344         if (start->u.tc_start.ts_rule_flags & NTRS_DEFAULT) {
345                 rule->tr_flags |= NTRS_DEFAULT;
346                 LASSERT(head->th_rule == NULL);
347                 head->th_rule = rule;
348         }
349
350         CDEBUG(D_RPCTRACE, "TBF starts rule@%p rate %u gen %llu\n",
351                rule, rule->tr_rpc_rate, rule->tr_generation);
352
353         return 0;
354 }
355
356 /**
357  * Change the rank of a rule in the rule list
358  *
359  * The matched rule will be moved to the position right before another
360  * given rule.
361  *
362  * \param[in] policy    the policy instance
363  * \param[in] head      the TBF policy instance
364  * \param[in] name      the rule name to be moved
365  * \param[in] next_name the rule name before which the matched rule will be
366  *                      moved
367  *
368  */
369 static int
370 nrs_tbf_rule_change_rank(struct ptlrpc_nrs_policy *policy,
371                          struct nrs_tbf_head *head,
372                          char *name,
373                          char *next_name)
374 {
375         struct nrs_tbf_rule     *rule = NULL;
376         struct nrs_tbf_rule     *next_rule = NULL;
377         int                      rc = 0;
378
379         LASSERT(head != NULL);
380
381         spin_lock(&head->th_rule_lock);
382         rule = nrs_tbf_rule_find_nolock(head, name);
383         if (!rule)
384                 GOTO(out, rc = -ENOENT);
385
386         if (strcmp(name, next_name) == 0)
387                 GOTO(out_put, rc);
388
389         next_rule = nrs_tbf_rule_find_nolock(head, next_name);
390         if (!next_rule)
391                 GOTO(out_put, rc = -ENOENT);
392
393         list_move(&rule->tr_linkage, next_rule->tr_linkage.prev);
394         nrs_tbf_rule_put(next_rule);
395 out_put:
396         nrs_tbf_rule_put(rule);
397 out:
398         spin_unlock(&head->th_rule_lock);
399         return rc;
400 }
401
402 static int
403 nrs_tbf_rule_change_rate(struct ptlrpc_nrs_policy *policy,
404                          struct nrs_tbf_head *head,
405                          char *name,
406                          __u64 rate)
407 {
408         struct nrs_tbf_rule *rule;
409
410         assert_spin_locked(&policy->pol_nrs->nrs_lock);
411
412         rule = nrs_tbf_rule_find(head, name);
413         if (rule == NULL)
414                 return -ENOENT;
415
416         rule->tr_rpc_rate = rate;
417         rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
418         rule->tr_generation++;
419         nrs_tbf_rule_put(rule);
420
421         return 0;
422 }
423
424 static int
425 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
426                     struct nrs_tbf_head *head,
427                     struct nrs_tbf_cmd *change)
428 {
429         __u64    rate = change->u.tc_change.tc_rpc_rate;
430         char    *next_name = change->u.tc_change.tc_next_name;
431         int      rc;
432
433         if (rate != 0) {
434                 rc = nrs_tbf_rule_change_rate(policy, head, change->tc_name,
435                                               rate);
436                 if (rc)
437                         return rc;
438         }
439
440         if (next_name) {
441                 rc = nrs_tbf_rule_change_rank(policy, head, change->tc_name,
442                                               next_name);
443                 if (rc)
444                         return rc;
445         }
446
447         return 0;
448 }
449
450 static int
451 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
452                   struct nrs_tbf_head *head,
453                   struct nrs_tbf_cmd *stop)
454 {
455         struct nrs_tbf_rule *rule;
456
457         assert_spin_locked(&policy->pol_nrs->nrs_lock);
458
459         if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
460                 return -EPERM;
461
462         rule = nrs_tbf_rule_find(head, stop->tc_name);
463         if (rule == NULL)
464                 return -ENOENT;
465
466         list_del_init(&rule->tr_linkage);
467         rule->tr_flags |= NTRS_STOPPING;
468         nrs_tbf_rule_put(rule);
469         nrs_tbf_rule_put(rule);
470
471         return 0;
472 }
473
474 static int
475 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
476                 struct nrs_tbf_head *head,
477                 struct nrs_tbf_cmd *cmd)
478 {
479         int rc;
480
481         assert_spin_locked(&policy->pol_nrs->nrs_lock);
482
483         switch (cmd->tc_cmd) {
484         case NRS_CTL_TBF_START_RULE:
485                 if (cmd->u.tc_start.ts_valid_type != head->th_type_flag)
486                         return -EINVAL;
487
488                 spin_unlock(&policy->pol_nrs->nrs_lock);
489                 rc = nrs_tbf_rule_start(policy, head, cmd);
490                 spin_lock(&policy->pol_nrs->nrs_lock);
491                 return rc;
492         case NRS_CTL_TBF_CHANGE_RULE:
493                 rc = nrs_tbf_rule_change(policy, head, cmd);
494                 return rc;
495         case NRS_CTL_TBF_STOP_RULE:
496                 rc = nrs_tbf_rule_stop(policy, head, cmd);
497                 /* Take it as a success, if not exists at all */
498                 return rc == -ENOENT ? 0 : rc;
499         default:
500                 return -EFAULT;
501         }
502 }
503
504 /**
505  * Binary heap predicate.
506  *
507  * \param[in] e1 the first binheap node to compare
508  * \param[in] e2 the second binheap node to compare
509  *
510  * \retval 0 e1 > e2
511  * \retval 1 e1 < e2
512  */
513 static int
514 tbf_cli_compare(struct binheap_node *e1, struct binheap_node *e2)
515 {
516         struct nrs_tbf_client *cli1;
517         struct nrs_tbf_client *cli2;
518
519         cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
520         cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
521
522         if (cli1->tc_deadline < cli2->tc_deadline)
523                 return 1;
524         else if (cli1->tc_deadline > cli2->tc_deadline)
525                 return 0;
526
527         if (cli1->tc_check_time < cli2->tc_check_time)
528                 return 1;
529         else if (cli1->tc_check_time > cli2->tc_check_time)
530                 return 0;
531
532         /* Maybe need more comparasion, e.g. request number in the rules */
533         return 1;
534 }
535
536 /**
537  * TBF binary heap operations
538  */
539 static struct binheap_ops nrs_tbf_heap_ops = {
540         .hop_enter      = NULL,
541         .hop_exit       = NULL,
542         .hop_compare    = tbf_cli_compare,
543 };
544
/**
 * Hash function of the jobid client hash: djb2 over the NUL-terminated
 * jobid string passed as \a key.
 */
static unsigned nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key,
				  unsigned mask)
{
	const char *jobid = key;

	return cfs_hash_djb2_hash(jobid, strlen(jobid), mask);
}
550
551 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
552 {
553         struct nrs_tbf_client *cli = hlist_entry(hnode,
554                                                      struct nrs_tbf_client,
555                                                      tc_hnode);
556
557         return (strcmp(cli->tc_jobid, key) == 0);
558 }
559
560 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
561 {
562         struct nrs_tbf_client *cli = hlist_entry(hnode,
563                                                      struct nrs_tbf_client,
564                                                      tc_hnode);
565
566         return cli->tc_jobid;
567 }
568
569 static void *nrs_tbf_hop_object(struct hlist_node *hnode)
570 {
571         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
572 }
573
574 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
575 {
576         struct nrs_tbf_client *cli = hlist_entry(hnode,
577                                                      struct nrs_tbf_client,
578                                                      tc_hnode);
579
580         atomic_inc(&cli->tc_ref);
581 }
582
583 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
584 {
585         struct nrs_tbf_client *cli = hlist_entry(hnode,
586                                                      struct nrs_tbf_client,
587                                                      tc_hnode);
588
589         atomic_dec(&cli->tc_ref);
590 }
591
592 static void
593 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
594
595 {
596         struct nrs_tbf_client *cli = hlist_entry(hnode,
597                                                  struct nrs_tbf_client,
598                                                  tc_hnode);
599
600         LASSERT(atomic_read(&cli->tc_ref) == 0);
601         nrs_tbf_cli_fini(cli);
602 }
603
604 static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
605         .hs_hash        = nrs_tbf_jobid_hop_hash,
606         .hs_keycmp      = nrs_tbf_jobid_hop_keycmp,
607         .hs_key         = nrs_tbf_jobid_hop_key,
608         .hs_object      = nrs_tbf_hop_object,
609         .hs_get         = nrs_tbf_jobid_hop_get,
610         .hs_put         = nrs_tbf_jobid_hop_put,
611         .hs_put_locked  = nrs_tbf_jobid_hop_put,
612         .hs_exit        = nrs_tbf_jobid_hop_exit,
613 };
614
615 #define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
616                                   CFS_HASH_NO_ITEMREF | \
617                                   CFS_HASH_DEPTH)
618
619 static struct nrs_tbf_client *
620 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
621                           struct cfs_hash_bd *bd,
622                           const char *jobid)
623 {
624         struct hlist_node *hnode;
625         struct nrs_tbf_client *cli;
626
627         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)jobid);
628         if (hnode == NULL)
629                 return NULL;
630
631         cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
632         if (!list_empty(&cli->tc_lru))
633                 list_del_init(&cli->tc_lru);
634         return cli;
635 }
636
637 #define NRS_TBF_JOBID_NULL ""
638
639 static struct nrs_tbf_client *
640 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
641                        struct ptlrpc_request *req)
642 {
643         const char              *jobid;
644         struct nrs_tbf_client   *cli;
645         struct cfs_hash         *hs = head->th_cli_hash;
646         struct cfs_hash_bd               bd;
647
648         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
649         if (jobid == NULL)
650                 jobid = NRS_TBF_JOBID_NULL;
651         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
652         cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
653         cfs_hash_bd_unlock(hs, &bd, 1);
654
655         return cli;
656 }
657
658 static struct nrs_tbf_client *
659 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
660                           struct nrs_tbf_client *cli)
661 {
662         const char              *jobid;
663         struct nrs_tbf_client   *ret;
664         struct cfs_hash         *hs = head->th_cli_hash;
665         struct cfs_hash_bd               bd;
666
667         jobid = cli->tc_jobid;
668         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
669         ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
670         if (ret == NULL) {
671                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
672                 ret = cli;
673         }
674         cfs_hash_bd_unlock(hs, &bd, 1);
675
676         return ret;
677 }
678
679 static void
680 nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
681                       struct nrs_tbf_client *cli)
682 {
683         struct cfs_hash_bd               bd;
684         struct cfs_hash         *hs = head->th_cli_hash;
685         struct nrs_tbf_bucket   *bkt;
686         int                      hw;
687         LIST_HEAD(zombies);
688
689         cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
690         bkt = cfs_hash_bd_extra_get(hs, &bd);
691         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
692                 return;
693         LASSERT(list_empty(&cli->tc_lru));
694         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
695
696         /*
697          * Check and purge the LRU, there is at least one client in the LRU.
698          */
699         hw = tbf_jobid_cache_size >>
700              (hs->hs_cur_bits - hs->hs_bkt_bits);
701         while (cfs_hash_bd_count_get(&bd) > hw) {
702                 if (unlikely(list_empty(&bkt->ntb_lru)))
703                         break;
704                 cli = list_entry(bkt->ntb_lru.next,
705                                      struct nrs_tbf_client,
706                                      tc_lru);
707                 LASSERT(atomic_read(&cli->tc_ref) == 0);
708                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
709                 list_move(&cli->tc_lru, &zombies);
710         }
711         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
712
713         while (!list_empty(&zombies)) {
714                 cli = container_of(zombies.next,
715                                    struct nrs_tbf_client, tc_lru);
716                 list_del_init(&cli->tc_lru);
717                 nrs_tbf_cli_fini(cli);
718         }
719 }
720
721 static void
722 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
723                        struct ptlrpc_request *req)
724 {
725         char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
726
727         if (jobid == NULL)
728                 jobid = NRS_TBF_JOBID_NULL;
729         LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
730         INIT_LIST_HEAD(&cli->tc_lru);
731         memcpy(cli->tc_jobid, jobid, strlen(jobid));
732 }
733
734 static int nrs_tbf_jobid_hash_order(void)
735 {
736         int bits;
737
738         for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
739                 ;
740
741         return bits;
742 }
743
744 #define NRS_TBF_JOBID_BKT_BITS 10
745
746 static int
747 nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
748                       struct nrs_tbf_head *head)
749 {
750         struct nrs_tbf_cmd       start;
751         struct nrs_tbf_bucket   *bkt;
752         int                      bits;
753         int                      i;
754         int                      rc;
755         struct cfs_hash_bd       bd;
756
757         bits = nrs_tbf_jobid_hash_order();
758         if (bits < NRS_TBF_JOBID_BKT_BITS)
759                 bits = NRS_TBF_JOBID_BKT_BITS;
760         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
761                                             bits,
762                                             bits,
763                                             NRS_TBF_JOBID_BKT_BITS,
764                                             sizeof(*bkt),
765                                             0,
766                                             0,
767                                             &nrs_tbf_jobid_hash_ops,
768                                             NRS_TBF_JOBID_HASH_FLAGS);
769         if (head->th_cli_hash == NULL)
770                 return -ENOMEM;
771
772         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
773                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
774                 INIT_LIST_HEAD(&bkt->ntb_lru);
775         }
776
777         memset(&start, 0, sizeof(start));
778         start.u.tc_start.ts_jobids_str = "*";
779
780         start.u.tc_start.ts_rpc_rate = tbf_rate;
781         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
782         start.tc_name = NRS_TBF_DEFAULT_RULE;
783         INIT_LIST_HEAD(&start.u.tc_start.ts_jobids);
784         rc = nrs_tbf_rule_start(policy, head, &start);
785         if (rc) {
786                 cfs_hash_putref(head->th_cli_hash);
787                 head->th_cli_hash = NULL;
788         }
789
790         return rc;
791 }
792
793 /**
794  * Frees jobid of \a list.
795  *
796  */
797 static void
798 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
799 {
800         struct nrs_tbf_jobid *jobid, *n;
801
802         list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
803                 OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1);
804                 list_del(&jobid->tj_linkage);
805                 OBD_FREE_PTR(jobid);
806         }
807 }
808
809 static int
810 nrs_tbf_jobid_list_add(struct cfs_lstr *id, struct list_head *jobid_list)
811 {
812         struct nrs_tbf_jobid *jobid;
813         char *ptr;
814
815         OBD_ALLOC_PTR(jobid);
816         if (jobid == NULL)
817                 return -ENOMEM;
818
819         OBD_ALLOC(jobid->tj_id, id->ls_len + 1);
820         if (jobid->tj_id == NULL) {
821                 OBD_FREE_PTR(jobid);
822                 return -ENOMEM;
823         }
824
825         memcpy(jobid->tj_id, id->ls_str, id->ls_len);
826         ptr = lprocfs_strnstr(id->ls_str, "*", id->ls_len);
827         if (ptr == NULL)
828                 jobid->tj_match_flag = NRS_TBF_MATCH_FULL;
829         else
830                 jobid->tj_match_flag = NRS_TBF_MATCH_WILDCARD;
831
832         list_add_tail(&jobid->tj_linkage, jobid_list);
833         return 0;
834 }
835
/**
 * Matches \a content against \a pattern, where every '*' in the pattern
 * stands for any (possibly empty) sequence of characters and every other
 * character must match literally. The whole content has to be consumed.
 *
 * The match is done iteratively with a single backtracking point (the
 * most recent '*'), so the cost is bounded by
 * strlen(pattern) * strlen(content) instead of growing exponentially with
 * the number of '*' as a naive recursion does. A '*' in the pattern is
 * always a wildcard, even if the content holds a literal '*' at that
 * position, and runs of consecutive '*' behave like a single one.
 *
 * \param[in] pattern the NUL-terminated pattern
 * \param[in] content the NUL-terminated string to test
 *
 * \retval true  \a content matches \a pattern
 * \retval false otherwise
 */
static bool
cfs_match_wildcard(const char *pattern, const char *content)
{
	/* pattern position right after the last '*' seen, if any */
	const char *star = NULL;
	/* content position the last '*' is currently assumed to end at */
	const char *resume = NULL;

	while (*content != '\0') {
		if (*pattern == '*') {
			/* let the '*' match nothing for now */
			star = ++pattern;
			resume = content;
		} else if (*pattern == *content) {
			pattern++;
			content++;
		} else if (star != NULL) {
			/* mismatch: let the last '*' eat one more char */
			pattern = star;
			content = ++resume;
		} else {
			return false;
		}
	}

	/* only '*' may be left over in the pattern */
	while (*pattern == '*')
		pattern++;

	return *pattern == '\0';
}
862
863 static inline bool
864 nrs_tbf_jobid_match(const struct nrs_tbf_jobid *jobid, const char *id)
865 {
866         if (jobid->tj_match_flag == NRS_TBF_MATCH_FULL)
867                 return strcmp(jobid->tj_id, id) == 0;
868
869         if (jobid->tj_match_flag == NRS_TBF_MATCH_WILDCARD)
870                 return cfs_match_wildcard(jobid->tj_id, id);
871
872         return false;
873 }
874
875 static int
876 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
877 {
878         struct nrs_tbf_jobid *jobid;
879
880         list_for_each_entry(jobid, jobid_list, tj_linkage) {
881                 if (nrs_tbf_jobid_match(jobid, id))
882                         return 1;
883         }
884         return 0;
885 }
886
887 static int
888 nrs_tbf_jobid_list_parse(char *str, int len, struct list_head *jobid_list)
889 {
890         struct cfs_lstr src;
891         struct cfs_lstr res;
892         int rc = 0;
893         ENTRY;
894
895         src.ls_str = str;
896         src.ls_len = len;
897         INIT_LIST_HEAD(jobid_list);
898         while (src.ls_str) {
899                 rc = cfs_gettok(&src, ' ', &res);
900                 if (rc == 0) {
901                         rc = -EINVAL;
902                         break;
903                 }
904                 rc = nrs_tbf_jobid_list_add(&res, jobid_list);
905                 if (rc)
906                         break;
907         }
908         if (rc)
909                 nrs_tbf_jobid_list_free(jobid_list);
910         RETURN(rc);
911 }
912
913 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
914 {
915         if (!list_empty(&cmd->u.tc_start.ts_jobids))
916                 nrs_tbf_jobid_list_free(&cmd->u.tc_start.ts_jobids);
917         if (cmd->u.tc_start.ts_jobids_str)
918                 OBD_FREE(cmd->u.tc_start.ts_jobids_str,
919                          strlen(cmd->u.tc_start.ts_jobids_str) + 1);
920 }
921
922 static int nrs_tbf_check_id_value(struct cfs_lstr *src, char *key)
923 {
924         struct cfs_lstr res;
925         int keylen = strlen(key);
926         int rc;
927
928         rc = cfs_gettok(src, '=', &res);
929         if (rc == 0 || res.ls_len != keylen ||
930             strncmp(res.ls_str, key, keylen) != 0 ||
931             src->ls_len <= 2 || src->ls_str[0] != '{' ||
932             src->ls_str[src->ls_len - 1] != '}')
933                 return -EINVAL;
934
935         /* Skip '{' and '}' */
936         src->ls_str++;
937         src->ls_len -= 2;
938         return 0;
939 }
940
941 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, char *id)
942 {
943         struct cfs_lstr src;
944         int rc;
945
946         src.ls_str = id;
947         src.ls_len = strlen(id);
948         rc = nrs_tbf_check_id_value(&src, "jobid");
949         if (rc)
950                 return rc;
951
952         OBD_ALLOC(cmd->u.tc_start.ts_jobids_str, src.ls_len + 1);
953         if (cmd->u.tc_start.ts_jobids_str == NULL)
954                 return -ENOMEM;
955
956         memcpy(cmd->u.tc_start.ts_jobids_str, src.ls_str, src.ls_len);
957
958         /* parse jobid list */
959         rc = nrs_tbf_jobid_list_parse(cmd->u.tc_start.ts_jobids_str,
960                                       strlen(cmd->u.tc_start.ts_jobids_str),
961                                       &cmd->u.tc_start.ts_jobids);
962         if (rc)
963                 nrs_tbf_jobid_cmd_fini(cmd);
964
965         return rc;
966 }
967
968 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
969                                    struct nrs_tbf_rule *rule,
970                                    struct nrs_tbf_cmd *start)
971 {
972         int rc = 0;
973
974         LASSERT(start->u.tc_start.ts_jobids_str);
975         OBD_ALLOC(rule->tr_jobids_str,
976                   strlen(start->u.tc_start.ts_jobids_str) + 1);
977         if (rule->tr_jobids_str == NULL)
978                 return -ENOMEM;
979
980         memcpy(rule->tr_jobids_str,
981                start->u.tc_start.ts_jobids_str,
982                strlen(start->u.tc_start.ts_jobids_str));
983
984         INIT_LIST_HEAD(&rule->tr_jobids);
985         if (!list_empty(&start->u.tc_start.ts_jobids)) {
986                 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
987                                               strlen(rule->tr_jobids_str),
988                                               &rule->tr_jobids);
989                 if (rc)
990                         CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
991         }
992         if (rc)
993                 OBD_FREE(rule->tr_jobids_str,
994                          strlen(start->u.tc_start.ts_jobids_str) + 1);
995         return rc;
996 }
997
998 static int
999 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1000 {
1001         seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
1002                    rule->tr_jobids_str, rule->tr_rpc_rate,
1003                    atomic_read(&rule->tr_ref) - 1);
1004         return 0;
1005 }
1006
1007 static int
1008 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
1009                          struct nrs_tbf_client *cli)
1010 {
1011         return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
1012 }
1013
1014 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
1015 {
1016         if (!list_empty(&rule->tr_jobids))
1017                 nrs_tbf_jobid_list_free(&rule->tr_jobids);
1018         LASSERT(rule->tr_jobids_str != NULL);
1019         OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1);
1020 }
1021
1022 static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
1023         .o_name = NRS_TBF_TYPE_JOBID,
1024         .o_startup = nrs_tbf_jobid_startup,
1025         .o_cli_find = nrs_tbf_jobid_cli_find,
1026         .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
1027         .o_cli_put = nrs_tbf_jobid_cli_put,
1028         .o_cli_init = nrs_tbf_jobid_cli_init,
1029         .o_rule_init = nrs_tbf_jobid_rule_init,
1030         .o_rule_dump = nrs_tbf_jobid_rule_dump,
1031         .o_rule_match = nrs_tbf_jobid_rule_match,
1032         .o_rule_fini = nrs_tbf_jobid_rule_fini,
1033 };
1034
/**
 * libcfs_hash operations for nrs_tbf_head::th_cli_hash of the NID TBF type
 *
 * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
 * nrs_tbf_client objects.
 */
1041 #define NRS_TBF_NID_BKT_BITS    8
1042 #define NRS_TBF_NID_BITS        16
1043
1044 static unsigned nrs_tbf_nid_hop_hash(struct cfs_hash *hs, const void *key,
1045                                   unsigned mask)
1046 {
1047         return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
1048 }
1049
1050 static int nrs_tbf_nid_hop_keycmp(const void *key, struct hlist_node *hnode)
1051 {
1052         lnet_nid_t            *nid = (lnet_nid_t *)key;
1053         struct nrs_tbf_client *cli = hlist_entry(hnode,
1054                                                      struct nrs_tbf_client,
1055                                                      tc_hnode);
1056
1057         return *nid == cli->tc_nid;
1058 }
1059
1060 static void *nrs_tbf_nid_hop_key(struct hlist_node *hnode)
1061 {
1062         struct nrs_tbf_client *cli = hlist_entry(hnode,
1063                                                      struct nrs_tbf_client,
1064                                                      tc_hnode);
1065
1066         return &cli->tc_nid;
1067 }
1068
1069 static void nrs_tbf_nid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1070 {
1071         struct nrs_tbf_client *cli = hlist_entry(hnode,
1072                                                      struct nrs_tbf_client,
1073                                                      tc_hnode);
1074
1075         atomic_inc(&cli->tc_ref);
1076 }
1077
1078 static void nrs_tbf_nid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1079 {
1080         struct nrs_tbf_client *cli = hlist_entry(hnode,
1081                                                      struct nrs_tbf_client,
1082                                                      tc_hnode);
1083
1084         atomic_dec(&cli->tc_ref);
1085 }
1086
1087 static void nrs_tbf_nid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1088 {
1089         struct nrs_tbf_client *cli = hlist_entry(hnode,
1090                                                      struct nrs_tbf_client,
1091                                                      tc_hnode);
1092
1093         LASSERTF(atomic_read(&cli->tc_ref) == 0,
1094                  "Busy TBF object from client with NID %s, with %d refs\n",
1095                  libcfs_nid2str(cli->tc_nid), atomic_read(&cli->tc_ref));
1096
1097         nrs_tbf_cli_fini(cli);
1098 }
1099
1100 static struct cfs_hash_ops nrs_tbf_nid_hash_ops = {
1101         .hs_hash        = nrs_tbf_nid_hop_hash,
1102         .hs_keycmp      = nrs_tbf_nid_hop_keycmp,
1103         .hs_key         = nrs_tbf_nid_hop_key,
1104         .hs_object      = nrs_tbf_hop_object,
1105         .hs_get         = nrs_tbf_nid_hop_get,
1106         .hs_put         = nrs_tbf_nid_hop_put,
1107         .hs_put_locked  = nrs_tbf_nid_hop_put,
1108         .hs_exit        = nrs_tbf_nid_hop_exit,
1109 };
1110
1111 static struct nrs_tbf_client *
1112 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
1113                      struct ptlrpc_request *req)
1114 {
1115         return cfs_hash_lookup(head->th_cli_hash, &req->rq_peer.nid);
1116 }
1117
1118 static struct nrs_tbf_client *
1119 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
1120                         struct nrs_tbf_client *cli)
1121 {
1122         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid,
1123                                        &cli->tc_hnode);
1124 }
1125
1126 static void
1127 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
1128                       struct nrs_tbf_client *cli)
1129 {
1130         cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
1131 }
1132
1133 static int
1134 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
1135                     struct nrs_tbf_head *head)
1136 {
1137         struct nrs_tbf_cmd      start;
1138         int rc;
1139
1140         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1141                                             NRS_TBF_NID_BITS,
1142                                             NRS_TBF_NID_BITS,
1143                                             NRS_TBF_NID_BKT_BITS, 0,
1144                                             CFS_HASH_MIN_THETA,
1145                                             CFS_HASH_MAX_THETA,
1146                                             &nrs_tbf_nid_hash_ops,
1147                                             CFS_HASH_RW_BKTLOCK);
1148         if (head->th_cli_hash == NULL)
1149                 return -ENOMEM;
1150
1151         memset(&start, 0, sizeof(start));
1152         start.u.tc_start.ts_nids_str = "*";
1153
1154         start.u.tc_start.ts_rpc_rate = tbf_rate;
1155         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1156         start.tc_name = NRS_TBF_DEFAULT_RULE;
1157         INIT_LIST_HEAD(&start.u.tc_start.ts_nids);
1158         rc = nrs_tbf_rule_start(policy, head, &start);
1159         if (rc) {
1160                 cfs_hash_putref(head->th_cli_hash);
1161                 head->th_cli_hash = NULL;
1162         }
1163
1164         return rc;
1165 }
1166
1167 static void
1168 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1169                              struct ptlrpc_request *req)
1170 {
1171         cli->tc_nid = req->rq_peer.nid;
1172 }
1173
1174 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1175                                  struct nrs_tbf_rule *rule,
1176                                  struct nrs_tbf_cmd *start)
1177 {
1178         LASSERT(start->u.tc_start.ts_nids_str);
1179         OBD_ALLOC(rule->tr_nids_str,
1180                   strlen(start->u.tc_start.ts_nids_str) + 1);
1181         if (rule->tr_nids_str == NULL)
1182                 return -ENOMEM;
1183
1184         memcpy(rule->tr_nids_str,
1185                start->u.tc_start.ts_nids_str,
1186                strlen(start->u.tc_start.ts_nids_str));
1187
1188         INIT_LIST_HEAD(&rule->tr_nids);
1189         if (!list_empty(&start->u.tc_start.ts_nids)) {
1190                 if (cfs_parse_nidlist(rule->tr_nids_str,
1191                                       strlen(rule->tr_nids_str),
1192                                       &rule->tr_nids) <= 0) {
1193                         CERROR("nids {%s} illegal\n",
1194                                rule->tr_nids_str);
1195                         OBD_FREE(rule->tr_nids_str,
1196                                  strlen(start->u.tc_start.ts_nids_str) + 1);
1197                         return -EINVAL;
1198                 }
1199         }
1200         return 0;
1201 }
1202
1203 static int
1204 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1205 {
1206         seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
1207                    rule->tr_nids_str, rule->tr_rpc_rate,
1208                    atomic_read(&rule->tr_ref) - 1);
1209         return 0;
1210 }
1211
1212 static int
1213 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1214                        struct nrs_tbf_client *cli)
1215 {
1216         return cfs_match_nid(cli->tc_nid, &rule->tr_nids);
1217 }
1218
1219 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1220 {
1221         if (!list_empty(&rule->tr_nids))
1222                 cfs_free_nidlist(&rule->tr_nids);
1223         LASSERT(rule->tr_nids_str != NULL);
1224         OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1);
1225 }
1226
1227 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1228 {
1229         if (!list_empty(&cmd->u.tc_start.ts_nids))
1230                 cfs_free_nidlist(&cmd->u.tc_start.ts_nids);
1231         if (cmd->u.tc_start.ts_nids_str)
1232                 OBD_FREE(cmd->u.tc_start.ts_nids_str,
1233                          strlen(cmd->u.tc_start.ts_nids_str) + 1);
1234 }
1235
1236 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, char *id)
1237 {
1238         struct cfs_lstr src;
1239         int rc;
1240
1241         src.ls_str = id;
1242         src.ls_len = strlen(id);
1243         rc = nrs_tbf_check_id_value(&src, "nid");
1244         if (rc)
1245                 return rc;
1246
1247         OBD_ALLOC(cmd->u.tc_start.ts_nids_str, src.ls_len + 1);
1248         if (cmd->u.tc_start.ts_nids_str == NULL)
1249                 return -ENOMEM;
1250
1251         memcpy(cmd->u.tc_start.ts_nids_str, src.ls_str, src.ls_len);
1252
1253         /* parse NID list */
1254         if (cfs_parse_nidlist(cmd->u.tc_start.ts_nids_str,
1255                               strlen(cmd->u.tc_start.ts_nids_str),
1256                               &cmd->u.tc_start.ts_nids) <= 0) {
1257                 nrs_tbf_nid_cmd_fini(cmd);
1258                 return -EINVAL;
1259         }
1260
1261         return 0;
1262 }
1263
1264 static struct nrs_tbf_ops nrs_tbf_nid_ops = {
1265         .o_name = NRS_TBF_TYPE_NID,
1266         .o_startup = nrs_tbf_nid_startup,
1267         .o_cli_find = nrs_tbf_nid_cli_find,
1268         .o_cli_findadd = nrs_tbf_nid_cli_findadd,
1269         .o_cli_put = nrs_tbf_nid_cli_put,
1270         .o_cli_init = nrs_tbf_nid_cli_init,
1271         .o_rule_init = nrs_tbf_nid_rule_init,
1272         .o_rule_dump = nrs_tbf_nid_rule_dump,
1273         .o_rule_match = nrs_tbf_nid_rule_match,
1274         .o_rule_fini = nrs_tbf_nid_rule_fini,
1275 };
1276
/**
 * Hash callback of the generic client hash: djb2 hash over the
 * NUL-terminated string \a key (the terminator is not hashed).
 */
static unsigned nrs_tbf_hop_hash(struct cfs_hash *hs, const void *key,
                                 unsigned mask)
{
        size_t keylen = strlen(key);

        return cfs_hash_djb2_hash(key, keylen, mask);
}
1282
1283 static int nrs_tbf_hop_keycmp(const void *key, struct hlist_node *hnode)
1284 {
1285         struct nrs_tbf_client *cli = hlist_entry(hnode,
1286                                                  struct nrs_tbf_client,
1287                                                  tc_hnode);
1288
1289         return (strcmp(cli->tc_key, key) == 0);
1290 }
1291
1292 static void *nrs_tbf_hop_key(struct hlist_node *hnode)
1293 {
1294         struct nrs_tbf_client *cli = hlist_entry(hnode,
1295                                                  struct nrs_tbf_client,
1296                                                  tc_hnode);
1297         return cli->tc_key;
1298 }
1299
1300 static void nrs_tbf_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1301 {
1302         struct nrs_tbf_client *cli = hlist_entry(hnode,
1303                                                  struct nrs_tbf_client,
1304                                                  tc_hnode);
1305
1306         atomic_inc(&cli->tc_ref);
1307 }
1308
1309 static void nrs_tbf_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1310 {
1311         struct nrs_tbf_client *cli = hlist_entry(hnode,
1312                                                  struct nrs_tbf_client,
1313                                                  tc_hnode);
1314
1315         atomic_dec(&cli->tc_ref);
1316 }
1317
1318 static void nrs_tbf_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1319
1320 {
1321         struct nrs_tbf_client *cli = hlist_entry(hnode,
1322                                                  struct nrs_tbf_client,
1323                                                  tc_hnode);
1324
1325         LASSERT(atomic_read(&cli->tc_ref) == 0);
1326         nrs_tbf_cli_fini(cli);
1327 }
1328
1329 static struct cfs_hash_ops nrs_tbf_hash_ops = {
1330         .hs_hash        = nrs_tbf_hop_hash,
1331         .hs_keycmp      = nrs_tbf_hop_keycmp,
1332         .hs_key         = nrs_tbf_hop_key,
1333         .hs_object      = nrs_tbf_hop_object,
1334         .hs_get         = nrs_tbf_hop_get,
1335         .hs_put         = nrs_tbf_hop_put,
1336         .hs_put_locked  = nrs_tbf_hop_put,
1337         .hs_exit        = nrs_tbf_hop_exit,
1338 };
1339
1340 #define NRS_TBF_GENERIC_BKT_BITS        10
1341 #define NRS_TBF_GENERIC_HASH_FLAGS      (CFS_HASH_SPIN_BKTLOCK | \
1342                                         CFS_HASH_NO_ITEMREF | \
1343                                         CFS_HASH_DEPTH)
1344
1345 static int
1346 nrs_tbf_startup(struct ptlrpc_nrs_policy *policy, struct nrs_tbf_head *head)
1347 {
1348         struct nrs_tbf_cmd       start;
1349         struct nrs_tbf_bucket   *bkt;
1350         int                      bits;
1351         int                      i;
1352         int                      rc;
1353         struct cfs_hash_bd       bd;
1354
1355         bits = nrs_tbf_jobid_hash_order();
1356         if (bits < NRS_TBF_GENERIC_BKT_BITS)
1357                 bits = NRS_TBF_GENERIC_BKT_BITS;
1358         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1359                                             bits, bits,
1360                                             NRS_TBF_GENERIC_BKT_BITS,
1361                                             sizeof(*bkt), 0, 0,
1362                                             &nrs_tbf_hash_ops,
1363                                             NRS_TBF_GENERIC_HASH_FLAGS);
1364         if (head->th_cli_hash == NULL)
1365                 return -ENOMEM;
1366
1367         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
1368                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
1369                 INIT_LIST_HEAD(&bkt->ntb_lru);
1370         }
1371
1372         memset(&start, 0, sizeof(start));
1373         start.u.tc_start.ts_conds_str = "*";
1374
1375         start.u.tc_start.ts_rpc_rate = tbf_rate;
1376         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1377         start.tc_name = NRS_TBF_DEFAULT_RULE;
1378         INIT_LIST_HEAD(&start.u.tc_start.ts_conds);
1379         rc = nrs_tbf_rule_start(policy, head, &start);
1380         if (rc)
1381                 cfs_hash_putref(head->th_cli_hash);
1382
1383         return rc;
1384 }
1385
1386 static struct nrs_tbf_client *
1387 nrs_tbf_cli_hash_lookup(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1388                         const char *key)
1389 {
1390         struct hlist_node *hnode;
1391         struct nrs_tbf_client *cli;
1392
1393         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)key);
1394         if (hnode == NULL)
1395                 return NULL;
1396
1397         cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
1398         if (!list_empty(&cli->tc_lru))
1399                 list_del_init(&cli->tc_lru);
1400         return cli;
1401 }
1402
1403 /**
1404  * ONLY opcode presented in this function will be checked in
1405  * nrs_tbf_id_cli_set(). That means, we can add or remove an
1406  * opcode to enable or disable requests handled in nrs_tbf
1407  */
1408 static struct req_format *req_fmt(__u32 opcode)
1409 {
1410         switch (opcode) {
1411         case OST_GETATTR:
1412                 return &RQF_OST_GETATTR;
1413         case OST_SETATTR:
1414                 return &RQF_OST_SETATTR;
1415         case OST_READ:
1416                 return &RQF_OST_BRW_READ;
1417         case OST_WRITE:
1418                 return &RQF_OST_BRW_WRITE;
1419         /* FIXME: OST_CREATE and OST_DESTROY comes from MDS
1420          * in most case. Should they be removed? */
1421         case OST_CREATE:
1422                 return &RQF_OST_CREATE;
1423         case OST_DESTROY:
1424                 return &RQF_OST_DESTROY;
1425         case OST_PUNCH:
1426                 return &RQF_OST_PUNCH;
1427         case OST_SYNC:
1428                 return &RQF_OST_SYNC;
1429         case OST_LADVISE:
1430                 return &RQF_OST_LADVISE;
1431         case MDS_GETATTR:
1432                 return &RQF_MDS_GETATTR;
1433         case MDS_GETATTR_NAME:
1434                 return &RQF_MDS_GETATTR_NAME;
1435         /* close is skipped to avoid LDLM cancel slowness */
1436 #if 0
1437         case MDS_CLOSE:
1438                 return &RQF_MDS_CLOSE;
1439 #endif
1440         case MDS_REINT:
1441                 return &RQF_MDS_REINT;
1442         case MDS_READPAGE:
1443                 return &RQF_MDS_READPAGE;
1444         case MDS_GET_ROOT:
1445                 return &RQF_MDS_GET_ROOT;
1446         case MDS_STATFS:
1447                 return &RQF_MDS_STATFS;
1448         case MDS_SYNC:
1449                 return &RQF_MDS_SYNC;
1450         case MDS_QUOTACTL:
1451                 return &RQF_MDS_QUOTACTL;
1452         case MDS_GETXATTR:
1453                 return &RQF_MDS_GETXATTR;
1454         case MDS_GET_INFO:
1455                 return &RQF_MDS_GET_INFO;
1456         /* HSM op is skipped */
1457 #if 0 
1458         case MDS_HSM_STATE_GET:
1459                 return &RQF_MDS_HSM_STATE_GET;
1460         case MDS_HSM_STATE_SET:
1461                 return &RQF_MDS_HSM_STATE_SET;
1462         case MDS_HSM_ACTION:
1463                 return &RQF_MDS_HSM_ACTION;
1464         case MDS_HSM_CT_REGISTER:
1465                 return &RQF_MDS_HSM_CT_REGISTER;
1466         case MDS_HSM_CT_UNREGISTER:
1467                 return &RQF_MDS_HSM_CT_UNREGISTER;
1468 #endif
1469         case MDS_SWAP_LAYOUTS:
1470                 return &RQF_MDS_SWAP_LAYOUTS;
1471         case LDLM_ENQUEUE:
1472                 return &RQF_LDLM_ENQUEUE;
1473         default:
1474                 return NULL;
1475         }
1476 }
1477
1478 static struct req_format *intent_req_fmt(__u32 it_opc)
1479 {
1480         if (it_opc & (IT_OPEN | IT_CREAT))
1481                 return &RQF_LDLM_INTENT_OPEN;
1482         else if (it_opc & (IT_GETATTR | IT_LOOKUP))
1483                 return &RQF_LDLM_INTENT_GETATTR;
1484         else if (it_opc & IT_GETXATTR)
1485                 return &RQF_LDLM_INTENT_GETXATTR;
1486         else if (it_opc & (IT_GLIMPSE | IT_BRW))
1487                 return &RQF_LDLM_INTENT;
1488         else
1489                 return NULL;
1490 }
1491
1492 static int ost_tbf_id_cli_set(struct ptlrpc_request *req,
1493                               struct tbf_id *id)
1494 {
1495         struct ost_body *body;
1496
1497         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1498         if (body != NULL) {
1499                 id->ti_uid = body->oa.o_uid;
1500                 id->ti_gid = body->oa.o_gid;
1501                 return 0;
1502         }
1503
1504         return -EINVAL;
1505 }
1506
1507 static void unpack_ugid_from_mdt_body(struct ptlrpc_request *req,
1508                                       struct tbf_id *id)
1509 {
1510         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
1511                                                     &RMF_MDT_BODY);
1512         LASSERT(b != NULL);
1513
1514         /* TODO: nodemaping feature converts {ug}id from individual
1515          * clients to the actual ones of the file system. Some work
1516          * may be needed to fix this. */
1517         id->ti_uid = b->mbo_uid;
1518         id->ti_gid = b->mbo_gid;
1519 }
1520
1521 static void unpack_ugid_from_mdt_rec_reint(struct ptlrpc_request *req,
1522                                            struct tbf_id *id)
1523 {
1524         struct mdt_rec_reint *rec;
1525
1526         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
1527         LASSERT(rec != NULL);
1528
1529         /* use the fs{ug}id as {ug}id of the process */
1530         id->ti_uid = rec->rr_fsuid;
1531         id->ti_gid = rec->rr_fsgid;
1532 }
1533
1534 static int mdt_tbf_id_cli_set(struct ptlrpc_request *req,
1535                               struct tbf_id *id)
1536 {
1537         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1538         int rc = 0;
1539
1540         switch (opc) {
1541         case MDS_GETATTR:
1542         case MDS_GETATTR_NAME:
1543         case MDS_GET_ROOT:
1544         case MDS_READPAGE:
1545         case MDS_SYNC:
1546         case MDS_GETXATTR:
1547         case MDS_HSM_STATE_GET ... MDS_SWAP_LAYOUTS:
1548                 unpack_ugid_from_mdt_body(req, id);
1549                 break;
1550         case MDS_CLOSE:
1551         case MDS_REINT:
1552                 unpack_ugid_from_mdt_rec_reint(req, id);
1553                 break;
1554         default:
1555                 rc = -EINVAL;
1556                 break;
1557         }
1558         return rc;
1559 }
1560
1561 static int ldlm_tbf_id_cli_set(struct ptlrpc_request *req,
1562                               struct tbf_id *id)
1563 {
1564         struct ldlm_intent *lit;
1565         struct req_format *fmt;
1566
1567         if (req->rq_reqmsg->lm_bufcount <= DLM_INTENT_IT_OFF)
1568                 return -EINVAL;
1569
1570         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_BASIC);
1571         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
1572         if (lit == NULL)
1573                 return -EINVAL;
1574
1575         fmt = intent_req_fmt(lit->opc);
1576         if (fmt == NULL)
1577                 return -EINVAL;
1578
1579         req_capsule_extend(&req->rq_pill, fmt);
1580
1581         if (lit->opc & (IT_GETXATTR | IT_GETATTR | IT_LOOKUP))
1582                 unpack_ugid_from_mdt_body(req, id);
1583         else if (lit->opc & (IT_OPEN | IT_OPEN | IT_GLIMPSE | IT_BRW))
1584                 unpack_ugid_from_mdt_rec_reint(req, id);
1585         else
1586                 return -EINVAL;
1587         return 0;
1588 }
1589
1590 static int nrs_tbf_id_cli_set(struct ptlrpc_request *req, struct tbf_id *id,
1591                               enum nrs_tbf_flag ti_type)
1592 {
1593         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1594         struct req_format *fmt = req_fmt(opc);
1595         bool fmt_unset = false;
1596         int rc;
1597
1598         memset(id, 0, sizeof(struct tbf_id));
1599         id->ti_type = ti_type;
1600
1601         if (fmt == NULL)
1602                 return -EINVAL;
1603         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1604         if (req->rq_pill.rc_fmt == NULL) {
1605                 req_capsule_set(&req->rq_pill, fmt);
1606                 fmt_unset = true;
1607         }
1608
1609         if (opc < OST_LAST_OPC)
1610                 rc = ost_tbf_id_cli_set(req, id);
1611         else if (opc >= MDS_FIRST_OPC && opc < MDS_LAST_OPC)
1612                 rc = mdt_tbf_id_cli_set(req, id);
1613         else if (opc == LDLM_ENQUEUE)
1614                 rc = ldlm_tbf_id_cli_set(req, id);
1615         else
1616                 rc = -EINVAL;
1617
1618         /* restore it to the initialized state */
1619         if (fmt_unset)
1620                 req->rq_pill.rc_fmt = NULL;
1621         return rc;
1622 }
1623
1624 static inline void nrs_tbf_cli_gen_key(struct nrs_tbf_client *cli,
1625                                        struct ptlrpc_request *req,
1626                                        char *keystr, size_t keystr_sz)
1627 {
1628         const char *jobid;
1629         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1630         struct tbf_id id;
1631
1632         nrs_tbf_id_cli_set(req, &id, NRS_TBF_FLAG_UID | NRS_TBF_FLAG_GID);
1633         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1634         if (jobid == NULL)
1635                 jobid = NRS_TBF_JOBID_NULL;
1636
1637         snprintf(keystr, keystr_sz, "%s_%s_%d_%u_%u", jobid,
1638                  libcfs_nid2str(req->rq_peer.nid), opc, id.ti_uid,
1639                  id.ti_gid);
1640
1641         if (cli) {
1642                 INIT_LIST_HEAD(&cli->tc_lru);
1643                 strlcpy(cli->tc_key, keystr, sizeof(cli->tc_key));
1644                 strlcpy(cli->tc_jobid, jobid, sizeof(cli->tc_jobid));
1645                 cli->tc_nid = req->rq_peer.nid;
1646                 cli->tc_opcode = opc;
1647                 cli->tc_id = id;
1648         }
1649 }
1650
1651 static struct nrs_tbf_client *
1652 nrs_tbf_cli_find(struct nrs_tbf_head *head, struct ptlrpc_request *req)
1653 {
1654         struct nrs_tbf_client *cli;
1655         struct cfs_hash *hs = head->th_cli_hash;
1656         struct cfs_hash_bd bd;
1657         char keystr[NRS_TBF_KEY_LEN];
1658
1659         nrs_tbf_cli_gen_key(NULL, req, keystr, sizeof(keystr));
1660         cfs_hash_bd_get_and_lock(hs, (void *)keystr, &bd, 1);
1661         cli = nrs_tbf_cli_hash_lookup(hs, &bd, keystr);
1662         cfs_hash_bd_unlock(hs, &bd, 1);
1663
1664         return cli;
1665 }
1666
1667 static struct nrs_tbf_client *
1668 nrs_tbf_cli_findadd(struct nrs_tbf_head *head,
1669                     struct nrs_tbf_client *cli)
1670 {
1671         const char              *key;
1672         struct nrs_tbf_client   *ret;
1673         struct cfs_hash         *hs = head->th_cli_hash;
1674         struct cfs_hash_bd       bd;
1675
1676         key = cli->tc_key;
1677         cfs_hash_bd_get_and_lock(hs, (void *)key, &bd, 1);
1678         ret = nrs_tbf_cli_hash_lookup(hs, &bd, key);
1679         if (ret == NULL) {
1680                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
1681                 ret = cli;
1682         }
1683         cfs_hash_bd_unlock(hs, &bd, 1);
1684
1685         return ret;
1686 }
1687
1688 static void
1689 nrs_tbf_cli_put(struct nrs_tbf_head *head, struct nrs_tbf_client *cli)
1690 {
1691         struct cfs_hash_bd       bd;
1692         struct cfs_hash         *hs = head->th_cli_hash;
1693         struct nrs_tbf_bucket   *bkt;
1694         int                      hw;
1695         LIST_HEAD(zombies);
1696
1697         cfs_hash_bd_get(hs, &cli->tc_key, &bd);
1698         bkt = cfs_hash_bd_extra_get(hs, &bd);
1699         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
1700                 return;
1701         LASSERT(list_empty(&cli->tc_lru));
1702         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
1703
1704         /**
1705          * Check and purge the LRU, there is at least one client in the LRU.
1706          */
1707         hw = tbf_jobid_cache_size >> (hs->hs_cur_bits - hs->hs_bkt_bits);
1708         while (cfs_hash_bd_count_get(&bd) > hw) {
1709                 if (unlikely(list_empty(&bkt->ntb_lru)))
1710                         break;
1711                 cli = list_entry(bkt->ntb_lru.next,
1712                                  struct nrs_tbf_client,
1713                                  tc_lru);
1714                 LASSERT(atomic_read(&cli->tc_ref) == 0);
1715                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
1716                 list_move(&cli->tc_lru, &zombies);
1717         }
1718         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
1719
1720         while (!list_empty(&zombies)) {
1721                 cli = container_of(zombies.next,
1722                                    struct nrs_tbf_client, tc_lru);
1723                 list_del_init(&cli->tc_lru);
1724                 nrs_tbf_cli_fini(cli);
1725         }
1726 }
1727
1728 static void
1729 nrs_tbf_generic_cli_init(struct nrs_tbf_client *cli,
1730                          struct ptlrpc_request *req)
1731 {
1732         char keystr[NRS_TBF_KEY_LEN];
1733
1734         nrs_tbf_cli_gen_key(cli, req, keystr, sizeof(keystr));
1735 }
1736
1737 static void
1738 nrs_tbf_id_list_free(struct list_head *uid_list)
1739 {
1740         struct nrs_tbf_id *nti_id, *n;
1741
1742         list_for_each_entry_safe(nti_id, n, uid_list, nti_linkage) {
1743                 list_del_init(&nti_id->nti_linkage);
1744                 OBD_FREE_PTR(nti_id);
1745         }
1746 }
1747
1748 static void
1749 nrs_tbf_expression_free(struct nrs_tbf_expression *expr)
1750 {
1751         LASSERT(expr->te_field >= NRS_TBF_FIELD_NID &&
1752                 expr->te_field < NRS_TBF_FIELD_MAX);
1753         switch (expr->te_field) {
1754         case NRS_TBF_FIELD_NID:
1755                 cfs_free_nidlist(&expr->te_cond);
1756                 break;
1757         case NRS_TBF_FIELD_JOBID:
1758                 nrs_tbf_jobid_list_free(&expr->te_cond);
1759                 break;
1760         case NRS_TBF_FIELD_OPCODE:
1761                 CFS_FREE_BITMAP(expr->te_opcodes);
1762                 break;
1763         case NRS_TBF_FIELD_UID:
1764         case NRS_TBF_FIELD_GID:
1765                 nrs_tbf_id_list_free(&expr->te_cond);
1766                 break;
1767         default:
1768                 LBUG();
1769         }
1770         OBD_FREE_PTR(expr);
1771 }
1772
1773 static void
1774 nrs_tbf_conjunction_free(struct nrs_tbf_conjunction *conjunction)
1775 {
1776         struct nrs_tbf_expression *expression;
1777         struct nrs_tbf_expression *n;
1778
1779         LASSERT(list_empty(&conjunction->tc_linkage));
1780         list_for_each_entry_safe(expression, n,
1781                                  &conjunction->tc_expressions,
1782                                  te_linkage) {
1783                 list_del_init(&expression->te_linkage);
1784                 nrs_tbf_expression_free(expression);
1785         }
1786         OBD_FREE_PTR(conjunction);
1787 }
1788
1789 static void
1790 nrs_tbf_conds_free(struct list_head *cond_list)
1791 {
1792         struct nrs_tbf_conjunction *conjunction;
1793         struct nrs_tbf_conjunction *n;
1794
1795         list_for_each_entry_safe(conjunction, n, cond_list, tc_linkage) {
1796                 list_del_init(&conjunction->tc_linkage);
1797                 nrs_tbf_conjunction_free(conjunction);
1798         }
1799 }
1800
1801 static void
1802 nrs_tbf_generic_cmd_fini(struct nrs_tbf_cmd *cmd)
1803 {
1804         if (!list_empty(&cmd->u.tc_start.ts_conds))
1805                 nrs_tbf_conds_free(&cmd->u.tc_start.ts_conds);
1806         if (cmd->u.tc_start.ts_conds_str)
1807                 OBD_FREE(cmd->u.tc_start.ts_conds_str,
1808                          strlen(cmd->u.tc_start.ts_conds_str) + 1);
1809 }
1810
1811 #define NRS_TBF_DISJUNCTION_DELIM       (',')
1812 #define NRS_TBF_CONJUNCTION_DELIM       ('&')
1813 #define NRS_TBF_EXPRESSION_DELIM        ('=')
1814
1815 static inline bool
1816 nrs_tbf_check_field(struct cfs_lstr *field, char *str)
1817 {
1818         int len = strlen(str);
1819
1820         return (field->ls_len == len &&
1821                 strncmp(field->ls_str, str, len) == 0);
1822 }
1823
1824 static int
1825 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr);
1826 static int
1827 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
1828                       enum nrs_tbf_flag tif);
1829
1830 static int
1831 nrs_tbf_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
1832 {
1833         struct nrs_tbf_expression *expr;
1834         struct cfs_lstr field;
1835         int rc = 0;
1836
1837         OBD_ALLOC_PTR(expr);
1838         if (expr == NULL)
1839                 return -ENOMEM;
1840
1841         rc = cfs_gettok(src, NRS_TBF_EXPRESSION_DELIM, &field);
1842         if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
1843             src->ls_str[src->ls_len - 1] != '}')
1844                 GOTO(out, rc = -EINVAL);
1845
1846         /* Skip '{' and '}' */
1847         src->ls_str++;
1848         src->ls_len -= 2;
1849
1850         if (nrs_tbf_check_field(&field, "nid")) {
1851                 if (cfs_parse_nidlist(src->ls_str,
1852                                       src->ls_len,
1853                                       &expr->te_cond) <= 0)
1854                         GOTO(out, rc = -EINVAL);
1855                 expr->te_field = NRS_TBF_FIELD_NID;
1856         } else if (nrs_tbf_check_field(&field, "jobid")) {
1857                 if (nrs_tbf_jobid_list_parse(src->ls_str,
1858                                              src->ls_len,
1859                                              &expr->te_cond) < 0)
1860                         GOTO(out, rc = -EINVAL);
1861                 expr->te_field = NRS_TBF_FIELD_JOBID;
1862         } else if (nrs_tbf_check_field(&field, "opcode")) {
1863                 if (nrs_tbf_opcode_list_parse(src->ls_str,
1864                                               src->ls_len,
1865                                               &expr->te_opcodes) < 0)
1866                         GOTO(out, rc = -EINVAL);
1867                 expr->te_field = NRS_TBF_FIELD_OPCODE;
1868         } else if (nrs_tbf_check_field(&field, "uid")) {
1869                 if (nrs_tbf_id_list_parse(src->ls_str,
1870                                           src->ls_len,
1871                                           &expr->te_cond,
1872                                           NRS_TBF_FLAG_UID) < 0)
1873                         GOTO(out, rc = -EINVAL);
1874                 expr->te_field = NRS_TBF_FIELD_UID;
1875         } else if (nrs_tbf_check_field(&field, "gid")) {
1876                 if (nrs_tbf_id_list_parse(src->ls_str,
1877                                           src->ls_len,
1878                                           &expr->te_cond,
1879                                           NRS_TBF_FLAG_GID) < 0)
1880                         GOTO(out, rc = -EINVAL);
1881                 expr->te_field = NRS_TBF_FIELD_GID;
1882         } else {
1883                 GOTO(out, rc = -EINVAL);
1884         }
1885
1886         list_add_tail(&expr->te_linkage, cond_list);
1887         return 0;
1888 out:
1889         OBD_FREE_PTR(expr);
1890         return rc;
1891 }
1892
1893 static int
1894 nrs_tbf_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
1895 {
1896         struct nrs_tbf_conjunction *conjunction;
1897         struct cfs_lstr expr;
1898         int rc = 0;
1899
1900         OBD_ALLOC_PTR(conjunction);
1901         if (conjunction == NULL)
1902                 return -ENOMEM;
1903
1904         INIT_LIST_HEAD(&conjunction->tc_expressions);
1905         list_add_tail(&conjunction->tc_linkage, cond_list);
1906
1907         while (src->ls_str) {
1908                 rc = cfs_gettok(src, NRS_TBF_CONJUNCTION_DELIM, &expr);
1909                 if (rc == 0) {
1910                         rc = -EINVAL;
1911                         break;
1912                 }
1913                 rc = nrs_tbf_expression_parse(&expr,
1914                                               &conjunction->tc_expressions);
1915                 if (rc)
1916                         break;
1917         }
1918         return rc;
1919 }
1920
1921 static int
1922 nrs_tbf_conds_parse(char *str, int len, struct list_head *cond_list)
1923 {
1924         struct cfs_lstr src;
1925         struct cfs_lstr res;
1926         int rc = 0;
1927
1928         src.ls_str = str;
1929         src.ls_len = len;
1930         INIT_LIST_HEAD(cond_list);
1931         while (src.ls_str) {
1932                 rc = cfs_gettok(&src, NRS_TBF_DISJUNCTION_DELIM, &res);
1933                 if (rc == 0) {
1934                         rc = -EINVAL;
1935                         break;
1936                 }
1937                 rc = nrs_tbf_conjunction_parse(&res, cond_list);
1938                 if (rc)
1939                         break;
1940         }
1941         return rc;
1942 }
1943
1944 static int
1945 nrs_tbf_generic_parse(struct nrs_tbf_cmd *cmd, const char *id)
1946 {
1947         int rc;
1948
1949         OBD_ALLOC(cmd->u.tc_start.ts_conds_str, strlen(id) + 1);
1950         if (cmd->u.tc_start.ts_conds_str == NULL)
1951                 return -ENOMEM;
1952
1953         memcpy(cmd->u.tc_start.ts_conds_str, id, strlen(id));
1954
1955         /* Parse hybird NID and JOBID conditions */
1956         rc = nrs_tbf_conds_parse(cmd->u.tc_start.ts_conds_str,
1957                                  strlen(cmd->u.tc_start.ts_conds_str),
1958                                  &cmd->u.tc_start.ts_conds);
1959         if (rc)
1960                 nrs_tbf_generic_cmd_fini(cmd);
1961
1962         return rc;
1963 }
1964
1965 static int
1966 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id);
1967
1968 static int
1969 nrs_tbf_expression_match(struct nrs_tbf_expression *expr,
1970                          struct nrs_tbf_rule *rule,
1971                          struct nrs_tbf_client *cli)
1972 {
1973         switch (expr->te_field) {
1974         case NRS_TBF_FIELD_NID:
1975                 return cfs_match_nid(cli->tc_nid, &expr->te_cond);
1976         case NRS_TBF_FIELD_JOBID:
1977                 return nrs_tbf_jobid_list_match(&expr->te_cond, cli->tc_jobid);
1978         case NRS_TBF_FIELD_OPCODE:
1979                 return cfs_bitmap_check(expr->te_opcodes, cli->tc_opcode);
1980         case NRS_TBF_FIELD_UID:
1981         case NRS_TBF_FIELD_GID:
1982                 return nrs_tbf_id_list_match(&expr->te_cond, cli->tc_id);
1983         default:
1984                 return 0;
1985         }
1986 }
1987
1988 static int
1989 nrs_tbf_conjunction_match(struct nrs_tbf_conjunction *conjunction,
1990                           struct nrs_tbf_rule *rule,
1991                           struct nrs_tbf_client *cli)
1992 {
1993         struct nrs_tbf_expression *expr;
1994         int matched;
1995
1996         list_for_each_entry(expr, &conjunction->tc_expressions, te_linkage) {
1997                 matched = nrs_tbf_expression_match(expr, rule, cli);
1998                 if (!matched)
1999                         return 0;
2000         }
2001
2002         return 1;
2003 }
2004
2005 static int
2006 nrs_tbf_cond_match(struct nrs_tbf_rule *rule, struct nrs_tbf_client *cli)
2007 {
2008         struct nrs_tbf_conjunction *conjunction;
2009         int matched;
2010
2011         list_for_each_entry(conjunction, &rule->tr_conds, tc_linkage) {
2012                 matched = nrs_tbf_conjunction_match(conjunction, rule, cli);
2013                 if (matched)
2014                         return 1;
2015         }
2016
2017         return 0;
2018 }
2019
2020 static void
2021 nrs_tbf_generic_rule_fini(struct nrs_tbf_rule *rule)
2022 {
2023         if (!list_empty(&rule->tr_conds))
2024                 nrs_tbf_conds_free(&rule->tr_conds);
2025         LASSERT(rule->tr_conds_str != NULL);
2026         OBD_FREE(rule->tr_conds_str, strlen(rule->tr_conds_str) + 1);
2027 }
2028
2029 static int
2030 nrs_tbf_rule_init(struct ptlrpc_nrs_policy *policy,
2031                   struct nrs_tbf_rule *rule, struct nrs_tbf_cmd *start)
2032 {
2033         int rc = 0;
2034
2035         LASSERT(start->u.tc_start.ts_conds_str);
2036         OBD_ALLOC(rule->tr_conds_str,
2037                   strlen(start->u.tc_start.ts_conds_str) + 1);
2038         if (rule->tr_conds_str == NULL)
2039                 return -ENOMEM;
2040
2041         memcpy(rule->tr_conds_str,
2042                start->u.tc_start.ts_conds_str,
2043                strlen(start->u.tc_start.ts_conds_str));
2044
2045         INIT_LIST_HEAD(&rule->tr_conds);
2046         if (!list_empty(&start->u.tc_start.ts_conds)) {
2047                 rc = nrs_tbf_conds_parse(rule->tr_conds_str,
2048                                          strlen(rule->tr_conds_str),
2049                                          &rule->tr_conds);
2050         }
2051         if (rc)
2052                 nrs_tbf_generic_rule_fini(rule);
2053
2054         return rc;
2055 }
2056
2057 static int
2058 nrs_tbf_generic_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2059 {
2060         seq_printf(m, "%s %s %u, ref %d\n", rule->tr_name,
2061                    rule->tr_conds_str, rule->tr_rpc_rate,
2062                    atomic_read(&rule->tr_ref) - 1);
2063         return 0;
2064 }
2065
/**
 * Rule-match callback of the generic TBF type.
 *
 * Thin wrapper around nrs_tbf_cond_match().
 *
 * \param[in] rule	rule to evaluate
 * \param[in] cli	client being tested
 *
 * \retval 1	the client satisfies the rule's conditions
 * \retval 0	it does not
 */
static int
nrs_tbf_generic_rule_match(struct nrs_tbf_rule *rule,
			   struct nrs_tbf_client *cli)
{
	int matched = nrs_tbf_cond_match(rule, cli);

	return matched;
}
2072
2073 static struct nrs_tbf_ops nrs_tbf_generic_ops = {
2074         .o_name = NRS_TBF_TYPE_GENERIC,
2075         .o_startup = nrs_tbf_startup,
2076         .o_cli_find = nrs_tbf_cli_find,
2077         .o_cli_findadd = nrs_tbf_cli_findadd,
2078         .o_cli_put = nrs_tbf_cli_put,
2079         .o_cli_init = nrs_tbf_generic_cli_init,
2080         .o_rule_init = nrs_tbf_rule_init,
2081         .o_rule_dump = nrs_tbf_generic_rule_dump,
2082         .o_rule_match = nrs_tbf_generic_rule_match,
2083         .o_rule_fini = nrs_tbf_generic_rule_fini,
2084 };
2085
2086 static void nrs_tbf_opcode_rule_fini(struct nrs_tbf_rule *rule)
2087 {
2088         if (rule->tr_opcodes != NULL)
2089                 CFS_FREE_BITMAP(rule->tr_opcodes);
2090
2091         LASSERT(rule->tr_opcodes_str != NULL);
2092         OBD_FREE(rule->tr_opcodes_str, strlen(rule->tr_opcodes_str) + 1);
2093 }
2094
2095 static unsigned nrs_tbf_opcode_hop_hash(struct cfs_hash *hs, const void *key,
2096                                         unsigned mask)
2097 {
2098         return cfs_hash_djb2_hash(key, sizeof(__u32), mask);
2099 }
2100
2101 static int nrs_tbf_opcode_hop_keycmp(const void *key, struct hlist_node *hnode)
2102 {
2103         const __u32     *opc = key;
2104         struct nrs_tbf_client *cli = hlist_entry(hnode,
2105                                                  struct nrs_tbf_client,
2106                                                  tc_hnode);
2107
2108         return *opc == cli->tc_opcode;
2109 }
2110
2111 static void *nrs_tbf_opcode_hop_key(struct hlist_node *hnode)
2112 {
2113         struct nrs_tbf_client *cli = hlist_entry(hnode,
2114                                                  struct nrs_tbf_client,
2115                                                  tc_hnode);
2116
2117         return &cli->tc_opcode;
2118 }
2119
2120 static void nrs_tbf_opcode_hop_get(struct cfs_hash *hs,
2121                                    struct hlist_node *hnode)
2122 {
2123         struct nrs_tbf_client *cli = hlist_entry(hnode,
2124                                                  struct nrs_tbf_client,
2125                                                  tc_hnode);
2126
2127         atomic_inc(&cli->tc_ref);
2128 }
2129
2130 static void nrs_tbf_opcode_hop_put(struct cfs_hash *hs,
2131                                    struct hlist_node *hnode)
2132 {
2133         struct nrs_tbf_client *cli = hlist_entry(hnode,
2134                                                  struct nrs_tbf_client,
2135                                                  tc_hnode);
2136
2137         atomic_dec(&cli->tc_ref);
2138 }
2139
2140 static void nrs_tbf_opcode_hop_exit(struct cfs_hash *hs,
2141                                     struct hlist_node *hnode)
2142 {
2143         struct nrs_tbf_client *cli = hlist_entry(hnode,
2144                                                  struct nrs_tbf_client,
2145                                                  tc_hnode);
2146
2147         LASSERTF(atomic_read(&cli->tc_ref) == 0,
2148                  "Busy TBF object from client with opcode %s, with %d refs\n",
2149                  ll_opcode2str(cli->tc_opcode),
2150                  atomic_read(&cli->tc_ref));
2151
2152         nrs_tbf_cli_fini(cli);
2153 }
2154 static struct cfs_hash_ops nrs_tbf_opcode_hash_ops = {
2155         .hs_hash        = nrs_tbf_opcode_hop_hash,
2156         .hs_keycmp      = nrs_tbf_opcode_hop_keycmp,
2157         .hs_key         = nrs_tbf_opcode_hop_key,
2158         .hs_object      = nrs_tbf_hop_object,
2159         .hs_get         = nrs_tbf_opcode_hop_get,
2160         .hs_put         = nrs_tbf_opcode_hop_put,
2161         .hs_put_locked  = nrs_tbf_opcode_hop_put,
2162         .hs_exit        = nrs_tbf_opcode_hop_exit,
2163 };
2164
2165 static int
2166 nrs_tbf_opcode_startup(struct ptlrpc_nrs_policy *policy,
2167                     struct nrs_tbf_head *head)
2168 {
2169         struct nrs_tbf_cmd      start = { 0 };
2170         int rc;
2171
2172         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
2173                                             NRS_TBF_NID_BITS,
2174                                             NRS_TBF_NID_BITS,
2175                                             NRS_TBF_NID_BKT_BITS, 0,
2176                                             CFS_HASH_MIN_THETA,
2177                                             CFS_HASH_MAX_THETA,
2178                                             &nrs_tbf_opcode_hash_ops,
2179                                             CFS_HASH_RW_BKTLOCK);
2180         if (head->th_cli_hash == NULL)
2181                 return -ENOMEM;
2182
2183         start.u.tc_start.ts_opcodes_str = "*";
2184
2185         start.u.tc_start.ts_rpc_rate = tbf_rate;
2186         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2187         start.tc_name = NRS_TBF_DEFAULT_RULE;
2188         rc = nrs_tbf_rule_start(policy, head, &start);
2189
2190         return rc;
2191 }
2192
2193 static struct nrs_tbf_client *
2194 nrs_tbf_opcode_cli_find(struct nrs_tbf_head *head,
2195                         struct ptlrpc_request *req)
2196 {
2197         __u32 opc;
2198
2199         opc = lustre_msg_get_opc(req->rq_reqmsg);
2200         return cfs_hash_lookup(head->th_cli_hash, &opc);
2201 }
2202
2203 static struct nrs_tbf_client *
2204 nrs_tbf_opcode_cli_findadd(struct nrs_tbf_head *head,
2205                            struct nrs_tbf_client *cli)
2206 {
2207         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_opcode,
2208                                        &cli->tc_hnode);
2209 }
2210
2211 static void
2212 nrs_tbf_opcode_cli_init(struct nrs_tbf_client *cli,
2213                         struct ptlrpc_request *req)
2214 {
2215         cli->tc_opcode = lustre_msg_get_opc(req->rq_reqmsg);
2216 }
2217
2218 #define MAX_OPCODE_LEN  32
2219 static int
2220 nrs_tbf_opcode_set_bit(const struct cfs_lstr *id, struct cfs_bitmap *opcodes)
2221 {
2222         int     op = 0;
2223         char    opcode_str[MAX_OPCODE_LEN];
2224
2225         if (id->ls_len + 1 > MAX_OPCODE_LEN)
2226                 return -EINVAL;
2227
2228         memcpy(opcode_str, id->ls_str, id->ls_len);
2229         opcode_str[id->ls_len] = '\0';
2230
2231         op = ll_str2opcode(opcode_str);
2232         if (op < 0)
2233                 return -EINVAL;
2234
2235         cfs_bitmap_set(opcodes, op);
2236         return 0;
2237 }
2238
2239 static int
2240 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr)
2241 {
2242         struct cfs_bitmap *opcodes;
2243         struct cfs_lstr src;
2244         struct cfs_lstr res;
2245         int rc = 0;
2246         ENTRY;
2247
2248         opcodes = CFS_ALLOCATE_BITMAP(LUSTRE_MAX_OPCODES);
2249         if (opcodes == NULL)
2250                 return -ENOMEM;
2251
2252         src.ls_str = str;
2253         src.ls_len = len;
2254         while (src.ls_str) {
2255                 rc = cfs_gettok(&src, ' ', &res);
2256                 if (rc == 0) {
2257                         rc = -EINVAL;
2258                         break;
2259                 }
2260                 rc = nrs_tbf_opcode_set_bit(&res, opcodes);
2261                 if (rc)
2262                         break;
2263         }
2264
2265         if (rc == 0 && bitmaptr)
2266                 *bitmaptr = opcodes;
2267         else
2268                 CFS_FREE_BITMAP(opcodes);
2269
2270         RETURN(rc);
2271 }
2272
2273 static void nrs_tbf_opcode_cmd_fini(struct nrs_tbf_cmd *cmd)
2274 {
2275         if (cmd->u.tc_start.ts_opcodes_str)
2276                 OBD_FREE(cmd->u.tc_start.ts_opcodes_str,
2277                          strlen(cmd->u.tc_start.ts_opcodes_str) + 1);
2278
2279 }
2280
2281 static int nrs_tbf_opcode_parse(struct nrs_tbf_cmd *cmd, char *id)
2282 {
2283         struct cfs_lstr src;
2284         int rc;
2285
2286         src.ls_str = id;
2287         src.ls_len = strlen(id);
2288         rc = nrs_tbf_check_id_value(&src, "opcode");
2289         if (rc)
2290                 return rc;
2291
2292         OBD_ALLOC(cmd->u.tc_start.ts_opcodes_str, src.ls_len + 1);
2293         if (cmd->u.tc_start.ts_opcodes_str == NULL)
2294                 return -ENOMEM;
2295
2296         memcpy(cmd->u.tc_start.ts_opcodes_str, src.ls_str, src.ls_len);
2297
2298         /* parse opcode list */
2299         rc = nrs_tbf_opcode_list_parse(cmd->u.tc_start.ts_opcodes_str,
2300                                        strlen(cmd->u.tc_start.ts_opcodes_str),
2301                                        NULL);
2302         if (rc)
2303                 nrs_tbf_opcode_cmd_fini(cmd);
2304
2305         return rc;
2306 }
2307
2308 static int
2309 nrs_tbf_opcode_rule_match(struct nrs_tbf_rule *rule,
2310                           struct nrs_tbf_client *cli)
2311 {
2312         if (rule->tr_opcodes == NULL)
2313                 return 0;
2314
2315         return cfs_bitmap_check(rule->tr_opcodes, cli->tc_opcode);
2316 }
2317
2318 static int nrs_tbf_opcode_rule_init(struct ptlrpc_nrs_policy *policy,
2319                                     struct nrs_tbf_rule *rule,
2320                                     struct nrs_tbf_cmd *start)
2321 {
2322         int rc = 0;
2323
2324         LASSERT(start->u.tc_start.ts_opcodes_str != NULL);
2325         OBD_ALLOC(rule->tr_opcodes_str,
2326                   strlen(start->u.tc_start.ts_opcodes_str) + 1);
2327         if (rule->tr_opcodes_str == NULL)
2328                 return -ENOMEM;
2329
2330         strncpy(rule->tr_opcodes_str, start->u.tc_start.ts_opcodes_str,
2331                 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2332
2333         /* Default rule '*' */
2334         if (strcmp(start->u.tc_start.ts_opcodes_str, "*") == 0)
2335                 return 0;
2336
2337         rc = nrs_tbf_opcode_list_parse(rule->tr_opcodes_str,
2338                                        strlen(rule->tr_opcodes_str),
2339                                        &rule->tr_opcodes);
2340         if (rc)
2341                 OBD_FREE(rule->tr_opcodes_str,
2342                          strlen(start->u.tc_start.ts_opcodes_str) + 1);
2343
2344         return rc;
2345 }
2346
2347 static int
2348 nrs_tbf_opcode_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2349 {
2350         seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
2351                    rule->tr_opcodes_str, rule->tr_rpc_rate,
2352                    atomic_read(&rule->tr_ref) - 1);
2353         return 0;
2354 }
2355
2356
2357 struct nrs_tbf_ops nrs_tbf_opcode_ops = {
2358         .o_name = NRS_TBF_TYPE_OPCODE,
2359         .o_startup = nrs_tbf_opcode_startup,
2360         .o_cli_find = nrs_tbf_opcode_cli_find,
2361         .o_cli_findadd = nrs_tbf_opcode_cli_findadd,
2362         .o_cli_put = nrs_tbf_nid_cli_put,
2363         .o_cli_init = nrs_tbf_opcode_cli_init,
2364         .o_rule_init = nrs_tbf_opcode_rule_init,
2365         .o_rule_dump = nrs_tbf_opcode_rule_dump,
2366         .o_rule_match = nrs_tbf_opcode_rule_match,
2367         .o_rule_fini = nrs_tbf_opcode_rule_fini,
2368 };
2369
2370 static unsigned nrs_tbf_id_hop_hash(struct cfs_hash *hs, const void *key,
2371                                     unsigned mask)
2372 {
2373         return cfs_hash_djb2_hash(key, sizeof(struct tbf_id), mask);
2374 }
2375
2376 static int nrs_tbf_id_hop_keycmp(const void *key, struct hlist_node *hnode)
2377 {
2378         const struct tbf_id *opc = key;
2379         enum nrs_tbf_flag ntf;
2380         struct nrs_tbf_client *cli = hlist_entry(hnode, struct nrs_tbf_client,
2381                                                  tc_hnode);
2382         ntf = opc->ti_type & cli->tc_id.ti_type;
2383         if ((ntf & NRS_TBF_FLAG_UID) && opc->ti_uid != cli->tc_id.ti_uid)
2384                 return 0;
2385
2386         if ((ntf & NRS_TBF_FLAG_GID) && opc->ti_gid != cli->tc_id.ti_gid)
2387                 return 0;
2388
2389         return 1;
2390 }
2391
2392 static void *nrs_tbf_id_hop_key(struct hlist_node *hnode)
2393 {
2394         struct nrs_tbf_client *cli = hlist_entry(hnode,
2395                                                  struct nrs_tbf_client,
2396                                                  tc_hnode);
2397         return &cli->tc_id;
2398 }
2399
2400 static void nrs_tbf_id_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
2401 {
2402         struct nrs_tbf_client *cli = hlist_entry(hnode,
2403                                                  struct nrs_tbf_client,
2404                                                  tc_hnode);
2405
2406         atomic_inc(&cli->tc_ref);
2407 }
2408
2409 static void nrs_tbf_id_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
2410 {
2411         struct nrs_tbf_client *cli = hlist_entry(hnode,
2412                                                  struct nrs_tbf_client,
2413                                                  tc_hnode);
2414
2415         atomic_dec(&cli->tc_ref);
2416 }
2417
2418 static void
2419 nrs_tbf_id_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
2420
2421 {
2422         struct nrs_tbf_client *cli = hlist_entry(hnode,
2423                                                  struct nrs_tbf_client,
2424                                                  tc_hnode);
2425
2426         LASSERT(atomic_read(&cli->tc_ref) == 0);
2427         nrs_tbf_cli_fini(cli);
2428 }
2429
2430 static struct cfs_hash_ops nrs_tbf_id_hash_ops = {
2431         .hs_hash        = nrs_tbf_id_hop_hash,
2432         .hs_keycmp      = nrs_tbf_id_hop_keycmp,
2433         .hs_key         = nrs_tbf_id_hop_key,
2434         .hs_object      = nrs_tbf_hop_object,
2435         .hs_get         = nrs_tbf_id_hop_get,
2436         .hs_put         = nrs_tbf_id_hop_put,
2437         .hs_put_locked  = nrs_tbf_id_hop_put,
2438         .hs_exit        = nrs_tbf_id_hop_exit,
2439 };
2440
2441 static int
2442 nrs_tbf_id_startup(struct ptlrpc_nrs_policy *policy,
2443                    struct nrs_tbf_head *head)
2444 {
2445         struct nrs_tbf_cmd start;
2446         int rc;
2447
2448         head->th_cli_hash = cfs_hash_create("nrs_tbf_id_hash",
2449                                             NRS_TBF_NID_BITS,
2450                                             NRS_TBF_NID_BITS,
2451                                             NRS_TBF_NID_BKT_BITS, 0,
2452                                             CFS_HASH_MIN_THETA,
2453                                             CFS_HASH_MAX_THETA,
2454                                             &nrs_tbf_id_hash_ops,
2455                                             CFS_HASH_RW_BKTLOCK);
2456         if (head->th_cli_hash == NULL)
2457                 return -ENOMEM;
2458
2459         memset(&start, 0, sizeof(start));
2460         start.u.tc_start.ts_ids_str = "*";
2461         start.u.tc_start.ts_rpc_rate = tbf_rate;
2462         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2463         start.tc_name = NRS_TBF_DEFAULT_RULE;
2464         INIT_LIST_HEAD(&start.u.tc_start.ts_ids);
2465         rc = nrs_tbf_rule_start(policy, head, &start);
2466         if (rc) {
2467                 cfs_hash_putref(head->th_cli_hash);
2468                 head->th_cli_hash = NULL;
2469         }
2470
2471         return rc;
2472 }
2473
2474 static struct nrs_tbf_client *
2475 nrs_tbf_id_cli_find(struct nrs_tbf_head *head,
2476                     struct ptlrpc_request *req)
2477 {
2478         struct tbf_id id;
2479
2480         LASSERT(head->th_type_flag == NRS_TBF_FLAG_UID ||
2481                 head->th_type_flag == NRS_TBF_FLAG_GID);
2482
2483         nrs_tbf_id_cli_set(req, &id, head->th_type_flag);
2484         return cfs_hash_lookup(head->th_cli_hash, &id);
2485 }
2486
2487 static struct nrs_tbf_client *
2488 nrs_tbf_id_cli_findadd(struct nrs_tbf_head *head,
2489                        struct nrs_tbf_client *cli)
2490 {
2491         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_id,
2492                                        &cli->tc_hnode);
2493 }
2494
2495 static void
2496 nrs_tbf_uid_cli_init(struct nrs_tbf_client *cli,
2497                      struct ptlrpc_request *req)
2498 {
2499         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_UID);
2500 }
2501
2502 static void
2503 nrs_tbf_gid_cli_init(struct nrs_tbf_client *cli,
2504                      struct ptlrpc_request *req)
2505 {
2506         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_GID);
2507 }
2508
2509 static int
2510 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id)
2511 {
2512         struct nrs_tbf_id *nti_id;
2513         enum nrs_tbf_flag flag;
2514
2515         list_for_each_entry(nti_id, id_list, nti_linkage) {
2516                 flag = id.ti_type & nti_id->nti_id.ti_type;
2517                 if (!flag)
2518                         continue;
2519
2520                 if ((flag & NRS_TBF_FLAG_UID) &&
2521                     (id.ti_uid != nti_id->nti_id.ti_uid))
2522                         continue;
2523
2524                 if ((flag & NRS_TBF_FLAG_GID) &&
2525                     (id.ti_gid != nti_id->nti_id.ti_gid))
2526                         continue;
2527
2528                 return 1;
2529         }
2530         return 0;
2531 }
2532
2533 static int
2534 nrs_tbf_id_rule_match(struct nrs_tbf_rule *rule,
2535                       struct nrs_tbf_client *cli)
2536 {
2537         return nrs_tbf_id_list_match(&rule->tr_ids, cli->tc_id);
2538 }
2539
2540 static void nrs_tbf_id_cmd_fini(struct nrs_tbf_cmd *cmd)
2541 {
2542         nrs_tbf_id_list_free(&cmd->u.tc_start.ts_ids);
2543
2544         if (cmd->u.tc_start.ts_ids_str)
2545                 OBD_FREE(cmd->u.tc_start.ts_ids_str,
2546                          strlen(cmd->u.tc_start.ts_ids_str) + 1);
2547 }
2548
2549 static int
2550 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
2551                       enum nrs_tbf_flag tif)
2552 {
2553         struct cfs_lstr src;
2554         struct cfs_lstr res;
2555         int rc = 0;
2556         struct tbf_id id = { 0 };
2557         ENTRY;
2558
2559         if (tif != NRS_TBF_FLAG_UID && tif != NRS_TBF_FLAG_GID)
2560                 RETURN(-EINVAL);
2561
2562         src.ls_str = str;
2563         src.ls_len = len;
2564         INIT_LIST_HEAD(id_list);
2565         while (src.ls_str) {
2566                 struct nrs_tbf_id *nti_id;
2567
2568                 if (cfs_gettok(&src, ' ', &res) == 0)
2569                         GOTO(out, rc = -EINVAL);
2570
2571                 id.ti_type = tif;
2572                 if (tif == NRS_TBF_FLAG_UID) {
2573                         if (!cfs_str2num_check(res.ls_str, res.ls_len,
2574                                                &id.ti_uid, 0, (u32)~0U))
2575                                 GOTO(out, rc = -EINVAL);
2576                 } else {
2577                         if (!cfs_str2num_check(res.ls_str, res.ls_len,
2578                                                &id.ti_gid, 0, (u32)~0U))
2579                                 GOTO(out, rc = -EINVAL);
2580                 }
2581
2582                 OBD_ALLOC_PTR(nti_id);
2583                 if (nti_id == NULL)
2584                         GOTO(out, rc = -ENOMEM);
2585
2586                 nti_id->nti_id = id;
2587                 list_add_tail(&nti_id->nti_linkage, id_list);
2588         }
2589 out:
2590         if (rc)
2591                 nrs_tbf_id_list_free(id_list);
2592         RETURN(rc);
2593 }
2594
2595 static int nrs_tbf_ug_id_parse(struct nrs_tbf_cmd *cmd, char *id)
2596 {
2597         struct cfs_lstr src;
2598         int rc;
2599         enum nrs_tbf_flag tif;
2600
2601         tif = cmd->u.tc_start.ts_valid_type;
2602
2603         src.ls_str = id;
2604         src.ls_len = strlen(id);
2605
2606         rc = nrs_tbf_check_id_value(&src,
2607                                     tif == NRS_TBF_FLAG_UID ? "uid" : "gid");
2608         if (rc)
2609                 return rc;
2610
2611         OBD_ALLOC(cmd->u.tc_start.ts_ids_str, src.ls_len + 1);
2612         if (cmd->u.tc_start.ts_ids_str == NULL)
2613                 return -ENOMEM;
2614
2615         strlcpy(cmd->u.tc_start.ts_ids_str, src.ls_str, src.ls_len + 1);
2616
2617         rc = nrs_tbf_id_list_parse(cmd->u.tc_start.ts_ids_str,
2618                                    strlen(cmd->u.tc_start.ts_ids_str),
2619                                    &cmd->u.tc_start.ts_ids, tif);
2620         if (rc)
2621                 nrs_tbf_id_cmd_fini(cmd);
2622
2623         return rc;
2624 }
2625
2626 static int
2627 nrs_tbf_id_rule_init(struct ptlrpc_nrs_policy *policy,
2628                      struct nrs_tbf_rule *rule,
2629                      struct nrs_tbf_cmd *start)
2630 {
2631         struct nrs_tbf_head *head = rule->tr_head;
2632         int rc = 0;
2633         enum nrs_tbf_flag tif = head->th_type_flag;
2634         int ids_len = strlen(start->u.tc_start.ts_ids_str) + 1;
2635
2636         LASSERT(start->u.tc_start.ts_ids_str);
2637         INIT_LIST_HEAD(&rule->tr_ids);
2638
2639         OBD_ALLOC(rule->tr_ids_str, ids_len);
2640         if (rule->tr_ids_str == NULL)
2641                 return -ENOMEM;
2642
2643         strlcpy(rule->tr_ids_str, start->u.tc_start.ts_ids_str,
2644                 ids_len);
2645
2646         if (!list_empty(&start->u.tc_start.ts_ids)) {
2647                 rc = nrs_tbf_id_list_parse(rule->tr_ids_str,
2648                                            strlen(rule->tr_ids_str),
2649                                            &rule->tr_ids, tif);
2650                 if (rc)
2651                         CERROR("%ss {%s} illegal\n",
2652                                tif == NRS_TBF_FLAG_UID ? "uid" : "gid",
2653                                rule->tr_ids_str);
2654         }
2655         if (rc) {
2656                 OBD_FREE(rule->tr_ids_str, ids_len);
2657                 rule->tr_ids_str = NULL;
2658         }
2659         return rc;
2660 }
2661
2662 static int
2663 nrs_tbf_id_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2664 {
2665         seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
2666                    rule->tr_ids_str, rule->tr_rpc_rate,
2667                    atomic_read(&rule->tr_ref) - 1);
2668         return 0;
2669 }
2670
2671 static void nrs_tbf_id_rule_fini(struct nrs_tbf_rule *rule)
2672 {
2673         nrs_tbf_id_list_free(&rule->tr_ids);
2674         if (rule->tr_ids_str != NULL)
2675                 OBD_FREE(rule->tr_ids_str, strlen(rule->tr_ids_str) + 1);
2676 }
2677
/*
 * Operations for the NRS_TBF_TYPE_UID TBF type; referenced from
 * nrs_tbf_types[].  All callbacks are the shared nrs_tbf_id_* ones except
 * o_cli_put, which reuses the NID implementation, and o_cli_init, which is
 * UID specific.
 */
struct nrs_tbf_ops nrs_tbf_uid_ops = {
        .o_name = NRS_TBF_TYPE_UID,
        .o_startup = nrs_tbf_id_startup,
        .o_cli_find = nrs_tbf_id_cli_find,
        .o_cli_findadd = nrs_tbf_id_cli_findadd,
        .o_cli_put = nrs_tbf_nid_cli_put,
        .o_cli_init = nrs_tbf_uid_cli_init,
        .o_rule_init = nrs_tbf_id_rule_init,
        .o_rule_dump = nrs_tbf_id_rule_dump,
        .o_rule_match = nrs_tbf_id_rule_match,
        .o_rule_fini = nrs_tbf_id_rule_fini,
};
2690
/*
 * Operations for the NRS_TBF_TYPE_GID TBF type; referenced from
 * nrs_tbf_types[].  All callbacks are the shared nrs_tbf_id_* ones except
 * o_cli_put, which reuses the NID implementation, and o_cli_init, which is
 * GID specific.
 */
struct nrs_tbf_ops nrs_tbf_gid_ops = {
        .o_name = NRS_TBF_TYPE_GID,
        .o_startup = nrs_tbf_id_startup,
        .o_cli_find = nrs_tbf_id_cli_find,
        .o_cli_findadd = nrs_tbf_id_cli_findadd,
        .o_cli_put = nrs_tbf_nid_cli_put,
        .o_cli_init = nrs_tbf_gid_cli_init,
        .o_rule_init = nrs_tbf_id_rule_init,
        .o_rule_dump = nrs_tbf_id_rule_dump,
        .o_rule_match = nrs_tbf_id_rule_match,
        .o_rule_fini = nrs_tbf_id_rule_fini,
};
2703
/*
 * Table of the supported TBF types.  nrs_tbf_start() looks a type up here
 * by name and takes the type flag and the operations table from the
 * matching entry.
 */
static struct nrs_tbf_type nrs_tbf_types[] = {
        {
                .ntt_name = NRS_TBF_TYPE_JOBID,
                .ntt_flag = NRS_TBF_FLAG_JOBID,
                .ntt_ops = &nrs_tbf_jobid_ops,
        },
        {
                .ntt_name = NRS_TBF_TYPE_NID,
                .ntt_flag = NRS_TBF_FLAG_NID,
                .ntt_ops = &nrs_tbf_nid_ops,
        },
        {
                .ntt_name = NRS_TBF_TYPE_OPCODE,
                .ntt_flag = NRS_TBF_FLAG_OPCODE,
                .ntt_ops = &nrs_tbf_opcode_ops,
        },
        {
                .ntt_name = NRS_TBF_TYPE_GENERIC,
                .ntt_flag = NRS_TBF_FLAG_GENERIC,
                .ntt_ops = &nrs_tbf_generic_ops,
        },
        {
                .ntt_name = NRS_TBF_TYPE_UID,
                .ntt_flag = NRS_TBF_FLAG_UID,
                .ntt_ops = &nrs_tbf_uid_ops,
        },
        {
                .ntt_name = NRS_TBF_TYPE_GID,
                .ntt_flag = NRS_TBF_FLAG_GID,
                .ntt_ops = &nrs_tbf_gid_ops,
        },
};
2736
2737 /**
2738  * Is called before the policy transitions into
2739  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
2740  * policy-specific private data structure.
2741  *
2742  * \param[in] policy The policy to start
2743  *
2744  * \retval -ENOMEM OOM error
2745  * \retval  0      success
2746  *
2747  * \see nrs_policy_register()
2748  * \see nrs_policy_ctl()
2749  */
2750 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
2751 {
2752         struct nrs_tbf_head     *head;
2753         struct nrs_tbf_ops      *ops;
2754         __u32                    type;
2755         char                    *name;
2756         int found = 0;
2757         int i;
2758         int rc = 0;
2759
2760         if (arg == NULL)
2761                 name = NRS_TBF_TYPE_GENERIC;
2762         else if (strlen(arg) < NRS_TBF_TYPE_MAX_LEN)
2763                 name = arg;
2764         else
2765                 GOTO(out, rc = -EINVAL);
2766
2767         for (i = 0; i < ARRAY_SIZE(nrs_tbf_types); i++) {
2768                 if (strcmp(name, nrs_tbf_types[i].ntt_name) == 0) {
2769                         ops = nrs_tbf_types[i].ntt_ops;
2770                         type = nrs_tbf_types[i].ntt_flag;
2771                         found = 1;
2772                         break;
2773                 }
2774         }
2775         if (found == 0)
2776                 GOTO(out, rc = -ENOTSUPP);
2777
2778         OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
2779         if (head == NULL)
2780                 GOTO(out, rc = -ENOMEM);
2781
2782         memcpy(head->th_type, name, strlen(name));
2783         head->th_type[strlen(name)] = '\0';
2784         head->th_ops = ops;
2785         head->th_type_flag = type;
2786
2787         head->th_binheap = binheap_create(&nrs_tbf_heap_ops,
2788                                           CBH_FLAG_ATOMIC_GROW, 4096, NULL,
2789                                           nrs_pol2cptab(policy),
2790                                           nrs_pol2cptid(policy));
2791         if (head->th_binheap == NULL)
2792                 GOTO(out_free_head, rc = -ENOMEM);
2793
2794         atomic_set(&head->th_rule_sequence, 0);
2795         spin_lock_init(&head->th_rule_lock);
2796         INIT_LIST_HEAD(&head->th_list);
2797         hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
2798         head->th_timer.function = nrs_tbf_timer_cb;
2799         rc = head->th_ops->o_startup(policy, head);
2800         if (rc)
2801                 GOTO(out_free_heap, rc);
2802
2803         policy->pol_private = head;
2804         return 0;
2805 out_free_heap:
2806         binheap_destroy(head->th_binheap);
2807 out_free_head:
2808         OBD_FREE_PTR(head);
2809 out:
2810         return rc;
2811 }
2812
2813 /**
2814  * Is called before the policy transitions into
2815  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
2816  * private data structure.
2817  *
2818  * \param[in] policy The policy to stop
2819  *
2820  * \see nrs_policy_stop0()
2821  */
2822 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
2823 {
2824         struct nrs_tbf_head *head = policy->pol_private;
2825         struct ptlrpc_nrs *nrs = policy->pol_nrs;
2826         struct nrs_tbf_rule *rule, *n;
2827
2828         LASSERT(head != NULL);
2829         LASSERT(head->th_cli_hash != NULL);
2830         hrtimer_cancel(&head->th_timer);
2831         /* Should cleanup hash first before free rules */
2832         cfs_hash_putref(head->th_cli_hash);
2833         list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
2834                 list_del_init(&rule->tr_linkage);
2835                 nrs_tbf_rule_put(rule);
2836         }
2837         LASSERT(list_empty(&head->th_list));
2838         LASSERT(head->th_binheap != NULL);
2839         LASSERT(binheap_is_empty(head->th_binheap));
2840         binheap_destroy(head->th_binheap);
2841         OBD_FREE_PTR(head);
2842         nrs->nrs_throttling = 0;
2843         wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
2844 }
2845
2846 /**
2847  * Performs a policy-specific ctl function on TBF policy instances; similar
2848  * to ioctl.
2849  *
2850  * \param[in]     policy the policy instance
2851  * \param[in]     opc    the opcode
2852  * \param[in,out] arg    used for passing parameters and information
2853  *
2854  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2855  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2856  *
2857  * \retval 0   operation carried out successfully
2858  * \retval -ve error
2859  */
2860 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
2861                        enum ptlrpc_nrs_ctl opc,
2862                        void *arg)
2863 {
2864         int rc = 0;
2865         ENTRY;
2866
2867         assert_spin_locked(&policy->pol_nrs->nrs_lock);
2868
2869         switch ((enum nrs_ctl_tbf)opc) {
2870         default:
2871                 RETURN(-EINVAL);
2872
2873         /**
2874          * Read RPC rate size of a policy instance.
2875          */
2876         case NRS_CTL_TBF_RD_RULE: {
2877                 struct nrs_tbf_head *head = policy->pol_private;
2878                 struct seq_file *m = arg;
2879                 struct ptlrpc_service_part *svcpt;
2880
2881                 svcpt = policy->pol_nrs->nrs_svcpt;
2882                 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
2883
2884                 rc = nrs_tbf_rule_dump_all(head, m);
2885                 }
2886                 break;
2887
2888         /**
2889          * Write RPC rate of a policy instance.
2890          */
2891         case NRS_CTL_TBF_WR_RULE: {
2892                 struct nrs_tbf_head *head = policy->pol_private;
2893                 struct nrs_tbf_cmd *cmd;
2894
2895                 cmd = (struct nrs_tbf_cmd *)arg;
2896                 rc = nrs_tbf_command(policy,
2897                                      head,
2898                                      cmd);
2899                 }
2900                 break;
2901         /**
2902          * Read the TBF policy type of a policy instance.
2903          */
2904         case NRS_CTL_TBF_RD_TYPE_FLAG: {
2905                 struct nrs_tbf_head *head = policy->pol_private;
2906
2907                 *(__u32 *)arg = head->th_type_flag;
2908                 }
2909                 break;
2910         }
2911
2912         RETURN(rc);
2913 }
2914
/**
 * Is called for obtaining a TBF policy resource.
 *
 * With a NULL \a parent the head's own resource is returned.  Otherwise the
 * client matching the request is looked up through the type's o_cli_find()
 * callback; when none exists a new client is allocated, initialised and
 * offered to o_cli_findadd().
 *
 * \param[in]  policy     The policy on which the request is being asked for
 * \param[in]  nrq        The request for which resources are being taken
 * \param[in]  parent     Parent resource; NULL asks for the head resource,
 *                        otherwise it is the head resource the client
 *                        belongs to
 * \param[out] resp       Resources references are placed in this array
 * \param[in]  moving_req Signifies limited caller context; selects
 *                        GFP_ATOMIC instead of __GFP_IO for the client
 *                        allocation
 *
 * \retval 0       \a *resp is the head resource
 * \retval 1       \a *resp is a client resource
 * \retval -ENOMEM a new client could not be allocated
 *
 * \see nrs_resource_get_safe()
 */
static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
                           struct ptlrpc_nrs_request *nrq,
                           const struct ptlrpc_nrs_resource *parent,
                           struct ptlrpc_nrs_resource **resp,
                           bool moving_req)
{
        struct nrs_tbf_head   *head;
        struct nrs_tbf_client *cli;
        struct nrs_tbf_client *tmp;
        struct ptlrpc_request *req;

        if (parent == NULL) {
                *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
                return 0;
        }

        head = container_of(parent, struct nrs_tbf_head, th_res);
        req = container_of(nrq, struct ptlrpc_request, rq_nrq);
        cli = head->th_ops->o_cli_find(head, req);
        if (cli != NULL) {
                spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
                LASSERT(cli->tc_rule);
                /*
                 * Re-match the client when the head's rule sequence has
                 * moved on since the client last matched, or when the
                 * client's current rule is being stopped.
                 */
                if (cli->tc_rule_sequence !=
                    atomic_read(&head->th_rule_sequence) ||
                    cli->tc_rule->tr_flags & NTRS_STOPPING) {
                        struct nrs_tbf_rule *rule;

                        CDEBUG(D_RPCTRACE,
                               "TBF class@%p rate %u sequence %d, "
                               "rule flags %d, head sequence %d\n",
                               cli, cli->tc_rpc_rate,
                               cli->tc_rule_sequence,
                               cli->tc_rule->tr_flags,
                               atomic_read(&head->th_rule_sequence));
                        rule = nrs_tbf_rule_match(head, cli);
                        if (rule != cli->tc_rule) {
                                nrs_tbf_cli_reset(head, rule, cli);
                        } else {
                                /* same rule: refresh values if it changed */
                                if (cli->tc_rule_generation != rule->tr_generation)
                                        nrs_tbf_cli_reset_value(head, cli);
                                /*
                                 * NOTE(review): presumably drops a reference
                                 * taken by nrs_tbf_rule_match() - confirm
                                 */
                                nrs_tbf_rule_put(rule);
                        }
                } else if (cli->tc_rule_generation !=
                           cli->tc_rule->tr_generation) {
                        nrs_tbf_cli_reset_value(head, cli);
                }
                spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
                goto out;
        }

        OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
                          sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
        if (cli == NULL)
                return -ENOMEM;

        nrs_tbf_cli_init(head, cli, req);
        tmp = head->th_ops->o_cli_findadd(head, cli);
        if (tmp != cli) {
                /*
                 * A different client was returned: discard the one just
                 * built and continue with the returned one.
                 */
                atomic_dec(&cli->tc_ref);
                nrs_tbf_cli_fini(cli);
                cli = tmp;
        }
out:
        *resp = &cli->tc_res;

        return 1;
}
2995
2996 /**
2997  * Called when releasing references to the resource hierachy obtained for a
2998  * request for scheduling using the TBF policy.
2999  *
3000  * \param[in] policy   the policy the resource belongs to
3001  * \param[in] res      the resource to be released
3002  */
3003 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
3004                             const struct ptlrpc_nrs_resource *res)
3005 {
3006         struct nrs_tbf_head   *head;
3007         struct nrs_tbf_client *cli;
3008
3009         /**
3010          * Do nothing for freeing parent, nrs_tbf_net resources
3011          */
3012         if (res->res_parent == NULL)
3013                 return;
3014
3015         cli = container_of(res, struct nrs_tbf_client, tc_res);
3016         head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
3017
3018         head->th_ops->o_cli_put(head, cli);
3019 }
3020
3021 /**
3022  * Called when getting a request from the TBF policy for handling, or just
3023  * peeking; removes the request from the policy when it is to be handled.
3024  *
3025  * \param[in] policy The policy
3026  * \param[in] peek   When set, signifies that we just want to examine the
3027  *                   request, and not handle it, so the request is not removed
3028  *                   from the policy.
3029  * \param[in] force  Force the policy to return a request; unused in this
3030  *                   policy
3031  *
3032  * \retval The request to be handled; this is the next request in the TBF
3033  *         rule
3034  *
3035  * \see ptlrpc_nrs_req_get_nolock()
3036  * \see nrs_request_get()
3037  */
3038 static
3039 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
3040                                            bool peek, bool force)
3041 {
3042         struct nrs_tbf_head       *head = policy->pol_private;
3043         struct ptlrpc_nrs_request *nrq = NULL;
3044         struct nrs_tbf_client     *cli;
3045         struct binheap_node       *node;
3046
3047         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3048
3049         if (!peek && policy->pol_nrs->nrs_throttling)
3050                 return NULL;
3051
3052         node = binheap_root(head->th_binheap);
3053         if (unlikely(node == NULL))
3054                 return NULL;
3055
3056         cli = container_of(node, struct nrs_tbf_client, tc_node);
3057         LASSERT(cli->tc_in_heap);
3058         if (peek) {
3059                 nrq = list_entry(cli->tc_list.next,
3060                                      struct ptlrpc_nrs_request,
3061                                      nr_u.tbf.tr_list);
3062         } else {
3063                 struct nrs_tbf_rule *rule = cli->tc_rule;
3064                 __u64 now = ktime_to_ns(ktime_get());
3065                 __u64 passed;
3066                 __u64 ntoken;
3067                 __u64 deadline;
3068                 __u64 old_resid = 0;
3069
3070                 deadline = cli->tc_check_time +
3071                           cli->tc_nsecs;
3072                 LASSERT(now >= cli->tc_check_time);
3073                 passed = now - cli->tc_check_time;
3074                 ntoken = passed * cli->tc_rpc_rate;
3075                 do_div(ntoken, NSEC_PER_SEC);
3076
3077                 ntoken += cli->tc_ntoken;
3078                 if (rule->tr_flags & NTRS_REALTIME) {
3079                         LASSERT(cli->tc_nsecs_resid < cli->tc_nsecs);
3080                         old_resid = cli->tc_nsecs_resid;
3081                         cli->tc_nsecs_resid += passed % cli->tc_nsecs;
3082                         if (cli->tc_nsecs_resid > cli->tc_nsecs) {
3083                                 ntoken++;
3084                                 cli->tc_nsecs_resid -= cli->tc_nsecs;
3085                         }
3086                 } else if (ntoken > cli->tc_depth)
3087                         ntoken = cli->tc_depth;
3088
3089                 if (ntoken > 0) {
3090                         struct ptlrpc_request *req;
3091                         nrq = list_entry(cli->tc_list.next,
3092                                              struct ptlrpc_nrs_request,
3093                                              nr_u.tbf.tr_list);
3094                         req = container_of(nrq,
3095                                            struct ptlrpc_request,
3096                                            rq_nrq);
3097                         ntoken--;
3098                         cli->tc_ntoken = ntoken;
3099                         cli->tc_check_time = now;
3100                         list_del_init(&nrq->nr_u.tbf.tr_list);
3101                         if (list_empty(&cli->tc_list)) {
3102                                 binheap_remove(head->th_binheap,
3103                                                &cli->tc_node);
3104                                 cli->tc_in_heap = false;
3105                         } else {
3106                                 if (!(rule->tr_flags & NTRS_REALTIME))
3107                                         cli->tc_deadline = now + cli->tc_nsecs;
3108                                 binheap_relocate(head->th_binheap,
3109                                                  &cli->tc_node);
3110                         }
3111                         CDEBUG(D_RPCTRACE,
3112                                "TBF dequeues: class@%p rate %u gen %llu "
3113                                "token %llu, rule@%p rate %u gen %llu\n",
3114                                cli, cli->tc_rpc_rate,
3115                                cli->tc_rule_generation, cli->tc_ntoken,
3116                                cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3117                                cli->tc_rule->tr_generation);
3118                 } else {
3119                         ktime_t time;
3120
3121                         if (rule->tr_flags & NTRS_REALTIME) {
3122                                 cli->tc_deadline = deadline;
3123                                 cli->tc_nsecs_resid = old_resid;
3124                                 binheap_relocate(head->th_binheap,
3125                                                  &cli->tc_node);
3126                                 if (node != binheap_root(head->th_binheap))
3127                                         return nrs_tbf_req_get(policy,
3128                                                                peek, force);
3129                         }
3130                         policy->pol_nrs->nrs_throttling = 1;
3131                         head->th_deadline = deadline;
3132                         time = ktime_set(0, 0);
3133                         time = ktime_add_ns(time, deadline);
3134                         hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
3135                 }
3136         }
3137
3138         return nrq;
3139 }
3140
3141 /**
3142  * Adds request \a nrq to \a policy's list of queued requests
3143  *
3144  * \param[in] policy The policy
3145  * \param[in] nrq    The request to add
3146  *
3147  * \retval 0 success; nrs_request_enqueue() assumes this function will always
3148  *                    succeed
3149  */
3150 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
3151                            struct ptlrpc_nrs_request *nrq)
3152 {
3153         struct nrs_tbf_head   *head;
3154         struct nrs_tbf_client *cli;
3155         int                    rc = 0;
3156
3157         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3158
3159         cli = container_of(nrs_request_resource(nrq),
3160                            struct nrs_tbf_client, tc_res);
3161         head = container_of(nrs_request_resource(nrq)->res_parent,
3162                             struct nrs_tbf_head, th_res);
3163         if (list_empty(&cli->tc_list)) {
3164                 LASSERT(!cli->tc_in_heap);
3165                 cli->tc_deadline = cli->tc_check_time + cli->tc_nsecs;
3166                 rc = binheap_insert(head->th_binheap, &cli->tc_node);
3167                 if (rc == 0) {
3168                         cli->tc_in_heap = true;
3169                         nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3170                         list_add_tail(&nrq->nr_u.tbf.tr_list,
3171                                           &cli->tc_list);
3172                         if (policy->pol_nrs->nrs_throttling) {
3173                                 __u64 deadline = cli->tc_deadline;
3174                                 if ((head->th_deadline > deadline) &&
3175                                     (hrtimer_try_to_cancel(&head->th_timer)
3176                                      >= 0)) {
3177                                         ktime_t time;
3178                                         head->th_deadline = deadline;
3179                                         time = ktime_set(0, 0);
3180                                         time = ktime_add_ns(time, deadline);
3181                                         hrtimer_start(&head->th_timer, time,
3182                                                       HRTIMER_MODE_ABS);
3183                                 }
3184                         }
3185                 }
3186         } else {
3187                 LASSERT(cli->tc_in_heap);
3188                 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3189                 list_add_tail(&nrq->nr_u.tbf.tr_list,
3190                                   &cli->tc_list);
3191         }
3192
3193         if (rc == 0)
3194                 CDEBUG(D_RPCTRACE,
3195                        "TBF enqueues: class@%p rate %u gen %llu "
3196                        "token %llu, rule@%p rate %u gen %llu\n",
3197                        cli, cli->tc_rpc_rate,
3198                        cli->tc_rule_generation, cli->tc_ntoken,
3199                        cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3200                        cli->tc_rule->tr_generation);
3201
3202         return rc;
3203 }
3204
3205 /**
3206  * Removes request \a nrq from \a policy's list of queued requests.
3207  *
3208  * \param[in] policy The policy
3209  * \param[in] nrq    The request to remove
3210  */
3211 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
3212                              struct ptlrpc_nrs_request *nrq)
3213 {
3214         struct nrs_tbf_head   *head;
3215         struct nrs_tbf_client *cli;
3216
3217         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3218
3219         cli = container_of(nrs_request_resource(nrq),
3220                            struct nrs_tbf_client, tc_res);
3221         head = container_of(nrs_request_resource(nrq)->res_parent,
3222                             struct nrs_tbf_head, th_res);
3223
3224         LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
3225         list_del_init(&nrq->nr_u.tbf.tr_list);
3226         if (list_empty(&cli->tc_list)) {
3227                 binheap_remove(head->th_binheap,
3228                                &cli->tc_node);
3229                 cli->tc_in_heap = false;
3230         } else {
3231                 binheap_relocate(head->th_binheap,
3232                                  &cli->tc_node);
3233         }
3234 }
3235
3236 /**
3237  * Prints a debug statement right before the request \a nrq stops being
3238  * handled.
3239  *
3240  * \param[in] policy The policy handling the request
3241  * \param[in] nrq    The request being handled
3242  *
3243  * \see ptlrpc_server_finish_request()
3244  * \see ptlrpc_nrs_req_stop_nolock()
3245  */
3246 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
3247                               struct ptlrpc_nrs_request *nrq)
3248 {
3249         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
3250                                                   rq_nrq);
3251
3252         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3253
3254         CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: %llu\n",
3255                policy->pol_desc->pd_name, libcfs_id2str(req->rq_peer),
3256                nrq->nr_u.tbf.tr_sequence);
3257 }
3258
3259 /**
3260  * debugfs interface
3261  */
3262
/**
 * Upper limit for the RPC rate given with the "rate=" key.
 *
 * The limit is exclusive: nrs_tbf_parse_value_pair() rejects a rate that is
 * greater than or equal to this value.
 */
#define LPROCFS_NRS_RATE_MAX            65535
3267
3268 static int
3269 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
3270 {
3271         struct ptlrpc_service       *svc = m->private;
3272         int                          rc;
3273
3274         seq_printf(m, "regular_requests:\n");
3275         /**
3276          * Perform two separate calls to this as only one of the NRS heads'
3277          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
3278          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
3279          */
3280         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
3281                                        NRS_POL_NAME_TBF,
3282                                        NRS_CTL_TBF_RD_RULE,
3283                                        false, m);
3284         if (rc == 0) {
3285                 /**
3286                  * -ENOSPC means buf in the parameter m is overflow, return 0
3287                  * here to let upper layer function seq_read alloc a larger
3288                  * memory area and do this process again.
3289                  */
3290         } else if (rc == -ENOSPC) {
3291                 return 0;
3292
3293                 /**
3294                  * Ignore -ENODEV as the regular NRS head's policy may be in the
3295                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
3296                  */
3297         } else if (rc != -ENODEV) {
3298                 return rc;
3299         }
3300
3301         if (!nrs_svc_has_hp(svc))
3302                 goto no_hp;
3303
3304         seq_printf(m, "high_priority_requests:\n");
3305         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
3306                                        NRS_POL_NAME_TBF,
3307                                        NRS_CTL_TBF_RD_RULE,
3308                                        false, m);
3309         if (rc == 0) {
3310                 /**
3311                  * -ENOSPC means buf in the parameter m is overflow, return 0
3312                  * here to let upper layer function seq_read alloc a larger
3313                  * memory area and do this process again.
3314                  */
3315         } else if (rc == -ENOSPC) {
3316                 return 0;
3317         }
3318
3319 no_hp:
3320
3321         return rc;
3322 }
3323
3324 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char *token)
3325 {
3326         int rc;
3327         ENTRY;
3328
3329         switch (cmd->u.tc_start.ts_valid_type) {
3330         case NRS_TBF_FLAG_JOBID:
3331                 rc = nrs_tbf_jobid_parse(cmd, token);
3332                 break;
3333         case NRS_TBF_FLAG_NID:
3334                 rc = nrs_tbf_nid_parse(cmd, token);
3335                 break;
3336         case NRS_TBF_FLAG_OPCODE:
3337                 rc = nrs_tbf_opcode_parse(cmd, token);
3338                 break;
3339         case NRS_TBF_FLAG_GENERIC:
3340                 rc = nrs_tbf_generic_parse(cmd, token);
3341                 break;
3342         case NRS_TBF_FLAG_UID:
3343         case NRS_TBF_FLAG_GID:
3344                 rc = nrs_tbf_ug_id_parse(cmd, token);
3345                 break;
3346         default:
3347                 RETURN(-EINVAL);
3348         }
3349
3350         RETURN(rc);
3351 }
3352
3353 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
3354 {
3355         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3356                 switch (cmd->u.tc_start.ts_valid_type) {
3357                 case NRS_TBF_FLAG_JOBID:
3358                         nrs_tbf_jobid_cmd_fini(cmd);
3359                         break;
3360                 case NRS_TBF_FLAG_NID:
3361                         nrs_tbf_nid_cmd_fini(cmd);
3362                         break;
3363                 case NRS_TBF_FLAG_OPCODE:
3364                         nrs_tbf_opcode_cmd_fini(cmd);
3365                         break;
3366                 case NRS_TBF_FLAG_GENERIC:
3367                         nrs_tbf_generic_cmd_fini(cmd);
3368                         break;
3369                 case NRS_TBF_FLAG_UID:
3370                 case NRS_TBF_FLAG_GID:
3371                         nrs_tbf_id_cmd_fini(cmd);
3372                         break;
3373                 default:
3374                         CWARN("unknown NRS_TBF_FLAGS:0x%x\n",
3375                               cmd->u.tc_start.ts_valid_type);
3376                 }
3377         }
3378 }
3379
/**
 * Checks that \a name consists only of alphanumeric characters and
 * underscores.
 *
 * \param[in] name NUL-terminated string to check
 *
 * \retval true  every character is alphanumeric or '_' (an empty string
 *               therefore passes)
 * \retval false at least one other character was found
 */
static bool name_is_valid(const char *name)
{
        const char *p;

        /*
         * Walk the string once instead of calling strlen() on every
         * iteration; this also avoids comparing a signed index with size_t.
         */
        for (p = name; *p != '\0'; p++) {
                if (!isalnum((unsigned char)*p) && *p != '_')
                        return false;
        }
        return true;
}
3391
3392 static int
3393 nrs_tbf_parse_value_pair(struct nrs_tbf_cmd *cmd, char *buffer)
3394 {
3395         char    *key;
3396         char    *val;
3397         int      rc;
3398         __u64    rate;
3399
3400         val = buffer;
3401         key = strsep(&val, "=");
3402         if (val == NULL || strlen(val) == 0)
3403                 return -EINVAL;
3404
3405         /* Key of the value pair */
3406         if (strcmp(key, "rate") == 0) {
3407                 rc = kstrtoull(val, 10, &rate);
3408                 if (rc)
3409                         return rc;
3410
3411                 if (rate <= 0 || rate >= LPROCFS_NRS_RATE_MAX)
3412                         return -EINVAL;
3413
3414                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3415                         cmd->u.tc_start.ts_rpc_rate = rate;
3416                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3417                         cmd->u.tc_change.tc_rpc_rate = rate;
3418                 else
3419                         return -EINVAL;
3420         }  else if (strcmp(key, "rank") == 0) {
3421                 if (!name_is_valid(val))
3422                         return -EINVAL;
3423
3424                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3425                         cmd->u.tc_start.ts_next_name = val;
3426                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3427                         cmd->u.tc_change.tc_next_name = val;
3428                 else
3429                         return -EINVAL;
3430         } else if (strcmp(key, "realtime") == 0) {
3431                 unsigned long realtime;
3432
3433                 rc = kstrtoul(val, 10, &realtime);
3434                 if (rc)
3435                         return rc;
3436
3437                 if (realtime > 0)
3438                         cmd->u.tc_start.ts_rule_flags |= NTRS_REALTIME;
3439         } else {
3440                 return -EINVAL;
3441         }
3442         return 0;
3443 }
3444
3445 static int
3446 nrs_tbf_parse_value_pairs(struct nrs_tbf_cmd *cmd, char *buffer)
3447 {
3448         char    *val;
3449         char    *token;
3450         int      rc;
3451
3452         val = buffer;
3453         while (val != NULL && strlen(val) != 0) {
3454                 token = strsep(&val, " ");
3455                 rc = nrs_tbf_parse_value_pair(cmd, token);
3456                 if (rc)
3457                         return rc;
3458         }
3459
3460         switch (cmd->tc_cmd) {
3461         case NRS_CTL_TBF_START_RULE:
3462                 if (cmd->u.tc_start.ts_rpc_rate == 0)
3463                         cmd->u.tc_start.ts_rpc_rate = tbf_rate;
3464                 break;
3465         case NRS_CTL_TBF_CHANGE_RULE:
3466                 if (cmd->u.tc_change.tc_rpc_rate == 0 &&
3467                     cmd->u.tc_change.tc_next_name == NULL)
3468                         return -EINVAL;
3469                 break;
3470         case NRS_CTL_TBF_STOP_RULE:
3471                 break;
3472         default:
3473                 return -EINVAL;
3474         }
3475         return 0;
3476 }
3477
3478 static struct nrs_tbf_cmd *
3479 nrs_tbf_parse_cmd(char *buffer, unsigned long count, __u32 type_flag)
3480 {
3481         static struct nrs_tbf_cmd       *cmd;
3482         char                            *token;
3483         char                            *val;
3484         int                              rc = 0;
3485
3486         OBD_ALLOC_PTR(cmd);
3487         if (cmd == NULL)
3488                 GOTO(out, rc = -ENOMEM);
3489         memset(cmd, 0, sizeof(*cmd));
3490
3491         val = buffer;
3492         token = strsep(&val, " ");
3493         if (val == NULL || strlen(val) == 0)
3494                 GOTO(out_free_cmd, rc = -EINVAL);
3495
3496         /* Type of the command */
3497         if (strcmp(token, "start") == 0) {
3498                 cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
3499                 cmd->u.tc_start.ts_valid_type = type_flag;
3500         } else if (strcmp(token, "stop") == 0)
3501                 cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
3502         else if (strcmp(token, "change") == 0)
3503                 cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RULE;
3504         else
3505                 GOTO(out_free_cmd, rc = -EINVAL);
3506
3507         /* Name of the rule */
3508         token = strsep(&val, " ");
3509         if ((val == NULL && cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE) ||
3510             !name_is_valid(token))
3511                 GOTO(out_free_cmd, rc = -EINVAL);
3512         cmd->tc_name = token;
3513
3514         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3515                 /* List of ID */
3516                 LASSERT(val);
3517                 token = val;
3518                 val = strrchr(token, '}');
3519                 if (!val)
3520                         GOTO(out_free_cmd, rc = -EINVAL);
3521
3522                 /* Skip '}' */
3523                 val++;
3524                 if (*val == '\0') {
3525                         val = NULL;
3526                 } else if (*val == ' ') {
3527                         *val = '\0';
3528                         val++;
3529                 } else
3530                         GOTO(out_free_cmd, rc = -EINVAL);
3531
3532                 rc = nrs_tbf_id_parse(cmd, token);
3533                 if (rc)
3534                         GOTO(out_free_cmd, rc);
3535         }
3536
3537         rc = nrs_tbf_parse_value_pairs(cmd, val);
3538         if (rc)
3539                 GOTO(out_cmd_fini, rc = -EINVAL);
3540         goto out;
3541 out_cmd_fini:
3542         nrs_tbf_cmd_fini(cmd);
3543 out_free_cmd:
3544         OBD_FREE_PTR(cmd);
3545 out:
3546         if (rc)
3547                 cmd = ERR_PTR(rc);
3548         return cmd;
3549 }
3550
3551 /**
3552  * Get the TBF policy type (nid, jobid, etc) preset by
3553  * proc entry 'nrs_policies' for command buffer parsing.
3554  *
3555  * \param[in] svc the PTLRPC service
3556  * \param[in] queue the NRS queue type
3557  *
3558  * \retval the preset TBF policy type flag
3559  */
3560 static __u32
3561 nrs_tbf_type_flag(struct ptlrpc_service *svc, enum ptlrpc_nrs_queue_type queue)
3562 {
3563         __u32   type;
3564         int     rc;
3565
3566         rc = ptlrpc_nrs_policy_control(svc, queue,
3567                                        NRS_POL_NAME_TBF,
3568                                        NRS_CTL_TBF_RD_TYPE_FLAG,
3569                                        true, &type);
3570         if (rc != 0)
3571                 type = NRS_TBF_FLAG_INVALID;
3572
3573         return type;
3574 }
3575
3576 #define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
3577 static ssize_t
3578 ptlrpc_lprocfs_nrs_tbf_rule_seq_write(struct file *file,
3579                                       const char __user *buffer,
3580                                       size_t count, loff_t *off)
3581 {
3582         struct seq_file           *m = file->private_data;
3583         struct ptlrpc_service     *svc = m->private;
3584         char                      *kernbuf;
3585         char                      *val;
3586         int                        rc;
3587         static struct nrs_tbf_cmd *cmd;
3588         enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
3589         unsigned long              length;
3590         char                      *token;
3591
3592         OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3593         if (kernbuf == NULL)
3594                 GOTO(out, rc = -ENOMEM);
3595
3596         if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
3597                 GOTO(out_free_kernbuff, rc = -EINVAL);
3598
3599         if (copy_from_user(kernbuf, buffer, count))
3600                 GOTO(out_free_kernbuff, rc = -EFAULT);
3601
3602         val = kernbuf;
3603         token = strsep(&val, " ");
3604         if (val == NULL)
3605                 GOTO(out_free_kernbuff, rc = -EINVAL);
3606
3607         if (strcmp(token, "reg") == 0) {
3608                 queue = PTLRPC_NRS_QUEUE_REG;
3609         } else if (strcmp(token, "hp") == 0) {
3610                 queue = PTLRPC_NRS_QUEUE_HP;
3611         } else {
3612                 kernbuf[strlen(token)] = ' ';
3613                 val = kernbuf;
3614         }
3615         length = strlen(val);
3616
3617         if (length == 0)
3618                 GOTO(out_free_kernbuff, rc = -EINVAL);
3619
3620         if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
3621                 GOTO(out_free_kernbuff, rc = -ENODEV);
3622         else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
3623                 queue = PTLRPC_NRS_QUEUE_REG;
3624
3625         cmd = nrs_tbf_parse_cmd(val, length, nrs_tbf_type_flag(svc, queue));
3626         if (IS_ERR(cmd))
3627                 GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
3628
3629         /**
3630          * Serialize NRS core lprocfs operations with policy registration/
3631          * unregistration.
3632          */
3633         mutex_lock(&nrs_core.nrs_mutex);
3634         rc = ptlrpc_nrs_policy_control(svc, queue,
3635                                        NRS_POL_NAME_TBF,
3636                                        NRS_CTL_TBF_WR_RULE,
3637                                        false, cmd);
3638         mutex_unlock(&nrs_core.nrs_mutex);
3639
3640         nrs_tbf_cmd_fini(cmd);
3641         OBD_FREE_PTR(cmd);
3642 out_free_kernbuff:
3643         OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3644 out:
3645         return rc ? rc : count;
3646 }
3647
3648 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_tbf_rule);
3649
3650 /**
3651  * Initializes a TBF policy's lprocfs interface for service \a svc
3652  *
3653  * \param[in] svc the service
3654  *
3655  * \retval 0    success
3656  * \retval != 0 error
3657  */
3658 static int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
3659 {
3660         struct ldebugfs_vars nrs_tbf_lprocfs_vars[] = {
3661                 { .name         = "nrs_tbf_rule",
3662                   .fops         = &ptlrpc_lprocfs_nrs_tbf_rule_fops,
3663                   .data = svc },
3664                 { NULL }
3665         };
3666
3667         if (!svc->srv_debugfs_entry)
3668                 return 0;
3669
3670         ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_tbf_lprocfs_vars, NULL);
3671
3672         return 0;
3673 }
3674
3675 /**
3676  * TBF policy operations
3677  */
3678 static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
3679         .op_policy_start        = nrs_tbf_start,
3680         .op_policy_stop         = nrs_tbf_stop,
3681         .op_policy_ctl          = nrs_tbf_ctl,
3682         .op_res_get             = nrs_tbf_res_get,
3683         .op_res_put             = nrs_tbf_res_put,
3684         .op_req_get             = nrs_tbf_req_get,
3685         .op_req_enqueue         = nrs_tbf_req_add,
3686         .op_req_dequeue         = nrs_tbf_req_del,
3687         .op_req_stop            = nrs_tbf_req_stop,
3688         .op_lprocfs_init        = nrs_tbf_lprocfs_init,
3689 };
3690
3691 /**
3692  * TBF policy configuration
3693  */
3694 struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
3695         .nc_name                = NRS_POL_NAME_TBF,
3696         .nc_ops                 = &nrs_tbf_ops,
3697         .nc_compat              = nrs_policy_compat_all,
3698 };
3699
3700 /** @} tbf */
3701
3702 /** @} nrs */