Whamcloud - gitweb
LU-6142 lustre: make ldebugfs_add_vars a void function
[fs/lustre-release.git] / lustre / ptlrpc / nrs_tbf.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2013 DataDirect Networks, Inc.
24  *
25  * Copyright (c) 2014, 2016, Intel Corporation.
26  */
27 /*
28  * lustre/ptlrpc/nrs_tbf.c
29  *
30  * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
31  *
32  */
33
34 #ifdef HAVE_SERVER_SUPPORT
35
/**
 * \addtogroup nrs
 * @{
 */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #include <obd_support.h>
43 #include <obd_class.h>
44 #include <libcfs/libcfs.h>
45 #include <lustre_req_layout.h>
46 #include "ptlrpc_internal.h"
47
48 /**
49  * \name tbf
50  *
51  * Token Bucket Filter over client NIDs
52  *
53  * @{
54  */
55
/* Name string of the TBF policy */
#define NRS_POL_NAME_TBF        "tbf"

/*
 * Module parameter (mode 0644): size of the jobid client cache. It is used
 * to size the jobid hash table and to derive the per-bucket LRU watermark.
 */
static int tbf_jobid_cache_size = 8192;
module_param(tbf_jobid_cache_size, int, 0644);
MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");

/*
 * Module parameter (mode 0644): RPC rate given to the default rule when a
 * TBF policy starts up.
 */
static int tbf_rate = 10000;
module_param(tbf_rate, int, 0644);
MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");

/*
 * Module parameter (mode 0644): token bucket depth copied into every newly
 * started rule.
 */
static int tbf_depth = 3;
module_param(tbf_depth, int, 0644);
MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
69
70 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
71 {
72         struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
73                                                  th_timer);
74         struct ptlrpc_nrs   *nrs = head->th_res.res_policy->pol_nrs;
75         struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
76
77         nrs->nrs_throttling = 0;
78         wake_up(&svcpt->scp_waitq);
79
80         return HRTIMER_NORESTART;
81 }
82
83 #define NRS_TBF_DEFAULT_RULE "default"
84
85 static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule)
86 {
87         LASSERT(atomic_read(&rule->tr_ref) == 0);
88         LASSERT(list_empty(&rule->tr_cli_list));
89         LASSERT(list_empty(&rule->tr_linkage));
90
91         rule->tr_head->th_ops->o_rule_fini(rule);
92         OBD_FREE_PTR(rule);
93 }
94
95 /**
96  * Decreases the rule's usage reference count, and stops the rule in case it
97  * was already stopping and have no more outstanding usage references (which
98  * indicates it has no more queued or started requests, and can be safely
99  * stopped).
100  */
101 static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule)
102 {
103         if (atomic_dec_and_test(&rule->tr_ref))
104                 nrs_tbf_rule_fini(rule);
105 }
106
107 /**
108  * Increases the rule's usage reference count.
109  */
110 static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule)
111 {
112         atomic_inc(&rule->tr_ref);
113 }
114
115 static void
116 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
117 {
118         LASSERT(!list_empty(&cli->tc_linkage));
119         LASSERT(cli->tc_rule);
120         spin_lock(&cli->tc_rule->tr_rule_lock);
121         list_del_init(&cli->tc_linkage);
122         spin_unlock(&cli->tc_rule->tr_rule_lock);
123         nrs_tbf_rule_put(cli->tc_rule);
124         cli->tc_rule = NULL;
125 }
126
127 static void
128 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
129                         struct nrs_tbf_client *cli)
130
131 {
132         struct nrs_tbf_rule *rule = cli->tc_rule;
133
134         cli->tc_rpc_rate = rule->tr_rpc_rate;
135         cli->tc_nsecs = rule->tr_nsecs_per_rpc;
136         cli->tc_depth = rule->tr_depth;
137         cli->tc_ntoken = rule->tr_depth;
138         cli->tc_check_time = ktime_to_ns(ktime_get());
139         cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
140         cli->tc_rule_generation = rule->tr_generation;
141
142         if (cli->tc_in_heap)
143                 cfs_binheap_relocate(head->th_binheap,
144                                      &cli->tc_node);
145 }
146
147 static void
148 nrs_tbf_cli_reset(struct nrs_tbf_head *head,
149                   struct nrs_tbf_rule *rule,
150                   struct nrs_tbf_client *cli)
151 {
152         spin_lock(&cli->tc_rule_lock);
153         if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
154                 LASSERT(rule != cli->tc_rule);
155                 nrs_tbf_cli_rule_put(cli);
156         }
157         LASSERT(cli->tc_rule == NULL);
158         LASSERT(list_empty(&cli->tc_linkage));
159         /* Rule's ref is added before called */
160         cli->tc_rule = rule;
161         spin_lock(&rule->tr_rule_lock);
162         list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
163         spin_unlock(&rule->tr_rule_lock);
164         spin_unlock(&cli->tc_rule_lock);
165         nrs_tbf_cli_reset_value(head, cli);
166 }
167
168 static int
169 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
170 {
171         return rule->tr_head->th_ops->o_rule_dump(rule, m);
172 }
173
174 static int
175 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
176 {
177         struct nrs_tbf_rule *rule;
178         int rc = 0;
179
180         LASSERT(head != NULL);
181         spin_lock(&head->th_rule_lock);
182         /* List the rules from newest to oldest */
183         list_for_each_entry(rule, &head->th_list, tr_linkage) {
184                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
185                 rc = nrs_tbf_rule_dump(rule, m);
186                 if (rc) {
187                         rc = -ENOSPC;
188                         break;
189                 }
190         }
191         spin_unlock(&head->th_rule_lock);
192
193         return rc;
194 }
195
196 static struct nrs_tbf_rule *
197 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
198                          const char *name)
199 {
200         struct nrs_tbf_rule *rule;
201
202         LASSERT(head != NULL);
203         list_for_each_entry(rule, &head->th_list, tr_linkage) {
204                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
205                 if (strcmp(rule->tr_name, name) == 0) {
206                         nrs_tbf_rule_get(rule);
207                         return rule;
208                 }
209         }
210         return NULL;
211 }
212
213 static struct nrs_tbf_rule *
214 nrs_tbf_rule_find(struct nrs_tbf_head *head,
215                   const char *name)
216 {
217         struct nrs_tbf_rule *rule;
218
219         LASSERT(head != NULL);
220         spin_lock(&head->th_rule_lock);
221         rule = nrs_tbf_rule_find_nolock(head, name);
222         spin_unlock(&head->th_rule_lock);
223         return rule;
224 }
225
226 static struct nrs_tbf_rule *
227 nrs_tbf_rule_match(struct nrs_tbf_head *head,
228                    struct nrs_tbf_client *cli)
229 {
230         struct nrs_tbf_rule *rule = NULL;
231         struct nrs_tbf_rule *tmp_rule;
232
233         spin_lock(&head->th_rule_lock);
234         /* Match the newest rule in the list */
235         list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
236                 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
237                 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
238                         rule = tmp_rule;
239                         break;
240                 }
241         }
242
243         if (rule == NULL)
244                 rule = head->th_rule;
245
246         nrs_tbf_rule_get(rule);
247         spin_unlock(&head->th_rule_lock);
248         return rule;
249 }
250
251 static void
252 nrs_tbf_cli_init(struct nrs_tbf_head *head,
253                  struct nrs_tbf_client *cli,
254                  struct ptlrpc_request *req)
255 {
256         struct nrs_tbf_rule *rule;
257
258         memset(cli, 0, sizeof(*cli));
259         cli->tc_in_heap = false;
260         head->th_ops->o_cli_init(cli, req);
261         INIT_LIST_HEAD(&cli->tc_list);
262         INIT_LIST_HEAD(&cli->tc_linkage);
263         spin_lock_init(&cli->tc_rule_lock);
264         atomic_set(&cli->tc_ref, 1);
265         rule = nrs_tbf_rule_match(head, cli);
266         nrs_tbf_cli_reset(head, rule, cli);
267 }
268
269 static void
270 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
271 {
272         LASSERT(list_empty(&cli->tc_list));
273         LASSERT(!cli->tc_in_heap);
274         LASSERT(atomic_read(&cli->tc_ref) == 0);
275         spin_lock(&cli->tc_rule_lock);
276         nrs_tbf_cli_rule_put(cli);
277         spin_unlock(&cli->tc_rule_lock);
278         OBD_FREE_PTR(cli);
279 }
280
281 static int
282 nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
283                    struct nrs_tbf_head *head,
284                    struct nrs_tbf_cmd *start)
285 {
286         struct nrs_tbf_rule     *rule;
287         struct nrs_tbf_rule     *tmp_rule;
288         struct nrs_tbf_rule     *next_rule;
289         char                    *next_name = start->u.tc_start.ts_next_name;
290         int                      rc;
291
292         rule = nrs_tbf_rule_find(head, start->tc_name);
293         if (rule) {
294                 nrs_tbf_rule_put(rule);
295                 return -EEXIST;
296         }
297
298         OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
299         if (rule == NULL)
300                 return -ENOMEM;
301
302         memcpy(rule->tr_name, start->tc_name, strlen(start->tc_name));
303         rule->tr_rpc_rate = start->u.tc_start.ts_rpc_rate;
304         rule->tr_flags = start->u.tc_start.ts_rule_flags;
305         rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
306         rule->tr_depth = tbf_depth;
307         atomic_set(&rule->tr_ref, 1);
308         INIT_LIST_HEAD(&rule->tr_cli_list);
309         INIT_LIST_HEAD(&rule->tr_nids);
310         INIT_LIST_HEAD(&rule->tr_linkage);
311         spin_lock_init(&rule->tr_rule_lock);
312         rule->tr_head = head;
313
314         rc = head->th_ops->o_rule_init(policy, rule, start);
315         if (rc) {
316                 OBD_FREE_PTR(rule);
317                 return rc;
318         }
319
320         /* Add as the newest rule */
321         spin_lock(&head->th_rule_lock);
322         tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
323         if (tmp_rule) {
324                 spin_unlock(&head->th_rule_lock);
325                 nrs_tbf_rule_put(tmp_rule);
326                 nrs_tbf_rule_put(rule);
327                 return -EEXIST;
328         }
329
330         if (next_name) {
331                 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
332                 if (!next_rule) {
333                         spin_unlock(&head->th_rule_lock);
334                         nrs_tbf_rule_put(rule);
335                         return -ENOENT;
336                 }
337
338                 list_add(&rule->tr_linkage, next_rule->tr_linkage.prev);
339                 nrs_tbf_rule_put(next_rule);
340         } else {
341                 /* Add on the top of the rule list */
342                 list_add(&rule->tr_linkage, &head->th_list);
343         }
344         spin_unlock(&head->th_rule_lock);
345         atomic_inc(&head->th_rule_sequence);
346         if (start->u.tc_start.ts_rule_flags & NTRS_DEFAULT) {
347                 rule->tr_flags |= NTRS_DEFAULT;
348                 LASSERT(head->th_rule == NULL);
349                 head->th_rule = rule;
350         }
351
352         CDEBUG(D_RPCTRACE, "TBF starts rule@%p rate %u gen %llu\n",
353                rule, rule->tr_rpc_rate, rule->tr_generation);
354
355         return 0;
356 }
357
358 /**
359  * Change the rank of a rule in the rule list
360  *
361  * The matched rule will be moved to the position right before another
362  * given rule.
363  *
364  * \param[in] policy    the policy instance
365  * \param[in] head      the TBF policy instance
366  * \param[in] name      the rule name to be moved
367  * \param[in] next_name the rule name before which the matched rule will be
368  *                      moved
369  *
370  */
371 static int
372 nrs_tbf_rule_change_rank(struct ptlrpc_nrs_policy *policy,
373                          struct nrs_tbf_head *head,
374                          char *name,
375                          char *next_name)
376 {
377         struct nrs_tbf_rule     *rule = NULL;
378         struct nrs_tbf_rule     *next_rule = NULL;
379         int                      rc = 0;
380
381         LASSERT(head != NULL);
382
383         spin_lock(&head->th_rule_lock);
384         rule = nrs_tbf_rule_find_nolock(head, name);
385         if (!rule)
386                 GOTO(out, rc = -ENOENT);
387
388         if (strcmp(name, next_name) == 0)
389                 GOTO(out_put, rc);
390
391         next_rule = nrs_tbf_rule_find_nolock(head, next_name);
392         if (!next_rule)
393                 GOTO(out_put, rc = -ENOENT);
394
395         list_move(&rule->tr_linkage, next_rule->tr_linkage.prev);
396         nrs_tbf_rule_put(next_rule);
397 out_put:
398         nrs_tbf_rule_put(rule);
399 out:
400         spin_unlock(&head->th_rule_lock);
401         return rc;
402 }
403
404 static int
405 nrs_tbf_rule_change_rate(struct ptlrpc_nrs_policy *policy,
406                          struct nrs_tbf_head *head,
407                          char *name,
408                          __u64 rate)
409 {
410         struct nrs_tbf_rule *rule;
411
412         assert_spin_locked(&policy->pol_nrs->nrs_lock);
413
414         rule = nrs_tbf_rule_find(head, name);
415         if (rule == NULL)
416                 return -ENOENT;
417
418         rule->tr_rpc_rate = rate;
419         rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
420         rule->tr_generation++;
421         nrs_tbf_rule_put(rule);
422
423         return 0;
424 }
425
426 static int
427 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
428                     struct nrs_tbf_head *head,
429                     struct nrs_tbf_cmd *change)
430 {
431         __u64    rate = change->u.tc_change.tc_rpc_rate;
432         char    *next_name = change->u.tc_change.tc_next_name;
433         int      rc;
434
435         if (rate != 0) {
436                 rc = nrs_tbf_rule_change_rate(policy, head, change->tc_name,
437                                               rate);
438                 if (rc)
439                         return rc;
440         }
441
442         if (next_name) {
443                 rc = nrs_tbf_rule_change_rank(policy, head, change->tc_name,
444                                               next_name);
445                 if (rc)
446                         return rc;
447         }
448
449         return 0;
450 }
451
452 static int
453 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
454                   struct nrs_tbf_head *head,
455                   struct nrs_tbf_cmd *stop)
456 {
457         struct nrs_tbf_rule *rule;
458
459         assert_spin_locked(&policy->pol_nrs->nrs_lock);
460
461         if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
462                 return -EPERM;
463
464         rule = nrs_tbf_rule_find(head, stop->tc_name);
465         if (rule == NULL)
466                 return -ENOENT;
467
468         list_del_init(&rule->tr_linkage);
469         rule->tr_flags |= NTRS_STOPPING;
470         nrs_tbf_rule_put(rule);
471         nrs_tbf_rule_put(rule);
472
473         return 0;
474 }
475
476 static int
477 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
478                 struct nrs_tbf_head *head,
479                 struct nrs_tbf_cmd *cmd)
480 {
481         int rc;
482
483         assert_spin_locked(&policy->pol_nrs->nrs_lock);
484
485         switch (cmd->tc_cmd) {
486         case NRS_CTL_TBF_START_RULE:
487                 if (cmd->u.tc_start.ts_valid_type != head->th_type_flag)
488                         return -EINVAL;
489
490                 spin_unlock(&policy->pol_nrs->nrs_lock);
491                 rc = nrs_tbf_rule_start(policy, head, cmd);
492                 spin_lock(&policy->pol_nrs->nrs_lock);
493                 return rc;
494         case NRS_CTL_TBF_CHANGE_RULE:
495                 rc = nrs_tbf_rule_change(policy, head, cmd);
496                 return rc;
497         case NRS_CTL_TBF_STOP_RULE:
498                 rc = nrs_tbf_rule_stop(policy, head, cmd);
499                 /* Take it as a success, if not exists at all */
500                 return rc == -ENOENT ? 0 : rc;
501         default:
502                 return -EFAULT;
503         }
504 }
505
506 /**
507  * Binary heap predicate.
508  *
509  * \param[in] e1 the first binheap node to compare
510  * \param[in] e2 the second binheap node to compare
511  *
512  * \retval 0 e1 > e2
513  * \retval 1 e1 < e2
514  */
515 static int
516 tbf_cli_compare(struct cfs_binheap_node *e1, struct cfs_binheap_node *e2)
517 {
518         struct nrs_tbf_client *cli1;
519         struct nrs_tbf_client *cli2;
520
521         cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
522         cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
523
524         if (cli1->tc_deadline < cli2->tc_deadline)
525                 return 1;
526         else if (cli1->tc_deadline > cli2->tc_deadline)
527                 return 0;
528
529         if (cli1->tc_check_time < cli2->tc_check_time)
530                 return 1;
531         else if (cli1->tc_check_time > cli2->tc_check_time)
532                 return 0;
533
534         /* Maybe need more comparasion, e.g. request number in the rules */
535         return 1;
536 }
537
538 /**
539  * TBF binary heap operations
540  */
541 static struct cfs_binheap_ops nrs_tbf_heap_ops = {
542         .hop_enter      = NULL,
543         .hop_exit       = NULL,
544         .hop_compare    = tbf_cli_compare,
545 };
546
/**
 * Hash callback of the jobid hash: hashes the NUL-terminated jobid string
 * passed as \a key with the djb2 hash.
 *
 * \param[in] hs   the hash table (unused)
 * \param[in] key  the jobid string
 * \param[in] mask the mask handed to cfs_hash_djb2_hash()
 *
 * \retval the hash value
 */
