Whamcloud - gitweb
LU-10391 ptlrpc: change rq_peer to struct lnet_nid
[fs/lustre-release.git] / lustre / ptlrpc / nrs_tbf.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2013 DataDirect Networks, Inc.
24  *
25  * Copyright (c) 2014, 2016, Intel Corporation.
26  */
27 /*
28  * lustre/ptlrpc/nrs_tbf.c
29  *
30  * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
31  *
32  */
33
34 /**
35  * \addtogoup nrs
36  * @{
37  */
38
39 #define DEBUG_SUBSYSTEM S_RPC
40 #include <obd_support.h>
41 #include <obd_class.h>
42 #include <libcfs/libcfs.h>
43 #include <lustre_req_layout.h>
44 #include "ptlrpc_internal.h"
45
46 /**
47  * \name tbf
48  *
49  * Token Bucket Filter over client NIDs
50  *
51  * @{
52  */
53
54 #define NRS_POL_NAME_TBF        "tbf"
55
56 static int tbf_jobid_cache_size = 8192;
57 module_param(tbf_jobid_cache_size, int, 0644);
58 MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");
59
60 static int tbf_rate = 10000;
61 module_param(tbf_rate, int, 0644);
62 MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");
63
64 static int tbf_depth = 3;
65 module_param(tbf_depth, int, 0644);
66 MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
67
68 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
69 {
70         struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
71                                                  th_timer);
72         struct ptlrpc_nrs   *nrs = head->th_res.res_policy->pol_nrs;
73         struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
74
75         nrs->nrs_throttling = 0;
76         wake_up(&svcpt->scp_waitq);
77
78         return HRTIMER_NORESTART;
79 }
80
81 #define NRS_TBF_DEFAULT_RULE "default"
82
83 static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule)
84 {
85         LASSERT(atomic_read(&rule->tr_ref) == 0);
86         LASSERT(list_empty(&rule->tr_cli_list));
87         LASSERT(list_empty(&rule->tr_linkage));
88
89         rule->tr_head->th_ops->o_rule_fini(rule);
90         OBD_FREE_PTR(rule);
91 }
92
93 /**
94  * Decreases the rule's usage reference count, and stops the rule in case it
95  * was already stopping and have no more outstanding usage references (which
96  * indicates it has no more queued or started requests, and can be safely
97  * stopped).
98  */
99 static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule)
100 {
101         if (atomic_dec_and_test(&rule->tr_ref))
102                 nrs_tbf_rule_fini(rule);
103 }
104
105 /**
106  * Increases the rule's usage reference count.
107  */
108 static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule)
109 {
110         atomic_inc(&rule->tr_ref);
111 }
112
113 static void
114 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
115 {
116         LASSERT(!list_empty(&cli->tc_linkage));
117         LASSERT(cli->tc_rule);
118         spin_lock(&cli->tc_rule->tr_rule_lock);
119         list_del_init(&cli->tc_linkage);
120         spin_unlock(&cli->tc_rule->tr_rule_lock);
121         nrs_tbf_rule_put(cli->tc_rule);
122         cli->tc_rule = NULL;
123 }
124
125 static void
126 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
127                         struct nrs_tbf_client *cli)
128
129 {
130         struct nrs_tbf_rule *rule = cli->tc_rule;
131
132         cli->tc_rpc_rate = rule->tr_rpc_rate;
133         cli->tc_nsecs = rule->tr_nsecs_per_rpc;
134         cli->tc_nsecs_resid = 0;
135         cli->tc_depth = rule->tr_depth;
136         cli->tc_ntoken = rule->tr_depth;
137         cli->tc_check_time = ktime_to_ns(ktime_get());
138         cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
139         cli->tc_rule_generation = rule->tr_generation;
140
141         if (cli->tc_in_heap)
142                 binheap_relocate(head->th_binheap,
143                                  &cli->tc_node);
144 }
145
146 static void
147 nrs_tbf_cli_reset(struct nrs_tbf_head *head,
148                   struct nrs_tbf_rule *rule,
149                   struct nrs_tbf_client *cli)
150 {
151         spin_lock(&cli->tc_rule_lock);
152         if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
153                 LASSERT(rule != cli->tc_rule);
154                 nrs_tbf_cli_rule_put(cli);
155         }
156         LASSERT(cli->tc_rule == NULL);
157         LASSERT(list_empty(&cli->tc_linkage));
158         /* Rule's ref is added before called */
159         cli->tc_rule = rule;
160         spin_lock(&rule->tr_rule_lock);
161         list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
162         spin_unlock(&rule->tr_rule_lock);
163         spin_unlock(&cli->tc_rule_lock);
164         nrs_tbf_cli_reset_value(head, cli);
165 }
166
167 static int
168 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
169 {
170         return rule->tr_head->th_ops->o_rule_dump(rule, m);
171 }
172
173 static int
174 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
175 {
176         struct nrs_tbf_rule *rule;
177         int rc = 0;
178
179         LASSERT(head != NULL);
180         spin_lock(&head->th_rule_lock);
181         /* List the rules from newest to oldest */
182         list_for_each_entry(rule, &head->th_list, tr_linkage) {
183                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
184                 rc = nrs_tbf_rule_dump(rule, m);
185                 if (rc) {
186                         rc = -ENOSPC;
187                         break;
188                 }
189         }
190         spin_unlock(&head->th_rule_lock);
191
192         return rc;
193 }
194
195 static struct nrs_tbf_rule *
196 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
197                          const char *name)
198 {
199         struct nrs_tbf_rule *rule;
200
201         LASSERT(head != NULL);
202         list_for_each_entry(rule, &head->th_list, tr_linkage) {
203                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
204                 if (strcmp(rule->tr_name, name) == 0) {
205                         nrs_tbf_rule_get(rule);
206                         return rule;
207                 }
208         }
209         return NULL;
210 }
211
212 static struct nrs_tbf_rule *
213 nrs_tbf_rule_find(struct nrs_tbf_head *head,
214                   const char *name)
215 {
216         struct nrs_tbf_rule *rule;
217
218         LASSERT(head != NULL);
219         spin_lock(&head->th_rule_lock);
220         rule = nrs_tbf_rule_find_nolock(head, name);
221         spin_unlock(&head->th_rule_lock);
222         return rule;
223 }
224
225 static struct nrs_tbf_rule *
226 nrs_tbf_rule_match(struct nrs_tbf_head *head,
227                    struct nrs_tbf_client *cli)
228 {
229         struct nrs_tbf_rule *rule = NULL;
230         struct nrs_tbf_rule *tmp_rule;
231
232         spin_lock(&head->th_rule_lock);
233         /* Match the newest rule in the list */
234         list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
235                 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
236                 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
237                         rule = tmp_rule;
238                         break;
239                 }
240         }
241
242         if (rule == NULL)
243                 rule = head->th_rule;
244
245         nrs_tbf_rule_get(rule);
246         spin_unlock(&head->th_rule_lock);
247         return rule;
248 }
249
250 static void
251 nrs_tbf_cli_init(struct nrs_tbf_head *head,
252                  struct nrs_tbf_client *cli,
253                  struct ptlrpc_request *req)
254 {
255         struct nrs_tbf_rule *rule;
256
257         memset(cli, 0, sizeof(*cli));
258         cli->tc_in_heap = false;
259         head->th_ops->o_cli_init(cli, req);
260         INIT_LIST_HEAD(&cli->tc_list);
261         INIT_LIST_HEAD(&cli->tc_linkage);
262         spin_lock_init(&cli->tc_rule_lock);
263         atomic_set(&cli->tc_ref, 1);
264         rule = nrs_tbf_rule_match(head, cli);
265         nrs_tbf_cli_reset(head, rule, cli);
266 }
267
268 static void
269 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
270 {
271         LASSERT(list_empty(&cli->tc_list));
272         LASSERT(!cli->tc_in_heap);
273         LASSERT(atomic_read(&cli->tc_ref) == 0);
274         spin_lock(&cli->tc_rule_lock);
275         nrs_tbf_cli_rule_put(cli);
276         spin_unlock(&cli->tc_rule_lock);
277         OBD_FREE_PTR(cli);
278 }
279
280 static int
281 nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
282                    struct nrs_tbf_head *head,
283                    struct nrs_tbf_cmd *start)
284 {
285         struct nrs_tbf_rule     *rule;
286         struct nrs_tbf_rule     *tmp_rule;
287         struct nrs_tbf_rule     *next_rule;
288         char                    *next_name = start->u.tc_start.ts_next_name;
289         int                      rc;
290
291         rule = nrs_tbf_rule_find(head, start->tc_name);
292         if (rule) {
293                 nrs_tbf_rule_put(rule);
294                 return -EEXIST;
295         }
296
297         OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
298         if (rule == NULL)
299                 return -ENOMEM;
300
301         strlcpy(rule->tr_name, start->tc_name, sizeof(rule->tr_name));
302         rule->tr_rpc_rate = start->u.tc_start.ts_rpc_rate;
303         rule->tr_flags = start->u.tc_start.ts_rule_flags;
304         rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
305         rule->tr_depth = tbf_depth;
306         atomic_set(&rule->tr_ref, 1);
307         INIT_LIST_HEAD(&rule->tr_cli_list);
308         INIT_LIST_HEAD(&rule->tr_nids);
309         INIT_LIST_HEAD(&rule->tr_linkage);
310         spin_lock_init(&rule->tr_rule_lock);
311         rule->tr_head = head;
312
313         rc = head->th_ops->o_rule_init(policy, rule, start);
314         if (rc) {
315                 OBD_FREE_PTR(rule);
316                 return rc;
317         }
318
319         /* Add as the newest rule */
320         spin_lock(&head->th_rule_lock);
321         tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
322         if (tmp_rule) {
323                 spin_unlock(&head->th_rule_lock);
324                 nrs_tbf_rule_put(tmp_rule);
325                 nrs_tbf_rule_put(rule);
326                 return -EEXIST;
327         }
328
329         if (next_name) {
330                 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
331                 if (!next_rule) {
332                         spin_unlock(&head->th_rule_lock);
333                         nrs_tbf_rule_put(rule);
334                         return -ENOENT;
335                 }
336
337                 list_add(&rule->tr_linkage, next_rule->tr_linkage.prev);
338                 nrs_tbf_rule_put(next_rule);
339         } else {
340                 /* Add on the top of the rule list */
341                 list_add(&rule->tr_linkage, &head->th_list);
342         }
343         spin_unlock(&head->th_rule_lock);
344         atomic_inc(&head->th_rule_sequence);
345         if (start->u.tc_start.ts_rule_flags & NTRS_DEFAULT) {
346                 rule->tr_flags |= NTRS_DEFAULT;
347                 LASSERT(head->th_rule == NULL);
348                 head->th_rule = rule;
349         }
350
351         CDEBUG(D_RPCTRACE, "TBF starts rule@%p rate %llu gen %llu\n",
352                rule, rule->tr_rpc_rate, rule->tr_generation);
353
354         return 0;
355 }
356
357 /**
358  * Change the rank of a rule in the rule list
359  *
360  * The matched rule will be moved to the position right before another
361  * given rule.
362  *
363  * \param[in] policy    the policy instance
364  * \param[in] head      the TBF policy instance
365  * \param[in] name      the rule name to be moved
366  * \param[in] next_name the rule name before which the matched rule will be
367  *                      moved
368  *
369  */
370 static int
371 nrs_tbf_rule_change_rank(struct ptlrpc_nrs_policy *policy,
372                          struct nrs_tbf_head *head,
373                          char *name,
374                          char *next_name)
375 {
376         struct nrs_tbf_rule     *rule = NULL;
377         struct nrs_tbf_rule     *next_rule = NULL;
378         int                      rc = 0;
379
380         LASSERT(head != NULL);
381
382         spin_lock(&head->th_rule_lock);
383         rule = nrs_tbf_rule_find_nolock(head, name);
384         if (!rule)
385                 GOTO(out, rc = -ENOENT);
386
387         if (strcmp(name, next_name) == 0)
388                 GOTO(out_put, rc);
389
390         next_rule = nrs_tbf_rule_find_nolock(head, next_name);
391         if (!next_rule)
392                 GOTO(out_put, rc = -ENOENT);
393
394         /* rules may be adjacent in same list, so list_move() isn't safe here */
395         list_move_tail(&rule->tr_linkage, &next_rule->tr_linkage);
396         nrs_tbf_rule_put(next_rule);
397 out_put:
398         nrs_tbf_rule_put(rule);
399 out:
400         spin_unlock(&head->th_rule_lock);
401         return rc;
402 }
403
404 static int
405 nrs_tbf_rule_change_rate(struct ptlrpc_nrs_policy *policy,
406                          struct nrs_tbf_head *head,
407                          char *name,
408                          __u64 rate)
409 {
410         struct nrs_tbf_rule *rule;
411
412         assert_spin_locked(&policy->pol_nrs->nrs_lock);
413
414         rule = nrs_tbf_rule_find(head, name);
415         if (rule == NULL)
416                 return -ENOENT;
417
418         rule->tr_rpc_rate = rate;
419         rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
420         rule->tr_generation++;
421         nrs_tbf_rule_put(rule);
422
423         return 0;
424 }
425
426 static int
427 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
428                     struct nrs_tbf_head *head,
429                     struct nrs_tbf_cmd *change)
430 {
431         __u64    rate = change->u.tc_change.tc_rpc_rate;
432         char    *next_name = change->u.tc_change.tc_next_name;
433         int      rc;
434
435         if (rate != 0) {
436                 rc = nrs_tbf_rule_change_rate(policy, head, change->tc_name,
437                                               rate);
438                 if (rc)
439                         return rc;
440         }
441
442         if (next_name) {
443                 rc = nrs_tbf_rule_change_rank(policy, head, change->tc_name,
444                                               next_name);
445                 if (rc)
446                         return rc;
447         }
448
449         return 0;
450 }
451
452 static int
453 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
454                   struct nrs_tbf_head *head,
455                   struct nrs_tbf_cmd *stop)
456 {
457         struct nrs_tbf_rule *rule;
458
459         assert_spin_locked(&policy->pol_nrs->nrs_lock);
460
461         if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
462                 return -EPERM;
463
464         rule = nrs_tbf_rule_find(head, stop->tc_name);
465         if (rule == NULL)
466                 return -ENOENT;
467
468         list_del_init(&rule->tr_linkage);
469         rule->tr_flags |= NTRS_STOPPING;
470         nrs_tbf_rule_put(rule);
471         nrs_tbf_rule_put(rule);
472
473         return 0;
474 }
475
476 static int
477 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
478                 struct nrs_tbf_head *head,
479                 struct nrs_tbf_cmd *cmd)
480 {
481         int rc;
482
483         assert_spin_locked(&policy->pol_nrs->nrs_lock);
484
485         switch (cmd->tc_cmd) {
486         case NRS_CTL_TBF_START_RULE:
487                 if (cmd->u.tc_start.ts_valid_type != head->th_type_flag)
488                         return -EINVAL;
489
490                 spin_unlock(&policy->pol_nrs->nrs_lock);
491                 rc = nrs_tbf_rule_start(policy, head, cmd);
492                 spin_lock(&policy->pol_nrs->nrs_lock);
493                 return rc;
494         case NRS_CTL_TBF_CHANGE_RULE:
495                 rc = nrs_tbf_rule_change(policy, head, cmd);
496                 return rc;
497         case NRS_CTL_TBF_STOP_RULE:
498                 rc = nrs_tbf_rule_stop(policy, head, cmd);
499                 /* Take it as a success, if not exists at all */
500                 return rc == -ENOENT ? 0 : rc;
501         default:
502                 return -EFAULT;
503         }
504 }
505
506 /**
507  * Binary heap predicate.
508  *
509  * \param[in] e1 the first binheap node to compare
510  * \param[in] e2 the second binheap node to compare
511  *
512  * \retval 0 e1 > e2
513  * \retval 1 e1 < e2
514  */
515 static int
516 tbf_cli_compare(struct binheap_node *e1, struct binheap_node *e2)
517 {
518         struct nrs_tbf_client *cli1;
519         struct nrs_tbf_client *cli2;
520
521         cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
522         cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
523
524         if (cli1->tc_deadline < cli2->tc_deadline)
525                 return 1;
526         else if (cli1->tc_deadline > cli2->tc_deadline)
527                 return 0;
528
529         if (cli1->tc_check_time < cli2->tc_check_time)
530                 return 1;
531         else if (cli1->tc_check_time > cli2->tc_check_time)
532                 return 0;
533
534         /* Maybe need more comparasion, e.g. request number in the rules */
535         return 1;
536 }
537
538 /**
539  * TBF binary heap operations
540  */
541 static struct binheap_ops nrs_tbf_heap_ops = {
542         .hop_enter      = NULL,
543         .hop_exit       = NULL,
544         .hop_compare    = tbf_cli_compare,
545 };
546
547 static unsigned nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key,
548                                   unsigned mask)
549 {
550         return cfs_hash_djb2_hash(key, strlen(key), mask);
551 }
552
553 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
554 {
555         struct nrs_tbf_client *cli = hlist_entry(hnode,
556                                                      struct nrs_tbf_client,
557                                                      tc_hnode);
558
559         return (strcmp(cli->tc_jobid, key) == 0);
560 }
561
562 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
563 {
564         struct nrs_tbf_client *cli = hlist_entry(hnode,
565                                                      struct nrs_tbf_client,
566                                                      tc_hnode);
567
568         return cli->tc_jobid;
569 }
570
571 static void *nrs_tbf_hop_object(struct hlist_node *hnode)
572 {
573         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
574 }
575
576 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
577 {
578         struct nrs_tbf_client *cli = hlist_entry(hnode,
579                                                      struct nrs_tbf_client,
580                                                      tc_hnode);
581
582         atomic_inc(&cli->tc_ref);
583 }
584
585 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
586 {
587         struct nrs_tbf_client *cli = hlist_entry(hnode,
588                                                      struct nrs_tbf_client,
589                                                      tc_hnode);
590
591         atomic_dec(&cli->tc_ref);
592 }
593
594 static void
595 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
596
597 {
598         struct nrs_tbf_client *cli = hlist_entry(hnode,
599                                                  struct nrs_tbf_client,
600                                                  tc_hnode);
601
602         LASSERT(atomic_read(&cli->tc_ref) == 0);
603         nrs_tbf_cli_fini(cli);
604 }
605
606 static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
607         .hs_hash        = nrs_tbf_jobid_hop_hash,
608         .hs_keycmp      = nrs_tbf_jobid_hop_keycmp,
609         .hs_key         = nrs_tbf_jobid_hop_key,
610         .hs_object      = nrs_tbf_hop_object,
611         .hs_get         = nrs_tbf_jobid_hop_get,
612         .hs_put         = nrs_tbf_jobid_hop_put,
613         .hs_put_locked  = nrs_tbf_jobid_hop_put,
614         .hs_exit        = nrs_tbf_jobid_hop_exit,
615 };
616
617 #define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
618                                   CFS_HASH_NO_ITEMREF | \
619                                   CFS_HASH_DEPTH)
620
621 static struct nrs_tbf_client *
622 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
623                           struct cfs_hash_bd *bd,
624                           const char *jobid)
625 {
626         struct hlist_node *hnode;
627         struct nrs_tbf_client *cli;
628
629         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)jobid);
630         if (hnode == NULL)
631                 return NULL;
632
633         cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
634         if (!list_empty(&cli->tc_lru))
635                 list_del_init(&cli->tc_lru);
636         return cli;
637 }
638
639 #define NRS_TBF_JOBID_NULL ""
640
641 static struct nrs_tbf_client *
642 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
643                        struct ptlrpc_request *req)
644 {
645         const char              *jobid;
646         struct nrs_tbf_client   *cli;
647         struct cfs_hash         *hs = head->th_cli_hash;
648         struct cfs_hash_bd               bd;
649
650         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
651         if (jobid == NULL)
652                 jobid = NRS_TBF_JOBID_NULL;
653         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
654         cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
655         cfs_hash_bd_unlock(hs, &bd, 1);
656
657         return cli;
658 }
659
660 static struct nrs_tbf_client *
661 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
662                           struct nrs_tbf_client *cli)
663 {
664         const char              *jobid;
665         struct nrs_tbf_client   *ret;
666         struct cfs_hash         *hs = head->th_cli_hash;
667         struct cfs_hash_bd               bd;
668
669         jobid = cli->tc_jobid;
670         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
671         ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
672         if (ret == NULL) {
673                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
674                 ret = cli;
675         }
676         cfs_hash_bd_unlock(hs, &bd, 1);
677
678         return ret;
679 }
680
681 static void
682 nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
683                       struct nrs_tbf_client *cli)
684 {
685         struct cfs_hash_bd               bd;
686         struct cfs_hash         *hs = head->th_cli_hash;
687         struct nrs_tbf_bucket   *bkt;
688         int                      hw;
689         LIST_HEAD(zombies);
690
691         cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
692         bkt = cfs_hash_bd_extra_get(hs, &bd);
693         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
694                 return;
695         LASSERT(list_empty(&cli->tc_lru));
696         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
697
698         /*
699          * Check and purge the LRU, there is at least one client in the LRU.
700          */
701         hw = tbf_jobid_cache_size >>
702              (hs->hs_cur_bits - hs->hs_bkt_bits);
703         while (cfs_hash_bd_count_get(&bd) > hw) {
704                 if (unlikely(list_empty(&bkt->ntb_lru)))
705                         break;
706                 cli = list_entry(bkt->ntb_lru.next,
707                                      struct nrs_tbf_client,
708                                      tc_lru);
709                 LASSERT(atomic_read(&cli->tc_ref) == 0);
710                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
711                 list_move(&cli->tc_lru, &zombies);
712         }
713         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
714
715         while (!list_empty(&zombies)) {
716                 cli = container_of(zombies.next,
717                                    struct nrs_tbf_client, tc_lru);
718                 list_del_init(&cli->tc_lru);
719                 nrs_tbf_cli_fini(cli);
720         }
721 }
722
723 static void
724 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
725                        struct ptlrpc_request *req)
726 {
727         char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
728
729         if (jobid == NULL)
730                 jobid = NRS_TBF_JOBID_NULL;
731         LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
732         INIT_LIST_HEAD(&cli->tc_lru);
733         memcpy(cli->tc_jobid, jobid, strlen(jobid));
734 }
735
736 static int nrs_tbf_jobid_hash_order(void)
737 {
738         int bits;
739
740         for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
741                 ;
742
743         return bits;
744 }
745
746 #define NRS_TBF_JOBID_BKT_BITS 10
747
748 static int
749 nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
750                       struct nrs_tbf_head *head)
751 {
752         struct nrs_tbf_cmd       start;
753         struct nrs_tbf_bucket   *bkt;
754         int                      bits;
755         int                      i;
756         int                      rc;
757         struct cfs_hash_bd       bd;
758
759         bits = nrs_tbf_jobid_hash_order();
760         if (bits < NRS_TBF_JOBID_BKT_BITS)
761                 bits = NRS_TBF_JOBID_BKT_BITS;
762         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
763                                             bits,
764                                             bits,
765                                             NRS_TBF_JOBID_BKT_BITS,
766                                             sizeof(*bkt),
767                                             0,
768                                             0,
769                                             &nrs_tbf_jobid_hash_ops,
770                                             NRS_TBF_JOBID_HASH_FLAGS);
771         if (head->th_cli_hash == NULL)
772                 return -ENOMEM;
773
774         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
775                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
776                 INIT_LIST_HEAD(&bkt->ntb_lru);
777         }
778
779         memset(&start, 0, sizeof(start));
780         start.u.tc_start.ts_jobids_str = "*";
781
782         start.u.tc_start.ts_rpc_rate = tbf_rate;
783         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
784         start.tc_name = NRS_TBF_DEFAULT_RULE;
785         INIT_LIST_HEAD(&start.u.tc_start.ts_jobids);
786         rc = nrs_tbf_rule_start(policy, head, &start);
787         if (rc) {
788                 cfs_hash_putref(head->th_cli_hash);
789                 head->th_cli_hash = NULL;
790         }
791
792         return rc;
793 }
794
795 /**
796  * Frees jobid of \a list.
797  *
798  */
799 static void
800 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
801 {
802         struct nrs_tbf_jobid *jobid, *n;
803
804         list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
805                 OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1);
806                 list_del(&jobid->tj_linkage);
807                 OBD_FREE_PTR(jobid);
808         }
809 }
810
811 static int
812 nrs_tbf_jobid_list_add(struct cfs_lstr *id, struct list_head *jobid_list)
813 {
814         struct nrs_tbf_jobid *jobid;
815         char *ptr;
816
817         OBD_ALLOC_PTR(jobid);
818         if (jobid == NULL)
819                 return -ENOMEM;
820
821         OBD_ALLOC(jobid->tj_id, id->ls_len + 1);
822         if (jobid->tj_id == NULL) {
823                 OBD_FREE_PTR(jobid);
824                 return -ENOMEM;
825         }
826
827         memcpy(jobid->tj_id, id->ls_str, id->ls_len);
828         ptr = lprocfs_strnstr(id->ls_str, "*", id->ls_len);
829         if (ptr == NULL)
830                 jobid->tj_match_flag = NRS_TBF_MATCH_FULL;
831         else
832                 jobid->tj_match_flag = NRS_TBF_MATCH_WILDCARD;
833
834         list_add_tail(&jobid->tj_linkage, jobid_list);
835         return 0;
836 }
837
838 static bool
839 cfs_match_wildcard(const char *pattern, const char *content)
840 {
841         if (*pattern == '\0' && *content == '\0')
842                 return true;
843
844         if (*pattern == '*' && *(pattern + 1) != '\0' && *content == '\0')
845                 return false;
846
847         while (*pattern == *content) {
848                 pattern++;
849                 content++;
850                 if (*pattern == '\0' && *content == '\0')
851                         return true;
852
853                 if (*pattern == '*' && *(pattern + 1) != '\0' &&
854                     *content == '\0')
855                         return false;
856         }
857
858         if (*pattern == '*')
859                 return (cfs_match_wildcard(pattern + 1, content) ||
860                         cfs_match_wildcard(pattern, content + 1));
861
862         return false;
863 }
864
865 static inline bool
866 nrs_tbf_jobid_match(const struct nrs_tbf_jobid *jobid, const char *id)
867 {
868         if (jobid->tj_match_flag == NRS_TBF_MATCH_FULL)
869                 return strcmp(jobid->tj_id, id) == 0;
870
871         if (jobid->tj_match_flag == NRS_TBF_MATCH_WILDCARD)
872                 return cfs_match_wildcard(jobid->tj_id, id);
873
874         return false;
875 }
876
877 static int
878 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
879 {
880         struct nrs_tbf_jobid *jobid;
881
882         list_for_each_entry(jobid, jobid_list, tj_linkage) {
883                 if (nrs_tbf_jobid_match(jobid, id))
884                         return 1;
885         }
886         return 0;
887 }
888
889 static int
890 nrs_tbf_jobid_list_parse(char *str, int len, struct list_head *jobid_list)
891 {
892         struct cfs_lstr src;
893         struct cfs_lstr res;
894         int rc = 0;
895         ENTRY;
896
897         src.ls_str = str;
898         src.ls_len = len;
899         INIT_LIST_HEAD(jobid_list);
900         while (src.ls_str) {
901                 rc = cfs_gettok(&src, ' ', &res);
902                 if (rc == 0) {
903                         rc = -EINVAL;
904                         break;
905                 }
906                 rc = nrs_tbf_jobid_list_add(&res, jobid_list);
907                 if (rc)
908                         break;
909         }
910         if (rc)
911                 nrs_tbf_jobid_list_free(jobid_list);
912         RETURN(rc);
913 }
914
915 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
916 {
917         if (!list_empty(&cmd->u.tc_start.ts_jobids))
918                 nrs_tbf_jobid_list_free(&cmd->u.tc_start.ts_jobids);
919         if (cmd->u.tc_start.ts_jobids_str)
920                 OBD_FREE(cmd->u.tc_start.ts_jobids_str,
921                          strlen(cmd->u.tc_start.ts_jobids_str) + 1);
922 }
923
924 static int nrs_tbf_check_id_value(struct cfs_lstr *src, char *key)
925 {
926         struct cfs_lstr res;
927         int keylen = strlen(key);
928         int rc;
929
930         rc = cfs_gettok(src, '=', &res);
931         if (rc == 0 || res.ls_len != keylen ||
932             strncmp(res.ls_str, key, keylen) != 0 ||
933             !src->ls_str || src->ls_len <= 2 ||
934             src->ls_str[0] != '{' || src->ls_str[src->ls_len - 1] != '}')
935                 return -EINVAL;
936
937         /* Skip '{' and '}' */
938         src->ls_str++;
939         src->ls_len -= 2;
940         return 0;
941 }
942
943 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, char *id)
944 {
945         struct cfs_lstr src;
946         int rc;
947
948         src.ls_str = id;
949         src.ls_len = strlen(id);
950         rc = nrs_tbf_check_id_value(&src, "jobid");
951         if (rc)
952                 return rc;
953
954         OBD_ALLOC(cmd->u.tc_start.ts_jobids_str, src.ls_len + 1);
955         if (cmd->u.tc_start.ts_jobids_str == NULL)
956                 return -ENOMEM;
957
958         memcpy(cmd->u.tc_start.ts_jobids_str, src.ls_str, src.ls_len);
959
960         /* parse jobid list */
961         rc = nrs_tbf_jobid_list_parse(cmd->u.tc_start.ts_jobids_str,
962                                       strlen(cmd->u.tc_start.ts_jobids_str),
963                                       &cmd->u.tc_start.ts_jobids);
964         if (rc)
965                 nrs_tbf_jobid_cmd_fini(cmd);
966
967         return rc;
968 }
969
970 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
971                                    struct nrs_tbf_rule *rule,
972                                    struct nrs_tbf_cmd *start)
973 {
974         int rc = 0;
975
976         LASSERT(start->u.tc_start.ts_jobids_str);
977         OBD_ALLOC(rule->tr_jobids_str,
978                   strlen(start->u.tc_start.ts_jobids_str) + 1);
979         if (rule->tr_jobids_str == NULL)
980                 return -ENOMEM;
981
982         memcpy(rule->tr_jobids_str,
983                start->u.tc_start.ts_jobids_str,
984                strlen(start->u.tc_start.ts_jobids_str));
985
986         INIT_LIST_HEAD(&rule->tr_jobids);
987         if (!list_empty(&start->u.tc_start.ts_jobids)) {
988                 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
989                                               strlen(rule->tr_jobids_str),
990                                               &rule->tr_jobids);
991                 if (rc)
992                         CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
993         }
994         if (rc)
995                 OBD_FREE(rule->tr_jobids_str,
996                          strlen(start->u.tc_start.ts_jobids_str) + 1);
997         return rc;
998 }
999
1000 static int
1001 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1002 {
1003         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1004                    rule->tr_jobids_str, rule->tr_rpc_rate,
1005                    atomic_read(&rule->tr_ref) - 1);
1006         return 0;
1007 }
1008
1009 static int
1010 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
1011                          struct nrs_tbf_client *cli)
1012 {
1013         return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
1014 }
1015
1016 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
1017 {
1018         if (!list_empty(&rule->tr_jobids))
1019                 nrs_tbf_jobid_list_free(&rule->tr_jobids);
1020         LASSERT(rule->tr_jobids_str != NULL);
1021         OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1);
1022 }
1023
1024 static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
1025         .o_name = NRS_TBF_TYPE_JOBID,
1026         .o_startup = nrs_tbf_jobid_startup,
1027         .o_cli_find = nrs_tbf_jobid_cli_find,
1028         .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
1029         .o_cli_put = nrs_tbf_jobid_cli_put,
1030         .o_cli_init = nrs_tbf_jobid_cli_init,
1031         .o_rule_init = nrs_tbf_jobid_rule_init,
1032         .o_rule_dump = nrs_tbf_jobid_rule_dump,
1033         .o_rule_match = nrs_tbf_jobid_rule_match,
1034         .o_rule_fini = nrs_tbf_jobid_rule_fini,
1035 };
1036
1037 /**
1038  * libcfs_hash operations for nrs_tbf_net::cn_cli_hash
1039  *
1040  * This uses ptlrpc_request::rq_peer.nid (as nid4) as its key, in order to hash
1041  * nrs_tbf_client objects.
1042  */
1043 #define NRS_TBF_NID_BKT_BITS    8
1044 #define NRS_TBF_NID_BITS        16
1045
1046 static unsigned nrs_tbf_nid_hop_hash(struct cfs_hash *hs, const void *key,
1047                                   unsigned mask)
1048 {
1049         return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
1050 }
1051
1052 static int nrs_tbf_nid_hop_keycmp(const void *key, struct hlist_node *hnode)
1053 {
1054         lnet_nid_t            *nid = (lnet_nid_t *)key;
1055         struct nrs_tbf_client *cli = hlist_entry(hnode,
1056                                                      struct nrs_tbf_client,
1057                                                      tc_hnode);
1058
1059         return *nid == cli->tc_nid;
1060 }
1061
1062 static void *nrs_tbf_nid_hop_key(struct hlist_node *hnode)
1063 {
1064         struct nrs_tbf_client *cli = hlist_entry(hnode,
1065                                                      struct nrs_tbf_client,
1066                                                      tc_hnode);
1067
1068         return &cli->tc_nid;
1069 }
1070
1071 static void nrs_tbf_nid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1072 {
1073         struct nrs_tbf_client *cli = hlist_entry(hnode,
1074                                                      struct nrs_tbf_client,
1075                                                      tc_hnode);
1076
1077         atomic_inc(&cli->tc_ref);
1078 }
1079
1080 static void nrs_tbf_nid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1081 {
1082         struct nrs_tbf_client *cli = hlist_entry(hnode,
1083                                                      struct nrs_tbf_client,
1084                                                      tc_hnode);
1085
1086         atomic_dec(&cli->tc_ref);
1087 }
1088
1089 static void nrs_tbf_nid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1090 {
1091         struct nrs_tbf_client *cli = hlist_entry(hnode,
1092                                                      struct nrs_tbf_client,
1093                                                      tc_hnode);
1094
1095         LASSERTF(atomic_read(&cli->tc_ref) == 0,
1096                  "Busy TBF object from client with NID %s, with %d refs\n",
1097                  libcfs_nid2str(cli->tc_nid), atomic_read(&cli->tc_ref));
1098
1099         nrs_tbf_cli_fini(cli);
1100 }
1101
1102 static struct cfs_hash_ops nrs_tbf_nid_hash_ops = {
1103         .hs_hash        = nrs_tbf_nid_hop_hash,
1104         .hs_keycmp      = nrs_tbf_nid_hop_keycmp,
1105         .hs_key         = nrs_tbf_nid_hop_key,
1106         .hs_object      = nrs_tbf_hop_object,
1107         .hs_get         = nrs_tbf_nid_hop_get,
1108         .hs_put         = nrs_tbf_nid_hop_put,
1109         .hs_put_locked  = nrs_tbf_nid_hop_put,
1110         .hs_exit        = nrs_tbf_nid_hop_exit,
1111 };
1112
1113 static struct nrs_tbf_client *
1114 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
1115                      struct ptlrpc_request *req)
1116 {
1117         lnet_nid_t nid4 = lnet_nid_to_nid4(&req->rq_peer.nid);
1118
1119         return cfs_hash_lookup(head->th_cli_hash, &nid4);
1120 }
1121
1122 static struct nrs_tbf_client *
1123 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
1124                         struct nrs_tbf_client *cli)
1125 {
1126         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid,
1127                                        &cli->tc_hnode);
1128 }
1129
1130 static void
1131 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
1132                       struct nrs_tbf_client *cli)
1133 {
1134         cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
1135 }
1136
1137 static int
1138 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
1139                     struct nrs_tbf_head *head)
1140 {
1141         struct nrs_tbf_cmd      start;
1142         int rc;
1143
1144         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1145                                             NRS_TBF_NID_BITS,
1146                                             NRS_TBF_NID_BITS,
1147                                             NRS_TBF_NID_BKT_BITS, 0,
1148                                             CFS_HASH_MIN_THETA,
1149                                             CFS_HASH_MAX_THETA,
1150                                             &nrs_tbf_nid_hash_ops,
1151                                             CFS_HASH_RW_BKTLOCK);
1152         if (head->th_cli_hash == NULL)
1153                 return -ENOMEM;
1154
1155         memset(&start, 0, sizeof(start));
1156         start.u.tc_start.ts_nids_str = "*";
1157
1158         start.u.tc_start.ts_rpc_rate = tbf_rate;
1159         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1160         start.tc_name = NRS_TBF_DEFAULT_RULE;
1161         INIT_LIST_HEAD(&start.u.tc_start.ts_nids);
1162         rc = nrs_tbf_rule_start(policy, head, &start);
1163         if (rc) {
1164                 cfs_hash_putref(head->th_cli_hash);
1165                 head->th_cli_hash = NULL;
1166         }
1167
1168         return rc;
1169 }
1170
1171 static void
1172 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1173                              struct ptlrpc_request *req)
1174 {
1175         cli->tc_nid = lnet_nid_to_nid4(&req->rq_peer.nid);
1176 }
1177
1178 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1179                                  struct nrs_tbf_rule *rule,
1180                                  struct nrs_tbf_cmd *start)
1181 {
1182         LASSERT(start->u.tc_start.ts_nids_str);
1183         OBD_ALLOC(rule->tr_nids_str,
1184                   strlen(start->u.tc_start.ts_nids_str) + 1);
1185         if (rule->tr_nids_str == NULL)
1186                 return -ENOMEM;
1187
1188         memcpy(rule->tr_nids_str,
1189                start->u.tc_start.ts_nids_str,
1190                strlen(start->u.tc_start.ts_nids_str));
1191
1192         INIT_LIST_HEAD(&rule->tr_nids);
1193         if (!list_empty(&start->u.tc_start.ts_nids)) {
1194                 if (cfs_parse_nidlist(rule->tr_nids_str,
1195                                       strlen(rule->tr_nids_str),
1196                                       &rule->tr_nids) <= 0) {
1197                         CERROR("nids {%s} illegal\n",
1198                                rule->tr_nids_str);
1199                         OBD_FREE(rule->tr_nids_str,
1200                                  strlen(start->u.tc_start.ts_nids_str) + 1);
1201                         return -EINVAL;
1202                 }
1203         }
1204         return 0;
1205 }
1206
1207 static int
1208 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1209 {
1210         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1211                    rule->tr_nids_str, rule->tr_rpc_rate,
1212                    atomic_read(&rule->tr_ref) - 1);
1213         return 0;
1214 }
1215
1216 static int
1217 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1218                        struct nrs_tbf_client *cli)
1219 {
1220         return cfs_match_nid(cli->tc_nid, &rule->tr_nids);
1221 }
1222
1223 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1224 {
1225         if (!list_empty(&rule->tr_nids))
1226                 cfs_free_nidlist(&rule->tr_nids);
1227         LASSERT(rule->tr_nids_str != NULL);
1228         OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1);
1229 }
1230
1231 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1232 {
1233         if (!list_empty(&cmd->u.tc_start.ts_nids))
1234                 cfs_free_nidlist(&cmd->u.tc_start.ts_nids);
1235         if (cmd->u.tc_start.ts_nids_str)
1236                 OBD_FREE(cmd->u.tc_start.ts_nids_str,
1237                          strlen(cmd->u.tc_start.ts_nids_str) + 1);
1238 }
1239
1240 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, char *id)
1241 {
1242         struct cfs_lstr src;
1243         int rc;
1244
1245         src.ls_str = id;
1246         src.ls_len = strlen(id);
1247         rc = nrs_tbf_check_id_value(&src, "nid");
1248         if (rc)
1249                 return rc;
1250
1251         OBD_ALLOC(cmd->u.tc_start.ts_nids_str, src.ls_len + 1);
1252         if (cmd->u.tc_start.ts_nids_str == NULL)
1253                 return -ENOMEM;
1254
1255         memcpy(cmd->u.tc_start.ts_nids_str, src.ls_str, src.ls_len);
1256
1257         /* parse NID list */
1258         if (cfs_parse_nidlist(cmd->u.tc_start.ts_nids_str,
1259                               strlen(cmd->u.tc_start.ts_nids_str),
1260                               &cmd->u.tc_start.ts_nids) <= 0) {
1261                 nrs_tbf_nid_cmd_fini(cmd);
1262                 return -EINVAL;
1263         }
1264
1265         return 0;
1266 }
1267
1268 static struct nrs_tbf_ops nrs_tbf_nid_ops = {
1269         .o_name = NRS_TBF_TYPE_NID,
1270         .o_startup = nrs_tbf_nid_startup,
1271         .o_cli_find = nrs_tbf_nid_cli_find,
1272         .o_cli_findadd = nrs_tbf_nid_cli_findadd,
1273         .o_cli_put = nrs_tbf_nid_cli_put,
1274         .o_cli_init = nrs_tbf_nid_cli_init,
1275         .o_rule_init = nrs_tbf_nid_rule_init,
1276         .o_rule_dump = nrs_tbf_nid_rule_dump,
1277         .o_rule_match = nrs_tbf_nid_rule_match,
1278         .o_rule_fini = nrs_tbf_nid_rule_fini,
1279 };
1280
1281 static unsigned nrs_tbf_hop_hash(struct cfs_hash *hs, const void *key,
1282                                  unsigned mask)
1283 {
1284         return cfs_hash_djb2_hash(key, strlen(key), mask);
1285 }
1286
1287 static int nrs_tbf_hop_keycmp(const void *key, struct hlist_node *hnode)
1288 {
1289         struct nrs_tbf_client *cli = hlist_entry(hnode,
1290                                                  struct nrs_tbf_client,
1291                                                  tc_hnode);
1292
1293         return (strcmp(cli->tc_key, key) == 0);
1294 }
1295
1296 static void *nrs_tbf_hop_key(struct hlist_node *hnode)
1297 {
1298         struct nrs_tbf_client *cli = hlist_entry(hnode,
1299                                                  struct nrs_tbf_client,
1300                                                  tc_hnode);
1301         return cli->tc_key;
1302 }
1303
1304 static void nrs_tbf_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1305 {
1306         struct nrs_tbf_client *cli = hlist_entry(hnode,
1307                                                  struct nrs_tbf_client,
1308                                                  tc_hnode);
1309
1310         atomic_inc(&cli->tc_ref);
1311 }
1312
1313 static void nrs_tbf_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1314 {
1315         struct nrs_tbf_client *cli = hlist_entry(hnode,
1316                                                  struct nrs_tbf_client,
1317                                                  tc_hnode);
1318
1319         atomic_dec(&cli->tc_ref);
1320 }
1321
1322 static void nrs_tbf_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1323
1324 {
1325         struct nrs_tbf_client *cli = hlist_entry(hnode,
1326                                                  struct nrs_tbf_client,
1327                                                  tc_hnode);
1328
1329         LASSERT(atomic_read(&cli->tc_ref) == 0);
1330         nrs_tbf_cli_fini(cli);
1331 }
1332
1333 static struct cfs_hash_ops nrs_tbf_hash_ops = {
1334         .hs_hash        = nrs_tbf_hop_hash,
1335         .hs_keycmp      = nrs_tbf_hop_keycmp,
1336         .hs_key         = nrs_tbf_hop_key,
1337         .hs_object      = nrs_tbf_hop_object,
1338         .hs_get         = nrs_tbf_hop_get,
1339         .hs_put         = nrs_tbf_hop_put,
1340         .hs_put_locked  = nrs_tbf_hop_put,
1341         .hs_exit        = nrs_tbf_hop_exit,
1342 };
1343
1344 #define NRS_TBF_GENERIC_BKT_BITS        10
1345 #define NRS_TBF_GENERIC_HASH_FLAGS      (CFS_HASH_SPIN_BKTLOCK | \
1346                                         CFS_HASH_NO_ITEMREF | \
1347                                         CFS_HASH_DEPTH)
1348
1349 static int
1350 nrs_tbf_startup(struct ptlrpc_nrs_policy *policy, struct nrs_tbf_head *head)
1351 {
1352         struct nrs_tbf_cmd       start;
1353         struct nrs_tbf_bucket   *bkt;
1354         int                      bits;
1355         int                      i;
1356         int                      rc;
1357         struct cfs_hash_bd       bd;
1358
1359         bits = nrs_tbf_jobid_hash_order();
1360         if (bits < NRS_TBF_GENERIC_BKT_BITS)
1361                 bits = NRS_TBF_GENERIC_BKT_BITS;
1362         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1363                                             bits, bits,
1364                                             NRS_TBF_GENERIC_BKT_BITS,
1365                                             sizeof(*bkt), 0, 0,
1366                                             &nrs_tbf_hash_ops,
1367                                             NRS_TBF_GENERIC_HASH_FLAGS);
1368         if (head->th_cli_hash == NULL)
1369                 return -ENOMEM;
1370
1371         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
1372                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
1373                 INIT_LIST_HEAD(&bkt->ntb_lru);
1374         }
1375
1376         memset(&start, 0, sizeof(start));
1377         start.u.tc_start.ts_conds_str = "*";
1378
1379         start.u.tc_start.ts_rpc_rate = tbf_rate;
1380         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1381         start.tc_name = NRS_TBF_DEFAULT_RULE;
1382         INIT_LIST_HEAD(&start.u.tc_start.ts_conds);
1383         rc = nrs_tbf_rule_start(policy, head, &start);
1384         if (rc)
1385                 cfs_hash_putref(head->th_cli_hash);
1386
1387         return rc;
1388 }
1389
1390 static struct nrs_tbf_client *
1391 nrs_tbf_cli_hash_lookup(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1392                         const char *key)
1393 {
1394         struct hlist_node *hnode;
1395         struct nrs_tbf_client *cli;
1396
1397         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)key);
1398         if (hnode == NULL)
1399                 return NULL;
1400
1401         cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
1402         if (!list_empty(&cli->tc_lru))
1403                 list_del_init(&cli->tc_lru);
1404         return cli;
1405 }
1406
1407 /**
1408  * ONLY opcode presented in this function will be checked in
1409  * nrs_tbf_id_cli_set(). That means, we can add or remove an
1410  * opcode to enable or disable requests handled in nrs_tbf
1411  */
1412 static struct req_format *req_fmt(__u32 opcode)
1413 {
1414         switch (opcode) {
1415         case OST_GETATTR:
1416                 return &RQF_OST_GETATTR;
1417         case OST_SETATTR:
1418                 return &RQF_OST_SETATTR;
1419         case OST_READ:
1420                 return &RQF_OST_BRW_READ;
1421         case OST_WRITE:
1422                 return &RQF_OST_BRW_WRITE;
1423         /* FIXME: OST_CREATE and OST_DESTROY comes from MDS
1424          * in most case. Should they be removed? */
1425         case OST_CREATE:
1426                 return &RQF_OST_CREATE;
1427         case OST_DESTROY:
1428                 return &RQF_OST_DESTROY;
1429         case OST_PUNCH:
1430                 return &RQF_OST_PUNCH;
1431         case OST_SYNC:
1432                 return &RQF_OST_SYNC;
1433         case OST_LADVISE:
1434                 return &RQF_OST_LADVISE;
1435         case MDS_GETATTR:
1436                 return &RQF_MDS_GETATTR;
1437         case MDS_GETATTR_NAME:
1438                 return &RQF_MDS_GETATTR_NAME;
1439         /* close is skipped to avoid LDLM cancel slowness */
1440 #if 0
1441         case MDS_CLOSE:
1442                 return &RQF_MDS_CLOSE;
1443 #endif
1444         case MDS_REINT:
1445                 return &RQF_MDS_REINT;
1446         case MDS_READPAGE:
1447                 return &RQF_MDS_READPAGE;
1448         case MDS_GET_ROOT:
1449                 return &RQF_MDS_GET_ROOT;
1450         case MDS_STATFS:
1451                 return &RQF_MDS_STATFS;
1452         case MDS_SYNC:
1453                 return &RQF_MDS_SYNC;
1454         case MDS_QUOTACTL:
1455                 return &RQF_MDS_QUOTACTL;
1456         case MDS_GETXATTR:
1457                 return &RQF_MDS_GETXATTR;
1458         case MDS_GET_INFO:
1459                 return &RQF_MDS_GET_INFO;
1460         /* HSM op is skipped */
1461 #if 0 
1462         case MDS_HSM_STATE_GET:
1463                 return &RQF_MDS_HSM_STATE_GET;
1464         case MDS_HSM_STATE_SET:
1465                 return &RQF_MDS_HSM_STATE_SET;
1466         case MDS_HSM_ACTION:
1467                 return &RQF_MDS_HSM_ACTION;
1468         case MDS_HSM_CT_REGISTER:
1469                 return &RQF_MDS_HSM_CT_REGISTER;
1470         case MDS_HSM_CT_UNREGISTER:
1471                 return &RQF_MDS_HSM_CT_UNREGISTER;
1472 #endif
1473         case MDS_SWAP_LAYOUTS:
1474                 return &RQF_MDS_SWAP_LAYOUTS;
1475         case LDLM_ENQUEUE:
1476                 return &RQF_LDLM_ENQUEUE;
1477         default:
1478                 return NULL;
1479         }
1480 }
1481
1482 static struct req_format *intent_req_fmt(__u32 it_opc)
1483 {
1484         if (it_opc & (IT_OPEN | IT_CREAT))
1485                 return &RQF_LDLM_INTENT_OPEN;
1486         else if (it_opc & (IT_GETATTR | IT_LOOKUP))
1487                 return &RQF_LDLM_INTENT_GETATTR;
1488         else if (it_opc & IT_GETXATTR)
1489                 return &RQF_LDLM_INTENT_GETXATTR;
1490         else if (it_opc & (IT_GLIMPSE | IT_BRW))
1491                 return &RQF_LDLM_INTENT;
1492         else
1493                 return NULL;
1494 }
1495
1496 static int ost_tbf_id_cli_set(struct ptlrpc_request *req,
1497                               struct tbf_id *id)
1498 {
1499         struct ost_body *body;
1500
1501         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1502         if (body != NULL) {
1503                 id->ti_uid = body->oa.o_uid;
1504                 id->ti_gid = body->oa.o_gid;
1505                 return 0;
1506         }
1507
1508         return -EINVAL;
1509 }
1510
1511 static void unpack_ugid_from_mdt_body(struct ptlrpc_request *req,
1512                                       struct tbf_id *id)
1513 {
1514         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
1515                                                     &RMF_MDT_BODY);
1516         LASSERT(b != NULL);
1517
1518         /* TODO: nodemaping feature converts {ug}id from individual
1519          * clients to the actual ones of the file system. Some work
1520          * may be needed to fix this. */
1521         id->ti_uid = b->mbo_uid;
1522         id->ti_gid = b->mbo_gid;
1523 }
1524
1525 static void unpack_ugid_from_mdt_rec_reint(struct ptlrpc_request *req,
1526                                            struct tbf_id *id)
1527 {
1528         struct mdt_rec_reint *rec;
1529
1530         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
1531         LASSERT(rec != NULL);
1532
1533         /* use the fs{ug}id as {ug}id of the process */
1534         id->ti_uid = rec->rr_fsuid;
1535         id->ti_gid = rec->rr_fsgid;
1536 }
1537
1538 static int mdt_tbf_id_cli_set(struct ptlrpc_request *req,
1539                               struct tbf_id *id)
1540 {
1541         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1542         int rc = 0;
1543
1544         switch (opc) {
1545         case MDS_GETATTR:
1546         case MDS_GETATTR_NAME:
1547         case MDS_GET_ROOT:
1548         case MDS_READPAGE:
1549         case MDS_SYNC:
1550         case MDS_GETXATTR:
1551         case MDS_HSM_STATE_GET ... MDS_SWAP_LAYOUTS:
1552                 unpack_ugid_from_mdt_body(req, id);
1553                 break;
1554         case MDS_CLOSE:
1555         case MDS_REINT:
1556                 unpack_ugid_from_mdt_rec_reint(req, id);
1557                 break;
1558         default:
1559                 rc = -EINVAL;
1560                 break;
1561         }
1562         return rc;
1563 }
1564
1565 static int ldlm_tbf_id_cli_set(struct ptlrpc_request *req,
1566                               struct tbf_id *id)
1567 {
1568         struct ldlm_intent *lit;
1569         struct req_format *fmt;
1570
1571         if (req->rq_reqmsg->lm_bufcount <= DLM_INTENT_IT_OFF)
1572                 return -EINVAL;
1573
1574         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_BASIC);
1575         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
1576         if (lit == NULL)
1577                 return -EINVAL;
1578
1579         fmt = intent_req_fmt(lit->opc);
1580         if (fmt == NULL)
1581                 return -EINVAL;
1582
1583         req_capsule_extend(&req->rq_pill, fmt);
1584
1585         if (lit->opc & (IT_GETXATTR | IT_GETATTR | IT_LOOKUP))
1586                 unpack_ugid_from_mdt_body(req, id);
1587         else if (lit->opc & (IT_OPEN | IT_OPEN | IT_GLIMPSE | IT_BRW))
1588                 unpack_ugid_from_mdt_rec_reint(req, id);
1589         else
1590                 return -EINVAL;
1591         return 0;
1592 }
1593
1594 static int nrs_tbf_id_cli_set(struct ptlrpc_request *req, struct tbf_id *id,
1595                               enum nrs_tbf_flag ti_type)
1596 {
1597         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1598         struct req_format *fmt = req_fmt(opc);
1599         bool fmt_unset = false;
1600         int rc;
1601
1602         memset(id, 0, sizeof(struct tbf_id));
1603         id->ti_type = ti_type;
1604
1605         if (fmt == NULL)
1606                 return -EINVAL;
1607         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1608         if (req->rq_pill.rc_fmt == NULL) {
1609                 req_capsule_set(&req->rq_pill, fmt);
1610                 fmt_unset = true;
1611         }
1612
1613         if (opc < OST_LAST_OPC)
1614                 rc = ost_tbf_id_cli_set(req, id);
1615         else if (opc >= MDS_FIRST_OPC && opc < MDS_LAST_OPC)
1616                 rc = mdt_tbf_id_cli_set(req, id);
1617         else if (opc == LDLM_ENQUEUE)
1618                 rc = ldlm_tbf_id_cli_set(req, id);
1619         else
1620                 rc = -EINVAL;
1621
1622         /* restore it to the initialized state */
1623         if (fmt_unset)
1624                 req->rq_pill.rc_fmt = NULL;
1625         return rc;
1626 }
1627
1628 static inline void nrs_tbf_cli_gen_key(struct nrs_tbf_client *cli,
1629                                        struct ptlrpc_request *req,
1630                                        char *keystr, size_t keystr_sz)
1631 {
1632         const char *jobid;
1633         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1634         struct tbf_id id;
1635
1636         nrs_tbf_id_cli_set(req, &id, NRS_TBF_FLAG_UID | NRS_TBF_FLAG_GID);
1637         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1638         if (jobid == NULL)
1639                 jobid = NRS_TBF_JOBID_NULL;
1640
1641         snprintf(keystr, keystr_sz, "%s_%s_%d_%u_%u", jobid,
1642                  libcfs_nidstr(&req->rq_peer.nid), opc, id.ti_uid,
1643                  id.ti_gid);
1644
1645         if (cli) {
1646                 INIT_LIST_HEAD(&cli->tc_lru);
1647                 strlcpy(cli->tc_key, keystr, sizeof(cli->tc_key));
1648                 strlcpy(cli->tc_jobid, jobid, sizeof(cli->tc_jobid));
1649                 cli->tc_nid = lnet_nid_to_nid4(&req->rq_peer.nid);
1650                 cli->tc_opcode = opc;
1651                 cli->tc_id = id;
1652         }
1653 }
1654
1655 static struct nrs_tbf_client *
1656 nrs_tbf_cli_find(struct nrs_tbf_head *head, struct ptlrpc_request *req)
1657 {
1658         struct nrs_tbf_client *cli;
1659         struct cfs_hash *hs = head->th_cli_hash;
1660         struct cfs_hash_bd bd;
1661         char keystr[NRS_TBF_KEY_LEN];
1662
1663         nrs_tbf_cli_gen_key(NULL, req, keystr, sizeof(keystr));
1664         cfs_hash_bd_get_and_lock(hs, (void *)keystr, &bd, 1);
1665         cli = nrs_tbf_cli_hash_lookup(hs, &bd, keystr);
1666         cfs_hash_bd_unlock(hs, &bd, 1);
1667
1668         return cli;
1669 }
1670
1671 static struct nrs_tbf_client *
1672 nrs_tbf_cli_findadd(struct nrs_tbf_head *head,
1673                     struct nrs_tbf_client *cli)
1674 {
1675         const char              *key;
1676         struct nrs_tbf_client   *ret;
1677         struct cfs_hash         *hs = head->th_cli_hash;
1678         struct cfs_hash_bd       bd;
1679
1680         key = cli->tc_key;
1681         cfs_hash_bd_get_and_lock(hs, (void *)key, &bd, 1);
1682         ret = nrs_tbf_cli_hash_lookup(hs, &bd, key);
1683         if (ret == NULL) {
1684                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
1685                 ret = cli;
1686         }
1687         cfs_hash_bd_unlock(hs, &bd, 1);
1688
1689         return ret;
1690 }
1691
1692 static void
1693 nrs_tbf_cli_put(struct nrs_tbf_head *head, struct nrs_tbf_client *cli)
1694 {
1695         struct cfs_hash_bd       bd;
1696         struct cfs_hash         *hs = head->th_cli_hash;
1697         struct nrs_tbf_bucket   *bkt;
1698         int                      hw;
1699         LIST_HEAD(zombies);
1700
1701         cfs_hash_bd_get(hs, &cli->tc_key, &bd);
1702         bkt = cfs_hash_bd_extra_get(hs, &bd);
1703         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
1704                 return;
1705         LASSERT(list_empty(&cli->tc_lru));
1706         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
1707
1708         /**
1709          * Check and purge the LRU, there is at least one client in the LRU.
1710          */
1711         hw = tbf_jobid_cache_size >> (hs->hs_cur_bits - hs->hs_bkt_bits);
1712         while (cfs_hash_bd_count_get(&bd) > hw) {
1713                 if (unlikely(list_empty(&bkt->ntb_lru)))
1714                         break;
1715                 cli = list_entry(bkt->ntb_lru.next,
1716                                  struct nrs_tbf_client,
1717                                  tc_lru);
1718                 LASSERT(atomic_read(&cli->tc_ref) == 0);
1719                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
1720                 list_move(&cli->tc_lru, &zombies);
1721         }
1722         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
1723
1724         while (!list_empty(&zombies)) {
1725                 cli = container_of(zombies.next,
1726                                    struct nrs_tbf_client, tc_lru);
1727                 list_del_init(&cli->tc_lru);
1728                 nrs_tbf_cli_fini(cli);
1729         }
1730 }
1731
1732 static void
1733 nrs_tbf_generic_cli_init(struct nrs_tbf_client *cli,
1734                          struct ptlrpc_request *req)
1735 {
1736         char keystr[NRS_TBF_KEY_LEN];
1737
1738         nrs_tbf_cli_gen_key(cli, req, keystr, sizeof(keystr));
1739 }
1740
1741 static void
1742 nrs_tbf_id_list_free(struct list_head *uid_list)
1743 {
1744         struct nrs_tbf_id *nti_id, *n;
1745
1746         list_for_each_entry_safe(nti_id, n, uid_list, nti_linkage) {
1747                 list_del_init(&nti_id->nti_linkage);
1748                 OBD_FREE_PTR(nti_id);
1749         }
1750 }
1751
1752 static void
1753 nrs_tbf_expression_free(struct nrs_tbf_expression *expr)
1754 {
1755         LASSERT(expr->te_field >= NRS_TBF_FIELD_NID &&
1756                 expr->te_field < NRS_TBF_FIELD_MAX);
1757         switch (expr->te_field) {
1758         case NRS_TBF_FIELD_NID:
1759                 cfs_free_nidlist(&expr->te_cond);
1760                 break;
1761         case NRS_TBF_FIELD_JOBID:
1762                 nrs_tbf_jobid_list_free(&expr->te_cond);
1763                 break;
1764         case NRS_TBF_FIELD_OPCODE:
1765                 bitmap_free(expr->te_opcodes);
1766                 break;
1767         case NRS_TBF_FIELD_UID:
1768         case NRS_TBF_FIELD_GID:
1769                 nrs_tbf_id_list_free(&expr->te_cond);
1770                 break;
1771         default:
1772                 LBUG();
1773         }
1774         OBD_FREE_PTR(expr);
1775 }
1776
1777 static void
1778 nrs_tbf_conjunction_free(struct nrs_tbf_conjunction *conjunction)
1779 {
1780         struct nrs_tbf_expression *expression;
1781         struct nrs_tbf_expression *n;
1782
1783         LASSERT(list_empty(&conjunction->tc_linkage));
1784         list_for_each_entry_safe(expression, n,
1785                                  &conjunction->tc_expressions,
1786                                  te_linkage) {
1787                 list_del_init(&expression->te_linkage);
1788                 nrs_tbf_expression_free(expression);
1789         }
1790         OBD_FREE_PTR(conjunction);
1791 }
1792
1793 static void
1794 nrs_tbf_conds_free(struct list_head *cond_list)
1795 {
1796         struct nrs_tbf_conjunction *conjunction;
1797         struct nrs_tbf_conjunction *n;
1798
1799         list_for_each_entry_safe(conjunction, n, cond_list, tc_linkage) {
1800                 list_del_init(&conjunction->tc_linkage);
1801                 nrs_tbf_conjunction_free(conjunction);
1802         }
1803 }
1804
1805 static void
1806 nrs_tbf_generic_cmd_fini(struct nrs_tbf_cmd *cmd)
1807 {
1808         if (!list_empty(&cmd->u.tc_start.ts_conds))
1809                 nrs_tbf_conds_free(&cmd->u.tc_start.ts_conds);
1810         if (cmd->u.tc_start.ts_conds_str)
1811                 OBD_FREE(cmd->u.tc_start.ts_conds_str,
1812                          strlen(cmd->u.tc_start.ts_conds_str) + 1);
1813 }
1814
1815 #define NRS_TBF_DISJUNCTION_DELIM       (',')
1816 #define NRS_TBF_CONJUNCTION_DELIM       ('&')
1817 #define NRS_TBF_EXPRESSION_DELIM        ('=')
1818
1819 static inline bool
1820 nrs_tbf_check_field(struct cfs_lstr *field, char *str)
1821 {
1822         int len = strlen(str);
1823
1824         return (field->ls_len == len &&
1825                 strncmp(field->ls_str, str, len) == 0);
1826 }
1827
1828 static int
1829 nrs_tbf_opcode_list_parse(char *str, int len, unsigned long **bitmaptr);
1830 static int
1831 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
1832                       enum nrs_tbf_flag tif);
1833
1834 static int
1835 nrs_tbf_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
1836 {
1837         struct nrs_tbf_expression *expr;
1838         struct cfs_lstr field;
1839         int rc = 0;
1840
1841         OBD_ALLOC_PTR(expr);
1842         if (expr == NULL)
1843                 return -ENOMEM;
1844
1845         rc = cfs_gettok(src, NRS_TBF_EXPRESSION_DELIM, &field);
1846         if (rc == 0 || !src->ls_str || src->ls_len <= 2 ||
1847             src->ls_str[0] != '{' || src->ls_str[src->ls_len - 1] != '}')
1848                 GOTO(out, rc = -EINVAL);
1849
1850         /* Skip '{' and '}' */
1851         src->ls_str++;
1852         src->ls_len -= 2;
1853
1854         if (nrs_tbf_check_field(&field, "nid")) {
1855                 if (cfs_parse_nidlist(src->ls_str,
1856                                       src->ls_len,
1857                                       &expr->te_cond) <= 0)
1858                         GOTO(out, rc = -EINVAL);
1859                 expr->te_field = NRS_TBF_FIELD_NID;
1860         } else if (nrs_tbf_check_field(&field, "jobid")) {
1861                 if (nrs_tbf_jobid_list_parse(src->ls_str,
1862                                              src->ls_len,
1863                                              &expr->te_cond) < 0)
1864                         GOTO(out, rc = -EINVAL);
1865                 expr->te_field = NRS_TBF_FIELD_JOBID;
1866         } else if (nrs_tbf_check_field(&field, "opcode")) {
1867                 if (nrs_tbf_opcode_list_parse(src->ls_str,
1868                                               src->ls_len,
1869                                               &expr->te_opcodes) < 0)
1870                         GOTO(out, rc = -EINVAL);
1871                 expr->te_field = NRS_TBF_FIELD_OPCODE;
1872         } else if (nrs_tbf_check_field(&field, "uid")) {
1873                 if (nrs_tbf_id_list_parse(src->ls_str,
1874                                           src->ls_len,
1875                                           &expr->te_cond,
1876                                           NRS_TBF_FLAG_UID) < 0)
1877                         GOTO(out, rc = -EINVAL);
1878                 expr->te_field = NRS_TBF_FIELD_UID;
1879         } else if (nrs_tbf_check_field(&field, "gid")) {
1880                 if (nrs_tbf_id_list_parse(src->ls_str,
1881                                           src->ls_len,
1882                                           &expr->te_cond,
1883                                           NRS_TBF_FLAG_GID) < 0)
1884                         GOTO(out, rc = -EINVAL);
1885                 expr->te_field = NRS_TBF_FIELD_GID;
1886         } else {
1887                 GOTO(out, rc = -EINVAL);
1888         }
1889
1890         list_add_tail(&expr->te_linkage, cond_list);
1891         return 0;
1892 out:
1893         OBD_FREE_PTR(expr);
1894         return rc;
1895 }
1896
1897 static int
1898 nrs_tbf_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
1899 {
1900         struct nrs_tbf_conjunction *conjunction;
1901         struct cfs_lstr expr;
1902         int rc = 0;
1903
1904         OBD_ALLOC_PTR(conjunction);
1905         if (conjunction == NULL)
1906                 return -ENOMEM;
1907
1908         INIT_LIST_HEAD(&conjunction->tc_expressions);
1909         list_add_tail(&conjunction->tc_linkage, cond_list);
1910
1911         while (src->ls_str) {
1912                 rc = cfs_gettok(src, NRS_TBF_CONJUNCTION_DELIM, &expr);
1913                 if (rc == 0) {
1914                         rc = -EINVAL;
1915                         break;
1916                 }
1917                 rc = nrs_tbf_expression_parse(&expr,
1918                                               &conjunction->tc_expressions);
1919                 if (rc)
1920                         break;
1921         }
1922         return rc;
1923 }
1924
1925 static int
1926 nrs_tbf_conds_parse(char *str, int len, struct list_head *cond_list)
1927 {
1928         struct cfs_lstr src;
1929         struct cfs_lstr res;
1930         int rc = 0;
1931
1932         src.ls_str = str;
1933         src.ls_len = len;
1934         INIT_LIST_HEAD(cond_list);
1935         while (src.ls_str) {
1936                 rc = cfs_gettok(&src, NRS_TBF_DISJUNCTION_DELIM, &res);
1937                 if (rc == 0) {
1938                         rc = -EINVAL;
1939                         break;
1940                 }
1941                 rc = nrs_tbf_conjunction_parse(&res, cond_list);
1942                 if (rc)
1943                         break;
1944         }
1945         return rc;
1946 }
1947
1948 static int
1949 nrs_tbf_generic_parse(struct nrs_tbf_cmd *cmd, const char *id)
1950 {
1951         int rc;
1952
1953         OBD_ALLOC(cmd->u.tc_start.ts_conds_str, strlen(id) + 1);
1954         if (cmd->u.tc_start.ts_conds_str == NULL)
1955                 return -ENOMEM;
1956
1957         memcpy(cmd->u.tc_start.ts_conds_str, id, strlen(id));
1958
1959         /* Parse hybird NID and JOBID conditions */
1960         rc = nrs_tbf_conds_parse(cmd->u.tc_start.ts_conds_str,
1961                                  strlen(cmd->u.tc_start.ts_conds_str),
1962                                  &cmd->u.tc_start.ts_conds);
1963         if (rc)
1964                 nrs_tbf_generic_cmd_fini(cmd);
1965
1966         return rc;
1967 }
1968
1969 static int
1970 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id);
1971
1972 static int
1973 nrs_tbf_expression_match(struct nrs_tbf_expression *expr,
1974                          struct nrs_tbf_rule *rule,
1975                          struct nrs_tbf_client *cli)
1976 {
1977         switch (expr->te_field) {
1978         case NRS_TBF_FIELD_NID:
1979                 return cfs_match_nid(cli->tc_nid, &expr->te_cond);
1980         case NRS_TBF_FIELD_JOBID:
1981                 return nrs_tbf_jobid_list_match(&expr->te_cond, cli->tc_jobid);
1982         case NRS_TBF_FIELD_OPCODE:
1983                 return test_bit(cli->tc_opcode, expr->te_opcodes);
1984         case NRS_TBF_FIELD_UID:
1985         case NRS_TBF_FIELD_GID:
1986                 return nrs_tbf_id_list_match(&expr->te_cond, cli->tc_id);
1987         default:
1988                 return 0;
1989         }
1990 }
1991
1992 static int
1993 nrs_tbf_conjunction_match(struct nrs_tbf_conjunction *conjunction,
1994                           struct nrs_tbf_rule *rule,
1995                           struct nrs_tbf_client *cli)
1996 {
1997         struct nrs_tbf_expression *expr;
1998         int matched;
1999
2000         list_for_each_entry(expr, &conjunction->tc_expressions, te_linkage) {
2001                 matched = nrs_tbf_expression_match(expr, rule, cli);
2002                 if (!matched)
2003                         return 0;
2004         }
2005
2006         return 1;
2007 }
2008
2009 static int
2010 nrs_tbf_cond_match(struct nrs_tbf_rule *rule, struct nrs_tbf_client *cli)
2011 {
2012         struct nrs_tbf_conjunction *conjunction;
2013         int matched;
2014
2015         list_for_each_entry(conjunction, &rule->tr_conds, tc_linkage) {
2016                 matched = nrs_tbf_conjunction_match(conjunction, rule, cli);
2017                 if (matched)
2018                         return 1;
2019         }
2020
2021         return 0;
2022 }
2023
2024 static void
2025 nrs_tbf_generic_rule_fini(struct nrs_tbf_rule *rule)
2026 {
2027         if (!list_empty(&rule->tr_conds))
2028                 nrs_tbf_conds_free(&rule->tr_conds);
2029         LASSERT(rule->tr_conds_str != NULL);
2030         OBD_FREE(rule->tr_conds_str, strlen(rule->tr_conds_str) + 1);
2031 }
2032
2033 static int
2034 nrs_tbf_rule_init(struct ptlrpc_nrs_policy *policy,
2035                   struct nrs_tbf_rule *rule, struct nrs_tbf_cmd *start)
2036 {
2037         int rc = 0;
2038
2039         LASSERT(start->u.tc_start.ts_conds_str);
2040         OBD_ALLOC(rule->tr_conds_str,
2041                   strlen(start->u.tc_start.ts_conds_str) + 1);
2042         if (rule->tr_conds_str == NULL)
2043                 return -ENOMEM;
2044
2045         memcpy(rule->tr_conds_str,
2046                start->u.tc_start.ts_conds_str,
2047                strlen(start->u.tc_start.ts_conds_str));
2048
2049         INIT_LIST_HEAD(&rule->tr_conds);
2050         if (!list_empty(&start->u.tc_start.ts_conds)) {
2051                 rc = nrs_tbf_conds_parse(rule->tr_conds_str,
2052                                          strlen(rule->tr_conds_str),
2053                                          &rule->tr_conds);
2054         }
2055         if (rc)
2056                 nrs_tbf_generic_rule_fini(rule);
2057
2058         return rc;
2059 }
2060
2061 static int
2062 nrs_tbf_generic_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2063 {
2064         seq_printf(m, "%s %s %llu, ref %d\n", rule->tr_name,
2065                    rule->tr_conds_str, rule->tr_rpc_rate,
2066                    atomic_read(&rule->tr_ref) - 1);
2067         return 0;
2068 }
2069
2070 static int
2071 nrs_tbf_generic_rule_match(struct nrs_tbf_rule *rule,
2072                            struct nrs_tbf_client *cli)
2073 {
2074         return nrs_tbf_cond_match(rule, cli);
2075 }
2076
2077 static struct nrs_tbf_ops nrs_tbf_generic_ops = {
2078         .o_name = NRS_TBF_TYPE_GENERIC,
2079         .o_startup = nrs_tbf_startup,
2080         .o_cli_find = nrs_tbf_cli_find,
2081         .o_cli_findadd = nrs_tbf_cli_findadd,
2082         .o_cli_put = nrs_tbf_cli_put,
2083         .o_cli_init = nrs_tbf_generic_cli_init,
2084         .o_rule_init = nrs_tbf_rule_init,
2085         .o_rule_dump = nrs_tbf_generic_rule_dump,
2086         .o_rule_match = nrs_tbf_generic_rule_match,
2087         .o_rule_fini = nrs_tbf_generic_rule_fini,
2088 };
2089
2090 static void nrs_tbf_opcode_rule_fini(struct nrs_tbf_rule *rule)
2091 {
2092         if (rule->tr_opcodes)
2093                 bitmap_free(rule->tr_opcodes);
2094
2095         LASSERT(rule->tr_opcodes_str != NULL);
2096         OBD_FREE(rule->tr_opcodes_str, strlen(rule->tr_opcodes_str) + 1);
2097 }
2098
2099 static unsigned nrs_tbf_opcode_hop_hash(struct cfs_hash *hs, const void *key,
2100                                         unsigned mask)
2101 {
2102         return cfs_hash_djb2_hash(key, sizeof(__u32), mask);
2103 }
2104
2105 static int nrs_tbf_opcode_hop_keycmp(const void *key, struct hlist_node *hnode)
2106 {
2107         const __u32     *opc = key;
2108         struct nrs_tbf_client *cli = hlist_entry(hnode,
2109                                                  struct nrs_tbf_client,
2110                                                  tc_hnode);
2111
2112         return *opc == cli->tc_opcode;
2113 }
2114
2115 static void *nrs_tbf_opcode_hop_key(struct hlist_node *hnode)
2116 {
2117         struct nrs_tbf_client *cli = hlist_entry(hnode,
2118                                                  struct nrs_tbf_client,
2119                                                  tc_hnode);
2120
2121         return &cli->tc_opcode;
2122 }
2123
2124 static void nrs_tbf_opcode_hop_get(struct cfs_hash *hs,
2125                                    struct hlist_node *hnode)
2126 {
2127         struct nrs_tbf_client *cli = hlist_entry(hnode,
2128                                                  struct nrs_tbf_client,
2129                                                  tc_hnode);
2130
2131         atomic_inc(&cli->tc_ref);
2132 }
2133
2134 static void nrs_tbf_opcode_hop_put(struct cfs_hash *hs,
2135                                    struct hlist_node *hnode)
2136 {
2137         struct nrs_tbf_client *cli = hlist_entry(hnode,
2138                                                  struct nrs_tbf_client,
2139                                                  tc_hnode);
2140
2141         atomic_dec(&cli->tc_ref);
2142 }
2143
2144 static void nrs_tbf_opcode_hop_exit(struct cfs_hash *hs,
2145                                     struct hlist_node *hnode)
2146 {
2147         struct nrs_tbf_client *cli = hlist_entry(hnode,
2148                                                  struct nrs_tbf_client,
2149                                                  tc_hnode);
2150
2151         LASSERTF(atomic_read(&cli->tc_ref) == 0,
2152                  "Busy TBF object from client with opcode %s, with %d refs\n",
2153                  ll_opcode2str(cli->tc_opcode),
2154                  atomic_read(&cli->tc_ref));
2155
2156         nrs_tbf_cli_fini(cli);
2157 }
2158 static struct cfs_hash_ops nrs_tbf_opcode_hash_ops = {
2159         .hs_hash        = nrs_tbf_opcode_hop_hash,
2160         .hs_keycmp      = nrs_tbf_opcode_hop_keycmp,
2161         .hs_key         = nrs_tbf_opcode_hop_key,
2162         .hs_object      = nrs_tbf_hop_object,
2163         .hs_get         = nrs_tbf_opcode_hop_get,
2164         .hs_put         = nrs_tbf_opcode_hop_put,
2165         .hs_put_locked  = nrs_tbf_opcode_hop_put,
2166         .hs_exit        = nrs_tbf_opcode_hop_exit,
2167 };
2168
2169 static int
2170 nrs_tbf_opcode_startup(struct ptlrpc_nrs_policy *policy,
2171                     struct nrs_tbf_head *head)
2172 {
2173         struct nrs_tbf_cmd      start = { 0 };
2174         int rc;
2175
2176         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
2177                                             NRS_TBF_NID_BITS,
2178                                             NRS_TBF_NID_BITS,
2179                                             NRS_TBF_NID_BKT_BITS, 0,
2180                                             CFS_HASH_MIN_THETA,
2181                                             CFS_HASH_MAX_THETA,
2182                                             &nrs_tbf_opcode_hash_ops,
2183                                             CFS_HASH_RW_BKTLOCK);
2184         if (head->th_cli_hash == NULL)
2185                 return -ENOMEM;
2186
2187         start.u.tc_start.ts_opcodes_str = "*";
2188
2189         start.u.tc_start.ts_rpc_rate = tbf_rate;
2190         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2191         start.tc_name = NRS_TBF_DEFAULT_RULE;
2192         rc = nrs_tbf_rule_start(policy, head, &start);
2193
2194         return rc;
2195 }
2196
2197 static struct nrs_tbf_client *
2198 nrs_tbf_opcode_cli_find(struct nrs_tbf_head *head,
2199                         struct ptlrpc_request *req)
2200 {
2201         __u32 opc;
2202
2203         opc = lustre_msg_get_opc(req->rq_reqmsg);
2204         return cfs_hash_lookup(head->th_cli_hash, &opc);
2205 }
2206
2207 static struct nrs_tbf_client *
2208 nrs_tbf_opcode_cli_findadd(struct nrs_tbf_head *head,
2209                            struct nrs_tbf_client *cli)
2210 {
2211         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_opcode,
2212                                        &cli->tc_hnode);
2213 }
2214
2215 static void
2216 nrs_tbf_opcode_cli_init(struct nrs_tbf_client *cli,
2217                         struct ptlrpc_request *req)
2218 {
2219         cli->tc_opcode = lustre_msg_get_opc(req->rq_reqmsg);
2220 }
2221
2222 #define MAX_OPCODE_LEN  32
2223 static int
2224 nrs_tbf_opcode_set_bit(const struct cfs_lstr *id, unsigned long *opcodes)
2225 {
2226         int     op = 0;
2227         char    opcode_str[MAX_OPCODE_LEN];
2228
2229         if (id->ls_len + 1 > MAX_OPCODE_LEN)
2230                 return -EINVAL;
2231
2232         memcpy(opcode_str, id->ls_str, id->ls_len);
2233         opcode_str[id->ls_len] = '\0';
2234
2235         op = ll_str2opcode(opcode_str);
2236         if (op < 0)
2237                 return -EINVAL;
2238
2239         set_bit(op, opcodes);
2240         return 0;
2241 }
2242
2243 static int
2244 nrs_tbf_opcode_list_parse(char *str, int len, unsigned long **bitmaptr)
2245 {
2246         unsigned long *opcodes;
2247         struct cfs_lstr src;
2248         struct cfs_lstr res;
2249         int rc = 0;
2250
2251         ENTRY;
2252         opcodes = bitmap_zalloc(LUSTRE_MAX_OPCODES, GFP_KERNEL);
2253         if (!opcodes)
2254                 return -ENOMEM;
2255
2256         src.ls_str = str;
2257         src.ls_len = len;
2258         while (src.ls_str) {
2259                 rc = cfs_gettok(&src, ' ', &res);
2260                 if (rc == 0) {
2261                         rc = -EINVAL;
2262                         break;
2263                 }
2264                 rc = nrs_tbf_opcode_set_bit(&res, opcodes);
2265                 if (rc)
2266                         break;
2267         }
2268
2269         if (rc == 0 && bitmaptr)
2270                 *bitmaptr = opcodes;
2271         else
2272                 bitmap_free(opcodes);
2273
2274         RETURN(rc);
2275 }
2276
2277 static void nrs_tbf_opcode_cmd_fini(struct nrs_tbf_cmd *cmd)
2278 {
2279         if (cmd->u.tc_start.ts_opcodes_str)
2280                 OBD_FREE(cmd->u.tc_start.ts_opcodes_str,
2281                          strlen(cmd->u.tc_start.ts_opcodes_str) + 1);
2282
2283 }
2284
2285 static int nrs_tbf_opcode_parse(struct nrs_tbf_cmd *cmd, char *id)
2286 {
2287         struct cfs_lstr src;
2288         int rc;
2289
2290         src.ls_str = id;
2291         src.ls_len = strlen(id);
2292         rc = nrs_tbf_check_id_value(&src, "opcode");
2293         if (rc)
2294                 return rc;
2295
2296         OBD_ALLOC(cmd->u.tc_start.ts_opcodes_str, src.ls_len + 1);
2297         if (cmd->u.tc_start.ts_opcodes_str == NULL)
2298                 return -ENOMEM;
2299
2300         memcpy(cmd->u.tc_start.ts_opcodes_str, src.ls_str, src.ls_len);
2301
2302         /* parse opcode list */
2303         rc = nrs_tbf_opcode_list_parse(cmd->u.tc_start.ts_opcodes_str,
2304                                        strlen(cmd->u.tc_start.ts_opcodes_str),
2305                                        NULL);
2306         if (rc)
2307                 nrs_tbf_opcode_cmd_fini(cmd);
2308
2309         return rc;
2310 }
2311
2312 static int
2313 nrs_tbf_opcode_rule_match(struct nrs_tbf_rule *rule,
2314                           struct nrs_tbf_client *cli)
2315 {
2316         if (rule->tr_opcodes == NULL)
2317                 return 0;
2318
2319         return test_bit(cli->tc_opcode, rule->tr_opcodes);
2320 }
2321
2322 static int nrs_tbf_opcode_rule_init(struct ptlrpc_nrs_policy *policy,
2323                                     struct nrs_tbf_rule *rule,
2324                                     struct nrs_tbf_cmd *start)
2325 {
2326         int rc = 0;
2327
2328         LASSERT(start->u.tc_start.ts_opcodes_str != NULL);
2329         OBD_ALLOC(rule->tr_opcodes_str,
2330                   strlen(start->u.tc_start.ts_opcodes_str) + 1);
2331         if (rule->tr_opcodes_str == NULL)
2332                 return -ENOMEM;
2333
2334         strncpy(rule->tr_opcodes_str, start->u.tc_start.ts_opcodes_str,
2335                 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2336
2337         /* Default rule '*' */
2338         if (strcmp(start->u.tc_start.ts_opcodes_str, "*") == 0)
2339                 return 0;
2340
2341         rc = nrs_tbf_opcode_list_parse(rule->tr_opcodes_str,
2342                                        strlen(rule->tr_opcodes_str),
2343                                        &rule->tr_opcodes);
2344         if (rc)
2345                 OBD_FREE(rule->tr_opcodes_str,
2346                          strlen(start->u.tc_start.ts_opcodes_str) + 1);
2347
2348         return rc;
2349 }
2350
2351 static int
2352 nrs_tbf_opcode_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2353 {
2354         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2355                    rule->tr_opcodes_str, rule->tr_rpc_rate,
2356                    atomic_read(&rule->tr_ref) - 1);
2357         return 0;
2358 }
2359
2360
2361 struct nrs_tbf_ops nrs_tbf_opcode_ops = {
2362         .o_name = NRS_TBF_TYPE_OPCODE,
2363         .o_startup = nrs_tbf_opcode_startup,
2364         .o_cli_find = nrs_tbf_opcode_cli_find,
2365         .o_cli_findadd = nrs_tbf_opcode_cli_findadd,
2366         .o_cli_put = nrs_tbf_nid_cli_put,
2367         .o_cli_init = nrs_tbf_opcode_cli_init,
2368         .o_rule_init = nrs_tbf_opcode_rule_init,
2369         .o_rule_dump = nrs_tbf_opcode_rule_dump,
2370         .o_rule_match = nrs_tbf_opcode_rule_match,
2371         .o_rule_fini = nrs_tbf_opcode_rule_fini,
2372 };
2373
2374 static unsigned nrs_tbf_id_hop_hash(struct cfs_hash *hs, const void *key,
2375                                     unsigned mask)
2376 {
2377         return cfs_hash_djb2_hash(key, sizeof(struct tbf_id), mask);
2378 }
2379
2380 static int nrs_tbf_id_hop_keycmp(const void *key, struct hlist_node *hnode)
2381 {
2382         const struct tbf_id *opc = key;
2383         enum nrs_tbf_flag ntf;
2384         struct nrs_tbf_client *cli = hlist_entry(hnode, struct nrs_tbf_client,
2385                                                  tc_hnode);
2386         ntf = opc->ti_type & cli->tc_id.ti_type;
2387         if ((ntf & NRS_TBF_FLAG_UID) && opc->ti_uid != cli->tc_id.ti_uid)
2388                 return 0;
2389
2390         if ((ntf & NRS_TBF_FLAG_GID) && opc->ti_gid != cli->tc_id.ti_gid)
2391                 return 0;
2392
2393         return 1;
2394 }
2395
2396 static void *nrs_tbf_id_hop_key(struct hlist_node *hnode)
2397 {
2398         struct nrs_tbf_client *cli = hlist_entry(hnode,
2399                                                  struct nrs_tbf_client,
2400                                                  tc_hnode);
2401         return &cli->tc_id;
2402 }
2403
2404 static void nrs_tbf_id_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
2405 {
2406         struct nrs_tbf_client *cli = hlist_entry(hnode,
2407                                                  struct nrs_tbf_client,
2408                                                  tc_hnode);
2409
2410         atomic_inc(&cli->tc_ref);
2411 }
2412
2413 static void nrs_tbf_id_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
2414 {
2415         struct nrs_tbf_client *cli = hlist_entry(hnode,
2416                                                  struct nrs_tbf_client,
2417                                                  tc_hnode);
2418
2419         atomic_dec(&cli->tc_ref);
2420 }
2421
2422 static void
2423 nrs_tbf_id_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
2424
2425 {
2426         struct nrs_tbf_client *cli = hlist_entry(hnode,
2427                                                  struct nrs_tbf_client,
2428                                                  tc_hnode);
2429
2430         LASSERT(atomic_read(&cli->tc_ref) == 0);
2431         nrs_tbf_cli_fini(cli);
2432 }
2433
2434 static struct cfs_hash_ops nrs_tbf_id_hash_ops = {
2435         .hs_hash        = nrs_tbf_id_hop_hash,
2436         .hs_keycmp      = nrs_tbf_id_hop_keycmp,
2437         .hs_key         = nrs_tbf_id_hop_key,
2438         .hs_object      = nrs_tbf_hop_object,
2439         .hs_get         = nrs_tbf_id_hop_get,
2440         .hs_put         = nrs_tbf_id_hop_put,
2441         .hs_put_locked  = nrs_tbf_id_hop_put,
2442         .hs_exit        = nrs_tbf_id_hop_exit,
2443 };
2444
2445 static int
2446 nrs_tbf_id_startup(struct ptlrpc_nrs_policy *policy,
2447                    struct nrs_tbf_head *head)
2448 {
2449         struct nrs_tbf_cmd start;
2450         int rc;
2451
2452         head->th_cli_hash = cfs_hash_create("nrs_tbf_id_hash",
2453                                             NRS_TBF_NID_BITS,
2454                                             NRS_TBF_NID_BITS,
2455                                             NRS_TBF_NID_BKT_BITS, 0,
2456                                             CFS_HASH_MIN_THETA,
2457                                             CFS_HASH_MAX_THETA,
2458                                             &nrs_tbf_id_hash_ops,
2459                                             CFS_HASH_RW_BKTLOCK);
2460         if (head->th_cli_hash == NULL)
2461                 return -ENOMEM;
2462
2463         memset(&start, 0, sizeof(start));
2464         start.u.tc_start.ts_ids_str = "*";
2465         start.u.tc_start.ts_rpc_rate = tbf_rate;
2466         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2467         start.tc_name = NRS_TBF_DEFAULT_RULE;
2468         INIT_LIST_HEAD(&start.u.tc_start.ts_ids);
2469         rc = nrs_tbf_rule_start(policy, head, &start);
2470         if (rc) {
2471                 cfs_hash_putref(head->th_cli_hash);
2472                 head->th_cli_hash = NULL;
2473         }
2474
2475         return rc;
2476 }
2477
2478 static struct nrs_tbf_client *
2479 nrs_tbf_id_cli_find(struct nrs_tbf_head *head,
2480                     struct ptlrpc_request *req)
2481 {
2482         struct tbf_id id;
2483
2484         LASSERT(head->th_type_flag == NRS_TBF_FLAG_UID ||
2485                 head->th_type_flag == NRS_TBF_FLAG_GID);
2486
2487         nrs_tbf_id_cli_set(req, &id, head->th_type_flag);
2488         return cfs_hash_lookup(head->th_cli_hash, &id);
2489 }
2490
2491 static struct nrs_tbf_client *
2492 nrs_tbf_id_cli_findadd(struct nrs_tbf_head *head,
2493                        struct nrs_tbf_client *cli)
2494 {
2495         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_id,
2496                                        &cli->tc_hnode);
2497 }
2498
2499 static void
2500 nrs_tbf_uid_cli_init(struct nrs_tbf_client *cli,
2501                      struct ptlrpc_request *req)
2502 {
2503         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_UID);
2504 }
2505
2506 static void
2507 nrs_tbf_gid_cli_init(struct nrs_tbf_client *cli,
2508                      struct ptlrpc_request *req)
2509 {
2510         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_GID);
2511 }
2512
2513 static int
2514 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id)
2515 {
2516         struct nrs_tbf_id *nti_id;
2517         enum nrs_tbf_flag flag;
2518
2519         list_for_each_entry(nti_id, id_list, nti_linkage) {
2520                 flag = id.ti_type & nti_id->nti_id.ti_type;
2521                 if (!flag)
2522                         continue;
2523
2524                 if ((flag & NRS_TBF_FLAG_UID) &&
2525                     (id.ti_uid != nti_id->nti_id.ti_uid))
2526                         continue;
2527
2528                 if ((flag & NRS_TBF_FLAG_GID) &&
2529                     (id.ti_gid != nti_id->nti_id.ti_gid))
2530                         continue;
2531
2532                 return 1;
2533         }
2534         return 0;
2535 }
2536
2537 static int
2538 nrs_tbf_id_rule_match(struct nrs_tbf_rule *rule,
2539                       struct nrs_tbf_client *cli)
2540 {
2541         return nrs_tbf_id_list_match(&rule->tr_ids, cli->tc_id);
2542 }
2543
2544 static void nrs_tbf_id_cmd_fini(struct nrs_tbf_cmd *cmd)
2545 {
2546         nrs_tbf_id_list_free(&cmd->u.tc_start.ts_ids);
2547
2548         if (cmd->u.tc_start.ts_ids_str)
2549                 OBD_FREE(cmd->u.tc_start.ts_ids_str,
2550                          strlen(cmd->u.tc_start.ts_ids_str) + 1);
2551 }
2552
2553 static int
2554 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
2555                       enum nrs_tbf_flag tif)
2556 {
2557         struct cfs_lstr src;
2558         struct cfs_lstr res;
2559         int rc = 0;
2560         struct tbf_id id = { 0 };
2561         ENTRY;
2562
2563         if (tif != NRS_TBF_FLAG_UID && tif != NRS_TBF_FLAG_GID)
2564                 RETURN(-EINVAL);
2565
2566         src.ls_str = str;
2567         src.ls_len = len;
2568         INIT_LIST_HEAD(id_list);
2569         while (src.ls_str) {
2570                 struct nrs_tbf_id *nti_id;
2571
2572                 if (cfs_gettok(&src, ' ', &res) == 0)
2573                         GOTO(out, rc = -EINVAL);
2574
2575                 id.ti_type = tif;
2576                 if (tif == NRS_TBF_FLAG_UID) {
2577                         if (!cfs_str2num_check(res.ls_str, res.ls_len,
2578                                                &id.ti_uid, 0, (u32)~0U))
2579                                 GOTO(out, rc = -EINVAL);
2580                 } else {
2581                         if (!cfs_str2num_check(res.ls_str, res.ls_len,
2582                                                &id.ti_gid, 0, (u32)~0U))
2583                                 GOTO(out, rc = -EINVAL);
2584                 }
2585
2586                 OBD_ALLOC_PTR(nti_id);
2587                 if (nti_id == NULL)
2588                         GOTO(out, rc = -ENOMEM);
2589
2590                 nti_id->nti_id = id;
2591                 list_add_tail(&nti_id->nti_linkage, id_list);
2592         }
2593 out:
2594         if (rc)
2595                 nrs_tbf_id_list_free(id_list);
2596         RETURN(rc);
2597 }
2598
2599 static int nrs_tbf_ug_id_parse(struct nrs_tbf_cmd *cmd, char *id)
2600 {
2601         struct cfs_lstr src;
2602         int rc;
2603         enum nrs_tbf_flag tif;
2604
2605         tif = cmd->u.tc_start.ts_valid_type;
2606
2607         src.ls_str = id;
2608         src.ls_len = strlen(id);
2609
2610         rc = nrs_tbf_check_id_value(&src,
2611                                     tif == NRS_TBF_FLAG_UID ? "uid" : "gid");
2612         if (rc)
2613                 return rc;
2614
2615         OBD_ALLOC(cmd->u.tc_start.ts_ids_str, src.ls_len + 1);
2616         if (cmd->u.tc_start.ts_ids_str == NULL)
2617                 return -ENOMEM;
2618
2619         strlcpy(cmd->u.tc_start.ts_ids_str, src.ls_str, src.ls_len + 1);
2620
2621         rc = nrs_tbf_id_list_parse(cmd->u.tc_start.ts_ids_str,
2622                                    strlen(cmd->u.tc_start.ts_ids_str),
2623                                    &cmd->u.tc_start.ts_ids, tif);
2624         if (rc)
2625                 nrs_tbf_id_cmd_fini(cmd);
2626
2627         return rc;
2628 }
2629
2630 static int
2631 nrs_tbf_id_rule_init(struct ptlrpc_nrs_policy *policy,
2632                      struct nrs_tbf_rule *rule,
2633                      struct nrs_tbf_cmd *start)
2634 {
2635         struct nrs_tbf_head *head = rule->tr_head;
2636         int rc = 0;
2637         enum nrs_tbf_flag tif = head->th_type_flag;
2638         int ids_len = strlen(start->u.tc_start.ts_ids_str) + 1;
2639
2640         LASSERT(start->u.tc_start.ts_ids_str);
2641         INIT_LIST_HEAD(&rule->tr_ids);
2642
2643         OBD_ALLOC(rule->tr_ids_str, ids_len);
2644         if (rule->tr_ids_str == NULL)
2645                 return -ENOMEM;
2646
2647         strlcpy(rule->tr_ids_str, start->u.tc_start.ts_ids_str,
2648                 ids_len);
2649
2650         if (!list_empty(&start->u.tc_start.ts_ids)) {
2651                 rc = nrs_tbf_id_list_parse(rule->tr_ids_str,
2652                                            strlen(rule->tr_ids_str),
2653                                            &rule->tr_ids, tif);
2654                 if (rc)
2655                         CERROR("%ss {%s} illegal\n",
2656                                tif == NRS_TBF_FLAG_UID ? "uid" : "gid",
2657                                rule->tr_ids_str);
2658         }
2659         if (rc) {
2660                 OBD_FREE(rule->tr_ids_str, ids_len);
2661                 rule->tr_ids_str = NULL;
2662         }
2663         return rc;
2664 }
2665
2666 static int
2667 nrs_tbf_id_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2668 {
2669         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2670                    rule->tr_ids_str, rule->tr_rpc_rate,
2671                    atomic_read(&rule->tr_ref) - 1);
2672         return 0;
2673 }
2674
2675 static void nrs_tbf_id_rule_fini(struct nrs_tbf_rule *rule)
2676 {
2677         nrs_tbf_id_list_free(&rule->tr_ids);
2678         if (rule->tr_ids_str != NULL)
2679                 OBD_FREE(rule->tr_ids_str, strlen(rule->tr_ids_str) + 1);
2680 }
2681
2682 struct nrs_tbf_ops nrs_tbf_uid_ops = {
2683         .o_name = NRS_TBF_TYPE_UID,
2684         .o_startup = nrs_tbf_id_startup,
2685         .o_cli_find = nrs_tbf_id_cli_find,
2686         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2687         .o_cli_put = nrs_tbf_nid_cli_put,
2688         .o_cli_init = nrs_tbf_uid_cli_init,
2689         .o_rule_init = nrs_tbf_id_rule_init,
2690         .o_rule_dump = nrs_tbf_id_rule_dump,
2691         .o_rule_match = nrs_tbf_id_rule_match,
2692         .o_rule_fini = nrs_tbf_id_rule_fini,
2693 };
2694
2695 struct nrs_tbf_ops nrs_tbf_gid_ops = {
2696         .o_name = NRS_TBF_TYPE_GID,
2697         .o_startup = nrs_tbf_id_startup,
2698         .o_cli_find = nrs_tbf_id_cli_find,
2699         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2700         .o_cli_put = nrs_tbf_nid_cli_put,
2701         .o_cli_init = nrs_tbf_gid_cli_init,
2702         .o_rule_init = nrs_tbf_id_rule_init,
2703         .o_rule_dump = nrs_tbf_id_rule_dump,
2704         .o_rule_match = nrs_tbf_id_rule_match,
2705         .o_rule_fini = nrs_tbf_id_rule_fini,
2706 };
2707
2708 static struct nrs_tbf_type nrs_tbf_types[] = {
2709         {
2710                 .ntt_name = NRS_TBF_TYPE_JOBID,
2711                 .ntt_flag = NRS_TBF_FLAG_JOBID,
2712                 .ntt_ops = &nrs_tbf_jobid_ops,
2713         },
2714         {
2715                 .ntt_name = NRS_TBF_TYPE_NID,
2716                 .ntt_flag = NRS_TBF_FLAG_NID,
2717                 .ntt_ops = &nrs_tbf_nid_ops,
2718         },
2719         {
2720                 .ntt_name = NRS_TBF_TYPE_OPCODE,
2721                 .ntt_flag = NRS_TBF_FLAG_OPCODE,
2722                 .ntt_ops = &nrs_tbf_opcode_ops,
2723         },
2724         {
2725                 .ntt_name = NRS_TBF_TYPE_GENERIC,
2726                 .ntt_flag = NRS_TBF_FLAG_GENERIC,
2727                 .ntt_ops = &nrs_tbf_generic_ops,
2728         },
2729         {
2730                 .ntt_name = NRS_TBF_TYPE_UID,
2731                 .ntt_flag = NRS_TBF_FLAG_UID,
2732                 .ntt_ops = &nrs_tbf_uid_ops,
2733         },
2734         {
2735                 .ntt_name = NRS_TBF_TYPE_GID,
2736                 .ntt_flag = NRS_TBF_FLAG_GID,
2737                 .ntt_ops = &nrs_tbf_gid_ops,
2738         },
2739 };
2740
2741 /**
2742  * Is called before the policy transitions into
2743  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
2744  * policy-specific private data structure.
2745  *
2746  * \param[in] policy The policy to start
2747  *
2748  * \retval -ENOMEM OOM error
2749  * \retval  0      success
2750  *
2751  * \see nrs_policy_register()
2752  * \see nrs_policy_ctl()
2753  */
2754 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
2755 {
2756         struct nrs_tbf_head     *head;
2757         struct nrs_tbf_ops      *ops;
2758         __u32                    type;
2759         char                    *name;
2760         int found = 0;
2761         int i;
2762         int rc = 0;
2763
2764         if (arg == NULL)
2765                 name = NRS_TBF_TYPE_GENERIC;
2766         else if (strlen(arg) < NRS_TBF_TYPE_MAX_LEN)
2767                 name = arg;
2768         else
2769                 GOTO(out, rc = -EINVAL);
2770
2771         for (i = 0; i < ARRAY_SIZE(nrs_tbf_types); i++) {
2772                 if (strcmp(name, nrs_tbf_types[i].ntt_name) == 0) {
2773                         ops = nrs_tbf_types[i].ntt_ops;
2774                         type = nrs_tbf_types[i].ntt_flag;
2775                         found = 1;
2776                         break;
2777                 }
2778         }
2779         if (found == 0)
2780                 GOTO(out, rc = -ENOTSUPP);
2781
2782         OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
2783         if (head == NULL)
2784                 GOTO(out, rc = -ENOMEM);
2785
2786         memcpy(head->th_type, name, strlen(name));
2787         head->th_type[strlen(name)] = '\0';
2788         head->th_ops = ops;
2789         head->th_type_flag = type;
2790
2791         head->th_binheap = binheap_create(&nrs_tbf_heap_ops,
2792                                           CBH_FLAG_ATOMIC_GROW, 4096, NULL,
2793                                           nrs_pol2cptab(policy),
2794                                           nrs_pol2cptid(policy));
2795         if (head->th_binheap == NULL)
2796                 GOTO(out_free_head, rc = -ENOMEM);
2797
2798         atomic_set(&head->th_rule_sequence, 0);
2799         spin_lock_init(&head->th_rule_lock);
2800         INIT_LIST_HEAD(&head->th_list);
2801         hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
2802         head->th_timer.function = nrs_tbf_timer_cb;
2803         rc = head->th_ops->o_startup(policy, head);
2804         if (rc)
2805                 GOTO(out_free_heap, rc);
2806
2807         policy->pol_private = head;
2808         return 0;
2809 out_free_heap:
2810         binheap_destroy(head->th_binheap);
2811 out_free_head:
2812         OBD_FREE_PTR(head);
2813 out:
2814         return rc;
2815 }
2816
2817 /**
2818  * Is called before the policy transitions into
2819  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
2820  * private data structure.
2821  *
2822  * \param[in] policy The policy to stop
2823  *
2824  * \see nrs_policy_stop0()
2825  */
2826 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
2827 {
2828         struct nrs_tbf_head *head = policy->pol_private;
2829         struct ptlrpc_nrs *nrs = policy->pol_nrs;
2830         struct nrs_tbf_rule *rule, *n;
2831
2832         LASSERT(head != NULL);
2833         LASSERT(head->th_cli_hash != NULL);
2834         hrtimer_cancel(&head->th_timer);
2835         /* Should cleanup hash first before free rules */
2836         cfs_hash_putref(head->th_cli_hash);
2837         list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
2838                 list_del_init(&rule->tr_linkage);
2839                 nrs_tbf_rule_put(rule);
2840         }
2841         LASSERT(list_empty(&head->th_list));
2842         LASSERT(head->th_binheap != NULL);
2843         LASSERT(binheap_is_empty(head->th_binheap));
2844         binheap_destroy(head->th_binheap);
2845         OBD_FREE_PTR(head);
2846         nrs->nrs_throttling = 0;
2847         wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
2848 }
2849
2850 /**
2851  * Performs a policy-specific ctl function on TBF policy instances; similar
2852  * to ioctl.
2853  *
2854  * \param[in]     policy the policy instance
2855  * \param[in]     opc    the opcode
2856  * \param[in,out] arg    used for passing parameters and information
2857  *
2858  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2859  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2860  *
2861  * \retval 0   operation carried out successfully
2862  * \retval -ve error
2863  */
2864 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
2865                        enum ptlrpc_nrs_ctl opc,
2866                        void *arg)
2867 {
2868         int rc = 0;
2869         ENTRY;
2870
2871         assert_spin_locked(&policy->pol_nrs->nrs_lock);
2872
2873         switch ((enum nrs_ctl_tbf)opc) {
2874         default:
2875                 RETURN(-EINVAL);
2876
2877         /**
2878          * Read RPC rate size of a policy instance.
2879          */
2880         case NRS_CTL_TBF_RD_RULE: {
2881                 struct nrs_tbf_head *head = policy->pol_private;
2882                 struct seq_file *m = arg;
2883                 struct ptlrpc_service_part *svcpt;
2884
2885                 svcpt = policy->pol_nrs->nrs_svcpt;
2886                 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
2887
2888                 rc = nrs_tbf_rule_dump_all(head, m);
2889                 }
2890                 break;
2891
2892         /**
2893          * Write RPC rate of a policy instance.
2894          */
2895         case NRS_CTL_TBF_WR_RULE: {
2896                 struct nrs_tbf_head *head = policy->pol_private;
2897                 struct nrs_tbf_cmd *cmd;
2898
2899                 cmd = (struct nrs_tbf_cmd *)arg;
2900                 rc = nrs_tbf_command(policy,
2901                                      head,
2902                                      cmd);
2903                 }
2904                 break;
2905         /**
2906          * Read the TBF policy type of a policy instance.
2907          */
2908         case NRS_CTL_TBF_RD_TYPE_FLAG: {
2909                 struct nrs_tbf_head *head = policy->pol_private;
2910
2911                 *(__u32 *)arg = head->th_type_flag;
2912                 }
2913                 break;
2914         }
2915
2916         RETURN(rc);
2917 }
2918
2919 /**
2920  * Is called for obtaining a TBF policy resource.
2921  *
2922  * \param[in]  policy     The policy on which the request is being asked for
2923  * \param[in]  nrq        The request for which resources are being taken
2924  * \param[in]  parent     Parent resource, unused in this policy
2925  * \param[out] resp       Resources references are placed in this array
2926  * \param[in]  moving_req Signifies limited caller context; unused in this
2927  *                        policy
2928  *
2929  *
2930  * \see nrs_resource_get_safe()
2931  */
2932 static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
2933                            struct ptlrpc_nrs_request *nrq,
2934                            const struct ptlrpc_nrs_resource *parent,
2935                            struct ptlrpc_nrs_resource **resp,
2936                            bool moving_req)
2937 {
2938         struct nrs_tbf_head   *head;
2939         struct nrs_tbf_client *cli;
2940         struct nrs_tbf_client *tmp;
2941         struct ptlrpc_request *req;
2942
2943         if (parent == NULL) {
2944                 *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
2945                 return 0;
2946         }
2947
2948         head = container_of(parent, struct nrs_tbf_head, th_res);
2949         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
2950         cli = head->th_ops->o_cli_find(head, req);
2951         if (cli != NULL) {
2952                 spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2953                 LASSERT(cli->tc_rule);
2954                 if (cli->tc_rule_sequence !=
2955                     atomic_read(&head->th_rule_sequence) ||
2956                     cli->tc_rule->tr_flags & NTRS_STOPPING) {
2957                         struct nrs_tbf_rule *rule;
2958
2959                         CDEBUG(D_RPCTRACE,
2960                                "TBF class@%p rate %llu sequence %d, "
2961                                "rule flags %d, head sequence %d\n",
2962                                cli, cli->tc_rpc_rate,
2963                                cli->tc_rule_sequence,
2964                                cli->tc_rule->tr_flags,
2965                                atomic_read(&head->th_rule_sequence));
2966                         rule = nrs_tbf_rule_match(head, cli);
2967                         if (rule != cli->tc_rule) {
2968                                 nrs_tbf_cli_reset(head, rule, cli);
2969                         } else {
2970                                 if (cli->tc_rule_generation != rule->tr_generation)
2971                                         nrs_tbf_cli_reset_value(head, cli);
2972                                 nrs_tbf_rule_put(rule);
2973                         }
2974                 } else if (cli->tc_rule_generation !=
2975                            cli->tc_rule->tr_generation) {
2976                         nrs_tbf_cli_reset_value(head, cli);
2977                 }
2978                 spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2979                 goto out;
2980         }
2981
2982         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
2983                           sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
2984         if (cli == NULL)
2985                 return -ENOMEM;
2986
2987         nrs_tbf_cli_init(head, cli, req);
2988         tmp = head->th_ops->o_cli_findadd(head, cli);
2989         if (tmp != cli) {
2990                 atomic_dec(&cli->tc_ref);
2991                 nrs_tbf_cli_fini(cli);
2992                 cli = tmp;
2993         }
2994 out:
2995         *resp = &cli->tc_res;
2996
2997         return 1;
2998 }
2999
3000 /**
3001  * Called when releasing references to the resource hierachy obtained for a
3002  * request for scheduling using the TBF policy.
3003  *
3004  * \param[in] policy   the policy the resource belongs to
3005  * \param[in] res      the resource to be released
3006  */
3007 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
3008                             const struct ptlrpc_nrs_resource *res)
3009 {
3010         struct nrs_tbf_head   *head;
3011         struct nrs_tbf_client *cli;
3012
3013         /**
3014          * Do nothing for freeing parent, nrs_tbf_net resources
3015          */
3016         if (res->res_parent == NULL)
3017                 return;
3018
3019         cli = container_of(res, struct nrs_tbf_client, tc_res);
3020         head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
3021
3022         head->th_ops->o_cli_put(head, cli);
3023 }
3024
3025 /**
3026  * Called when getting a request from the TBF policy for handling, or just
3027  * peeking; removes the request from the policy when it is to be handled.
3028  *
3029  * \param[in] policy The policy
3030  * \param[in] peek   When set, signifies that we just want to examine the
3031  *                   request, and not handle it, so the request is not removed
3032  *                   from the policy.
3033  * \param[in] force  Force the policy to return a request; unused in this
3034  *                   policy
3035  *
3036  * \retval The request to be handled; this is the next request in the TBF
3037  *         rule
3038  *
3039  * \see ptlrpc_nrs_req_get_nolock()
3040  * \see nrs_request_get()
3041  */
3042 static
3043 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
3044                                            bool peek, bool force)
3045 {
3046         struct nrs_tbf_head       *head = policy->pol_private;
3047         struct ptlrpc_nrs_request *nrq = NULL;
3048         struct nrs_tbf_client     *cli;
3049         struct binheap_node       *node;
3050
3051         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3052
3053         if (!peek && policy->pol_nrs->nrs_throttling)
3054                 return NULL;
3055
3056         node = binheap_root(head->th_binheap);
3057         if (unlikely(node == NULL))
3058                 return NULL;
3059
3060         cli = container_of(node, struct nrs_tbf_client, tc_node);
3061         LASSERT(cli->tc_in_heap);
3062         if (peek) {
3063                 nrq = list_entry(cli->tc_list.next,
3064                                      struct ptlrpc_nrs_request,
3065                                      nr_u.tbf.tr_list);
3066         } else {
3067                 struct nrs_tbf_rule *rule = cli->tc_rule;
3068                 __u64 now = ktime_to_ns(ktime_get());
3069                 __u64 passed;
3070                 __u64 ntoken;
3071                 __u64 deadline;
3072                 __u64 old_resid = 0;
3073
3074                 deadline = cli->tc_check_time +
3075                           cli->tc_nsecs;
3076                 LASSERT(now >= cli->tc_check_time);
3077                 passed = now - cli->tc_check_time;
3078                 ntoken = passed * cli->tc_rpc_rate;
3079                 do_div(ntoken, NSEC_PER_SEC);
3080
3081                 ntoken += cli->tc_ntoken;
3082                 if (rule->tr_flags & NTRS_REALTIME) {
3083                         LASSERT(cli->tc_nsecs_resid < cli->tc_nsecs);
3084                         old_resid = cli->tc_nsecs_resid;
3085                         cli->tc_nsecs_resid += passed % cli->tc_nsecs;
3086                         if (cli->tc_nsecs_resid > cli->tc_nsecs) {
3087                                 ntoken++;
3088                                 cli->tc_nsecs_resid -= cli->tc_nsecs;
3089                         }
3090                 } else if (ntoken > cli->tc_depth)
3091                         ntoken = cli->tc_depth;
3092
3093                 if (ntoken > 0) {
3094                         struct ptlrpc_request *req;
3095                         nrq = list_entry(cli->tc_list.next,
3096                                              struct ptlrpc_nrs_request,
3097                                              nr_u.tbf.tr_list);
3098                         req = container_of(nrq,
3099                                            struct ptlrpc_request,
3100                                            rq_nrq);
3101                         ntoken--;
3102                         cli->tc_ntoken = ntoken;
3103                         cli->tc_check_time = now;
3104                         list_del_init(&nrq->nr_u.tbf.tr_list);
3105                         if (list_empty(&cli->tc_list)) {
3106                                 binheap_remove(head->th_binheap,
3107                                                &cli->tc_node);
3108                                 cli->tc_in_heap = false;
3109                         } else {
3110                                 if (!(rule->tr_flags & NTRS_REALTIME))
3111                                         cli->tc_deadline = now + cli->tc_nsecs;
3112                                 binheap_relocate(head->th_binheap,
3113                                                  &cli->tc_node);
3114                         }
3115                         CDEBUG(D_RPCTRACE,
3116                                "TBF dequeues: class@%p rate %llu gen %llu token %llu, rule@%p rate %llu gen %llu\n",
3117                                cli, cli->tc_rpc_rate,
3118                                cli->tc_rule_generation, cli->tc_ntoken,
3119                                cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3120                                cli->tc_rule->tr_generation);
3121                 } else {
3122                         ktime_t time;
3123
3124                         if (rule->tr_flags & NTRS_REALTIME) {
3125                                 cli->tc_deadline = deadline;
3126                                 cli->tc_nsecs_resid = old_resid;
3127                                 binheap_relocate(head->th_binheap,
3128                                                  &cli->tc_node);
3129                                 if (node != binheap_root(head->th_binheap))
3130                                         return nrs_tbf_req_get(policy,
3131                                                                peek, force);
3132                         }
3133                         policy->pol_nrs->nrs_throttling = 1;
3134                         head->th_deadline = deadline;
3135                         time = ktime_set(0, 0);
3136                         time = ktime_add_ns(time, deadline);
3137                         hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
3138                 }
3139         }
3140
3141         return nrq;
3142 }
3143
3144 /**
3145  * Adds request \a nrq to \a policy's list of queued requests
3146  *
3147  * \param[in] policy The policy
3148  * \param[in] nrq    The request to add
3149  *
3150  * \retval 0 success; nrs_request_enqueue() assumes this function will always
3151  *                    succeed
3152  */
3153 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
3154                            struct ptlrpc_nrs_request *nrq)
3155 {
3156         struct nrs_tbf_head   *head;
3157         struct nrs_tbf_client *cli;
3158         int                    rc = 0;
3159
3160         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3161
3162         cli = container_of(nrs_request_resource(nrq),
3163                            struct nrs_tbf_client, tc_res);
3164         head = container_of(nrs_request_resource(nrq)->res_parent,
3165                             struct nrs_tbf_head, th_res);
3166         if (list_empty(&cli->tc_list)) {
3167                 LASSERT(!cli->tc_in_heap);
3168                 cli->tc_deadline = cli->tc_check_time + cli->tc_nsecs;
3169                 rc = binheap_insert(head->th_binheap, &cli->tc_node);
3170                 if (rc == 0) {
3171                         cli->tc_in_heap = true;
3172                         nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3173                         list_add_tail(&nrq->nr_u.tbf.tr_list,
3174                                           &cli->tc_list);
3175                         if (policy->pol_nrs->nrs_throttling) {
3176                                 __u64 deadline = cli->tc_deadline;
3177                                 if ((head->th_deadline > deadline) &&
3178                                     (hrtimer_try_to_cancel(&head->th_timer)
3179                                      >= 0)) {
3180                                         ktime_t time;
3181                                         head->th_deadline = deadline;
3182                                         time = ktime_set(0, 0);
3183                                         time = ktime_add_ns(time, deadline);
3184                                         hrtimer_start(&head->th_timer, time,
3185                                                       HRTIMER_MODE_ABS);
3186                                 }
3187                         }
3188                 }
3189         } else {
3190                 LASSERT(cli->tc_in_heap);
3191                 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3192                 list_add_tail(&nrq->nr_u.tbf.tr_list,
3193                                   &cli->tc_list);
3194         }
3195
3196         if (rc == 0)
3197                 CDEBUG(D_RPCTRACE,
3198                        "TBF enqueues: class@%p rate %llu gen %llu token %llu, rule@%p rate %llu gen %llu\n",
3199                        cli, cli->tc_rpc_rate,
3200                        cli->tc_rule_generation, cli->tc_ntoken,
3201                        cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3202                        cli->tc_rule->tr_generation);
3203
3204         return rc;
3205 }
3206
3207 /**
3208  * Removes request \a nrq from \a policy's list of queued requests.
3209  *
3210  * \param[in] policy The policy
3211  * \param[in] nrq    The request to remove
3212  */
3213 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
3214                              struct ptlrpc_nrs_request *nrq)
3215 {
3216         struct nrs_tbf_head   *head;
3217         struct nrs_tbf_client *cli;
3218
3219         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3220
3221         cli = container_of(nrs_request_resource(nrq),
3222                            struct nrs_tbf_client, tc_res);
3223         head = container_of(nrs_request_resource(nrq)->res_parent,
3224                             struct nrs_tbf_head, th_res);
3225
3226         LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
3227         list_del_init(&nrq->nr_u.tbf.tr_list);
3228         if (list_empty(&cli->tc_list)) {
3229                 binheap_remove(head->th_binheap,
3230                                &cli->tc_node);
3231                 cli->tc_in_heap = false;
3232         } else {
3233                 binheap_relocate(head->th_binheap,
3234                                  &cli->tc_node);
3235         }
3236 }
3237
3238 /**
3239  * Prints a debug statement right before the request \a nrq stops being
3240  * handled.
3241  *
3242  * \param[in] policy The policy handling the request
3243  * \param[in] nrq    The request being handled
3244  *
3245  * \see ptlrpc_server_finish_request()
3246  * \see ptlrpc_nrs_req_stop_nolock()
3247  */
3248 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
3249                               struct ptlrpc_nrs_request *nrq)
3250 {
3251         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
3252                                                   rq_nrq);
3253
3254         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3255
3256         CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: %llu\n",
3257                policy->pol_desc->pd_name, libcfs_idstr(&req->rq_peer),
3258                nrq->nr_u.tbf.tr_sequence);
3259 }
3260
3261 /**
3262  * debugfs interface
3263  */
3264
3265 /**
3266  * The maximum RPC rate.
3267  */
3268 #define LPROCFS_NRS_RATE_MAX            1000000ULL      /* 1rpc/us */
3269
3270 static int
3271 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
3272 {
3273         struct ptlrpc_service       *svc = m->private;
3274         int                          rc;
3275
3276         seq_printf(m, "regular_requests:\n");
3277         /**
3278          * Perform two separate calls to this as only one of the NRS heads'
3279          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
3280          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
3281          */
3282         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
3283                                        NRS_POL_NAME_TBF,
3284                                        NRS_CTL_TBF_RD_RULE,
3285                                        false, m);
3286         if (rc == 0) {
3287                 /**
3288                  * -ENOSPC means buf in the parameter m is overflow, return 0
3289                  * here to let upper layer function seq_read alloc a larger
3290                  * memory area and do this process again.
3291                  */
3292         } else if (rc == -ENOSPC) {
3293                 return 0;
3294
3295                 /**
3296                  * Ignore -ENODEV as the regular NRS head's policy may be in the
3297                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
3298                  */
3299         } else if (rc != -ENODEV) {
3300                 return rc;
3301         }
3302
3303         if (!nrs_svc_has_hp(svc))
3304                 goto no_hp;
3305
3306         seq_printf(m, "high_priority_requests:\n");
3307         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
3308                                        NRS_POL_NAME_TBF,
3309                                        NRS_CTL_TBF_RD_RULE,
3310                                        false, m);
3311         if (rc == 0) {
3312                 /**
3313                  * -ENOSPC means buf in the parameter m is overflow, return 0
3314                  * here to let upper layer function seq_read alloc a larger
3315                  * memory area and do this process again.
3316                  */
3317         } else if (rc == -ENOSPC) {
3318                 return 0;
3319         }
3320
3321 no_hp:
3322
3323         return rc;
3324 }
3325
3326 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char *token)
3327 {
3328         int rc;
3329         ENTRY;
3330
3331         switch (cmd->u.tc_start.ts_valid_type) {
3332         case NRS_TBF_FLAG_JOBID:
3333                 rc = nrs_tbf_jobid_parse(cmd, token);
3334                 break;
3335         case NRS_TBF_FLAG_NID:
3336                 rc = nrs_tbf_nid_parse(cmd, token);
3337                 break;
3338         case NRS_TBF_FLAG_OPCODE:
3339                 rc = nrs_tbf_opcode_parse(cmd, token);
3340                 break;
3341         case NRS_TBF_FLAG_GENERIC:
3342                 rc = nrs_tbf_generic_parse(cmd, token);
3343                 break;
3344         case NRS_TBF_FLAG_UID:
3345         case NRS_TBF_FLAG_GID:
3346                 rc = nrs_tbf_ug_id_parse(cmd, token);
3347                 break;
3348         default:
3349                 RETURN(-EINVAL);
3350         }
3351
3352         RETURN(rc);
3353 }
3354
3355 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
3356 {
3357         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3358                 switch (cmd->u.tc_start.ts_valid_type) {
3359                 case NRS_TBF_FLAG_JOBID:
3360                         nrs_tbf_jobid_cmd_fini(cmd);
3361                         break;
3362                 case NRS_TBF_FLAG_NID:
3363                         nrs_tbf_nid_cmd_fini(cmd);
3364                         break;
3365                 case NRS_TBF_FLAG_OPCODE:
3366                         nrs_tbf_opcode_cmd_fini(cmd);
3367                         break;
3368                 case NRS_TBF_FLAG_GENERIC:
3369                         nrs_tbf_generic_cmd_fini(cmd);
3370                         break;
3371                 case NRS_TBF_FLAG_UID:
3372                 case NRS_TBF_FLAG_GID:
3373                         nrs_tbf_id_cmd_fini(cmd);
3374                         break;
3375                 default:
3376                         CWARN("unknown NRS_TBF_FLAGS:0x%x\n",
3377                               cmd->u.tc_start.ts_valid_type);
3378                 }
3379         }
3380 }
3381
3382 static int check_rule_name(const char *name)
3383 {
3384         int i;
3385
3386         if (name[0] == '\0')
3387                 return -EINVAL;
3388
3389         for (i = 0; name[i] != '\0' && i < MAX_TBF_NAME; i++) {
3390                 if (!isalnum(name[i]) && name[i] != '_')
3391                         return -EINVAL;
3392         }
3393
3394         if (i == MAX_TBF_NAME)
3395                 return -ENAMETOOLONG;
3396
3397         return 0;
3398 }
3399
3400 static int
3401 nrs_tbf_parse_value_pair(struct nrs_tbf_cmd *cmd, char *buffer)
3402 {
3403         char    *key;
3404         char    *val;
3405         int      rc;
3406         __u64    rate;
3407
3408         val = buffer;
3409         key = strsep(&val, "=");
3410         if (val == NULL || strlen(val) == 0)
3411                 return -EINVAL;
3412
3413         /* Key of the value pair */
3414         if (strcmp(key, "rate") == 0) {
3415                 rc = kstrtoull(val, 10, &rate);
3416                 if (rc)
3417                         return rc;
3418
3419                 if (rate <= 0 || rate >= LPROCFS_NRS_RATE_MAX)
3420                         return -EINVAL;
3421
3422                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3423                         cmd->u.tc_start.ts_rpc_rate = rate;
3424                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3425                         cmd->u.tc_change.tc_rpc_rate = rate;
3426                 else
3427                         return -EINVAL;
3428         }  else if (strcmp(key, "rank") == 0) {
3429                 rc = check_rule_name(val);
3430                 if (rc)
3431                         return rc;
3432
3433                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3434                         cmd->u.tc_start.ts_next_name = val;
3435                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3436                         cmd->u.tc_change.tc_next_name = val;
3437                 else
3438                         return -EINVAL;
3439         } else if (strcmp(key, "realtime") == 0) {
3440                 unsigned long realtime;
3441
3442                 rc = kstrtoul(val, 10, &realtime);
3443                 if (rc)
3444                         return rc;
3445
3446                 if (realtime > 0)
3447                         cmd->u.tc_start.ts_rule_flags |= NTRS_REALTIME;
3448         } else {
3449                 return -EINVAL;
3450         }
3451         return 0;
3452 }
3453
3454 static int
3455 nrs_tbf_parse_value_pairs(struct nrs_tbf_cmd *cmd, char *buffer)
3456 {
3457         char    *val;
3458         char    *token;
3459         int      rc;
3460
3461         val = buffer;
3462         while (val != NULL && strlen(val) != 0) {
3463                 token = strsep(&val, " ");
3464                 rc = nrs_tbf_parse_value_pair(cmd, token);
3465                 if (rc)
3466                         return rc;
3467         }
3468
3469         switch (cmd->tc_cmd) {
3470         case NRS_CTL_TBF_START_RULE:
3471                 if (cmd->u.tc_start.ts_rpc_rate == 0)
3472                         cmd->u.tc_start.ts_rpc_rate = tbf_rate;
3473                 break;
3474         case NRS_CTL_TBF_CHANGE_RULE:
3475                 if (cmd->u.tc_change.tc_rpc_rate == 0 &&
3476                     cmd->u.tc_change.tc_next_name == NULL)
3477                         return -EINVAL;
3478                 break;
3479         case NRS_CTL_TBF_STOP_RULE:
3480                 break;
3481         default:
3482                 return -EINVAL;
3483         }
3484         return 0;
3485 }
3486
3487 static struct nrs_tbf_cmd *
3488 nrs_tbf_parse_cmd(char *buffer, unsigned long count, __u32 type_flag)
3489 {
3490         struct nrs_tbf_cmd *cmd;
3491         char *token;
3492         char *val;
3493         int rc = 0;
3494
3495         OBD_ALLOC_PTR(cmd);
3496         if (cmd == NULL)
3497                 GOTO(out, rc = -ENOMEM);
3498         memset(cmd, 0, sizeof(*cmd));
3499
3500         val = buffer;
3501         token = strsep(&val, " ");
3502         if (val == NULL || strlen(val) == 0)
3503                 GOTO(out_free_cmd, rc = -EINVAL);
3504
3505         /* Type of the command */
3506         if (strcmp(token, "start") == 0) {
3507                 cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
3508                 cmd->u.tc_start.ts_valid_type = type_flag;
3509         } else if (strcmp(token, "stop") == 0)
3510                 cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
3511         else if (strcmp(token, "change") == 0)
3512                 cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RULE;
3513         else
3514                 GOTO(out_free_cmd, rc = -EINVAL);
3515
3516         /* Name of the rule */
3517         token = strsep(&val, " ");
3518         if ((val == NULL && cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE))
3519                 GOTO(out_free_cmd, rc = -EINVAL);
3520
3521         rc = check_rule_name(token);
3522         if (rc)
3523                 GOTO(out_free_cmd, rc);
3524
3525         cmd->tc_name = token;
3526
3527         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3528                 /* List of ID */
3529                 LASSERT(val);
3530                 token = val;
3531                 val = strrchr(token, '}');
3532                 if (!val)
3533                         GOTO(out_free_cmd, rc = -EINVAL);
3534
3535                 /* Skip '}' */
3536                 val++;
3537                 if (*val == '\0') {
3538                         val = NULL;
3539                 } else if (*val == ' ') {
3540                         *val = '\0';
3541                         val++;
3542                 } else
3543                         GOTO(out_free_cmd, rc = -EINVAL);
3544
3545                 rc = nrs_tbf_id_parse(cmd, token);
3546                 if (rc)
3547                         GOTO(out_free_cmd, rc);
3548         }
3549
3550         rc = nrs_tbf_parse_value_pairs(cmd, val);
3551         if (rc)
3552                 GOTO(out_cmd_fini, rc = -EINVAL);
3553         goto out;
3554 out_cmd_fini:
3555         nrs_tbf_cmd_fini(cmd);
3556 out_free_cmd:
3557         OBD_FREE_PTR(cmd);
3558 out:
3559         if (rc)
3560                 cmd = ERR_PTR(rc);
3561         return cmd;
3562 }
3563
3564 /**
3565  * Get the TBF policy type (nid, jobid, etc) preset by
3566  * proc entry 'nrs_policies' for command buffer parsing.
3567  *
3568  * \param[in] svc the PTLRPC service
3569  * \param[in] queue the NRS queue type
3570  *
3571  * \retval the preset TBF policy type flag
3572  */
3573 static __u32
3574 nrs_tbf_type_flag(struct ptlrpc_service *svc, enum ptlrpc_nrs_queue_type queue)
3575 {
3576         __u32   type;
3577         int     rc;
3578
3579         rc = ptlrpc_nrs_policy_control(svc, queue,
3580                                        NRS_POL_NAME_TBF,
3581                                        NRS_CTL_TBF_RD_TYPE_FLAG,
3582                                        true, &type);
3583         if (rc != 0)
3584                 type = NRS_TBF_FLAG_INVALID;
3585
3586         return type;
3587 }
3588
3589 #define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
3590 static ssize_t
3591 ptlrpc_lprocfs_nrs_tbf_rule_seq_write(struct file *file,
3592                                       const char __user *buffer,
3593                                       size_t count, loff_t *off)
3594 {
3595         struct seq_file *m = file->private_data;
3596         struct ptlrpc_service *svc = m->private;
3597         char *kernbuf;
3598         char *val;
3599         int rc;
3600         struct nrs_tbf_cmd *cmd;
3601         enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
3602         unsigned long length;
3603         char *token;
3604
3605         OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3606         if (kernbuf == NULL)
3607                 GOTO(out, rc = -ENOMEM);
3608
3609         if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
3610                 GOTO(out_free_kernbuff, rc = -EINVAL);
3611
3612         if (copy_from_user(kernbuf, buffer, count))
3613                 GOTO(out_free_kernbuff, rc = -EFAULT);
3614
3615         val = kernbuf;
3616         token = strsep(&val, " ");
3617         if (val == NULL)
3618                 GOTO(out_free_kernbuff, rc = -EINVAL);
3619
3620         if (strcmp(token, "reg") == 0) {
3621                 queue = PTLRPC_NRS_QUEUE_REG;
3622         } else if (strcmp(token, "hp") == 0) {
3623                 queue = PTLRPC_NRS_QUEUE_HP;
3624         } else {
3625                 kernbuf[strlen(token)] = ' ';
3626                 val = kernbuf;
3627         }
3628         length = strlen(val);
3629
3630         if (length == 0)
3631                 GOTO(out_free_kernbuff, rc = -EINVAL);
3632
3633         if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
3634                 GOTO(out_free_kernbuff, rc = -ENODEV);
3635         else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
3636                 queue = PTLRPC_NRS_QUEUE_REG;
3637
3638         cmd = nrs_tbf_parse_cmd(val, length, nrs_tbf_type_flag(svc, queue));
3639         if (IS_ERR(cmd))
3640                 GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
3641
3642         /**
3643          * Serialize NRS core lprocfs operations with policy registration/
3644          * unregistration.
3645          */
3646         mutex_lock(&nrs_core.nrs_mutex);
3647         rc = ptlrpc_nrs_policy_control(svc, queue,
3648                                        NRS_POL_NAME_TBF,
3649                                        NRS_CTL_TBF_WR_RULE,
3650                                        false, cmd);
3651         mutex_unlock(&nrs_core.nrs_mutex);
3652
3653         nrs_tbf_cmd_fini(cmd);
3654         OBD_FREE_PTR(cmd);
3655 out_free_kernbuff:
3656         OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3657 out:
3658         return rc ? rc : count;
3659 }
3660
3661 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_tbf_rule);
3662
3663 /**
3664  * Initializes a TBF policy's lprocfs interface for service \a svc
3665  *
3666  * \param[in] svc the service
3667  *
3668  * \retval 0    success
3669  * \retval != 0 error
3670  */
3671 static int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
3672 {
3673         struct ldebugfs_vars nrs_tbf_lprocfs_vars[] = {
3674                 { .name         = "nrs_tbf_rule",
3675                   .fops         = &ptlrpc_lprocfs_nrs_tbf_rule_fops,
3676                   .data = svc },
3677                 { NULL }
3678         };
3679
3680         if (!svc->srv_debugfs_entry)
3681                 return 0;
3682
3683         ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_tbf_lprocfs_vars, NULL);
3684
3685         return 0;
3686 }
3687
3688 /**
3689  * TBF policy operations
3690  */
3691 static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
3692         .op_policy_start        = nrs_tbf_start,
3693         .op_policy_stop         = nrs_tbf_stop,
3694         .op_policy_ctl          = nrs_tbf_ctl,
3695         .op_res_get             = nrs_tbf_res_get,
3696         .op_res_put             = nrs_tbf_res_put,
3697         .op_req_get             = nrs_tbf_req_get,
3698         .op_req_enqueue         = nrs_tbf_req_add,
3699         .op_req_dequeue         = nrs_tbf_req_del,
3700         .op_req_stop            = nrs_tbf_req_stop,
3701         .op_lprocfs_init        = nrs_tbf_lprocfs_init,
3702 };
3703
3704 /**
3705  * TBF policy configuration
3706  */
3707 struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
3708         .nc_name                = NRS_POL_NAME_TBF,
3709         .nc_ops                 = &nrs_tbf_ops,
3710         .nc_compat              = nrs_policy_compat_all,
3711 };
3712
3713 /** @} tbf */
3714
3715 /** @} nrs */