Whamcloud - gitweb
LU-9658 ptlrpc: Add QoS for uid and gid in NRS-TBF
[fs/lustre-release.git] / lustre / ptlrpc / nrs_tbf.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2013 DataDirect Networks, Inc.
24  *
25  * Copyright (c) 2014, 2016, Intel Corporation.
26  */
27 /*
28  * lustre/ptlrpc/nrs_tbf.c
29  *
30  * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
31  *
32  */
33
34 #ifdef HAVE_SERVER_SUPPORT
35
36 /**
37  * \addtogoup nrs
38  * @{
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #include <obd_support.h>
43 #include <obd_class.h>
44 #include <libcfs/libcfs.h>
45 #include <lustre_req_layout.h>
46 #include "ptlrpc_internal.h"
47
48 /**
49  * \name tbf
50  *
51  * Token Bucket Filter over client NIDs
52  *
53  * @{
54  */
55
56 #define NRS_POL_NAME_TBF        "tbf"
57
58 static int tbf_jobid_cache_size = 8192;
59 module_param(tbf_jobid_cache_size, int, 0644);
60 MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");
61
62 static int tbf_rate = 10000;
63 module_param(tbf_rate, int, 0644);
64 MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");
65
66 static int tbf_depth = 3;
67 module_param(tbf_depth, int, 0644);
68 MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
69
70 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
71 {
72         struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
73                                                  th_timer);
74         struct ptlrpc_nrs   *nrs = head->th_res.res_policy->pol_nrs;
75         struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
76
77         nrs->nrs_throttling = 0;
78         wake_up(&svcpt->scp_waitq);
79
80         return HRTIMER_NORESTART;
81 }
82
83 #define NRS_TBF_DEFAULT_RULE "default"
84
85 static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule)
86 {
87         LASSERT(atomic_read(&rule->tr_ref) == 0);
88         LASSERT(list_empty(&rule->tr_cli_list));
89         LASSERT(list_empty(&rule->tr_linkage));
90
91         rule->tr_head->th_ops->o_rule_fini(rule);
92         OBD_FREE_PTR(rule);
93 }
94
95 /**
96  * Decreases the rule's usage reference count, and stops the rule in case it
97  * was already stopping and have no more outstanding usage references (which
98  * indicates it has no more queued or started requests, and can be safely
99  * stopped).
100  */
101 static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule)
102 {
103         if (atomic_dec_and_test(&rule->tr_ref))
104                 nrs_tbf_rule_fini(rule);
105 }
106
107 /**
108  * Increases the rule's usage reference count.
109  */
110 static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule)
111 {
112         atomic_inc(&rule->tr_ref);
113 }
114
115 static void
116 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
117 {
118         LASSERT(!list_empty(&cli->tc_linkage));
119         LASSERT(cli->tc_rule);
120         spin_lock(&cli->tc_rule->tr_rule_lock);
121         list_del_init(&cli->tc_linkage);
122         spin_unlock(&cli->tc_rule->tr_rule_lock);
123         nrs_tbf_rule_put(cli->tc_rule);
124         cli->tc_rule = NULL;
125 }
126
127 static void
128 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
129                         struct nrs_tbf_client *cli)
130
131 {
132         struct nrs_tbf_rule *rule = cli->tc_rule;
133
134         cli->tc_rpc_rate = rule->tr_rpc_rate;
135         cli->tc_nsecs = rule->tr_nsecs;
136         cli->tc_depth = rule->tr_depth;
137         cli->tc_ntoken = rule->tr_depth;
138         cli->tc_check_time = ktime_to_ns(ktime_get());
139         cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
140         cli->tc_rule_generation = rule->tr_generation;
141
142         if (cli->tc_in_heap)
143                 cfs_binheap_relocate(head->th_binheap,
144                                      &cli->tc_node);
145 }
146
147 static void
148 nrs_tbf_cli_reset(struct nrs_tbf_head *head,
149                   struct nrs_tbf_rule *rule,
150                   struct nrs_tbf_client *cli)
151 {
152         spin_lock(&cli->tc_rule_lock);
153         if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
154                 LASSERT(rule != cli->tc_rule);
155                 nrs_tbf_cli_rule_put(cli);
156         }
157         LASSERT(cli->tc_rule == NULL);
158         LASSERT(list_empty(&cli->tc_linkage));
159         /* Rule's ref is added before called */
160         cli->tc_rule = rule;
161         spin_lock(&rule->tr_rule_lock);
162         list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
163         spin_unlock(&rule->tr_rule_lock);
164         spin_unlock(&cli->tc_rule_lock);
165         nrs_tbf_cli_reset_value(head, cli);
166 }
167
168 static int
169 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
170 {
171         return rule->tr_head->th_ops->o_rule_dump(rule, m);
172 }
173
174 static int
175 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
176 {
177         struct nrs_tbf_rule *rule;
178         int rc = 0;
179
180         LASSERT(head != NULL);
181         spin_lock(&head->th_rule_lock);
182         /* List the rules from newest to oldest */
183         list_for_each_entry(rule, &head->th_list, tr_linkage) {
184                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
185                 rc = nrs_tbf_rule_dump(rule, m);
186                 if (rc) {
187                         rc = -ENOSPC;
188                         break;
189                 }
190         }
191         spin_unlock(&head->th_rule_lock);
192
193         return rc;
194 }
195
196 static struct nrs_tbf_rule *
197 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
198                          const char *name)
199 {
200         struct nrs_tbf_rule *rule;
201
202         LASSERT(head != NULL);
203         list_for_each_entry(rule, &head->th_list, tr_linkage) {
204                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
205                 if (strcmp(rule->tr_name, name) == 0) {
206                         nrs_tbf_rule_get(rule);
207                         return rule;
208                 }
209         }
210         return NULL;
211 }
212
213 static struct nrs_tbf_rule *
214 nrs_tbf_rule_find(struct nrs_tbf_head *head,
215                   const char *name)
216 {
217         struct nrs_tbf_rule *rule;
218
219         LASSERT(head != NULL);
220         spin_lock(&head->th_rule_lock);
221         rule = nrs_tbf_rule_find_nolock(head, name);
222         spin_unlock(&head->th_rule_lock);
223         return rule;
224 }
225
226 static struct nrs_tbf_rule *
227 nrs_tbf_rule_match(struct nrs_tbf_head *head,
228                    struct nrs_tbf_client *cli)
229 {
230         struct nrs_tbf_rule *rule = NULL;
231         struct nrs_tbf_rule *tmp_rule;
232
233         spin_lock(&head->th_rule_lock);
234         /* Match the newest rule in the list */
235         list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
236                 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
237                 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
238                         rule = tmp_rule;
239                         break;
240                 }
241         }
242
243         if (rule == NULL)
244                 rule = head->th_rule;
245
246         nrs_tbf_rule_get(rule);
247         spin_unlock(&head->th_rule_lock);
248         return rule;
249 }
250
251 static void
252 nrs_tbf_cli_init(struct nrs_tbf_head *head,
253                  struct nrs_tbf_client *cli,
254                  struct ptlrpc_request *req)
255 {
256         struct nrs_tbf_rule *rule;
257
258         memset(cli, 0, sizeof(*cli));
259         cli->tc_in_heap = false;
260         head->th_ops->o_cli_init(cli, req);
261         INIT_LIST_HEAD(&cli->tc_list);
262         INIT_LIST_HEAD(&cli->tc_linkage);
263         spin_lock_init(&cli->tc_rule_lock);
264         atomic_set(&cli->tc_ref, 1);
265         rule = nrs_tbf_rule_match(head, cli);
266         nrs_tbf_cli_reset(head, rule, cli);
267 }
268
269 static void
270 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
271 {
272         LASSERT(list_empty(&cli->tc_list));
273         LASSERT(!cli->tc_in_heap);
274         LASSERT(atomic_read(&cli->tc_ref) == 0);
275         spin_lock(&cli->tc_rule_lock);
276         nrs_tbf_cli_rule_put(cli);
277         spin_unlock(&cli->tc_rule_lock);
278         OBD_FREE_PTR(cli);
279 }
280
281 static int
282 nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
283                    struct nrs_tbf_head *head,
284                    struct nrs_tbf_cmd *start)
285 {
286         struct nrs_tbf_rule     *rule;
287         struct nrs_tbf_rule     *tmp_rule;
288         struct nrs_tbf_rule     *next_rule;
289         char                    *next_name = start->u.tc_start.ts_next_name;
290         int                      rc;
291
292         rule = nrs_tbf_rule_find(head, start->tc_name);
293         if (rule) {
294                 nrs_tbf_rule_put(rule);
295                 return -EEXIST;
296         }
297
298         OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
299         if (rule == NULL)
300                 return -ENOMEM;
301
302         memcpy(rule->tr_name, start->tc_name, strlen(start->tc_name));
303         rule->tr_rpc_rate = start->u.tc_start.ts_rpc_rate;
304         rule->tr_flags = start->u.tc_start.ts_rule_flags;
305         rule->tr_nsecs = NSEC_PER_SEC;
306         do_div(rule->tr_nsecs, rule->tr_rpc_rate);
307         rule->tr_depth = tbf_depth;
308         atomic_set(&rule->tr_ref, 1);
309         INIT_LIST_HEAD(&rule->tr_cli_list);
310         INIT_LIST_HEAD(&rule->tr_nids);
311         INIT_LIST_HEAD(&rule->tr_linkage);
312         spin_lock_init(&rule->tr_rule_lock);
313         rule->tr_head = head;
314
315         rc = head->th_ops->o_rule_init(policy, rule, start);
316         if (rc) {
317                 OBD_FREE_PTR(rule);
318                 return rc;
319         }
320
321         /* Add as the newest rule */
322         spin_lock(&head->th_rule_lock);
323         tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
324         if (tmp_rule) {
325                 spin_unlock(&head->th_rule_lock);
326                 nrs_tbf_rule_put(tmp_rule);
327                 nrs_tbf_rule_put(rule);
328                 return -EEXIST;
329         }
330
331         if (next_name) {
332                 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
333                 if (!next_rule) {
334                         spin_unlock(&head->th_rule_lock);
335                         nrs_tbf_rule_put(rule);
336                         return -ENOENT;
337                 }
338
339                 list_add(&rule->tr_linkage, next_rule->tr_linkage.prev);
340                 nrs_tbf_rule_put(next_rule);
341         } else {
342                 /* Add on the top of the rule list */
343                 list_add(&rule->tr_linkage, &head->th_list);
344         }
345         spin_unlock(&head->th_rule_lock);
346         atomic_inc(&head->th_rule_sequence);
347         if (start->u.tc_start.ts_rule_flags & NTRS_DEFAULT) {
348                 rule->tr_flags |= NTRS_DEFAULT;
349                 LASSERT(head->th_rule == NULL);
350                 head->th_rule = rule;
351         }
352
353         CDEBUG(D_RPCTRACE, "TBF starts rule@%p rate %llu gen %llu\n",
354                rule, rule->tr_rpc_rate, rule->tr_generation);
355
356         return 0;
357 }
358
359 /**
360  * Change the rank of a rule in the rule list
361  *
362  * The matched rule will be moved to the position right before another
363  * given rule.
364  *
365  * \param[in] policy    the policy instance
366  * \param[in] head      the TBF policy instance
367  * \param[in] name      the rule name to be moved
368  * \param[in] next_name the rule name before which the matched rule will be
369  *                      moved
370  *
371  */
372 static int
373 nrs_tbf_rule_change_rank(struct ptlrpc_nrs_policy *policy,
374                          struct nrs_tbf_head *head,
375                          char *name,
376                          char *next_name)
377 {
378         struct nrs_tbf_rule     *rule = NULL;
379         struct nrs_tbf_rule     *next_rule = NULL;
380         int                      rc = 0;
381
382         LASSERT(head != NULL);
383
384         spin_lock(&head->th_rule_lock);
385         rule = nrs_tbf_rule_find_nolock(head, name);
386         if (!rule)
387                 GOTO(out, rc = -ENOENT);
388
389         if (strcmp(name, next_name) == 0)
390                 GOTO(out_put, rc);
391
392         next_rule = nrs_tbf_rule_find_nolock(head, next_name);
393         if (!next_rule)
394                 GOTO(out_put, rc = -ENOENT);
395
396         list_move(&rule->tr_linkage, next_rule->tr_linkage.prev);
397         nrs_tbf_rule_put(next_rule);
398 out_put:
399         nrs_tbf_rule_put(rule);
400 out:
401         spin_unlock(&head->th_rule_lock);
402         return rc;
403 }
404
405 static int
406 nrs_tbf_rule_change_rate(struct ptlrpc_nrs_policy *policy,
407                          struct nrs_tbf_head *head,
408                          char *name,
409                          __u64 rate)
410 {
411         struct nrs_tbf_rule *rule;
412
413         assert_spin_locked(&policy->pol_nrs->nrs_lock);
414
415         rule = nrs_tbf_rule_find(head, name);
416         if (rule == NULL)
417                 return -ENOENT;
418
419         rule->tr_rpc_rate = rate;
420         rule->tr_nsecs = NSEC_PER_SEC;
421         do_div(rule->tr_nsecs, rule->tr_rpc_rate);
422         rule->tr_generation++;
423         nrs_tbf_rule_put(rule);
424
425         return 0;
426 }
427
428 static int
429 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
430                     struct nrs_tbf_head *head,
431                     struct nrs_tbf_cmd *change)
432 {
433         __u64    rate = change->u.tc_change.tc_rpc_rate;
434         char    *next_name = change->u.tc_change.tc_next_name;
435         int      rc;
436
437         if (rate != 0) {
438                 rc = nrs_tbf_rule_change_rate(policy, head, change->tc_name,
439                                               rate);
440                 if (rc)
441                         return rc;
442         }
443
444         if (next_name) {
445                 rc = nrs_tbf_rule_change_rank(policy, head, change->tc_name,
446                                               next_name);
447                 if (rc)
448                         return rc;
449         }
450
451         return 0;
452 }
453
454 static int
455 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
456                   struct nrs_tbf_head *head,
457                   struct nrs_tbf_cmd *stop)
458 {
459         struct nrs_tbf_rule *rule;
460
461         assert_spin_locked(&policy->pol_nrs->nrs_lock);
462
463         if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
464                 return -EPERM;
465
466         rule = nrs_tbf_rule_find(head, stop->tc_name);
467         if (rule == NULL)
468                 return -ENOENT;
469
470         list_del_init(&rule->tr_linkage);
471         rule->tr_flags |= NTRS_STOPPING;
472         nrs_tbf_rule_put(rule);
473         nrs_tbf_rule_put(rule);
474
475         return 0;
476 }
477
478 static int
479 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
480                 struct nrs_tbf_head *head,
481                 struct nrs_tbf_cmd *cmd)
482 {
483         int rc;
484
485         assert_spin_locked(&policy->pol_nrs->nrs_lock);
486
487         switch (cmd->tc_cmd) {
488         case NRS_CTL_TBF_START_RULE:
489                 if (cmd->u.tc_start.ts_valid_type != head->th_type_flag)
490                         return -EINVAL;
491
492                 spin_unlock(&policy->pol_nrs->nrs_lock);
493                 rc = nrs_tbf_rule_start(policy, head, cmd);
494                 spin_lock(&policy->pol_nrs->nrs_lock);
495                 return rc;
496         case NRS_CTL_TBF_CHANGE_RULE:
497                 rc = nrs_tbf_rule_change(policy, head, cmd);
498                 return rc;
499         case NRS_CTL_TBF_STOP_RULE:
500                 rc = nrs_tbf_rule_stop(policy, head, cmd);
501                 /* Take it as a success, if not exists at all */
502                 return rc == -ENOENT ? 0 : rc;
503         default:
504                 return -EFAULT;
505         }
506 }
507
508 /**
509  * Binary heap predicate.
510  *
511  * \param[in] e1 the first binheap node to compare
512  * \param[in] e2 the second binheap node to compare
513  *
514  * \retval 0 e1 > e2
515  * \retval 1 e1 < e2
516  */
517 static int
518 tbf_cli_compare(struct cfs_binheap_node *e1, struct cfs_binheap_node *e2)
519 {
520         struct nrs_tbf_client *cli1;
521         struct nrs_tbf_client *cli2;
522
523         cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
524         cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
525
526         if (cli1->tc_deadline < cli2->tc_deadline)
527                 return 1;
528         else if (cli1->tc_deadline > cli2->tc_deadline)
529                 return 0;
530
531         if (cli1->tc_check_time < cli2->tc_check_time)
532                 return 1;
533         else if (cli1->tc_check_time > cli2->tc_check_time)
534                 return 0;
535
536         /* Maybe need more comparasion, e.g. request number in the rules */
537         return 1;
538 }
539
540 /**
541  * TBF binary heap operations
542  */
543 static struct cfs_binheap_ops nrs_tbf_heap_ops = {
544         .hop_enter      = NULL,
545         .hop_exit       = NULL,
546         .hop_compare    = tbf_cli_compare,
547 };
548
549 static unsigned nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key,
550                                   unsigned mask)
551 {
552         return cfs_hash_djb2_hash(key, strlen(key), mask);
553 }
554
555 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
556 {
557         struct nrs_tbf_client *cli = hlist_entry(hnode,
558                                                      struct nrs_tbf_client,
559                                                      tc_hnode);
560
561         return (strcmp(cli->tc_jobid, key) == 0);
562 }
563
564 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
565 {
566         struct nrs_tbf_client *cli = hlist_entry(hnode,
567                                                      struct nrs_tbf_client,
568                                                      tc_hnode);
569
570         return cli->tc_jobid;
571 }
572
573 static void *nrs_tbf_hop_object(struct hlist_node *hnode)
574 {
575         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
576 }
577
578 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
579 {
580         struct nrs_tbf_client *cli = hlist_entry(hnode,
581                                                      struct nrs_tbf_client,
582                                                      tc_hnode);
583
584         atomic_inc(&cli->tc_ref);
585 }
586
587 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
588 {
589         struct nrs_tbf_client *cli = hlist_entry(hnode,
590                                                      struct nrs_tbf_client,
591                                                      tc_hnode);
592
593         atomic_dec(&cli->tc_ref);
594 }
595
596 static void
597 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
598
599 {
600         struct nrs_tbf_client *cli = hlist_entry(hnode,
601                                                  struct nrs_tbf_client,
602                                                  tc_hnode);
603
604         LASSERT(atomic_read(&cli->tc_ref) == 0);
605         nrs_tbf_cli_fini(cli);
606 }
607
608 static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
609         .hs_hash        = nrs_tbf_jobid_hop_hash,
610         .hs_keycmp      = nrs_tbf_jobid_hop_keycmp,
611         .hs_key         = nrs_tbf_jobid_hop_key,
612         .hs_object      = nrs_tbf_hop_object,
613         .hs_get         = nrs_tbf_jobid_hop_get,
614         .hs_put         = nrs_tbf_jobid_hop_put,
615         .hs_put_locked  = nrs_tbf_jobid_hop_put,
616         .hs_exit        = nrs_tbf_jobid_hop_exit,
617 };
618
619 #define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
620                                   CFS_HASH_NO_ITEMREF | \
621                                   CFS_HASH_DEPTH)
622
623 static struct nrs_tbf_client *
624 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
625                           struct cfs_hash_bd *bd,
626                           const char *jobid)
627 {
628         struct hlist_node *hnode;
629         struct nrs_tbf_client *cli;
630
631         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)jobid);
632         if (hnode == NULL)
633                 return NULL;
634
635         cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode);
636         if (!list_empty(&cli->tc_lru))
637                 list_del_init(&cli->tc_lru);
638         return cli;
639 }
640
641 #define NRS_TBF_JOBID_NULL ""
642
643 static struct nrs_tbf_client *
644 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
645                        struct ptlrpc_request *req)
646 {
647         const char              *jobid;
648         struct nrs_tbf_client   *cli;
649         struct cfs_hash         *hs = head->th_cli_hash;
650         struct cfs_hash_bd               bd;
651
652         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
653         if (jobid == NULL)
654                 jobid = NRS_TBF_JOBID_NULL;
655         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
656         cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
657         cfs_hash_bd_unlock(hs, &bd, 1);
658
659         return cli;
660 }
661
662 static struct nrs_tbf_client *
663 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
664                           struct nrs_tbf_client *cli)
665 {
666         const char              *jobid;
667         struct nrs_tbf_client   *ret;
668         struct cfs_hash         *hs = head->th_cli_hash;
669         struct cfs_hash_bd               bd;
670
671         jobid = cli->tc_jobid;
672         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
673         ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
674         if (ret == NULL) {
675                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
676                 ret = cli;
677         }
678         cfs_hash_bd_unlock(hs, &bd, 1);
679
680         return ret;
681 }
682
683 static void
684 nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
685                       struct nrs_tbf_client *cli)
686 {
687         struct cfs_hash_bd               bd;
688         struct cfs_hash         *hs = head->th_cli_hash;
689         struct nrs_tbf_bucket   *bkt;
690         int                      hw;
691         struct list_head        zombies;
692
693         INIT_LIST_HEAD(&zombies);
694         cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
695         bkt = cfs_hash_bd_extra_get(hs, &bd);
696         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
697                 return;
698         LASSERT(list_empty(&cli->tc_lru));
699         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
700
701         /*
702          * Check and purge the LRU, there is at least one client in the LRU.
703          */
704         hw = tbf_jobid_cache_size >>
705              (hs->hs_cur_bits - hs->hs_bkt_bits);
706         while (cfs_hash_bd_count_get(&bd) > hw) {
707                 if (unlikely(list_empty(&bkt->ntb_lru)))
708                         break;
709                 cli = list_entry(bkt->ntb_lru.next,
710                                      struct nrs_tbf_client,
711                                      tc_lru);
712                 LASSERT(atomic_read(&cli->tc_ref) == 0);
713                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
714                 list_move(&cli->tc_lru, &zombies);
715         }
716         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
717
718         while (!list_empty(&zombies)) {
719                 cli = container_of0(zombies.next,
720                                     struct nrs_tbf_client, tc_lru);
721                 list_del_init(&cli->tc_lru);
722                 nrs_tbf_cli_fini(cli);
723         }
724 }
725
726 static void
727 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
728                        struct ptlrpc_request *req)
729 {
730         char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
731
732         if (jobid == NULL)
733                 jobid = NRS_TBF_JOBID_NULL;
734         LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
735         INIT_LIST_HEAD(&cli->tc_lru);
736         memcpy(cli->tc_jobid, jobid, strlen(jobid));
737 }
738
739 static int nrs_tbf_jobid_hash_order(void)
740 {
741         int bits;
742
743         for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
744                 ;
745
746         return bits;
747 }
748
749 #define NRS_TBF_JOBID_BKT_BITS 10
750
751 static int
752 nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
753                       struct nrs_tbf_head *head)
754 {
755         struct nrs_tbf_cmd       start;
756         struct nrs_tbf_bucket   *bkt;
757         int                      bits;
758         int                      i;
759         int                      rc;
760         struct cfs_hash_bd       bd;
761
762         bits = nrs_tbf_jobid_hash_order();
763         if (bits < NRS_TBF_JOBID_BKT_BITS)
764                 bits = NRS_TBF_JOBID_BKT_BITS;
765         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
766                                             bits,
767                                             bits,
768                                             NRS_TBF_JOBID_BKT_BITS,
769                                             sizeof(*bkt),
770                                             0,
771                                             0,
772                                             &nrs_tbf_jobid_hash_ops,
773                                             NRS_TBF_JOBID_HASH_FLAGS);
774         if (head->th_cli_hash == NULL)
775                 return -ENOMEM;
776
777         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
778                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
779                 INIT_LIST_HEAD(&bkt->ntb_lru);
780         }
781
782         memset(&start, 0, sizeof(start));
783         start.u.tc_start.ts_jobids_str = "*";
784
785         start.u.tc_start.ts_rpc_rate = tbf_rate;
786         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
787         start.tc_name = NRS_TBF_DEFAULT_RULE;
788         INIT_LIST_HEAD(&start.u.tc_start.ts_jobids);
789         rc = nrs_tbf_rule_start(policy, head, &start);
790         if (rc) {
791                 cfs_hash_putref(head->th_cli_hash);
792                 head->th_cli_hash = NULL;
793         }
794
795         return rc;
796 }
797
798 /**
799  * Frees jobid of \a list.
800  *
801  */
802 static void
803 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
804 {
805         struct nrs_tbf_jobid *jobid, *n;
806
807         list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
808                 OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1);
809                 list_del(&jobid->tj_linkage);
810                 OBD_FREE(jobid, sizeof(struct nrs_tbf_jobid));
811         }
812 }
813
814 static int
815 nrs_tbf_jobid_list_add(struct cfs_lstr *id, struct list_head *jobid_list)
816 {
817         struct nrs_tbf_jobid *jobid;
818         struct cfs_lstr res;
819         int rc;
820
821         OBD_ALLOC(jobid, sizeof(struct nrs_tbf_jobid));
822         if (jobid == NULL)
823                 return -ENOMEM;
824
825         OBD_ALLOC(jobid->tj_id, id->ls_len + 1);
826         if (jobid->tj_id == NULL) {
827                 OBD_FREE(jobid, sizeof(struct nrs_tbf_jobid));
828                 return -ENOMEM;
829         }
830
831         memcpy(jobid->tj_id, id->ls_str, id->ls_len);
832         rc = cfs_gettok(id, '*', &res);
833         if (rc == 0)
834                 jobid->tj_match_flag = NRS_TBF_MATCH_FULL;
835         else
836                 jobid->tj_match_flag = NRS_TBF_MATCH_WILDCARD;
837
838         list_add_tail(&jobid->tj_linkage, jobid_list);
839         return 0;
840 }
841
842 static bool
843 cfs_match_wildcard(const char *pattern, const char *content)
844 {
845         if (*pattern == '\0' && *content == '\0')
846                 return true;
847
848         if (*pattern == '*' && *(pattern + 1) != '\0' && *content == '\0')
849                 return false;
850
851         while (*pattern == *content) {
852                 pattern++;
853                 content++;
854                 if (*pattern == '\0' && *content == '\0')
855                         return true;
856
857                 if (*pattern == '*' && *(pattern + 1) != '\0' &&
858                     *content == '\0')
859                         return false;
860         }
861
862         if (*pattern == '*')
863                 return (cfs_match_wildcard(pattern + 1, content) ||
864                         cfs_match_wildcard(pattern, content + 1));
865
866         return false;
867 }
868
869 static inline bool
870 nrs_tbf_jobid_match(const struct nrs_tbf_jobid *jobid, const char *id)
871 {
872         if (jobid->tj_match_flag == NRS_TBF_MATCH_FULL)
873                 return strcmp(jobid->tj_id, id) == 0;
874
875         if (jobid->tj_match_flag == NRS_TBF_MATCH_WILDCARD)
876                 return cfs_match_wildcard(jobid->tj_id, id);
877
878         return false;
879 }
880
881 static int
882 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
883 {
884         struct nrs_tbf_jobid *jobid;
885
886         list_for_each_entry(jobid, jobid_list, tj_linkage) {
887                 if (nrs_tbf_jobid_match(jobid, id))
888                         return 1;
889         }
890         return 0;
891 }
892
893 static int
894 nrs_tbf_jobid_list_parse(char *str, int len, struct list_head *jobid_list)
895 {
896         struct cfs_lstr src;
897         struct cfs_lstr res;
898         int rc = 0;
899         ENTRY;
900
901         src.ls_str = str;
902         src.ls_len = len;
903         INIT_LIST_HEAD(jobid_list);
904         while (src.ls_str) {
905                 rc = cfs_gettok(&src, ' ', &res);
906                 if (rc == 0) {
907                         rc = -EINVAL;
908                         break;
909                 }
910                 rc = nrs_tbf_jobid_list_add(&res, jobid_list);
911                 if (rc)
912                         break;
913         }
914         if (rc)
915                 nrs_tbf_jobid_list_free(jobid_list);
916         RETURN(rc);
917 }
918
919 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
920 {
921         if (!list_empty(&cmd->u.tc_start.ts_jobids))
922                 nrs_tbf_jobid_list_free(&cmd->u.tc_start.ts_jobids);
923         if (cmd->u.tc_start.ts_jobids_str)
924                 OBD_FREE(cmd->u.tc_start.ts_jobids_str,
925                          strlen(cmd->u.tc_start.ts_jobids_str) + 1);
926 }
927
928 static int nrs_tbf_check_id_value(struct cfs_lstr *src, char *key)
929 {
930         struct cfs_lstr res;
931         int keylen = strlen(key);
932         int rc;
933
934         rc = cfs_gettok(src, '=', &res);
935         if (rc == 0 || res.ls_len != keylen ||
936             strncmp(res.ls_str, key, keylen) != 0 ||
937             src->ls_len <= 2 || src->ls_str[0] != '{' ||
938             src->ls_str[src->ls_len - 1] != '}')
939                 return -EINVAL;
940
941         /* Skip '{' and '}' */
942         src->ls_str++;
943         src->ls_len -= 2;
944         return 0;
945 }
946
947 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, char *id)
948 {
949         struct cfs_lstr src;
950         int rc;
951
952         src.ls_str = id;
953         src.ls_len = strlen(id);
954         rc = nrs_tbf_check_id_value(&src, "jobid");
955         if (rc)
956                 return rc;
957
958         OBD_ALLOC(cmd->u.tc_start.ts_jobids_str, src.ls_len + 1);
959         if (cmd->u.tc_start.ts_jobids_str == NULL)
960                 return -ENOMEM;
961
962         memcpy(cmd->u.tc_start.ts_jobids_str, src.ls_str, src.ls_len);
963
964         /* parse jobid list */
965         rc = nrs_tbf_jobid_list_parse(cmd->u.tc_start.ts_jobids_str,
966                                       strlen(cmd->u.tc_start.ts_jobids_str),
967                                       &cmd->u.tc_start.ts_jobids);
968         if (rc)
969                 nrs_tbf_jobid_cmd_fini(cmd);
970
971         return rc;
972 }
973
974 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
975                                    struct nrs_tbf_rule *rule,
976                                    struct nrs_tbf_cmd *start)
977 {
978         int rc = 0;
979
980         LASSERT(start->u.tc_start.ts_jobids_str);
981         OBD_ALLOC(rule->tr_jobids_str,
982                   strlen(start->u.tc_start.ts_jobids_str) + 1);
983         if (rule->tr_jobids_str == NULL)
984                 return -ENOMEM;
985
986         memcpy(rule->tr_jobids_str,
987                start->u.tc_start.ts_jobids_str,
988                strlen(start->u.tc_start.ts_jobids_str));
989
990         INIT_LIST_HEAD(&rule->tr_jobids);
991         if (!list_empty(&start->u.tc_start.ts_jobids)) {
992                 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
993                                               strlen(rule->tr_jobids_str),
994                                               &rule->tr_jobids);
995                 if (rc)
996                         CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
997         }
998         if (rc)
999                 OBD_FREE(rule->tr_jobids_str,
1000                          strlen(start->u.tc_start.ts_jobids_str) + 1);
1001         return rc;
1002 }
1003
1004 static int
1005 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1006 {
1007         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1008                    rule->tr_jobids_str, rule->tr_rpc_rate,
1009                    atomic_read(&rule->tr_ref) - 1);
1010         return 0;
1011 }
1012
1013 static int
1014 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
1015                          struct nrs_tbf_client *cli)
1016 {
1017         return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
1018 }
1019
1020 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
1021 {
1022         if (!list_empty(&rule->tr_jobids))
1023                 nrs_tbf_jobid_list_free(&rule->tr_jobids);
1024         LASSERT(rule->tr_jobids_str != NULL);
1025         OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1);
1026 }
1027
1028 static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
1029         .o_name = NRS_TBF_TYPE_JOBID,
1030         .o_startup = nrs_tbf_jobid_startup,
1031         .o_cli_find = nrs_tbf_jobid_cli_find,
1032         .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
1033         .o_cli_put = nrs_tbf_jobid_cli_put,
1034         .o_cli_init = nrs_tbf_jobid_cli_init,
1035         .o_rule_init = nrs_tbf_jobid_rule_init,
1036         .o_rule_dump = nrs_tbf_jobid_rule_dump,
1037         .o_rule_match = nrs_tbf_jobid_rule_match,
1038         .o_rule_fini = nrs_tbf_jobid_rule_fini,
1039 };
1040
1041 /**
1042  * libcfs_hash operations for nrs_tbf_net::cn_cli_hash
1043  *
1044  * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
1045  * nrs_tbf_client objects.
1046  */
1047 #define NRS_TBF_NID_BKT_BITS    8
1048 #define NRS_TBF_NID_BITS        16
1049
1050 static unsigned nrs_tbf_nid_hop_hash(struct cfs_hash *hs, const void *key,
1051                                   unsigned mask)
1052 {
1053         return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
1054 }
1055
1056 static int nrs_tbf_nid_hop_keycmp(const void *key, struct hlist_node *hnode)
1057 {
1058         lnet_nid_t            *nid = (lnet_nid_t *)key;
1059         struct nrs_tbf_client *cli = hlist_entry(hnode,
1060                                                      struct nrs_tbf_client,
1061                                                      tc_hnode);
1062
1063         return *nid == cli->tc_nid;
1064 }
1065
1066 static void *nrs_tbf_nid_hop_key(struct hlist_node *hnode)
1067 {
1068         struct nrs_tbf_client *cli = hlist_entry(hnode,
1069                                                      struct nrs_tbf_client,
1070                                                      tc_hnode);
1071
1072         return &cli->tc_nid;
1073 }
1074
1075 static void nrs_tbf_nid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1076 {
1077         struct nrs_tbf_client *cli = hlist_entry(hnode,
1078                                                      struct nrs_tbf_client,
1079                                                      tc_hnode);
1080
1081         atomic_inc(&cli->tc_ref);
1082 }
1083
1084 static void nrs_tbf_nid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1085 {
1086         struct nrs_tbf_client *cli = hlist_entry(hnode,
1087                                                      struct nrs_tbf_client,
1088                                                      tc_hnode);
1089
1090         atomic_dec(&cli->tc_ref);
1091 }
1092
1093 static void nrs_tbf_nid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1094 {
1095         struct nrs_tbf_client *cli = hlist_entry(hnode,
1096                                                      struct nrs_tbf_client,
1097                                                      tc_hnode);
1098
1099         LASSERTF(atomic_read(&cli->tc_ref) == 0,
1100                  "Busy TBF object from client with NID %s, with %d refs\n",
1101                  libcfs_nid2str(cli->tc_nid), atomic_read(&cli->tc_ref));
1102
1103         nrs_tbf_cli_fini(cli);
1104 }
1105
1106 static struct cfs_hash_ops nrs_tbf_nid_hash_ops = {
1107         .hs_hash        = nrs_tbf_nid_hop_hash,
1108         .hs_keycmp      = nrs_tbf_nid_hop_keycmp,
1109         .hs_key         = nrs_tbf_nid_hop_key,
1110         .hs_object      = nrs_tbf_hop_object,
1111         .hs_get         = nrs_tbf_nid_hop_get,
1112         .hs_put         = nrs_tbf_nid_hop_put,
1113         .hs_put_locked  = nrs_tbf_nid_hop_put,
1114         .hs_exit        = nrs_tbf_nid_hop_exit,
1115 };
1116
1117 static struct nrs_tbf_client *
1118 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
1119                      struct ptlrpc_request *req)
1120 {
1121         return cfs_hash_lookup(head->th_cli_hash, &req->rq_peer.nid);
1122 }
1123
1124 static struct nrs_tbf_client *
1125 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
1126                         struct nrs_tbf_client *cli)
1127 {
1128         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid,
1129                                        &cli->tc_hnode);
1130 }
1131
1132 static void
1133 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
1134                       struct nrs_tbf_client *cli)
1135 {
1136         cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
1137 }
1138
1139 static int
1140 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
1141                     struct nrs_tbf_head *head)
1142 {
1143         struct nrs_tbf_cmd      start;
1144         int rc;
1145
1146         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1147                                             NRS_TBF_NID_BITS,
1148                                             NRS_TBF_NID_BITS,
1149                                             NRS_TBF_NID_BKT_BITS, 0,
1150                                             CFS_HASH_MIN_THETA,
1151                                             CFS_HASH_MAX_THETA,
1152                                             &nrs_tbf_nid_hash_ops,
1153                                             CFS_HASH_RW_BKTLOCK);
1154         if (head->th_cli_hash == NULL)
1155                 return -ENOMEM;
1156
1157         memset(&start, 0, sizeof(start));
1158         start.u.tc_start.ts_nids_str = "*";
1159
1160         start.u.tc_start.ts_rpc_rate = tbf_rate;
1161         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1162         start.tc_name = NRS_TBF_DEFAULT_RULE;
1163         INIT_LIST_HEAD(&start.u.tc_start.ts_nids);
1164         rc = nrs_tbf_rule_start(policy, head, &start);
1165         if (rc) {
1166                 cfs_hash_putref(head->th_cli_hash);
1167                 head->th_cli_hash = NULL;
1168         }
1169
1170         return rc;
1171 }
1172
1173 static void
1174 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1175                              struct ptlrpc_request *req)
1176 {
1177         cli->tc_nid = req->rq_peer.nid;
1178 }
1179
1180 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1181                                  struct nrs_tbf_rule *rule,
1182                                  struct nrs_tbf_cmd *start)
1183 {
1184         LASSERT(start->u.tc_start.ts_nids_str);
1185         OBD_ALLOC(rule->tr_nids_str,
1186                   strlen(start->u.tc_start.ts_nids_str) + 1);
1187         if (rule->tr_nids_str == NULL)
1188                 return -ENOMEM;
1189
1190         memcpy(rule->tr_nids_str,
1191                start->u.tc_start.ts_nids_str,
1192                strlen(start->u.tc_start.ts_nids_str));
1193
1194         INIT_LIST_HEAD(&rule->tr_nids);
1195         if (!list_empty(&start->u.tc_start.ts_nids)) {
1196                 if (cfs_parse_nidlist(rule->tr_nids_str,
1197                                       strlen(rule->tr_nids_str),
1198                                       &rule->tr_nids) <= 0) {
1199                         CERROR("nids {%s} illegal\n",
1200                                rule->tr_nids_str);
1201                         OBD_FREE(rule->tr_nids_str,
1202                                  strlen(start->u.tc_start.ts_nids_str) + 1);
1203                         return -EINVAL;
1204                 }
1205         }
1206         return 0;
1207 }
1208
1209 static int
1210 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1211 {
1212         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1213                    rule->tr_nids_str, rule->tr_rpc_rate,
1214                    atomic_read(&rule->tr_ref) - 1);
1215         return 0;
1216 }
1217
1218 static int
1219 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1220                        struct nrs_tbf_client *cli)
1221 {
1222         return cfs_match_nid(cli->tc_nid, &rule->tr_nids);
1223 }
1224
1225 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1226 {
1227         if (!list_empty(&rule->tr_nids))
1228                 cfs_free_nidlist(&rule->tr_nids);
1229         LASSERT(rule->tr_nids_str != NULL);
1230         OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1);
1231 }
1232
1233 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1234 {
1235         if (!list_empty(&cmd->u.tc_start.ts_nids))
1236                 cfs_free_nidlist(&cmd->u.tc_start.ts_nids);
1237         if (cmd->u.tc_start.ts_nids_str)
1238                 OBD_FREE(cmd->u.tc_start.ts_nids_str,
1239                          strlen(cmd->u.tc_start.ts_nids_str) + 1);
1240 }
1241
1242 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, char *id)
1243 {
1244         struct cfs_lstr src;
1245         int rc;
1246
1247         src.ls_str = id;
1248         src.ls_len = strlen(id);
1249         rc = nrs_tbf_check_id_value(&src, "nid");
1250         if (rc)
1251                 return rc;
1252
1253         OBD_ALLOC(cmd->u.tc_start.ts_nids_str, src.ls_len + 1);
1254         if (cmd->u.tc_start.ts_nids_str == NULL)
1255                 return -ENOMEM;
1256
1257         memcpy(cmd->u.tc_start.ts_nids_str, src.ls_str, src.ls_len);
1258
1259         /* parse NID list */
1260         if (cfs_parse_nidlist(cmd->u.tc_start.ts_nids_str,
1261                               strlen(cmd->u.tc_start.ts_nids_str),
1262                               &cmd->u.tc_start.ts_nids) <= 0) {
1263                 nrs_tbf_nid_cmd_fini(cmd);
1264                 return -EINVAL;
1265         }
1266
1267         return 0;
1268 }
1269
1270 static struct nrs_tbf_ops nrs_tbf_nid_ops = {
1271         .o_name = NRS_TBF_TYPE_NID,
1272         .o_startup = nrs_tbf_nid_startup,
1273         .o_cli_find = nrs_tbf_nid_cli_find,
1274         .o_cli_findadd = nrs_tbf_nid_cli_findadd,
1275         .o_cli_put = nrs_tbf_nid_cli_put,
1276         .o_cli_init = nrs_tbf_nid_cli_init,
1277         .o_rule_init = nrs_tbf_nid_rule_init,
1278         .o_rule_dump = nrs_tbf_nid_rule_dump,
1279         .o_rule_match = nrs_tbf_nid_rule_match,
1280         .o_rule_fini = nrs_tbf_nid_rule_fini,
1281 };
1282
1283 static unsigned nrs_tbf_hop_hash(struct cfs_hash *hs, const void *key,
1284                                  unsigned mask)
1285 {
1286         return cfs_hash_djb2_hash(key, strlen(key), mask);
1287 }
1288
1289 static int nrs_tbf_hop_keycmp(const void *key, struct hlist_node *hnode)
1290 {
1291         struct nrs_tbf_client *cli = hlist_entry(hnode,
1292                                                  struct nrs_tbf_client,
1293                                                  tc_hnode);
1294
1295         return (strcmp(cli->tc_key, key) == 0);
1296 }
1297
1298 static void *nrs_tbf_hop_key(struct hlist_node *hnode)
1299 {
1300         struct nrs_tbf_client *cli = hlist_entry(hnode,
1301                                                  struct nrs_tbf_client,
1302                                                  tc_hnode);
1303         return cli->tc_key;
1304 }
1305
1306 static void nrs_tbf_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1307 {
1308         struct nrs_tbf_client *cli = hlist_entry(hnode,
1309                                                  struct nrs_tbf_client,
1310                                                  tc_hnode);
1311
1312         atomic_inc(&cli->tc_ref);
1313 }
1314
1315 static void nrs_tbf_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1316 {
1317         struct nrs_tbf_client *cli = hlist_entry(hnode,
1318                                                  struct nrs_tbf_client,
1319                                                  tc_hnode);
1320
1321         atomic_dec(&cli->tc_ref);
1322 }
1323
1324 static void nrs_tbf_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1325
1326 {
1327         struct nrs_tbf_client *cli = hlist_entry(hnode,
1328                                                  struct nrs_tbf_client,
1329                                                  tc_hnode);
1330
1331         LASSERT(atomic_read(&cli->tc_ref) == 0);
1332         nrs_tbf_cli_fini(cli);
1333 }
1334
1335 static struct cfs_hash_ops nrs_tbf_hash_ops = {
1336         .hs_hash        = nrs_tbf_hop_hash,
1337         .hs_keycmp      = nrs_tbf_hop_keycmp,
1338         .hs_key         = nrs_tbf_hop_key,
1339         .hs_object      = nrs_tbf_hop_object,
1340         .hs_get         = nrs_tbf_hop_get,
1341         .hs_put         = nrs_tbf_hop_put,
1342         .hs_put_locked  = nrs_tbf_hop_put,
1343         .hs_exit        = nrs_tbf_hop_exit,
1344 };
1345
1346 #define NRS_TBF_GENERIC_BKT_BITS        10
1347 #define NRS_TBF_GENERIC_HASH_FLAGS      (CFS_HASH_SPIN_BKTLOCK | \
1348                                         CFS_HASH_NO_ITEMREF | \
1349                                         CFS_HASH_DEPTH)
1350
1351 static int
1352 nrs_tbf_startup(struct ptlrpc_nrs_policy *policy, struct nrs_tbf_head *head)
1353 {
1354         struct nrs_tbf_cmd       start;
1355         struct nrs_tbf_bucket   *bkt;
1356         int                      bits;
1357         int                      i;
1358         int                      rc;
1359         struct cfs_hash_bd       bd;
1360
1361         bits = nrs_tbf_jobid_hash_order();
1362         if (bits < NRS_TBF_GENERIC_BKT_BITS)
1363                 bits = NRS_TBF_GENERIC_BKT_BITS;
1364         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1365                                             bits, bits,
1366                                             NRS_TBF_GENERIC_BKT_BITS,
1367                                             sizeof(*bkt), 0, 0,
1368                                             &nrs_tbf_hash_ops,
1369                                             NRS_TBF_GENERIC_HASH_FLAGS);
1370         if (head->th_cli_hash == NULL)
1371                 return -ENOMEM;
1372
1373         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
1374                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
1375                 INIT_LIST_HEAD(&bkt->ntb_lru);
1376         }
1377
1378         memset(&start, 0, sizeof(start));
1379         start.u.tc_start.ts_conds_str = "*";
1380
1381         start.u.tc_start.ts_rpc_rate = tbf_rate;
1382         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1383         start.tc_name = NRS_TBF_DEFAULT_RULE;
1384         INIT_LIST_HEAD(&start.u.tc_start.ts_conds);
1385         rc = nrs_tbf_rule_start(policy, head, &start);
1386         if (rc)
1387                 cfs_hash_putref(head->th_cli_hash);
1388
1389         return rc;
1390 }
1391
1392 static struct nrs_tbf_client *
1393 nrs_tbf_cli_hash_lookup(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1394                         const char *key)
1395 {
1396         struct hlist_node *hnode;
1397         struct nrs_tbf_client *cli;
1398
1399         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)key);
1400         if (hnode == NULL)
1401                 return NULL;
1402
1403         cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode);
1404         if (!list_empty(&cli->tc_lru))
1405                 list_del_init(&cli->tc_lru);
1406         return cli;
1407 }
1408
1409 /**
1410  * ONLY opcode presented in this function will be checked in
1411  * nrs_tbf_id_cli_set(). That means, we can add or remove an
1412  * opcode to enable or disable requests handled in nrs_tbf
1413  */
1414 static struct req_format *req_fmt(__u32 opcode)
1415 {
1416         switch (opcode) {
1417         case OST_GETATTR:
1418                 return &RQF_OST_GETATTR;
1419         case OST_SETATTR:
1420                 return &RQF_OST_SETATTR;
1421         case OST_READ:
1422                 return &RQF_OST_BRW_READ;
1423         case OST_WRITE:
1424                 return &RQF_OST_BRW_WRITE;
1425         /* FIXME: OST_CREATE and OST_DESTROY comes from MDS
1426          * in most case. Should they be removed? */
1427         case OST_CREATE:
1428                 return &RQF_OST_CREATE;
1429         case OST_DESTROY:
1430                 return &RQF_OST_DESTROY;
1431         case OST_PUNCH:
1432                 return &RQF_OST_PUNCH;
1433         case OST_SYNC:
1434                 return &RQF_OST_SYNC;
1435         case OST_LADVISE:
1436                 return &RQF_OST_LADVISE;
1437         case MDS_GETATTR:
1438                 return &RQF_MDS_GETATTR;
1439         case MDS_GETATTR_NAME:
1440                 return &RQF_MDS_GETATTR_NAME;
1441         /* close is skipped to avoid LDLM cancel slowness */
1442 #if 0
1443         case MDS_CLOSE:
1444                 return &RQF_MDS_CLOSE;
1445 #endif
1446         case MDS_REINT:
1447                 return &RQF_MDS_REINT;
1448         case MDS_READPAGE:
1449                 return &RQF_MDS_READPAGE;
1450         case MDS_GET_ROOT:
1451                 return &RQF_MDS_GET_ROOT;
1452         case MDS_STATFS:
1453                 return &RQF_MDS_STATFS;
1454         case MDS_SYNC:
1455                 return &RQF_MDS_SYNC;
1456         case MDS_QUOTACTL:
1457                 return &RQF_MDS_QUOTACTL;
1458         case MDS_GETXATTR:
1459                 return &RQF_MDS_GETXATTR;
1460         case MDS_GET_INFO:
1461                 return &RQF_MDS_GET_INFO;
1462         /* HSM op is skipped */
1463 #if 0 
1464         case MDS_HSM_STATE_GET:
1465                 return &RQF_MDS_HSM_STATE_GET;
1466         case MDS_HSM_STATE_SET:
1467                 return &RQF_MDS_HSM_STATE_SET;
1468         case MDS_HSM_ACTION:
1469                 return &RQF_MDS_HSM_ACTION;
1470         case MDS_HSM_CT_REGISTER:
1471                 return &RQF_MDS_HSM_CT_REGISTER;
1472         case MDS_HSM_CT_UNREGISTER:
1473                 return &RQF_MDS_HSM_CT_UNREGISTER;
1474 #endif
1475         case MDS_SWAP_LAYOUTS:
1476                 return &RQF_MDS_SWAP_LAYOUTS;
1477         case LDLM_ENQUEUE:
1478                 return &RQF_LDLM_ENQUEUE;
1479         default:
1480                 return NULL;
1481         }
1482 }
1483
1484 static struct req_format *intent_req_fmt(__u32 it_opc)
1485 {
1486         if (it_opc & (IT_OPEN | IT_CREAT))
1487                 return &RQF_LDLM_INTENT_OPEN;
1488         else if (it_opc & (IT_GETATTR | IT_LOOKUP))
1489                 return &RQF_LDLM_INTENT_GETATTR;
1490         else if (it_opc & IT_UNLINK)
1491                 return &RQF_LDLM_INTENT_UNLINK;
1492         else if (it_opc & IT_GETXATTR)
1493                 return &RQF_LDLM_INTENT_GETXATTR;
1494         else if (it_opc & (IT_GLIMPSE | IT_BRW))
1495                 return &RQF_LDLM_INTENT;
1496         else
1497                 return NULL;
1498 }
1499
1500 static int ost_tbf_id_cli_set(struct ptlrpc_request *req,
1501                               struct tbf_id *id)
1502 {
1503         struct ost_body *body;
1504
1505         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1506         if (body != NULL) {
1507                 id->ti_uid = body->oa.o_uid;
1508                 id->ti_gid = body->oa.o_gid;
1509                 return 0;
1510         }
1511
1512         return -EINVAL;
1513 }
1514
1515 static void unpack_ugid_from_mdt_body(struct ptlrpc_request *req,
1516                                       struct tbf_id *id)
1517 {
1518         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
1519                                                     &RMF_MDT_BODY);
1520         LASSERT(b != NULL);
1521
1522         /* TODO: nodemaping feature converts {ug}id from individual
1523          * clients to the actual ones of the file system. Some work
1524          * may be needed to fix this. */
1525         id->ti_uid = b->mbo_uid;
1526         id->ti_gid = b->mbo_gid;
1527 }
1528
1529 static void unpack_ugid_from_mdt_rec_reint(struct ptlrpc_request *req,
1530                                            struct tbf_id *id)
1531 {
1532         struct mdt_rec_reint *rec;
1533
1534         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
1535         LASSERT(rec != NULL);
1536
1537         /* use the fs{ug}id as {ug}id of the process */
1538         id->ti_uid = rec->rr_fsuid;
1539         id->ti_gid = rec->rr_fsgid;
1540 }
1541
1542 static int mdt_tbf_id_cli_set(struct ptlrpc_request *req,
1543                               struct tbf_id *id)
1544 {
1545         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1546         int rc = 0;
1547
1548         switch (opc) {
1549         case MDS_GETATTR:
1550         case MDS_GETATTR_NAME:
1551         case MDS_GET_ROOT:
1552         case MDS_READPAGE:
1553         case MDS_SYNC:
1554         case MDS_GETXATTR:
1555         case MDS_HSM_STATE_GET ... MDS_SWAP_LAYOUTS:
1556                 unpack_ugid_from_mdt_body(req, id);
1557                 break;
1558         case MDS_CLOSE:
1559         case MDS_REINT:
1560                 unpack_ugid_from_mdt_rec_reint(req, id);
1561                 break;
1562         default:
1563                 rc = -EINVAL;
1564                 break;
1565         }
1566         return rc;
1567 }
1568
1569 static int ldlm_tbf_id_cli_set(struct ptlrpc_request *req,
1570                               struct tbf_id *id)
1571 {
1572         struct ldlm_intent *lit;
1573         struct req_format *fmt;
1574
1575         if (req->rq_reqmsg->lm_bufcount <= DLM_INTENT_IT_OFF)
1576                 return -EINVAL;
1577
1578         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_BASIC);
1579         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
1580         if (lit == NULL)
1581                 return -EINVAL;
1582
1583         fmt = intent_req_fmt(lit->opc);
1584         if (fmt == NULL)
1585                 return -EINVAL;
1586
1587         req_capsule_extend(&req->rq_pill, fmt);
1588
1589         if (lit->opc & (IT_GETXATTR | IT_GETATTR | IT_LOOKUP))
1590                 unpack_ugid_from_mdt_body(req, id);
1591         else if (lit->opc & (IT_OPEN | IT_OPEN | IT_GLIMPSE | IT_BRW))
1592                 unpack_ugid_from_mdt_rec_reint(req, id);
1593         else
1594                 return -EINVAL;
1595         return 0;
1596 }
1597
1598 static int nrs_tbf_id_cli_set(struct ptlrpc_request *req, struct tbf_id *id,
1599                               enum nrs_tbf_flag ti_type)
1600 {
1601         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1602         struct req_format *fmt = req_fmt(opc);
1603         bool fmt_unset = false;
1604         int rc;
1605
1606         memset(id, 0, sizeof(struct tbf_id));
1607         id->ti_type = ti_type;
1608
1609         if (fmt == NULL)
1610                 return -EINVAL;
1611         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1612         if (req->rq_pill.rc_fmt == NULL) {
1613                 req_capsule_set(&req->rq_pill, fmt);
1614                 fmt_unset = true;
1615         }
1616
1617         if (opc < OST_LAST_OPC)
1618                 rc = ost_tbf_id_cli_set(req, id);
1619         else if (opc >= MDS_FIRST_OPC && opc < MDS_LAST_OPC)
1620                 rc = mdt_tbf_id_cli_set(req, id);
1621         else if (opc == LDLM_ENQUEUE)
1622                 rc = ldlm_tbf_id_cli_set(req, id);
1623         else
1624                 rc = -EINVAL;
1625
1626         /* restore it to the initialized state */
1627         if (fmt_unset)
1628                 req->rq_pill.rc_fmt = NULL;
1629         return rc;
1630 }
1631
1632 static inline void nrs_tbf_cli_gen_key(struct nrs_tbf_client *cli,
1633                                        struct ptlrpc_request *req,
1634                                        char *keystr, size_t keystr_sz)
1635 {
1636         const char *jobid;
1637         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1638         struct tbf_id id;
1639
1640         nrs_tbf_id_cli_set(req, &id, NRS_TBF_FLAG_UID | NRS_TBF_FLAG_GID);
1641         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1642         if (jobid == NULL)
1643                 jobid = NRS_TBF_JOBID_NULL;
1644
1645         snprintf(keystr, keystr_sz, "%s_%s_%d_%u_%u", jobid,
1646                  libcfs_nid2str(req->rq_peer.nid), opc, id.ti_uid,
1647                  id.ti_gid);
1648
1649         if (cli) {
1650                 INIT_LIST_HEAD(&cli->tc_lru);
1651                 strlcpy(cli->tc_key, keystr, sizeof(cli->tc_key));
1652                 strlcpy(cli->tc_jobid, jobid, sizeof(cli->tc_jobid));
1653                 cli->tc_nid = req->rq_peer.nid;
1654                 cli->tc_opcode = opc;
1655                 cli->tc_id = id;
1656         }
1657 }
1658
1659 static struct nrs_tbf_client *
1660 nrs_tbf_cli_find(struct nrs_tbf_head *head, struct ptlrpc_request *req)
1661 {
1662         struct nrs_tbf_client *cli;
1663         struct cfs_hash *hs = head->th_cli_hash;
1664         struct cfs_hash_bd bd;
1665         char keystr[NRS_TBF_KEY_LEN];
1666
1667         nrs_tbf_cli_gen_key(NULL, req, keystr, sizeof(keystr));
1668         cfs_hash_bd_get_and_lock(hs, (void *)keystr, &bd, 1);
1669         cli = nrs_tbf_cli_hash_lookup(hs, &bd, keystr);
1670         cfs_hash_bd_unlock(hs, &bd, 1);
1671
1672         return cli;
1673 }
1674
1675 static struct nrs_tbf_client *
1676 nrs_tbf_cli_findadd(struct nrs_tbf_head *head,
1677                     struct nrs_tbf_client *cli)
1678 {
1679         const char              *key;
1680         struct nrs_tbf_client   *ret;
1681         struct cfs_hash         *hs = head->th_cli_hash;
1682         struct cfs_hash_bd       bd;
1683
1684         key = cli->tc_key;
1685         cfs_hash_bd_get_and_lock(hs, (void *)key, &bd, 1);
1686         ret = nrs_tbf_cli_hash_lookup(hs, &bd, key);
1687         if (ret == NULL) {
1688                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
1689                 ret = cli;
1690         }
1691         cfs_hash_bd_unlock(hs, &bd, 1);
1692
1693         return ret;
1694 }
1695
1696 static void
1697 nrs_tbf_cli_put(struct nrs_tbf_head *head, struct nrs_tbf_client *cli)
1698 {
1699         struct cfs_hash_bd       bd;
1700         struct cfs_hash         *hs = head->th_cli_hash;
1701         struct nrs_tbf_bucket   *bkt;
1702         int                      hw;
1703         struct list_head         zombies;
1704
1705         INIT_LIST_HEAD(&zombies);
1706         cfs_hash_bd_get(hs, &cli->tc_key, &bd);
1707         bkt = cfs_hash_bd_extra_get(hs, &bd);
1708         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
1709                 return;
1710         LASSERT(list_empty(&cli->tc_lru));
1711         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
1712
1713         /**
1714          * Check and purge the LRU, there is at least one client in the LRU.
1715          */
1716         hw = tbf_jobid_cache_size >> (hs->hs_cur_bits - hs->hs_bkt_bits);
1717         while (cfs_hash_bd_count_get(&bd) > hw) {
1718                 if (unlikely(list_empty(&bkt->ntb_lru)))
1719                         break;
1720                 cli = list_entry(bkt->ntb_lru.next,
1721                                  struct nrs_tbf_client,
1722                                  tc_lru);
1723                 LASSERT(atomic_read(&cli->tc_ref) == 0);
1724                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
1725                 list_move(&cli->tc_lru, &zombies);
1726         }
1727         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
1728
1729         while (!list_empty(&zombies)) {
1730                 cli = container_of0(zombies.next,
1731                                     struct nrs_tbf_client, tc_lru);
1732                 list_del_init(&cli->tc_lru);
1733                 nrs_tbf_cli_fini(cli);
1734         }
1735 }
1736
1737 static void
1738 nrs_tbf_generic_cli_init(struct nrs_tbf_client *cli,
1739                          struct ptlrpc_request *req)
1740 {
1741         char keystr[NRS_TBF_KEY_LEN];
1742
1743         nrs_tbf_cli_gen_key(cli, req, keystr, sizeof(keystr));
1744 }
1745
1746 static void
1747 nrs_tbf_id_list_free(struct list_head *uid_list)
1748 {
1749         struct nrs_tbf_id *nti_id, *n;
1750
1751         list_for_each_entry_safe(nti_id, n, uid_list, nti_linkage) {
1752                 list_del_init(&nti_id->nti_linkage);
1753                 OBD_FREE_PTR(nti_id);
1754         }
1755 }
1756
1757 static void
1758 nrs_tbf_expression_free(struct nrs_tbf_expression *expr)
1759 {
1760         LASSERT(expr->te_field >= NRS_TBF_FIELD_NID &&
1761                 expr->te_field < NRS_TBF_FIELD_MAX);
1762         switch (expr->te_field) {
1763         case NRS_TBF_FIELD_NID:
1764                 cfs_free_nidlist(&expr->te_cond);
1765                 break;
1766         case NRS_TBF_FIELD_JOBID:
1767                 nrs_tbf_jobid_list_free(&expr->te_cond);
1768                 break;
1769         case NRS_TBF_FIELD_OPCODE:
1770                 CFS_FREE_BITMAP(expr->te_opcodes);
1771                 break;
1772         case NRS_TBF_FIELD_UID:
1773         case NRS_TBF_FIELD_GID:
1774                 nrs_tbf_id_list_free(&expr->te_cond);
1775                 break;
1776         default:
1777                 LBUG();
1778         }
1779         OBD_FREE_PTR(expr);
1780 }
1781
1782 static void
1783 nrs_tbf_conjunction_free(struct nrs_tbf_conjunction *conjunction)
1784 {
1785         struct nrs_tbf_expression *expression;
1786         struct nrs_tbf_expression *n;
1787
1788         LASSERT(list_empty(&conjunction->tc_linkage));
1789         list_for_each_entry_safe(expression, n,
1790                                  &conjunction->tc_expressions,
1791                                  te_linkage) {
1792                 list_del_init(&expression->te_linkage);
1793                 nrs_tbf_expression_free(expression);
1794         }
1795         OBD_FREE_PTR(conjunction);
1796 }
1797
1798 static void
1799 nrs_tbf_conds_free(struct list_head *cond_list)
1800 {
1801         struct nrs_tbf_conjunction *conjunction;
1802         struct nrs_tbf_conjunction *n;
1803
1804         list_for_each_entry_safe(conjunction, n, cond_list, tc_linkage) {
1805                 list_del_init(&conjunction->tc_linkage);
1806                 nrs_tbf_conjunction_free(conjunction);
1807         }
1808 }
1809
1810 static void
1811 nrs_tbf_generic_cmd_fini(struct nrs_tbf_cmd *cmd)
1812 {
1813         if (!list_empty(&cmd->u.tc_start.ts_conds))
1814                 nrs_tbf_conds_free(&cmd->u.tc_start.ts_conds);
1815         if (cmd->u.tc_start.ts_conds_str)
1816                 OBD_FREE(cmd->u.tc_start.ts_conds_str,
1817                          strlen(cmd->u.tc_start.ts_conds_str) + 1);
1818 }
1819
1820 #define NRS_TBF_DISJUNCTION_DELIM       (',')
1821 #define NRS_TBF_CONJUNCTION_DELIM       ('&')
1822 #define NRS_TBF_EXPRESSION_DELIM        ('=')
1823
1824 static inline bool
1825 nrs_tbf_check_field(struct cfs_lstr *field, char *str)
1826 {
1827         int len = strlen(str);
1828
1829         return (field->ls_len == len &&
1830                 strncmp(field->ls_str, str, len) == 0);
1831 }
1832
1833 static int
1834 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr);
1835 static int
1836 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
1837                       enum nrs_tbf_flag tif);
1838
1839 static int
1840 nrs_tbf_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
1841 {
1842         struct nrs_tbf_expression *expr;
1843         struct cfs_lstr field;
1844         int rc = 0;
1845
1846         OBD_ALLOC(expr, sizeof(struct nrs_tbf_expression));
1847         if (expr == NULL)
1848                 return -ENOMEM;
1849
1850         rc = cfs_gettok(src, NRS_TBF_EXPRESSION_DELIM, &field);
1851         if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
1852             src->ls_str[src->ls_len - 1] != '}')
1853                 GOTO(out, rc = -EINVAL);
1854
1855         /* Skip '{' and '}' */
1856         src->ls_str++;
1857         src->ls_len -= 2;
1858
1859         if (nrs_tbf_check_field(&field, "nid")) {
1860                 if (cfs_parse_nidlist(src->ls_str,
1861                                       src->ls_len,
1862                                       &expr->te_cond) <= 0)
1863                         GOTO(out, rc = -EINVAL);
1864                 expr->te_field = NRS_TBF_FIELD_NID;
1865         } else if (nrs_tbf_check_field(&field, "jobid")) {
1866                 if (nrs_tbf_jobid_list_parse(src->ls_str,
1867                                              src->ls_len,
1868                                              &expr->te_cond) < 0)
1869                         GOTO(out, rc = -EINVAL);
1870                 expr->te_field = NRS_TBF_FIELD_JOBID;
1871         } else if (nrs_tbf_check_field(&field, "opcode")) {
1872                 if (nrs_tbf_opcode_list_parse(src->ls_str,
1873                                               src->ls_len,
1874                                               &expr->te_opcodes) < 0)
1875                         GOTO(out, rc = -EINVAL);
1876                 expr->te_field = NRS_TBF_FIELD_OPCODE;
1877         } else if (nrs_tbf_check_field(&field, "uid")) {
1878                 if (nrs_tbf_id_list_parse(src->ls_str,
1879                                           src->ls_len,
1880                                           &expr->te_cond,
1881                                           NRS_TBF_FLAG_UID) < 0)
1882                         GOTO(out, rc = -EINVAL);
1883                 expr->te_field = NRS_TBF_FIELD_UID;
1884         } else if (nrs_tbf_check_field(&field, "gid")) {
1885                 if (nrs_tbf_id_list_parse(src->ls_str,
1886                                           src->ls_len,
1887                                           &expr->te_cond,
1888                                           NRS_TBF_FLAG_GID) < 0)
1889                         GOTO(out, rc = -EINVAL);
1890                 expr->te_field = NRS_TBF_FIELD_GID;
1891         } else {
1892                 GOTO(out, rc = -EINVAL);
1893         }
1894
1895         list_add_tail(&expr->te_linkage, cond_list);
1896         return 0;
1897 out:
1898         OBD_FREE_PTR(expr);
1899         return rc;
1900 }
1901
1902 static int
1903 nrs_tbf_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
1904 {
1905         struct nrs_tbf_conjunction *conjunction;
1906         struct cfs_lstr expr;
1907         int rc = 0;
1908
1909         OBD_ALLOC(conjunction, sizeof(struct nrs_tbf_conjunction));
1910         if (conjunction == NULL)
1911                 return -ENOMEM;
1912
1913         INIT_LIST_HEAD(&conjunction->tc_expressions);
1914         list_add_tail(&conjunction->tc_linkage, cond_list);
1915
1916         while (src->ls_str) {
1917                 rc = cfs_gettok(src, NRS_TBF_CONJUNCTION_DELIM, &expr);
1918                 if (rc == 0) {
1919                         rc = -EINVAL;
1920                         break;
1921                 }
1922                 rc = nrs_tbf_expression_parse(&expr,
1923                                               &conjunction->tc_expressions);
1924                 if (rc)
1925                         break;
1926         }
1927         return rc;
1928 }
1929
1930 static int
1931 nrs_tbf_conds_parse(char *str, int len, struct list_head *cond_list)
1932 {
1933         struct cfs_lstr src;
1934         struct cfs_lstr res;
1935         int rc = 0;
1936
1937         src.ls_str = str;
1938         src.ls_len = len;
1939         INIT_LIST_HEAD(cond_list);
1940         while (src.ls_str) {
1941                 rc = cfs_gettok(&src, NRS_TBF_DISJUNCTION_DELIM, &res);
1942                 if (rc == 0) {
1943                         rc = -EINVAL;
1944                         break;
1945                 }
1946                 rc = nrs_tbf_conjunction_parse(&res, cond_list);
1947                 if (rc)
1948                         break;
1949         }
1950         return rc;
1951 }
1952
1953 static int
1954 nrs_tbf_generic_parse(struct nrs_tbf_cmd *cmd, const char *id)
1955 {
1956         int rc;
1957
1958         OBD_ALLOC(cmd->u.tc_start.ts_conds_str, strlen(id) + 1);
1959         if (cmd->u.tc_start.ts_conds_str == NULL)
1960                 return -ENOMEM;
1961
1962         memcpy(cmd->u.tc_start.ts_conds_str, id, strlen(id));
1963
1964         /* Parse hybird NID and JOBID conditions */
1965         rc = nrs_tbf_conds_parse(cmd->u.tc_start.ts_conds_str,
1966                                  strlen(cmd->u.tc_start.ts_conds_str),
1967                                  &cmd->u.tc_start.ts_conds);
1968         if (rc)
1969                 nrs_tbf_generic_cmd_fini(cmd);
1970
1971         return rc;
1972 }
1973
1974 static int
1975 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id);
1976
1977 static int
1978 nrs_tbf_expression_match(struct nrs_tbf_expression *expr,
1979                          struct nrs_tbf_rule *rule,
1980                          struct nrs_tbf_client *cli)
1981 {
1982         switch (expr->te_field) {
1983         case NRS_TBF_FIELD_NID:
1984                 return cfs_match_nid(cli->tc_nid, &expr->te_cond);
1985         case NRS_TBF_FIELD_JOBID:
1986                 return nrs_tbf_jobid_list_match(&expr->te_cond, cli->tc_jobid);
1987         case NRS_TBF_FIELD_OPCODE:
1988                 return cfs_bitmap_check(expr->te_opcodes, cli->tc_opcode);
1989         case NRS_TBF_FIELD_UID:
1990         case NRS_TBF_FIELD_GID:
1991                 return nrs_tbf_id_list_match(&expr->te_cond, cli->tc_id);
1992         default:
1993                 return 0;
1994         }
1995 }
1996
1997 static int
1998 nrs_tbf_conjunction_match(struct nrs_tbf_conjunction *conjunction,
1999                           struct nrs_tbf_rule *rule,
2000                           struct nrs_tbf_client *cli)
2001 {
2002         struct nrs_tbf_expression *expr;
2003         int matched;
2004
2005         list_for_each_entry(expr, &conjunction->tc_expressions, te_linkage) {
2006                 matched = nrs_tbf_expression_match(expr, rule, cli);
2007                 if (!matched)
2008                         return 0;
2009         }
2010
2011         return 1;
2012 }
2013
2014 static int
2015 nrs_tbf_cond_match(struct nrs_tbf_rule *rule, struct nrs_tbf_client *cli)
2016 {
2017         struct nrs_tbf_conjunction *conjunction;
2018         int matched;
2019
2020         list_for_each_entry(conjunction, &rule->tr_conds, tc_linkage) {
2021                 matched = nrs_tbf_conjunction_match(conjunction, rule, cli);
2022                 if (matched)
2023                         return 1;
2024         }
2025
2026         return 0;
2027 }
2028
2029 static void
2030 nrs_tbf_generic_rule_fini(struct nrs_tbf_rule *rule)
2031 {
2032         if (!list_empty(&rule->tr_conds))
2033                 nrs_tbf_conds_free(&rule->tr_conds);
2034         LASSERT(rule->tr_conds_str != NULL);
2035         OBD_FREE(rule->tr_conds_str, strlen(rule->tr_conds_str) + 1);
2036 }
2037
2038 static int
2039 nrs_tbf_rule_init(struct ptlrpc_nrs_policy *policy,
2040                   struct nrs_tbf_rule *rule, struct nrs_tbf_cmd *start)
2041 {
2042         int rc = 0;
2043
2044         LASSERT(start->u.tc_start.ts_conds_str);
2045         OBD_ALLOC(rule->tr_conds_str,
2046                   strlen(start->u.tc_start.ts_conds_str) + 1);
2047         if (rule->tr_conds_str == NULL)
2048                 return -ENOMEM;
2049
2050         memcpy(rule->tr_conds_str,
2051                start->u.tc_start.ts_conds_str,
2052                strlen(start->u.tc_start.ts_conds_str));
2053
2054         INIT_LIST_HEAD(&rule->tr_conds);
2055         if (!list_empty(&start->u.tc_start.ts_conds)) {
2056                 rc = nrs_tbf_conds_parse(rule->tr_conds_str,
2057                                          strlen(rule->tr_conds_str),
2058                                          &rule->tr_conds);
2059         }
2060         if (rc)
2061                 nrs_tbf_generic_rule_fini(rule);
2062
2063         return rc;
2064 }
2065
2066 static int
2067 nrs_tbf_generic_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2068 {
2069         seq_printf(m, "%s %s %llu, ref %d\n", rule->tr_name,
2070                    rule->tr_conds_str, rule->tr_rpc_rate,
2071                    atomic_read(&rule->tr_ref) - 1);
2072         return 0;
2073 }
2074
2075 static int
2076 nrs_tbf_generic_rule_match(struct nrs_tbf_rule *rule,
2077                            struct nrs_tbf_client *cli)
2078 {
2079         return nrs_tbf_cond_match(rule, cli);
2080 }
2081
2082 static struct nrs_tbf_ops nrs_tbf_generic_ops = {
2083         .o_name = NRS_TBF_TYPE_GENERIC,
2084         .o_startup = nrs_tbf_startup,
2085         .o_cli_find = nrs_tbf_cli_find,
2086         .o_cli_findadd = nrs_tbf_cli_findadd,
2087         .o_cli_put = nrs_tbf_cli_put,
2088         .o_cli_init = nrs_tbf_generic_cli_init,
2089         .o_rule_init = nrs_tbf_rule_init,
2090         .o_rule_dump = nrs_tbf_generic_rule_dump,
2091         .o_rule_match = nrs_tbf_generic_rule_match,
2092         .o_rule_fini = nrs_tbf_generic_rule_fini,
2093 };
2094
2095 static void nrs_tbf_opcode_rule_fini(struct nrs_tbf_rule *rule)
2096 {
2097         if (rule->tr_opcodes != NULL)
2098                 CFS_FREE_BITMAP(rule->tr_opcodes);
2099
2100         LASSERT(rule->tr_opcodes_str != NULL);
2101         OBD_FREE(rule->tr_opcodes_str, strlen(rule->tr_opcodes_str) + 1);
2102 }
2103
2104 static unsigned nrs_tbf_opcode_hop_hash(struct cfs_hash *hs, const void *key,
2105                                         unsigned mask)
2106 {
2107         return cfs_hash_djb2_hash(key, sizeof(__u32), mask);
2108 }
2109
2110 static int nrs_tbf_opcode_hop_keycmp(const void *key, struct hlist_node *hnode)
2111 {
2112         const __u32     *opc = key;
2113         struct nrs_tbf_client *cli = hlist_entry(hnode,
2114                                                  struct nrs_tbf_client,
2115                                                  tc_hnode);
2116
2117         return *opc == cli->tc_opcode;
2118 }
2119
2120 static void *nrs_tbf_opcode_hop_key(struct hlist_node *hnode)
2121 {
2122         struct nrs_tbf_client *cli = hlist_entry(hnode,
2123                                                  struct nrs_tbf_client,
2124                                                  tc_hnode);
2125
2126         return &cli->tc_opcode;
2127 }
2128
2129 static void nrs_tbf_opcode_hop_get(struct cfs_hash *hs,
2130                                    struct hlist_node *hnode)
2131 {
2132         struct nrs_tbf_client *cli = hlist_entry(hnode,
2133                                                  struct nrs_tbf_client,
2134                                                  tc_hnode);
2135
2136         atomic_inc(&cli->tc_ref);
2137 }
2138
2139 static void nrs_tbf_opcode_hop_put(struct cfs_hash *hs,
2140                                    struct hlist_node *hnode)
2141 {
2142         struct nrs_tbf_client *cli = hlist_entry(hnode,
2143                                                  struct nrs_tbf_client,
2144                                                  tc_hnode);
2145
2146         atomic_dec(&cli->tc_ref);
2147 }
2148
2149 static void nrs_tbf_opcode_hop_exit(struct cfs_hash *hs,
2150                                     struct hlist_node *hnode)
2151 {
2152         struct nrs_tbf_client *cli = hlist_entry(hnode,
2153                                                  struct nrs_tbf_client,
2154                                                  tc_hnode);
2155
2156         LASSERTF(atomic_read(&cli->tc_ref) == 0,
2157                  "Busy TBF object from client with opcode %s, with %d refs\n",
2158                  ll_opcode2str(cli->tc_opcode),
2159                  atomic_read(&cli->tc_ref));
2160
2161         nrs_tbf_cli_fini(cli);
2162 }
2163 static struct cfs_hash_ops nrs_tbf_opcode_hash_ops = {
2164         .hs_hash        = nrs_tbf_opcode_hop_hash,
2165         .hs_keycmp      = nrs_tbf_opcode_hop_keycmp,
2166         .hs_key         = nrs_tbf_opcode_hop_key,
2167         .hs_object      = nrs_tbf_hop_object,
2168         .hs_get         = nrs_tbf_opcode_hop_get,
2169         .hs_put         = nrs_tbf_opcode_hop_put,
2170         .hs_put_locked  = nrs_tbf_opcode_hop_put,
2171         .hs_exit        = nrs_tbf_opcode_hop_exit,
2172 };
2173
2174 static int
2175 nrs_tbf_opcode_startup(struct ptlrpc_nrs_policy *policy,
2176                     struct nrs_tbf_head *head)
2177 {
2178         struct nrs_tbf_cmd      start = { 0 };
2179         int rc;
2180
2181         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
2182                                             NRS_TBF_NID_BITS,
2183                                             NRS_TBF_NID_BITS,
2184                                             NRS_TBF_NID_BKT_BITS, 0,
2185                                             CFS_HASH_MIN_THETA,
2186                                             CFS_HASH_MAX_THETA,
2187                                             &nrs_tbf_opcode_hash_ops,
2188                                             CFS_HASH_RW_BKTLOCK);
2189         if (head->th_cli_hash == NULL)
2190                 return -ENOMEM;
2191
2192         start.u.tc_start.ts_opcodes = NULL;
2193         start.u.tc_start.ts_opcodes_str = "*";
2194
2195         start.u.tc_start.ts_rpc_rate = tbf_rate;
2196         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2197         start.tc_name = NRS_TBF_DEFAULT_RULE;
2198         rc = nrs_tbf_rule_start(policy, head, &start);
2199
2200         return rc;
2201 }
2202
2203 static struct nrs_tbf_client *
2204 nrs_tbf_opcode_cli_find(struct nrs_tbf_head *head,
2205                         struct ptlrpc_request *req)
2206 {
2207         __u32 opc;
2208
2209         opc = lustre_msg_get_opc(req->rq_reqmsg);
2210         return cfs_hash_lookup(head->th_cli_hash, &opc);
2211 }
2212
2213 static struct nrs_tbf_client *
2214 nrs_tbf_opcode_cli_findadd(struct nrs_tbf_head *head,
2215                            struct nrs_tbf_client *cli)
2216 {
2217         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_opcode,
2218                                        &cli->tc_hnode);
2219 }
2220
2221 static void
2222 nrs_tbf_opcode_cli_init(struct nrs_tbf_client *cli,
2223                         struct ptlrpc_request *req)
2224 {
2225         cli->tc_opcode = lustre_msg_get_opc(req->rq_reqmsg);
2226 }
2227
2228 #define MAX_OPCODE_LEN  32
2229 static int
2230 nrs_tbf_opcode_set_bit(const struct cfs_lstr *id, struct cfs_bitmap *opcodes)
2231 {
2232         int     op = 0;
2233         char    opcode_str[MAX_OPCODE_LEN];
2234
2235         if (id->ls_len + 1 > MAX_OPCODE_LEN)
2236                 return -EINVAL;
2237
2238         memcpy(opcode_str, id->ls_str, id->ls_len);
2239         opcode_str[id->ls_len] = '\0';
2240
2241         op = ll_str2opcode(opcode_str);
2242         if (op < 0)
2243                 return -EINVAL;
2244
2245         cfs_bitmap_set(opcodes, op);
2246         return 0;
2247 }
2248
2249 static int
2250 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr)
2251 {
2252         struct cfs_bitmap *opcodes;
2253         struct cfs_lstr src;
2254         struct cfs_lstr res;
2255         int rc = 0;
2256         ENTRY;
2257
2258         opcodes = CFS_ALLOCATE_BITMAP(LUSTRE_MAX_OPCODES);
2259         if (opcodes == NULL)
2260                 return -ENOMEM;
2261
2262         src.ls_str = str;
2263         src.ls_len = len;
2264         while (src.ls_str) {
2265                 rc = cfs_gettok(&src, ' ', &res);
2266                 if (rc == 0) {
2267                         rc = -EINVAL;
2268                         break;
2269                 }
2270                 rc = nrs_tbf_opcode_set_bit(&res, opcodes);
2271                 if (rc)
2272                         break;
2273         }
2274
2275         if (rc == 0)
2276                 *bitmaptr = opcodes;
2277         else
2278                 CFS_FREE_BITMAP(opcodes);
2279
2280         RETURN(rc);
2281 }
2282
2283 static void nrs_tbf_opcode_cmd_fini(struct nrs_tbf_cmd *cmd)
2284 {
2285         if (cmd->u.tc_start.ts_opcodes)
2286                 CFS_FREE_BITMAP(cmd->u.tc_start.ts_opcodes);
2287
2288         if (cmd->u.tc_start.ts_opcodes_str)
2289                 OBD_FREE(cmd->u.tc_start.ts_opcodes_str,
2290                          strlen(cmd->u.tc_start.ts_opcodes_str) + 1);
2291
2292 }
2293
2294 static int nrs_tbf_opcode_parse(struct nrs_tbf_cmd *cmd, char *id)
2295 {
2296         struct cfs_lstr src;
2297         int rc;
2298
2299         src.ls_str = id;
2300         src.ls_len = strlen(id);
2301         rc = nrs_tbf_check_id_value(&src, "opcode");
2302         if (rc)
2303                 return rc;
2304
2305         OBD_ALLOC(cmd->u.tc_start.ts_opcodes_str, src.ls_len + 1);
2306         if (cmd->u.tc_start.ts_opcodes_str == NULL)
2307                 return -ENOMEM;
2308
2309         memcpy(cmd->u.tc_start.ts_opcodes_str, src.ls_str, src.ls_len);
2310
2311         /* parse opcode list */
2312         rc = nrs_tbf_opcode_list_parse(cmd->u.tc_start.ts_opcodes_str,
2313                                        strlen(cmd->u.tc_start.ts_opcodes_str),
2314                                        &cmd->u.tc_start.ts_opcodes);
2315         if (rc)
2316                 nrs_tbf_opcode_cmd_fini(cmd);
2317
2318         return rc;
2319 }
2320
2321 static int
2322 nrs_tbf_opcode_rule_match(struct nrs_tbf_rule *rule,
2323                           struct nrs_tbf_client *cli)
2324 {
2325         if (rule->tr_opcodes == NULL)
2326                 return 0;
2327
2328         return cfs_bitmap_check(rule->tr_opcodes, cli->tc_opcode);
2329 }
2330
2331 static int nrs_tbf_opcode_rule_init(struct ptlrpc_nrs_policy *policy,
2332                                     struct nrs_tbf_rule *rule,
2333                                     struct nrs_tbf_cmd *start)
2334 {
2335         int rc = 0;
2336
2337         LASSERT(start->u.tc_start.ts_opcodes_str != NULL);
2338         OBD_ALLOC(rule->tr_opcodes_str,
2339                   strlen(start->u.tc_start.ts_opcodes_str) + 1);
2340         if (rule->tr_opcodes_str == NULL)
2341                 return -ENOMEM;
2342
2343         strncpy(rule->tr_opcodes_str, start->u.tc_start.ts_opcodes_str,
2344                 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2345
2346         /* Default rule '*' */
2347         if (start->u.tc_start.ts_opcodes == NULL)
2348                 return 0;
2349
2350         rc = nrs_tbf_opcode_list_parse(rule->tr_opcodes_str,
2351                                        strlen(rule->tr_opcodes_str),
2352                                        &rule->tr_opcodes);
2353         if (rc)
2354                 OBD_FREE(rule->tr_opcodes_str,
2355                          strlen(start->u.tc_start.ts_opcodes_str) + 1);
2356
2357         return rc;
2358 }
2359
2360 static int
2361 nrs_tbf_opcode_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2362 {
2363         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2364                    rule->tr_opcodes_str, rule->tr_rpc_rate,
2365                    atomic_read(&rule->tr_ref) - 1);
2366         return 0;
2367 }
2368
2369
2370 struct nrs_tbf_ops nrs_tbf_opcode_ops = {
2371         .o_name = NRS_TBF_TYPE_OPCODE,
2372         .o_startup = nrs_tbf_opcode_startup,
2373         .o_cli_find = nrs_tbf_opcode_cli_find,
2374         .o_cli_findadd = nrs_tbf_opcode_cli_findadd,
2375         .o_cli_put = nrs_tbf_nid_cli_put,
2376         .o_cli_init = nrs_tbf_opcode_cli_init,
2377         .o_rule_init = nrs_tbf_opcode_rule_init,
2378         .o_rule_dump = nrs_tbf_opcode_rule_dump,
2379         .o_rule_match = nrs_tbf_opcode_rule_match,
2380         .o_rule_fini = nrs_tbf_opcode_rule_fini,
2381 };
2382
2383 static unsigned nrs_tbf_id_hop_hash(struct cfs_hash *hs, const void *key,
2384                                     unsigned mask)
2385 {
2386         return cfs_hash_djb2_hash(key, sizeof(struct tbf_id), mask);
2387 }
2388
2389 static int nrs_tbf_id_hop_keycmp(const void *key, struct hlist_node *hnode)
2390 {
2391         const struct tbf_id *opc = key;
2392         enum nrs_tbf_flag ntf;
2393         struct nrs_tbf_client *cli = hlist_entry(hnode, struct nrs_tbf_client,
2394                                                  tc_hnode);
2395         ntf = opc->ti_type & cli->tc_id.ti_type;
2396         if ((ntf & NRS_TBF_FLAG_UID) && opc->ti_uid != cli->tc_id.ti_uid)
2397                 return 0;
2398
2399         if ((ntf & NRS_TBF_FLAG_GID) && opc->ti_gid != cli->tc_id.ti_gid)
2400                 return 0;
2401
2402         return 1;
2403 }
2404
2405 static void *nrs_tbf_id_hop_key(struct hlist_node *hnode)
2406 {
2407         struct nrs_tbf_client *cli = hlist_entry(hnode,
2408                                                  struct nrs_tbf_client,
2409                                                  tc_hnode);
2410         return &cli->tc_id;
2411 }
2412
2413 static void nrs_tbf_id_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
2414 {
2415         struct nrs_tbf_client *cli = hlist_entry(hnode,
2416                                                  struct nrs_tbf_client,
2417                                                  tc_hnode);
2418
2419         atomic_inc(&cli->tc_ref);
2420 }
2421
2422 static void nrs_tbf_id_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
2423 {
2424         struct nrs_tbf_client *cli = hlist_entry(hnode,
2425                                                  struct nrs_tbf_client,
2426                                                  tc_hnode);
2427
2428         atomic_dec(&cli->tc_ref);
2429 }
2430
2431 static void
2432 nrs_tbf_id_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
2433
2434 {
2435         struct nrs_tbf_client *cli = hlist_entry(hnode,
2436                                                  struct nrs_tbf_client,
2437                                                  tc_hnode);
2438
2439         LASSERT(atomic_read(&cli->tc_ref) == 0);
2440         nrs_tbf_cli_fini(cli);
2441 }
2442
2443 static struct cfs_hash_ops nrs_tbf_id_hash_ops = {
2444         .hs_hash        = nrs_tbf_id_hop_hash,
2445         .hs_keycmp      = nrs_tbf_id_hop_keycmp,
2446         .hs_key         = nrs_tbf_id_hop_key,
2447         .hs_object      = nrs_tbf_hop_object,
2448         .hs_get         = nrs_tbf_id_hop_get,
2449         .hs_put         = nrs_tbf_id_hop_put,
2450         .hs_put_locked  = nrs_tbf_id_hop_put,
2451         .hs_exit        = nrs_tbf_id_hop_exit,
2452 };
2453
2454 static int
2455 nrs_tbf_id_startup(struct ptlrpc_nrs_policy *policy,
2456                    struct nrs_tbf_head *head)
2457 {
2458         struct nrs_tbf_cmd start;
2459         int rc;
2460
2461         head->th_cli_hash = cfs_hash_create("nrs_tbf_id_hash",
2462                                             NRS_TBF_NID_BITS,
2463                                             NRS_TBF_NID_BITS,
2464                                             NRS_TBF_NID_BKT_BITS, 0,
2465                                             CFS_HASH_MIN_THETA,
2466                                             CFS_HASH_MAX_THETA,
2467                                             &nrs_tbf_id_hash_ops,
2468                                             CFS_HASH_RW_BKTLOCK);
2469         if (head->th_cli_hash == NULL)
2470                 return -ENOMEM;
2471
2472         memset(&start, 0, sizeof(start));
2473         start.u.tc_start.ts_ids_str = "*";
2474         start.u.tc_start.ts_rpc_rate = tbf_rate;
2475         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2476         start.tc_name = NRS_TBF_DEFAULT_RULE;
2477         INIT_LIST_HEAD(&start.u.tc_start.ts_ids);
2478         rc = nrs_tbf_rule_start(policy, head, &start);
2479         if (rc) {
2480                 cfs_hash_putref(head->th_cli_hash);
2481                 head->th_cli_hash = NULL;
2482         }
2483
2484         return rc;
2485 }
2486
2487 static struct nrs_tbf_client *
2488 nrs_tbf_id_cli_find(struct nrs_tbf_head *head,
2489                     struct ptlrpc_request *req)
2490 {
2491         struct tbf_id id;
2492
2493         LASSERT(head->th_type_flag == NRS_TBF_FLAG_UID ||
2494                 head->th_type_flag == NRS_TBF_FLAG_GID);
2495
2496         nrs_tbf_id_cli_set(req, &id, head->th_type_flag);
2497         return cfs_hash_lookup(head->th_cli_hash, &id);
2498 }
2499
2500 static struct nrs_tbf_client *
2501 nrs_tbf_id_cli_findadd(struct nrs_tbf_head *head,
2502                        struct nrs_tbf_client *cli)
2503 {
2504         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_id,
2505                                        &cli->tc_hnode);
2506 }
2507
2508 static void
2509 nrs_tbf_uid_cli_init(struct nrs_tbf_client *cli,
2510                      struct ptlrpc_request *req)
2511 {
2512         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_UID);
2513 }
2514
2515 static void
2516 nrs_tbf_gid_cli_init(struct nrs_tbf_client *cli,
2517                      struct ptlrpc_request *req)
2518 {
2519         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_GID);
2520 }
2521
2522 static int
2523 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id)
2524 {
2525         struct nrs_tbf_id *nti_id;
2526         enum nrs_tbf_flag flag;
2527
2528         list_for_each_entry(nti_id, id_list, nti_linkage) {
2529                 flag = id.ti_type & nti_id->nti_id.ti_type;
2530                 if (!flag)
2531                         continue;
2532
2533                 if ((flag & NRS_TBF_FLAG_UID) &&
2534                     (id.ti_uid != nti_id->nti_id.ti_uid))
2535                         continue;
2536
2537                 if ((flag & NRS_TBF_FLAG_GID) &&
2538                     (id.ti_gid != nti_id->nti_id.ti_gid))
2539                         continue;
2540
2541                 return 1;
2542         }
2543         return 0;
2544 }
2545
2546 static int
2547 nrs_tbf_id_rule_match(struct nrs_tbf_rule *rule,
2548                       struct nrs_tbf_client *cli)
2549 {
2550         return nrs_tbf_id_list_match(&rule->tr_ids, cli->tc_id);
2551 }
2552
2553 static void nrs_tbf_id_cmd_fini(struct nrs_tbf_cmd *cmd)
2554 {
2555         nrs_tbf_id_list_free(&cmd->u.tc_start.ts_ids);
2556
2557         if (cmd->u.tc_start.ts_ids_str)
2558                 OBD_FREE(cmd->u.tc_start.ts_ids_str,
2559                          strlen(cmd->u.tc_start.ts_ids_str) + 1);
2560 }
2561
2562 static int
2563 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
2564                       enum nrs_tbf_flag tif)
2565 {
2566         struct cfs_lstr src;
2567         struct cfs_lstr res;
2568         int rc = 0;
2569         struct tbf_id id = { 0 };
2570         ENTRY;
2571
2572         if (tif != NRS_TBF_FLAG_UID && tif != NRS_TBF_FLAG_GID)
2573                 RETURN(-EINVAL);
2574
2575         src.ls_str = str;
2576         src.ls_len = len;
2577         INIT_LIST_HEAD(id_list);
2578         while (src.ls_str) {
2579                 struct nrs_tbf_id *nti_id;
2580
2581                 if (cfs_gettok(&src, ' ', &res) == 0)
2582                         GOTO(out, rc = -EINVAL);
2583
2584                 id.ti_type = tif;
2585                 if (tif == NRS_TBF_FLAG_UID) {
2586                         if (!cfs_str2num_check(res.ls_str, res.ls_len,
2587                                                &id.ti_uid, 0, (u32)~0U))
2588                                 GOTO(out, rc = -EINVAL);
2589                 } else {
2590                         if (!cfs_str2num_check(res.ls_str, res.ls_len,
2591                                                &id.ti_gid, 0, (u32)~0U))
2592                                 GOTO(out, rc = -EINVAL);
2593                 }
2594
2595                 OBD_ALLOC_PTR(nti_id);
2596                 if (nti_id == NULL)
2597                         GOTO(out, rc = -ENOMEM);
2598
2599                 nti_id->nti_id = id;
2600                 list_add_tail(&nti_id->nti_linkage, id_list);
2601         }
2602 out:
2603         if (rc)
2604                 nrs_tbf_id_list_free(id_list);
2605         RETURN(rc);
2606 }
2607
2608 static int nrs_tbf_ug_id_parse(struct nrs_tbf_cmd *cmd, char *id)
2609 {
2610         struct cfs_lstr src;
2611         int rc;
2612         enum nrs_tbf_flag tif;
2613
2614         tif = cmd->u.tc_start.ts_valid_type;
2615
2616         src.ls_str = id;
2617         src.ls_len = strlen(id);
2618
2619         rc = nrs_tbf_check_id_value(&src,
2620                                     tif == NRS_TBF_FLAG_UID ? "uid" : "gid");
2621         if (rc)
2622                 return rc;
2623
2624         OBD_ALLOC(cmd->u.tc_start.ts_ids_str, src.ls_len + 1);
2625         if (cmd->u.tc_start.ts_ids_str == NULL)
2626                 return -ENOMEM;
2627
2628         strlcpy(cmd->u.tc_start.ts_ids_str, src.ls_str, src.ls_len + 1);
2629
2630         rc = nrs_tbf_id_list_parse(cmd->u.tc_start.ts_ids_str,
2631                                    strlen(cmd->u.tc_start.ts_ids_str),
2632                                    &cmd->u.tc_start.ts_ids, tif);
2633         if (rc)
2634                 nrs_tbf_id_cmd_fini(cmd);
2635
2636         return rc;
2637 }
2638
2639 static int
2640 nrs_tbf_id_rule_init(struct ptlrpc_nrs_policy *policy,
2641                      struct nrs_tbf_rule *rule,
2642                      struct nrs_tbf_cmd *start)
2643 {
2644         struct nrs_tbf_head *head = rule->tr_head;
2645         int rc = 0;
2646         enum nrs_tbf_flag tif = head->th_type_flag;
2647         int ids_len = strlen(start->u.tc_start.ts_ids_str) + 1;
2648
2649         LASSERT(start->u.tc_start.ts_ids_str);
2650         INIT_LIST_HEAD(&rule->tr_ids);
2651
2652         OBD_ALLOC(rule->tr_ids_str, ids_len);
2653         if (rule->tr_ids_str == NULL)
2654                 return -ENOMEM;
2655
2656         strlcpy(rule->tr_ids_str, start->u.tc_start.ts_ids_str,
2657                 ids_len);
2658
2659         if (!list_empty(&start->u.tc_start.ts_ids)) {
2660                 rc = nrs_tbf_id_list_parse(rule->tr_ids_str,
2661                                            strlen(rule->tr_ids_str),
2662                                            &rule->tr_ids, tif);
2663                 if (rc)
2664                         CERROR("%ss {%s} illegal\n",
2665                                tif == NRS_TBF_FLAG_UID ? "uid" : "gid",
2666                                rule->tr_ids_str);
2667         }
2668         if (rc) {
2669                 OBD_FREE(rule->tr_ids_str, ids_len);
2670                 rule->tr_ids_str = NULL;
2671         }
2672         return rc;
2673 }
2674
2675 static int
2676 nrs_tbf_id_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2677 {
2678         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2679                    rule->tr_ids_str, rule->tr_rpc_rate,
2680                    atomic_read(&rule->tr_ref) - 1);
2681         return 0;
2682 }
2683
2684 static void nrs_tbf_id_rule_fini(struct nrs_tbf_rule *rule)
2685 {
2686         nrs_tbf_id_list_free(&rule->tr_ids);
2687         if (rule->tr_ids_str != NULL)
2688                 OBD_FREE(rule->tr_ids_str, strlen(rule->tr_ids_str) + 1);
2689 }
2690
2691 struct nrs_tbf_ops nrs_tbf_uid_ops = {
2692         .o_name = NRS_TBF_TYPE_UID,
2693         .o_startup = nrs_tbf_id_startup,
2694         .o_cli_find = nrs_tbf_id_cli_find,
2695         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2696         .o_cli_put = nrs_tbf_nid_cli_put,
2697         .o_cli_init = nrs_tbf_uid_cli_init,
2698         .o_rule_init = nrs_tbf_id_rule_init,
2699         .o_rule_dump = nrs_tbf_id_rule_dump,
2700         .o_rule_match = nrs_tbf_id_rule_match,
2701         .o_rule_fini = nrs_tbf_id_rule_fini,
2702 };
2703
2704 struct nrs_tbf_ops nrs_tbf_gid_ops = {
2705         .o_name = NRS_TBF_TYPE_GID,
2706         .o_startup = nrs_tbf_id_startup,
2707         .o_cli_find = nrs_tbf_id_cli_find,
2708         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2709         .o_cli_put = nrs_tbf_nid_cli_put,
2710         .o_cli_init = nrs_tbf_gid_cli_init,
2711         .o_rule_init = nrs_tbf_id_rule_init,
2712         .o_rule_dump = nrs_tbf_id_rule_dump,
2713         .o_rule_match = nrs_tbf_id_rule_match,
2714         .o_rule_fini = nrs_tbf_id_rule_fini,
2715 };
2716
2717 static struct nrs_tbf_type nrs_tbf_types[] = {
2718         {
2719                 .ntt_name = NRS_TBF_TYPE_JOBID,
2720                 .ntt_flag = NRS_TBF_FLAG_JOBID,
2721                 .ntt_ops = &nrs_tbf_jobid_ops,
2722         },
2723         {
2724                 .ntt_name = NRS_TBF_TYPE_NID,
2725                 .ntt_flag = NRS_TBF_FLAG_NID,
2726                 .ntt_ops = &nrs_tbf_nid_ops,
2727         },
2728         {
2729                 .ntt_name = NRS_TBF_TYPE_OPCODE,
2730                 .ntt_flag = NRS_TBF_FLAG_OPCODE,
2731                 .ntt_ops = &nrs_tbf_opcode_ops,
2732         },
2733         {
2734                 .ntt_name = NRS_TBF_TYPE_GENERIC,
2735                 .ntt_flag = NRS_TBF_FLAG_GENERIC,
2736                 .ntt_ops = &nrs_tbf_generic_ops,
2737         },
2738         {
2739                 .ntt_name = NRS_TBF_TYPE_UID,
2740                 .ntt_flag = NRS_TBF_FLAG_UID,
2741                 .ntt_ops = &nrs_tbf_uid_ops,
2742         },
2743         {
2744                 .ntt_name = NRS_TBF_TYPE_GID,
2745                 .ntt_flag = NRS_TBF_FLAG_GID,
2746                 .ntt_ops = &nrs_tbf_gid_ops,
2747         },
2748 };
2749
2750 /**
2751  * Is called before the policy transitions into
2752  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
2753  * policy-specific private data structure.
2754  *
2755  * \param[in] policy The policy to start
2756  *
2757  * \retval -ENOMEM OOM error
2758  * \retval  0      success
2759  *
2760  * \see nrs_policy_register()
2761  * \see nrs_policy_ctl()
2762  */
2763 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
2764 {
2765         struct nrs_tbf_head     *head;
2766         struct nrs_tbf_ops      *ops;
2767         __u32                    type;
2768         char                    *name;
2769         int found = 0;
2770         int i;
2771         int rc = 0;
2772
2773         if (arg == NULL)
2774                 name = NRS_TBF_TYPE_GENERIC;
2775         else if (strlen(arg) < NRS_TBF_TYPE_MAX_LEN)
2776                 name = arg;
2777         else
2778                 GOTO(out, rc = -EINVAL);
2779
2780         for (i = 0; i < ARRAY_SIZE(nrs_tbf_types); i++) {
2781                 if (strcmp(name, nrs_tbf_types[i].ntt_name) == 0) {
2782                         ops = nrs_tbf_types[i].ntt_ops;
2783                         type = nrs_tbf_types[i].ntt_flag;
2784                         found = 1;
2785                         break;
2786                 }
2787         }
2788         if (found == 0)
2789                 GOTO(out, rc = -ENOTSUPP);
2790
2791         OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
2792         if (head == NULL)
2793                 GOTO(out, rc = -ENOMEM);
2794
2795         memcpy(head->th_type, name, strlen(name));
2796         head->th_type[strlen(name)] = '\0';
2797         head->th_ops = ops;
2798         head->th_type_flag = type;
2799
2800         head->th_binheap = cfs_binheap_create(&nrs_tbf_heap_ops,
2801                                               CBH_FLAG_ATOMIC_GROW, 4096, NULL,
2802                                               nrs_pol2cptab(policy),
2803                                               nrs_pol2cptid(policy));
2804         if (head->th_binheap == NULL)
2805                 GOTO(out_free_head, rc = -ENOMEM);
2806
2807         atomic_set(&head->th_rule_sequence, 0);
2808         spin_lock_init(&head->th_rule_lock);
2809         INIT_LIST_HEAD(&head->th_list);
2810         hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
2811         head->th_timer.function = nrs_tbf_timer_cb;
2812         rc = head->th_ops->o_startup(policy, head);
2813         if (rc)
2814                 GOTO(out_free_heap, rc);
2815
2816         policy->pol_private = head;
2817         return 0;
2818 out_free_heap:
2819         cfs_binheap_destroy(head->th_binheap);
2820 out_free_head:
2821         OBD_FREE_PTR(head);
2822 out:
2823         return rc;
2824 }
2825
2826 /**
2827  * Is called before the policy transitions into
2828  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
2829  * private data structure.
2830  *
2831  * \param[in] policy The policy to stop
2832  *
2833  * \see nrs_policy_stop0()
2834  */
2835 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
2836 {
2837         struct nrs_tbf_head *head = policy->pol_private;
2838         struct ptlrpc_nrs *nrs = policy->pol_nrs;
2839         struct nrs_tbf_rule *rule, *n;
2840
2841         LASSERT(head != NULL);
2842         LASSERT(head->th_cli_hash != NULL);
2843         hrtimer_cancel(&head->th_timer);
2844         /* Should cleanup hash first before free rules */
2845         cfs_hash_putref(head->th_cli_hash);
2846         list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
2847                 list_del_init(&rule->tr_linkage);
2848                 nrs_tbf_rule_put(rule);
2849         }
2850         LASSERT(list_empty(&head->th_list));
2851         LASSERT(head->th_binheap != NULL);
2852         LASSERT(cfs_binheap_is_empty(head->th_binheap));
2853         cfs_binheap_destroy(head->th_binheap);
2854         OBD_FREE_PTR(head);
2855         nrs->nrs_throttling = 0;
2856         wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
2857 }
2858
2859 /**
2860  * Performs a policy-specific ctl function on TBF policy instances; similar
2861  * to ioctl.
2862  *
2863  * \param[in]     policy the policy instance
2864  * \param[in]     opc    the opcode
2865  * \param[in,out] arg    used for passing parameters and information
2866  *
2867  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2868  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2869  *
2870  * \retval 0   operation carried out successfully
2871  * \retval -ve error
2872  */
2873 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
2874                        enum ptlrpc_nrs_ctl opc,
2875                        void *arg)
2876 {
2877         int rc = 0;
2878         ENTRY;
2879
2880         assert_spin_locked(&policy->pol_nrs->nrs_lock);
2881
2882         switch ((enum nrs_ctl_tbf)opc) {
2883         default:
2884                 RETURN(-EINVAL);
2885
2886         /**
2887          * Read RPC rate size of a policy instance.
2888          */
2889         case NRS_CTL_TBF_RD_RULE: {
2890                 struct nrs_tbf_head *head = policy->pol_private;
2891                 struct seq_file *m = (struct seq_file *) arg;
2892                 struct ptlrpc_service_part *svcpt;
2893
2894                 svcpt = policy->pol_nrs->nrs_svcpt;
2895                 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
2896
2897                 rc = nrs_tbf_rule_dump_all(head, m);
2898                 }
2899                 break;
2900
2901         /**
2902          * Write RPC rate of a policy instance.
2903          */
2904         case NRS_CTL_TBF_WR_RULE: {
2905                 struct nrs_tbf_head *head = policy->pol_private;
2906                 struct nrs_tbf_cmd *cmd;
2907
2908                 cmd = (struct nrs_tbf_cmd *)arg;
2909                 rc = nrs_tbf_command(policy,
2910                                      head,
2911                                      cmd);
2912                 }
2913                 break;
2914         /**
2915          * Read the TBF policy type of a policy instance.
2916          */
2917         case NRS_CTL_TBF_RD_TYPE_FLAG: {
2918                 struct nrs_tbf_head *head = policy->pol_private;
2919
2920                 *(__u32 *)arg = head->th_type_flag;
2921                 }
2922                 break;
2923         }
2924
2925         RETURN(rc);
2926 }
2927
2928 /**
2929  * Is called for obtaining a TBF policy resource.
2930  *
2931  * \param[in]  policy     The policy on which the request is being asked for
2932  * \param[in]  nrq        The request for which resources are being taken
2933  * \param[in]  parent     Parent resource, unused in this policy
2934  * \param[out] resp       Resources references are placed in this array
2935  * \param[in]  moving_req Signifies limited caller context; unused in this
2936  *                        policy
2937  *
2938  *
2939  * \see nrs_resource_get_safe()
2940  */
2941 static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
2942                            struct ptlrpc_nrs_request *nrq,
2943                            const struct ptlrpc_nrs_resource *parent,
2944                            struct ptlrpc_nrs_resource **resp,
2945                            bool moving_req)
2946 {
2947         struct nrs_tbf_head   *head;
2948         struct nrs_tbf_client *cli;
2949         struct nrs_tbf_client *tmp;
2950         struct ptlrpc_request *req;
2951
2952         if (parent == NULL) {
2953                 *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
2954                 return 0;
2955         }
2956
2957         head = container_of(parent, struct nrs_tbf_head, th_res);
2958         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
2959         cli = head->th_ops->o_cli_find(head, req);
2960         if (cli != NULL) {
2961                 spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2962                 LASSERT(cli->tc_rule);
2963                 if (cli->tc_rule_sequence !=
2964                     atomic_read(&head->th_rule_sequence) ||
2965                     cli->tc_rule->tr_flags & NTRS_STOPPING) {
2966                         struct nrs_tbf_rule *rule;
2967
2968                         CDEBUG(D_RPCTRACE,
2969                                "TBF class@%p rate %llu sequence %d, "
2970                                "rule flags %d, head sequence %d\n",
2971                                cli, cli->tc_rpc_rate,
2972                                cli->tc_rule_sequence,
2973                                cli->tc_rule->tr_flags,
2974                                atomic_read(&head->th_rule_sequence));
2975                         rule = nrs_tbf_rule_match(head, cli);
2976                         if (rule != cli->tc_rule) {
2977                                 nrs_tbf_cli_reset(head, rule, cli);
2978                         } else {
2979                                 if (cli->tc_rule_generation != rule->tr_generation)
2980                                         nrs_tbf_cli_reset_value(head, cli);
2981                                 nrs_tbf_rule_put(rule);
2982                         }
2983                 } else if (cli->tc_rule_generation !=
2984                            cli->tc_rule->tr_generation) {
2985                         nrs_tbf_cli_reset_value(head, cli);
2986                 }
2987                 spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2988                 goto out;
2989         }
2990
2991         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
2992                           sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
2993         if (cli == NULL)
2994                 return -ENOMEM;
2995
2996         nrs_tbf_cli_init(head, cli, req);
2997         tmp = head->th_ops->o_cli_findadd(head, cli);
2998         if (tmp != cli) {
2999                 atomic_dec(&cli->tc_ref);
3000                 nrs_tbf_cli_fini(cli);
3001                 cli = tmp;
3002         }
3003 out:
3004         *resp = &cli->tc_res;
3005
3006         return 1;
3007 }
3008
3009 /**
3010  * Called when releasing references to the resource hierachy obtained for a
3011  * request for scheduling using the TBF policy.
3012  *
3013  * \param[in] policy   the policy the resource belongs to
3014  * \param[in] res      the resource to be released
3015  */
3016 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
3017                             const struct ptlrpc_nrs_resource *res)
3018 {
3019         struct nrs_tbf_head   *head;
3020         struct nrs_tbf_client *cli;
3021
3022         /**
3023          * Do nothing for freeing parent, nrs_tbf_net resources
3024          */
3025         if (res->res_parent == NULL)
3026                 return;
3027
3028         cli = container_of(res, struct nrs_tbf_client, tc_res);
3029         head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
3030
3031         head->th_ops->o_cli_put(head, cli);
3032 }
3033
3034 /**
3035  * Called when getting a request from the TBF policy for handling, or just
3036  * peeking; removes the request from the policy when it is to be handled.
3037  *
3038  * \param[in] policy The policy
3039  * \param[in] peek   When set, signifies that we just want to examine the
3040  *                   request, and not handle it, so the request is not removed
3041  *                   from the policy.
3042  * \param[in] force  Force the policy to return a request; unused in this
3043  *                   policy
3044  *
3045  * \retval The request to be handled; this is the next request in the TBF
3046  *         rule
3047  *
3048  * \see ptlrpc_nrs_req_get_nolock()
3049  * \see nrs_request_get()
3050  */
3051 static
3052 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
3053                                            bool peek, bool force)
3054 {
3055         struct nrs_tbf_head       *head = policy->pol_private;
3056         struct ptlrpc_nrs_request *nrq = NULL;
3057         struct nrs_tbf_client     *cli;
3058         struct cfs_binheap_node   *node;
3059
3060         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3061
3062         if (!peek && policy->pol_nrs->nrs_throttling)
3063                 return NULL;
3064
3065         node = cfs_binheap_root(head->th_binheap);
3066         if (unlikely(node == NULL))
3067                 return NULL;
3068
3069         cli = container_of(node, struct nrs_tbf_client, tc_node);
3070         LASSERT(cli->tc_in_heap);
3071         if (peek) {
3072                 nrq = list_entry(cli->tc_list.next,
3073                                      struct ptlrpc_nrs_request,
3074                                      nr_u.tbf.tr_list);
3075         } else {
3076                 struct nrs_tbf_rule *rule = cli->tc_rule;
3077                 __u64 now = ktime_to_ns(ktime_get());
3078                 __u64 passed;
3079                 __u64 ntoken;
3080                 __u64 deadline;
3081                 __u64 old_resid = 0;
3082
3083                 deadline = cli->tc_check_time +
3084                           cli->tc_nsecs;
3085                 LASSERT(now >= cli->tc_check_time);
3086                 passed = now - cli->tc_check_time;
3087                 ntoken = passed * cli->tc_rpc_rate;
3088                 do_div(ntoken, NSEC_PER_SEC);
3089
3090                 ntoken += cli->tc_ntoken;
3091                 if (rule->tr_flags & NTRS_REALTIME) {
3092                         LASSERT(cli->tc_nsecs_resid < cli->tc_nsecs);
3093                         old_resid = cli->tc_nsecs_resid;
3094                         cli->tc_nsecs_resid += passed % cli->tc_nsecs;
3095                         if (cli->tc_nsecs_resid > cli->tc_nsecs) {
3096                                 ntoken++;
3097                                 cli->tc_nsecs_resid -= cli->tc_nsecs;
3098                         }
3099                 } else if (ntoken > cli->tc_depth)
3100                         ntoken = cli->tc_depth;
3101
3102                 if (ntoken > 0) {
3103                         struct ptlrpc_request *req;
3104                         nrq = list_entry(cli->tc_list.next,
3105                                              struct ptlrpc_nrs_request,
3106                                              nr_u.tbf.tr_list);
3107                         req = container_of(nrq,
3108                                            struct ptlrpc_request,
3109                                            rq_nrq);
3110                         ntoken--;
3111                         cli->tc_ntoken = ntoken;
3112                         cli->tc_check_time = now;
3113                         list_del_init(&nrq->nr_u.tbf.tr_list);
3114                         if (list_empty(&cli->tc_list)) {
3115                                 cfs_binheap_remove(head->th_binheap,
3116                                                    &cli->tc_node);
3117                                 cli->tc_in_heap = false;
3118                         } else {
3119                                 if (!(rule->tr_flags & NTRS_REALTIME))
3120                                         cli->tc_deadline = now + cli->tc_nsecs;
3121                                 cfs_binheap_relocate(head->th_binheap,
3122                                                      &cli->tc_node);
3123                         }
3124                         CDEBUG(D_RPCTRACE,
3125                                "TBF dequeues: class@%p rate %llu gen %llu "
3126                                "token %llu, rule@%p rate %llu gen %llu\n",
3127                                cli, cli->tc_rpc_rate,
3128                                cli->tc_rule_generation, cli->tc_ntoken,
3129                                cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3130                                cli->tc_rule->tr_generation);
3131                 } else {
3132                         ktime_t time;
3133
3134                         if (rule->tr_flags & NTRS_REALTIME) {
3135                                 cli->tc_deadline = deadline;
3136                                 cli->tc_nsecs_resid = old_resid;
3137                                 cfs_binheap_relocate(head->th_binheap,
3138                                                      &cli->tc_node);
3139                                 if (node != cfs_binheap_root(head->th_binheap))
3140                                         return nrs_tbf_req_get(policy,
3141                                                                peek, force);
3142                         }
3143                         policy->pol_nrs->nrs_throttling = 1;
3144                         head->th_deadline = deadline;
3145                         time = ktime_set(0, 0);
3146                         time = ktime_add_ns(time, deadline);
3147                         hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
3148                 }
3149         }
3150
3151         return nrq;
3152 }
3153
3154 /**
3155  * Adds request \a nrq to \a policy's list of queued requests
3156  *
3157  * \param[in] policy The policy
3158  * \param[in] nrq    The request to add
3159  *
3160  * \retval 0 success; nrs_request_enqueue() assumes this function will always
3161  *                    succeed
3162  */
3163 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
3164                            struct ptlrpc_nrs_request *nrq)
3165 {
3166         struct nrs_tbf_head   *head;
3167         struct nrs_tbf_client *cli;
3168         int                    rc = 0;
3169
3170         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3171
3172         cli = container_of(nrs_request_resource(nrq),
3173                            struct nrs_tbf_client, tc_res);
3174         head = container_of(nrs_request_resource(nrq)->res_parent,
3175                             struct nrs_tbf_head, th_res);
3176         if (list_empty(&cli->tc_list)) {
3177                 LASSERT(!cli->tc_in_heap);
3178                 cli->tc_deadline = cli->tc_check_time + cli->tc_nsecs;
3179                 rc = cfs_binheap_insert(head->th_binheap, &cli->tc_node);
3180                 if (rc == 0) {
3181                         cli->tc_in_heap = true;
3182                         nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3183                         list_add_tail(&nrq->nr_u.tbf.tr_list,
3184                                           &cli->tc_list);
3185                         if (policy->pol_nrs->nrs_throttling) {
3186                                 __u64 deadline = cli->tc_deadline;
3187                                 if ((head->th_deadline > deadline) &&
3188                                     (hrtimer_try_to_cancel(&head->th_timer)
3189                                      >= 0)) {
3190                                         ktime_t time;
3191                                         head->th_deadline = deadline;
3192                                         time = ktime_set(0, 0);
3193                                         time = ktime_add_ns(time, deadline);
3194                                         hrtimer_start(&head->th_timer, time,
3195                                                       HRTIMER_MODE_ABS);
3196                                 }
3197                         }
3198                 }
3199         } else {
3200                 LASSERT(cli->tc_in_heap);
3201                 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3202                 list_add_tail(&nrq->nr_u.tbf.tr_list,
3203                                   &cli->tc_list);
3204         }
3205
3206         if (rc == 0)
3207                 CDEBUG(D_RPCTRACE,
3208                        "TBF enqueues: class@%p rate %llu gen %llu "
3209                        "token %llu, rule@%p rate %llu gen %llu\n",
3210                        cli, cli->tc_rpc_rate,
3211                        cli->tc_rule_generation, cli->tc_ntoken,
3212                        cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3213                        cli->tc_rule->tr_generation);
3214
3215         return rc;
3216 }
3217
3218 /**
3219  * Removes request \a nrq from \a policy's list of queued requests.
3220  *
3221  * \param[in] policy The policy
3222  * \param[in] nrq    The request to remove
3223  */
3224 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
3225                              struct ptlrpc_nrs_request *nrq)
3226 {
3227         struct nrs_tbf_head   *head;
3228         struct nrs_tbf_client *cli;
3229
3230         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3231
3232         cli = container_of(nrs_request_resource(nrq),
3233                            struct nrs_tbf_client, tc_res);
3234         head = container_of(nrs_request_resource(nrq)->res_parent,
3235                             struct nrs_tbf_head, th_res);
3236
3237         LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
3238         list_del_init(&nrq->nr_u.tbf.tr_list);
3239         if (list_empty(&cli->tc_list)) {
3240                 cfs_binheap_remove(head->th_binheap,
3241                                    &cli->tc_node);
3242                 cli->tc_in_heap = false;
3243         } else {
3244                 cfs_binheap_relocate(head->th_binheap,
3245                                      &cli->tc_node);
3246         }
3247 }
3248
3249 /**
3250  * Prints a debug statement right before the request \a nrq stops being
3251  * handled.
3252  *
3253  * \param[in] policy The policy handling the request
3254  * \param[in] nrq    The request being handled
3255  *
3256  * \see ptlrpc_server_finish_request()
3257  * \see ptlrpc_nrs_req_stop_nolock()
3258  */
3259 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
3260                               struct ptlrpc_nrs_request *nrq)
3261 {
3262         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
3263                                                   rq_nrq);
3264
3265         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3266
3267         CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: %llu\n",
3268                policy->pol_desc->pd_name, libcfs_id2str(req->rq_peer),
3269                nrq->nr_u.tbf.tr_sequence);
3270 }
3271
3272 #ifdef CONFIG_PROC_FS
3273
3274 /**
3275  * lprocfs interface
3276  */
3277
3278 /**
3279  * The maximum RPC rate.
3280  */
3281 #define LPROCFS_NRS_RATE_MAX            65535
3282
3283 static int
3284 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
3285 {
3286         struct ptlrpc_service       *svc = m->private;
3287         int                          rc;
3288
3289         seq_printf(m, "regular_requests:\n");
3290         /**
3291          * Perform two separate calls to this as only one of the NRS heads'
3292          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
3293          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
3294          */
3295         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
3296                                        NRS_POL_NAME_TBF,
3297                                        NRS_CTL_TBF_RD_RULE,
3298                                        false, m);
3299         if (rc == 0) {
3300                 /**
3301                  * -ENOSPC means buf in the parameter m is overflow, return 0
3302                  * here to let upper layer function seq_read alloc a larger
3303                  * memory area and do this process again.
3304                  */
3305         } else if (rc == -ENOSPC) {
3306                 return 0;
3307
3308                 /**
3309                  * Ignore -ENODEV as the regular NRS head's policy may be in the
3310                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
3311                  */
3312         } else if (rc != -ENODEV) {
3313                 return rc;
3314         }
3315
3316         if (!nrs_svc_has_hp(svc))
3317                 goto no_hp;
3318
3319         seq_printf(m, "high_priority_requests:\n");
3320         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
3321                                        NRS_POL_NAME_TBF,
3322                                        NRS_CTL_TBF_RD_RULE,
3323                                        false, m);
3324         if (rc == 0) {
3325                 /**
3326                  * -ENOSPC means buf in the parameter m is overflow, return 0
3327                  * here to let upper layer function seq_read alloc a larger
3328                  * memory area and do this process again.
3329                  */
3330         } else if (rc == -ENOSPC) {
3331                 return 0;
3332         }
3333
3334 no_hp:
3335
3336         return rc;
3337 }
3338
3339 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char *token)
3340 {
3341         int rc;
3342         ENTRY;
3343
3344         switch (cmd->u.tc_start.ts_valid_type) {
3345         case NRS_TBF_FLAG_JOBID:
3346                 rc = nrs_tbf_jobid_parse(cmd, token);
3347                 break;
3348         case NRS_TBF_FLAG_NID:
3349                 rc = nrs_tbf_nid_parse(cmd, token);
3350                 break;
3351         case NRS_TBF_FLAG_OPCODE:
3352                 rc = nrs_tbf_opcode_parse(cmd, token);
3353                 break;
3354         case NRS_TBF_FLAG_GENERIC:
3355                 rc = nrs_tbf_generic_parse(cmd, token);
3356                 break;
3357         case NRS_TBF_FLAG_UID:
3358         case NRS_TBF_FLAG_GID:
3359                 rc = nrs_tbf_ug_id_parse(cmd, token);
3360                 break;
3361         default:
3362                 RETURN(-EINVAL);
3363         }
3364
3365         RETURN(rc);
3366 }
3367
3368 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
3369 {
3370         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3371                 switch (cmd->u.tc_start.ts_valid_type) {
3372                 case NRS_TBF_FLAG_JOBID:
3373                         nrs_tbf_jobid_cmd_fini(cmd);
3374                         break;
3375                 case NRS_TBF_FLAG_NID:
3376                         nrs_tbf_nid_cmd_fini(cmd);
3377                         break;
3378                 case NRS_TBF_FLAG_OPCODE:
3379                         nrs_tbf_opcode_cmd_fini(cmd);
3380                         break;
3381                 case NRS_TBF_FLAG_GENERIC:
3382                         nrs_tbf_generic_cmd_fini(cmd);
3383                         break;
3384                 case NRS_TBF_FLAG_UID:
3385                 case NRS_TBF_FLAG_GID:
3386                         nrs_tbf_id_cmd_fini(cmd);
3387                         break;
3388                 default:
3389                         CWARN("unknown NRS_TBF_FLAGS:0x%x\n",
3390                               cmd->u.tc_start.ts_valid_type);
3391                 }
3392         }
3393 }
3394
3395 static bool name_is_valid(const char *name)
3396 {
3397         int i;
3398
3399         for (i = 0; i < strlen(name); i++) {
3400                 if ((!isalnum(name[i])) &&
3401                     (name[i] != '_'))
3402                         return false;
3403         }
3404         return true;
3405 }
3406
3407 static int
3408 nrs_tbf_parse_value_pair(struct nrs_tbf_cmd *cmd, char *buffer)
3409 {
3410         char    *key;
3411         char    *val;
3412         int      rc;
3413         __u64    rate;
3414
3415         val = buffer;
3416         key = strsep(&val, "=");
3417         if (val == NULL || strlen(val) == 0)
3418                 return -EINVAL;
3419
3420         /* Key of the value pair */
3421         if (strcmp(key, "rate") == 0) {
3422                 rc = kstrtoull(val, 10, &rate);
3423                 if (rc)
3424                         return rc;
3425
3426                 if (rate <= 0 || rate >= LPROCFS_NRS_RATE_MAX)
3427                         return -EINVAL;
3428
3429                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3430                         cmd->u.tc_start.ts_rpc_rate = rate;
3431                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3432                         cmd->u.tc_change.tc_rpc_rate = rate;
3433                 else
3434                         return -EINVAL;
3435         }  else if (strcmp(key, "rank") == 0) {
3436                 if (!name_is_valid(val))
3437                         return -EINVAL;
3438
3439                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3440                         cmd->u.tc_start.ts_next_name = val;
3441                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3442                         cmd->u.tc_change.tc_next_name = val;
3443                 else
3444                         return -EINVAL;
3445         } else if (strcmp(key, "realtime") == 0) {
3446                 unsigned long realtime;
3447
3448                 rc = kstrtoul(val, 10, &realtime);
3449                 if (rc)
3450                         return rc;
3451
3452                 if (realtime > 0)
3453                         cmd->u.tc_start.ts_rule_flags |= NTRS_REALTIME;
3454         } else {
3455                 return -EINVAL;
3456         }
3457         return 0;
3458 }
3459
3460 static int
3461 nrs_tbf_parse_value_pairs(struct nrs_tbf_cmd *cmd, char *buffer)
3462 {
3463         char    *val;
3464         char    *token;
3465         int      rc;
3466
3467         val = buffer;
3468         while (val != NULL && strlen(val) != 0) {
3469                 token = strsep(&val, " ");
3470                 rc = nrs_tbf_parse_value_pair(cmd, token);
3471                 if (rc)
3472                         return rc;
3473         }
3474
3475         switch (cmd->tc_cmd) {
3476         case NRS_CTL_TBF_START_RULE:
3477                 if (cmd->u.tc_start.ts_rpc_rate == 0)
3478                         cmd->u.tc_start.ts_rpc_rate = tbf_rate;
3479                 break;
3480         case NRS_CTL_TBF_CHANGE_RULE:
3481                 if (cmd->u.tc_change.tc_rpc_rate == 0 &&
3482                     cmd->u.tc_change.tc_next_name == NULL)
3483                         return -EINVAL;
3484                 break;
3485         case NRS_CTL_TBF_STOP_RULE:
3486                 break;
3487         default:
3488                 return -EINVAL;
3489         }
3490         return 0;
3491 }
3492
3493 static struct nrs_tbf_cmd *
3494 nrs_tbf_parse_cmd(char *buffer, unsigned long count, __u32 type_flag)
3495 {
3496         static struct nrs_tbf_cmd       *cmd;
3497         char                            *token;
3498         char                            *val;
3499         int                              rc = 0;
3500
3501         OBD_ALLOC_PTR(cmd);
3502         if (cmd == NULL)
3503                 GOTO(out, rc = -ENOMEM);
3504         memset(cmd, 0, sizeof(*cmd));
3505
3506         val = buffer;
3507         token = strsep(&val, " ");
3508         if (val == NULL || strlen(val) == 0)
3509                 GOTO(out_free_cmd, rc = -EINVAL);
3510
3511         /* Type of the command */
3512         if (strcmp(token, "start") == 0) {
3513                 cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
3514                 cmd->u.tc_start.ts_valid_type = type_flag;
3515         } else if (strcmp(token, "stop") == 0)
3516                 cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
3517         else if (strcmp(token, "change") == 0)
3518                 cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RULE;
3519         else
3520                 GOTO(out_free_cmd, rc = -EINVAL);
3521
3522         /* Name of the rule */
3523         token = strsep(&val, " ");
3524         if ((val == NULL && cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE) ||
3525             !name_is_valid(token))
3526                 GOTO(out_free_cmd, rc = -EINVAL);
3527         cmd->tc_name = token;
3528
3529         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3530                 /* List of ID */
3531                 LASSERT(val);
3532                 token = val;
3533                 val = strrchr(token, '}');
3534                 if (!val)
3535                         GOTO(out_free_cmd, rc = -EINVAL);
3536
3537                 /* Skip '}' */
3538                 val++;
3539                 if (*val == '\0') {
3540                         val = NULL;
3541                 } else if (*val == ' ') {
3542                         *val = '\0';
3543                         val++;
3544                 } else
3545                         GOTO(out_free_cmd, rc = -EINVAL);
3546
3547                 rc = nrs_tbf_id_parse(cmd, token);
3548                 if (rc)
3549                         GOTO(out_free_cmd, rc);
3550         }
3551
3552         rc = nrs_tbf_parse_value_pairs(cmd, val);
3553         if (rc)
3554                 GOTO(out_cmd_fini, rc = -EINVAL);
3555         goto out;
3556 out_cmd_fini:
3557         nrs_tbf_cmd_fini(cmd);
3558 out_free_cmd:
3559         OBD_FREE_PTR(cmd);
3560 out:
3561         if (rc)
3562                 cmd = ERR_PTR(rc);
3563         return cmd;
3564 }
3565
3566 /**
3567  * Get the TBF policy type (nid, jobid, etc) preset by
3568  * proc entry 'nrs_policies' for command buffer parsing.
3569  *
3570  * \param[in] svc the PTLRPC service
3571  * \param[in] queue the NRS queue type
3572  *
3573  * \retval the preset TBF policy type flag
3574  */
3575 static __u32
3576 nrs_tbf_type_flag(struct ptlrpc_service *svc, enum ptlrpc_nrs_queue_type queue)
3577 {
3578         __u32   type;
3579         int     rc;
3580
3581         rc = ptlrpc_nrs_policy_control(svc, queue,
3582                                        NRS_POL_NAME_TBF,
3583                                        NRS_CTL_TBF_RD_TYPE_FLAG,
3584                                        true, &type);
3585         if (rc != 0)
3586                 type = NRS_TBF_FLAG_INVALID;
3587
3588         return type;
3589 }
3590
3591 extern struct nrs_core nrs_core;
3592 #define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
3593 static ssize_t
3594 ptlrpc_lprocfs_nrs_tbf_rule_seq_write(struct file *file,
3595                                       const char __user *buffer,
3596                                       size_t count, loff_t *off)
3597 {
3598         struct seq_file           *m = file->private_data;
3599         struct ptlrpc_service     *svc = m->private;
3600         char                      *kernbuf;
3601         char                      *val;
3602         int                        rc;
3603         static struct nrs_tbf_cmd *cmd;
3604         enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
3605         unsigned long              length;
3606         char                      *token;
3607
3608         OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3609         if (kernbuf == NULL)
3610                 GOTO(out, rc = -ENOMEM);
3611
3612         if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
3613                 GOTO(out_free_kernbuff, rc = -EINVAL);
3614
3615         if (copy_from_user(kernbuf, buffer, count))
3616                 GOTO(out_free_kernbuff, rc = -EFAULT);
3617
3618         val = kernbuf;
3619         token = strsep(&val, " ");
3620         if (val == NULL)
3621                 GOTO(out_free_kernbuff, rc = -EINVAL);
3622
3623         if (strcmp(token, "reg") == 0) {
3624                 queue = PTLRPC_NRS_QUEUE_REG;
3625         } else if (strcmp(token, "hp") == 0) {
3626                 queue = PTLRPC_NRS_QUEUE_HP;
3627         } else {
3628                 kernbuf[strlen(token)] = ' ';
3629                 val = kernbuf;
3630         }
3631         length = strlen(val);
3632
3633         if (length == 0)
3634                 GOTO(out_free_kernbuff, rc = -EINVAL);
3635
3636         if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
3637                 GOTO(out_free_kernbuff, rc = -ENODEV);
3638         else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
3639                 queue = PTLRPC_NRS_QUEUE_REG;
3640
3641         cmd = nrs_tbf_parse_cmd(val, length, nrs_tbf_type_flag(svc, queue));
3642         if (IS_ERR(cmd))
3643                 GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
3644
3645         /**
3646          * Serialize NRS core lprocfs operations with policy registration/
3647          * unregistration.
3648          */
3649         mutex_lock(&nrs_core.nrs_mutex);
3650         rc = ptlrpc_nrs_policy_control(svc, queue,
3651                                        NRS_POL_NAME_TBF,
3652                                        NRS_CTL_TBF_WR_RULE,
3653                                        false, cmd);
3654         mutex_unlock(&nrs_core.nrs_mutex);
3655
3656         nrs_tbf_cmd_fini(cmd);
3657         OBD_FREE_PTR(cmd);
3658 out_free_kernbuff:
3659         OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3660 out:
3661         return rc ? rc : count;
3662 }
3663 LPROC_SEQ_FOPS(ptlrpc_lprocfs_nrs_tbf_rule);
3664
3665 /**
3666  * Initializes a TBF policy's lprocfs interface for service \a svc
3667  *
3668  * \param[in] svc the service
3669  *
3670  * \retval 0    success
3671  * \retval != 0 error
3672  */
3673 static int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
3674 {
3675         struct lprocfs_vars nrs_tbf_lprocfs_vars[] = {
3676                 { .name         = "nrs_tbf_rule",
3677                   .fops         = &ptlrpc_lprocfs_nrs_tbf_rule_fops,
3678                   .data = svc },
3679                 { NULL }
3680         };
3681
3682         if (svc->srv_procroot == NULL)
3683                 return 0;
3684
3685         return lprocfs_add_vars(svc->srv_procroot, nrs_tbf_lprocfs_vars, NULL);
3686 }
3687
3688 /**
3689  * Cleans up a TBF policy's lprocfs interface for service \a svc
3690  *
3691  * \param[in] svc the service
3692  */
3693 static void nrs_tbf_lprocfs_fini(struct ptlrpc_service *svc)
3694 {
3695         if (svc->srv_procroot == NULL)
3696                 return;
3697
3698         lprocfs_remove_proc_entry("nrs_tbf_rule", svc->srv_procroot);
3699 }
3700
3701 #endif /* CONFIG_PROC_FS */
3702
3703 /**
3704  * TBF policy operations
3705  */
3706 static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
3707         .op_policy_start        = nrs_tbf_start,
3708         .op_policy_stop         = nrs_tbf_stop,
3709         .op_policy_ctl          = nrs_tbf_ctl,
3710         .op_res_get             = nrs_tbf_res_get,
3711         .op_res_put             = nrs_tbf_res_put,
3712         .op_req_get             = nrs_tbf_req_get,
3713         .op_req_enqueue         = nrs_tbf_req_add,
3714         .op_req_dequeue         = nrs_tbf_req_del,
3715         .op_req_stop            = nrs_tbf_req_stop,
3716 #ifdef CONFIG_PROC_FS
3717         .op_lprocfs_init        = nrs_tbf_lprocfs_init,
3718         .op_lprocfs_fini        = nrs_tbf_lprocfs_fini,
3719 #endif
3720 };
3721
3722 /**
3723  * TBF policy configuration
3724  */
3725 struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
3726         .nc_name                = NRS_POL_NAME_TBF,
3727         .nc_ops                 = &nrs_tbf_ops,
3728         .nc_compat              = nrs_policy_compat_all,
3729 };
3730
3731 /** @} tbf */
3732
3733 /** @} nrs */
3734
3735 #endif /* HAVE_SERVER_SUPPORT */