static unsigned nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key,
				       unsigned mask)
{
	const char *jobid = key;

	return cfs_hash_djb2_hash(jobid, strlen(jobid), mask);
}
552
553 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
554 {
555         struct nrs_tbf_client *cli = hlist_entry(hnode,
556                                                      struct nrs_tbf_client,
557                                                      tc_hnode);
558
559         return (strcmp(cli->tc_jobid, key) == 0);
560 }
561
562 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
563 {
564         struct nrs_tbf_client *cli = hlist_entry(hnode,
565                                                      struct nrs_tbf_client,
566                                                      tc_hnode);
567
568         return cli->tc_jobid;
569 }
570
571 static void *nrs_tbf_hop_object(struct hlist_node *hnode)
572 {
573         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
574 }
575
576 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
577 {
578         struct nrs_tbf_client *cli = hlist_entry(hnode,
579                                                      struct nrs_tbf_client,
580                                                      tc_hnode);
581
582         atomic_inc(&cli->tc_ref);
583 }
584
585 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
586 {
587         struct nrs_tbf_client *cli = hlist_entry(hnode,
588                                                      struct nrs_tbf_client,
589                                                      tc_hnode);
590
591         atomic_dec(&cli->tc_ref);
592 }
593
594 static void
595 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
596
597 {
598         struct nrs_tbf_client *cli = hlist_entry(hnode,
599                                                  struct nrs_tbf_client,
600                                                  tc_hnode);
601
602         LASSERT(atomic_read(&cli->tc_ref) == 0);
603         nrs_tbf_cli_fini(cli);
604 }
605
606 static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
607         .hs_hash        = nrs_tbf_jobid_hop_hash,
608         .hs_keycmp      = nrs_tbf_jobid_hop_keycmp,
609         .hs_key         = nrs_tbf_jobid_hop_key,
610         .hs_object      = nrs_tbf_hop_object,
611         .hs_get         = nrs_tbf_jobid_hop_get,
612         .hs_put         = nrs_tbf_jobid_hop_put,
613         .hs_put_locked  = nrs_tbf_jobid_hop_put,
614         .hs_exit        = nrs_tbf_jobid_hop_exit,
615 };
616
617 #define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
618                                   CFS_HASH_NO_ITEMREF | \
619                                   CFS_HASH_DEPTH)
620
621 static struct nrs_tbf_client *
622 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
623                           struct cfs_hash_bd *bd,
624                           const char *jobid)
625 {
626         struct hlist_node *hnode;
627         struct nrs_tbf_client *cli;
628
629         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)jobid);
630         if (hnode == NULL)
631                 return NULL;
632
633         cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode);
634         if (!list_empty(&cli->tc_lru))
635                 list_del_init(&cli->tc_lru);
636         return cli;
637 }
638
639 #define NRS_TBF_JOBID_NULL ""
640
641 static struct nrs_tbf_client *
642 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
643                        struct ptlrpc_request *req)
644 {
645         const char              *jobid;
646         struct nrs_tbf_client   *cli;
647         struct cfs_hash         *hs = head->th_cli_hash;
648         struct cfs_hash_bd               bd;
649
650         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
651         if (jobid == NULL)
652                 jobid = NRS_TBF_JOBID_NULL;
653         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
654         cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
655         cfs_hash_bd_unlock(hs, &bd, 1);
656
657         return cli;
658 }
659
660 static struct nrs_tbf_client *
661 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
662                           struct nrs_tbf_client *cli)
663 {
664         const char              *jobid;
665         struct nrs_tbf_client   *ret;
666         struct cfs_hash         *hs = head->th_cli_hash;
667         struct cfs_hash_bd               bd;
668
669         jobid = cli->tc_jobid;
670         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
671         ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
672         if (ret == NULL) {
673                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
674                 ret = cli;
675         }
676         cfs_hash_bd_unlock(hs, &bd, 1);
677
678         return ret;
679 }
680
681 static void
682 nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
683                       struct nrs_tbf_client *cli)
684 {
685         struct cfs_hash_bd               bd;
686         struct cfs_hash         *hs = head->th_cli_hash;
687         struct nrs_tbf_bucket   *bkt;
688         int                      hw;
689         LIST_HEAD(zombies);
690
691         cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
692         bkt = cfs_hash_bd_extra_get(hs, &bd);
693         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
694                 return;
695         LASSERT(list_empty(&cli->tc_lru));
696         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
697
698         /*
699          * Check and purge the LRU, there is at least one client in the LRU.
700          */
701         hw = tbf_jobid_cache_size >>
702              (hs->hs_cur_bits - hs->hs_bkt_bits);
703         while (cfs_hash_bd_count_get(&bd) > hw) {
704                 if (unlikely(list_empty(&bkt->ntb_lru)))
705                         break;
706                 cli = list_entry(bkt->ntb_lru.next,
707                                      struct nrs_tbf_client,
708                                      tc_lru);
709                 LASSERT(atomic_read(&cli->tc_ref) == 0);
710                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
711                 list_move(&cli->tc_lru, &zombies);
712         }
713         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
714
715         while (!list_empty(&zombies)) {
716                 cli = container_of0(zombies.next,
717                                     struct nrs_tbf_client, tc_lru);
718                 list_del_init(&cli->tc_lru);
719                 nrs_tbf_cli_fini(cli);
720         }
721 }
722
723 static void
724 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
725                        struct ptlrpc_request *req)
726 {
727         char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
728
729         if (jobid == NULL)
730                 jobid = NRS_TBF_JOBID_NULL;
731         LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
732         INIT_LIST_HEAD(&cli->tc_lru);
733         memcpy(cli->tc_jobid, jobid, strlen(jobid));
734 }
735
736 static int nrs_tbf_jobid_hash_order(void)
737 {
738         int bits;
739
740         for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
741                 ;
742
743         return bits;
744 }
745
746 #define NRS_TBF_JOBID_BKT_BITS 10
747
748 static int
749 nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
750                       struct nrs_tbf_head *head)
751 {
752         struct nrs_tbf_cmd       start;
753         struct nrs_tbf_bucket   *bkt;
754         int                      bits;
755         int                      i;
756         int                      rc;
757         struct cfs_hash_bd       bd;
758
759         bits = nrs_tbf_jobid_hash_order();
760         if (bits < NRS_TBF_JOBID_BKT_BITS)
761                 bits = NRS_TBF_JOBID_BKT_BITS;
762         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
763                                             bits,
764                                             bits,
765                                             NRS_TBF_JOBID_BKT_BITS,
766                                             sizeof(*bkt),
767                                             0,
768                                             0,
769                                             &nrs_tbf_jobid_hash_ops,
770                                             NRS_TBF_JOBID_HASH_FLAGS);
771         if (head->th_cli_hash == NULL)
772                 return -ENOMEM;
773
774         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
775                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
776                 INIT_LIST_HEAD(&bkt->ntb_lru);
777         }
778
779         memset(&start, 0, sizeof(start));
780         start.u.tc_start.ts_jobids_str = "*";
781
782         start.u.tc_start.ts_rpc_rate = tbf_rate;
783         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
784         start.tc_name = NRS_TBF_DEFAULT_RULE;
785         INIT_LIST_HEAD(&start.u.tc_start.ts_jobids);
786         rc = nrs_tbf_rule_start(policy, head, &start);
787         if (rc) {
788                 cfs_hash_putref(head->th_cli_hash);
789                 head->th_cli_hash = NULL;
790         }
791
792         return rc;
793 }
794
795 /**
796  * Frees jobid of \a list.
797  *
798  */
799 static void
800 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
801 {
802         struct nrs_tbf_jobid *jobid, *n;
803
804         list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
805                 OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1);
806                 list_del(&jobid->tj_linkage);
807                 OBD_FREE_PTR(jobid);
808         }
809 }
810
811 static int
812 nrs_tbf_jobid_list_add(struct cfs_lstr *id, struct list_head *jobid_list)
813 {
814         struct nrs_tbf_jobid *jobid;
815         char *ptr;
816
817         OBD_ALLOC_PTR(jobid);
818         if (jobid == NULL)
819                 return -ENOMEM;
820
821         OBD_ALLOC(jobid->tj_id, id->ls_len + 1);
822         if (jobid->tj_id == NULL) {
823                 OBD_FREE_PTR(jobid);
824                 return -ENOMEM;
825         }
826
827         memcpy(jobid->tj_id, id->ls_str, id->ls_len);
828         ptr = lprocfs_strnstr(id->ls_str, "*", id->ls_len);
829         if (ptr == NULL)
830                 jobid->tj_match_flag = NRS_TBF_MATCH_FULL;
831         else
832                 jobid->tj_match_flag = NRS_TBF_MATCH_WILDCARD;
833
834         list_add_tail(&jobid->tj_linkage, jobid_list);
835         return 0;
836 }
837
/**
 * Matches \a content against a glob-like \a pattern.
 *
 * Every '*' in \a pattern matches any run of characters, including an empty
 * one; every other pattern character must match literally. A '*' in
 * \a content is ordinary data. The match is iterative: on a mismatch it
 * backtracks to the most recent '*' and lets it absorb one more character,
 * so no recursion is needed and the cost is bounded by
 * strlen(pattern) * strlen(content).
 *
 * \param[in] pattern NUL-terminated pattern, may contain '*'
 * \param[in] content NUL-terminated string to test
 *
 * \retval true  the whole of \a content matches \a pattern
 * \retval false otherwise
 */
static bool
cfs_match_wildcard(const char *pattern, const char *content)
{
	/* pattern position of the most recent '*', NULL if none seen yet */
	const char *star = NULL;
	/* content position from which that '*' is currently being retried */
	const char *resume = NULL;

	while (*content != '\0') {
		if (*pattern == '*') {
			/* Let the star match nothing first */
			star = pattern++;
			resume = content;
		} else if (*pattern == *content) {
			pattern++;
			content++;
		} else if (star != NULL) {
			/* Mismatch: the last star absorbs one more char */
			pattern = star + 1;
			content = ++resume;
		} else {
			return false;
		}
	}

	/* Content is exhausted: only trailing stars may remain */
	while (*pattern == '*')
		pattern++;

	return *pattern == '\0';
}
864
865 static inline bool
866 nrs_tbf_jobid_match(const struct nrs_tbf_jobid *jobid, const char *id)
867 {
868         if (jobid->tj_match_flag == NRS_TBF_MATCH_FULL)
869                 return strcmp(jobid->tj_id, id) == 0;
870
871         if (jobid->tj_match_flag == NRS_TBF_MATCH_WILDCARD)
872                 return cfs_match_wildcard(jobid->tj_id, id);
873
874         return false;
875 }
876
877 static int
878 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
879 {
880         struct nrs_tbf_jobid *jobid;
881
882         list_for_each_entry(jobid, jobid_list, tj_linkage) {
883                 if (nrs_tbf_jobid_match(jobid, id))
884                         return 1;
885         }
886         return 0;
887 }
888
889 static int
890 nrs_tbf_jobid_list_parse(char *str, int len, struct list_head *jobid_list)
891 {
892         struct cfs_lstr src;
893         struct cfs_lstr res;
894         int rc = 0;
895         ENTRY;
896
897         src.ls_str = str;
898         src.ls_len = len;
899         INIT_LIST_HEAD(jobid_list);
900         while (src.ls_str) {
901                 rc = cfs_gettok(&src, ' ', &res);
902                 if (rc == 0) {
903                         rc = -EINVAL;
904                         break;
905                 }
906                 rc = nrs_tbf_jobid_list_add(&res, jobid_list);
907                 if (rc)
908                         break;
909         }
910         if (rc)
911                 nrs_tbf_jobid_list_free(jobid_list);
912         RETURN(rc);
913 }
914
915 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
916 {
917         if (!list_empty(&cmd->u.tc_start.ts_jobids))
918                 nrs_tbf_jobid_list_free(&cmd->u.tc_start.ts_jobids);
919         if (cmd->u.tc_start.ts_jobids_str)
920                 OBD_FREE(cmd->u.tc_start.ts_jobids_str,
921                          strlen(cmd->u.tc_start.ts_jobids_str) + 1);
922 }
923
924 static int nrs_tbf_check_id_value(struct cfs_lstr *src, char *key)
925 {
926         struct cfs_lstr res;
927         int keylen = strlen(key);
928         int rc;
929
930         rc = cfs_gettok(src, '=', &res);
931         if (rc == 0 || res.ls_len != keylen ||
932             strncmp(res.ls_str, key, keylen) != 0 ||
933             src->ls_len <= 2 || src->ls_str[0] != '{' ||
934             src->ls_str[src->ls_len - 1] != '}')
935                 return -EINVAL;
936
937         /* Skip '{' and '}' */
938         src->ls_str++;
939         src->ls_len -= 2;
940         return 0;
941 }
942
943 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, char *id)
944 {
945         struct cfs_lstr src;
946         int rc;
947
948         src.ls_str = id;
949         src.ls_len = strlen(id);
950         rc = nrs_tbf_check_id_value(&src, "jobid");
951         if (rc)
952                 return rc;
953
954         OBD_ALLOC(cmd->u.tc_start.ts_jobids_str, src.ls_len + 1);
955         if (cmd->u.tc_start.ts_jobids_str == NULL)
956                 return -ENOMEM;
957
958         memcpy(cmd->u.tc_start.ts_jobids_str, src.ls_str, src.ls_len);
959
960         /* parse jobid list */
961         rc = nrs_tbf_jobid_list_parse(cmd->u.tc_start.ts_jobids_str,
962                                       strlen(cmd->u.tc_start.ts_jobids_str),
963                                       &cmd->u.tc_start.ts_jobids);
964         if (rc)
965                 nrs_tbf_jobid_cmd_fini(cmd);
966
967         return rc;
968 }
969
970 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
971                                    struct nrs_tbf_rule *rule,
972                                    struct nrs_tbf_cmd *start)
973 {
974         int rc = 0;
975
976         LASSERT(start->u.tc_start.ts_jobids_str);
977         OBD_ALLOC(rule->tr_jobids_str,
978                   strlen(start->u.tc_start.ts_jobids_str) + 1);
979         if (rule->tr_jobids_str == NULL)
980                 return -ENOMEM;
981
982         memcpy(rule->tr_jobids_str,
983                start->u.tc_start.ts_jobids_str,
984                strlen(start->u.tc_start.ts_jobids_str));
985
986         INIT_LIST_HEAD(&rule->tr_jobids);
987         if (!list_empty(&start->u.tc_start.ts_jobids)) {
988                 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
989                                               strlen(rule->tr_jobids_str),
990                                               &rule->tr_jobids);
991                 if (rc)
992                         CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
993         }
994         if (rc)
995                 OBD_FREE(rule->tr_jobids_str,
996                          strlen(start->u.tc_start.ts_jobids_str) + 1);
997         return rc;
998 }
999
1000 static int
1001 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1002 {
1003         seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
1004                    rule->tr_jobids_str, rule->tr_rpc_rate,
1005                    atomic_read(&rule->tr_ref) - 1);
1006         return 0;
1007 }
1008
1009 static int
1010 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
1011                          struct nrs_tbf_client *cli)
1012 {
1013         return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
1014 }
1015
1016 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
1017 {
1018         if (!list_empty(&rule->tr_jobids))
1019                 nrs_tbf_jobid_list_free(&rule->tr_jobids);
1020         LASSERT(rule->tr_jobids_str != NULL);
1021         OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1);
1022 }
1023
1024 static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
1025         .o_name = NRS_TBF_TYPE_JOBID,
1026         .o_startup = nrs_tbf_jobid_startup,
1027         .o_cli_find = nrs_tbf_jobid_cli_find,
1028         .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
1029         .o_cli_put = nrs_tbf_jobid_cli_put,
1030         .o_cli_init = nrs_tbf_jobid_cli_init,
1031         .o_rule_init = nrs_tbf_jobid_rule_init,
1032         .o_rule_dump = nrs_tbf_jobid_rule_dump,
1033         .o_rule_match = nrs_tbf_jobid_rule_match,
1034         .o_rule_fini = nrs_tbf_jobid_rule_fini,
1035 };
1036
/**
 * libcfs_hash operations for nrs_tbf_head::th_cli_hash
 *
 * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
 * nrs_tbf_client objects.
 */
