Whamcloud - gitweb
653b55e6f0a5983ad948302a96bfd06b94208a33
[fs/lustre-release.git] / lustre / ptlrpc / nrs_tbf.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2013 DataDirect Networks, Inc.
24  *
25  * Copyright (c) 2014, 2016, Intel Corporation.
26  */
27 /*
28  * lustre/ptlrpc/nrs_tbf.c
29  *
30  * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
31  *
32  */
33
34 /**
35  * \addtogroup nrs
36  * @{
37  */
38
39 #define DEBUG_SUBSYSTEM S_RPC
40 #include <obd_support.h>
41 #include <obd_class.h>
42 #include <libcfs/libcfs.h>
43 #include <lustre_req_layout.h>
44 #include "ptlrpc_internal.h"
45
46 /**
47  * \name tbf
48  *
49  * Token Bucket Filter over client NIDs
50  *
51  * @{
52  */
53
54 #define NRS_POL_NAME_TBF        "tbf"
55
/*
 * Module parameter (mode 0644): drives the sizing of the jobid client hash
 * (see nrs_tbf_jobid_hash_order()) and the per-bucket LRU high-water mark
 * computed in nrs_tbf_jobid_cli_put().
 */
56 static int tbf_jobid_cache_size = 8192;
57 module_param(tbf_jobid_cache_size, int, 0644);
58 MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");
59
/*
 * Module parameter (mode 0644): RPC rate assigned to the "default" rule that
 * nrs_tbf_jobid_startup() creates.
 */
60 static int tbf_rate = 10000;
61 module_param(tbf_rate, int, 0644);
62 MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");
63
/*
 * Module parameter (mode 0644): copied into tr_depth of every rule created by
 * nrs_tbf_rule_start(); clients inherit it as their depth and initial token
 * count in nrs_tbf_cli_reset_value().
 */
64 static int tbf_depth = 3;
65 module_param(tbf_depth, int, 0644);
66 MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
67
68 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
69 {
70         struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
71                                                  th_timer);
72         struct ptlrpc_nrs   *nrs = head->th_res.res_policy->pol_nrs;
73         struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
74
75         nrs->nrs_throttling = 0;
76         wake_up(&svcpt->scp_waitq);
77
78         return HRTIMER_NORESTART;
79 }
80
81 #define NRS_TBF_DEFAULT_RULE "default"
82
83 static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule)
84 {
85         LASSERT(atomic_read(&rule->tr_ref) == 0);
86         LASSERT(list_empty(&rule->tr_cli_list));
87         LASSERT(list_empty(&rule->tr_linkage));
88
89         rule->tr_head->th_ops->o_rule_fini(rule);
90         OBD_FREE_PTR(rule);
91 }
92
93 /**
94  * Decreases the rule's usage reference count, and stops the rule in case it
95  * was already stopping and have no more outstanding usage references (which
96  * indicates it has no more queued or started requests, and can be safely
97  * stopped).
98  */
99 static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule)
100 {
101         if (atomic_dec_and_test(&rule->tr_ref))
102                 nrs_tbf_rule_fini(rule);
103 }
104
105 /**
106  * Increases the rule's usage reference count.
107  */
108 static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule)
109 {
110         atomic_inc(&rule->tr_ref);
111 }
112
113 static void
114 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
115 {
116         LASSERT(!list_empty(&cli->tc_linkage));
117         LASSERT(cli->tc_rule);
118         spin_lock(&cli->tc_rule->tr_rule_lock);
119         list_del_init(&cli->tc_linkage);
120         spin_unlock(&cli->tc_rule->tr_rule_lock);
121         nrs_tbf_rule_put(cli->tc_rule);
122         cli->tc_rule = NULL;
123 }
124
125 static void
126 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
127                         struct nrs_tbf_client *cli)
128
129 {
130         struct nrs_tbf_rule *rule = cli->tc_rule;
131
132         cli->tc_rpc_rate = rule->tr_rpc_rate;
133         cli->tc_nsecs = rule->tr_nsecs_per_rpc;
134         cli->tc_depth = rule->tr_depth;
135         cli->tc_ntoken = rule->tr_depth;
136         cli->tc_check_time = ktime_to_ns(ktime_get());
137         cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
138         cli->tc_rule_generation = rule->tr_generation;
139
140         if (cli->tc_in_heap)
141                 binheap_relocate(head->th_binheap,
142                                  &cli->tc_node);
143 }
144
145 static void
146 nrs_tbf_cli_reset(struct nrs_tbf_head *head,
147                   struct nrs_tbf_rule *rule,
148                   struct nrs_tbf_client *cli)
149 {
150         spin_lock(&cli->tc_rule_lock);
151         if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
152                 LASSERT(rule != cli->tc_rule);
153                 nrs_tbf_cli_rule_put(cli);
154         }
155         LASSERT(cli->tc_rule == NULL);
156         LASSERT(list_empty(&cli->tc_linkage));
157         /* Rule's ref is added before called */
158         cli->tc_rule = rule;
159         spin_lock(&rule->tr_rule_lock);
160         list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
161         spin_unlock(&rule->tr_rule_lock);
162         spin_unlock(&cli->tc_rule_lock);
163         nrs_tbf_cli_reset_value(head, cli);
164 }
165
166 static int
167 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
168 {
169         return rule->tr_head->th_ops->o_rule_dump(rule, m);
170 }
171
172 static int
173 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
174 {
175         struct nrs_tbf_rule *rule;
176         int rc = 0;
177
178         LASSERT(head != NULL);
179         spin_lock(&head->th_rule_lock);
180         /* List the rules from newest to oldest */
181         list_for_each_entry(rule, &head->th_list, tr_linkage) {
182                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
183                 rc = nrs_tbf_rule_dump(rule, m);
184                 if (rc) {
185                         rc = -ENOSPC;
186                         break;
187                 }
188         }
189         spin_unlock(&head->th_rule_lock);
190
191         return rc;
192 }
193
194 static struct nrs_tbf_rule *
195 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
196                          const char *name)
197 {
198         struct nrs_tbf_rule *rule;
199
200         LASSERT(head != NULL);
201         list_for_each_entry(rule, &head->th_list, tr_linkage) {
202                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
203                 if (strcmp(rule->tr_name, name) == 0) {
204                         nrs_tbf_rule_get(rule);
205                         return rule;
206                 }
207         }
208         return NULL;
209 }
210
211 static struct nrs_tbf_rule *
212 nrs_tbf_rule_find(struct nrs_tbf_head *head,
213                   const char *name)
214 {
215         struct nrs_tbf_rule *rule;
216
217         LASSERT(head != NULL);
218         spin_lock(&head->th_rule_lock);
219         rule = nrs_tbf_rule_find_nolock(head, name);
220         spin_unlock(&head->th_rule_lock);
221         return rule;
222 }
223
224 static struct nrs_tbf_rule *
225 nrs_tbf_rule_match(struct nrs_tbf_head *head,
226                    struct nrs_tbf_client *cli)
227 {
228         struct nrs_tbf_rule *rule = NULL;
229         struct nrs_tbf_rule *tmp_rule;
230
231         spin_lock(&head->th_rule_lock);
232         /* Match the newest rule in the list */
233         list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
234                 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
235                 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
236                         rule = tmp_rule;
237                         break;
238                 }
239         }
240
241         if (rule == NULL)
242                 rule = head->th_rule;
243
244         nrs_tbf_rule_get(rule);
245         spin_unlock(&head->th_rule_lock);
246         return rule;
247 }
248
249 static void
250 nrs_tbf_cli_init(struct nrs_tbf_head *head,
251                  struct nrs_tbf_client *cli,
252                  struct ptlrpc_request *req)
253 {
254         struct nrs_tbf_rule *rule;
255
256         memset(cli, 0, sizeof(*cli));
257         cli->tc_in_heap = false;
258         head->th_ops->o_cli_init(cli, req);
259         INIT_LIST_HEAD(&cli->tc_list);
260         INIT_LIST_HEAD(&cli->tc_linkage);
261         spin_lock_init(&cli->tc_rule_lock);
262         atomic_set(&cli->tc_ref, 1);
263         rule = nrs_tbf_rule_match(head, cli);
264         nrs_tbf_cli_reset(head, rule, cli);
265 }
266
267 static void
268 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
269 {
270         LASSERT(list_empty(&cli->tc_list));
271         LASSERT(!cli->tc_in_heap);
272         LASSERT(atomic_read(&cli->tc_ref) == 0);
273         spin_lock(&cli->tc_rule_lock);
274         nrs_tbf_cli_rule_put(cli);
275         spin_unlock(&cli->tc_rule_lock);
276         OBD_FREE_PTR(cli);
277 }
278
279 static int
280 nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
281                    struct nrs_tbf_head *head,
282                    struct nrs_tbf_cmd *start)
283 {
284         struct nrs_tbf_rule     *rule;
285         struct nrs_tbf_rule     *tmp_rule;
286         struct nrs_tbf_rule     *next_rule;
287         char                    *next_name = start->u.tc_start.ts_next_name;
288         int                      rc;
289
290         rule = nrs_tbf_rule_find(head, start->tc_name);
291         if (rule) {
292                 nrs_tbf_rule_put(rule);
293                 return -EEXIST;
294         }
295
296         OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
297         if (rule == NULL)
298                 return -ENOMEM;
299
300         memcpy(rule->tr_name, start->tc_name, strlen(start->tc_name));
301         rule->tr_rpc_rate = start->u.tc_start.ts_rpc_rate;
302         rule->tr_flags = start->u.tc_start.ts_rule_flags;
303         rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
304         rule->tr_depth = tbf_depth;
305         atomic_set(&rule->tr_ref, 1);
306         INIT_LIST_HEAD(&rule->tr_cli_list);
307         INIT_LIST_HEAD(&rule->tr_nids);
308         INIT_LIST_HEAD(&rule->tr_linkage);
309         spin_lock_init(&rule->tr_rule_lock);
310         rule->tr_head = head;
311
312         rc = head->th_ops->o_rule_init(policy, rule, start);
313         if (rc) {
314                 OBD_FREE_PTR(rule);
315                 return rc;
316         }
317
318         /* Add as the newest rule */
319         spin_lock(&head->th_rule_lock);
320         tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
321         if (tmp_rule) {
322                 spin_unlock(&head->th_rule_lock);
323                 nrs_tbf_rule_put(tmp_rule);
324                 nrs_tbf_rule_put(rule);
325                 return -EEXIST;
326         }
327
328         if (next_name) {
329                 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
330                 if (!next_rule) {
331                         spin_unlock(&head->th_rule_lock);
332                         nrs_tbf_rule_put(rule);
333                         return -ENOENT;
334                 }
335
336                 list_add(&rule->tr_linkage, next_rule->tr_linkage.prev);
337                 nrs_tbf_rule_put(next_rule);
338         } else {
339                 /* Add on the top of the rule list */
340                 list_add(&rule->tr_linkage, &head->th_list);
341         }
342         spin_unlock(&head->th_rule_lock);
343         atomic_inc(&head->th_rule_sequence);
344         if (start->u.tc_start.ts_rule_flags & NTRS_DEFAULT) {
345                 rule->tr_flags |= NTRS_DEFAULT;
346                 LASSERT(head->th_rule == NULL);
347                 head->th_rule = rule;
348         }
349
350         CDEBUG(D_RPCTRACE, "TBF starts rule@%p rate %u gen %llu\n",
351                rule, rule->tr_rpc_rate, rule->tr_generation);
352
353         return 0;
354 }
355
356 /**
357  * Change the rank of a rule in the rule list
358  *
359  * The matched rule will be moved to the position right before another
360  * given rule.
361  *
362  * \param[in] policy    the policy instance
363  * \param[in] head      the TBF policy instance
364  * \param[in] name      the rule name to be moved
365  * \param[in] next_name the rule name before which the matched rule will be
366  *                      moved
367  *
368  */
369 static int
370 nrs_tbf_rule_change_rank(struct ptlrpc_nrs_policy *policy,
371                          struct nrs_tbf_head *head,
372                          char *name,
373                          char *next_name)
374 {
375         struct nrs_tbf_rule     *rule = NULL;
376         struct nrs_tbf_rule     *next_rule = NULL;
377         int                      rc = 0;
378
379         LASSERT(head != NULL);
380
381         spin_lock(&head->th_rule_lock);
382         rule = nrs_tbf_rule_find_nolock(head, name);
383         if (!rule)
384                 GOTO(out, rc = -ENOENT);
385
386         if (strcmp(name, next_name) == 0)
387                 GOTO(out_put, rc);
388
389         next_rule = nrs_tbf_rule_find_nolock(head, next_name);
390         if (!next_rule)
391                 GOTO(out_put, rc = -ENOENT);
392
393         /* rules may be adjacent in same list, so list_move() isn't safe here */
394         list_move_tail(&rule->tr_linkage, &next_rule->tr_linkage);
395         nrs_tbf_rule_put(next_rule);
396 out_put:
397         nrs_tbf_rule_put(rule);
398 out:
399         spin_unlock(&head->th_rule_lock);
400         return rc;
401 }
402
403 static int
404 nrs_tbf_rule_change_rate(struct ptlrpc_nrs_policy *policy,
405                          struct nrs_tbf_head *head,
406                          char *name,
407                          __u64 rate)
408 {
409         struct nrs_tbf_rule *rule;
410
411         assert_spin_locked(&policy->pol_nrs->nrs_lock);
412
413         rule = nrs_tbf_rule_find(head, name);
414         if (rule == NULL)
415                 return -ENOENT;
416
417         rule->tr_rpc_rate = rate;
418         rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
419         rule->tr_generation++;
420         nrs_tbf_rule_put(rule);
421
422         return 0;
423 }
424
425 static int
426 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
427                     struct nrs_tbf_head *head,
428                     struct nrs_tbf_cmd *change)
429 {
430         __u64    rate = change->u.tc_change.tc_rpc_rate;
431         char    *next_name = change->u.tc_change.tc_next_name;
432         int      rc;
433
434         if (rate != 0) {
435                 rc = nrs_tbf_rule_change_rate(policy, head, change->tc_name,
436                                               rate);
437                 if (rc)
438                         return rc;
439         }
440
441         if (next_name) {
442                 rc = nrs_tbf_rule_change_rank(policy, head, change->tc_name,
443                                               next_name);
444                 if (rc)
445                         return rc;
446         }
447
448         return 0;
449 }
450
451 static int
452 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
453                   struct nrs_tbf_head *head,
454                   struct nrs_tbf_cmd *stop)
455 {
456         struct nrs_tbf_rule *rule;
457
458         assert_spin_locked(&policy->pol_nrs->nrs_lock);
459
460         if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
461                 return -EPERM;
462
463         rule = nrs_tbf_rule_find(head, stop->tc_name);
464         if (rule == NULL)
465                 return -ENOENT;
466
467         list_del_init(&rule->tr_linkage);
468         rule->tr_flags |= NTRS_STOPPING;
469         nrs_tbf_rule_put(rule);
470         nrs_tbf_rule_put(rule);
471
472         return 0;
473 }
474
475 static int
476 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
477                 struct nrs_tbf_head *head,
478                 struct nrs_tbf_cmd *cmd)
479 {
480         int rc;
481
482         assert_spin_locked(&policy->pol_nrs->nrs_lock);
483
484         switch (cmd->tc_cmd) {
485         case NRS_CTL_TBF_START_RULE:
486                 if (cmd->u.tc_start.ts_valid_type != head->th_type_flag)
487                         return -EINVAL;
488
489                 spin_unlock(&policy->pol_nrs->nrs_lock);
490                 rc = nrs_tbf_rule_start(policy, head, cmd);
491                 spin_lock(&policy->pol_nrs->nrs_lock);
492                 return rc;
493         case NRS_CTL_TBF_CHANGE_RULE:
494                 rc = nrs_tbf_rule_change(policy, head, cmd);
495                 return rc;
496         case NRS_CTL_TBF_STOP_RULE:
497                 rc = nrs_tbf_rule_stop(policy, head, cmd);
498                 /* Take it as a success, if not exists at all */
499                 return rc == -ENOENT ? 0 : rc;
500         default:
501                 return -EFAULT;
502         }
503 }
504
505 /**
506  * Binary heap predicate.
507  *
508  * \param[in] e1 the first binheap node to compare
509  * \param[in] e2 the second binheap node to compare
510  *
511  * \retval 0 e1 > e2
512  * \retval 1 e1 < e2
513  */
514 static int
515 tbf_cli_compare(struct binheap_node *e1, struct binheap_node *e2)
516 {
517         struct nrs_tbf_client *cli1;
518         struct nrs_tbf_client *cli2;
519
520         cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
521         cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
522
523         if (cli1->tc_deadline < cli2->tc_deadline)
524                 return 1;
525         else if (cli1->tc_deadline > cli2->tc_deadline)
526                 return 0;
527
528         if (cli1->tc_check_time < cli2->tc_check_time)
529                 return 1;
530         else if (cli1->tc_check_time > cli2->tc_check_time)
531                 return 0;
532
533         /* Maybe need more comparasion, e.g. request number in the rules */
534         return 1;
535 }
536
537 /**
538  * TBF binary heap operations
539  */
540 static struct binheap_ops nrs_tbf_heap_ops = {
541         .hop_enter      = NULL,
542         .hop_exit       = NULL,
543         .hop_compare    = tbf_cli_compare,
544 };
545
/**
 * Hash callback of the jobid client hash.
 *
 * \param[in] hs   the hash table (unused)
 * \param[in] key  NUL-terminated jobid string
 * \param[in] mask hash mask handed to cfs_hash_djb2_hash()
 *
 * \retval the value computed by cfs_hash_djb2_hash() over the jobid
 */
static unsigned nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key,
				       unsigned mask)
{
	const char *jobid = key;

	return cfs_hash_djb2_hash(key, strlen(jobid), mask);
}
551
552 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
553 {
554         struct nrs_tbf_client *cli = hlist_entry(hnode,
555                                                      struct nrs_tbf_client,
556                                                      tc_hnode);
557
558         return (strcmp(cli->tc_jobid, key) == 0);
559 }
560
561 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
562 {
563         struct nrs_tbf_client *cli = hlist_entry(hnode,
564                                                      struct nrs_tbf_client,
565                                                      tc_hnode);
566
567         return cli->tc_jobid;
568 }
569
570 static void *nrs_tbf_hop_object(struct hlist_node *hnode)
571 {
572         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
573 }
574
575 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
576 {
577         struct nrs_tbf_client *cli = hlist_entry(hnode,
578                                                      struct nrs_tbf_client,
579                                                      tc_hnode);
580
581         atomic_inc(&cli->tc_ref);
582 }
583
584 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
585 {
586         struct nrs_tbf_client *cli = hlist_entry(hnode,
587                                                      struct nrs_tbf_client,
588                                                      tc_hnode);
589
590         atomic_dec(&cli->tc_ref);
591 }
592
593 static void
594 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
595
596 {
597         struct nrs_tbf_client *cli = hlist_entry(hnode,
598                                                  struct nrs_tbf_client,
599                                                  tc_hnode);
600
601         LASSERT(atomic_read(&cli->tc_ref) == 0);
602         nrs_tbf_cli_fini(cli);
603 }
604
605 static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
606         .hs_hash        = nrs_tbf_jobid_hop_hash,
607         .hs_keycmp      = nrs_tbf_jobid_hop_keycmp,
608         .hs_key         = nrs_tbf_jobid_hop_key,
609         .hs_object      = nrs_tbf_hop_object,
610         .hs_get         = nrs_tbf_jobid_hop_get,
611         .hs_put         = nrs_tbf_jobid_hop_put,
612         .hs_put_locked  = nrs_tbf_jobid_hop_put,
613         .hs_exit        = nrs_tbf_jobid_hop_exit,
614 };
615
616 #define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
617                                   CFS_HASH_NO_ITEMREF | \
618                                   CFS_HASH_DEPTH)
619
620 static struct nrs_tbf_client *
621 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
622                           struct cfs_hash_bd *bd,
623                           const char *jobid)
624 {
625         struct hlist_node *hnode;
626         struct nrs_tbf_client *cli;
627
628         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)jobid);
629         if (hnode == NULL)
630                 return NULL;
631
632         cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
633         if (!list_empty(&cli->tc_lru))
634                 list_del_init(&cli->tc_lru);
635         return cli;
636 }
637
638 #define NRS_TBF_JOBID_NULL ""
639
640 static struct nrs_tbf_client *
641 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
642                        struct ptlrpc_request *req)
643 {
644         const char              *jobid;
645         struct nrs_tbf_client   *cli;
646         struct cfs_hash         *hs = head->th_cli_hash;
647         struct cfs_hash_bd               bd;
648
649         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
650         if (jobid == NULL)
651                 jobid = NRS_TBF_JOBID_NULL;
652         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
653         cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
654         cfs_hash_bd_unlock(hs, &bd, 1);
655
656         return cli;
657 }
658
659 static struct nrs_tbf_client *
660 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
661                           struct nrs_tbf_client *cli)
662 {
663         const char              *jobid;
664         struct nrs_tbf_client   *ret;
665         struct cfs_hash         *hs = head->th_cli_hash;
666         struct cfs_hash_bd               bd;
667
668         jobid = cli->tc_jobid;
669         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
670         ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
671         if (ret == NULL) {
672                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
673                 ret = cli;
674         }
675         cfs_hash_bd_unlock(hs, &bd, 1);
676
677         return ret;
678 }
679
680 static void
681 nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
682                       struct nrs_tbf_client *cli)
683 {
684         struct cfs_hash_bd               bd;
685         struct cfs_hash         *hs = head->th_cli_hash;
686         struct nrs_tbf_bucket   *bkt;
687         int                      hw;
688         LIST_HEAD(zombies);
689
690         cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
691         bkt = cfs_hash_bd_extra_get(hs, &bd);
692         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
693                 return;
694         LASSERT(list_empty(&cli->tc_lru));
695         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
696
697         /*
698          * Check and purge the LRU, there is at least one client in the LRU.
699          */
700         hw = tbf_jobid_cache_size >>
701              (hs->hs_cur_bits - hs->hs_bkt_bits);
702         while (cfs_hash_bd_count_get(&bd) > hw) {
703                 if (unlikely(list_empty(&bkt->ntb_lru)))
704                         break;
705                 cli = list_entry(bkt->ntb_lru.next,
706                                      struct nrs_tbf_client,
707                                      tc_lru);
708                 LASSERT(atomic_read(&cli->tc_ref) == 0);
709                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
710                 list_move(&cli->tc_lru, &zombies);
711         }
712         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
713
714         while (!list_empty(&zombies)) {
715                 cli = container_of(zombies.next,
716                                    struct nrs_tbf_client, tc_lru);
717                 list_del_init(&cli->tc_lru);
718                 nrs_tbf_cli_fini(cli);
719         }
720 }
721
722 static void
723 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
724                        struct ptlrpc_request *req)
725 {
726         char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
727
728         if (jobid == NULL)
729                 jobid = NRS_TBF_JOBID_NULL;
730         LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
731         INIT_LIST_HEAD(&cli->tc_lru);
732         memcpy(cli->tc_jobid, jobid, strlen(jobid));
733 }
734
735 static int nrs_tbf_jobid_hash_order(void)
736 {
737         int bits;
738
739         for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
740                 ;
741
742         return bits;
743 }
744
745 #define NRS_TBF_JOBID_BKT_BITS 10
746
747 static int
748 nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
749                       struct nrs_tbf_head *head)
750 {
751         struct nrs_tbf_cmd       start;
752         struct nrs_tbf_bucket   *bkt;
753         int                      bits;
754         int                      i;
755         int                      rc;
756         struct cfs_hash_bd       bd;
757
758         bits = nrs_tbf_jobid_hash_order();
759         if (bits < NRS_TBF_JOBID_BKT_BITS)
760                 bits = NRS_TBF_JOBID_BKT_BITS;
761         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
762                                             bits,
763                                             bits,
764                                             NRS_TBF_JOBID_BKT_BITS,
765                                             sizeof(*bkt),
766                                             0,
767                                             0,
768                                             &nrs_tbf_jobid_hash_ops,
769                                             NRS_TBF_JOBID_HASH_FLAGS);
770         if (head->th_cli_hash == NULL)
771                 return -ENOMEM;
772
773         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
774                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
775                 INIT_LIST_HEAD(&bkt->ntb_lru);
776         }
777
778         memset(&start, 0, sizeof(start));
779         start.u.tc_start.ts_jobids_str = "*";
780
781         start.u.tc_start.ts_rpc_rate = tbf_rate;
782         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
783         start.tc_name = NRS_TBF_DEFAULT_RULE;
784         INIT_LIST_HEAD(&start.u.tc_start.ts_jobids);
785         rc = nrs_tbf_rule_start(policy, head, &start);
786         if (rc) {
787                 cfs_hash_putref(head->th_cli_hash);
788                 head->th_cli_hash = NULL;
789         }
790
791         return rc;
792 }
793
794 /**
795  * Frees jobid of \a list.
796  *
797  */
798 static void
799 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
800 {
801         struct nrs_tbf_jobid *jobid, *n;
802
803         list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
804                 OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1);
805                 list_del(&jobid->tj_linkage);
806                 OBD_FREE_PTR(jobid);
807         }
808 }
809
810 static int
811 nrs_tbf_jobid_list_add(struct cfs_lstr *id, struct list_head *jobid_list)
812 {
813         struct nrs_tbf_jobid *jobid;
814         char *ptr;
815
816         OBD_ALLOC_PTR(jobid);
817         if (jobid == NULL)
818                 return -ENOMEM;
819
820         OBD_ALLOC(jobid->tj_id, id->ls_len + 1);
821         if (jobid->tj_id == NULL) {
822                 OBD_FREE_PTR(jobid);
823                 return -ENOMEM;
824         }
825
826         memcpy(jobid->tj_id, id->ls_str, id->ls_len);
827         ptr = lprocfs_strnstr(id->ls_str, "*", id->ls_len);
828         if (ptr == NULL)
829                 jobid->tj_match_flag = NRS_TBF_MATCH_FULL;
830         else
831                 jobid->tj_match_flag = NRS_TBF_MATCH_WILDCARD;
832
833         list_add_tail(&jobid->tj_linkage, jobid_list);
834         return 0;
835 }
836
/**
 * Matches \a content against \a pattern, where '*' in the pattern stands for
 * any (possibly empty) sequence of characters and every other character
 * matches itself.
 *
 * The match is iterative: on a mismatch it resumes after the most recent
 * '*' with that '*' absorbing one more character of \a content. This needs
 * no recursion and is bounded by strlen(pattern) * strlen(content) steps,
 * whereas a recursive formulation can take exponential time on patterns
 * with several '*'. A '*' in the pattern is always a wildcard, so it also
 * matches a literal '*' in \a content, and consecutive '*' behave like one.
 *
 * \param[in] pattern NUL-terminated pattern
 * \param[in] content NUL-terminated string to test
 *
 * \retval true  \a content matches \a pattern
 * \retval false otherwise
 */
static bool
cfs_match_wildcard(const char *pattern, const char *content)
{
	/* Pattern position right after the most recent '*', if any */
	const char *after_star = NULL;
	/* Content position that '*' is currently assumed to end before */
	const char *resume = NULL;

	while (*content != '\0') {
		if (*pattern == '*') {
			/* Start by letting the '*' match nothing */
			after_star = ++pattern;
			resume = content;
		} else if (*pattern == *content) {
			pattern++;
			content++;
		} else if (after_star != NULL) {
			/* Backtrack: the last '*' swallows one more char */
			pattern = after_star;
			content = ++resume;
		} else {
			return false;
		}
	}

	/* Only trailing '*' may remain once the content is exhausted */
	while (*pattern == '*')
		pattern++;

	return *pattern == '\0';
}
863
864 static inline bool
865 nrs_tbf_jobid_match(const struct nrs_tbf_jobid *jobid, const char *id)
866 {
867         if (jobid->tj_match_flag == NRS_TBF_MATCH_FULL)
868                 return strcmp(jobid->tj_id, id) == 0;
869
870         if (jobid->tj_match_flag == NRS_TBF_MATCH_WILDCARD)
871                 return cfs_match_wildcard(jobid->tj_id, id);
872
873         return false;
874 }
875
876 static int
877 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
878 {
879         struct nrs_tbf_jobid *jobid;
880
881         list_for_each_entry(jobid, jobid_list, tj_linkage) {
882                 if (nrs_tbf_jobid_match(jobid, id))
883                         return 1;
884         }
885         return 0;
886 }
887
888 static int
889 nrs_tbf_jobid_list_parse(char *str, int len, struct list_head *jobid_list)
890 {
891         struct cfs_lstr src;
892         struct cfs_lstr res;
893         int rc = 0;
894         ENTRY;
895
896         src.ls_str = str;
897         src.ls_len = len;
898         INIT_LIST_HEAD(jobid_list);
899         while (src.ls_str) {
900                 rc = cfs_gettok(&src, ' ', &res);
901                 if (rc == 0) {
902                         rc = -EINVAL;
903                         break;
904                 }
905                 rc = nrs_tbf_jobid_list_add(&res, jobid_list);
906                 if (rc)
907                         break;
908         }
909         if (rc)
910                 nrs_tbf_jobid_list_free(jobid_list);
911         RETURN(rc);
912 }
913
914 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
915 {
916         if (!list_empty(&cmd->u.tc_start.ts_jobids))
917                 nrs_tbf_jobid_list_free(&cmd->u.tc_start.ts_jobids);
918         if (cmd->u.tc_start.ts_jobids_str)
919                 OBD_FREE(cmd->u.tc_start.ts_jobids_str,
920                          strlen(cmd->u.tc_start.ts_jobids_str) + 1);
921 }
922
923 static int nrs_tbf_check_id_value(struct cfs_lstr *src, char *key)
924 {
925         struct cfs_lstr res;
926         int keylen = strlen(key);
927         int rc;
928
929         rc = cfs_gettok(src, '=', &res);
930         if (rc == 0 || res.ls_len != keylen ||
931             strncmp(res.ls_str, key, keylen) != 0 ||
932             src->ls_len <= 2 || src->ls_str[0] != '{' ||
933             src->ls_str[src->ls_len - 1] != '}')
934                 return -EINVAL;
935
936         /* Skip '{' and '}' */
937         src->ls_str++;
938         src->ls_len -= 2;
939         return 0;
940 }
941
942 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, char *id)
943 {
944         struct cfs_lstr src;
945         int rc;
946
947         src.ls_str = id;
948         src.ls_len = strlen(id);
949         rc = nrs_tbf_check_id_value(&src, "jobid");
950         if (rc)
951                 return rc;
952
953         OBD_ALLOC(cmd->u.tc_start.ts_jobids_str, src.ls_len + 1);
954         if (cmd->u.tc_start.ts_jobids_str == NULL)
955                 return -ENOMEM;
956
957         memcpy(cmd->u.tc_start.ts_jobids_str, src.ls_str, src.ls_len);
958
959         /* parse jobid list */
960         rc = nrs_tbf_jobid_list_parse(cmd->u.tc_start.ts_jobids_str,
961                                       strlen(cmd->u.tc_start.ts_jobids_str),
962                                       &cmd->u.tc_start.ts_jobids);
963         if (rc)
964                 nrs_tbf_jobid_cmd_fini(cmd);
965
966         return rc;
967 }
968
969 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
970                                    struct nrs_tbf_rule *rule,
971                                    struct nrs_tbf_cmd *start)
972 {
973         int rc = 0;
974
975         LASSERT(start->u.tc_start.ts_jobids_str);
976         OBD_ALLOC(rule->tr_jobids_str,
977                   strlen(start->u.tc_start.ts_jobids_str) + 1);
978         if (rule->tr_jobids_str == NULL)
979                 return -ENOMEM;
980
981         memcpy(rule->tr_jobids_str,
982                start->u.tc_start.ts_jobids_str,
983                strlen(start->u.tc_start.ts_jobids_str));
984
985         INIT_LIST_HEAD(&rule->tr_jobids);
986         if (!list_empty(&start->u.tc_start.ts_jobids)) {
987                 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
988                                               strlen(rule->tr_jobids_str),
989                                               &rule->tr_jobids);
990                 if (rc)
991                         CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
992         }
993         if (rc)
994                 OBD_FREE(rule->tr_jobids_str,
995                          strlen(start->u.tc_start.ts_jobids_str) + 1);
996         return rc;
997 }
998
999 static int
1000 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1001 {
1002         seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
1003                    rule->tr_jobids_str, rule->tr_rpc_rate,
1004                    atomic_read(&rule->tr_ref) - 1);
1005         return 0;
1006 }
1007
1008 static int
1009 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
1010                          struct nrs_tbf_client *cli)
1011 {
1012         return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
1013 }
1014
1015 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
1016 {
1017         if (!list_empty(&rule->tr_jobids))
1018                 nrs_tbf_jobid_list_free(&rule->tr_jobids);
1019         LASSERT(rule->tr_jobids_str != NULL);
1020         OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1);
1021 }
1022
1023 static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
1024         .o_name = NRS_TBF_TYPE_JOBID,
1025         .o_startup = nrs_tbf_jobid_startup,
1026         .o_cli_find = nrs_tbf_jobid_cli_find,
1027         .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
1028         .o_cli_put = nrs_tbf_jobid_cli_put,
1029         .o_cli_init = nrs_tbf_jobid_cli_init,
1030         .o_rule_init = nrs_tbf_jobid_rule_init,
1031         .o_rule_dump = nrs_tbf_jobid_rule_dump,
1032         .o_rule_match = nrs_tbf_jobid_rule_match,
1033         .o_rule_fini = nrs_tbf_jobid_rule_fini,
1034 };
1035
/**
 * libcfs_hash operations for nrs_tbf_head::th_cli_hash when the NID
 * type is in use.
 *
 * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
 * nrs_tbf_client objects.
 */
1042 #define NRS_TBF_NID_BKT_BITS    8
1043 #define NRS_TBF_NID_BITS        16
1044
1045 static unsigned nrs_tbf_nid_hop_hash(struct cfs_hash *hs, const void *key,
1046                                   unsigned mask)
1047 {
1048         return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
1049 }
1050
1051 static int nrs_tbf_nid_hop_keycmp(const void *key, struct hlist_node *hnode)
1052 {
1053         lnet_nid_t            *nid = (lnet_nid_t *)key;
1054         struct nrs_tbf_client *cli = hlist_entry(hnode,
1055                                                      struct nrs_tbf_client,
1056                                                      tc_hnode);
1057
1058         return *nid == cli->tc_nid;
1059 }
1060
1061 static void *nrs_tbf_nid_hop_key(struct hlist_node *hnode)
1062 {
1063         struct nrs_tbf_client *cli = hlist_entry(hnode,
1064                                                      struct nrs_tbf_client,
1065                                                      tc_hnode);
1066
1067         return &cli->tc_nid;
1068 }
1069
1070 static void nrs_tbf_nid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1071 {
1072         struct nrs_tbf_client *cli = hlist_entry(hnode,
1073                                                      struct nrs_tbf_client,
1074                                                      tc_hnode);
1075
1076         atomic_inc(&cli->tc_ref);
1077 }
1078
1079 static void nrs_tbf_nid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1080 {
1081         struct nrs_tbf_client *cli = hlist_entry(hnode,
1082                                                      struct nrs_tbf_client,
1083                                                      tc_hnode);
1084
1085         atomic_dec(&cli->tc_ref);
1086 }
1087
1088 static void nrs_tbf_nid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1089 {
1090         struct nrs_tbf_client *cli = hlist_entry(hnode,
1091                                                      struct nrs_tbf_client,
1092                                                      tc_hnode);
1093
1094         LASSERTF(atomic_read(&cli->tc_ref) == 0,
1095                  "Busy TBF object from client with NID %s, with %d refs\n",
1096                  libcfs_nid2str(cli->tc_nid), atomic_read(&cli->tc_ref));
1097
1098         nrs_tbf_cli_fini(cli);
1099 }
1100
1101 static struct cfs_hash_ops nrs_tbf_nid_hash_ops = {
1102         .hs_hash        = nrs_tbf_nid_hop_hash,
1103         .hs_keycmp      = nrs_tbf_nid_hop_keycmp,
1104         .hs_key         = nrs_tbf_nid_hop_key,
1105         .hs_object      = nrs_tbf_hop_object,
1106         .hs_get         = nrs_tbf_nid_hop_get,
1107         .hs_put         = nrs_tbf_nid_hop_put,
1108         .hs_put_locked  = nrs_tbf_nid_hop_put,
1109         .hs_exit        = nrs_tbf_nid_hop_exit,
1110 };
1111
1112 static struct nrs_tbf_client *
1113 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
1114                      struct ptlrpc_request *req)
1115 {
1116         return cfs_hash_lookup(head->th_cli_hash, &req->rq_peer.nid);
1117 }
1118
1119 static struct nrs_tbf_client *
1120 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
1121                         struct nrs_tbf_client *cli)
1122 {
1123         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid,
1124                                        &cli->tc_hnode);
1125 }
1126
1127 static void
1128 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
1129                       struct nrs_tbf_client *cli)
1130 {
1131         cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
1132 }
1133
1134 static int
1135 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
1136                     struct nrs_tbf_head *head)
1137 {
1138         struct nrs_tbf_cmd      start;
1139         int rc;
1140
1141         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1142                                             NRS_TBF_NID_BITS,
1143                                             NRS_TBF_NID_BITS,
1144                                             NRS_TBF_NID_BKT_BITS, 0,
1145                                             CFS_HASH_MIN_THETA,
1146                                             CFS_HASH_MAX_THETA,
1147                                             &nrs_tbf_nid_hash_ops,
1148                                             CFS_HASH_RW_BKTLOCK);
1149         if (head->th_cli_hash == NULL)
1150                 return -ENOMEM;
1151
1152         memset(&start, 0, sizeof(start));
1153         start.u.tc_start.ts_nids_str = "*";
1154
1155         start.u.tc_start.ts_rpc_rate = tbf_rate;
1156         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1157         start.tc_name = NRS_TBF_DEFAULT_RULE;
1158         INIT_LIST_HEAD(&start.u.tc_start.ts_nids);
1159         rc = nrs_tbf_rule_start(policy, head, &start);
1160         if (rc) {
1161                 cfs_hash_putref(head->th_cli_hash);
1162                 head->th_cli_hash = NULL;
1163         }
1164
1165         return rc;
1166 }
1167
1168 static void
1169 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1170                              struct ptlrpc_request *req)
1171 {
1172         cli->tc_nid = req->rq_peer.nid;
1173 }
1174
1175 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1176                                  struct nrs_tbf_rule *rule,
1177                                  struct nrs_tbf_cmd *start)
1178 {
1179         LASSERT(start->u.tc_start.ts_nids_str);
1180         OBD_ALLOC(rule->tr_nids_str,
1181                   strlen(start->u.tc_start.ts_nids_str) + 1);
1182         if (rule->tr_nids_str == NULL)
1183                 return -ENOMEM;
1184
1185         memcpy(rule->tr_nids_str,
1186                start->u.tc_start.ts_nids_str,
1187                strlen(start->u.tc_start.ts_nids_str));
1188
1189         INIT_LIST_HEAD(&rule->tr_nids);
1190         if (!list_empty(&start->u.tc_start.ts_nids)) {
1191                 if (cfs_parse_nidlist(rule->tr_nids_str,
1192                                       strlen(rule->tr_nids_str),
1193                                       &rule->tr_nids) <= 0) {
1194                         CERROR("nids {%s} illegal\n",
1195                                rule->tr_nids_str);
1196                         OBD_FREE(rule->tr_nids_str,
1197                                  strlen(start->u.tc_start.ts_nids_str) + 1);
1198                         return -EINVAL;
1199                 }
1200         }
1201         return 0;
1202 }
1203
1204 static int
1205 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1206 {
1207         seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
1208                    rule->tr_nids_str, rule->tr_rpc_rate,
1209                    atomic_read(&rule->tr_ref) - 1);
1210         return 0;
1211 }
1212
1213 static int
1214 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1215                        struct nrs_tbf_client *cli)
1216 {
1217         return cfs_match_nid(cli->tc_nid, &rule->tr_nids);
1218 }
1219
1220 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1221 {
1222         if (!list_empty(&rule->tr_nids))
1223                 cfs_free_nidlist(&rule->tr_nids);
1224         LASSERT(rule->tr_nids_str != NULL);
1225         OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1);
1226 }
1227
1228 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1229 {
1230         if (!list_empty(&cmd->u.tc_start.ts_nids))
1231                 cfs_free_nidlist(&cmd->u.tc_start.ts_nids);
1232         if (cmd->u.tc_start.ts_nids_str)
1233                 OBD_FREE(cmd->u.tc_start.ts_nids_str,
1234                          strlen(cmd->u.tc_start.ts_nids_str) + 1);
1235 }
1236
1237 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, char *id)
1238 {
1239         struct cfs_lstr src;
1240         int rc;
1241
1242         src.ls_str = id;
1243         src.ls_len = strlen(id);
1244         rc = nrs_tbf_check_id_value(&src, "nid");
1245         if (rc)
1246                 return rc;
1247
1248         OBD_ALLOC(cmd->u.tc_start.ts_nids_str, src.ls_len + 1);
1249         if (cmd->u.tc_start.ts_nids_str == NULL)
1250                 return -ENOMEM;
1251
1252         memcpy(cmd->u.tc_start.ts_nids_str, src.ls_str, src.ls_len);
1253
1254         /* parse NID list */
1255         if (cfs_parse_nidlist(cmd->u.tc_start.ts_nids_str,
1256                               strlen(cmd->u.tc_start.ts_nids_str),
1257                               &cmd->u.tc_start.ts_nids) <= 0) {
1258                 nrs_tbf_nid_cmd_fini(cmd);
1259                 return -EINVAL;
1260         }
1261
1262         return 0;
1263 }
1264
1265 static struct nrs_tbf_ops nrs_tbf_nid_ops = {
1266         .o_name = NRS_TBF_TYPE_NID,
1267         .o_startup = nrs_tbf_nid_startup,
1268         .o_cli_find = nrs_tbf_nid_cli_find,
1269         .o_cli_findadd = nrs_tbf_nid_cli_findadd,
1270         .o_cli_put = nrs_tbf_nid_cli_put,
1271         .o_cli_init = nrs_tbf_nid_cli_init,
1272         .o_rule_init = nrs_tbf_nid_rule_init,
1273         .o_rule_dump = nrs_tbf_nid_rule_dump,
1274         .o_rule_match = nrs_tbf_nid_rule_match,
1275         .o_rule_fini = nrs_tbf_nid_rule_fini,
1276 };
1277
/**
 * Hash callback for the string keyed client table.
 *
 * \param[in] hs    hash table (unused)
 * \param[in] key   NUL-terminated key string
 * \param[in] mask  mask passed through to the hash function
 *
 * \retval djb2 hash over the strlen(key) bytes of \a key
 */
static unsigned nrs_tbf_hop_hash(struct cfs_hash *hs, const void *key,
                                 unsigned mask)
{
        size_t keylen = strlen(key);

        return cfs_hash_djb2_hash(key, keylen, mask);
}
1283
1284 static int nrs_tbf_hop_keycmp(const void *key, struct hlist_node *hnode)
1285 {
1286         struct nrs_tbf_client *cli = hlist_entry(hnode,
1287                                                  struct nrs_tbf_client,
1288                                                  tc_hnode);
1289
1290         return (strcmp(cli->tc_key, key) == 0);
1291 }
1292
1293 static void *nrs_tbf_hop_key(struct hlist_node *hnode)
1294 {
1295         struct nrs_tbf_client *cli = hlist_entry(hnode,
1296                                                  struct nrs_tbf_client,
1297                                                  tc_hnode);
1298         return cli->tc_key;
1299 }
1300
1301 static void nrs_tbf_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1302 {
1303         struct nrs_tbf_client *cli = hlist_entry(hnode,
1304                                                  struct nrs_tbf_client,
1305                                                  tc_hnode);
1306
1307         atomic_inc(&cli->tc_ref);
1308 }
1309
1310 static void nrs_tbf_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1311 {
1312         struct nrs_tbf_client *cli = hlist_entry(hnode,
1313                                                  struct nrs_tbf_client,
1314                                                  tc_hnode);
1315
1316         atomic_dec(&cli->tc_ref);
1317 }
1318
1319 static void nrs_tbf_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1320
1321 {
1322         struct nrs_tbf_client *cli = hlist_entry(hnode,
1323                                                  struct nrs_tbf_client,
1324                                                  tc_hnode);
1325
1326         LASSERT(atomic_read(&cli->tc_ref) == 0);
1327         nrs_tbf_cli_fini(cli);
1328 }
1329
1330 static struct cfs_hash_ops nrs_tbf_hash_ops = {
1331         .hs_hash        = nrs_tbf_hop_hash,
1332         .hs_keycmp      = nrs_tbf_hop_keycmp,
1333         .hs_key         = nrs_tbf_hop_key,
1334         .hs_object      = nrs_tbf_hop_object,
1335         .hs_get         = nrs_tbf_hop_get,
1336         .hs_put         = nrs_tbf_hop_put,
1337         .hs_put_locked  = nrs_tbf_hop_put,
1338         .hs_exit        = nrs_tbf_hop_exit,
1339 };
1340
1341 #define NRS_TBF_GENERIC_BKT_BITS        10
1342 #define NRS_TBF_GENERIC_HASH_FLAGS      (CFS_HASH_SPIN_BKTLOCK | \
1343                                         CFS_HASH_NO_ITEMREF | \
1344                                         CFS_HASH_DEPTH)
1345
1346 static int
1347 nrs_tbf_startup(struct ptlrpc_nrs_policy *policy, struct nrs_tbf_head *head)
1348 {
1349         struct nrs_tbf_cmd       start;
1350         struct nrs_tbf_bucket   *bkt;
1351         int                      bits;
1352         int                      i;
1353         int                      rc;
1354         struct cfs_hash_bd       bd;
1355
1356         bits = nrs_tbf_jobid_hash_order();
1357         if (bits < NRS_TBF_GENERIC_BKT_BITS)
1358                 bits = NRS_TBF_GENERIC_BKT_BITS;
1359         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1360                                             bits, bits,
1361                                             NRS_TBF_GENERIC_BKT_BITS,
1362                                             sizeof(*bkt), 0, 0,
1363                                             &nrs_tbf_hash_ops,
1364                                             NRS_TBF_GENERIC_HASH_FLAGS);
1365         if (head->th_cli_hash == NULL)
1366                 return -ENOMEM;
1367
1368         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
1369                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
1370                 INIT_LIST_HEAD(&bkt->ntb_lru);
1371         }
1372
1373         memset(&start, 0, sizeof(start));
1374         start.u.tc_start.ts_conds_str = "*";
1375
1376         start.u.tc_start.ts_rpc_rate = tbf_rate;
1377         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1378         start.tc_name = NRS_TBF_DEFAULT_RULE;
1379         INIT_LIST_HEAD(&start.u.tc_start.ts_conds);
1380         rc = nrs_tbf_rule_start(policy, head, &start);
1381         if (rc)
1382                 cfs_hash_putref(head->th_cli_hash);
1383
1384         return rc;
1385 }
1386
1387 static struct nrs_tbf_client *
1388 nrs_tbf_cli_hash_lookup(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1389                         const char *key)
1390 {
1391         struct hlist_node *hnode;
1392         struct nrs_tbf_client *cli;
1393
1394         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)key);
1395         if (hnode == NULL)
1396                 return NULL;
1397
1398         cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
1399         if (!list_empty(&cli->tc_lru))
1400                 list_del_init(&cli->tc_lru);
1401         return cli;
1402 }
1403
1404 /**
1405  * ONLY opcode presented in this function will be checked in
1406  * nrs_tbf_id_cli_set(). That means, we can add or remove an
1407  * opcode to enable or disable requests handled in nrs_tbf
1408  */
1409 static struct req_format *req_fmt(__u32 opcode)
1410 {
1411         switch (opcode) {
1412         case OST_GETATTR:
1413                 return &RQF_OST_GETATTR;
1414         case OST_SETATTR:
1415                 return &RQF_OST_SETATTR;
1416         case OST_READ:
1417                 return &RQF_OST_BRW_READ;
1418         case OST_WRITE:
1419                 return &RQF_OST_BRW_WRITE;
1420         /* FIXME: OST_CREATE and OST_DESTROY comes from MDS
1421          * in most case. Should they be removed? */
1422         case OST_CREATE:
1423                 return &RQF_OST_CREATE;
1424         case OST_DESTROY:
1425                 return &RQF_OST_DESTROY;
1426         case OST_PUNCH:
1427                 return &RQF_OST_PUNCH;
1428         case OST_SYNC:
1429                 return &RQF_OST_SYNC;
1430         case OST_LADVISE:
1431                 return &RQF_OST_LADVISE;
1432         case MDS_GETATTR:
1433                 return &RQF_MDS_GETATTR;
1434         case MDS_GETATTR_NAME:
1435                 return &RQF_MDS_GETATTR_NAME;
1436         /* close is skipped to avoid LDLM cancel slowness */
1437 #if 0
1438         case MDS_CLOSE:
1439                 return &RQF_MDS_CLOSE;
1440 #endif
1441         case MDS_REINT:
1442                 return &RQF_MDS_REINT;
1443         case MDS_READPAGE:
1444                 return &RQF_MDS_READPAGE;
1445         case MDS_GET_ROOT:
1446                 return &RQF_MDS_GET_ROOT;
1447         case MDS_STATFS:
1448                 return &RQF_MDS_STATFS;
1449         case MDS_SYNC:
1450                 return &RQF_MDS_SYNC;
1451         case MDS_QUOTACTL:
1452                 return &RQF_MDS_QUOTACTL;
1453         case MDS_GETXATTR:
1454                 return &RQF_MDS_GETXATTR;
1455         case MDS_GET_INFO:
1456                 return &RQF_MDS_GET_INFO;
1457         /* HSM op is skipped */
1458 #if 0 
1459         case MDS_HSM_STATE_GET:
1460                 return &RQF_MDS_HSM_STATE_GET;
1461         case MDS_HSM_STATE_SET:
1462                 return &RQF_MDS_HSM_STATE_SET;
1463         case MDS_HSM_ACTION:
1464                 return &RQF_MDS_HSM_ACTION;
1465         case MDS_HSM_CT_REGISTER:
1466                 return &RQF_MDS_HSM_CT_REGISTER;
1467         case MDS_HSM_CT_UNREGISTER:
1468                 return &RQF_MDS_HSM_CT_UNREGISTER;
1469 #endif
1470         case MDS_SWAP_LAYOUTS:
1471                 return &RQF_MDS_SWAP_LAYOUTS;
1472         case LDLM_ENQUEUE:
1473                 return &RQF_LDLM_ENQUEUE;
1474         default:
1475                 return NULL;
1476         }
1477 }
1478
1479 static struct req_format *intent_req_fmt(__u32 it_opc)
1480 {
1481         if (it_opc & (IT_OPEN | IT_CREAT))
1482                 return &RQF_LDLM_INTENT_OPEN;
1483         else if (it_opc & (IT_GETATTR | IT_LOOKUP))
1484                 return &RQF_LDLM_INTENT_GETATTR;
1485         else if (it_opc & IT_GETXATTR)
1486                 return &RQF_LDLM_INTENT_GETXATTR;
1487         else if (it_opc & (IT_GLIMPSE | IT_BRW))
1488                 return &RQF_LDLM_INTENT;
1489         else
1490                 return NULL;
1491 }
1492
1493 static int ost_tbf_id_cli_set(struct ptlrpc_request *req,
1494                               struct tbf_id *id)
1495 {
1496         struct ost_body *body;
1497
1498         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1499         if (body != NULL) {
1500                 id->ti_uid = body->oa.o_uid;
1501                 id->ti_gid = body->oa.o_gid;
1502                 return 0;
1503         }
1504
1505         return -EINVAL;
1506 }
1507
1508 static void unpack_ugid_from_mdt_body(struct ptlrpc_request *req,
1509                                       struct tbf_id *id)
1510 {
1511         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
1512                                                     &RMF_MDT_BODY);
1513         LASSERT(b != NULL);
1514
1515         /* TODO: nodemaping feature converts {ug}id from individual
1516          * clients to the actual ones of the file system. Some work
1517          * may be needed to fix this. */
1518         id->ti_uid = b->mbo_uid;
1519         id->ti_gid = b->mbo_gid;
1520 }
1521
1522 static void unpack_ugid_from_mdt_rec_reint(struct ptlrpc_request *req,
1523                                            struct tbf_id *id)
1524 {
1525         struct mdt_rec_reint *rec;
1526
1527         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
1528         LASSERT(rec != NULL);
1529
1530         /* use the fs{ug}id as {ug}id of the process */
1531         id->ti_uid = rec->rr_fsuid;
1532         id->ti_gid = rec->rr_fsgid;
1533 }
1534
1535 static int mdt_tbf_id_cli_set(struct ptlrpc_request *req,
1536                               struct tbf_id *id)
1537 {
1538         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1539         int rc = 0;
1540
1541         switch (opc) {
1542         case MDS_GETATTR:
1543         case MDS_GETATTR_NAME:
1544         case MDS_GET_ROOT:
1545         case MDS_READPAGE:
1546         case MDS_SYNC:
1547         case MDS_GETXATTR:
1548         case MDS_HSM_STATE_GET ... MDS_SWAP_LAYOUTS:
1549                 unpack_ugid_from_mdt_body(req, id);
1550                 break;
1551         case MDS_CLOSE:
1552         case MDS_REINT:
1553                 unpack_ugid_from_mdt_rec_reint(req, id);
1554                 break;
1555         default:
1556                 rc = -EINVAL;
1557                 break;
1558         }
1559         return rc;
1560 }
1561
1562 static int ldlm_tbf_id_cli_set(struct ptlrpc_request *req,
1563                               struct tbf_id *id)
1564 {
1565         struct ldlm_intent *lit;
1566         struct req_format *fmt;
1567
1568         if (req->rq_reqmsg->lm_bufcount <= DLM_INTENT_IT_OFF)
1569                 return -EINVAL;
1570
1571         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_BASIC);
1572         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
1573         if (lit == NULL)
1574                 return -EINVAL;
1575
1576         fmt = intent_req_fmt(lit->opc);
1577         if (fmt == NULL)
1578                 return -EINVAL;
1579
1580         req_capsule_extend(&req->rq_pill, fmt);
1581
1582         if (lit->opc & (IT_GETXATTR | IT_GETATTR | IT_LOOKUP))
1583                 unpack_ugid_from_mdt_body(req, id);
1584         else if (lit->opc & (IT_OPEN | IT_OPEN | IT_GLIMPSE | IT_BRW))
1585                 unpack_ugid_from_mdt_rec_reint(req, id);
1586         else
1587                 return -EINVAL;
1588         return 0;
1589 }
1590
1591 static int nrs_tbf_id_cli_set(struct ptlrpc_request *req, struct tbf_id *id,
1592                               enum nrs_tbf_flag ti_type)
1593 {
1594         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1595         struct req_format *fmt = req_fmt(opc);
1596         bool fmt_unset = false;
1597         int rc;
1598
1599         memset(id, 0, sizeof(struct tbf_id));
1600         id->ti_type = ti_type;
1601
1602         if (fmt == NULL)
1603                 return -EINVAL;
1604         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1605         if (req->rq_pill.rc_fmt == NULL) {
1606                 req_capsule_set(&req->rq_pill, fmt);
1607                 fmt_unset = true;
1608         }
1609
1610         if (opc < OST_LAST_OPC)
1611                 rc = ost_tbf_id_cli_set(req, id);
1612         else if (opc >= MDS_FIRST_OPC && opc < MDS_LAST_OPC)
1613                 rc = mdt_tbf_id_cli_set(req, id);
1614         else if (opc == LDLM_ENQUEUE)
1615                 rc = ldlm_tbf_id_cli_set(req, id);
1616         else
1617                 rc = -EINVAL;
1618
1619         /* restore it to the initialized state */
1620         if (fmt_unset)
1621                 req->rq_pill.rc_fmt = NULL;
1622         return rc;
1623 }
1624
1625 static inline void nrs_tbf_cli_gen_key(struct nrs_tbf_client *cli,
1626                                        struct ptlrpc_request *req,
1627                                        char *keystr, size_t keystr_sz)
1628 {
1629         const char *jobid;
1630         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1631         struct tbf_id id;
1632
1633         nrs_tbf_id_cli_set(req, &id, NRS_TBF_FLAG_UID | NRS_TBF_FLAG_GID);
1634         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1635         if (jobid == NULL)
1636                 jobid = NRS_TBF_JOBID_NULL;
1637
1638         snprintf(keystr, keystr_sz, "%s_%s_%d_%u_%u", jobid,
1639                  libcfs_nid2str(req->rq_peer.nid), opc, id.ti_uid,
1640                  id.ti_gid);
1641
1642         if (cli) {
1643                 INIT_LIST_HEAD(&cli->tc_lru);
1644                 strlcpy(cli->tc_key, keystr, sizeof(cli->tc_key));
1645                 strlcpy(cli->tc_jobid, jobid, sizeof(cli->tc_jobid));
1646                 cli->tc_nid = req->rq_peer.nid;
1647                 cli->tc_opcode = opc;
1648                 cli->tc_id = id;
1649         }
1650 }
1651
1652 static struct nrs_tbf_client *
1653 nrs_tbf_cli_find(struct nrs_tbf_head *head, struct ptlrpc_request *req)
1654 {
1655         struct nrs_tbf_client *cli;
1656         struct cfs_hash *hs = head->th_cli_hash;
1657         struct cfs_hash_bd bd;
1658         char keystr[NRS_TBF_KEY_LEN];
1659
1660         nrs_tbf_cli_gen_key(NULL, req, keystr, sizeof(keystr));
1661         cfs_hash_bd_get_and_lock(hs, (void *)keystr, &bd, 1);
1662         cli = nrs_tbf_cli_hash_lookup(hs, &bd, keystr);
1663         cfs_hash_bd_unlock(hs, &bd, 1);
1664
1665         return cli;
1666 }
1667
1668 static struct nrs_tbf_client *
1669 nrs_tbf_cli_findadd(struct nrs_tbf_head *head,
1670                     struct nrs_tbf_client *cli)
1671 {
1672         const char              *key;
1673         struct nrs_tbf_client   *ret;
1674         struct cfs_hash         *hs = head->th_cli_hash;
1675         struct cfs_hash_bd       bd;
1676
1677         key = cli->tc_key;
1678         cfs_hash_bd_get_and_lock(hs, (void *)key, &bd, 1);
1679         ret = nrs_tbf_cli_hash_lookup(hs, &bd, key);
1680         if (ret == NULL) {
1681                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
1682                 ret = cli;
1683         }
1684         cfs_hash_bd_unlock(hs, &bd, 1);
1685
1686         return ret;
1687 }
1688
1689 static void
1690 nrs_tbf_cli_put(struct nrs_tbf_head *head, struct nrs_tbf_client *cli)
1691 {
1692         struct cfs_hash_bd       bd;
1693         struct cfs_hash         *hs = head->th_cli_hash;
1694         struct nrs_tbf_bucket   *bkt;
1695         int                      hw;
1696         LIST_HEAD(zombies);
1697
1698         cfs_hash_bd_get(hs, &cli->tc_key, &bd);
1699         bkt = cfs_hash_bd_extra_get(hs, &bd);
1700         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
1701                 return;
1702         LASSERT(list_empty(&cli->tc_lru));
1703         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
1704
1705         /**
1706          * Check and purge the LRU, there is at least one client in the LRU.
1707          */
1708         hw = tbf_jobid_cache_size >> (hs->hs_cur_bits - hs->hs_bkt_bits);
1709         while (cfs_hash_bd_count_get(&bd) > hw) {
1710                 if (unlikely(list_empty(&bkt->ntb_lru)))
1711                         break;
1712                 cli = list_entry(bkt->ntb_lru.next,
1713                                  struct nrs_tbf_client,
1714                                  tc_lru);
1715                 LASSERT(atomic_read(&cli->tc_ref) == 0);
1716                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
1717                 list_move(&cli->tc_lru, &zombies);
1718         }
1719         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
1720
1721         while (!list_empty(&zombies)) {
1722                 cli = container_of(zombies.next,
1723                                    struct nrs_tbf_client, tc_lru);
1724                 list_del_init(&cli->tc_lru);
1725                 nrs_tbf_cli_fini(cli);
1726         }
1727 }
1728
1729 static void
1730 nrs_tbf_generic_cli_init(struct nrs_tbf_client *cli,
1731                          struct ptlrpc_request *req)
1732 {
1733         char keystr[NRS_TBF_KEY_LEN];
1734
1735         nrs_tbf_cli_gen_key(cli, req, keystr, sizeof(keystr));
1736 }
1737
1738 static void
1739 nrs_tbf_id_list_free(struct list_head *uid_list)
1740 {
1741         struct nrs_tbf_id *nti_id, *n;
1742
1743         list_for_each_entry_safe(nti_id, n, uid_list, nti_linkage) {
1744                 list_del_init(&nti_id->nti_linkage);
1745                 OBD_FREE_PTR(nti_id);
1746         }
1747 }
1748
1749 static void
1750 nrs_tbf_expression_free(struct nrs_tbf_expression *expr)
1751 {
1752         LASSERT(expr->te_field >= NRS_TBF_FIELD_NID &&
1753                 expr->te_field < NRS_TBF_FIELD_MAX);
1754         switch (expr->te_field) {
1755         case NRS_TBF_FIELD_NID:
1756                 cfs_free_nidlist(&expr->te_cond);
1757                 break;
1758         case NRS_TBF_FIELD_JOBID:
1759                 nrs_tbf_jobid_list_free(&expr->te_cond);
1760                 break;
1761         case NRS_TBF_FIELD_OPCODE:
1762                 CFS_FREE_BITMAP(expr->te_opcodes);
1763                 break;
1764         case NRS_TBF_FIELD_UID:
1765         case NRS_TBF_FIELD_GID:
1766                 nrs_tbf_id_list_free(&expr->te_cond);
1767                 break;
1768         default:
1769                 LBUG();
1770         }
1771         OBD_FREE_PTR(expr);
1772 }
1773
1774 static void
1775 nrs_tbf_conjunction_free(struct nrs_tbf_conjunction *conjunction)
1776 {
1777         struct nrs_tbf_expression *expression;
1778         struct nrs_tbf_expression *n;
1779
1780         LASSERT(list_empty(&conjunction->tc_linkage));
1781         list_for_each_entry_safe(expression, n,
1782                                  &conjunction->tc_expressions,
1783                                  te_linkage) {
1784                 list_del_init(&expression->te_linkage);
1785                 nrs_tbf_expression_free(expression);
1786         }
1787         OBD_FREE_PTR(conjunction);
1788 }
1789
1790 static void
1791 nrs_tbf_conds_free(struct list_head *cond_list)
1792 {
1793         struct nrs_tbf_conjunction *conjunction;
1794         struct nrs_tbf_conjunction *n;
1795
1796         list_for_each_entry_safe(conjunction, n, cond_list, tc_linkage) {
1797                 list_del_init(&conjunction->tc_linkage);
1798                 nrs_tbf_conjunction_free(conjunction);
1799         }
1800 }
1801
1802 static void
1803 nrs_tbf_generic_cmd_fini(struct nrs_tbf_cmd *cmd)
1804 {
1805         if (!list_empty(&cmd->u.tc_start.ts_conds))
1806                 nrs_tbf_conds_free(&cmd->u.tc_start.ts_conds);
1807         if (cmd->u.tc_start.ts_conds_str)
1808                 OBD_FREE(cmd->u.tc_start.ts_conds_str,
1809                          strlen(cmd->u.tc_start.ts_conds_str) + 1);
1810 }
1811
/*
 * Separators of the generic rule syntax: ',' splits alternative
 * conjunctions, '&' splits the expressions inside one conjunction and
 * '=' splits an expression into field name and value list.
 */
#define NRS_TBF_DISJUNCTION_DELIM	(',')
#define NRS_TBF_CONJUNCTION_DELIM	('&')
#define NRS_TBF_EXPRESSION_DELIM	('=')
1815
1816 static inline bool
1817 nrs_tbf_check_field(struct cfs_lstr *field, char *str)
1818 {
1819         int len = strlen(str);
1820
1821         return (field->ls_len == len &&
1822                 strncmp(field->ls_str, str, len) == 0);
1823 }
1824
/*
 * Forward declarations: both parsers are defined further down in this
 * file but are already needed by nrs_tbf_expression_parse().
 */
static int nrs_tbf_opcode_list_parse(char *str, int len,
				     struct cfs_bitmap **bitmaptr);
static int nrs_tbf_id_list_parse(char *str, int len,
				 struct list_head *id_list,
				 enum nrs_tbf_flag tif);
1830
1831 static int
1832 nrs_tbf_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
1833 {
1834         struct nrs_tbf_expression *expr;
1835         struct cfs_lstr field;
1836         int rc = 0;
1837
1838         OBD_ALLOC_PTR(expr);
1839         if (expr == NULL)
1840                 return -ENOMEM;
1841
1842         rc = cfs_gettok(src, NRS_TBF_EXPRESSION_DELIM, &field);
1843         if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
1844             src->ls_str[src->ls_len - 1] != '}')
1845                 GOTO(out, rc = -EINVAL);
1846
1847         /* Skip '{' and '}' */
1848         src->ls_str++;
1849         src->ls_len -= 2;
1850
1851         if (nrs_tbf_check_field(&field, "nid")) {
1852                 if (cfs_parse_nidlist(src->ls_str,
1853                                       src->ls_len,
1854                                       &expr->te_cond) <= 0)
1855                         GOTO(out, rc = -EINVAL);
1856                 expr->te_field = NRS_TBF_FIELD_NID;
1857         } else if (nrs_tbf_check_field(&field, "jobid")) {
1858                 if (nrs_tbf_jobid_list_parse(src->ls_str,
1859                                              src->ls_len,
1860                                              &expr->te_cond) < 0)
1861                         GOTO(out, rc = -EINVAL);
1862                 expr->te_field = NRS_TBF_FIELD_JOBID;
1863         } else if (nrs_tbf_check_field(&field, "opcode")) {
1864                 if (nrs_tbf_opcode_list_parse(src->ls_str,
1865                                               src->ls_len,
1866                                               &expr->te_opcodes) < 0)
1867                         GOTO(out, rc = -EINVAL);
1868                 expr->te_field = NRS_TBF_FIELD_OPCODE;
1869         } else if (nrs_tbf_check_field(&field, "uid")) {
1870                 if (nrs_tbf_id_list_parse(src->ls_str,
1871                                           src->ls_len,
1872                                           &expr->te_cond,
1873                                           NRS_TBF_FLAG_UID) < 0)
1874                         GOTO(out, rc = -EINVAL);
1875                 expr->te_field = NRS_TBF_FIELD_UID;
1876         } else if (nrs_tbf_check_field(&field, "gid")) {
1877                 if (nrs_tbf_id_list_parse(src->ls_str,
1878                                           src->ls_len,
1879                                           &expr->te_cond,
1880                                           NRS_TBF_FLAG_GID) < 0)
1881                         GOTO(out, rc = -EINVAL);
1882                 expr->te_field = NRS_TBF_FIELD_GID;
1883         } else {
1884                 GOTO(out, rc = -EINVAL);
1885         }
1886
1887         list_add_tail(&expr->te_linkage, cond_list);
1888         return 0;
1889 out:
1890         OBD_FREE_PTR(expr);
1891         return rc;
1892 }
1893
1894 static int
1895 nrs_tbf_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
1896 {
1897         struct nrs_tbf_conjunction *conjunction;
1898         struct cfs_lstr expr;
1899         int rc = 0;
1900
1901         OBD_ALLOC_PTR(conjunction);
1902         if (conjunction == NULL)
1903                 return -ENOMEM;
1904
1905         INIT_LIST_HEAD(&conjunction->tc_expressions);
1906         list_add_tail(&conjunction->tc_linkage, cond_list);
1907
1908         while (src->ls_str) {
1909                 rc = cfs_gettok(src, NRS_TBF_CONJUNCTION_DELIM, &expr);
1910                 if (rc == 0) {
1911                         rc = -EINVAL;
1912                         break;
1913                 }
1914                 rc = nrs_tbf_expression_parse(&expr,
1915                                               &conjunction->tc_expressions);
1916                 if (rc)
1917                         break;
1918         }
1919         return rc;
1920 }
1921
1922 static int
1923 nrs_tbf_conds_parse(char *str, int len, struct list_head *cond_list)
1924 {
1925         struct cfs_lstr src;
1926         struct cfs_lstr res;
1927         int rc = 0;
1928
1929         src.ls_str = str;
1930         src.ls_len = len;
1931         INIT_LIST_HEAD(cond_list);
1932         while (src.ls_str) {
1933                 rc = cfs_gettok(&src, NRS_TBF_DISJUNCTION_DELIM, &res);
1934                 if (rc == 0) {
1935                         rc = -EINVAL;
1936                         break;
1937                 }
1938                 rc = nrs_tbf_conjunction_parse(&res, cond_list);
1939                 if (rc)
1940                         break;
1941         }
1942         return rc;
1943 }
1944
1945 static int
1946 nrs_tbf_generic_parse(struct nrs_tbf_cmd *cmd, const char *id)
1947 {
1948         int rc;
1949
1950         OBD_ALLOC(cmd->u.tc_start.ts_conds_str, strlen(id) + 1);
1951         if (cmd->u.tc_start.ts_conds_str == NULL)
1952                 return -ENOMEM;
1953
1954         memcpy(cmd->u.tc_start.ts_conds_str, id, strlen(id));
1955
1956         /* Parse hybird NID and JOBID conditions */
1957         rc = nrs_tbf_conds_parse(cmd->u.tc_start.ts_conds_str,
1958                                  strlen(cmd->u.tc_start.ts_conds_str),
1959                                  &cmd->u.tc_start.ts_conds);
1960         if (rc)
1961                 nrs_tbf_generic_cmd_fini(cmd);
1962
1963         return rc;
1964 }
1965
/* Forward declaration: defined below, used by nrs_tbf_expression_match() */
static int nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id);
1968
1969 static int
1970 nrs_tbf_expression_match(struct nrs_tbf_expression *expr,
1971                          struct nrs_tbf_rule *rule,
1972                          struct nrs_tbf_client *cli)
1973 {
1974         switch (expr->te_field) {
1975         case NRS_TBF_FIELD_NID:
1976                 return cfs_match_nid(cli->tc_nid, &expr->te_cond);
1977         case NRS_TBF_FIELD_JOBID:
1978                 return nrs_tbf_jobid_list_match(&expr->te_cond, cli->tc_jobid);
1979         case NRS_TBF_FIELD_OPCODE:
1980                 return cfs_bitmap_check(expr->te_opcodes, cli->tc_opcode);
1981         case NRS_TBF_FIELD_UID:
1982         case NRS_TBF_FIELD_GID:
1983                 return nrs_tbf_id_list_match(&expr->te_cond, cli->tc_id);
1984         default:
1985                 return 0;
1986         }
1987 }
1988
1989 static int
1990 nrs_tbf_conjunction_match(struct nrs_tbf_conjunction *conjunction,
1991                           struct nrs_tbf_rule *rule,
1992                           struct nrs_tbf_client *cli)
1993 {
1994         struct nrs_tbf_expression *expr;
1995         int matched;
1996
1997         list_for_each_entry(expr, &conjunction->tc_expressions, te_linkage) {
1998                 matched = nrs_tbf_expression_match(expr, rule, cli);
1999                 if (!matched)
2000                         return 0;
2001         }
2002
2003         return 1;
2004 }
2005
2006 static int
2007 nrs_tbf_cond_match(struct nrs_tbf_rule *rule, struct nrs_tbf_client *cli)
2008 {
2009         struct nrs_tbf_conjunction *conjunction;
2010         int matched;
2011
2012         list_for_each_entry(conjunction, &rule->tr_conds, tc_linkage) {
2013                 matched = nrs_tbf_conjunction_match(conjunction, rule, cli);
2014                 if (matched)
2015                         return 1;
2016         }
2017
2018         return 0;
2019 }
2020
2021 static void
2022 nrs_tbf_generic_rule_fini(struct nrs_tbf_rule *rule)
2023 {
2024         if (!list_empty(&rule->tr_conds))
2025                 nrs_tbf_conds_free(&rule->tr_conds);
2026         LASSERT(rule->tr_conds_str != NULL);
2027         OBD_FREE(rule->tr_conds_str, strlen(rule->tr_conds_str) + 1);
2028 }
2029
2030 static int
2031 nrs_tbf_rule_init(struct ptlrpc_nrs_policy *policy,
2032                   struct nrs_tbf_rule *rule, struct nrs_tbf_cmd *start)
2033 {
2034         int rc = 0;
2035
2036         LASSERT(start->u.tc_start.ts_conds_str);
2037         OBD_ALLOC(rule->tr_conds_str,
2038                   strlen(start->u.tc_start.ts_conds_str) + 1);
2039         if (rule->tr_conds_str == NULL)
2040                 return -ENOMEM;
2041
2042         memcpy(rule->tr_conds_str,
2043                start->u.tc_start.ts_conds_str,
2044                strlen(start->u.tc_start.ts_conds_str));
2045
2046         INIT_LIST_HEAD(&rule->tr_conds);
2047         if (!list_empty(&start->u.tc_start.ts_conds)) {
2048                 rc = nrs_tbf_conds_parse(rule->tr_conds_str,
2049                                          strlen(rule->tr_conds_str),
2050                                          &rule->tr_conds);
2051         }
2052         if (rc)
2053                 nrs_tbf_generic_rule_fini(rule);
2054
2055         return rc;
2056 }
2057
2058 static int
2059 nrs_tbf_generic_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2060 {
2061         seq_printf(m, "%s %s %u, ref %d\n", rule->tr_name,
2062                    rule->tr_conds_str, rule->tr_rpc_rate,
2063                    atomic_read(&rule->tr_ref) - 1);
2064         return 0;
2065 }
2066
/**
 * Rule-match callback for the generic type; defers entirely to
 * nrs_tbf_cond_match().
 */
static int
nrs_tbf_generic_rule_match(struct nrs_tbf_rule *rule,
			   struct nrs_tbf_client *cli)
{
	int matched = nrs_tbf_cond_match(rule, cli);

	return matched;
}
2073
2074 static struct nrs_tbf_ops nrs_tbf_generic_ops = {
2075         .o_name = NRS_TBF_TYPE_GENERIC,
2076         .o_startup = nrs_tbf_startup,
2077         .o_cli_find = nrs_tbf_cli_find,
2078         .o_cli_findadd = nrs_tbf_cli_findadd,
2079         .o_cli_put = nrs_tbf_cli_put,
2080         .o_cli_init = nrs_tbf_generic_cli_init,
2081         .o_rule_init = nrs_tbf_rule_init,
2082         .o_rule_dump = nrs_tbf_generic_rule_dump,
2083         .o_rule_match = nrs_tbf_generic_rule_match,
2084         .o_rule_fini = nrs_tbf_generic_rule_fini,
2085 };
2086
2087 static void nrs_tbf_opcode_rule_fini(struct nrs_tbf_rule *rule)
2088 {
2089         if (rule->tr_opcodes != NULL)
2090                 CFS_FREE_BITMAP(rule->tr_opcodes);
2091
2092         LASSERT(rule->tr_opcodes_str != NULL);
2093         OBD_FREE(rule->tr_opcodes_str, strlen(rule->tr_opcodes_str) + 1);
2094 }
2095
2096 static unsigned nrs_tbf_opcode_hop_hash(struct cfs_hash *hs, const void *key,
2097                                         unsigned mask)
2098 {
2099         return cfs_hash_djb2_hash(key, sizeof(__u32), mask);
2100 }
2101
2102 static int nrs_tbf_opcode_hop_keycmp(const void *key, struct hlist_node *hnode)
2103 {
2104         const __u32     *opc = key;
2105         struct nrs_tbf_client *cli = hlist_entry(hnode,
2106                                                  struct nrs_tbf_client,
2107                                                  tc_hnode);
2108
2109         return *opc == cli->tc_opcode;
2110 }
2111
2112 static void *nrs_tbf_opcode_hop_key(struct hlist_node *hnode)
2113 {
2114         struct nrs_tbf_client *cli = hlist_entry(hnode,
2115                                                  struct nrs_tbf_client,
2116                                                  tc_hnode);
2117
2118         return &cli->tc_opcode;
2119 }
2120
2121 static void nrs_tbf_opcode_hop_get(struct cfs_hash *hs,
2122                                    struct hlist_node *hnode)
2123 {
2124         struct nrs_tbf_client *cli = hlist_entry(hnode,
2125                                                  struct nrs_tbf_client,
2126                                                  tc_hnode);
2127
2128         atomic_inc(&cli->tc_ref);
2129 }
2130
2131 static void nrs_tbf_opcode_hop_put(struct cfs_hash *hs,
2132                                    struct hlist_node *hnode)
2133 {
2134         struct nrs_tbf_client *cli = hlist_entry(hnode,
2135                                                  struct nrs_tbf_client,
2136                                                  tc_hnode);
2137
2138         atomic_dec(&cli->tc_ref);
2139 }
2140
2141 static void nrs_tbf_opcode_hop_exit(struct cfs_hash *hs,
2142                                     struct hlist_node *hnode)
2143 {
2144         struct nrs_tbf_client *cli = hlist_entry(hnode,
2145                                                  struct nrs_tbf_client,
2146                                                  tc_hnode);
2147
2148         LASSERTF(atomic_read(&cli->tc_ref) == 0,
2149                  "Busy TBF object from client with opcode %s, with %d refs\n",
2150                  ll_opcode2str(cli->tc_opcode),
2151                  atomic_read(&cli->tc_ref));
2152
2153         nrs_tbf_cli_fini(cli);
2154 }
2155 static struct cfs_hash_ops nrs_tbf_opcode_hash_ops = {
2156         .hs_hash        = nrs_tbf_opcode_hop_hash,
2157         .hs_keycmp      = nrs_tbf_opcode_hop_keycmp,
2158         .hs_key         = nrs_tbf_opcode_hop_key,
2159         .hs_object      = nrs_tbf_hop_object,
2160         .hs_get         = nrs_tbf_opcode_hop_get,
2161         .hs_put         = nrs_tbf_opcode_hop_put,
2162         .hs_put_locked  = nrs_tbf_opcode_hop_put,
2163         .hs_exit        = nrs_tbf_opcode_hop_exit,
2164 };
2165
2166 static int
2167 nrs_tbf_opcode_startup(struct ptlrpc_nrs_policy *policy,
2168                     struct nrs_tbf_head *head)
2169 {
2170         struct nrs_tbf_cmd      start = { 0 };
2171         int rc;
2172
2173         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
2174                                             NRS_TBF_NID_BITS,
2175                                             NRS_TBF_NID_BITS,
2176                                             NRS_TBF_NID_BKT_BITS, 0,
2177                                             CFS_HASH_MIN_THETA,
2178                                             CFS_HASH_MAX_THETA,
2179                                             &nrs_tbf_opcode_hash_ops,
2180                                             CFS_HASH_RW_BKTLOCK);
2181         if (head->th_cli_hash == NULL)
2182                 return -ENOMEM;
2183
2184         start.u.tc_start.ts_opcodes_str = "*";
2185
2186         start.u.tc_start.ts_rpc_rate = tbf_rate;
2187         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2188         start.tc_name = NRS_TBF_DEFAULT_RULE;
2189         rc = nrs_tbf_rule_start(policy, head, &start);
2190
2191         return rc;
2192 }
2193
2194 static struct nrs_tbf_client *
2195 nrs_tbf_opcode_cli_find(struct nrs_tbf_head *head,
2196                         struct ptlrpc_request *req)
2197 {
2198         __u32 opc;
2199
2200         opc = lustre_msg_get_opc(req->rq_reqmsg);
2201         return cfs_hash_lookup(head->th_cli_hash, &opc);
2202 }
2203
2204 static struct nrs_tbf_client *
2205 nrs_tbf_opcode_cli_findadd(struct nrs_tbf_head *head,
2206                            struct nrs_tbf_client *cli)
2207 {
2208         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_opcode,
2209                                        &cli->tc_hnode);
2210 }
2211
2212 static void
2213 nrs_tbf_opcode_cli_init(struct nrs_tbf_client *cli,
2214                         struct ptlrpc_request *req)
2215 {
2216         cli->tc_opcode = lustre_msg_get_opc(req->rq_reqmsg);
2217 }
2218
2219 #define MAX_OPCODE_LEN  32
2220 static int
2221 nrs_tbf_opcode_set_bit(const struct cfs_lstr *id, struct cfs_bitmap *opcodes)
2222 {
2223         int     op = 0;
2224         char    opcode_str[MAX_OPCODE_LEN];
2225
2226         if (id->ls_len + 1 > MAX_OPCODE_LEN)
2227                 return -EINVAL;
2228
2229         memcpy(opcode_str, id->ls_str, id->ls_len);
2230         opcode_str[id->ls_len] = '\0';
2231
2232         op = ll_str2opcode(opcode_str);
2233         if (op < 0)
2234                 return -EINVAL;
2235
2236         cfs_bitmap_set(opcodes, op);
2237         return 0;
2238 }
2239
2240 static int
2241 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr)
2242 {
2243         struct cfs_bitmap *opcodes;
2244         struct cfs_lstr src;
2245         struct cfs_lstr res;
2246         int rc = 0;
2247         ENTRY;
2248
2249         opcodes = CFS_ALLOCATE_BITMAP(LUSTRE_MAX_OPCODES);
2250         if (opcodes == NULL)
2251                 return -ENOMEM;
2252
2253         src.ls_str = str;
2254         src.ls_len = len;
2255         while (src.ls_str) {
2256                 rc = cfs_gettok(&src, ' ', &res);
2257                 if (rc == 0) {
2258                         rc = -EINVAL;
2259                         break;
2260                 }
2261                 rc = nrs_tbf_opcode_set_bit(&res, opcodes);
2262                 if (rc)
2263                         break;
2264         }
2265
2266         if (rc == 0 && bitmaptr)
2267                 *bitmaptr = opcodes;
2268         else
2269                 CFS_FREE_BITMAP(opcodes);
2270
2271         RETURN(rc);
2272 }
2273
2274 static void nrs_tbf_opcode_cmd_fini(struct nrs_tbf_cmd *cmd)
2275 {
2276         if (cmd->u.tc_start.ts_opcodes_str)
2277                 OBD_FREE(cmd->u.tc_start.ts_opcodes_str,
2278                          strlen(cmd->u.tc_start.ts_opcodes_str) + 1);
2279
2280 }
2281
/**
 * Validate the "opcode" value in \a id and keep a copy of it in \a cmd.
 *
 * The value is first checked by nrs_tbf_check_id_value(), then copied,
 * then run through the opcode list parser purely for validation (no
 * bitmap is kept).
 *
 * NOTE(review): the copy omits the trailing NUL and so relies on
 * OBD_ALLOC() returning zeroed memory - confirm.
 *
 * \retval 0		value accepted
 * \retval -ENOMEM	the copy could not be allocated
 * \retval negative	validation error; the copy was released
 */
static int nrs_tbf_opcode_parse(struct cfs_lstr_dummy_never_used_guard *, char *);
2308
2309 static int
2310 nrs_tbf_opcode_rule_match(struct nrs_tbf_rule *rule,
2311                           struct nrs_tbf_client *cli)
2312 {
2313         if (rule->tr_opcodes == NULL)
2314                 return 0;
2315
2316         return cfs_bitmap_check(rule->tr_opcodes, cli->tc_opcode);
2317 }
2318
2319 static int nrs_tbf_opcode_rule_init(struct ptlrpc_nrs_policy *policy,
2320                                     struct nrs_tbf_rule *rule,
2321                                     struct nrs_tbf_cmd *start)
2322 {
2323         int rc = 0;
2324
2325         LASSERT(start->u.tc_start.ts_opcodes_str != NULL);
2326         OBD_ALLOC(rule->tr_opcodes_str,
2327                   strlen(start->u.tc_start.ts_opcodes_str) + 1);
2328         if (rule->tr_opcodes_str == NULL)
2329                 return -ENOMEM;
2330
2331         strncpy(rule->tr_opcodes_str, start->u.tc_start.ts_opcodes_str,
2332                 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2333
2334         /* Default rule '*' */
2335         if (strcmp(start->u.tc_start.ts_opcodes_str, "*") == 0)
2336                 return 0;
2337
2338         rc = nrs_tbf_opcode_list_parse(rule->tr_opcodes_str,
2339                                        strlen(rule->tr_opcodes_str),
2340                                        &rule->tr_opcodes);
2341         if (rc)
2342                 OBD_FREE(rule->tr_opcodes_str,
2343                          strlen(start->u.tc_start.ts_opcodes_str) + 1);
2344
2345         return rc;
2346 }
2347
2348 static int
2349 nrs_tbf_opcode_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2350 {
2351         seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
2352                    rule->tr_opcodes_str, rule->tr_rpc_rate,
2353                    atomic_read(&rule->tr_ref) - 1);
2354         return 0;
2355 }
2356
2357
2358 struct nrs_tbf_ops nrs_tbf_opcode_ops = {
2359         .o_name = NRS_TBF_TYPE_OPCODE,
2360         .o_startup = nrs_tbf_opcode_startup,
2361         .o_cli_find = nrs_tbf_opcode_cli_find,
2362         .o_cli_findadd = nrs_tbf_opcode_cli_findadd,
2363         .o_cli_put = nrs_tbf_nid_cli_put,
2364         .o_cli_init = nrs_tbf_opcode_cli_init,
2365         .o_rule_init = nrs_tbf_opcode_rule_init,
2366         .o_rule_dump = nrs_tbf_opcode_rule_dump,
2367         .o_rule_match = nrs_tbf_opcode_rule_match,
2368         .o_rule_fini = nrs_tbf_opcode_rule_fini,
2369 };
2370
2371 static unsigned nrs_tbf_id_hop_hash(struct cfs_hash *hs, const void *key,
2372                                     unsigned mask)
2373 {
2374         return cfs_hash_djb2_hash(key, sizeof(struct tbf_id), mask);
2375 }
2376
2377 static int nrs_tbf_id_hop_keycmp(const void *key, struct hlist_node *hnode)
2378 {
2379         const struct tbf_id *opc = key;
2380         enum nrs_tbf_flag ntf;
2381         struct nrs_tbf_client *cli = hlist_entry(hnode, struct nrs_tbf_client,
2382                                                  tc_hnode);
2383         ntf = opc->ti_type & cli->tc_id.ti_type;
2384         if ((ntf & NRS_TBF_FLAG_UID) && opc->ti_uid != cli->tc_id.ti_uid)
2385                 return 0;
2386
2387         if ((ntf & NRS_TBF_FLAG_GID) && opc->ti_gid != cli->tc_id.ti_gid)
2388                 return 0;
2389
2390         return 1;
2391 }
2392
2393 static void *nrs_tbf_id_hop_key(struct hlist_node *hnode)
2394 {
2395         struct nrs_tbf_client *cli = hlist_entry(hnode,
2396                                                  struct nrs_tbf_client,
2397                                                  tc_hnode);
2398         return &cli->tc_id;
2399 }
2400
2401 static void nrs_tbf_id_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
2402 {
2403         struct nrs_tbf_client *cli = hlist_entry(hnode,
2404                                                  struct nrs_tbf_client,
2405                                                  tc_hnode);
2406
2407         atomic_inc(&cli->tc_ref);
2408 }
2409
2410 static void nrs_tbf_id_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
2411 {
2412         struct nrs_tbf_client *cli = hlist_entry(hnode,
2413                                                  struct nrs_tbf_client,
2414                                                  tc_hnode);
2415
2416         atomic_dec(&cli->tc_ref);
2417 }
2418
2419 static void
2420 nrs_tbf_id_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
2421
2422 {
2423         struct nrs_tbf_client *cli = hlist_entry(hnode,
2424                                                  struct nrs_tbf_client,
2425                                                  tc_hnode);
2426
2427         LASSERT(atomic_read(&cli->tc_ref) == 0);
2428         nrs_tbf_cli_fini(cli);
2429 }
2430
2431 static struct cfs_hash_ops nrs_tbf_id_hash_ops = {
2432         .hs_hash        = nrs_tbf_id_hop_hash,
2433         .hs_keycmp      = nrs_tbf_id_hop_keycmp,
2434         .hs_key         = nrs_tbf_id_hop_key,
2435         .hs_object      = nrs_tbf_hop_object,
2436         .hs_get         = nrs_tbf_id_hop_get,
2437         .hs_put         = nrs_tbf_id_hop_put,
2438         .hs_put_locked  = nrs_tbf_id_hop_put,
2439         .hs_exit        = nrs_tbf_id_hop_exit,
2440 };
2441
2442 static int
2443 nrs_tbf_id_startup(struct ptlrpc_nrs_policy *policy,
2444                    struct nrs_tbf_head *head)
2445 {
2446         struct nrs_tbf_cmd start;
2447         int rc;
2448
2449         head->th_cli_hash = cfs_hash_create("nrs_tbf_id_hash",
2450                                             NRS_TBF_NID_BITS,
2451                                             NRS_TBF_NID_BITS,
2452                                             NRS_TBF_NID_BKT_BITS, 0,
2453                                             CFS_HASH_MIN_THETA,
2454                                             CFS_HASH_MAX_THETA,
2455                                             &nrs_tbf_id_hash_ops,
2456                                             CFS_HASH_RW_BKTLOCK);
2457         if (head->th_cli_hash == NULL)
2458                 return -ENOMEM;
2459
2460         memset(&start, 0, sizeof(start));
2461         start.u.tc_start.ts_ids_str = "*";
2462         start.u.tc_start.ts_rpc_rate = tbf_rate;
2463         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2464         start.tc_name = NRS_TBF_DEFAULT_RULE;
2465         INIT_LIST_HEAD(&start.u.tc_start.ts_ids);
2466         rc = nrs_tbf_rule_start(policy, head, &start);
2467         if (rc) {
2468                 cfs_hash_putref(head->th_cli_hash);
2469                 head->th_cli_hash = NULL;
2470         }
2471
2472         return rc;
2473 }
2474
2475 static struct nrs_tbf_client *
2476 nrs_tbf_id_cli_find(struct nrs_tbf_head *head,
2477                     struct ptlrpc_request *req)
2478 {
2479         struct tbf_id id;
2480
2481         LASSERT(head->th_type_flag == NRS_TBF_FLAG_UID ||
2482                 head->th_type_flag == NRS_TBF_FLAG_GID);
2483
2484         nrs_tbf_id_cli_set(req, &id, head->th_type_flag);
2485         return cfs_hash_lookup(head->th_cli_hash, &id);
2486 }
2487
2488 static struct nrs_tbf_client *
2489 nrs_tbf_id_cli_findadd(struct nrs_tbf_head *head,
2490                        struct nrs_tbf_client *cli)
2491 {
2492         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_id,
2493                                        &cli->tc_hnode);
2494 }
2495
2496 static void
2497 nrs_tbf_uid_cli_init(struct nrs_tbf_client *cli,
2498                      struct ptlrpc_request *req)
2499 {
2500         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_UID);
2501 }
2502
2503 static void
2504 nrs_tbf_gid_cli_init(struct nrs_tbf_client *cli,
2505                      struct ptlrpc_request *req)
2506 {
2507         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_GID);
2508 }
2509
2510 static int
2511 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id)
2512 {
2513         struct nrs_tbf_id *nti_id;
2514         enum nrs_tbf_flag flag;
2515
2516         list_for_each_entry(nti_id, id_list, nti_linkage) {
2517                 flag = id.ti_type & nti_id->nti_id.ti_type;
2518                 if (!flag)
2519                         continue;
2520
2521                 if ((flag & NRS_TBF_FLAG_UID) &&
2522                     (id.ti_uid != nti_id->nti_id.ti_uid))
2523                         continue;
2524
2525                 if ((flag & NRS_TBF_FLAG_GID) &&
2526                     (id.ti_gid != nti_id->nti_id.ti_gid))
2527                         continue;
2528
2529                 return 1;
2530         }
2531         return 0;
2532 }
2533
2534 static int
2535 nrs_tbf_id_rule_match(struct nrs_tbf_rule *rule,
2536                       struct nrs_tbf_client *cli)
2537 {
2538         return nrs_tbf_id_list_match(&rule->tr_ids, cli->tc_id);
2539 }
2540
2541 static void nrs_tbf_id_cmd_fini(struct nrs_tbf_cmd *cmd)
2542 {
2543         nrs_tbf_id_list_free(&cmd->u.tc_start.ts_ids);
2544
2545         if (cmd->u.tc_start.ts_ids_str)
2546                 OBD_FREE(cmd->u.tc_start.ts_ids_str,
2547                          strlen(cmd->u.tc_start.ts_ids_str) + 1);
2548 }
2549
2550 static int
2551 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
2552                       enum nrs_tbf_flag tif)
2553 {
2554         struct cfs_lstr src;
2555         struct cfs_lstr res;
2556         int rc = 0;
2557         struct tbf_id id = { 0 };
2558         ENTRY;
2559
2560         if (tif != NRS_TBF_FLAG_UID && tif != NRS_TBF_FLAG_GID)
2561                 RETURN(-EINVAL);
2562
2563         src.ls_str = str;
2564         src.ls_len = len;
2565         INIT_LIST_HEAD(id_list);
2566         while (src.ls_str) {
2567                 struct nrs_tbf_id *nti_id;
2568
2569                 if (cfs_gettok(&src, ' ', &res) == 0)
2570                         GOTO(out, rc = -EINVAL);
2571
2572                 id.ti_type = tif;
2573                 if (tif == NRS_TBF_FLAG_UID) {
2574                         if (!cfs_str2num_check(res.ls_str, res.ls_len,
2575                                                &id.ti_uid, 0, (u32)~0U))
2576                                 GOTO(out, rc = -EINVAL);
2577                 } else {
2578                         if (!cfs_str2num_check(res.ls_str, res.ls_len,
2579                                                &id.ti_gid, 0, (u32)~0U))
2580                                 GOTO(out, rc = -EINVAL);
2581                 }
2582
2583                 OBD_ALLOC_PTR(nti_id);
2584                 if (nti_id == NULL)
2585                         GOTO(out, rc = -ENOMEM);
2586
2587                 nti_id->nti_id = id;
2588                 list_add_tail(&nti_id->nti_linkage, id_list);
2589         }
2590 out:
2591         if (rc)
2592                 nrs_tbf_id_list_free(id_list);
2593         RETURN(rc);
2594 }
2595
2596 static int nrs_tbf_ug_id_parse(struct nrs_tbf_cmd *cmd, char *id)
2597 {
2598         struct cfs_lstr src;
2599         int rc;
2600         enum nrs_tbf_flag tif;
2601
2602         tif = cmd->u.tc_start.ts_valid_type;
2603
2604         src.ls_str = id;
2605         src.ls_len = strlen(id);
2606
2607         rc = nrs_tbf_check_id_value(&src,
2608                                     tif == NRS_TBF_FLAG_UID ? "uid" : "gid");
2609         if (rc)
2610                 return rc;
2611
2612         OBD_ALLOC(cmd->u.tc_start.ts_ids_str, src.ls_len + 1);
2613         if (cmd->u.tc_start.ts_ids_str == NULL)
2614                 return -ENOMEM;
2615
2616         strlcpy(cmd->u.tc_start.ts_ids_str, src.ls_str, src.ls_len + 1);
2617
2618         rc = nrs_tbf_id_list_parse(cmd->u.tc_start.ts_ids_str,
2619                                    strlen(cmd->u.tc_start.ts_ids_str),
2620                                    &cmd->u.tc_start.ts_ids, tif);
2621         if (rc)
2622                 nrs_tbf_id_cmd_fini(cmd);
2623
2624         return rc;
2625 }
2626
2627 static int
2628 nrs_tbf_id_rule_init(struct ptlrpc_nrs_policy *policy,
2629                      struct nrs_tbf_rule *rule,
2630                      struct nrs_tbf_cmd *start)
2631 {
2632         struct nrs_tbf_head *head = rule->tr_head;
2633         int rc = 0;
2634         enum nrs_tbf_flag tif = head->th_type_flag;
2635         int ids_len = strlen(start->u.tc_start.ts_ids_str) + 1;
2636
2637         LASSERT(start->u.tc_start.ts_ids_str);
2638         INIT_LIST_HEAD(&rule->tr_ids);
2639
2640         OBD_ALLOC(rule->tr_ids_str, ids_len);
2641         if (rule->tr_ids_str == NULL)
2642                 return -ENOMEM;
2643
2644         strlcpy(rule->tr_ids_str, start->u.tc_start.ts_ids_str,
2645                 ids_len);
2646
2647         if (!list_empty(&start->u.tc_start.ts_ids)) {
2648                 rc = nrs_tbf_id_list_parse(rule->tr_ids_str,
2649                                            strlen(rule->tr_ids_str),
2650                                            &rule->tr_ids, tif);
2651                 if (rc)
2652                         CERROR("%ss {%s} illegal\n",
2653                                tif == NRS_TBF_FLAG_UID ? "uid" : "gid",
2654                                rule->tr_ids_str);
2655         }
2656         if (rc) {
2657                 OBD_FREE(rule->tr_ids_str, ids_len);
2658                 rule->tr_ids_str = NULL;
2659         }
2660         return rc;
2661 }
2662
2663 static int
2664 nrs_tbf_id_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2665 {
2666         seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
2667                    rule->tr_ids_str, rule->tr_rpc_rate,
2668                    atomic_read(&rule->tr_ref) - 1);
2669         return 0;
2670 }
2671
2672 static void nrs_tbf_id_rule_fini(struct nrs_tbf_rule *rule)
2673 {
2674         nrs_tbf_id_list_free(&rule->tr_ids);
2675         if (rule->tr_ids_str != NULL)
2676                 OBD_FREE(rule->tr_ids_str, strlen(rule->tr_ids_str) + 1);
2677 }
2678
2679 struct nrs_tbf_ops nrs_tbf_uid_ops = {
2680         .o_name = NRS_TBF_TYPE_UID,
2681         .o_startup = nrs_tbf_id_startup,
2682         .o_cli_find = nrs_tbf_id_cli_find,
2683         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2684         .o_cli_put = nrs_tbf_nid_cli_put,
2685         .o_cli_init = nrs_tbf_uid_cli_init,
2686         .o_rule_init = nrs_tbf_id_rule_init,
2687         .o_rule_dump = nrs_tbf_id_rule_dump,
2688         .o_rule_match = nrs_tbf_id_rule_match,
2689         .o_rule_fini = nrs_tbf_id_rule_fini,
2690 };
2691
2692 struct nrs_tbf_ops nrs_tbf_gid_ops = {
2693         .o_name = NRS_TBF_TYPE_GID,
2694         .o_startup = nrs_tbf_id_startup,
2695         .o_cli_find = nrs_tbf_id_cli_find,
2696         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2697         .o_cli_put = nrs_tbf_nid_cli_put,
2698         .o_cli_init = nrs_tbf_gid_cli_init,
2699         .o_rule_init = nrs_tbf_id_rule_init,
2700         .o_rule_dump = nrs_tbf_id_rule_dump,
2701         .o_rule_match = nrs_tbf_id_rule_match,
2702         .o_rule_fini = nrs_tbf_id_rule_fini,
2703 };
2704
2705 static struct nrs_tbf_type nrs_tbf_types[] = {
2706         {
2707                 .ntt_name = NRS_TBF_TYPE_JOBID,
2708                 .ntt_flag = NRS_TBF_FLAG_JOBID,
2709                 .ntt_ops = &nrs_tbf_jobid_ops,
2710         },
2711         {
2712                 .ntt_name = NRS_TBF_TYPE_NID,
2713                 .ntt_flag = NRS_TBF_FLAG_NID,
2714                 .ntt_ops = &nrs_tbf_nid_ops,
2715         },
2716         {
2717                 .ntt_name = NRS_TBF_TYPE_OPCODE,
2718                 .ntt_flag = NRS_TBF_FLAG_OPCODE,
2719                 .ntt_ops = &nrs_tbf_opcode_ops,
2720         },
2721         {
2722                 .ntt_name = NRS_TBF_TYPE_GENERIC,
2723                 .ntt_flag = NRS_TBF_FLAG_GENERIC,
2724                 .ntt_ops = &nrs_tbf_generic_ops,
2725         },
2726         {
2727                 .ntt_name = NRS_TBF_TYPE_UID,
2728                 .ntt_flag = NRS_TBF_FLAG_UID,
2729                 .ntt_ops = &nrs_tbf_uid_ops,
2730         },
2731         {
2732                 .ntt_name = NRS_TBF_TYPE_GID,
2733                 .ntt_flag = NRS_TBF_FLAG_GID,
2734                 .ntt_ops = &nrs_tbf_gid_ops,
2735         },
2736 };
2737
2738 /**
2739  * Is called before the policy transitions into
2740  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
2741  * policy-specific private data structure.
2742  *
2743  * \param[in] policy The policy to start
2744  *
2745  * \retval -ENOMEM OOM error
2746  * \retval  0      success
2747  *
2748  * \see nrs_policy_register()
2749  * \see nrs_policy_ctl()
2750  */
2751 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
2752 {
2753         struct nrs_tbf_head     *head;
2754         struct nrs_tbf_ops      *ops;
2755         __u32                    type;
2756         char                    *name;
2757         int found = 0;
2758         int i;
2759         int rc = 0;
2760
2761         if (arg == NULL)
2762                 name = NRS_TBF_TYPE_GENERIC;
2763         else if (strlen(arg) < NRS_TBF_TYPE_MAX_LEN)
2764                 name = arg;
2765         else
2766                 GOTO(out, rc = -EINVAL);
2767
2768         for (i = 0; i < ARRAY_SIZE(nrs_tbf_types); i++) {
2769                 if (strcmp(name, nrs_tbf_types[i].ntt_name) == 0) {
2770                         ops = nrs_tbf_types[i].ntt_ops;
2771                         type = nrs_tbf_types[i].ntt_flag;
2772                         found = 1;
2773                         break;
2774                 }
2775         }
2776         if (found == 0)
2777                 GOTO(out, rc = -ENOTSUPP);
2778
2779         OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
2780         if (head == NULL)
2781                 GOTO(out, rc = -ENOMEM);
2782
2783         memcpy(head->th_type, name, strlen(name));
2784         head->th_type[strlen(name)] = '\0';
2785         head->th_ops = ops;
2786         head->th_type_flag = type;
2787
2788         head->th_binheap = binheap_create(&nrs_tbf_heap_ops,
2789                                           CBH_FLAG_ATOMIC_GROW, 4096, NULL,
2790                                           nrs_pol2cptab(policy),
2791                                           nrs_pol2cptid(policy));
2792         if (head->th_binheap == NULL)
2793                 GOTO(out_free_head, rc = -ENOMEM);
2794
2795         atomic_set(&head->th_rule_sequence, 0);
2796         spin_lock_init(&head->th_rule_lock);
2797         INIT_LIST_HEAD(&head->th_list);
2798         hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
2799         head->th_timer.function = nrs_tbf_timer_cb;
2800         rc = head->th_ops->o_startup(policy, head);
2801         if (rc)
2802                 GOTO(out_free_heap, rc);
2803
2804         policy->pol_private = head;
2805         return 0;
2806 out_free_heap:
2807         binheap_destroy(head->th_binheap);
2808 out_free_head:
2809         OBD_FREE_PTR(head);
2810 out:
2811         return rc;
2812 }
2813
2814 /**
2815  * Is called before the policy transitions into
2816  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
2817  * private data structure.
2818  *
2819  * \param[in] policy The policy to stop
2820  *
2821  * \see nrs_policy_stop0()
2822  */
2823 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
2824 {
2825         struct nrs_tbf_head *head = policy->pol_private;
2826         struct ptlrpc_nrs *nrs = policy->pol_nrs;
2827         struct nrs_tbf_rule *rule, *n;
2828
2829         LASSERT(head != NULL);
2830         LASSERT(head->th_cli_hash != NULL);
2831         hrtimer_cancel(&head->th_timer);
2832         /* Should cleanup hash first before free rules */
2833         cfs_hash_putref(head->th_cli_hash);
2834         list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
2835                 list_del_init(&rule->tr_linkage);
2836                 nrs_tbf_rule_put(rule);
2837         }
2838         LASSERT(list_empty(&head->th_list));
2839         LASSERT(head->th_binheap != NULL);
2840         LASSERT(binheap_is_empty(head->th_binheap));
2841         binheap_destroy(head->th_binheap);
2842         OBD_FREE_PTR(head);
2843         nrs->nrs_throttling = 0;
2844         wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
2845 }
2846
2847 /**
2848  * Performs a policy-specific ctl function on TBF policy instances; similar
2849  * to ioctl.
2850  *
2851  * \param[in]     policy the policy instance
2852  * \param[in]     opc    the opcode
2853  * \param[in,out] arg    used for passing parameters and information
2854  *
2855  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2856  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2857  *
2858  * \retval 0   operation carried out successfully
2859  * \retval -ve error
2860  */
2861 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
2862                        enum ptlrpc_nrs_ctl opc,
2863                        void *arg)
2864 {
2865         int rc = 0;
2866         ENTRY;
2867
2868         assert_spin_locked(&policy->pol_nrs->nrs_lock);
2869
2870         switch ((enum nrs_ctl_tbf)opc) {
2871         default:
2872                 RETURN(-EINVAL);
2873
2874         /**
2875          * Read RPC rate size of a policy instance.
2876          */
2877         case NRS_CTL_TBF_RD_RULE: {
2878                 struct nrs_tbf_head *head = policy->pol_private;
2879                 struct seq_file *m = arg;
2880                 struct ptlrpc_service_part *svcpt;
2881
2882                 svcpt = policy->pol_nrs->nrs_svcpt;
2883                 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
2884
2885                 rc = nrs_tbf_rule_dump_all(head, m);
2886                 }
2887                 break;
2888
2889         /**
2890          * Write RPC rate of a policy instance.
2891          */
2892         case NRS_CTL_TBF_WR_RULE: {
2893                 struct nrs_tbf_head *head = policy->pol_private;
2894                 struct nrs_tbf_cmd *cmd;
2895
2896                 cmd = (struct nrs_tbf_cmd *)arg;
2897                 rc = nrs_tbf_command(policy,
2898                                      head,
2899                                      cmd);
2900                 }
2901                 break;
2902         /**
2903          * Read the TBF policy type of a policy instance.
2904          */
2905         case NRS_CTL_TBF_RD_TYPE_FLAG: {
2906                 struct nrs_tbf_head *head = policy->pol_private;
2907
2908                 *(__u32 *)arg = head->th_type_flag;
2909                 }
2910                 break;
2911         }
2912
2913         RETURN(rc);
2914 }
2915
2916 /**
2917  * Is called for obtaining a TBF policy resource.
2918  *
2919  * \param[in]  policy     The policy on which the request is being asked for
2920  * \param[in]  nrq        The request for which resources are being taken
2921  * \param[in]  parent     Parent resource, unused in this policy
2922  * \param[out] resp       Resources references are placed in this array
2923  * \param[in]  moving_req Signifies limited caller context; unused in this
2924  *                        policy
2925  *
2926  *
2927  * \see nrs_resource_get_safe()
2928  */
2929 static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
2930                            struct ptlrpc_nrs_request *nrq,
2931                            const struct ptlrpc_nrs_resource *parent,
2932                            struct ptlrpc_nrs_resource **resp,
2933                            bool moving_req)
2934 {
2935         struct nrs_tbf_head   *head;
2936         struct nrs_tbf_client *cli;
2937         struct nrs_tbf_client *tmp;
2938         struct ptlrpc_request *req;
2939
2940         if (parent == NULL) {
2941                 *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
2942                 return 0;
2943         }
2944
2945         head = container_of(parent, struct nrs_tbf_head, th_res);
2946         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
2947         cli = head->th_ops->o_cli_find(head, req);
2948         if (cli != NULL) {
2949                 spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2950                 LASSERT(cli->tc_rule);
2951                 if (cli->tc_rule_sequence !=
2952                     atomic_read(&head->th_rule_sequence) ||
2953                     cli->tc_rule->tr_flags & NTRS_STOPPING) {
2954                         struct nrs_tbf_rule *rule;
2955
2956                         CDEBUG(D_RPCTRACE,
2957                                "TBF class@%p rate %u sequence %d, "
2958                                "rule flags %d, head sequence %d\n",
2959                                cli, cli->tc_rpc_rate,
2960                                cli->tc_rule_sequence,
2961                                cli->tc_rule->tr_flags,
2962                                atomic_read(&head->th_rule_sequence));
2963                         rule = nrs_tbf_rule_match(head, cli);
2964                         if (rule != cli->tc_rule) {
2965                                 nrs_tbf_cli_reset(head, rule, cli);
2966                         } else {
2967                                 if (cli->tc_rule_generation != rule->tr_generation)
2968                                         nrs_tbf_cli_reset_value(head, cli);
2969                                 nrs_tbf_rule_put(rule);
2970                         }
2971                 } else if (cli->tc_rule_generation !=
2972                            cli->tc_rule->tr_generation) {
2973                         nrs_tbf_cli_reset_value(head, cli);
2974                 }
2975                 spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2976                 goto out;
2977         }
2978
2979         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
2980                           sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
2981         if (cli == NULL)
2982                 return -ENOMEM;
2983
2984         nrs_tbf_cli_init(head, cli, req);
2985         tmp = head->th_ops->o_cli_findadd(head, cli);
2986         if (tmp != cli) {
2987                 atomic_dec(&cli->tc_ref);
2988                 nrs_tbf_cli_fini(cli);
2989                 cli = tmp;
2990         }
2991 out:
2992         *resp = &cli->tc_res;
2993
2994         return 1;
2995 }
2996
2997 /**
2998  * Called when releasing references to the resource hierachy obtained for a
2999  * request for scheduling using the TBF policy.
3000  *
3001  * \param[in] policy   the policy the resource belongs to
3002  * \param[in] res      the resource to be released
3003  */
3004 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
3005                             const struct ptlrpc_nrs_resource *res)
3006 {
3007         struct nrs_tbf_head   *head;
3008         struct nrs_tbf_client *cli;
3009
3010         /**
3011          * Do nothing for freeing parent, nrs_tbf_net resources
3012          */
3013         if (res->res_parent == NULL)
3014                 return;
3015
3016         cli = container_of(res, struct nrs_tbf_client, tc_res);
3017         head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
3018
3019         head->th_ops->o_cli_put(head, cli);
3020 }
3021
3022 /**
3023  * Called when getting a request from the TBF policy for handling, or just
3024  * peeking; removes the request from the policy when it is to be handled.
3025  *
3026  * \param[in] policy The policy
3027  * \param[in] peek   When set, signifies that we just want to examine the
3028  *                   request, and not handle it, so the request is not removed
3029  *                   from the policy.
3030  * \param[in] force  Force the policy to return a request; unused in this
3031  *                   policy
3032  *
3033  * \retval The request to be handled; this is the next request in the TBF
3034  *         rule
3035  *
3036  * \see ptlrpc_nrs_req_get_nolock()
3037  * \see nrs_request_get()
3038  */
3039 static
3040 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
3041                                            bool peek, bool force)
3042 {
3043         struct nrs_tbf_head       *head = policy->pol_private;
3044         struct ptlrpc_nrs_request *nrq = NULL;
3045         struct nrs_tbf_client     *cli;
3046         struct binheap_node       *node;
3047
3048         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3049
3050         if (!peek && policy->pol_nrs->nrs_throttling)
3051                 return NULL;
3052
3053         node = binheap_root(head->th_binheap);
3054         if (unlikely(node == NULL))
3055                 return NULL;
3056
3057         cli = container_of(node, struct nrs_tbf_client, tc_node);
3058         LASSERT(cli->tc_in_heap);
3059         if (peek) {
3060                 nrq = list_entry(cli->tc_list.next,
3061                                      struct ptlrpc_nrs_request,
3062                                      nr_u.tbf.tr_list);
3063         } else {
3064                 struct nrs_tbf_rule *rule = cli->tc_rule;
3065                 __u64 now = ktime_to_ns(ktime_get());
3066                 __u64 passed;
3067                 __u64 ntoken;
3068                 __u64 deadline;
3069                 __u64 old_resid = 0;
3070
3071                 deadline = cli->tc_check_time +
3072                           cli->tc_nsecs;
3073                 LASSERT(now >= cli->tc_check_time);
3074                 passed = now - cli->tc_check_time;
3075                 ntoken = passed * cli->tc_rpc_rate;
3076                 do_div(ntoken, NSEC_PER_SEC);
3077
3078                 ntoken += cli->tc_ntoken;
3079                 if (rule->tr_flags & NTRS_REALTIME) {
3080                         LASSERT(cli->tc_nsecs_resid < cli->tc_nsecs);
3081                         old_resid = cli->tc_nsecs_resid;
3082                         cli->tc_nsecs_resid += passed % cli->tc_nsecs;
3083                         if (cli->tc_nsecs_resid > cli->tc_nsecs) {
3084                                 ntoken++;
3085                                 cli->tc_nsecs_resid -= cli->tc_nsecs;
3086                         }
3087                 } else if (ntoken > cli->tc_depth)
3088                         ntoken = cli->tc_depth;
3089
3090                 if (ntoken > 0) {
3091                         struct ptlrpc_request *req;
3092                         nrq = list_entry(cli->tc_list.next,
3093                                              struct ptlrpc_nrs_request,
3094                                              nr_u.tbf.tr_list);
3095                         req = container_of(nrq,
3096                                            struct ptlrpc_request,
3097                                            rq_nrq);
3098                         ntoken--;
3099                         cli->tc_ntoken = ntoken;
3100                         cli->tc_check_time = now;
3101                         list_del_init(&nrq->nr_u.tbf.tr_list);
3102                         if (list_empty(&cli->tc_list)) {
3103                                 binheap_remove(head->th_binheap,
3104                                                &cli->tc_node);
3105                                 cli->tc_in_heap = false;
3106                         } else {
3107                                 if (!(rule->tr_flags & NTRS_REALTIME))
3108                                         cli->tc_deadline = now + cli->tc_nsecs;
3109                                 binheap_relocate(head->th_binheap,
3110                                                  &cli->tc_node);
3111                         }
3112                         CDEBUG(D_RPCTRACE,
3113                                "TBF dequeues: class@%p rate %u gen %llu "
3114                                "token %llu, rule@%p rate %u gen %llu\n",
3115                                cli, cli->tc_rpc_rate,
3116                                cli->tc_rule_generation, cli->tc_ntoken,
3117                                cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3118                                cli->tc_rule->tr_generation);
3119                 } else {
3120                         ktime_t time;
3121
3122                         if (rule->tr_flags & NTRS_REALTIME) {
3123                                 cli->tc_deadline = deadline;
3124                                 cli->tc_nsecs_resid = old_resid;
3125                                 binheap_relocate(head->th_binheap,
3126                                                  &cli->tc_node);
3127                                 if (node != binheap_root(head->th_binheap))
3128                                         return nrs_tbf_req_get(policy,
3129                                                                peek, force);
3130                         }
3131                         policy->pol_nrs->nrs_throttling = 1;
3132                         head->th_deadline = deadline;
3133                         time = ktime_set(0, 0);
3134                         time = ktime_add_ns(time, deadline);
3135                         hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
3136                 }
3137         }
3138
3139         return nrq;
3140 }
3141
3142 /**
3143  * Adds request \a nrq to \a policy's list of queued requests
3144  *
3145  * \param[in] policy The policy
3146  * \param[in] nrq    The request to add
3147  *
3148  * \retval 0 success; nrs_request_enqueue() assumes this function will always
3149  *                    succeed
3150  */
3151 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
3152                            struct ptlrpc_nrs_request *nrq)
3153 {
3154         struct nrs_tbf_head   *head;
3155         struct nrs_tbf_client *cli;
3156         int                    rc = 0;
3157
3158         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3159
3160         cli = container_of(nrs_request_resource(nrq),
3161                            struct nrs_tbf_client, tc_res);
3162         head = container_of(nrs_request_resource(nrq)->res_parent,
3163                             struct nrs_tbf_head, th_res);
3164         if (list_empty(&cli->tc_list)) {
3165                 LASSERT(!cli->tc_in_heap);
3166                 cli->tc_deadline = cli->tc_check_time + cli->tc_nsecs;
3167                 rc = binheap_insert(head->th_binheap, &cli->tc_node);
3168                 if (rc == 0) {
3169                         cli->tc_in_heap = true;
3170                         nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3171                         list_add_tail(&nrq->nr_u.tbf.tr_list,
3172                                           &cli->tc_list);
3173                         if (policy->pol_nrs->nrs_throttling) {
3174                                 __u64 deadline = cli->tc_deadline;
3175                                 if ((head->th_deadline > deadline) &&
3176                                     (hrtimer_try_to_cancel(&head->th_timer)
3177                                      >= 0)) {
3178                                         ktime_t time;
3179                                         head->th_deadline = deadline;
3180                                         time = ktime_set(0, 0);
3181                                         time = ktime_add_ns(time, deadline);
3182                                         hrtimer_start(&head->th_timer, time,
3183                                                       HRTIMER_MODE_ABS);
3184                                 }
3185                         }
3186                 }
3187         } else {
3188                 LASSERT(cli->tc_in_heap);
3189                 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3190                 list_add_tail(&nrq->nr_u.tbf.tr_list,
3191                                   &cli->tc_list);
3192         }
3193
3194         if (rc == 0)
3195                 CDEBUG(D_RPCTRACE,
3196                        "TBF enqueues: class@%p rate %u gen %llu "
3197                        "token %llu, rule@%p rate %u gen %llu\n",
3198                        cli, cli->tc_rpc_rate,
3199                        cli->tc_rule_generation, cli->tc_ntoken,
3200                        cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3201                        cli->tc_rule->tr_generation);
3202
3203         return rc;
3204 }
3205
3206 /**
3207  * Removes request \a nrq from \a policy's list of queued requests.
3208  *
3209  * \param[in] policy The policy
3210  * \param[in] nrq    The request to remove
3211  */
3212 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
3213                              struct ptlrpc_nrs_request *nrq)
3214 {
3215         struct nrs_tbf_head   *head;
3216         struct nrs_tbf_client *cli;
3217
3218         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3219
3220         cli = container_of(nrs_request_resource(nrq),
3221                            struct nrs_tbf_client, tc_res);
3222         head = container_of(nrs_request_resource(nrq)->res_parent,
3223                             struct nrs_tbf_head, th_res);
3224
3225         LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
3226         list_del_init(&nrq->nr_u.tbf.tr_list);
3227         if (list_empty(&cli->tc_list)) {
3228                 binheap_remove(head->th_binheap,
3229                                &cli->tc_node);
3230                 cli->tc_in_heap = false;
3231         } else {
3232                 binheap_relocate(head->th_binheap,
3233                                  &cli->tc_node);
3234         }
3235 }
3236
3237 /**
3238  * Prints a debug statement right before the request \a nrq stops being
3239  * handled.
3240  *
3241  * \param[in] policy The policy handling the request
3242  * \param[in] nrq    The request being handled
3243  *
3244  * \see ptlrpc_server_finish_request()
3245  * \see ptlrpc_nrs_req_stop_nolock()
3246  */
3247 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
3248                               struct ptlrpc_nrs_request *nrq)
3249 {
3250         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
3251                                                   rq_nrq);
3252
3253         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3254
3255         CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: %llu\n",
3256                policy->pol_desc->pd_name, libcfs_id2str(req->rq_peer),
3257                nrq->nr_u.tbf.tr_sequence);
3258 }
3259
3260 /**
3261  * debugfs interface
3262  */
3263
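/**
 * Upper bound for the "rate" value of a TBF rule, in RPCs/s. The bound is
 * exclusive: nrs_tbf_parse_value_pair() rejects any rate that is greater
 * than or equal to this value.
 */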
3267 #define LPROCFS_NRS_RATE_MAX            65535
3268
3269 static int
3270 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
3271 {
3272         struct ptlrpc_service       *svc = m->private;
3273         int                          rc;
3274
3275         seq_printf(m, "regular_requests:\n");
3276         /**
3277          * Perform two separate calls to this as only one of the NRS heads'
3278          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
3279          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
3280          */
3281         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
3282                                        NRS_POL_NAME_TBF,
3283                                        NRS_CTL_TBF_RD_RULE,
3284                                        false, m);
3285         if (rc == 0) {
3286                 /**
3287                  * -ENOSPC means buf in the parameter m is overflow, return 0
3288                  * here to let upper layer function seq_read alloc a larger
3289                  * memory area and do this process again.
3290                  */
3291         } else if (rc == -ENOSPC) {
3292                 return 0;
3293
3294                 /**
3295                  * Ignore -ENODEV as the regular NRS head's policy may be in the
3296                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
3297                  */
3298         } else if (rc != -ENODEV) {
3299                 return rc;
3300         }
3301
3302         if (!nrs_svc_has_hp(svc))
3303                 goto no_hp;
3304
3305         seq_printf(m, "high_priority_requests:\n");
3306         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
3307                                        NRS_POL_NAME_TBF,
3308                                        NRS_CTL_TBF_RD_RULE,
3309                                        false, m);
3310         if (rc == 0) {
3311                 /**
3312                  * -ENOSPC means buf in the parameter m is overflow, return 0
3313                  * here to let upper layer function seq_read alloc a larger
3314                  * memory area and do this process again.
3315                  */
3316         } else if (rc == -ENOSPC) {
3317                 return 0;
3318         }
3319
3320 no_hp:
3321
3322         return rc;
3323 }
3324
3325 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char *token)
3326 {
3327         int rc;
3328         ENTRY;
3329
3330         switch (cmd->u.tc_start.ts_valid_type) {
3331         case NRS_TBF_FLAG_JOBID:
3332                 rc = nrs_tbf_jobid_parse(cmd, token);
3333                 break;
3334         case NRS_TBF_FLAG_NID:
3335                 rc = nrs_tbf_nid_parse(cmd, token);
3336                 break;
3337         case NRS_TBF_FLAG_OPCODE:
3338                 rc = nrs_tbf_opcode_parse(cmd, token);
3339                 break;
3340         case NRS_TBF_FLAG_GENERIC:
3341                 rc = nrs_tbf_generic_parse(cmd, token);
3342                 break;
3343         case NRS_TBF_FLAG_UID:
3344         case NRS_TBF_FLAG_GID:
3345                 rc = nrs_tbf_ug_id_parse(cmd, token);
3346                 break;
3347         default:
3348                 RETURN(-EINVAL);
3349         }
3350
3351         RETURN(rc);
3352 }
3353
3354 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
3355 {
3356         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3357                 switch (cmd->u.tc_start.ts_valid_type) {
3358                 case NRS_TBF_FLAG_JOBID:
3359                         nrs_tbf_jobid_cmd_fini(cmd);
3360                         break;
3361                 case NRS_TBF_FLAG_NID:
3362                         nrs_tbf_nid_cmd_fini(cmd);
3363                         break;
3364                 case NRS_TBF_FLAG_OPCODE:
3365                         nrs_tbf_opcode_cmd_fini(cmd);
3366                         break;
3367                 case NRS_TBF_FLAG_GENERIC:
3368                         nrs_tbf_generic_cmd_fini(cmd);
3369                         break;
3370                 case NRS_TBF_FLAG_UID:
3371                 case NRS_TBF_FLAG_GID:
3372                         nrs_tbf_id_cmd_fini(cmd);
3373                         break;
3374                 default:
3375                         CWARN("unknown NRS_TBF_FLAGS:0x%x\n",
3376                               cmd->u.tc_start.ts_valid_type);
3377                 }
3378         }
3379 }
3380
/**
 * Checks that \a name consists only of alphanumeric characters and
 * underscores.
 *
 * \param[in] name NUL-terminated string to check
 *
 * \retval true	 every character is alphanumeric or '_' (also for "")
 * \retval false at least one other character was found
 */
static bool name_is_valid(const char *name)
{
	const char *p;

	/*
	 * Walk the string once up to its terminator instead of calling
	 * strlen() on every iteration; characters are passed to isalnum()
	 * as unsigned char.
	 */
	for (p = name; *p != '\0'; p++) {
		if (!isalnum((unsigned char)*p) && *p != '_')
			return false;
	}
	return true;
}
3392
3393 static int
3394 nrs_tbf_parse_value_pair(struct nrs_tbf_cmd *cmd, char *buffer)
3395 {
3396         char    *key;
3397         char    *val;
3398         int      rc;
3399         __u64    rate;
3400
3401         val = buffer;
3402         key = strsep(&val, "=");
3403         if (val == NULL || strlen(val) == 0)
3404                 return -EINVAL;
3405
3406         /* Key of the value pair */
3407         if (strcmp(key, "rate") == 0) {
3408                 rc = kstrtoull(val, 10, &rate);
3409                 if (rc)
3410                         return rc;
3411
3412                 if (rate <= 0 || rate >= LPROCFS_NRS_RATE_MAX)
3413                         return -EINVAL;
3414
3415                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3416                         cmd->u.tc_start.ts_rpc_rate = rate;
3417                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3418                         cmd->u.tc_change.tc_rpc_rate = rate;
3419                 else
3420                         return -EINVAL;
3421         }  else if (strcmp(key, "rank") == 0) {
3422                 if (!name_is_valid(val))
3423                         return -EINVAL;
3424
3425                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3426                         cmd->u.tc_start.ts_next_name = val;
3427                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3428                         cmd->u.tc_change.tc_next_name = val;
3429                 else
3430                         return -EINVAL;
3431         } else if (strcmp(key, "realtime") == 0) {
3432                 unsigned long realtime;
3433
3434                 rc = kstrtoul(val, 10, &realtime);
3435                 if (rc)
3436                         return rc;
3437
3438                 if (realtime > 0)
3439                         cmd->u.tc_start.ts_rule_flags |= NTRS_REALTIME;
3440         } else {
3441                 return -EINVAL;
3442         }
3443         return 0;
3444 }
3445
3446 static int
3447 nrs_tbf_parse_value_pairs(struct nrs_tbf_cmd *cmd, char *buffer)
3448 {
3449         char    *val;
3450         char    *token;
3451         int      rc;
3452
3453         val = buffer;
3454         while (val != NULL && strlen(val) != 0) {
3455                 token = strsep(&val, " ");
3456                 rc = nrs_tbf_parse_value_pair(cmd, token);
3457                 if (rc)
3458                         return rc;
3459         }
3460
3461         switch (cmd->tc_cmd) {
3462         case NRS_CTL_TBF_START_RULE:
3463                 if (cmd->u.tc_start.ts_rpc_rate == 0)
3464                         cmd->u.tc_start.ts_rpc_rate = tbf_rate;
3465                 break;
3466         case NRS_CTL_TBF_CHANGE_RULE:
3467                 if (cmd->u.tc_change.tc_rpc_rate == 0 &&
3468                     cmd->u.tc_change.tc_next_name == NULL)
3469                         return -EINVAL;
3470                 break;
3471         case NRS_CTL_TBF_STOP_RULE:
3472                 break;
3473         default:
3474                 return -EINVAL;
3475         }
3476         return 0;
3477 }
3478
3479 static struct nrs_tbf_cmd *
3480 nrs_tbf_parse_cmd(char *buffer, unsigned long count, __u32 type_flag)
3481 {
3482         static struct nrs_tbf_cmd       *cmd;
3483         char                            *token;
3484         char                            *val;
3485         int                              rc = 0;
3486
3487         OBD_ALLOC_PTR(cmd);
3488         if (cmd == NULL)
3489                 GOTO(out, rc = -ENOMEM);
3490         memset(cmd, 0, sizeof(*cmd));
3491
3492         val = buffer;
3493         token = strsep(&val, " ");
3494         if (val == NULL || strlen(val) == 0)
3495                 GOTO(out_free_cmd, rc = -EINVAL);
3496
3497         /* Type of the command */
3498         if (strcmp(token, "start") == 0) {
3499                 cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
3500                 cmd->u.tc_start.ts_valid_type = type_flag;
3501         } else if (strcmp(token, "stop") == 0)
3502                 cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
3503         else if (strcmp(token, "change") == 0)
3504                 cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RULE;
3505         else
3506                 GOTO(out_free_cmd, rc = -EINVAL);
3507
3508         /* Name of the rule */
3509         token = strsep(&val, " ");
3510         if ((val == NULL && cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE) ||
3511             !name_is_valid(token))
3512                 GOTO(out_free_cmd, rc = -EINVAL);
3513         cmd->tc_name = token;
3514
3515         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3516                 /* List of ID */
3517                 LASSERT(val);
3518                 token = val;
3519                 val = strrchr(token, '}');
3520                 if (!val)
3521                         GOTO(out_free_cmd, rc = -EINVAL);
3522
3523                 /* Skip '}' */
3524                 val++;
3525                 if (*val == '\0') {
3526                         val = NULL;
3527                 } else if (*val == ' ') {
3528                         *val = '\0';
3529                         val++;
3530                 } else
3531                         GOTO(out_free_cmd, rc = -EINVAL);
3532
3533                 rc = nrs_tbf_id_parse(cmd, token);
3534                 if (rc)
3535                         GOTO(out_free_cmd, rc);
3536         }
3537
3538         rc = nrs_tbf_parse_value_pairs(cmd, val);
3539         if (rc)
3540                 GOTO(out_cmd_fini, rc = -EINVAL);
3541         goto out;
3542 out_cmd_fini:
3543         nrs_tbf_cmd_fini(cmd);
3544 out_free_cmd:
3545         OBD_FREE_PTR(cmd);
3546 out:
3547         if (rc)
3548                 cmd = ERR_PTR(rc);
3549         return cmd;
3550 }
3551
3552 /**
3553  * Get the TBF policy type (nid, jobid, etc) preset by
3554  * proc entry 'nrs_policies' for command buffer parsing.
3555  *
3556  * \param[in] svc the PTLRPC service
3557  * \param[in] queue the NRS queue type
3558  *
3559  * \retval the preset TBF policy type flag
3560  */
3561 static __u32
3562 nrs_tbf_type_flag(struct ptlrpc_service *svc, enum ptlrpc_nrs_queue_type queue)
3563 {
3564         __u32   type;
3565         int     rc;
3566
3567         rc = ptlrpc_nrs_policy_control(svc, queue,
3568                                        NRS_POL_NAME_TBF,
3569                                        NRS_CTL_TBF_RD_TYPE_FLAG,
3570                                        true, &type);
3571         if (rc != 0)
3572                 type = NRS_TBF_FLAG_INVALID;
3573
3574         return type;
3575 }
3576
3577 #define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
3578 static ssize_t
3579 ptlrpc_lprocfs_nrs_tbf_rule_seq_write(struct file *file,
3580                                       const char __user *buffer,
3581                                       size_t count, loff_t *off)
3582 {
3583         struct seq_file           *m = file->private_data;
3584         struct ptlrpc_service     *svc = m->private;
3585         char                      *kernbuf;
3586         char                      *val;
3587         int                        rc;
3588         static struct nrs_tbf_cmd *cmd;
3589         enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
3590         unsigned long              length;
3591         char                      *token;
3592
3593         OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3594         if (kernbuf == NULL)
3595                 GOTO(out, rc = -ENOMEM);
3596
3597         if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
3598                 GOTO(out_free_kernbuff, rc = -EINVAL);
3599
3600         if (copy_from_user(kernbuf, buffer, count))
3601                 GOTO(out_free_kernbuff, rc = -EFAULT);
3602
3603         val = kernbuf;
3604         token = strsep(&val, " ");
3605         if (val == NULL)
3606                 GOTO(out_free_kernbuff, rc = -EINVAL);
3607
3608         if (strcmp(token, "reg") == 0) {
3609                 queue = PTLRPC_NRS_QUEUE_REG;
3610         } else if (strcmp(token, "hp") == 0) {
3611                 queue = PTLRPC_NRS_QUEUE_HP;
3612         } else {
3613                 kernbuf[strlen(token)] = ' ';
3614                 val = kernbuf;
3615         }
3616         length = strlen(val);
3617
3618         if (length == 0)
3619                 GOTO(out_free_kernbuff, rc = -EINVAL);
3620
3621         if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
3622                 GOTO(out_free_kernbuff, rc = -ENODEV);
3623         else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
3624                 queue = PTLRPC_NRS_QUEUE_REG;
3625
3626         cmd = nrs_tbf_parse_cmd(val, length, nrs_tbf_type_flag(svc, queue));
3627         if (IS_ERR(cmd))
3628                 GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
3629
3630         /**
3631          * Serialize NRS core lprocfs operations with policy registration/
3632          * unregistration.
3633          */
3634         mutex_lock(&nrs_core.nrs_mutex);
3635         rc = ptlrpc_nrs_policy_control(svc, queue,
3636                                        NRS_POL_NAME_TBF,
3637                                        NRS_CTL_TBF_WR_RULE,
3638                                        false, cmd);
3639         mutex_unlock(&nrs_core.nrs_mutex);
3640
3641         nrs_tbf_cmd_fini(cmd);
3642         OBD_FREE_PTR(cmd);
3643 out_free_kernbuff:
3644         OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3645 out:
3646         return rc ? rc : count;
3647 }
3648
3649 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_tbf_rule);
3650
3651 /**
3652  * Initializes a TBF policy's lprocfs interface for service \a svc
3653  *
3654  * \param[in] svc the service
3655  *
3656  * \retval 0    success
3657  * \retval != 0 error
3658  */
3659 static int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
3660 {
3661         struct ldebugfs_vars nrs_tbf_lprocfs_vars[] = {
3662                 { .name         = "nrs_tbf_rule",
3663                   .fops         = &ptlrpc_lprocfs_nrs_tbf_rule_fops,
3664                   .data = svc },
3665                 { NULL }
3666         };
3667
3668         if (!svc->srv_debugfs_entry)
3669                 return 0;
3670
3671         ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_tbf_lprocfs_vars, NULL);
3672
3673         return 0;
3674 }
3675
3676 /**
3677  * TBF policy operations
3678  */
3679 static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
3680         .op_policy_start        = nrs_tbf_start,
3681         .op_policy_stop         = nrs_tbf_stop,
3682         .op_policy_ctl          = nrs_tbf_ctl,
3683         .op_res_get             = nrs_tbf_res_get,
3684         .op_res_put             = nrs_tbf_res_put,
3685         .op_req_get             = nrs_tbf_req_get,
3686         .op_req_enqueue         = nrs_tbf_req_add,
3687         .op_req_dequeue         = nrs_tbf_req_del,
3688         .op_req_stop            = nrs_tbf_req_stop,
3689         .op_lprocfs_init        = nrs_tbf_lprocfs_init,
3690 };
3691
3692 /**
3693  * TBF policy configuration
3694  */
3695 struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
3696         .nc_name                = NRS_POL_NAME_TBF,
3697         .nc_ops                 = &nrs_tbf_ops,
3698         .nc_compat              = nrs_policy_compat_all,
3699 };
3700
3701 /** @} tbf */
3702
3703 /** @} nrs */