1043 #define NRS_TBF_NID_BKT_BITS    8
1044 #define NRS_TBF_NID_BITS        16
1045
1046 static unsigned nrs_tbf_nid_hop_hash(struct cfs_hash *hs, const void *key,
1047                                   unsigned mask)
1048 {
1049         return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
1050 }
1051
1052 static int nrs_tbf_nid_hop_keycmp(const void *key, struct hlist_node *hnode)
1053 {
1054         lnet_nid_t            *nid = (lnet_nid_t *)key;
1055         struct nrs_tbf_client *cli = hlist_entry(hnode,
1056                                                      struct nrs_tbf_client,
1057                                                      tc_hnode);
1058
1059         return *nid == cli->tc_nid;
1060 }
1061
1062 static void *nrs_tbf_nid_hop_key(struct hlist_node *hnode)
1063 {
1064         struct nrs_tbf_client *cli = hlist_entry(hnode,
1065                                                      struct nrs_tbf_client,
1066                                                      tc_hnode);
1067
1068         return &cli->tc_nid;
1069 }
1070
1071 static void nrs_tbf_nid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1072 {
1073         struct nrs_tbf_client *cli = hlist_entry(hnode,
1074                                                      struct nrs_tbf_client,
1075                                                      tc_hnode);
1076
1077         atomic_inc(&cli->tc_ref);
1078 }
1079
1080 static void nrs_tbf_nid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1081 {
1082         struct nrs_tbf_client *cli = hlist_entry(hnode,
1083                                                      struct nrs_tbf_client,
1084                                                      tc_hnode);
1085
1086         atomic_dec(&cli->tc_ref);
1087 }
1088
1089 static void nrs_tbf_nid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1090 {
1091         struct nrs_tbf_client *cli = hlist_entry(hnode,
1092                                                      struct nrs_tbf_client,
1093                                                      tc_hnode);
1094
1095         LASSERTF(atomic_read(&cli->tc_ref) == 0,
1096                  "Busy TBF object from client with NID %s, with %d refs\n",
1097                  libcfs_nid2str(cli->tc_nid), atomic_read(&cli->tc_ref));
1098
1099         nrs_tbf_cli_fini(cli);
1100 }
1101
1102 static struct cfs_hash_ops nrs_tbf_nid_hash_ops = {
1103         .hs_hash        = nrs_tbf_nid_hop_hash,
1104         .hs_keycmp      = nrs_tbf_nid_hop_keycmp,
1105         .hs_key         = nrs_tbf_nid_hop_key,
1106         .hs_object      = nrs_tbf_hop_object,
1107         .hs_get         = nrs_tbf_nid_hop_get,
1108         .hs_put         = nrs_tbf_nid_hop_put,
1109         .hs_put_locked  = nrs_tbf_nid_hop_put,
1110         .hs_exit        = nrs_tbf_nid_hop_exit,
1111 };
1112
1113 static struct nrs_tbf_client *
1114 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
1115                      struct ptlrpc_request *req)
1116 {
1117         return cfs_hash_lookup(head->th_cli_hash, &req->rq_peer.nid);
1118 }
1119
1120 static struct nrs_tbf_client *
1121 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
1122                         struct nrs_tbf_client *cli)
1123 {
1124         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid,
1125                                        &cli->tc_hnode);
1126 }
1127
1128 static void
1129 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
1130                       struct nrs_tbf_client *cli)
1131 {
1132         cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
1133 }
1134
1135 static int
1136 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
1137                     struct nrs_tbf_head *head)
1138 {
1139         struct nrs_tbf_cmd      start;
1140         int rc;
1141
1142         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1143                                             NRS_TBF_NID_BITS,
1144                                             NRS_TBF_NID_BITS,
1145                                             NRS_TBF_NID_BKT_BITS, 0,
1146                                             CFS_HASH_MIN_THETA,
1147                                             CFS_HASH_MAX_THETA,
1148                                             &nrs_tbf_nid_hash_ops,
1149                                             CFS_HASH_RW_BKTLOCK);
1150         if (head->th_cli_hash == NULL)
1151                 return -ENOMEM;
1152
1153         memset(&start, 0, sizeof(start));
1154         start.u.tc_start.ts_nids_str = "*";
1155
1156         start.u.tc_start.ts_rpc_rate = tbf_rate;
1157         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1158         start.tc_name = NRS_TBF_DEFAULT_RULE;
1159         INIT_LIST_HEAD(&start.u.tc_start.ts_nids);
1160         rc = nrs_tbf_rule_start(policy, head, &start);
1161         if (rc) {
1162                 cfs_hash_putref(head->th_cli_hash);
1163                 head->th_cli_hash = NULL;
1164         }
1165
1166         return rc;
1167 }
1168
1169 static void
1170 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1171                              struct ptlrpc_request *req)
1172 {
1173         cli->tc_nid = req->rq_peer.nid;
1174 }
1175
1176 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1177                                  struct nrs_tbf_rule *rule,
1178                                  struct nrs_tbf_cmd *start)
1179 {
1180         LASSERT(start->u.tc_start.ts_nids_str);
1181         OBD_ALLOC(rule->tr_nids_str,
1182                   strlen(start->u.tc_start.ts_nids_str) + 1);
1183         if (rule->tr_nids_str == NULL)
1184                 return -ENOMEM;
1185
1186         memcpy(rule->tr_nids_str,
1187                start->u.tc_start.ts_nids_str,
1188                strlen(start->u.tc_start.ts_nids_str));
1189
1190         INIT_LIST_HEAD(&rule->tr_nids);
1191         if (!list_empty(&start->u.tc_start.ts_nids)) {
1192                 if (cfs_parse_nidlist(rule->tr_nids_str,
1193                                       strlen(rule->tr_nids_str),
1194                                       &rule->tr_nids) <= 0) {
1195                         CERROR("nids {%s} illegal\n",
1196                                rule->tr_nids_str);
1197                         OBD_FREE(rule->tr_nids_str,
1198                                  strlen(start->u.tc_start.ts_nids_str) + 1);
1199                         return -EINVAL;
1200                 }
1201         }
1202         return 0;
1203 }
1204
1205 static int
1206 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1207 {
1208         seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
1209                    rule->tr_nids_str, rule->tr_rpc_rate,
1210                    atomic_read(&rule->tr_ref) - 1);
1211         return 0;
1212 }
1213
1214 static int
1215 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1216                        struct nrs_tbf_client *cli)
1217 {
1218         return cfs_match_nid(cli->tc_nid, &rule->tr_nids);
1219 }
1220
1221 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1222 {
1223         if (!list_empty(&rule->tr_nids))
1224                 cfs_free_nidlist(&rule->tr_nids);
1225         LASSERT(rule->tr_nids_str != NULL);
1226         OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1);
1227 }
1228
1229 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1230 {
1231         if (!list_empty(&cmd->u.tc_start.ts_nids))
1232                 cfs_free_nidlist(&cmd->u.tc_start.ts_nids);
1233         if (cmd->u.tc_start.ts_nids_str)
1234                 OBD_FREE(cmd->u.tc_start.ts_nids_str,
1235                          strlen(cmd->u.tc_start.ts_nids_str) + 1);
1236 }
1237
1238 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, char *id)
1239 {
1240         struct cfs_lstr src;
1241         int rc;
1242
1243         src.ls_str = id;
1244         src.ls_len = strlen(id);
1245         rc = nrs_tbf_check_id_value(&src, "nid");
1246         if (rc)
1247                 return rc;
1248
1249         OBD_ALLOC(cmd->u.tc_start.ts_nids_str, src.ls_len + 1);
1250         if (cmd->u.tc_start.ts_nids_str == NULL)
1251                 return -ENOMEM;
1252
1253         memcpy(cmd->u.tc_start.ts_nids_str, src.ls_str, src.ls_len);
1254
1255         /* parse NID list */
1256         if (cfs_parse_nidlist(cmd->u.tc_start.ts_nids_str,
1257                               strlen(cmd->u.tc_start.ts_nids_str),
1258                               &cmd->u.tc_start.ts_nids) <= 0) {
1259                 nrs_tbf_nid_cmd_fini(cmd);
1260                 return -EINVAL;
1261         }
1262
1263         return 0;
1264 }
1265
1266 static struct nrs_tbf_ops nrs_tbf_nid_ops = {
1267         .o_name = NRS_TBF_TYPE_NID,
1268         .o_startup = nrs_tbf_nid_startup,
1269         .o_cli_find = nrs_tbf_nid_cli_find,
1270         .o_cli_findadd = nrs_tbf_nid_cli_findadd,
1271         .o_cli_put = nrs_tbf_nid_cli_put,
1272         .o_cli_init = nrs_tbf_nid_cli_init,
1273         .o_rule_init = nrs_tbf_nid_rule_init,
1274         .o_rule_dump = nrs_tbf_nid_rule_dump,
1275         .o_rule_match = nrs_tbf_nid_rule_match,
1276         .o_rule_fini = nrs_tbf_nid_rule_fini,
1277 };
1278
/** Hash callback: djb2 hash over the NUL-terminated string at \a key. */
static unsigned nrs_tbf_hop_hash(struct cfs_hash *hs, const void *key,
				 unsigned mask)
{
	unsigned bucket;

	bucket = cfs_hash_djb2_hash(key, strlen(key), mask);
	return bucket;
}
1284
1285 static int nrs_tbf_hop_keycmp(const void *key, struct hlist_node *hnode)
1286 {
1287         struct nrs_tbf_client *cli = hlist_entry(hnode,
1288                                                  struct nrs_tbf_client,
1289                                                  tc_hnode);
1290
1291         return (strcmp(cli->tc_key, key) == 0);
1292 }
1293
1294 static void *nrs_tbf_hop_key(struct hlist_node *hnode)
1295 {
1296         struct nrs_tbf_client *cli = hlist_entry(hnode,
1297                                                  struct nrs_tbf_client,
1298                                                  tc_hnode);
1299         return cli->tc_key;
1300 }
1301
1302 static void nrs_tbf_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1303 {
1304         struct nrs_tbf_client *cli = hlist_entry(hnode,
1305                                                  struct nrs_tbf_client,
1306                                                  tc_hnode);
1307
1308         atomic_inc(&cli->tc_ref);
1309 }
1310
1311 static void nrs_tbf_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1312 {
1313         struct nrs_tbf_client *cli = hlist_entry(hnode,
1314                                                  struct nrs_tbf_client,
1315                                                  tc_hnode);
1316
1317         atomic_dec(&cli->tc_ref);
1318 }
1319
1320 static void nrs_tbf_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1321
1322 {
1323         struct nrs_tbf_client *cli = hlist_entry(hnode,
1324                                                  struct nrs_tbf_client,
1325                                                  tc_hnode);
1326
1327         LASSERT(atomic_read(&cli->tc_ref) == 0);
1328         nrs_tbf_cli_fini(cli);
1329 }
1330
1331 static struct cfs_hash_ops nrs_tbf_hash_ops = {
1332         .hs_hash        = nrs_tbf_hop_hash,
1333         .hs_keycmp      = nrs_tbf_hop_keycmp,
1334         .hs_key         = nrs_tbf_hop_key,
1335         .hs_object      = nrs_tbf_hop_object,
1336         .hs_get         = nrs_tbf_hop_get,
1337         .hs_put         = nrs_tbf_hop_put,
1338         .hs_put_locked  = nrs_tbf_hop_put,
1339         .hs_exit        = nrs_tbf_hop_exit,
1340 };
1341
1342 #define NRS_TBF_GENERIC_BKT_BITS        10
1343 #define NRS_TBF_GENERIC_HASH_FLAGS      (CFS_HASH_SPIN_BKTLOCK | \
1344                                         CFS_HASH_NO_ITEMREF | \
1345                                         CFS_HASH_DEPTH)
1346
1347 static int
1348 nrs_tbf_startup(struct ptlrpc_nrs_policy *policy, struct nrs_tbf_head *head)
1349 {
1350         struct nrs_tbf_cmd       start;
1351         struct nrs_tbf_bucket   *bkt;
1352         int                      bits;
1353         int                      i;
1354         int                      rc;
1355         struct cfs_hash_bd       bd;
1356
1357         bits = nrs_tbf_jobid_hash_order();
1358         if (bits < NRS_TBF_GENERIC_BKT_BITS)
1359                 bits = NRS_TBF_GENERIC_BKT_BITS;
1360         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1361                                             bits, bits,
1362                                             NRS_TBF_GENERIC_BKT_BITS,
1363                                             sizeof(*bkt), 0, 0,
1364                                             &nrs_tbf_hash_ops,
1365                                             NRS_TBF_GENERIC_HASH_FLAGS);
1366         if (head->th_cli_hash == NULL)
1367                 return -ENOMEM;
1368
1369         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
1370                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
1371                 INIT_LIST_HEAD(&bkt->ntb_lru);
1372         }
1373
1374         memset(&start, 0, sizeof(start));
1375         start.u.tc_start.ts_conds_str = "*";
1376
1377         start.u.tc_start.ts_rpc_rate = tbf_rate;
1378         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1379         start.tc_name = NRS_TBF_DEFAULT_RULE;
1380         INIT_LIST_HEAD(&start.u.tc_start.ts_conds);
1381         rc = nrs_tbf_rule_start(policy, head, &start);
1382         if (rc)
1383                 cfs_hash_putref(head->th_cli_hash);
1384
1385         return rc;
1386 }
1387
1388 static struct nrs_tbf_client *
1389 nrs_tbf_cli_hash_lookup(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1390                         const char *key)
1391 {
1392         struct hlist_node *hnode;
1393         struct nrs_tbf_client *cli;
1394
1395         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)key);
1396         if (hnode == NULL)
1397                 return NULL;
1398
1399         cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode);
1400         if (!list_empty(&cli->tc_lru))
1401                 list_del_init(&cli->tc_lru);
1402         return cli;
1403 }
1404
1405 /**
1406  * ONLY opcode presented in this function will be checked in
1407  * nrs_tbf_id_cli_set(). That means, we can add or remove an
1408  * opcode to enable or disable requests handled in nrs_tbf
1409  */
1410 static struct req_format *req_fmt(__u32 opcode)
1411 {
1412         switch (opcode) {
1413         case OST_GETATTR:
1414                 return &RQF_OST_GETATTR;
1415         case OST_SETATTR:
1416                 return &RQF_OST_SETATTR;
1417         case OST_READ:
1418                 return &RQF_OST_BRW_READ;
1419         case OST_WRITE:
1420                 return &RQF_OST_BRW_WRITE;
1421         /* FIXME: OST_CREATE and OST_DESTROY comes from MDS
1422          * in most case. Should they be removed? */
1423         case OST_CREATE:
1424                 return &RQF_OST_CREATE;
1425         case OST_DESTROY:
1426                 return &RQF_OST_DESTROY;
1427         case OST_PUNCH:
1428                 return &RQF_OST_PUNCH;
1429         case OST_SYNC:
1430                 return &RQF_OST_SYNC;
1431         case OST_LADVISE:
1432                 return &RQF_OST_LADVISE;
1433         case MDS_GETATTR:
1434                 return &RQF_MDS_GETATTR;
1435         case MDS_GETATTR_NAME:
1436                 return &RQF_MDS_GETATTR_NAME;
1437         /* close is skipped to avoid LDLM cancel slowness */
1438 #if 0
1439         case MDS_CLOSE:
1440                 return &RQF_MDS_CLOSE;
1441 #endif
1442         case MDS_REINT:
1443                 return &RQF_MDS_REINT;
1444         case MDS_READPAGE:
1445                 return &RQF_MDS_READPAGE;
1446         case MDS_GET_ROOT:
1447                 return &RQF_MDS_GET_ROOT;
1448         case MDS_STATFS:
1449                 return &RQF_MDS_STATFS;
1450         case MDS_SYNC:
1451                 return &RQF_MDS_SYNC;
1452         case MDS_QUOTACTL:
1453                 return &RQF_MDS_QUOTACTL;
1454         case MDS_GETXATTR:
1455                 return &RQF_MDS_GETXATTR;
1456         case MDS_GET_INFO:
1457                 return &RQF_MDS_GET_INFO;
1458         /* HSM op is skipped */
1459 #if 0 
1460         case MDS_HSM_STATE_GET:
1461                 return &RQF_MDS_HSM_STATE_GET;
1462         case MDS_HSM_STATE_SET:
1463                 return &RQF_MDS_HSM_STATE_SET;
1464         case MDS_HSM_ACTION:
1465                 return &RQF_MDS_HSM_ACTION;
1466         case MDS_HSM_CT_REGISTER:
1467                 return &RQF_MDS_HSM_CT_REGISTER;
1468         case MDS_HSM_CT_UNREGISTER:
1469                 return &RQF_MDS_HSM_CT_UNREGISTER;
1470 #endif
1471         case MDS_SWAP_LAYOUTS:
1472                 return &RQF_MDS_SWAP_LAYOUTS;
1473         case LDLM_ENQUEUE:
1474                 return &RQF_LDLM_ENQUEUE;
1475         default:
1476                 return NULL;
1477         }
1478 }
1479
1480 static struct req_format *intent_req_fmt(__u32 it_opc)
1481 {
1482         if (it_opc & (IT_OPEN | IT_CREAT))
1483                 return &RQF_LDLM_INTENT_OPEN;
1484         else if (it_opc & (IT_GETATTR | IT_LOOKUP))
1485                 return &RQF_LDLM_INTENT_GETATTR;
1486         else if (it_opc & IT_GETXATTR)
1487                 return &RQF_LDLM_INTENT_GETXATTR;
1488         else if (it_opc & (IT_GLIMPSE | IT_BRW))
1489                 return &RQF_LDLM_INTENT;
1490         else
1491                 return NULL;
1492 }
1493
1494 static int ost_tbf_id_cli_set(struct ptlrpc_request *req,
1495                               struct tbf_id *id)
1496 {
1497         struct ost_body *body;
1498
1499         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1500         if (body != NULL) {
1501                 id->ti_uid = body->oa.o_uid;
1502                 id->ti_gid = body->oa.o_gid;
1503                 return 0;
1504         }
1505
1506         return -EINVAL;
1507 }
1508
1509 static void unpack_ugid_from_mdt_body(struct ptlrpc_request *req,
1510                                       struct tbf_id *id)
1511 {
1512         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
1513                                                     &RMF_MDT_BODY);
1514         LASSERT(b != NULL);
1515
1516         /* TODO: nodemaping feature converts {ug}id from individual
1517          * clients to the actual ones of the file system. Some work
1518          * may be needed to fix this. */
1519         id->ti_uid = b->mbo_uid;
1520         id->ti_gid = b->mbo_gid;
1521 }
1522
1523 static void unpack_ugid_from_mdt_rec_reint(struct ptlrpc_request *req,
1524                                            struct tbf_id *id)
1525 {
1526         struct mdt_rec_reint *rec;
1527
1528         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
1529         LASSERT(rec != NULL);
1530
1531         /* use the fs{ug}id as {ug}id of the process */
1532         id->ti_uid = rec->rr_fsuid;
1533         id->ti_gid = rec->rr_fsgid;
1534 }
1535
1536 static int mdt_tbf_id_cli_set(struct ptlrpc_request *req,
1537                               struct tbf_id *id)
1538 {
1539         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1540         int rc = 0;
1541
1542         switch (opc) {
1543         case MDS_GETATTR:
1544         case MDS_GETATTR_NAME:
1545         case MDS_GET_ROOT:
1546         case MDS_READPAGE:
1547         case MDS_SYNC:
1548         case MDS_GETXATTR:
1549         case MDS_HSM_STATE_GET ... MDS_SWAP_LAYOUTS:
1550                 unpack_ugid_from_mdt_body(req, id);
1551                 break;
1552         case MDS_CLOSE:
1553         case MDS_REINT:
1554                 unpack_ugid_from_mdt_rec_reint(req, id);
1555                 break;
1556         default:
1557                 rc = -EINVAL;
1558                 break;
1559         }
1560         return rc;
1561 }
1562
1563 static int ldlm_tbf_id_cli_set(struct ptlrpc_request *req,
1564                               struct tbf_id *id)
1565 {
1566         struct ldlm_intent *lit;
1567         struct req_format *fmt;
1568
1569         if (req->rq_reqmsg->lm_bufcount <= DLM_INTENT_IT_OFF)
1570                 return -EINVAL;
1571
1572         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_BASIC);
1573         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
1574         if (lit == NULL)
1575                 return -EINVAL;
1576
1577         fmt = intent_req_fmt(lit->opc);
1578         if (fmt == NULL)
1579                 return -EINVAL;
1580
1581         req_capsule_extend(&req->rq_pill, fmt);
1582
1583         if (lit->opc & (IT_GETXATTR | IT_GETATTR | IT_LOOKUP))
1584                 unpack_ugid_from_mdt_body(req, id);
1585         else if (lit->opc & (IT_OPEN | IT_OPEN | IT_GLIMPSE | IT_BRW))
1586                 unpack_ugid_from_mdt_rec_reint(req, id);
1587         else
1588                 return -EINVAL;
1589         return 0;
1590 }
1591
1592 static int nrs_tbf_id_cli_set(struct ptlrpc_request *req, struct tbf_id *id,
1593                               enum nrs_tbf_flag ti_type)
1594 {
1595         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1596         struct req_format *fmt = req_fmt(opc);
1597         bool fmt_unset = false;
1598         int rc;
1599
1600         memset(id, 0, sizeof(struct tbf_id));
1601         id->ti_type = ti_type;
1602
1603         if (fmt == NULL)
1604                 return -EINVAL;
1605         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1606         if (req->rq_pill.rc_fmt == NULL) {
1607                 req_capsule_set(&req->rq_pill, fmt);
1608                 fmt_unset = true;
1609         }
1610
1611         if (opc < OST_LAST_OPC)
1612                 rc = ost_tbf_id_cli_set(req, id);
1613         else if (opc >= MDS_FIRST_OPC && opc < MDS_LAST_OPC)
1614                 rc = mdt_tbf_id_cli_set(req, id);
1615         else if (opc == LDLM_ENQUEUE)
1616                 rc = ldlm_tbf_id_cli_set(req, id);
1617         else
1618                 rc = -EINVAL;
1619
1620         /* restore it to the initialized state */
1621         if (fmt_unset)
1622                 req->rq_pill.rc_fmt = NULL;
1623         return rc;
1624 }
1625
1626 static inline void nrs_tbf_cli_gen_key(struct nrs_tbf_client *cli,
1627                                        struct ptlrpc_request *req,
1628                                        char *keystr, size_t keystr_sz)
1629 {
1630         const char *jobid;
1631         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1632         struct tbf_id id;
1633
1634         nrs_tbf_id_cli_set(req, &id, NRS_TBF_FLAG_UID | NRS_TBF_FLAG_GID);
1635         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1636         if (jobid == NULL)
1637                 jobid = NRS_TBF_JOBID_NULL;
1638
1639         snprintf(keystr, keystr_sz, "%s_%s_%d_%u_%u", jobid,
1640                  libcfs_nid2str(req->rq_peer.nid), opc, id.ti_uid,
1641                  id.ti_gid);
1642
1643         if (cli) {
1644                 INIT_LIST_HEAD(&cli->tc_lru);
1645                 strlcpy(cli->tc_key, keystr, sizeof(cli->tc_key));
1646                 strlcpy(cli->tc_jobid, jobid, sizeof(cli->tc_jobid));
1647                 cli->tc_nid = req->rq_peer.nid;
1648                 cli->tc_opcode = opc;
1649                 cli->tc_id = id;
1650         }
1651 }
1652
1653 static struct nrs_tbf_client *
1654 nrs_tbf_cli_find(struct nrs_tbf_head *head, struct ptlrpc_request *req)
1655 {
1656         struct nrs_tbf_client *cli;
1657         struct cfs_hash *hs = head->th_cli_hash;
1658         struct cfs_hash_bd bd;
1659         char keystr[NRS_TBF_KEY_LEN];
1660
1661         nrs_tbf_cli_gen_key(NULL, req, keystr, sizeof(keystr));
1662         cfs_hash_bd_get_and_lock(hs, (void *)keystr, &bd, 1);
1663         cli = nrs_tbf_cli_hash_lookup(hs, &bd, keystr);
1664         cfs_hash_bd_unlock(hs, &bd, 1);
1665
1666         return cli;
1667 }
1668
1669 static struct nrs_tbf_client *
1670 nrs_tbf_cli_findadd(struct nrs_tbf_head *head,
1671                     struct nrs_tbf_client *cli)
1672 {
1673         const char              *key;
1674         struct nrs_tbf_client   *ret;
1675         struct cfs_hash         *hs = head->th_cli_hash;
1676         struct cfs_hash_bd       bd;
1677
1678         key = cli->tc_key;
1679         cfs_hash_bd_get_and_lock(hs, (void *)key, &bd, 1);
1680         ret = nrs_tbf_cli_hash_lookup(hs, &bd, key);
1681         if (ret == NULL) {
1682                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
1683                 ret = cli;
1684         }
1685         cfs_hash_bd_unlock(hs, &bd, 1);
1686
1687         return ret;
1688 }
1689
1690 static void
1691 nrs_tbf_cli_put(struct nrs_tbf_head *head, struct nrs_tbf_client *cli)
1692 {
1693         struct cfs_hash_bd       bd;
1694         struct cfs_hash         *hs = head->th_cli_hash;
1695         struct nrs_tbf_bucket   *bkt;
1696         int                      hw;
1697         LIST_HEAD(zombies);
1698
1699         cfs_hash_bd_get(hs, &cli->tc_key, &bd);
1700         bkt = cfs_hash_bd_extra_get(hs, &bd);
1701         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
1702                 return;
1703         LASSERT(list_empty(&cli->tc_lru));
1704         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
1705
1706         /**
1707          * Check and purge the LRU, there is at least one client in the LRU.
1708          */
1709         hw = tbf_jobid_cache_size >> (hs->hs_cur_bits - hs->hs_bkt_bits);
1710         while (cfs_hash_bd_count_get(&bd) > hw) {
1711                 if (unlikely(list_empty(&bkt->ntb_lru)))
1712                         break;
1713                 cli = list_entry(bkt->ntb_lru.next,
1714                                  struct nrs_tbf_client,
1715                                  tc_lru);
1716                 LASSERT(atomic_read(&cli->tc_ref) == 0);
1717                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
1718                 list_move(&cli->tc_lru, &zombies);
1719         }
1720         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
1721
1722         while (!list_empty(&zombies)) {
1723                 cli = container_of0(zombies.next,
1724                                     struct nrs_tbf_client, tc_lru);
1725                 list_del_init(&cli->tc_lru);
1726                 nrs_tbf_cli_fini(cli);
1727         }
1728 }
1729
1730 static void
1731 nrs_tbf_generic_cli_init(struct nrs_tbf_client *cli,
1732                          struct ptlrpc_request *req)
1733 {
1734         char keystr[NRS_TBF_KEY_LEN];
1735
1736         nrs_tbf_cli_gen_key(cli, req, keystr, sizeof(keystr));
1737 }
1738
1739 static void
1740 nrs_tbf_id_list_free(struct list_head *uid_list)
1741 {
1742         struct nrs_tbf_id *nti_id, *n;
1743
1744         list_for_each_entry_safe(nti_id, n, uid_list, nti_linkage) {
1745                 list_del_init(&nti_id->nti_linkage);
1746                 OBD_FREE_PTR(nti_id);
1747         }
1748 }
1749
1750 static void
1751 nrs_tbf_expression_free(struct nrs_tbf_expression *expr)
1752 {
1753         LASSERT(expr->te_field >= NRS_TBF_FIELD_NID &&
1754                 expr->te_field < NRS_TBF_FIELD_MAX);
1755         switch (expr->te_field) {
1756         case NRS_TBF_FIELD_NID:
1757                 cfs_free_nidlist(&expr->te_cond);
1758                 break;
1759         case NRS_TBF_FIELD_JOBID:
1760                 nrs_tbf_jobid_list_free(&expr->te_cond);
1761                 break;
1762         case NRS_TBF_FIELD_OPCODE:
1763                 CFS_FREE_BITMAP(expr->te_opcodes);
1764                 break;
1765         case NRS_TBF_FIELD_UID:
1766         case NRS_TBF_FIELD_GID:
1767                 nrs_tbf_id_list_free(&expr->te_cond);
1768                 break;
1769         default:
1770                 LBUG();
1771         }
1772         OBD_FREE_PTR(expr);
1773 }
1774
1775 static void
1776 nrs_tbf_conjunction_free(struct nrs_tbf_conjunction *conjunction)
1777 {
1778         struct nrs_tbf_expression *expression;
1779         struct nrs_tbf_expression *n;
1780
1781         LASSERT(list_empty(&conjunction->tc_linkage));
1782         list_for_each_entry_safe(expression, n,
1783                                  &conjunction->tc_expressions,
1784                                  te_linkage) {
1785                 list_del_init(&expression->te_linkage);
1786                 nrs_tbf_expression_free(expression);
1787         }
1788         OBD_FREE_PTR(conjunction);
1789 }
1790
1791 static void
1792 nrs_tbf_conds_free(struct list_head *cond_list)
1793 {
1794         struct nrs_tbf_conjunction *conjunction;
1795         struct nrs_tbf_conjunction *n;
1796
1797         list_for_each_entry_safe(conjunction, n, cond_list, tc_linkage) {
1798                 list_del_init(&conjunction->tc_linkage);
1799                 nrs_tbf_conjunction_free(conjunction);
1800         }
1801 }
1802
1803 static void
1804 nrs_tbf_generic_cmd_fini(struct nrs_tbf_cmd *cmd)
1805 {
1806         if (!list_empty(&cmd->u.tc_start.ts_conds))
1807                 nrs_tbf_conds_free(&cmd->u.tc_start.ts_conds);
1808         if (cmd->u.tc_start.ts_conds_str)
1809                 OBD_FREE(cmd->u.tc_start.ts_conds_str,
1810                          strlen(cmd->u.tc_start.ts_conds_str) + 1);
1811 }
1812
/* Separators used when tokenising a generic rule condition string */
#define NRS_TBF_DISJUNCTION_DELIM       ','     /* between conjunctions */
#define NRS_TBF_CONJUNCTION_DELIM       '&'     /* between expressions */
#define NRS_TBF_EXPRESSION_DELIM        '='     /* field name / value list */
1816
1817 static inline bool
1818 nrs_tbf_check_field(struct cfs_lstr *field, char *str)
1819 {
1820         int len = strlen(str);
1821
1822         return (field->ls_len == len &&
1823                 strncmp(field->ls_str, str, len) == 0);
1824 }
1825
1826 static int
1827 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr);
1828 static int
1829 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
1830                       enum nrs_tbf_flag tif);
1831
1832 static int
1833 nrs_tbf_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
1834 {
1835         struct nrs_tbf_expression *expr;
1836         struct cfs_lstr field;
1837         int rc = 0;
1838
1839         OBD_ALLOC_PTR(expr);
1840         if (expr == NULL)
1841                 return -ENOMEM;
1842
1843         rc = cfs_gettok(src, NRS_TBF_EXPRESSION_DELIM, &field);
1844         if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
1845             src->ls_str[src->ls_len - 1] != '}')
1846                 GOTO(out, rc = -EINVAL);
1847
1848         /* Skip '{' and '}' */
1849         src->ls_str++;
1850         src->ls_len -= 2;
1851
1852         if (nrs_tbf_check_field(&field, "nid")) {
1853                 if (cfs_parse_nidlist(src->ls_str,
1854                                       src->ls_len,
1855                                       &expr->te_cond) <= 0)
1856                         GOTO(out, rc = -EINVAL);
1857                 expr->te_field = NRS_TBF_FIELD_NID;
1858         } else if (nrs_tbf_check_field(&field, "jobid")) {
1859                 if (nrs_tbf_jobid_list_parse(src->ls_str,
1860                                              src->ls_len,
1861                                              &expr->te_cond) < 0)
1862                         GOTO(out, rc = -EINVAL);
1863                 expr->te_field = NRS_TBF_FIELD_JOBID;
1864         } else if (nrs_tbf_check_field(&field, "opcode")) {
1865                 if (nrs_tbf_opcode_list_parse(src->ls_str,
1866                                               src->ls_len,
1867                                               &expr->te_opcodes) < 0)
1868                         GOTO(out, rc = -EINVAL);
1869                 expr->te_field = NRS_TBF_FIELD_OPCODE;
1870         } else if (nrs_tbf_check_field(&field, "uid")) {
1871                 if (nrs_tbf_id_list_parse(src->ls_str,
1872                                           src->ls_len,
1873                                           &expr->te_cond,
1874                                           NRS_TBF_FLAG_UID) < 0)
1875                         GOTO(out, rc = -EINVAL);
1876                 expr->te_field = NRS_TBF_FIELD_UID;
1877         } else if (nrs_tbf_check_field(&field, "gid")) {
1878                 if (nrs_tbf_id_list_parse(src->ls_str,
1879                                           src->ls_len,
1880                                           &expr->te_cond,
1881                                           NRS_TBF_FLAG_GID) < 0)
1882                         GOTO(out, rc = -EINVAL);
1883                 expr->te_field = NRS_TBF_FIELD_GID;
1884         } else {
1885                 GOTO(out, rc = -EINVAL);
1886         }
1887
1888         list_add_tail(&expr->te_linkage, cond_list);
1889         return 0;
1890 out:
1891         OBD_FREE_PTR(expr);
1892         return rc;
1893 }
1894
1895 static int
1896 nrs_tbf_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
1897 {
1898         struct nrs_tbf_conjunction *conjunction;
1899         struct cfs_lstr expr;
1900         int rc = 0;
1901
1902         OBD_ALLOC_PTR(conjunction);
1903         if (conjunction == NULL)
1904                 return -ENOMEM;
1905
1906         INIT_LIST_HEAD(&conjunction->tc_expressions);
1907         list_add_tail(&conjunction->tc_linkage, cond_list);
1908
1909         while (src->ls_str) {
1910                 rc = cfs_gettok(src, NRS_TBF_CONJUNCTION_DELIM, &expr);
1911                 if (rc == 0) {
1912                         rc = -EINVAL;
1913                         break;
1914                 }
1915                 rc = nrs_tbf_expression_parse(&expr,
1916                                               &conjunction->tc_expressions);
1917                 if (rc)
1918                         break;
1919         }
1920         return rc;
1921 }
1922
1923 static int
1924 nrs_tbf_conds_parse(char *str, int len, struct list_head *cond_list)
1925 {
1926         struct cfs_lstr src;
1927         struct cfs_lstr res;
1928         int rc = 0;
1929
1930         src.ls_str = str;
1931         src.ls_len = len;
1932         INIT_LIST_HEAD(cond_list);
1933         while (src.ls_str) {
1934                 rc = cfs_gettok(&src, NRS_TBF_DISJUNCTION_DELIM, &res);
1935                 if (rc == 0) {
1936                         rc = -EINVAL;
1937                         break;
1938                 }
1939                 rc = nrs_tbf_conjunction_parse(&res, cond_list);
1940                 if (rc)
1941                         break;
1942         }
1943         return rc;
1944 }
1945
1946 static int
1947 nrs_tbf_generic_parse(struct nrs_tbf_cmd *cmd, const char *id)
1948 {
1949         int rc;
1950
1951         OBD_ALLOC(cmd->u.tc_start.ts_conds_str, strlen(id) + 1);
1952         if (cmd->u.tc_start.ts_conds_str == NULL)
1953                 return -ENOMEM;
1954
1955         memcpy(cmd->u.tc_start.ts_conds_str, id, strlen(id));
1956
1957         /* Parse hybird NID and JOBID conditions */
1958         rc = nrs_tbf_conds_parse(cmd->u.tc_start.ts_conds_str,
1959                                  strlen(cmd->u.tc_start.ts_conds_str),
1960                                  &cmd->u.tc_start.ts_conds);
1961         if (rc)
1962                 nrs_tbf_generic_cmd_fini(cmd);
1963
1964         return rc;
1965 }
1966
1967 static int
1968 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id);
1969
1970 static int
1971 nrs_tbf_expression_match(struct nrs_tbf_expression *expr,
1972                          struct nrs_tbf_rule *rule,
1973                          struct nrs_tbf_client *cli)
1974 {
1975         switch (expr->te_field) {
1976         case NRS_TBF_FIELD_NID:
1977                 return cfs_match_nid(cli->tc_nid, &expr->te_cond);
1978         case NRS_TBF_FIELD_JOBID:
1979                 return nrs_tbf_jobid_list_match(&expr->te_cond, cli->tc_jobid);
1980         case NRS_TBF_FIELD_OPCODE:
1981                 return cfs_bitmap_check(expr->te_opcodes, cli->tc_opcode);
1982         case NRS_TBF_FIELD_UID:
1983         case NRS_TBF_FIELD_GID:
1984                 return nrs_tbf_id_list_match(&expr->te_cond, cli->tc_id);
1985         default:
1986                 return 0;
1987         }
1988 }
1989
1990 static int
1991 nrs_tbf_conjunction_match(struct nrs_tbf_conjunction *conjunction,
1992                           struct nrs_tbf_rule *rule,
1993                           struct nrs_tbf_client *cli)
1994 {
1995         struct nrs_tbf_expression *expr;
1996         int matched;
1997
1998         list_for_each_entry(expr, &conjunction->tc_expressions, te_linkage) {
1999                 matched = nrs_tbf_expression_match(expr, rule, cli);
2000                 if (!matched)
2001                         return 0;
2002         }
2003
2004         return 1;
2005 }
2006
2007 static int
2008 nrs_tbf_cond_match(struct nrs_tbf_rule *rule, struct nrs_tbf_client *cli)
2009 {
2010         struct nrs_tbf_conjunction *conjunction;
2011         int matched;
2012
2013         list_for_each_entry(conjunction, &rule->tr_conds, tc_linkage) {
2014                 matched = nrs_tbf_conjunction_match(conjunction, rule, cli);
2015                 if (matched)
2016                         return 1;
2017         }
2018
2019         return 0;
2020 }
2021
2022 static void
2023 nrs_tbf_generic_rule_fini(struct nrs_tbf_rule *rule)
2024 {
2025         if (!list_empty(&rule->tr_conds))
2026                 nrs_tbf_conds_free(&rule->tr_conds);
2027         LASSERT(rule->tr_conds_str != NULL);
2028         OBD_FREE(rule->tr_conds_str, strlen(rule->tr_conds_str) + 1);
2029 }
2030
2031 static int
2032 nrs_tbf_rule_init(struct ptlrpc_nrs_policy *policy,
2033                   struct nrs_tbf_rule *rule, struct nrs_tbf_cmd *start)
2034 {
2035         int rc = 0;
2036
2037         LASSERT(start->u.tc_start.ts_conds_str);
2038         OBD_ALLOC(rule->tr_conds_str,
2039                   strlen(start->u.tc_start.ts_conds_str) + 1);
2040         if (rule->tr_conds_str == NULL)
2041                 return -ENOMEM;
2042
2043         memcpy(rule->tr_conds_str,
2044                start->u.tc_start.ts_conds_str,
2045                strlen(start->u.tc_start.ts_conds_str));
2046
2047         INIT_LIST_HEAD(&rule->tr_conds);
2048         if (!list_empty(&start->u.tc_start.ts_conds)) {
2049                 rc = nrs_tbf_conds_parse(rule->tr_conds_str,
2050                                          strlen(rule->tr_conds_str),
2051                                          &rule->tr_conds);
2052         }
2053         if (rc)
2054                 nrs_tbf_generic_rule_fini(rule);
2055
2056         return rc;
2057 }
2058
2059 static int
2060 nrs_tbf_generic_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2061 {
2062         seq_printf(m, "%s %s %u, ref %d\n", rule->tr_name,
2063                    rule->tr_conds_str, rule->tr_rpc_rate,
2064                    atomic_read(&rule->tr_ref) - 1);
2065         return 0;
2066 }
2067
/**
 * Rule match hook of the generic TBF type: a thin wrapper that forwards
 * to nrs_tbf_cond_match() and returns its result unchanged.
 */
static int
nrs_tbf_generic_rule_match(struct nrs_tbf_rule *rule,
                           struct nrs_tbf_client *cli)
{
        int matched = nrs_tbf_cond_match(rule, cli);

        return matched;
}
2074
2075 static struct nrs_tbf_ops nrs_tbf_generic_ops = {
2076         .o_name = NRS_TBF_TYPE_GENERIC,
2077         .o_startup = nrs_tbf_startup,
2078         .o_cli_find = nrs_tbf_cli_find,
2079         .o_cli_findadd = nrs_tbf_cli_findadd,
2080         .o_cli_put = nrs_tbf_cli_put,
2081         .o_cli_init = nrs_tbf_generic_cli_init,
2082         .o_rule_init = nrs_tbf_rule_init,
2083         .o_rule_dump = nrs_tbf_generic_rule_dump,
2084         .o_rule_match = nrs_tbf_generic_rule_match,
2085         .o_rule_fini = nrs_tbf_generic_rule_fini,
2086 };
2087
2088 static void nrs_tbf_opcode_rule_fini(struct nrs_tbf_rule *rule)
2089 {
2090         if (rule->tr_opcodes != NULL)
2091                 CFS_FREE_BITMAP(rule->tr_opcodes);
2092
2093         LASSERT(rule->tr_opcodes_str != NULL);
2094         OBD_FREE(rule->tr_opcodes_str, strlen(rule->tr_opcodes_str) + 1);
2095 }
2096
2097 static unsigned nrs_tbf_opcode_hop_hash(struct cfs_hash *hs, const void *key,
2098                                         unsigned mask)
2099 {
2100         return cfs_hash_djb2_hash(key, sizeof(__u32), mask);
2101 }
2102
2103 static int nrs_tbf_opcode_hop_keycmp(const void *key, struct hlist_node *hnode)
2104 {
2105         const __u32     *opc = key;
2106         struct nrs_tbf_client *cli = hlist_entry(hnode,
2107                                                  struct nrs_tbf_client,
2108                                                  tc_hnode);
2109
2110         return *opc == cli->tc_opcode;
2111 }
2112
2113 static void *nrs_tbf_opcode_hop_key(struct hlist_node *hnode)
2114 {
2115         struct nrs_tbf_client *cli = hlist_entry(hnode,
2116                                                  struct nrs_tbf_client,
2117                                                  tc_hnode);
2118
2119         return &cli->tc_opcode;
2120 }
2121
2122 static void nrs_tbf_opcode_hop_get(struct cfs_hash *hs,
2123                                    struct hlist_node *hnode)
2124 {
2125         struct nrs_tbf_client *cli = hlist_entry(hnode,
2126                                                  struct nrs_tbf_client,
2127                                                  tc_hnode);
2128
2129         atomic_inc(&cli->tc_ref);
2130 }
2131
2132 static void nrs_tbf_opcode_hop_put(struct cfs_hash *hs,
2133                                    struct hlist_node *hnode)
2134 {
2135         struct nrs_tbf_client *cli = hlist_entry(hnode,
2136                                                  struct nrs_tbf_client,
2137                                                  tc_hnode);
2138
2139         atomic_dec(&cli->tc_ref);
2140 }
2141
2142 static void nrs_tbf_opcode_hop_exit(struct cfs_hash *hs,
2143                                     struct hlist_node *hnode)
2144 {
2145         struct nrs_tbf_client *cli = hlist_entry(hnode,
2146                                                  struct nrs_tbf_client,
2147                                                  tc_hnode);
2148
2149         LASSERTF(atomic_read(&cli->tc_ref) == 0,
2150                  "Busy TBF object from client with opcode %s, with %d refs\n",
2151                  ll_opcode2str(cli->tc_opcode),
2152                  atomic_read(&cli->tc_ref));
2153
2154         nrs_tbf_cli_fini(cli);
2155 }
2156 static struct cfs_hash_ops nrs_tbf_opcode_hash_ops = {
2157         .hs_hash        = nrs_tbf_opcode_hop_hash,
2158         .hs_keycmp      = nrs_tbf_opcode_hop_keycmp,
2159         .hs_key         = nrs_tbf_opcode_hop_key,
2160         .hs_object      = nrs_tbf_hop_object,
2161         .hs_get         = nrs_tbf_opcode_hop_get,
2162         .hs_put         = nrs_tbf_opcode_hop_put,
2163         .hs_put_locked  = nrs_tbf_opcode_hop_put,
2164         .hs_exit        = nrs_tbf_opcode_hop_exit,
2165 };
2166
2167 static int
2168 nrs_tbf_opcode_startup(struct ptlrpc_nrs_policy *policy,
2169                     struct nrs_tbf_head *head)
2170 {
2171         struct nrs_tbf_cmd      start = { 0 };
2172         int rc;
2173
2174         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
2175                                             NRS_TBF_NID_BITS,
2176                                             NRS_TBF_NID_BITS,
2177                                             NRS_TBF_NID_BKT_BITS, 0,
2178                                             CFS_HASH_MIN_THETA,
2179                                             CFS_HASH_MAX_THETA,
2180                                             &nrs_tbf_opcode_hash_ops,
2181                                             CFS_HASH_RW_BKTLOCK);
2182         if (head->th_cli_hash == NULL)
2183                 return -ENOMEM;
2184
2185         start.u.tc_start.ts_opcodes = NULL;
2186         start.u.tc_start.ts_opcodes_str = "*";
2187
2188         start.u.tc_start.ts_rpc_rate = tbf_rate;
2189         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2190         start.tc_name = NRS_TBF_DEFAULT_RULE;
2191         rc = nrs_tbf_rule_start(policy, head, &start);
2192
2193         return rc;
2194 }
2195
2196 static struct nrs_tbf_client *
2197 nrs_tbf_opcode_cli_find(struct nrs_tbf_head *head,
2198                         struct ptlrpc_request *req)
2199 {
2200         __u32 opc;
2201
2202         opc = lustre_msg_get_opc(req->rq_reqmsg);
2203         return cfs_hash_lookup(head->th_cli_hash, &opc);
2204 }
2205
2206 static struct nrs_tbf_client *
2207 nrs_tbf_opcode_cli_findadd(struct nrs_tbf_head *head,
2208                            struct nrs_tbf_client *cli)
2209 {
2210         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_opcode,
2211                                        &cli->tc_hnode);
2212 }
2213
2214 static void
2215 nrs_tbf_opcode_cli_init(struct nrs_tbf_client *cli,
2216                         struct ptlrpc_request *req)
2217 {
2218         cli->tc_opcode = lustre_msg_get_opc(req->rq_reqmsg);
2219 }
2220
2221 #define MAX_OPCODE_LEN  32
2222 static int
2223 nrs_tbf_opcode_set_bit(const struct cfs_lstr *id, struct cfs_bitmap *opcodes)
2224 {
2225         int     op = 0;
2226         char    opcode_str[MAX_OPCODE_LEN];
2227
2228         if (id->ls_len + 1 > MAX_OPCODE_LEN)
2229                 return -EINVAL;
2230
2231         memcpy(opcode_str, id->ls_str, id->ls_len);
2232         opcode_str[id->ls_len] = '\0';
2233
2234         op = ll_str2opcode(opcode_str);
2235         if (op < 0)
2236                 return -EINVAL;
2237
2238         cfs_bitmap_set(opcodes, op);
2239         return 0;
2240 }
2241
2242 static int
2243 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr)
2244 {
2245         struct cfs_bitmap *opcodes;
2246         struct cfs_lstr src;
2247         struct cfs_lstr res;
2248         int rc = 0;
2249         ENTRY;
2250
2251         opcodes = CFS_ALLOCATE_BITMAP(LUSTRE_MAX_OPCODES);
2252         if (opcodes == NULL)
2253                 return -ENOMEM;
2254
2255         src.ls_str = str;
2256         src.ls_len = len;
2257         while (src.ls_str) {
2258                 rc = cfs_gettok(&src, ' ', &res);
2259                 if (rc == 0) {
2260                         rc = -EINVAL;
2261                         break;
2262                 }
2263                 rc = nrs_tbf_opcode_set_bit(&res, opcodes);
2264                 if (rc)
2265                         break;
2266         }
2267
2268         if (rc == 0)
2269                 *bitmaptr = opcodes;
2270         else
2271                 CFS_FREE_BITMAP(opcodes);
2272
2273         RETURN(rc);
2274 }
2275
2276 static void nrs_tbf_opcode_cmd_fini(struct nrs_tbf_cmd *cmd)
2277 {
2278         if (cmd->u.tc_start.ts_opcodes)
2279                 CFS_FREE_BITMAP(cmd->u.tc_start.ts_opcodes);
2280
2281         if (cmd->u.tc_start.ts_opcodes_str)
2282                 OBD_FREE(cmd->u.tc_start.ts_opcodes_str,
2283                          strlen(cmd->u.tc_start.ts_opcodes_str) + 1);
2284
2285 }
2286
2287 static int nrs_tbf_opcode_parse(struct nrs_tbf_cmd *cmd, char *id)
2288 {
2289         struct cfs_lstr src;
2290         int rc;
2291
2292         src.ls_str = id;
2293         src.ls_len = strlen(id);
2294         rc = nrs_tbf_check_id_value(&src, "opcode");
2295         if (rc)
2296                 return rc;
2297
2298         OBD_ALLOC(cmd->u.tc_start.ts_opcodes_str, src.ls_len + 1);
2299         if (cmd->u.tc_start.ts_opcodes_str == NULL)
2300                 return -ENOMEM;
2301
2302         memcpy(cmd->u.tc_start.ts_opcodes_str, src.ls_str, src.ls_len);
2303
2304         /* parse opcode list */
2305         rc = nrs_tbf_opcode_list_parse(cmd->u.tc_start.ts_opcodes_str,
2306                                        strlen(cmd->u.tc_start.ts_opcodes_str),
2307                                        &cmd->u.tc_start.ts_opcodes);
2308         if (rc)
2309                 nrs_tbf_opcode_cmd_fini(cmd);
2310
2311         return rc;
2312 }
2313
2314 static int
2315 nrs_tbf_opcode_rule_match(struct nrs_tbf_rule *rule,
2316                           struct nrs_tbf_client *cli)
2317 {
2318         if (rule->tr_opcodes == NULL)
2319                 return 0;
2320
2321         return cfs_bitmap_check(rule->tr_opcodes, cli->tc_opcode);
2322 }
2323
2324 static int nrs_tbf_opcode_rule_init(struct ptlrpc_nrs_policy *policy,
2325                                     struct nrs_tbf_rule *rule,
2326                                     struct nrs_tbf_cmd *start)
2327 {
2328         int rc = 0;
2329
2330         LASSERT(start->u.tc_start.ts_opcodes_str != NULL);
2331         OBD_ALLOC(rule->tr_opcodes_str,
2332                   strlen(start->u.tc_start.ts_opcodes_str) + 1);
2333         if (rule->tr_opcodes_str == NULL)
2334                 return -ENOMEM;
2335
2336         strncpy(rule->tr_opcodes_str, start->u.tc_start.ts_opcodes_str,
2337                 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2338
2339         /* Default rule '*' */
2340         if (start->u.tc_start.ts_opcodes == NULL)
2341                 return 0;
2342
2343         rc = nrs_tbf_opcode_list_parse(rule->tr_opcodes_str,
2344                                        strlen(rule->tr_opcodes_str),
2345                                        &rule->tr_opcodes);
2346         if (rc)
2347                 OBD_FREE(rule->tr_opcodes_str,
2348                          strlen(start->u.tc_start.ts_opcodes_str) + 1);
2349
2350         return rc;
2351 }
2352
2353 static int
2354 nrs_tbf_opcode_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2355 {
2356         seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
2357                    rule->tr_opcodes_str, rule->tr_rpc_rate,
2358                    atomic_read(&rule->tr_ref) - 1);
2359         return 0;
2360 }
2361
2362
2363 struct nrs_tbf_ops nrs_tbf_opcode_ops = {
2364         .o_name = NRS_TBF_TYPE_OPCODE,
2365         .o_startup = nrs_tbf_opcode_startup,
2366         .o_cli_find = nrs_tbf_opcode_cli_find,
2367         .o_cli_findadd = nrs_tbf_opcode_cli_findadd,
2368         .o_cli_put = nrs_tbf_nid_cli_put,
2369         .o_cli_init = nrs_tbf_opcode_cli_init,
2370         .o_rule_init = nrs_tbf_opcode_rule_init,
2371         .o_rule_dump = nrs_tbf_opcode_rule_dump,
2372         .o_rule_match = nrs_tbf_opcode_rule_match,
2373         .o_rule_fini = nrs_tbf_opcode_rule_fini,
2374 };
2375
2376 static unsigned nrs_tbf_id_hop_hash(struct cfs_hash *hs, const void *key,
2377                                     unsigned mask)
2378 {
2379         return cfs_hash_djb2_hash(key, sizeof(struct tbf_id), mask);
2380 }
2381
2382 static int nrs_tbf_id_hop_keycmp(const void *key, struct hlist_node *hnode)
2383 {
2384         const struct tbf_id *opc = key;
2385         enum nrs_tbf_flag ntf;
2386         struct nrs_tbf_client *cli = hlist_entry(hnode, struct nrs_tbf_client,
2387                                                  tc_hnode);
2388         ntf = opc->ti_type & cli->tc_id.ti_type;
2389         if ((ntf & NRS_TBF_FLAG_UID) && opc->ti_uid != cli->tc_id.ti_uid)
2390                 return 0;
2391
2392         if ((ntf & NRS_TBF_FLAG_GID) && opc->ti_gid != cli->tc_id.ti_gid)
2393                 return 0;
2394
2395         return 1;
2396 }
2397
2398 static void *nrs_tbf_id_hop_key(struct hlist_node *hnode)
2399 {
2400         struct nrs_tbf_client *cli = hlist_entry(hnode,
2401                                                  struct nrs_tbf_client,
2402                                                  tc_hnode);
2403         return &cli->tc_id;
2404 }
2405
2406 static void nrs_tbf_id_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
2407 {
2408         struct nrs_tbf_client *cli = hlist_entry(hnode,
2409                                                  struct nrs_tbf_client,
2410                                                  tc_hnode);
2411
2412         atomic_inc(&cli->tc_ref);
2413 }
2414
2415 static void nrs_tbf_id_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
2416 {
2417         struct nrs_tbf_client *cli = hlist_entry(hnode,
2418                                                  struct nrs_tbf_client,
2419                                                  tc_hnode);
2420
2421         atomic_dec(&cli->tc_ref);
2422 }
2423
2424 static void
2425 nrs_tbf_id_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
2426
2427 {
2428         struct nrs_tbf_client *cli = hlist_entry(hnode,
2429                                                  struct nrs_tbf_client,
2430                                                  tc_hnode);
2431
2432         LASSERT(atomic_read(&cli->tc_ref) == 0);
2433         nrs_tbf_cli_fini(cli);
2434 }
2435
2436 static struct cfs_hash_ops nrs_tbf_id_hash_ops = {
2437         .hs_hash        = nrs_tbf_id_hop_hash,
2438         .hs_keycmp      = nrs_tbf_id_hop_keycmp,
2439         .hs_key         = nrs_tbf_id_hop_key,
2440         .hs_object      = nrs_tbf_hop_object,
2441         .hs_get         = nrs_tbf_id_hop_get,
2442         .hs_put         = nrs_tbf_id_hop_put,
2443         .hs_put_locked  = nrs_tbf_id_hop_put,
2444         .hs_exit        = nrs_tbf_id_hop_exit,
2445 };
2446
2447 static int
2448 nrs_tbf_id_startup(struct ptlrpc_nrs_policy *policy,
2449                    struct nrs_tbf_head *head)
2450 {
2451         struct nrs_tbf_cmd start;
2452         int rc;
2453
2454         head->th_cli_hash = cfs_hash_create("nrs_tbf_id_hash",
2455                                             NRS_TBF_NID_BITS,
2456                                             NRS_TBF_NID_BITS,
2457                                             NRS_TBF_NID_BKT_BITS, 0,
2458                                             CFS_HASH_MIN_THETA,
2459                                             CFS_HASH_MAX_THETA,
2460                                             &nrs_tbf_id_hash_ops,
2461                                             CFS_HASH_RW_BKTLOCK);
2462         if (head->th_cli_hash == NULL)
2463                 return -ENOMEM;
2464
2465         memset(&start, 0, sizeof(start));
2466         start.u.tc_start.ts_ids_str = "*";
2467         start.u.tc_start.ts_rpc_rate = tbf_rate;
2468         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2469         start.tc_name = NRS_TBF_DEFAULT_RULE;
2470         INIT_LIST_HEAD(&start.u.tc_start.ts_ids);
2471         rc = nrs_tbf_rule_start(policy, head, &start);
2472         if (rc) {
2473                 cfs_hash_putref(head->th_cli_hash);
2474                 head->th_cli_hash = NULL;
2475         }
2476
2477         return rc;
2478 }
2479
2480 static struct nrs_tbf_client *
2481 nrs_tbf_id_cli_find(struct nrs_tbf_head *head,
2482                     struct ptlrpc_request *req)
2483 {
2484         struct tbf_id id;
2485
2486         LASSERT(head->th_type_flag == NRS_TBF_FLAG_UID ||
2487                 head->th_type_flag == NRS_TBF_FLAG_GID);
2488
2489         nrs_tbf_id_cli_set(req, &id, head->th_type_flag);
2490         return cfs_hash_lookup(head->th_cli_hash, &id);
2491 }
2492
2493 static struct nrs_tbf_client *
2494 nrs_tbf_id_cli_findadd(struct nrs_tbf_head *head,
2495                        struct nrs_tbf_client *cli)
2496 {
2497         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_id,
2498                                        &cli->tc_hnode);
2499 }
2500
2501 static void
2502 nrs_tbf_uid_cli_init(struct nrs_tbf_client *cli,
2503                      struct ptlrpc_request *req)
2504 {
2505         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_UID);
2506 }
2507
2508 static void
2509 nrs_tbf_gid_cli_init(struct nrs_tbf_client *cli,
2510                      struct ptlrpc_request *req)
2511 {
2512         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_GID);
2513 }
2514
2515 static int
2516 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id)
2517 {
2518         struct nrs_tbf_id *nti_id;
2519         enum nrs_tbf_flag flag;
2520
2521         list_for_each_entry(nti_id, id_list, nti_linkage) {
2522                 flag = id.ti_type & nti_id->nti_id.ti_type;
2523                 if (!flag)
2524                         continue;
2525
2526                 if ((flag & NRS_TBF_FLAG_UID) &&
2527                     (id.ti_uid != nti_id->nti_id.ti_uid))
2528                         continue;
2529
2530                 if ((flag & NRS_TBF_FLAG_GID) &&
2531                     (id.ti_gid != nti_id->nti_id.ti_gid))
2532                         continue;
2533
2534                 return 1;
2535         }
2536         return 0;
2537 }
2538
2539 static int
2540 nrs_tbf_id_rule_match(struct nrs_tbf_rule *rule,
2541                       struct nrs_tbf_client *cli)
2542 {
2543         return nrs_tbf_id_list_match(&rule->tr_ids, cli->tc_id);
2544 }
2545
2546 static void nrs_tbf_id_cmd_fini(struct nrs_tbf_cmd *cmd)
2547 {
2548         nrs_tbf_id_list_free(&cmd->u.tc_start.ts_ids);
2549
2550         if (cmd->u.tc_start.ts_ids_str)
2551                 OBD_FREE(cmd->u.tc_start.ts_ids_str,
2552                          strlen(cmd->u.tc_start.ts_ids_str) + 1);
2553 }
2554
2555 static int
2556 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
2557                       enum nrs_tbf_flag tif)
2558 {
2559         struct cfs_lstr src;
2560         struct cfs_lstr res;
2561         int rc = 0;
2562         struct tbf_id id = { 0 };
2563         ENTRY;
2564
2565         if (tif != NRS_TBF_FLAG_UID && tif != NRS_TBF_FLAG_GID)
2566                 RETURN(-EINVAL);
2567
2568         src.ls_str = str;
2569         src.ls_len = len;
2570         INIT_LIST_HEAD(id_list);
2571         while (src.ls_str) {
2572                 struct nrs_tbf_id *nti_id;
2573
2574                 if (cfs_gettok(&src, ' ', &res) == 0)
2575                         GOTO(out, rc = -EINVAL);
2576
2577                 id.ti_type = tif;
2578                 if (tif == NRS_TBF_FLAG_UID) {
2579                         if (!cfs_str2num_check(res.ls_str, res.ls_len,
2580                                                &id.ti_uid, 0, (u32)~0U))
2581                                 GOTO(out, rc = -EINVAL);
2582                 } else {
2583                         if (!cfs_str2num_check(res.ls_str, res.ls_len,
2584                                                &id.ti_gid, 0, (u32)~0U))
2585                                 GOTO(out, rc = -EINVAL);
2586                 }
2587
2588                 OBD_ALLOC_PTR(nti_id);
2589                 if (nti_id == NULL)
2590                         GOTO(out, rc = -ENOMEM);
2591
2592                 nti_id->nti_id = id;
2593                 list_add_tail(&nti_id->nti_linkage, id_list);
2594         }
2595 out:
2596         if (rc)
2597                 nrs_tbf_id_list_free(id_list);
2598         RETURN(rc);
2599 }
2600
2601 static int nrs_tbf_ug_id_parse(struct nrs_tbf_cmd *cmd, char *id)
2602 {
2603         struct cfs_lstr src;
2604         int rc;
2605         enum nrs_tbf_flag tif;
2606
2607         tif = cmd->u.tc_start.ts_valid_type;
2608
2609         src.ls_str = id;
2610         src.ls_len = strlen(id);
2611
2612         rc = nrs_tbf_check_id_value(&src,
2613                                     tif == NRS_TBF_FLAG_UID ? "uid" : "gid");
2614         if (rc)
2615                 return rc;
2616
2617         OBD_ALLOC(cmd->u.tc_start.ts_ids_str, src.ls_len + 1);
2618         if (cmd->u.tc_start.ts_ids_str == NULL)
2619                 return -ENOMEM;
2620
2621         strlcpy(cmd->u.tc_start.ts_ids_str, src.ls_str, src.ls_len + 1);
2622
2623         rc = nrs_tbf_id_list_parse(cmd->u.tc_start.ts_ids_str,
2624                                    strlen(cmd->u.tc_start.ts_ids_str),
2625                                    &cmd->u.tc_start.ts_ids, tif);
2626         if (rc)
2627                 nrs_tbf_id_cmd_fini(cmd);
2628
2629         return rc;
2630 }
2631
2632 static int
2633 nrs_tbf_id_rule_init(struct ptlrpc_nrs_policy *policy,
2634                      struct nrs_tbf_rule *rule,
2635                      struct nrs_tbf_cmd *start)
2636 {
2637         struct nrs_tbf_head *head = rule->tr_head;
2638         int rc = 0;
2639         enum nrs_tbf_flag tif = head->th_type_flag;
2640         int ids_len = strlen(start->u.tc_start.ts_ids_str) + 1;
2641
2642         LASSERT(start->u.tc_start.ts_ids_str);
2643         INIT_LIST_HEAD(&rule->tr_ids);
2644
2645         OBD_ALLOC(rule->tr_ids_str, ids_len);
2646         if (rule->tr_ids_str == NULL)
2647                 return -ENOMEM;
2648
2649         strlcpy(rule->tr_ids_str, start->u.tc_start.ts_ids_str,
2650                 ids_len);
2651
2652         if (!list_empty(&start->u.tc_start.ts_ids)) {
2653                 rc = nrs_tbf_id_list_parse(rule->tr_ids_str,
2654                                            strlen(rule->tr_ids_str),
2655                                            &rule->tr_ids, tif);
2656                 if (rc)
2657                         CERROR("%ss {%s} illegal\n",
2658                                tif == NRS_TBF_FLAG_UID ? "uid" : "gid",
2659                                rule->tr_ids_str);
2660         }
2661         if (rc) {
2662                 OBD_FREE(rule->tr_ids_str, ids_len);
2663                 rule->tr_ids_str = NULL;
2664         }
2665         return rc;
2666 }
2667
2668 static int
2669 nrs_tbf_id_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2670 {
2671         seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
2672                    rule->tr_ids_str, rule->tr_rpc_rate,
2673                    atomic_read(&rule->tr_ref) - 1);
2674         return 0;
2675 }
2676
2677 static void nrs_tbf_id_rule_fini(struct nrs_tbf_rule *rule)
2678 {
2679         nrs_tbf_id_list_free(&rule->tr_ids);
2680         if (rule->tr_ids_str != NULL)
2681                 OBD_FREE(rule->tr_ids_str, strlen(rule->tr_ids_str) + 1);
2682 }
2683
2684 struct nrs_tbf_ops nrs_tbf_uid_ops = {
2685         .o_name = NRS_TBF_TYPE_UID,
2686         .o_startup = nrs_tbf_id_startup,
2687         .o_cli_find = nrs_tbf_id_cli_find,
2688         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2689         .o_cli_put = nrs_tbf_nid_cli_put,
2690         .o_cli_init = nrs_tbf_uid_cli_init,
2691         .o_rule_init = nrs_tbf_id_rule_init,
2692         .o_rule_dump = nrs_tbf_id_rule_dump,
2693         .o_rule_match = nrs_tbf_id_rule_match,
2694         .o_rule_fini = nrs_tbf_id_rule_fini,
2695 };
2696
2697 struct nrs_tbf_ops nrs_tbf_gid_ops = {
2698         .o_name = NRS_TBF_TYPE_GID,
2699         .o_startup = nrs_tbf_id_startup,
2700         .o_cli_find = nrs_tbf_id_cli_find,
2701         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2702         .o_cli_put = nrs_tbf_nid_cli_put,
2703         .o_cli_init = nrs_tbf_gid_cli_init,
2704         .o_rule_init = nrs_tbf_id_rule_init,
2705         .o_rule_dump = nrs_tbf_id_rule_dump,
2706         .o_rule_match = nrs_tbf_id_rule_match,
2707         .o_rule_fini = nrs_tbf_id_rule_fini,
2708 };
2709
2710 static struct nrs_tbf_type nrs_tbf_types[] = {
2711         {
2712                 .ntt_name = NRS_TBF_TYPE_JOBID,
2713                 .ntt_flag = NRS_TBF_FLAG_JOBID,
2714                 .ntt_ops = &nrs_tbf_jobid_ops,
2715         },
2716         {
2717                 .ntt_name = NRS_TBF_TYPE_NID,
2718                 .ntt_flag = NRS_TBF_FLAG_NID,
2719                 .ntt_ops = &nrs_tbf_nid_ops,
2720         },
2721         {
2722                 .ntt_name = NRS_TBF_TYPE_OPCODE,
2723                 .ntt_flag = NRS_TBF_FLAG_OPCODE,
2724                 .ntt_ops = &nrs_tbf_opcode_ops,
2725         },
2726         {
2727                 .ntt_name = NRS_TBF_TYPE_GENERIC,
2728                 .ntt_flag = NRS_TBF_FLAG_GENERIC,
2729                 .ntt_ops = &nrs_tbf_generic_ops,
2730         },
2731         {
2732                 .ntt_name = NRS_TBF_TYPE_UID,
2733                 .ntt_flag = NRS_TBF_FLAG_UID,
2734                 .ntt_ops = &nrs_tbf_uid_ops,
2735         },
2736         {
2737                 .ntt_name = NRS_TBF_TYPE_GID,
2738                 .ntt_flag = NRS_TBF_FLAG_GID,
2739                 .ntt_ops = &nrs_tbf_gid_ops,
2740         },
2741 };
2742
2743 /**
2744  * Is called before the policy transitions into
2745  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
2746  * policy-specific private data structure.
2747  *
2748  * \param[in] policy The policy to start
2749  *
2750  * \retval -ENOMEM OOM error
2751  * \retval  0      success
2752  *
2753  * \see nrs_policy_register()
2754  * \see nrs_policy_ctl()
2755  */
2756 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
2757 {
2758         struct nrs_tbf_head     *head;
2759         struct nrs_tbf_ops      *ops;
2760         __u32                    type;
2761         char                    *name;
2762         int found = 0;
2763         int i;
2764         int rc = 0;
2765
2766         if (arg == NULL)
2767                 name = NRS_TBF_TYPE_GENERIC;
2768         else if (strlen(arg) < NRS_TBF_TYPE_MAX_LEN)
2769                 name = arg;
2770         else
2771                 GOTO(out, rc = -EINVAL);
2772
2773         for (i = 0; i < ARRAY_SIZE(nrs_tbf_types); i++) {
2774                 if (strcmp(name, nrs_tbf_types[i].ntt_name) == 0) {
2775                         ops = nrs_tbf_types[i].ntt_ops;
2776                         type = nrs_tbf_types[i].ntt_flag;
2777                         found = 1;
2778                         break;
2779                 }
2780         }
2781         if (found == 0)
2782                 GOTO(out, rc = -ENOTSUPP);
2783
2784         OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
2785         if (head == NULL)
2786                 GOTO(out, rc = -ENOMEM);
2787
2788         memcpy(head->th_type, name, strlen(name));
2789         head->th_type[strlen(name)] = '\0';
2790         head->th_ops = ops;
2791         head->th_type_flag = type;
2792
2793         head->th_binheap = cfs_binheap_create(&nrs_tbf_heap_ops,
2794                                               CBH_FLAG_ATOMIC_GROW, 4096, NULL,
2795                                               nrs_pol2cptab(policy),
2796                                               nrs_pol2cptid(policy));
2797         if (head->th_binheap == NULL)
2798                 GOTO(out_free_head, rc = -ENOMEM);
2799
2800         atomic_set(&head->th_rule_sequence, 0);
2801         spin_lock_init(&head->th_rule_lock);
2802         INIT_LIST_HEAD(&head->th_list);
2803         hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
2804         head->th_timer.function = nrs_tbf_timer_cb;
2805         rc = head->th_ops->o_startup(policy, head);
2806         if (rc)
2807                 GOTO(out_free_heap, rc);
2808
2809         policy->pol_private = head;
2810         return 0;
2811 out_free_heap:
2812         cfs_binheap_destroy(head->th_binheap);
2813 out_free_head:
2814         OBD_FREE_PTR(head);
2815 out:
2816         return rc;
2817 }
2818
2819 /**
2820  * Is called before the policy transitions into
2821  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
2822  * private data structure.
2823  *
2824  * \param[in] policy The policy to stop
2825  *
2826  * \see nrs_policy_stop0()
2827  */
2828 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
2829 {
2830         struct nrs_tbf_head *head = policy->pol_private;
2831         struct ptlrpc_nrs *nrs = policy->pol_nrs;
2832         struct nrs_tbf_rule *rule, *n;
2833
2834         LASSERT(head != NULL);
2835         LASSERT(head->th_cli_hash != NULL);
2836         hrtimer_cancel(&head->th_timer);
2837         /* Should cleanup hash first before free rules */
2838         cfs_hash_putref(head->th_cli_hash);
2839         list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
2840                 list_del_init(&rule->tr_linkage);
2841                 nrs_tbf_rule_put(rule);
2842         }
2843         LASSERT(list_empty(&head->th_list));
2844         LASSERT(head->th_binheap != NULL);
2845         LASSERT(cfs_binheap_is_empty(head->th_binheap));
2846         cfs_binheap_destroy(head->th_binheap);
2847         OBD_FREE_PTR(head);
2848         nrs->nrs_throttling = 0;
2849         wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
2850 }
2851
2852 /**
2853  * Performs a policy-specific ctl function on TBF policy instances; similar
2854  * to ioctl.
2855  *
2856  * \param[in]     policy the policy instance
2857  * \param[in]     opc    the opcode
2858  * \param[in,out] arg    used for passing parameters and information
2859  *
2860  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2861  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2862  *
2863  * \retval 0   operation carried out successfully
2864  * \retval -ve error
2865  */
2866 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
2867                        enum ptlrpc_nrs_ctl opc,
2868                        void *arg)
2869 {
2870         int rc = 0;
2871         ENTRY;
2872
2873         assert_spin_locked(&policy->pol_nrs->nrs_lock);
2874
2875         switch ((enum nrs_ctl_tbf)opc) {
2876         default:
2877                 RETURN(-EINVAL);
2878
2879         /**
2880          * Read RPC rate size of a policy instance.
2881          */
2882         case NRS_CTL_TBF_RD_RULE: {
2883                 struct nrs_tbf_head *head = policy->pol_private;
2884                 struct seq_file *m = arg;
2885                 struct ptlrpc_service_part *svcpt;
2886
2887                 svcpt = policy->pol_nrs->nrs_svcpt;
2888                 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
2889
2890                 rc = nrs_tbf_rule_dump_all(head, m);
2891                 }
2892                 break;
2893
2894         /**
2895          * Write RPC rate of a policy instance.
2896          */
2897         case NRS_CTL_TBF_WR_RULE: {
2898                 struct nrs_tbf_head *head = policy->pol_private;
2899                 struct nrs_tbf_cmd *cmd;
2900
2901                 cmd = (struct nrs_tbf_cmd *)arg;
2902                 rc = nrs_tbf_command(policy,
2903                                      head,
2904                                      cmd);
2905                 }
2906                 break;
2907         /**
2908          * Read the TBF policy type of a policy instance.
2909          */
2910         case NRS_CTL_TBF_RD_TYPE_FLAG: {
2911                 struct nrs_tbf_head *head = policy->pol_private;
2912
2913                 *(__u32 *)arg = head->th_type_flag;
2914                 }
2915                 break;
2916         }
2917
2918         RETURN(rc);
2919 }
2920
2921 /**
2922  * Is called for obtaining a TBF policy resource.
2923  *
2924  * \param[in]  policy     The policy on which the request is being asked for
2925  * \param[in]  nrq        The request for which resources are being taken
2926  * \param[in]  parent     Parent resource, unused in this policy
2927  * \param[out] resp       Resources references are placed in this array
2928  * \param[in]  moving_req Signifies limited caller context; unused in this
2929  *                        policy
2930  *
2931  *
2932  * \see nrs_resource_get_safe()
2933  */
2934 static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
2935                            struct ptlrpc_nrs_request *nrq,
2936                            const struct ptlrpc_nrs_resource *parent,
2937                            struct ptlrpc_nrs_resource **resp,
2938                            bool moving_req)
2939 {
2940         struct nrs_tbf_head   *head;
2941         struct nrs_tbf_client *cli;
2942         struct nrs_tbf_client *tmp;
2943         struct ptlrpc_request *req;
2944
2945         if (parent == NULL) {
2946                 *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
2947                 return 0;
2948         }
2949
2950         head = container_of(parent, struct nrs_tbf_head, th_res);
2951         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
2952         cli = head->th_ops->o_cli_find(head, req);
2953         if (cli != NULL) {
2954                 spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2955                 LASSERT(cli->tc_rule);
2956                 if (cli->tc_rule_sequence !=
2957                     atomic_read(&head->th_rule_sequence) ||
2958                     cli->tc_rule->tr_flags & NTRS_STOPPING) {
2959                         struct nrs_tbf_rule *rule;
2960
2961                         CDEBUG(D_RPCTRACE,
2962                                "TBF class@%p rate %u sequence %d, "
2963                                "rule flags %d, head sequence %d\n",
2964                                cli, cli->tc_rpc_rate,
2965                                cli->tc_rule_sequence,
2966                                cli->tc_rule->tr_flags,
2967                                atomic_read(&head->th_rule_sequence));
2968                         rule = nrs_tbf_rule_match(head, cli);
2969                         if (rule != cli->tc_rule) {
2970                                 nrs_tbf_cli_reset(head, rule, cli);
2971                         } else {
2972                                 if (cli->tc_rule_generation != rule->tr_generation)
2973                                         nrs_tbf_cli_reset_value(head, cli);
2974                                 nrs_tbf_rule_put(rule);
2975                         }
2976                 } else if (cli->tc_rule_generation !=
2977                            cli->tc_rule->tr_generation) {
2978                         nrs_tbf_cli_reset_value(head, cli);
2979                 }
2980                 spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2981                 goto out;
2982         }
2983
2984         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
2985                           sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
2986         if (cli == NULL)
2987                 return -ENOMEM;
2988
2989         nrs_tbf_cli_init(head, cli, req);
2990         tmp = head->th_ops->o_cli_findadd(head, cli);
2991         if (tmp != cli) {
2992                 atomic_dec(&cli->tc_ref);
2993                 nrs_tbf_cli_fini(cli);
2994                 cli = tmp;
2995         }
2996 out:
2997         *resp = &cli->tc_res;
2998
2999         return 1;
3000 }
3001
3002 /**
3003  * Called when releasing references to the resource hierachy obtained for a
3004  * request for scheduling using the TBF policy.
3005  *
3006  * \param[in] policy   the policy the resource belongs to
3007  * \param[in] res      the resource to be released
3008  */
3009 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
3010                             const struct ptlrpc_nrs_resource *res)
3011 {
3012         struct nrs_tbf_head   *head;
3013         struct nrs_tbf_client *cli;
3014
3015         /**
3016          * Do nothing for freeing parent, nrs_tbf_net resources
3017          */
3018         if (res->res_parent == NULL)
3019                 return;
3020
3021         cli = container_of(res, struct nrs_tbf_client, tc_res);
3022         head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
3023
3024         head->th_ops->o_cli_put(head, cli);
3025 }
3026
3027 /**
3028  * Called when getting a request from the TBF policy for handling, or just
3029  * peeking; removes the request from the policy when it is to be handled.
3030  *
3031  * \param[in] policy The policy
3032  * \param[in] peek   When set, signifies that we just want to examine the
3033  *                   request, and not handle it, so the request is not removed
3034  *                   from the policy.
3035  * \param[in] force  Force the policy to return a request; unused in this
3036  *                   policy
3037  *
3038  * \retval The request to be handled; this is the next request in the TBF
3039  *         rule
3040  *
3041  * \see ptlrpc_nrs_req_get_nolock()
3042  * \see nrs_request_get()
3043  */
3044 static
3045 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
3046                                            bool peek, bool force)
3047 {
3048         struct nrs_tbf_head       *head = policy->pol_private;
3049         struct ptlrpc_nrs_request *nrq = NULL;
3050         struct nrs_tbf_client     *cli;
3051         struct cfs_binheap_node   *node;
3052
3053         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3054
3055         if (!peek && policy->pol_nrs->nrs_throttling)
3056                 return NULL;
3057
3058         node = cfs_binheap_root(head->th_binheap);
3059         if (unlikely(node == NULL))
3060                 return NULL;
3061
3062         cli = container_of(node, struct nrs_tbf_client, tc_node);
3063         LASSERT(cli->tc_in_heap);
3064         if (peek) {
3065                 nrq = list_entry(cli->tc_list.next,
3066                                      struct ptlrpc_nrs_request,
3067                                      nr_u.tbf.tr_list);
3068         } else {
3069                 struct nrs_tbf_rule *rule = cli->tc_rule;
3070                 __u64 now = ktime_to_ns(ktime_get());
3071                 __u64 passed;
3072                 __u64 ntoken;
3073                 __u64 deadline;
3074                 __u64 old_resid = 0;
3075
3076                 deadline = cli->tc_check_time +
3077                           cli->tc_nsecs;
3078                 LASSERT(now >= cli->tc_check_time);
3079                 passed = now - cli->tc_check_time;
3080                 ntoken = passed * cli->tc_rpc_rate;
3081                 do_div(ntoken, NSEC_PER_SEC);
3082
3083                 ntoken += cli->tc_ntoken;
3084                 if (rule->tr_flags & NTRS_REALTIME) {
3085                         LASSERT(cli->tc_nsecs_resid < cli->tc_nsecs);
3086                         old_resid = cli->tc_nsecs_resid;
3087                         cli->tc_nsecs_resid += passed % cli->tc_nsecs;
3088                         if (cli->tc_nsecs_resid > cli->tc_nsecs) {
3089                                 ntoken++;
3090                                 cli->tc_nsecs_resid -= cli->tc_nsecs;
3091                         }
3092                 } else if (ntoken > cli->tc_depth)
3093                         ntoken = cli->tc_depth;
3094
3095                 if (ntoken > 0) {
3096                         struct ptlrpc_request *req;
3097                         nrq = list_entry(cli->tc_list.next,
3098                                              struct ptlrpc_nrs_request,
3099                                              nr_u.tbf.tr_list);
3100                         req = container_of(nrq,
3101                                            struct ptlrpc_request,
3102                                            rq_nrq);
3103                         ntoken--;
3104                         cli->tc_ntoken = ntoken;
3105                         cli->tc_check_time = now;
3106                         list_del_init(&nrq->nr_u.tbf.tr_list);
3107                         if (list_empty(&cli->tc_list)) {
3108                                 cfs_binheap_remove(head->th_binheap,
3109                                                    &cli->tc_node);
3110                                 cli->tc_in_heap = false;
3111                         } else {
3112                                 if (!(rule->tr_flags & NTRS_REALTIME))
3113                                         cli->tc_deadline = now + cli->tc_nsecs;
3114                                 cfs_binheap_relocate(head->th_binheap,
3115                                                      &cli->tc_node);
3116                         }
3117                         CDEBUG(D_RPCTRACE,
3118                                "TBF dequeues: class@%p rate %u gen %llu "
3119                                "token %llu, rule@%p rate %u gen %llu\n",
3120                                cli, cli->tc_rpc_rate,
3121                                cli->tc_rule_generation, cli->tc_ntoken,
3122                                cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3123                                cli->tc_rule->tr_generation);
3124                 } else {
3125                         ktime_t time;
3126
3127                         if (rule->tr_flags & NTRS_REALTIME) {
3128                                 cli->tc_deadline = deadline;
3129                                 cli->tc_nsecs_resid = old_resid;
3130                                 cfs_binheap_relocate(head->th_binheap,
3131                                                      &cli->tc_node);
3132                                 if (node != cfs_binheap_root(head->th_binheap))
3133                                         return nrs_tbf_req_get(policy,
3134                                                                peek, force);
3135                         }
3136                         policy->pol_nrs->nrs_throttling = 1;
3137                         head->th_deadline = deadline;
3138                         time = ktime_set(0, 0);
3139                         time = ktime_add_ns(time, deadline);
3140                         hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
3141                 }
3142         }
3143
3144         return nrq;
3145 }
3146
3147 /**
3148  * Adds request \a nrq to \a policy's list of queued requests
3149  *
3150  * \param[in] policy The policy
3151  * \param[in] nrq    The request to add
3152  *
3153  * \retval 0 success; nrs_request_enqueue() assumes this function will always
3154  *                    succeed
3155  */
3156 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
3157                            struct ptlrpc_nrs_request *nrq)
3158 {
3159         struct nrs_tbf_head   *head;
3160         struct nrs_tbf_client *cli;
3161         int                    rc = 0;
3162
3163         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3164
3165         cli = container_of(nrs_request_resource(nrq),
3166                            struct nrs_tbf_client, tc_res);
3167         head = container_of(nrs_request_resource(nrq)->res_parent,
3168                             struct nrs_tbf_head, th_res);
3169         if (list_empty(&cli->tc_list)) {
3170                 LASSERT(!cli->tc_in_heap);
3171                 cli->tc_deadline = cli->tc_check_time + cli->tc_nsecs;
3172                 rc = cfs_binheap_insert(head->th_binheap, &cli->tc_node);
3173                 if (rc == 0) {
3174                         cli->tc_in_heap = true;
3175                         nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3176                         list_add_tail(&nrq->nr_u.tbf.tr_list,
3177                                           &cli->tc_list);
3178                         if (policy->pol_nrs->nrs_throttling) {
3179                                 __u64 deadline = cli->tc_deadline;
3180                                 if ((head->th_deadline > deadline) &&
3181                                     (hrtimer_try_to_cancel(&head->th_timer)
3182                                      >= 0)) {
3183                                         ktime_t time;
3184                                         head->th_deadline = deadline;
3185                                         time = ktime_set(0, 0);
3186                                         time = ktime_add_ns(time, deadline);
3187                                         hrtimer_start(&head->th_timer, time,
3188                                                       HRTIMER_MODE_ABS);
3189                                 }
3190                         }
3191                 }
3192         } else {
3193                 LASSERT(cli->tc_in_heap);
3194                 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3195                 list_add_tail(&nrq->nr_u.tbf.tr_list,
3196                                   &cli->tc_list);
3197         }
3198
3199         if (rc == 0)
3200                 CDEBUG(D_RPCTRACE,
3201                        "TBF enqueues: class@%p rate %u gen %llu "
3202                        "token %llu, rule@%p rate %u gen %llu\n",
3203                        cli, cli->tc_rpc_rate,
3204                        cli->tc_rule_generation, cli->tc_ntoken,
3205                        cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3206                        cli->tc_rule->tr_generation);
3207
3208         return rc;
3209 }
3210
3211 /**
3212  * Removes request \a nrq from \a policy's list of queued requests.
3213  *
3214  * \param[in] policy The policy
3215  * \param[in] nrq    The request to remove
3216  */
3217 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
3218                              struct ptlrpc_nrs_request *nrq)
3219 {
3220         struct nrs_tbf_head   *head;
3221         struct nrs_tbf_client *cli;
3222
3223         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3224
3225         cli = container_of(nrs_request_resource(nrq),
3226                            struct nrs_tbf_client, tc_res);
3227         head = container_of(nrs_request_resource(nrq)->res_parent,
3228                             struct nrs_tbf_head, th_res);
3229
3230         LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
3231         list_del_init(&nrq->nr_u.tbf.tr_list);
3232         if (list_empty(&cli->tc_list)) {
3233                 cfs_binheap_remove(head->th_binheap,
3234                                    &cli->tc_node);
3235                 cli->tc_in_heap = false;
3236         } else {
3237                 cfs_binheap_relocate(head->th_binheap,
3238                                      &cli->tc_node);
3239         }
3240 }
3241
3242 /**
3243  * Prints a debug statement right before the request \a nrq stops being
3244  * handled.
3245  *
3246  * \param[in] policy The policy handling the request
3247  * \param[in] nrq    The request being handled
3248  *
3249  * \see ptlrpc_server_finish_request()
3250  * \see ptlrpc_nrs_req_stop_nolock()
3251  */
3252 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
3253                               struct ptlrpc_nrs_request *nrq)
3254 {
3255         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
3256                                                   rq_nrq);
3257
3258         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3259
3260         CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: %llu\n",
3261                policy->pol_desc->pd_name, libcfs_id2str(req->rq_peer),
3262                nrq->nr_u.tbf.tr_sequence);
3263 }
3264
3265 /**
3266  * debugfs interface
3267  */
3268
3269 /**
3270  * The maximum RPC rate.
3271  */
3272 #define LPROCFS_NRS_RATE_MAX            65535
3273
3274 static int
3275 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
3276 {
3277         struct ptlrpc_service       *svc = m->private;
3278         int                          rc;
3279
3280         seq_printf(m, "regular_requests:\n");
3281         /**
3282          * Perform two separate calls to this as only one of the NRS heads'
3283          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
3284          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
3285          */
3286         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
3287                                        NRS_POL_NAME_TBF,
3288                                        NRS_CTL_TBF_RD_RULE,
3289                                        false, m);
3290         if (rc == 0) {
3291                 /**
3292                  * -ENOSPC means buf in the parameter m is overflow, return 0
3293                  * here to let upper layer function seq_read alloc a larger
3294                  * memory area and do this process again.
3295                  */
3296         } else if (rc == -ENOSPC) {
3297                 return 0;
3298
3299                 /**
3300                  * Ignore -ENODEV as the regular NRS head's policy may be in the
3301                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
3302                  */
3303         } else if (rc != -ENODEV) {
3304                 return rc;
3305         }
3306
3307         if (!nrs_svc_has_hp(svc))
3308                 goto no_hp;
3309
3310         seq_printf(m, "high_priority_requests:\n");
3311         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
3312                                        NRS_POL_NAME_TBF,
3313                                        NRS_CTL_TBF_RD_RULE,
3314                                        false, m);
3315         if (rc == 0) {
3316                 /**
3317                  * -ENOSPC means buf in the parameter m is overflow, return 0
3318                  * here to let upper layer function seq_read alloc a larger
3319                  * memory area and do this process again.
3320                  */
3321         } else if (rc == -ENOSPC) {
3322                 return 0;
3323         }
3324
3325 no_hp:
3326
3327         return rc;
3328 }
3329
3330 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char *token)
3331 {
3332         int rc;
3333         ENTRY;
3334
3335         switch (cmd->u.tc_start.ts_valid_type) {
3336         case NRS_TBF_FLAG_JOBID:
3337                 rc = nrs_tbf_jobid_parse(cmd, token);
3338                 break;
3339         case NRS_TBF_FLAG_NID:
3340                 rc = nrs_tbf_nid_parse(cmd, token);
3341                 break;
3342         case NRS_TBF_FLAG_OPCODE:
3343                 rc = nrs_tbf_opcode_parse(cmd, token);
3344                 break;
3345         case NRS_TBF_FLAG_GENERIC:
3346                 rc = nrs_tbf_generic_parse(cmd, token);
3347                 break;
3348         case NRS_TBF_FLAG_UID:
3349         case NRS_TBF_FLAG_GID:
3350                 rc = nrs_tbf_ug_id_parse(cmd, token);
3351                 break;
3352         default:
3353                 RETURN(-EINVAL);
3354         }
3355
3356         RETURN(rc);
3357 }
3358
3359 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
3360 {
3361         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3362                 switch (cmd->u.tc_start.ts_valid_type) {
3363                 case NRS_TBF_FLAG_JOBID:
3364                         nrs_tbf_jobid_cmd_fini(cmd);
3365                         break;
3366                 case NRS_TBF_FLAG_NID:
3367                         nrs_tbf_nid_cmd_fini(cmd);
3368                         break;
3369                 case NRS_TBF_FLAG_OPCODE:
3370                         nrs_tbf_opcode_cmd_fini(cmd);
3371                         break;
3372                 case NRS_TBF_FLAG_GENERIC:
3373                         nrs_tbf_generic_cmd_fini(cmd);
3374                         break;
3375                 case NRS_TBF_FLAG_UID:
3376                 case NRS_TBF_FLAG_GID:
3377                         nrs_tbf_id_cmd_fini(cmd);
3378                         break;
3379                 default:
3380                         CWARN("unknown NRS_TBF_FLAGS:0x%x\n",
3381                               cmd->u.tc_start.ts_valid_type);
3382                 }
3383         }
3384 }
3385
/**
 * Checks that a name consists only of alphanumeric characters and
 * underscores.
 *
 * \param[in] name the NUL-terminated name to check
 *
 * \retval true  every character is alphanumeric or '_'; this includes the
 *               empty string
 * \retval false at least one other character is present
 */
static bool name_is_valid(const char *name)
{
        const char *p;

        /*
         * Walk the string once instead of calling strlen() on every
         * iteration; the cast keeps the isalnum() argument in the range of
         * unsigned char.
         */
        for (p = name; *p != '\0'; p++) {
                if (!isalnum((unsigned char)*p) && *p != '_')
                        return false;
        }
        return true;
}
3397
3398 static int
3399 nrs_tbf_parse_value_pair(struct nrs_tbf_cmd *cmd, char *buffer)
3400 {
3401         char    *key;
3402         char    *val;
3403         int      rc;
3404         __u64    rate;
3405
3406         val = buffer;
3407         key = strsep(&val, "=");
3408         if (val == NULL || strlen(val) == 0)
3409                 return -EINVAL;
3410
3411         /* Key of the value pair */
3412         if (strcmp(key, "rate") == 0) {
3413                 rc = kstrtoull(val, 10, &rate);
3414                 if (rc)
3415                         return rc;
3416
3417                 if (rate <= 0 || rate >= LPROCFS_NRS_RATE_MAX)
3418                         return -EINVAL;
3419
3420                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3421                         cmd->u.tc_start.ts_rpc_rate = rate;
3422                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3423                         cmd->u.tc_change.tc_rpc_rate = rate;
3424                 else
3425                         return -EINVAL;
3426         }  else if (strcmp(key, "rank") == 0) {
3427                 if (!name_is_valid(val))
3428                         return -EINVAL;
3429
3430                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3431                         cmd->u.tc_start.ts_next_name = val;
3432                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3433                         cmd->u.tc_change.tc_next_name = val;
3434                 else
3435                         return -EINVAL;
3436         } else if (strcmp(key, "realtime") == 0) {
3437                 unsigned long realtime;
3438
3439                 rc = kstrtoul(val, 10, &realtime);
3440                 if (rc)
3441                         return rc;
3442
3443                 if (realtime > 0)
3444                         cmd->u.tc_start.ts_rule_flags |= NTRS_REALTIME;
3445         } else {
3446                 return -EINVAL;
3447         }
3448         return 0;
3449 }
3450
3451 static int
3452 nrs_tbf_parse_value_pairs(struct nrs_tbf_cmd *cmd, char *buffer)
3453 {
3454         char    *val;
3455         char    *token;
3456         int      rc;
3457
3458         val = buffer;
3459         while (val != NULL && strlen(val) != 0) {
3460                 token = strsep(&val, " ");
3461                 rc = nrs_tbf_parse_value_pair(cmd, token);
3462                 if (rc)
3463                         return rc;
3464         }
3465
3466         switch (cmd->tc_cmd) {
3467         case NRS_CTL_TBF_START_RULE:
3468                 if (cmd->u.tc_start.ts_rpc_rate == 0)
3469                         cmd->u.tc_start.ts_rpc_rate = tbf_rate;
3470                 break;
3471         case NRS_CTL_TBF_CHANGE_RULE:
3472                 if (cmd->u.tc_change.tc_rpc_rate == 0 &&
3473                     cmd->u.tc_change.tc_next_name == NULL)
3474                         return -EINVAL;
3475                 break;
3476         case NRS_CTL_TBF_STOP_RULE:
3477                 break;
3478         default:
3479                 return -EINVAL;
3480         }
3481         return 0;
3482 }
3483
3484 static struct nrs_tbf_cmd *
3485 nrs_tbf_parse_cmd(char *buffer, unsigned long count, __u32 type_flag)
3486 {
3487         static struct nrs_tbf_cmd       *cmd;
3488         char                            *token;
3489         char                            *val;
3490         int                              rc = 0;
3491
3492         OBD_ALLOC_PTR(cmd);
3493         if (cmd == NULL)
3494                 GOTO(out, rc = -ENOMEM);
3495         memset(cmd, 0, sizeof(*cmd));
3496
3497         val = buffer;
3498         token = strsep(&val, " ");
3499         if (val == NULL || strlen(val) == 0)
3500                 GOTO(out_free_cmd, rc = -EINVAL);
3501
3502         /* Type of the command */
3503         if (strcmp(token, "start") == 0) {
3504                 cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
3505                 cmd->u.tc_start.ts_valid_type = type_flag;
3506         } else if (strcmp(token, "stop") == 0)
3507                 cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
3508         else if (strcmp(token, "change") == 0)
3509                 cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RULE;
3510         else
3511                 GOTO(out_free_cmd, rc = -EINVAL);
3512
3513         /* Name of the rule */
3514         token = strsep(&val, " ");
3515         if ((val == NULL && cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE) ||
3516             !name_is_valid(token))
3517                 GOTO(out_free_cmd, rc = -EINVAL);
3518         cmd->tc_name = token;
3519
3520         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3521                 /* List of ID */
3522                 LASSERT(val);
3523                 token = val;
3524                 val = strrchr(token, '}');
3525                 if (!val)
3526                         GOTO(out_free_cmd, rc = -EINVAL);
3527
3528                 /* Skip '}' */
3529                 val++;
3530                 if (*val == '\0') {
3531                         val = NULL;
3532                 } else if (*val == ' ') {
3533                         *val = '\0';
3534                         val++;
3535                 } else
3536                         GOTO(out_free_cmd, rc = -EINVAL);
3537
3538                 rc = nrs_tbf_id_parse(cmd, token);
3539                 if (rc)
3540                         GOTO(out_free_cmd, rc);
3541         }
3542
3543         rc = nrs_tbf_parse_value_pairs(cmd, val);
3544         if (rc)
3545                 GOTO(out_cmd_fini, rc = -EINVAL);
3546         goto out;
3547 out_cmd_fini:
3548         nrs_tbf_cmd_fini(cmd);
3549 out_free_cmd:
3550         OBD_FREE_PTR(cmd);
3551 out:
3552         if (rc)
3553                 cmd = ERR_PTR(rc);
3554         return cmd;
3555 }
3556
3557 /**
3558  * Get the TBF policy type (nid, jobid, etc) preset by
3559  * proc entry 'nrs_policies' for command buffer parsing.
3560  *
3561  * \param[in] svc the PTLRPC service
3562  * \param[in] queue the NRS queue type
3563  *
3564  * \retval the preset TBF policy type flag
3565  */
3566 static __u32
3567 nrs_tbf_type_flag(struct ptlrpc_service *svc, enum ptlrpc_nrs_queue_type queue)
3568 {
3569         __u32   type;
3570         int     rc;
3571
3572         rc = ptlrpc_nrs_policy_control(svc, queue,
3573                                        NRS_POL_NAME_TBF,
3574                                        NRS_CTL_TBF_RD_TYPE_FLAG,
3575                                        true, &type);
3576         if (rc != 0)
3577                 type = NRS_TBF_FLAG_INVALID;
3578
3579         return type;
3580 }
3581
3582 #define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
3583 static ssize_t
3584 ptlrpc_lprocfs_nrs_tbf_rule_seq_write(struct file *file,
3585                                       const char __user *buffer,
3586                                       size_t count, loff_t *off)
3587 {
3588         struct seq_file           *m = file->private_data;
3589         struct ptlrpc_service     *svc = m->private;
3590         char                      *kernbuf;
3591         char                      *val;
3592         int                        rc;
3593         static struct nrs_tbf_cmd *cmd;
3594         enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
3595         unsigned long              length;
3596         char                      *token;
3597
3598         OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3599         if (kernbuf == NULL)
3600                 GOTO(out, rc = -ENOMEM);
3601
3602         if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
3603                 GOTO(out_free_kernbuff, rc = -EINVAL);
3604
3605         if (copy_from_user(kernbuf, buffer, count))
3606                 GOTO(out_free_kernbuff, rc = -EFAULT);
3607
3608         val = kernbuf;
3609         token = strsep(&val, " ");
3610         if (val == NULL)
3611                 GOTO(out_free_kernbuff, rc = -EINVAL);
3612
3613         if (strcmp(token, "reg") == 0) {
3614                 queue = PTLRPC_NRS_QUEUE_REG;
3615         } else if (strcmp(token, "hp") == 0) {
3616                 queue = PTLRPC_NRS_QUEUE_HP;
3617         } else {
3618                 kernbuf[strlen(token)] = ' ';
3619                 val = kernbuf;
3620         }
3621         length = strlen(val);
3622
3623         if (length == 0)
3624                 GOTO(out_free_kernbuff, rc = -EINVAL);
3625
3626         if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
3627                 GOTO(out_free_kernbuff, rc = -ENODEV);
3628         else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
3629                 queue = PTLRPC_NRS_QUEUE_REG;
3630
3631         cmd = nrs_tbf_parse_cmd(val, length, nrs_tbf_type_flag(svc, queue));
3632         if (IS_ERR(cmd))
3633                 GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
3634
3635         /**
3636          * Serialize NRS core lprocfs operations with policy registration/
3637          * unregistration.
3638          */
3639         mutex_lock(&nrs_core.nrs_mutex);
3640         rc = ptlrpc_nrs_policy_control(svc, queue,
3641                                        NRS_POL_NAME_TBF,
3642                                        NRS_CTL_TBF_WR_RULE,
3643                                        false, cmd);
3644         mutex_unlock(&nrs_core.nrs_mutex);
3645
3646         nrs_tbf_cmd_fini(cmd);
3647         OBD_FREE_PTR(cmd);
3648 out_free_kernbuff:
3649         OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3650 out:
3651         return rc ? rc : count;
3652 }
3653
3654 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_tbf_rule);
3655
3656 /**
3657  * Initializes a TBF policy's lprocfs interface for service \a svc
3658  *
3659  * \param[in] svc the service
3660  *
3661  * \retval 0    success
3662  * \retval != 0 error
3663  */
3664 static int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
3665 {
3666         struct lprocfs_vars nrs_tbf_lprocfs_vars[] = {
3667                 { .name         = "nrs_tbf_rule",
3668                   .fops         = &ptlrpc_lprocfs_nrs_tbf_rule_fops,
3669                   .data = svc },
3670                 { NULL }
3671         };
3672
3673         if (!svc->srv_debugfs_entry)
3674                 return 0;
3675
3676         ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_tbf_lprocfs_vars, NULL);
3677
3678         return 0;
3679 }
3680
3681 /**
3682  * TBF policy operations
3683  */
3684 static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
3685         .op_policy_start        = nrs_tbf_start,
3686         .op_policy_stop         = nrs_tbf_stop,
3687         .op_policy_ctl          = nrs_tbf_ctl,
3688         .op_res_get             = nrs_tbf_res_get,
3689         .op_res_put             = nrs_tbf_res_put,
3690         .op_req_get             = nrs_tbf_req_get,
3691         .op_req_enqueue         = nrs_tbf_req_add,
3692         .op_req_dequeue         = nrs_tbf_req_del,
3693         .op_req_stop            = nrs_tbf_req_stop,
3694         .op_lprocfs_init        = nrs_tbf_lprocfs_init,
3695 };
3696
3697 /**
3698  * TBF policy configuration
3699  */
3700 struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
3701         .nc_name                = NRS_POL_NAME_TBF,
3702         .nc_ops                 = &nrs_tbf_ops,
3703         .nc_compat              = nrs_policy_compat_all,
3704 };
3705
3706 /** @} tbf */
3707
3708 /** @} nrs */
3709
3710 #endif /* HAVE_SERVER_SUPPORT */