Whamcloud - gitweb
LU-9679 lustre: avoid cast of file->private_data
[fs/lustre-release.git] / lustre / ptlrpc / nrs_tbf.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2013 DataDirect Networks, Inc.
24  *
25  * Copyright (c) 2014, 2016, Intel Corporation.
26  */
27 /*
28  * lustre/ptlrpc/nrs_tbf.c
29  *
30  * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
31  *
32  */
33
34 #ifdef HAVE_SERVER_SUPPORT
35
36 /**
37  * \addtogroup nrs
38  * @{
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #include <obd_support.h>
43 #include <obd_class.h>
44 #include <libcfs/libcfs.h>
45 #include <lustre_req_layout.h>
46 #include "ptlrpc_internal.h"
47
48 /**
49  * \name tbf
50  *
51  * Token Bucket Filter over client NIDs
52  *
53  * @{
54  */
55
/* Name string of the TBF policy (its users are outside this part of the file). */
56 #define NRS_POL_NAME_TBF        "tbf"
57
/*
 * Target size of the jobid client cache. It determines the hash order in
 * nrs_tbf_jobid_hash_order() and the per-bucket high-water mark used when
 * purging the LRU in nrs_tbf_jobid_cli_put().
 */
58 static int tbf_jobid_cache_size = 8192;
59 module_param(tbf_jobid_cache_size, int, 0644);
60 MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");
61
/*
 * RPC rate given to the "default" rule created at policy startup
 * (see nrs_tbf_jobid_startup()).
 */
62 static int tbf_rate = 10000;
63 module_param(tbf_rate, int, 0644);
64 MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");
65
/*
 * Bucket depth copied into every new rule (tr_depth) by nrs_tbf_rule_start();
 * clients inherit it as both tc_depth and their initial token count.
 */
66 static int tbf_depth = 3;
67 module_param(tbf_depth, int, 0644);
68 MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
69
70 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
71 {
72         struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
73                                                  th_timer);
74         struct ptlrpc_nrs   *nrs = head->th_res.res_policy->pol_nrs;
75         struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
76
77         nrs->nrs_throttling = 0;
78         wake_up(&svcpt->scp_waitq);
79
80         return HRTIMER_NORESTART;
81 }
82
/*
 * Name of the fallback rule created at startup; nrs_tbf_rule_stop() refuses
 * to stop a rule with this name.
 */
83 #define NRS_TBF_DEFAULT_RULE "default"
84
85 static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule)
86 {
87         LASSERT(atomic_read(&rule->tr_ref) == 0);
88         LASSERT(list_empty(&rule->tr_cli_list));
89         LASSERT(list_empty(&rule->tr_linkage));
90
91         rule->tr_head->th_ops->o_rule_fini(rule);
92         OBD_FREE_PTR(rule);
93 }
94
95 /**
96  * Decreases the rule's usage reference count, and stops the rule in case it
97  * was already stopping and have no more outstanding usage references (which
98  * indicates it has no more queued or started requests, and can be safely
99  * stopped).
100  */
101 static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule)
102 {
103         if (atomic_dec_and_test(&rule->tr_ref))
104                 nrs_tbf_rule_fini(rule);
105 }
106
107 /**
108  * Increases the rule's usage reference count.
109  */
110 static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule)
111 {
112         atomic_inc(&rule->tr_ref);
113 }
114
115 static void
116 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
117 {
118         LASSERT(!list_empty(&cli->tc_linkage));
119         LASSERT(cli->tc_rule);
120         spin_lock(&cli->tc_rule->tr_rule_lock);
121         list_del_init(&cli->tc_linkage);
122         spin_unlock(&cli->tc_rule->tr_rule_lock);
123         nrs_tbf_rule_put(cli->tc_rule);
124         cli->tc_rule = NULL;
125 }
126
127 static void
128 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
129                         struct nrs_tbf_client *cli)
130
131 {
132         struct nrs_tbf_rule *rule = cli->tc_rule;
133
134         cli->tc_rpc_rate = rule->tr_rpc_rate;
135         cli->tc_nsecs = rule->tr_nsecs;
136         cli->tc_depth = rule->tr_depth;
137         cli->tc_ntoken = rule->tr_depth;
138         cli->tc_check_time = ktime_to_ns(ktime_get());
139         cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
140         cli->tc_rule_generation = rule->tr_generation;
141
142         if (cli->tc_in_heap)
143                 cfs_binheap_relocate(head->th_binheap,
144                                      &cli->tc_node);
145 }
146
147 static void
148 nrs_tbf_cli_reset(struct nrs_tbf_head *head,
149                   struct nrs_tbf_rule *rule,
150                   struct nrs_tbf_client *cli)
151 {
152         spin_lock(&cli->tc_rule_lock);
153         if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
154                 LASSERT(rule != cli->tc_rule);
155                 nrs_tbf_cli_rule_put(cli);
156         }
157         LASSERT(cli->tc_rule == NULL);
158         LASSERT(list_empty(&cli->tc_linkage));
159         /* Rule's ref is added before called */
160         cli->tc_rule = rule;
161         spin_lock(&rule->tr_rule_lock);
162         list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
163         spin_unlock(&rule->tr_rule_lock);
164         spin_unlock(&cli->tc_rule_lock);
165         nrs_tbf_cli_reset_value(head, cli);
166 }
167
168 static int
169 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
170 {
171         return rule->tr_head->th_ops->o_rule_dump(rule, m);
172 }
173
174 static int
175 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
176 {
177         struct nrs_tbf_rule *rule;
178         int rc = 0;
179
180         LASSERT(head != NULL);
181         spin_lock(&head->th_rule_lock);
182         /* List the rules from newest to oldest */
183         list_for_each_entry(rule, &head->th_list, tr_linkage) {
184                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
185                 rc = nrs_tbf_rule_dump(rule, m);
186                 if (rc) {
187                         rc = -ENOSPC;
188                         break;
189                 }
190         }
191         spin_unlock(&head->th_rule_lock);
192
193         return rc;
194 }
195
196 static struct nrs_tbf_rule *
197 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
198                          const char *name)
199 {
200         struct nrs_tbf_rule *rule;
201
202         LASSERT(head != NULL);
203         list_for_each_entry(rule, &head->th_list, tr_linkage) {
204                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
205                 if (strcmp(rule->tr_name, name) == 0) {
206                         nrs_tbf_rule_get(rule);
207                         return rule;
208                 }
209         }
210         return NULL;
211 }
212
213 static struct nrs_tbf_rule *
214 nrs_tbf_rule_find(struct nrs_tbf_head *head,
215                   const char *name)
216 {
217         struct nrs_tbf_rule *rule;
218
219         LASSERT(head != NULL);
220         spin_lock(&head->th_rule_lock);
221         rule = nrs_tbf_rule_find_nolock(head, name);
222         spin_unlock(&head->th_rule_lock);
223         return rule;
224 }
225
226 static struct nrs_tbf_rule *
227 nrs_tbf_rule_match(struct nrs_tbf_head *head,
228                    struct nrs_tbf_client *cli)
229 {
230         struct nrs_tbf_rule *rule = NULL;
231         struct nrs_tbf_rule *tmp_rule;
232
233         spin_lock(&head->th_rule_lock);
234         /* Match the newest rule in the list */
235         list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
236                 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
237                 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
238                         rule = tmp_rule;
239                         break;
240                 }
241         }
242
243         if (rule == NULL)
244                 rule = head->th_rule;
245
246         nrs_tbf_rule_get(rule);
247         spin_unlock(&head->th_rule_lock);
248         return rule;
249 }
250
251 static void
252 nrs_tbf_cli_init(struct nrs_tbf_head *head,
253                  struct nrs_tbf_client *cli,
254                  struct ptlrpc_request *req)
255 {
256         struct nrs_tbf_rule *rule;
257
258         memset(cli, 0, sizeof(*cli));
259         cli->tc_in_heap = false;
260         head->th_ops->o_cli_init(cli, req);
261         INIT_LIST_HEAD(&cli->tc_list);
262         INIT_LIST_HEAD(&cli->tc_linkage);
263         spin_lock_init(&cli->tc_rule_lock);
264         atomic_set(&cli->tc_ref, 1);
265         rule = nrs_tbf_rule_match(head, cli);
266         nrs_tbf_cli_reset(head, rule, cli);
267 }
268
269 static void
270 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
271 {
272         LASSERT(list_empty(&cli->tc_list));
273         LASSERT(!cli->tc_in_heap);
274         LASSERT(atomic_read(&cli->tc_ref) == 0);
275         spin_lock(&cli->tc_rule_lock);
276         nrs_tbf_cli_rule_put(cli);
277         spin_unlock(&cli->tc_rule_lock);
278         OBD_FREE_PTR(cli);
279 }
280
281 static int
282 nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
283                    struct nrs_tbf_head *head,
284                    struct nrs_tbf_cmd *start)
285 {
286         struct nrs_tbf_rule     *rule;
287         struct nrs_tbf_rule     *tmp_rule;
288         struct nrs_tbf_rule     *next_rule;
289         char                    *next_name = start->u.tc_start.ts_next_name;
290         int                      rc;
291
292         rule = nrs_tbf_rule_find(head, start->tc_name);
293         if (rule) {
294                 nrs_tbf_rule_put(rule);
295                 return -EEXIST;
296         }
297
298         OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
299         if (rule == NULL)
300                 return -ENOMEM;
301
302         memcpy(rule->tr_name, start->tc_name, strlen(start->tc_name));
303         rule->tr_rpc_rate = start->u.tc_start.ts_rpc_rate;
304         rule->tr_flags = start->u.tc_start.ts_rule_flags;
305         rule->tr_nsecs = NSEC_PER_SEC;
306         do_div(rule->tr_nsecs, rule->tr_rpc_rate);
307         rule->tr_depth = tbf_depth;
308         atomic_set(&rule->tr_ref, 1);
309         INIT_LIST_HEAD(&rule->tr_cli_list);
310         INIT_LIST_HEAD(&rule->tr_nids);
311         INIT_LIST_HEAD(&rule->tr_linkage);
312         spin_lock_init(&rule->tr_rule_lock);
313         rule->tr_head = head;
314
315         rc = head->th_ops->o_rule_init(policy, rule, start);
316         if (rc) {
317                 OBD_FREE_PTR(rule);
318                 return rc;
319         }
320
321         /* Add as the newest rule */
322         spin_lock(&head->th_rule_lock);
323         tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
324         if (tmp_rule) {
325                 spin_unlock(&head->th_rule_lock);
326                 nrs_tbf_rule_put(tmp_rule);
327                 nrs_tbf_rule_put(rule);
328                 return -EEXIST;
329         }
330
331         if (next_name) {
332                 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
333                 if (!next_rule) {
334                         spin_unlock(&head->th_rule_lock);
335                         nrs_tbf_rule_put(rule);
336                         return -ENOENT;
337                 }
338
339                 list_add(&rule->tr_linkage, next_rule->tr_linkage.prev);
340                 nrs_tbf_rule_put(next_rule);
341         } else {
342                 /* Add on the top of the rule list */
343                 list_add(&rule->tr_linkage, &head->th_list);
344         }
345         spin_unlock(&head->th_rule_lock);
346         atomic_inc(&head->th_rule_sequence);
347         if (start->u.tc_start.ts_rule_flags & NTRS_DEFAULT) {
348                 rule->tr_flags |= NTRS_DEFAULT;
349                 LASSERT(head->th_rule == NULL);
350                 head->th_rule = rule;
351         }
352
353         CDEBUG(D_RPCTRACE, "TBF starts rule@%p rate %llu gen %llu\n",
354                rule, rule->tr_rpc_rate, rule->tr_generation);
355
356         return 0;
357 }
358
359 /**
360  * Change the rank of a rule in the rule list
361  *
362  * The matched rule will be moved to the position right before another
363  * given rule.
364  *
365  * \param[in] policy    the policy instance
366  * \param[in] head      the TBF policy instance
367  * \param[in] name      the rule name to be moved
368  * \param[in] next_name the rule name before which the matched rule will be
369  *                      moved
370  *
371  */
372 static int
373 nrs_tbf_rule_change_rank(struct ptlrpc_nrs_policy *policy,
374                          struct nrs_tbf_head *head,
375                          char *name,
376                          char *next_name)
377 {
378         struct nrs_tbf_rule     *rule = NULL;
379         struct nrs_tbf_rule     *next_rule = NULL;
380         int                      rc = 0;
381
382         LASSERT(head != NULL);
383
384         spin_lock(&head->th_rule_lock);
385         rule = nrs_tbf_rule_find_nolock(head, name);
386         if (!rule)
387                 GOTO(out, rc = -ENOENT);
388
389         if (strcmp(name, next_name) == 0)
390                 GOTO(out_put, rc);
391
392         next_rule = nrs_tbf_rule_find_nolock(head, next_name);
393         if (!next_rule)
394                 GOTO(out_put, rc = -ENOENT);
395
396         list_move(&rule->tr_linkage, next_rule->tr_linkage.prev);
397         nrs_tbf_rule_put(next_rule);
398 out_put:
399         nrs_tbf_rule_put(rule);
400 out:
401         spin_unlock(&head->th_rule_lock);
402         return rc;
403 }
404
405 static int
406 nrs_tbf_rule_change_rate(struct ptlrpc_nrs_policy *policy,
407                          struct nrs_tbf_head *head,
408                          char *name,
409                          __u64 rate)
410 {
411         struct nrs_tbf_rule *rule;
412
413         assert_spin_locked(&policy->pol_nrs->nrs_lock);
414
415         rule = nrs_tbf_rule_find(head, name);
416         if (rule == NULL)
417                 return -ENOENT;
418
419         rule->tr_rpc_rate = rate;
420         rule->tr_nsecs = NSEC_PER_SEC;
421         do_div(rule->tr_nsecs, rule->tr_rpc_rate);
422         rule->tr_generation++;
423         nrs_tbf_rule_put(rule);
424
425         return 0;
426 }
427
428 static int
429 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
430                     struct nrs_tbf_head *head,
431                     struct nrs_tbf_cmd *change)
432 {
433         __u64    rate = change->u.tc_change.tc_rpc_rate;
434         char    *next_name = change->u.tc_change.tc_next_name;
435         int      rc;
436
437         if (rate != 0) {
438                 rc = nrs_tbf_rule_change_rate(policy, head, change->tc_name,
439                                               rate);
440                 if (rc)
441                         return rc;
442         }
443
444         if (next_name) {
445                 rc = nrs_tbf_rule_change_rank(policy, head, change->tc_name,
446                                               next_name);
447                 if (rc)
448                         return rc;
449         }
450
451         return 0;
452 }
453
454 static int
455 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
456                   struct nrs_tbf_head *head,
457                   struct nrs_tbf_cmd *stop)
458 {
459         struct nrs_tbf_rule *rule;
460
461         assert_spin_locked(&policy->pol_nrs->nrs_lock);
462
463         if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
464                 return -EPERM;
465
466         rule = nrs_tbf_rule_find(head, stop->tc_name);
467         if (rule == NULL)
468                 return -ENOENT;
469
470         list_del_init(&rule->tr_linkage);
471         rule->tr_flags |= NTRS_STOPPING;
472         nrs_tbf_rule_put(rule);
473         nrs_tbf_rule_put(rule);
474
475         return 0;
476 }
477
478 static int
479 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
480                 struct nrs_tbf_head *head,
481                 struct nrs_tbf_cmd *cmd)
482 {
483         int rc;
484
485         assert_spin_locked(&policy->pol_nrs->nrs_lock);
486
487         switch (cmd->tc_cmd) {
488         case NRS_CTL_TBF_START_RULE:
489                 if (cmd->u.tc_start.ts_valid_type != head->th_type_flag)
490                         return -EINVAL;
491
492                 spin_unlock(&policy->pol_nrs->nrs_lock);
493                 rc = nrs_tbf_rule_start(policy, head, cmd);
494                 spin_lock(&policy->pol_nrs->nrs_lock);
495                 return rc;
496         case NRS_CTL_TBF_CHANGE_RULE:
497                 rc = nrs_tbf_rule_change(policy, head, cmd);
498                 return rc;
499         case NRS_CTL_TBF_STOP_RULE:
500                 rc = nrs_tbf_rule_stop(policy, head, cmd);
501                 /* Take it as a success, if not exists at all */
502                 return rc == -ENOENT ? 0 : rc;
503         default:
504                 return -EFAULT;
505         }
506 }
507
508 /**
509  * Binary heap predicate.
510  *
511  * \param[in] e1 the first binheap node to compare
512  * \param[in] e2 the second binheap node to compare
513  *
514  * \retval 0 e1 > e2
515  * \retval 1 e1 < e2
516  */
517 static int
518 tbf_cli_compare(struct cfs_binheap_node *e1, struct cfs_binheap_node *e2)
519 {
520         struct nrs_tbf_client *cli1;
521         struct nrs_tbf_client *cli2;
522
523         cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
524         cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
525
526         if (cli1->tc_deadline < cli2->tc_deadline)
527                 return 1;
528         else if (cli1->tc_deadline > cli2->tc_deadline)
529                 return 0;
530
531         if (cli1->tc_check_time < cli2->tc_check_time)
532                 return 1;
533         else if (cli1->tc_check_time > cli2->tc_check_time)
534                 return 0;
535
536         /* Maybe need more comparasion, e.g. request number in the rules */
537         return 1;
538 }
539
540 /**
541  * TBF binary heap operations
542  */
543 static struct cfs_binheap_ops nrs_tbf_heap_ops = {
544         .hop_enter      = NULL,
545         .hop_exit       = NULL,
546         .hop_compare    = tbf_cli_compare,
547 };
548
/**
 * Hash callback of the jobid client hash: djb2 over the NUL-terminated key.
 *
 * \param[in] hs   the hash table (unused)
 * \param[in] key  the jobid string
 * \param[in] mask the hash mask handed to cfs_hash_djb2_hash()
 */
static unsigned nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key,
                                  unsigned mask)
{
        size_t keylen = strlen(key);

        return cfs_hash_djb2_hash(key, keylen, mask);
}
554
555 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
556 {
557         struct nrs_tbf_client *cli = hlist_entry(hnode,
558                                                      struct nrs_tbf_client,
559                                                      tc_hnode);
560
561         return (strcmp(cli->tc_jobid, key) == 0);
562 }
563
564 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
565 {
566         struct nrs_tbf_client *cli = hlist_entry(hnode,
567                                                      struct nrs_tbf_client,
568                                                      tc_hnode);
569
570         return cli->tc_jobid;
571 }
572
573 static void *nrs_tbf_hop_object(struct hlist_node *hnode)
574 {
575         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
576 }
577
578 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
579 {
580         struct nrs_tbf_client *cli = hlist_entry(hnode,
581                                                      struct nrs_tbf_client,
582                                                      tc_hnode);
583
584         atomic_inc(&cli->tc_ref);
585 }
586
587 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
588 {
589         struct nrs_tbf_client *cli = hlist_entry(hnode,
590                                                      struct nrs_tbf_client,
591                                                      tc_hnode);
592
593         atomic_dec(&cli->tc_ref);
594 }
595
596 static void
597 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
598
599 {
600         struct nrs_tbf_client *cli = hlist_entry(hnode,
601                                                  struct nrs_tbf_client,
602                                                  tc_hnode);
603
604         LASSERT(atomic_read(&cli->tc_ref) == 0);
605         nrs_tbf_cli_fini(cli);
606 }
607
608 static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
609         .hs_hash        = nrs_tbf_jobid_hop_hash,
610         .hs_keycmp      = nrs_tbf_jobid_hop_keycmp,
611         .hs_key         = nrs_tbf_jobid_hop_key,
612         .hs_object      = nrs_tbf_hop_object,
613         .hs_get         = nrs_tbf_jobid_hop_get,
614         .hs_put         = nrs_tbf_jobid_hop_put,
615         .hs_put_locked  = nrs_tbf_jobid_hop_put,
616         .hs_exit        = nrs_tbf_jobid_hop_exit,
617 };
618
/* Flags passed to cfs_hash_create() for the jobid client hash. */
619 #define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
620                                   CFS_HASH_NO_ITEMREF | \
621                                   CFS_HASH_DEPTH)
622
623 static struct nrs_tbf_client *
624 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
625                           struct cfs_hash_bd *bd,
626                           const char *jobid)
627 {
628         struct hlist_node *hnode;
629         struct nrs_tbf_client *cli;
630
631         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)jobid);
632         if (hnode == NULL)
633                 return NULL;
634
635         cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode);
636         if (!list_empty(&cli->tc_lru))
637                 list_del_init(&cli->tc_lru);
638         return cli;
639 }
640
/* Jobid used in place of a NULL jobid returned by lustre_msg_get_jobid(). */
641 #define NRS_TBF_JOBID_NULL ""
642
643 static struct nrs_tbf_client *
644 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
645                        struct ptlrpc_request *req)
646 {
647         const char              *jobid;
648         struct nrs_tbf_client   *cli;
649         struct cfs_hash         *hs = head->th_cli_hash;
650         struct cfs_hash_bd               bd;
651
652         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
653         if (jobid == NULL)
654                 jobid = NRS_TBF_JOBID_NULL;
655         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
656         cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
657         cfs_hash_bd_unlock(hs, &bd, 1);
658
659         return cli;
660 }
661
662 static struct nrs_tbf_client *
663 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
664                           struct nrs_tbf_client *cli)
665 {
666         const char              *jobid;
667         struct nrs_tbf_client   *ret;
668         struct cfs_hash         *hs = head->th_cli_hash;
669         struct cfs_hash_bd               bd;
670
671         jobid = cli->tc_jobid;
672         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
673         ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
674         if (ret == NULL) {
675                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
676                 ret = cli;
677         }
678         cfs_hash_bd_unlock(hs, &bd, 1);
679
680         return ret;
681 }
682
683 static void
684 nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
685                       struct nrs_tbf_client *cli)
686 {
687         struct cfs_hash_bd               bd;
688         struct cfs_hash         *hs = head->th_cli_hash;
689         struct nrs_tbf_bucket   *bkt;
690         int                      hw;
691         LIST_HEAD(zombies);
692
693         cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
694         bkt = cfs_hash_bd_extra_get(hs, &bd);
695         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
696                 return;
697         LASSERT(list_empty(&cli->tc_lru));
698         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
699
700         /*
701          * Check and purge the LRU, there is at least one client in the LRU.
702          */
703         hw = tbf_jobid_cache_size >>
704              (hs->hs_cur_bits - hs->hs_bkt_bits);
705         while (cfs_hash_bd_count_get(&bd) > hw) {
706                 if (unlikely(list_empty(&bkt->ntb_lru)))
707                         break;
708                 cli = list_entry(bkt->ntb_lru.next,
709                                      struct nrs_tbf_client,
710                                      tc_lru);
711                 LASSERT(atomic_read(&cli->tc_ref) == 0);
712                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
713                 list_move(&cli->tc_lru, &zombies);
714         }
715         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
716
717         while (!list_empty(&zombies)) {
718                 cli = container_of0(zombies.next,
719                                     struct nrs_tbf_client, tc_lru);
720                 list_del_init(&cli->tc_lru);
721                 nrs_tbf_cli_fini(cli);
722         }
723 }
724
725 static void
726 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
727                        struct ptlrpc_request *req)
728 {
729         char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
730
731         if (jobid == NULL)
732                 jobid = NRS_TBF_JOBID_NULL;
733         LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
734         INIT_LIST_HEAD(&cli->tc_lru);
735         memcpy(cli->tc_jobid, jobid, strlen(jobid));
736 }
737
738 static int nrs_tbf_jobid_hash_order(void)
739 {
740         int bits;
741
742         for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
743                 ;
744
745         return bits;
746 }
747
/*
 * Bucket bits passed to cfs_hash_create() for the jobid hash; also the
 * lower bound applied to the hash order in nrs_tbf_jobid_startup().
 */
748 #define NRS_TBF_JOBID_BKT_BITS 10
749
750 static int
751 nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
752                       struct nrs_tbf_head *head)
753 {
754         struct nrs_tbf_cmd       start;
755         struct nrs_tbf_bucket   *bkt;
756         int                      bits;
757         int                      i;
758         int                      rc;
759         struct cfs_hash_bd       bd;
760
761         bits = nrs_tbf_jobid_hash_order();
762         if (bits < NRS_TBF_JOBID_BKT_BITS)
763                 bits = NRS_TBF_JOBID_BKT_BITS;
764         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
765                                             bits,
766                                             bits,
767                                             NRS_TBF_JOBID_BKT_BITS,
768                                             sizeof(*bkt),
769                                             0,
770                                             0,
771                                             &nrs_tbf_jobid_hash_ops,
772                                             NRS_TBF_JOBID_HASH_FLAGS);
773         if (head->th_cli_hash == NULL)
774                 return -ENOMEM;
775
776         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
777                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
778                 INIT_LIST_HEAD(&bkt->ntb_lru);
779         }
780
781         memset(&start, 0, sizeof(start));
782         start.u.tc_start.ts_jobids_str = "*";
783
784         start.u.tc_start.ts_rpc_rate = tbf_rate;
785         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
786         start.tc_name = NRS_TBF_DEFAULT_RULE;
787         INIT_LIST_HEAD(&start.u.tc_start.ts_jobids);
788         rc = nrs_tbf_rule_start(policy, head, &start);
789         if (rc) {
790                 cfs_hash_putref(head->th_cli_hash);
791                 head->th_cli_hash = NULL;
792         }
793
794         return rc;
795 }
796
797 /**
798  * Frees jobid of \a list.
799  *
800  */
801 static void
802 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
803 {
804         struct nrs_tbf_jobid *jobid, *n;
805
806         list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
807                 OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1);
808                 list_del(&jobid->tj_linkage);
809                 OBD_FREE_PTR(jobid);
810         }
811 }
812
813 static int
814 nrs_tbf_jobid_list_add(struct cfs_lstr *id, struct list_head *jobid_list)
815 {
816         struct nrs_tbf_jobid *jobid;
817         char *ptr;
818
819         OBD_ALLOC_PTR(jobid);
820         if (jobid == NULL)
821                 return -ENOMEM;
822
823         OBD_ALLOC(jobid->tj_id, id->ls_len + 1);
824         if (jobid->tj_id == NULL) {
825                 OBD_FREE_PTR(jobid);
826                 return -ENOMEM;
827         }
828
829         memcpy(jobid->tj_id, id->ls_str, id->ls_len);
830         ptr = lprocfs_strnstr(id->ls_str, "*", id->ls_len);
831         if (ptr == NULL)
832                 jobid->tj_match_flag = NRS_TBF_MATCH_FULL;
833         else
834                 jobid->tj_match_flag = NRS_TBF_MATCH_WILDCARD;
835
836         list_add_tail(&jobid->tj_linkage, jobid_list);
837         return 0;
838 }
839
/**
 * Matches \a content against a glob-like \a pattern.
 *
 * '*' in the pattern matches any run of characters, including an empty
 * one; every other character must match literally. The whole content has
 * to be consumed for a match.
 *
 * Implemented iteratively with a single backtrack point (the most recent
 * '*'), so it needs no recursion and runs in O(len(pattern) * len(content))
 * in the worst case. A '*' in the pattern is always a wildcard, even when
 * the content holds a literal '*' at the same position.
 *
 * \param[in] pattern the NUL-terminated pattern, may contain '*'
 * \param[in] content the NUL-terminated string to test
 *
 * \retval true  \a content matches \a pattern
 * \retval false otherwise
 */
static bool
cfs_match_wildcard(const char *pattern, const char *content)
{
        const char *star = NULL;        /* most recent '*' in the pattern */
        const char *resume = NULL;      /* content position tried for it */

        while (*content != '\0') {
                if (*pattern == '*') {
                        /* Remember the wildcard; first try the empty run */
                        star = pattern++;
                        resume = content;
                } else if (*pattern == *content) {
                        pattern++;
                        content++;
                } else if (star != NULL) {
                        /* Mismatch: let the last '*' absorb one more char */
                        pattern = star + 1;
                        content = ++resume;
                } else {
                        return false;
                }
        }

        /* Content is exhausted: only trailing wildcards may remain */
        while (*pattern == '*')
                pattern++;

        return *pattern == '\0';
}
866
867 static inline bool
868 nrs_tbf_jobid_match(const struct nrs_tbf_jobid *jobid, const char *id)
869 {
870         if (jobid->tj_match_flag == NRS_TBF_MATCH_FULL)
871                 return strcmp(jobid->tj_id, id) == 0;
872
873         if (jobid->tj_match_flag == NRS_TBF_MATCH_WILDCARD)
874                 return cfs_match_wildcard(jobid->tj_id, id);
875
876         return false;
877 }
878
879 static int
880 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
881 {
882         struct nrs_tbf_jobid *jobid;
883
884         list_for_each_entry(jobid, jobid_list, tj_linkage) {
885                 if (nrs_tbf_jobid_match(jobid, id))
886                         return 1;
887         }
888         return 0;
889 }
890
891 static int
892 nrs_tbf_jobid_list_parse(char *str, int len, struct list_head *jobid_list)
893 {
894         struct cfs_lstr src;
895         struct cfs_lstr res;
896         int rc = 0;
897         ENTRY;
898
899         src.ls_str = str;
900         src.ls_len = len;
901         INIT_LIST_HEAD(jobid_list);
902         while (src.ls_str) {
903                 rc = cfs_gettok(&src, ' ', &res);
904                 if (rc == 0) {
905                         rc = -EINVAL;
906                         break;
907                 }
908                 rc = nrs_tbf_jobid_list_add(&res, jobid_list);
909                 if (rc)
910                         break;
911         }
912         if (rc)
913                 nrs_tbf_jobid_list_free(jobid_list);
914         RETURN(rc);
915 }
916
917 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
918 {
919         if (!list_empty(&cmd->u.tc_start.ts_jobids))
920                 nrs_tbf_jobid_list_free(&cmd->u.tc_start.ts_jobids);
921         if (cmd->u.tc_start.ts_jobids_str)
922                 OBD_FREE(cmd->u.tc_start.ts_jobids_str,
923                          strlen(cmd->u.tc_start.ts_jobids_str) + 1);
924 }
925
926 static int nrs_tbf_check_id_value(struct cfs_lstr *src, char *key)
927 {
928         struct cfs_lstr res;
929         int keylen = strlen(key);
930         int rc;
931
932         rc = cfs_gettok(src, '=', &res);
933         if (rc == 0 || res.ls_len != keylen ||
934             strncmp(res.ls_str, key, keylen) != 0 ||
935             src->ls_len <= 2 || src->ls_str[0] != '{' ||
936             src->ls_str[src->ls_len - 1] != '}')
937                 return -EINVAL;
938
939         /* Skip '{' and '}' */
940         src->ls_str++;
941         src->ls_len -= 2;
942         return 0;
943 }
944
945 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, char *id)
946 {
947         struct cfs_lstr src;
948         int rc;
949
950         src.ls_str = id;
951         src.ls_len = strlen(id);
952         rc = nrs_tbf_check_id_value(&src, "jobid");
953         if (rc)
954                 return rc;
955
956         OBD_ALLOC(cmd->u.tc_start.ts_jobids_str, src.ls_len + 1);
957         if (cmd->u.tc_start.ts_jobids_str == NULL)
958                 return -ENOMEM;
959
960         memcpy(cmd->u.tc_start.ts_jobids_str, src.ls_str, src.ls_len);
961
962         /* parse jobid list */
963         rc = nrs_tbf_jobid_list_parse(cmd->u.tc_start.ts_jobids_str,
964                                       strlen(cmd->u.tc_start.ts_jobids_str),
965                                       &cmd->u.tc_start.ts_jobids);
966         if (rc)
967                 nrs_tbf_jobid_cmd_fini(cmd);
968
969         return rc;
970 }
971
972 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
973                                    struct nrs_tbf_rule *rule,
974                                    struct nrs_tbf_cmd *start)
975 {
976         int rc = 0;
977
978         LASSERT(start->u.tc_start.ts_jobids_str);
979         OBD_ALLOC(rule->tr_jobids_str,
980                   strlen(start->u.tc_start.ts_jobids_str) + 1);
981         if (rule->tr_jobids_str == NULL)
982                 return -ENOMEM;
983
984         memcpy(rule->tr_jobids_str,
985                start->u.tc_start.ts_jobids_str,
986                strlen(start->u.tc_start.ts_jobids_str));
987
988         INIT_LIST_HEAD(&rule->tr_jobids);
989         if (!list_empty(&start->u.tc_start.ts_jobids)) {
990                 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
991                                               strlen(rule->tr_jobids_str),
992                                               &rule->tr_jobids);
993                 if (rc)
994                         CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
995         }
996         if (rc)
997                 OBD_FREE(rule->tr_jobids_str,
998                          strlen(start->u.tc_start.ts_jobids_str) + 1);
999         return rc;
1000 }
1001
1002 static int
1003 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1004 {
1005         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1006                    rule->tr_jobids_str, rule->tr_rpc_rate,
1007                    atomic_read(&rule->tr_ref) - 1);
1008         return 0;
1009 }
1010
1011 static int
1012 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
1013                          struct nrs_tbf_client *cli)
1014 {
1015         return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
1016 }
1017
1018 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
1019 {
1020         if (!list_empty(&rule->tr_jobids))
1021                 nrs_tbf_jobid_list_free(&rule->tr_jobids);
1022         LASSERT(rule->tr_jobids_str != NULL);
1023         OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1);
1024 }
1025
1026 static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
1027         .o_name = NRS_TBF_TYPE_JOBID,
1028         .o_startup = nrs_tbf_jobid_startup,
1029         .o_cli_find = nrs_tbf_jobid_cli_find,
1030         .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
1031         .o_cli_put = nrs_tbf_jobid_cli_put,
1032         .o_cli_init = nrs_tbf_jobid_cli_init,
1033         .o_rule_init = nrs_tbf_jobid_rule_init,
1034         .o_rule_dump = nrs_tbf_jobid_rule_dump,
1035         .o_rule_match = nrs_tbf_jobid_rule_match,
1036         .o_rule_fini = nrs_tbf_jobid_rule_fini,
1037 };
1038
/**
 * libcfs_hash operations for nrs_tbf_head::th_cli_hash when the NID-based
 * TBF type is in use.
 *
 * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
 * nrs_tbf_client objects.
 */
/* bucket-bits argument given to cfs_hash_create() by nrs_tbf_nid_startup() */
#define NRS_TBF_NID_BKT_BITS	8
/* passed twice by nrs_tbf_nid_startup() as the hash-bits arguments */
#define NRS_TBF_NID_BITS	16
1047
1048 static unsigned nrs_tbf_nid_hop_hash(struct cfs_hash *hs, const void *key,
1049                                   unsigned mask)
1050 {
1051         return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
1052 }
1053
1054 static int nrs_tbf_nid_hop_keycmp(const void *key, struct hlist_node *hnode)
1055 {
1056         lnet_nid_t            *nid = (lnet_nid_t *)key;
1057         struct nrs_tbf_client *cli = hlist_entry(hnode,
1058                                                      struct nrs_tbf_client,
1059                                                      tc_hnode);
1060
1061         return *nid == cli->tc_nid;
1062 }
1063
1064 static void *nrs_tbf_nid_hop_key(struct hlist_node *hnode)
1065 {
1066         struct nrs_tbf_client *cli = hlist_entry(hnode,
1067                                                      struct nrs_tbf_client,
1068                                                      tc_hnode);
1069
1070         return &cli->tc_nid;
1071 }
1072
1073 static void nrs_tbf_nid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1074 {
1075         struct nrs_tbf_client *cli = hlist_entry(hnode,
1076                                                      struct nrs_tbf_client,
1077                                                      tc_hnode);
1078
1079         atomic_inc(&cli->tc_ref);
1080 }
1081
1082 static void nrs_tbf_nid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1083 {
1084         struct nrs_tbf_client *cli = hlist_entry(hnode,
1085                                                      struct nrs_tbf_client,
1086                                                      tc_hnode);
1087
1088         atomic_dec(&cli->tc_ref);
1089 }
1090
1091 static void nrs_tbf_nid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1092 {
1093         struct nrs_tbf_client *cli = hlist_entry(hnode,
1094                                                      struct nrs_tbf_client,
1095                                                      tc_hnode);
1096
1097         LASSERTF(atomic_read(&cli->tc_ref) == 0,
1098                  "Busy TBF object from client with NID %s, with %d refs\n",
1099                  libcfs_nid2str(cli->tc_nid), atomic_read(&cli->tc_ref));
1100
1101         nrs_tbf_cli_fini(cli);
1102 }
1103
1104 static struct cfs_hash_ops nrs_tbf_nid_hash_ops = {
1105         .hs_hash        = nrs_tbf_nid_hop_hash,
1106         .hs_keycmp      = nrs_tbf_nid_hop_keycmp,
1107         .hs_key         = nrs_tbf_nid_hop_key,
1108         .hs_object      = nrs_tbf_hop_object,
1109         .hs_get         = nrs_tbf_nid_hop_get,
1110         .hs_put         = nrs_tbf_nid_hop_put,
1111         .hs_put_locked  = nrs_tbf_nid_hop_put,
1112         .hs_exit        = nrs_tbf_nid_hop_exit,
1113 };
1114
1115 static struct nrs_tbf_client *
1116 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
1117                      struct ptlrpc_request *req)
1118 {
1119         return cfs_hash_lookup(head->th_cli_hash, &req->rq_peer.nid);
1120 }
1121
1122 static struct nrs_tbf_client *
1123 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
1124                         struct nrs_tbf_client *cli)
1125 {
1126         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid,
1127                                        &cli->tc_hnode);
1128 }
1129
1130 static void
1131 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
1132                       struct nrs_tbf_client *cli)
1133 {
1134         cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
1135 }
1136
1137 static int
1138 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
1139                     struct nrs_tbf_head *head)
1140 {
1141         struct nrs_tbf_cmd      start;
1142         int rc;
1143
1144         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1145                                             NRS_TBF_NID_BITS,
1146                                             NRS_TBF_NID_BITS,
1147                                             NRS_TBF_NID_BKT_BITS, 0,
1148                                             CFS_HASH_MIN_THETA,
1149                                             CFS_HASH_MAX_THETA,
1150                                             &nrs_tbf_nid_hash_ops,
1151                                             CFS_HASH_RW_BKTLOCK);
1152         if (head->th_cli_hash == NULL)
1153                 return -ENOMEM;
1154
1155         memset(&start, 0, sizeof(start));
1156         start.u.tc_start.ts_nids_str = "*";
1157
1158         start.u.tc_start.ts_rpc_rate = tbf_rate;
1159         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1160         start.tc_name = NRS_TBF_DEFAULT_RULE;
1161         INIT_LIST_HEAD(&start.u.tc_start.ts_nids);
1162         rc = nrs_tbf_rule_start(policy, head, &start);
1163         if (rc) {
1164                 cfs_hash_putref(head->th_cli_hash);
1165                 head->th_cli_hash = NULL;
1166         }
1167
1168         return rc;
1169 }
1170
1171 static void
1172 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1173                              struct ptlrpc_request *req)
1174 {
1175         cli->tc_nid = req->rq_peer.nid;
1176 }
1177
1178 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1179                                  struct nrs_tbf_rule *rule,
1180                                  struct nrs_tbf_cmd *start)
1181 {
1182         LASSERT(start->u.tc_start.ts_nids_str);
1183         OBD_ALLOC(rule->tr_nids_str,
1184                   strlen(start->u.tc_start.ts_nids_str) + 1);
1185         if (rule->tr_nids_str == NULL)
1186                 return -ENOMEM;
1187
1188         memcpy(rule->tr_nids_str,
1189                start->u.tc_start.ts_nids_str,
1190                strlen(start->u.tc_start.ts_nids_str));
1191
1192         INIT_LIST_HEAD(&rule->tr_nids);
1193         if (!list_empty(&start->u.tc_start.ts_nids)) {
1194                 if (cfs_parse_nidlist(rule->tr_nids_str,
1195                                       strlen(rule->tr_nids_str),
1196                                       &rule->tr_nids) <= 0) {
1197                         CERROR("nids {%s} illegal\n",
1198                                rule->tr_nids_str);
1199                         OBD_FREE(rule->tr_nids_str,
1200                                  strlen(start->u.tc_start.ts_nids_str) + 1);
1201                         return -EINVAL;
1202                 }
1203         }
1204         return 0;
1205 }
1206
1207 static int
1208 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1209 {
1210         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1211                    rule->tr_nids_str, rule->tr_rpc_rate,
1212                    atomic_read(&rule->tr_ref) - 1);
1213         return 0;
1214 }
1215
1216 static int
1217 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1218                        struct nrs_tbf_client *cli)
1219 {
1220         return cfs_match_nid(cli->tc_nid, &rule->tr_nids);
1221 }
1222
1223 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1224 {
1225         if (!list_empty(&rule->tr_nids))
1226                 cfs_free_nidlist(&rule->tr_nids);
1227         LASSERT(rule->tr_nids_str != NULL);
1228         OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1);
1229 }
1230
1231 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1232 {
1233         if (!list_empty(&cmd->u.tc_start.ts_nids))
1234                 cfs_free_nidlist(&cmd->u.tc_start.ts_nids);
1235         if (cmd->u.tc_start.ts_nids_str)
1236                 OBD_FREE(cmd->u.tc_start.ts_nids_str,
1237                          strlen(cmd->u.tc_start.ts_nids_str) + 1);
1238 }
1239
1240 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, char *id)
1241 {
1242         struct cfs_lstr src;
1243         int rc;
1244
1245         src.ls_str = id;
1246         src.ls_len = strlen(id);
1247         rc = nrs_tbf_check_id_value(&src, "nid");
1248         if (rc)
1249                 return rc;
1250
1251         OBD_ALLOC(cmd->u.tc_start.ts_nids_str, src.ls_len + 1);
1252         if (cmd->u.tc_start.ts_nids_str == NULL)
1253                 return -ENOMEM;
1254
1255         memcpy(cmd->u.tc_start.ts_nids_str, src.ls_str, src.ls_len);
1256
1257         /* parse NID list */
1258         if (cfs_parse_nidlist(cmd->u.tc_start.ts_nids_str,
1259                               strlen(cmd->u.tc_start.ts_nids_str),
1260                               &cmd->u.tc_start.ts_nids) <= 0) {
1261                 nrs_tbf_nid_cmd_fini(cmd);
1262                 return -EINVAL;
1263         }
1264
1265         return 0;
1266 }
1267
1268 static struct nrs_tbf_ops nrs_tbf_nid_ops = {
1269         .o_name = NRS_TBF_TYPE_NID,
1270         .o_startup = nrs_tbf_nid_startup,
1271         .o_cli_find = nrs_tbf_nid_cli_find,
1272         .o_cli_findadd = nrs_tbf_nid_cli_findadd,
1273         .o_cli_put = nrs_tbf_nid_cli_put,
1274         .o_cli_init = nrs_tbf_nid_cli_init,
1275         .o_rule_init = nrs_tbf_nid_rule_init,
1276         .o_rule_dump = nrs_tbf_nid_rule_dump,
1277         .o_rule_match = nrs_tbf_nid_rule_match,
1278         .o_rule_fini = nrs_tbf_nid_rule_fini,
1279 };
1280
/**
 * Hash callback of the generic (string keyed) client hash.
 *
 * \param[in] hs	hash table (not used here)
 * \param[in] key	NUL-terminated key string
 * \param[in] mask	mask forwarded to cfs_hash_djb2_hash()
 *
 * \return djb2 hash over the strlen() bytes of \a key
 */
static unsigned nrs_tbf_hop_hash(struct cfs_hash *hs, const void *key,
				 unsigned mask)
{
	const char *str = key;

	return cfs_hash_djb2_hash(str, strlen(str), mask);
}
1286
1287 static int nrs_tbf_hop_keycmp(const void *key, struct hlist_node *hnode)
1288 {
1289         struct nrs_tbf_client *cli = hlist_entry(hnode,
1290                                                  struct nrs_tbf_client,
1291                                                  tc_hnode);
1292
1293         return (strcmp(cli->tc_key, key) == 0);
1294 }
1295
1296 static void *nrs_tbf_hop_key(struct hlist_node *hnode)
1297 {
1298         struct nrs_tbf_client *cli = hlist_entry(hnode,
1299                                                  struct nrs_tbf_client,
1300                                                  tc_hnode);
1301         return cli->tc_key;
1302 }
1303
1304 static void nrs_tbf_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1305 {
1306         struct nrs_tbf_client *cli = hlist_entry(hnode,
1307                                                  struct nrs_tbf_client,
1308                                                  tc_hnode);
1309
1310         atomic_inc(&cli->tc_ref);
1311 }
1312
1313 static void nrs_tbf_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1314 {
1315         struct nrs_tbf_client *cli = hlist_entry(hnode,
1316                                                  struct nrs_tbf_client,
1317                                                  tc_hnode);
1318
1319         atomic_dec(&cli->tc_ref);
1320 }
1321
1322 static void nrs_tbf_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1323
1324 {
1325         struct nrs_tbf_client *cli = hlist_entry(hnode,
1326                                                  struct nrs_tbf_client,
1327                                                  tc_hnode);
1328
1329         LASSERT(atomic_read(&cli->tc_ref) == 0);
1330         nrs_tbf_cli_fini(cli);
1331 }
1332
1333 static struct cfs_hash_ops nrs_tbf_hash_ops = {
1334         .hs_hash        = nrs_tbf_hop_hash,
1335         .hs_keycmp      = nrs_tbf_hop_keycmp,
1336         .hs_key         = nrs_tbf_hop_key,
1337         .hs_object      = nrs_tbf_hop_object,
1338         .hs_get         = nrs_tbf_hop_get,
1339         .hs_put         = nrs_tbf_hop_put,
1340         .hs_put_locked  = nrs_tbf_hop_put,
1341         .hs_exit        = nrs_tbf_hop_exit,
1342 };
1343
/*
 * Lower bound for the hash bits and bucket-bits argument used by
 * nrs_tbf_startup() when it creates the generic client hash.
 */
#define NRS_TBF_GENERIC_BKT_BITS	10

/* flags nrs_tbf_startup() passes to cfs_hash_create() */
#define NRS_TBF_GENERIC_HASH_FLAGS	\
	(CFS_HASH_SPIN_BKTLOCK | CFS_HASH_NO_ITEMREF | CFS_HASH_DEPTH)
1348
1349 static int
1350 nrs_tbf_startup(struct ptlrpc_nrs_policy *policy, struct nrs_tbf_head *head)
1351 {
1352         struct nrs_tbf_cmd       start;
1353         struct nrs_tbf_bucket   *bkt;
1354         int                      bits;
1355         int                      i;
1356         int                      rc;
1357         struct cfs_hash_bd       bd;
1358
1359         bits = nrs_tbf_jobid_hash_order();
1360         if (bits < NRS_TBF_GENERIC_BKT_BITS)
1361                 bits = NRS_TBF_GENERIC_BKT_BITS;
1362         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1363                                             bits, bits,
1364                                             NRS_TBF_GENERIC_BKT_BITS,
1365                                             sizeof(*bkt), 0, 0,
1366                                             &nrs_tbf_hash_ops,
1367                                             NRS_TBF_GENERIC_HASH_FLAGS);
1368         if (head->th_cli_hash == NULL)
1369                 return -ENOMEM;
1370
1371         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
1372                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
1373                 INIT_LIST_HEAD(&bkt->ntb_lru);
1374         }
1375
1376         memset(&start, 0, sizeof(start));
1377         start.u.tc_start.ts_conds_str = "*";
1378
1379         start.u.tc_start.ts_rpc_rate = tbf_rate;
1380         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1381         start.tc_name = NRS_TBF_DEFAULT_RULE;
1382         INIT_LIST_HEAD(&start.u.tc_start.ts_conds);
1383         rc = nrs_tbf_rule_start(policy, head, &start);
1384         if (rc)
1385                 cfs_hash_putref(head->th_cli_hash);
1386
1387         return rc;
1388 }
1389
1390 static struct nrs_tbf_client *
1391 nrs_tbf_cli_hash_lookup(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1392                         const char *key)
1393 {
1394         struct hlist_node *hnode;
1395         struct nrs_tbf_client *cli;
1396
1397         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)key);
1398         if (hnode == NULL)
1399                 return NULL;
1400
1401         cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode);
1402         if (!list_empty(&cli->tc_lru))
1403                 list_del_init(&cli->tc_lru);
1404         return cli;
1405 }
1406
1407 /**
1408  * ONLY opcode presented in this function will be checked in
1409  * nrs_tbf_id_cli_set(). That means, we can add or remove an
1410  * opcode to enable or disable requests handled in nrs_tbf
1411  */
1412 static struct req_format *req_fmt(__u32 opcode)
1413 {
1414         switch (opcode) {
1415         case OST_GETATTR:
1416                 return &RQF_OST_GETATTR;
1417         case OST_SETATTR:
1418                 return &RQF_OST_SETATTR;
1419         case OST_READ:
1420                 return &RQF_OST_BRW_READ;
1421         case OST_WRITE:
1422                 return &RQF_OST_BRW_WRITE;
1423         /* FIXME: OST_CREATE and OST_DESTROY comes from MDS
1424          * in most case. Should they be removed? */
1425         case OST_CREATE:
1426                 return &RQF_OST_CREATE;
1427         case OST_DESTROY:
1428                 return &RQF_OST_DESTROY;
1429         case OST_PUNCH:
1430                 return &RQF_OST_PUNCH;
1431         case OST_SYNC:
1432                 return &RQF_OST_SYNC;
1433         case OST_LADVISE:
1434                 return &RQF_OST_LADVISE;
1435         case MDS_GETATTR:
1436                 return &RQF_MDS_GETATTR;
1437         case MDS_GETATTR_NAME:
1438                 return &RQF_MDS_GETATTR_NAME;
1439         /* close is skipped to avoid LDLM cancel slowness */
1440 #if 0
1441         case MDS_CLOSE:
1442                 return &RQF_MDS_CLOSE;
1443 #endif
1444         case MDS_REINT:
1445                 return &RQF_MDS_REINT;
1446         case MDS_READPAGE:
1447                 return &RQF_MDS_READPAGE;
1448         case MDS_GET_ROOT:
1449                 return &RQF_MDS_GET_ROOT;
1450         case MDS_STATFS:
1451                 return &RQF_MDS_STATFS;
1452         case MDS_SYNC:
1453                 return &RQF_MDS_SYNC;
1454         case MDS_QUOTACTL:
1455                 return &RQF_MDS_QUOTACTL;
1456         case MDS_GETXATTR:
1457                 return &RQF_MDS_GETXATTR;
1458         case MDS_GET_INFO:
1459                 return &RQF_MDS_GET_INFO;
1460         /* HSM op is skipped */
1461 #if 0 
1462         case MDS_HSM_STATE_GET:
1463                 return &RQF_MDS_HSM_STATE_GET;
1464         case MDS_HSM_STATE_SET:
1465                 return &RQF_MDS_HSM_STATE_SET;
1466         case MDS_HSM_ACTION:
1467                 return &RQF_MDS_HSM_ACTION;
1468         case MDS_HSM_CT_REGISTER:
1469                 return &RQF_MDS_HSM_CT_REGISTER;
1470         case MDS_HSM_CT_UNREGISTER:
1471                 return &RQF_MDS_HSM_CT_UNREGISTER;
1472 #endif
1473         case MDS_SWAP_LAYOUTS:
1474                 return &RQF_MDS_SWAP_LAYOUTS;
1475         case LDLM_ENQUEUE:
1476                 return &RQF_LDLM_ENQUEUE;
1477         default:
1478                 return NULL;
1479         }
1480 }
1481
1482 static struct req_format *intent_req_fmt(__u32 it_opc)
1483 {
1484         if (it_opc & (IT_OPEN | IT_CREAT))
1485                 return &RQF_LDLM_INTENT_OPEN;
1486         else if (it_opc & (IT_GETATTR | IT_LOOKUP))
1487                 return &RQF_LDLM_INTENT_GETATTR;
1488         else if (it_opc & IT_GETXATTR)
1489                 return &RQF_LDLM_INTENT_GETXATTR;
1490         else if (it_opc & (IT_GLIMPSE | IT_BRW))
1491                 return &RQF_LDLM_INTENT;
1492         else
1493                 return NULL;
1494 }
1495
1496 static int ost_tbf_id_cli_set(struct ptlrpc_request *req,
1497                               struct tbf_id *id)
1498 {
1499         struct ost_body *body;
1500
1501         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1502         if (body != NULL) {
1503                 id->ti_uid = body->oa.o_uid;
1504                 id->ti_gid = body->oa.o_gid;
1505                 return 0;
1506         }
1507
1508         return -EINVAL;
1509 }
1510
1511 static void unpack_ugid_from_mdt_body(struct ptlrpc_request *req,
1512                                       struct tbf_id *id)
1513 {
1514         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
1515                                                     &RMF_MDT_BODY);
1516         LASSERT(b != NULL);
1517
1518         /* TODO: nodemaping feature converts {ug}id from individual
1519          * clients to the actual ones of the file system. Some work
1520          * may be needed to fix this. */
1521         id->ti_uid = b->mbo_uid;
1522         id->ti_gid = b->mbo_gid;
1523 }
1524
1525 static void unpack_ugid_from_mdt_rec_reint(struct ptlrpc_request *req,
1526                                            struct tbf_id *id)
1527 {
1528         struct mdt_rec_reint *rec;
1529
1530         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
1531         LASSERT(rec != NULL);
1532
1533         /* use the fs{ug}id as {ug}id of the process */
1534         id->ti_uid = rec->rr_fsuid;
1535         id->ti_gid = rec->rr_fsgid;
1536 }
1537
1538 static int mdt_tbf_id_cli_set(struct ptlrpc_request *req,
1539                               struct tbf_id *id)
1540 {
1541         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1542         int rc = 0;
1543
1544         switch (opc) {
1545         case MDS_GETATTR:
1546         case MDS_GETATTR_NAME:
1547         case MDS_GET_ROOT:
1548         case MDS_READPAGE:
1549         case MDS_SYNC:
1550         case MDS_GETXATTR:
1551         case MDS_HSM_STATE_GET ... MDS_SWAP_LAYOUTS:
1552                 unpack_ugid_from_mdt_body(req, id);
1553                 break;
1554         case MDS_CLOSE:
1555         case MDS_REINT:
1556                 unpack_ugid_from_mdt_rec_reint(req, id);
1557                 break;
1558         default:
1559                 rc = -EINVAL;
1560                 break;
1561         }
1562         return rc;
1563 }
1564
1565 static int ldlm_tbf_id_cli_set(struct ptlrpc_request *req,
1566                               struct tbf_id *id)
1567 {
1568         struct ldlm_intent *lit;
1569         struct req_format *fmt;
1570
1571         if (req->rq_reqmsg->lm_bufcount <= DLM_INTENT_IT_OFF)
1572                 return -EINVAL;
1573
1574         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_BASIC);
1575         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
1576         if (lit == NULL)
1577                 return -EINVAL;
1578
1579         fmt = intent_req_fmt(lit->opc);
1580         if (fmt == NULL)
1581                 return -EINVAL;
1582
1583         req_capsule_extend(&req->rq_pill, fmt);
1584
1585         if (lit->opc & (IT_GETXATTR | IT_GETATTR | IT_LOOKUP))
1586                 unpack_ugid_from_mdt_body(req, id);
1587         else if (lit->opc & (IT_OPEN | IT_OPEN | IT_GLIMPSE | IT_BRW))
1588                 unpack_ugid_from_mdt_rec_reint(req, id);
1589         else
1590                 return -EINVAL;
1591         return 0;
1592 }
1593
1594 static int nrs_tbf_id_cli_set(struct ptlrpc_request *req, struct tbf_id *id,
1595                               enum nrs_tbf_flag ti_type)
1596 {
1597         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1598         struct req_format *fmt = req_fmt(opc);
1599         bool fmt_unset = false;
1600         int rc;
1601
1602         memset(id, 0, sizeof(struct tbf_id));
1603         id->ti_type = ti_type;
1604
1605         if (fmt == NULL)
1606                 return -EINVAL;
1607         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1608         if (req->rq_pill.rc_fmt == NULL) {
1609                 req_capsule_set(&req->rq_pill, fmt);
1610                 fmt_unset = true;
1611         }
1612
1613         if (opc < OST_LAST_OPC)
1614                 rc = ost_tbf_id_cli_set(req, id);
1615         else if (opc >= MDS_FIRST_OPC && opc < MDS_LAST_OPC)
1616                 rc = mdt_tbf_id_cli_set(req, id);
1617         else if (opc == LDLM_ENQUEUE)
1618                 rc = ldlm_tbf_id_cli_set(req, id);
1619         else
1620                 rc = -EINVAL;
1621
1622         /* restore it to the initialized state */
1623         if (fmt_unset)
1624                 req->rq_pill.rc_fmt = NULL;
1625         return rc;
1626 }
1627
1628 static inline void nrs_tbf_cli_gen_key(struct nrs_tbf_client *cli,
1629                                        struct ptlrpc_request *req,
1630                                        char *keystr, size_t keystr_sz)
1631 {
1632         const char *jobid;
1633         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1634         struct tbf_id id;
1635
1636         nrs_tbf_id_cli_set(req, &id, NRS_TBF_FLAG_UID | NRS_TBF_FLAG_GID);
1637         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1638         if (jobid == NULL)
1639                 jobid = NRS_TBF_JOBID_NULL;
1640
1641         snprintf(keystr, keystr_sz, "%s_%s_%d_%u_%u", jobid,
1642                  libcfs_nid2str(req->rq_peer.nid), opc, id.ti_uid,
1643                  id.ti_gid);
1644
1645         if (cli) {
1646                 INIT_LIST_HEAD(&cli->tc_lru);
1647                 strlcpy(cli->tc_key, keystr, sizeof(cli->tc_key));
1648                 strlcpy(cli->tc_jobid, jobid, sizeof(cli->tc_jobid));
1649                 cli->tc_nid = req->rq_peer.nid;
1650                 cli->tc_opcode = opc;
1651                 cli->tc_id = id;
1652         }
1653 }
1654
1655 static struct nrs_tbf_client *
1656 nrs_tbf_cli_find(struct nrs_tbf_head *head, struct ptlrpc_request *req)
1657 {
1658         struct nrs_tbf_client *cli;
1659         struct cfs_hash *hs = head->th_cli_hash;
1660         struct cfs_hash_bd bd;
1661         char keystr[NRS_TBF_KEY_LEN];
1662
1663         nrs_tbf_cli_gen_key(NULL, req, keystr, sizeof(keystr));
1664         cfs_hash_bd_get_and_lock(hs, (void *)keystr, &bd, 1);
1665         cli = nrs_tbf_cli_hash_lookup(hs, &bd, keystr);
1666         cfs_hash_bd_unlock(hs, &bd, 1);
1667
1668         return cli;
1669 }
1670
1671 static struct nrs_tbf_client *
1672 nrs_tbf_cli_findadd(struct nrs_tbf_head *head,
1673                     struct nrs_tbf_client *cli)
1674 {
1675         const char              *key;
1676         struct nrs_tbf_client   *ret;
1677         struct cfs_hash         *hs = head->th_cli_hash;
1678         struct cfs_hash_bd       bd;
1679
1680         key = cli->tc_key;
1681         cfs_hash_bd_get_and_lock(hs, (void *)key, &bd, 1);
1682         ret = nrs_tbf_cli_hash_lookup(hs, &bd, key);
1683         if (ret == NULL) {
1684                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
1685                 ret = cli;
1686         }
1687         cfs_hash_bd_unlock(hs, &bd, 1);
1688
1689         return ret;
1690 }
1691
1692 static void
1693 nrs_tbf_cli_put(struct nrs_tbf_head *head, struct nrs_tbf_client *cli)
1694 {
1695         struct cfs_hash_bd       bd;
1696         struct cfs_hash         *hs = head->th_cli_hash;
1697         struct nrs_tbf_bucket   *bkt;
1698         int                      hw;
1699         LIST_HEAD(zombies);
1700
1701         cfs_hash_bd_get(hs, &cli->tc_key, &bd);
1702         bkt = cfs_hash_bd_extra_get(hs, &bd);
1703         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
1704                 return;
1705         LASSERT(list_empty(&cli->tc_lru));
1706         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
1707
1708         /**
1709          * Check and purge the LRU, there is at least one client in the LRU.
1710          */
1711         hw = tbf_jobid_cache_size >> (hs->hs_cur_bits - hs->hs_bkt_bits);
1712         while (cfs_hash_bd_count_get(&bd) > hw) {
1713                 if (unlikely(list_empty(&bkt->ntb_lru)))
1714                         break;
1715                 cli = list_entry(bkt->ntb_lru.next,
1716                                  struct nrs_tbf_client,
1717                                  tc_lru);
1718                 LASSERT(atomic_read(&cli->tc_ref) == 0);
1719                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
1720                 list_move(&cli->tc_lru, &zombies);
1721         }
1722         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
1723
1724         while (!list_empty(&zombies)) {
1725                 cli = container_of0(zombies.next,
1726                                     struct nrs_tbf_client, tc_lru);
1727                 list_del_init(&cli->tc_lru);
1728                 nrs_tbf_cli_fini(cli);
1729         }
1730 }
1731
1732 static void
1733 nrs_tbf_generic_cli_init(struct nrs_tbf_client *cli,
1734                          struct ptlrpc_request *req)
1735 {
1736         char keystr[NRS_TBF_KEY_LEN];
1737
1738         nrs_tbf_cli_gen_key(cli, req, keystr, sizeof(keystr));
1739 }
1740
1741 static void
1742 nrs_tbf_id_list_free(struct list_head *uid_list)
1743 {
1744         struct nrs_tbf_id *nti_id, *n;
1745
1746         list_for_each_entry_safe(nti_id, n, uid_list, nti_linkage) {
1747                 list_del_init(&nti_id->nti_linkage);
1748                 OBD_FREE_PTR(nti_id);
1749         }
1750 }
1751
1752 static void
1753 nrs_tbf_expression_free(struct nrs_tbf_expression *expr)
1754 {
1755         LASSERT(expr->te_field >= NRS_TBF_FIELD_NID &&
1756                 expr->te_field < NRS_TBF_FIELD_MAX);
1757         switch (expr->te_field) {
1758         case NRS_TBF_FIELD_NID:
1759                 cfs_free_nidlist(&expr->te_cond);
1760                 break;
1761         case NRS_TBF_FIELD_JOBID:
1762                 nrs_tbf_jobid_list_free(&expr->te_cond);
1763                 break;
1764         case NRS_TBF_FIELD_OPCODE:
1765                 CFS_FREE_BITMAP(expr->te_opcodes);
1766                 break;
1767         case NRS_TBF_FIELD_UID:
1768         case NRS_TBF_FIELD_GID:
1769                 nrs_tbf_id_list_free(&expr->te_cond);
1770                 break;
1771         default:
1772                 LBUG();
1773         }
1774         OBD_FREE_PTR(expr);
1775 }
1776
1777 static void
1778 nrs_tbf_conjunction_free(struct nrs_tbf_conjunction *conjunction)
1779 {
1780         struct nrs_tbf_expression *expression;
1781         struct nrs_tbf_expression *n;
1782
1783         LASSERT(list_empty(&conjunction->tc_linkage));
1784         list_for_each_entry_safe(expression, n,
1785                                  &conjunction->tc_expressions,
1786                                  te_linkage) {
1787                 list_del_init(&expression->te_linkage);
1788                 nrs_tbf_expression_free(expression);
1789         }
1790         OBD_FREE_PTR(conjunction);
1791 }
1792
1793 static void
1794 nrs_tbf_conds_free(struct list_head *cond_list)
1795 {
1796         struct nrs_tbf_conjunction *conjunction;
1797         struct nrs_tbf_conjunction *n;
1798
1799         list_for_each_entry_safe(conjunction, n, cond_list, tc_linkage) {
1800                 list_del_init(&conjunction->tc_linkage);
1801                 nrs_tbf_conjunction_free(conjunction);
1802         }
1803 }
1804
1805 static void
1806 nrs_tbf_generic_cmd_fini(struct nrs_tbf_cmd *cmd)
1807 {
1808         if (!list_empty(&cmd->u.tc_start.ts_conds))
1809                 nrs_tbf_conds_free(&cmd->u.tc_start.ts_conds);
1810         if (cmd->u.tc_start.ts_conds_str)
1811                 OBD_FREE(cmd->u.tc_start.ts_conds_str,
1812                          strlen(cmd->u.tc_start.ts_conds_str) + 1);
1813 }
1814
/*
 * Separators of the generic rule condition syntax, as consumed by the
 * parsers in this file: nrs_tbf_conds_parse() splits the condition string
 * into conjunctions on ',', nrs_tbf_conjunction_parse() splits a conjunction
 * into expressions on '&', and nrs_tbf_expression_parse() splits an
 * expression into field name and "{...}" value on '='.
 */
#define NRS_TBF_DISJUNCTION_DELIM       (',')
#define NRS_TBF_CONJUNCTION_DELIM       ('&')
#define NRS_TBF_EXPRESSION_DELIM        ('=')
1818
1819 static inline bool
1820 nrs_tbf_check_field(struct cfs_lstr *field, char *str)
1821 {
1822         int len = strlen(str);
1823
1824         return (field->ls_len == len &&
1825                 strncmp(field->ls_str, str, len) == 0);
1826 }
1827
/*
 * Forward declarations: both parsers are defined further down in this file
 * but are already needed by nrs_tbf_expression_parse().
 */
static int
nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr);
static int
nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
                      enum nrs_tbf_flag tif);
1833
1834 static int
1835 nrs_tbf_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
1836 {
1837         struct nrs_tbf_expression *expr;
1838         struct cfs_lstr field;
1839         int rc = 0;
1840
1841         OBD_ALLOC_PTR(expr);
1842         if (expr == NULL)
1843                 return -ENOMEM;
1844
1845         rc = cfs_gettok(src, NRS_TBF_EXPRESSION_DELIM, &field);
1846         if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
1847             src->ls_str[src->ls_len - 1] != '}')
1848                 GOTO(out, rc = -EINVAL);
1849
1850         /* Skip '{' and '}' */
1851         src->ls_str++;
1852         src->ls_len -= 2;
1853
1854         if (nrs_tbf_check_field(&field, "nid")) {
1855                 if (cfs_parse_nidlist(src->ls_str,
1856                                       src->ls_len,
1857                                       &expr->te_cond) <= 0)
1858                         GOTO(out, rc = -EINVAL);
1859                 expr->te_field = NRS_TBF_FIELD_NID;
1860         } else if (nrs_tbf_check_field(&field, "jobid")) {
1861                 if (nrs_tbf_jobid_list_parse(src->ls_str,
1862                                              src->ls_len,
1863                                              &expr->te_cond) < 0)
1864                         GOTO(out, rc = -EINVAL);
1865                 expr->te_field = NRS_TBF_FIELD_JOBID;
1866         } else if (nrs_tbf_check_field(&field, "opcode")) {
1867                 if (nrs_tbf_opcode_list_parse(src->ls_str,
1868                                               src->ls_len,
1869                                               &expr->te_opcodes) < 0)
1870                         GOTO(out, rc = -EINVAL);
1871                 expr->te_field = NRS_TBF_FIELD_OPCODE;
1872         } else if (nrs_tbf_check_field(&field, "uid")) {
1873                 if (nrs_tbf_id_list_parse(src->ls_str,
1874                                           src->ls_len,
1875                                           &expr->te_cond,
1876                                           NRS_TBF_FLAG_UID) < 0)
1877                         GOTO(out, rc = -EINVAL);
1878                 expr->te_field = NRS_TBF_FIELD_UID;
1879         } else if (nrs_tbf_check_field(&field, "gid")) {
1880                 if (nrs_tbf_id_list_parse(src->ls_str,
1881                                           src->ls_len,
1882                                           &expr->te_cond,
1883                                           NRS_TBF_FLAG_GID) < 0)
1884                         GOTO(out, rc = -EINVAL);
1885                 expr->te_field = NRS_TBF_FIELD_GID;
1886         } else {
1887                 GOTO(out, rc = -EINVAL);
1888         }
1889
1890         list_add_tail(&expr->te_linkage, cond_list);
1891         return 0;
1892 out:
1893         OBD_FREE_PTR(expr);
1894         return rc;
1895 }
1896
1897 static int
1898 nrs_tbf_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
1899 {
1900         struct nrs_tbf_conjunction *conjunction;
1901         struct cfs_lstr expr;
1902         int rc = 0;
1903
1904         OBD_ALLOC_PTR(conjunction);
1905         if (conjunction == NULL)
1906                 return -ENOMEM;
1907
1908         INIT_LIST_HEAD(&conjunction->tc_expressions);
1909         list_add_tail(&conjunction->tc_linkage, cond_list);
1910
1911         while (src->ls_str) {
1912                 rc = cfs_gettok(src, NRS_TBF_CONJUNCTION_DELIM, &expr);
1913                 if (rc == 0) {
1914                         rc = -EINVAL;
1915                         break;
1916                 }
1917                 rc = nrs_tbf_expression_parse(&expr,
1918                                               &conjunction->tc_expressions);
1919                 if (rc)
1920                         break;
1921         }
1922         return rc;
1923 }
1924
1925 static int
1926 nrs_tbf_conds_parse(char *str, int len, struct list_head *cond_list)
1927 {
1928         struct cfs_lstr src;
1929         struct cfs_lstr res;
1930         int rc = 0;
1931
1932         src.ls_str = str;
1933         src.ls_len = len;
1934         INIT_LIST_HEAD(cond_list);
1935         while (src.ls_str) {
1936                 rc = cfs_gettok(&src, NRS_TBF_DISJUNCTION_DELIM, &res);
1937                 if (rc == 0) {
1938                         rc = -EINVAL;
1939                         break;
1940                 }
1941                 rc = nrs_tbf_conjunction_parse(&res, cond_list);
1942                 if (rc)
1943                         break;
1944         }
1945         return rc;
1946 }
1947
1948 static int
1949 nrs_tbf_generic_parse(struct nrs_tbf_cmd *cmd, const char *id)
1950 {
1951         int rc;
1952
1953         OBD_ALLOC(cmd->u.tc_start.ts_conds_str, strlen(id) + 1);
1954         if (cmd->u.tc_start.ts_conds_str == NULL)
1955                 return -ENOMEM;
1956
1957         memcpy(cmd->u.tc_start.ts_conds_str, id, strlen(id));
1958
1959         /* Parse hybird NID and JOBID conditions */
1960         rc = nrs_tbf_conds_parse(cmd->u.tc_start.ts_conds_str,
1961                                  strlen(cmd->u.tc_start.ts_conds_str),
1962                                  &cmd->u.tc_start.ts_conds);
1963         if (rc)
1964                 nrs_tbf_generic_cmd_fini(cmd);
1965
1966         return rc;
1967 }
1968
/* Forward declaration: defined further down, needed by
 * nrs_tbf_expression_match(). */
static int
nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id);
1971
1972 static int
1973 nrs_tbf_expression_match(struct nrs_tbf_expression *expr,
1974                          struct nrs_tbf_rule *rule,
1975                          struct nrs_tbf_client *cli)
1976 {
1977         switch (expr->te_field) {
1978         case NRS_TBF_FIELD_NID:
1979                 return cfs_match_nid(cli->tc_nid, &expr->te_cond);
1980         case NRS_TBF_FIELD_JOBID:
1981                 return nrs_tbf_jobid_list_match(&expr->te_cond, cli->tc_jobid);
1982         case NRS_TBF_FIELD_OPCODE:
1983                 return cfs_bitmap_check(expr->te_opcodes, cli->tc_opcode);
1984         case NRS_TBF_FIELD_UID:
1985         case NRS_TBF_FIELD_GID:
1986                 return nrs_tbf_id_list_match(&expr->te_cond, cli->tc_id);
1987         default:
1988                 return 0;
1989         }
1990 }
1991
1992 static int
1993 nrs_tbf_conjunction_match(struct nrs_tbf_conjunction *conjunction,
1994                           struct nrs_tbf_rule *rule,
1995                           struct nrs_tbf_client *cli)
1996 {
1997         struct nrs_tbf_expression *expr;
1998         int matched;
1999
2000         list_for_each_entry(expr, &conjunction->tc_expressions, te_linkage) {
2001                 matched = nrs_tbf_expression_match(expr, rule, cli);
2002                 if (!matched)
2003                         return 0;
2004         }
2005
2006         return 1;
2007 }
2008
2009 static int
2010 nrs_tbf_cond_match(struct nrs_tbf_rule *rule, struct nrs_tbf_client *cli)
2011 {
2012         struct nrs_tbf_conjunction *conjunction;
2013         int matched;
2014
2015         list_for_each_entry(conjunction, &rule->tr_conds, tc_linkage) {
2016                 matched = nrs_tbf_conjunction_match(conjunction, rule, cli);
2017                 if (matched)
2018                         return 1;
2019         }
2020
2021         return 0;
2022 }
2023
2024 static void
2025 nrs_tbf_generic_rule_fini(struct nrs_tbf_rule *rule)
2026 {
2027         if (!list_empty(&rule->tr_conds))
2028                 nrs_tbf_conds_free(&rule->tr_conds);
2029         LASSERT(rule->tr_conds_str != NULL);
2030         OBD_FREE(rule->tr_conds_str, strlen(rule->tr_conds_str) + 1);
2031 }
2032
2033 static int
2034 nrs_tbf_rule_init(struct ptlrpc_nrs_policy *policy,
2035                   struct nrs_tbf_rule *rule, struct nrs_tbf_cmd *start)
2036 {
2037         int rc = 0;
2038
2039         LASSERT(start->u.tc_start.ts_conds_str);
2040         OBD_ALLOC(rule->tr_conds_str,
2041                   strlen(start->u.tc_start.ts_conds_str) + 1);
2042         if (rule->tr_conds_str == NULL)
2043                 return -ENOMEM;
2044
2045         memcpy(rule->tr_conds_str,
2046                start->u.tc_start.ts_conds_str,
2047                strlen(start->u.tc_start.ts_conds_str));
2048
2049         INIT_LIST_HEAD(&rule->tr_conds);
2050         if (!list_empty(&start->u.tc_start.ts_conds)) {
2051                 rc = nrs_tbf_conds_parse(rule->tr_conds_str,
2052                                          strlen(rule->tr_conds_str),
2053                                          &rule->tr_conds);
2054         }
2055         if (rc)
2056                 nrs_tbf_generic_rule_fini(rule);
2057
2058         return rc;
2059 }
2060
2061 static int
2062 nrs_tbf_generic_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2063 {
2064         seq_printf(m, "%s %s %llu, ref %d\n", rule->tr_name,
2065                    rule->tr_conds_str, rule->tr_rpc_rate,
2066                    atomic_read(&rule->tr_ref) - 1);
2067         return 0;
2068 }
2069
/**
 * Rule matcher of the generic TBF type; simply evaluates the rule's
 * conditions via nrs_tbf_cond_match().
 *
 * \param[in] rule  rule to evaluate
 * \param[in] cli   client to test
 *
 * \retval 1  \a cli satisfies the conditions of \a rule
 * \retval 0  it does not
 */
static int
nrs_tbf_generic_rule_match(struct nrs_tbf_rule *rule,
                           struct nrs_tbf_client *cli)
{
        int matched = nrs_tbf_cond_match(rule, cli);

        return matched;
}
2076
/**
 * Operations of the generic TBF type (NRS_TBF_TYPE_GENERIC).
 *
 * Startup and client lookup/insert/put use the shared nrs_tbf_* helpers;
 * client initialization and the rule callbacks are the generic-type
 * functions defined above.
 */
static struct nrs_tbf_ops nrs_tbf_generic_ops = {
        .o_name = NRS_TBF_TYPE_GENERIC,
        .o_startup = nrs_tbf_startup,
        .o_cli_find = nrs_tbf_cli_find,
        .o_cli_findadd = nrs_tbf_cli_findadd,
        .o_cli_put = nrs_tbf_cli_put,
        .o_cli_init = nrs_tbf_generic_cli_init,
        .o_rule_init = nrs_tbf_rule_init,
        .o_rule_dump = nrs_tbf_generic_rule_dump,
        .o_rule_match = nrs_tbf_generic_rule_match,
        .o_rule_fini = nrs_tbf_generic_rule_fini,
};
2089
2090 static void nrs_tbf_opcode_rule_fini(struct nrs_tbf_rule *rule)
2091 {
2092         if (rule->tr_opcodes != NULL)
2093                 CFS_FREE_BITMAP(rule->tr_opcodes);
2094
2095         LASSERT(rule->tr_opcodes_str != NULL);
2096         OBD_FREE(rule->tr_opcodes_str, strlen(rule->tr_opcodes_str) + 1);
2097 }
2098
2099 static unsigned nrs_tbf_opcode_hop_hash(struct cfs_hash *hs, const void *key,
2100                                         unsigned mask)
2101 {
2102         return cfs_hash_djb2_hash(key, sizeof(__u32), mask);
2103 }
2104
2105 static int nrs_tbf_opcode_hop_keycmp(const void *key, struct hlist_node *hnode)
2106 {
2107         const __u32     *opc = key;
2108         struct nrs_tbf_client *cli = hlist_entry(hnode,
2109                                                  struct nrs_tbf_client,
2110                                                  tc_hnode);
2111
2112         return *opc == cli->tc_opcode;
2113 }
2114
2115 static void *nrs_tbf_opcode_hop_key(struct hlist_node *hnode)
2116 {
2117         struct nrs_tbf_client *cli = hlist_entry(hnode,
2118                                                  struct nrs_tbf_client,
2119                                                  tc_hnode);
2120
2121         return &cli->tc_opcode;
2122 }
2123
2124 static void nrs_tbf_opcode_hop_get(struct cfs_hash *hs,
2125                                    struct hlist_node *hnode)
2126 {
2127         struct nrs_tbf_client *cli = hlist_entry(hnode,
2128                                                  struct nrs_tbf_client,
2129                                                  tc_hnode);
2130
2131         atomic_inc(&cli->tc_ref);
2132 }
2133
2134 static void nrs_tbf_opcode_hop_put(struct cfs_hash *hs,
2135                                    struct hlist_node *hnode)
2136 {
2137         struct nrs_tbf_client *cli = hlist_entry(hnode,
2138                                                  struct nrs_tbf_client,
2139                                                  tc_hnode);
2140
2141         atomic_dec(&cli->tc_ref);
2142 }
2143
2144 static void nrs_tbf_opcode_hop_exit(struct cfs_hash *hs,
2145                                     struct hlist_node *hnode)
2146 {
2147         struct nrs_tbf_client *cli = hlist_entry(hnode,
2148                                                  struct nrs_tbf_client,
2149                                                  tc_hnode);
2150
2151         LASSERTF(atomic_read(&cli->tc_ref) == 0,
2152                  "Busy TBF object from client with opcode %s, with %d refs\n",
2153                  ll_opcode2str(cli->tc_opcode),
2154                  atomic_read(&cli->tc_ref));
2155
2156         nrs_tbf_cli_fini(cli);
2157 }
/**
 * Callbacks of the client hash used by the opcode TBF type; clients are
 * keyed by their \a tc_opcode. The same put callback serves both the locked
 * and the unlocked put operation.
 */
static struct cfs_hash_ops nrs_tbf_opcode_hash_ops = {
        .hs_hash        = nrs_tbf_opcode_hop_hash,
        .hs_keycmp      = nrs_tbf_opcode_hop_keycmp,
        .hs_key         = nrs_tbf_opcode_hop_key,
        .hs_object      = nrs_tbf_hop_object,
        .hs_get         = nrs_tbf_opcode_hop_get,
        .hs_put         = nrs_tbf_opcode_hop_put,
        .hs_put_locked  = nrs_tbf_opcode_hop_put,
        .hs_exit        = nrs_tbf_opcode_hop_exit,
};
2168
2169 static int
2170 nrs_tbf_opcode_startup(struct ptlrpc_nrs_policy *policy,
2171                     struct nrs_tbf_head *head)
2172 {
2173         struct nrs_tbf_cmd      start = { 0 };
2174         int rc;
2175
2176         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
2177                                             NRS_TBF_NID_BITS,
2178                                             NRS_TBF_NID_BITS,
2179                                             NRS_TBF_NID_BKT_BITS, 0,
2180                                             CFS_HASH_MIN_THETA,
2181                                             CFS_HASH_MAX_THETA,
2182                                             &nrs_tbf_opcode_hash_ops,
2183                                             CFS_HASH_RW_BKTLOCK);
2184         if (head->th_cli_hash == NULL)
2185                 return -ENOMEM;
2186
2187         start.u.tc_start.ts_opcodes = NULL;
2188         start.u.tc_start.ts_opcodes_str = "*";
2189
2190         start.u.tc_start.ts_rpc_rate = tbf_rate;
2191         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2192         start.tc_name = NRS_TBF_DEFAULT_RULE;
2193         rc = nrs_tbf_rule_start(policy, head, &start);
2194
2195         return rc;
2196 }
2197
2198 static struct nrs_tbf_client *
2199 nrs_tbf_opcode_cli_find(struct nrs_tbf_head *head,
2200                         struct ptlrpc_request *req)
2201 {
2202         __u32 opc;
2203
2204         opc = lustre_msg_get_opc(req->rq_reqmsg);
2205         return cfs_hash_lookup(head->th_cli_hash, &opc);
2206 }
2207
2208 static struct nrs_tbf_client *
2209 nrs_tbf_opcode_cli_findadd(struct nrs_tbf_head *head,
2210                            struct nrs_tbf_client *cli)
2211 {
2212         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_opcode,
2213                                        &cli->tc_hnode);
2214 }
2215
2216 static void
2217 nrs_tbf_opcode_cli_init(struct nrs_tbf_client *cli,
2218                         struct ptlrpc_request *req)
2219 {
2220         cli->tc_opcode = lustre_msg_get_opc(req->rq_reqmsg);
2221 }
2222
2223 #define MAX_OPCODE_LEN  32
2224 static int
2225 nrs_tbf_opcode_set_bit(const struct cfs_lstr *id, struct cfs_bitmap *opcodes)
2226 {
2227         int     op = 0;
2228         char    opcode_str[MAX_OPCODE_LEN];
2229
2230         if (id->ls_len + 1 > MAX_OPCODE_LEN)
2231                 return -EINVAL;
2232
2233         memcpy(opcode_str, id->ls_str, id->ls_len);
2234         opcode_str[id->ls_len] = '\0';
2235
2236         op = ll_str2opcode(opcode_str);
2237         if (op < 0)
2238                 return -EINVAL;
2239
2240         cfs_bitmap_set(opcodes, op);
2241         return 0;
2242 }
2243
2244 static int
2245 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr)
2246 {
2247         struct cfs_bitmap *opcodes;
2248         struct cfs_lstr src;
2249         struct cfs_lstr res;
2250         int rc = 0;
2251         ENTRY;
2252
2253         opcodes = CFS_ALLOCATE_BITMAP(LUSTRE_MAX_OPCODES);
2254         if (opcodes == NULL)
2255                 return -ENOMEM;
2256
2257         src.ls_str = str;
2258         src.ls_len = len;
2259         while (src.ls_str) {
2260                 rc = cfs_gettok(&src, ' ', &res);
2261                 if (rc == 0) {
2262                         rc = -EINVAL;
2263                         break;
2264                 }
2265                 rc = nrs_tbf_opcode_set_bit(&res, opcodes);
2266                 if (rc)
2267                         break;
2268         }
2269
2270         if (rc == 0)
2271                 *bitmaptr = opcodes;
2272         else
2273                 CFS_FREE_BITMAP(opcodes);
2274
2275         RETURN(rc);
2276 }
2277
2278 static void nrs_tbf_opcode_cmd_fini(struct nrs_tbf_cmd *cmd)
2279 {
2280         if (cmd->u.tc_start.ts_opcodes)
2281                 CFS_FREE_BITMAP(cmd->u.tc_start.ts_opcodes);
2282
2283         if (cmd->u.tc_start.ts_opcodes_str)
2284                 OBD_FREE(cmd->u.tc_start.ts_opcodes_str,
2285                          strlen(cmd->u.tc_start.ts_opcodes_str) + 1);
2286
2287 }
2288
2289 static int nrs_tbf_opcode_parse(struct nrs_tbf_cmd *cmd, char *id)
2290 {
2291         struct cfs_lstr src;
2292         int rc;
2293
2294         src.ls_str = id;
2295         src.ls_len = strlen(id);
2296         rc = nrs_tbf_check_id_value(&src, "opcode");
2297         if (rc)
2298                 return rc;
2299
2300         OBD_ALLOC(cmd->u.tc_start.ts_opcodes_str, src.ls_len + 1);
2301         if (cmd->u.tc_start.ts_opcodes_str == NULL)
2302                 return -ENOMEM;
2303
2304         memcpy(cmd->u.tc_start.ts_opcodes_str, src.ls_str, src.ls_len);
2305
2306         /* parse opcode list */
2307         rc = nrs_tbf_opcode_list_parse(cmd->u.tc_start.ts_opcodes_str,
2308                                        strlen(cmd->u.tc_start.ts_opcodes_str),
2309                                        &cmd->u.tc_start.ts_opcodes);
2310         if (rc)
2311                 nrs_tbf_opcode_cmd_fini(cmd);
2312
2313         return rc;
2314 }
2315
2316 static int
2317 nrs_tbf_opcode_rule_match(struct nrs_tbf_rule *rule,
2318                           struct nrs_tbf_client *cli)
2319 {
2320         if (rule->tr_opcodes == NULL)
2321                 return 0;
2322
2323         return cfs_bitmap_check(rule->tr_opcodes, cli->tc_opcode);
2324 }
2325
2326 static int nrs_tbf_opcode_rule_init(struct ptlrpc_nrs_policy *policy,
2327                                     struct nrs_tbf_rule *rule,
2328                                     struct nrs_tbf_cmd *start)
2329 {
2330         int rc = 0;
2331
2332         LASSERT(start->u.tc_start.ts_opcodes_str != NULL);
2333         OBD_ALLOC(rule->tr_opcodes_str,
2334                   strlen(start->u.tc_start.ts_opcodes_str) + 1);
2335         if (rule->tr_opcodes_str == NULL)
2336                 return -ENOMEM;
2337
2338         strncpy(rule->tr_opcodes_str, start->u.tc_start.ts_opcodes_str,
2339                 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2340
2341         /* Default rule '*' */
2342         if (start->u.tc_start.ts_opcodes == NULL)
2343                 return 0;
2344
2345         rc = nrs_tbf_opcode_list_parse(rule->tr_opcodes_str,
2346                                        strlen(rule->tr_opcodes_str),
2347                                        &rule->tr_opcodes);
2348         if (rc)
2349                 OBD_FREE(rule->tr_opcodes_str,
2350                          strlen(start->u.tc_start.ts_opcodes_str) + 1);
2351
2352         return rc;
2353 }
2354
2355 static int
2356 nrs_tbf_opcode_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2357 {
2358         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2359                    rule->tr_opcodes_str, rule->tr_rpc_rate,
2360                    atomic_read(&rule->tr_ref) - 1);
2361         return 0;
2362 }
2363
2364
/**
 * Operations of the opcode TBF type (NRS_TBF_TYPE_OPCODE).
 *
 * Unlike nrs_tbf_generic_ops this table is not declared static. The client
 * put callback is shared with the NID type (nrs_tbf_nid_cli_put).
 */
struct nrs_tbf_ops nrs_tbf_opcode_ops = {
        .o_name = NRS_TBF_TYPE_OPCODE,
        .o_startup = nrs_tbf_opcode_startup,
        .o_cli_find = nrs_tbf_opcode_cli_find,
        .o_cli_findadd = nrs_tbf_opcode_cli_findadd,
        .o_cli_put = nrs_tbf_nid_cli_put,
        .o_cli_init = nrs_tbf_opcode_cli_init,
        .o_rule_init = nrs_tbf_opcode_rule_init,
        .o_rule_dump = nrs_tbf_opcode_rule_dump,
        .o_rule_match = nrs_tbf_opcode_rule_match,
        .o_rule_fini = nrs_tbf_opcode_rule_fini,
};
2377
2378 static unsigned nrs_tbf_id_hop_hash(struct cfs_hash *hs, const void *key,
2379                                     unsigned mask)
2380 {
2381         return cfs_hash_djb2_hash(key, sizeof(struct tbf_id), mask);
2382 }
2383
2384 static int nrs_tbf_id_hop_keycmp(const void *key, struct hlist_node *hnode)
2385 {
2386         const struct tbf_id *opc = key;
2387         enum nrs_tbf_flag ntf;
2388         struct nrs_tbf_client *cli = hlist_entry(hnode, struct nrs_tbf_client,
2389                                                  tc_hnode);
2390         ntf = opc->ti_type & cli->tc_id.ti_type;
2391         if ((ntf & NRS_TBF_FLAG_UID) && opc->ti_uid != cli->tc_id.ti_uid)
2392                 return 0;
2393
2394         if ((ntf & NRS_TBF_FLAG_GID) && opc->ti_gid != cli->tc_id.ti_gid)
2395                 return 0;
2396
2397         return 1;
2398 }
2399
2400 static void *nrs_tbf_id_hop_key(struct hlist_node *hnode)
2401 {
2402         struct nrs_tbf_client *cli = hlist_entry(hnode,
2403                                                  struct nrs_tbf_client,
2404                                                  tc_hnode);
2405         return &cli->tc_id;
2406 }
2407
2408 static void nrs_tbf_id_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
2409 {
2410         struct nrs_tbf_client *cli = hlist_entry(hnode,
2411                                                  struct nrs_tbf_client,
2412                                                  tc_hnode);
2413
2414         atomic_inc(&cli->tc_ref);
2415 }
2416
2417 static void nrs_tbf_id_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
2418 {
2419         struct nrs_tbf_client *cli = hlist_entry(hnode,
2420                                                  struct nrs_tbf_client,
2421                                                  tc_hnode);
2422
2423         atomic_dec(&cli->tc_ref);
2424 }
2425
2426 static void
2427 nrs_tbf_id_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
2428
2429 {
2430         struct nrs_tbf_client *cli = hlist_entry(hnode,
2431                                                  struct nrs_tbf_client,
2432                                                  tc_hnode);
2433
2434         LASSERT(atomic_read(&cli->tc_ref) == 0);
2435         nrs_tbf_cli_fini(cli);
2436 }
2437
/**
 * Callbacks of the client hash used by the UID and GID TBF types; clients
 * are keyed by their \a tc_id. The same put callback serves both the locked
 * and the unlocked put operation.
 */
static struct cfs_hash_ops nrs_tbf_id_hash_ops = {
        .hs_hash        = nrs_tbf_id_hop_hash,
        .hs_keycmp      = nrs_tbf_id_hop_keycmp,
        .hs_key         = nrs_tbf_id_hop_key,
        .hs_object      = nrs_tbf_hop_object,
        .hs_get         = nrs_tbf_id_hop_get,
        .hs_put         = nrs_tbf_id_hop_put,
        .hs_put_locked  = nrs_tbf_id_hop_put,
        .hs_exit        = nrs_tbf_id_hop_exit,
};
2448
2449 static int
2450 nrs_tbf_id_startup(struct ptlrpc_nrs_policy *policy,
2451                    struct nrs_tbf_head *head)
2452 {
2453         struct nrs_tbf_cmd start;
2454         int rc;
2455
2456         head->th_cli_hash = cfs_hash_create("nrs_tbf_id_hash",
2457                                             NRS_TBF_NID_BITS,
2458                                             NRS_TBF_NID_BITS,
2459                                             NRS_TBF_NID_BKT_BITS, 0,
2460                                             CFS_HASH_MIN_THETA,
2461                                             CFS_HASH_MAX_THETA,
2462                                             &nrs_tbf_id_hash_ops,
2463                                             CFS_HASH_RW_BKTLOCK);
2464         if (head->th_cli_hash == NULL)
2465                 return -ENOMEM;
2466
2467         memset(&start, 0, sizeof(start));
2468         start.u.tc_start.ts_ids_str = "*";
2469         start.u.tc_start.ts_rpc_rate = tbf_rate;
2470         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2471         start.tc_name = NRS_TBF_DEFAULT_RULE;
2472         INIT_LIST_HEAD(&start.u.tc_start.ts_ids);
2473         rc = nrs_tbf_rule_start(policy, head, &start);
2474         if (rc) {
2475                 cfs_hash_putref(head->th_cli_hash);
2476                 head->th_cli_hash = NULL;
2477         }
2478
2479         return rc;
2480 }
2481
2482 static struct nrs_tbf_client *
2483 nrs_tbf_id_cli_find(struct nrs_tbf_head *head,
2484                     struct ptlrpc_request *req)
2485 {
2486         struct tbf_id id;
2487
2488         LASSERT(head->th_type_flag == NRS_TBF_FLAG_UID ||
2489                 head->th_type_flag == NRS_TBF_FLAG_GID);
2490
2491         nrs_tbf_id_cli_set(req, &id, head->th_type_flag);
2492         return cfs_hash_lookup(head->th_cli_hash, &id);
2493 }
2494
2495 static struct nrs_tbf_client *
2496 nrs_tbf_id_cli_findadd(struct nrs_tbf_head *head,
2497                        struct nrs_tbf_client *cli)
2498 {
2499         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_id,
2500                                        &cli->tc_hnode);
2501 }
2502
2503 static void
2504 nrs_tbf_uid_cli_init(struct nrs_tbf_client *cli,
2505                      struct ptlrpc_request *req)
2506 {
2507         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_UID);
2508 }
2509
2510 static void
2511 nrs_tbf_gid_cli_init(struct nrs_tbf_client *cli,
2512                      struct ptlrpc_request *req)
2513 {
2514         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_GID);
2515 }
2516
2517 static int
2518 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id)
2519 {
2520         struct nrs_tbf_id *nti_id;
2521         enum nrs_tbf_flag flag;
2522
2523         list_for_each_entry(nti_id, id_list, nti_linkage) {
2524                 flag = id.ti_type & nti_id->nti_id.ti_type;
2525                 if (!flag)
2526                         continue;
2527
2528                 if ((flag & NRS_TBF_FLAG_UID) &&
2529                     (id.ti_uid != nti_id->nti_id.ti_uid))
2530                         continue;
2531
2532                 if ((flag & NRS_TBF_FLAG_GID) &&
2533                     (id.ti_gid != nti_id->nti_id.ti_gid))
2534                         continue;
2535
2536                 return 1;
2537         }
2538         return 0;
2539 }
2540
2541 static int
2542 nrs_tbf_id_rule_match(struct nrs_tbf_rule *rule,
2543                       struct nrs_tbf_client *cli)
2544 {
2545         return nrs_tbf_id_list_match(&rule->tr_ids, cli->tc_id);
2546 }
2547
2548 static void nrs_tbf_id_cmd_fini(struct nrs_tbf_cmd *cmd)
2549 {
2550         nrs_tbf_id_list_free(&cmd->u.tc_start.ts_ids);
2551
2552         if (cmd->u.tc_start.ts_ids_str)
2553                 OBD_FREE(cmd->u.tc_start.ts_ids_str,
2554                          strlen(cmd->u.tc_start.ts_ids_str) + 1);
2555 }
2556
2557 static int
2558 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
2559                       enum nrs_tbf_flag tif)
2560 {
2561         struct cfs_lstr src;
2562         struct cfs_lstr res;
2563         int rc = 0;
2564         struct tbf_id id = { 0 };
2565         ENTRY;
2566
2567         if (tif != NRS_TBF_FLAG_UID && tif != NRS_TBF_FLAG_GID)
2568                 RETURN(-EINVAL);
2569
2570         src.ls_str = str;
2571         src.ls_len = len;
2572         INIT_LIST_HEAD(id_list);
2573         while (src.ls_str) {
2574                 struct nrs_tbf_id *nti_id;
2575
2576                 if (cfs_gettok(&src, ' ', &res) == 0)
2577                         GOTO(out, rc = -EINVAL);
2578
2579                 id.ti_type = tif;
2580                 if (tif == NRS_TBF_FLAG_UID) {
2581                         if (!cfs_str2num_check(res.ls_str, res.ls_len,
2582                                                &id.ti_uid, 0, (u32)~0U))
2583                                 GOTO(out, rc = -EINVAL);
2584                 } else {
2585                         if (!cfs_str2num_check(res.ls_str, res.ls_len,
2586                                                &id.ti_gid, 0, (u32)~0U))
2587                                 GOTO(out, rc = -EINVAL);
2588                 }
2589
2590                 OBD_ALLOC_PTR(nti_id);
2591                 if (nti_id == NULL)
2592                         GOTO(out, rc = -ENOMEM);
2593
2594                 nti_id->nti_id = id;
2595                 list_add_tail(&nti_id->nti_linkage, id_list);
2596         }
2597 out:
2598         if (rc)
2599                 nrs_tbf_id_list_free(id_list);
2600         RETURN(rc);
2601 }
2602
2603 static int nrs_tbf_ug_id_parse(struct nrs_tbf_cmd *cmd, char *id)
2604 {
2605         struct cfs_lstr src;
2606         int rc;
2607         enum nrs_tbf_flag tif;
2608
2609         tif = cmd->u.tc_start.ts_valid_type;
2610
2611         src.ls_str = id;
2612         src.ls_len = strlen(id);
2613
2614         rc = nrs_tbf_check_id_value(&src,
2615                                     tif == NRS_TBF_FLAG_UID ? "uid" : "gid");
2616         if (rc)
2617                 return rc;
2618
2619         OBD_ALLOC(cmd->u.tc_start.ts_ids_str, src.ls_len + 1);
2620         if (cmd->u.tc_start.ts_ids_str == NULL)
2621                 return -ENOMEM;
2622
2623         strlcpy(cmd->u.tc_start.ts_ids_str, src.ls_str, src.ls_len + 1);
2624
2625         rc = nrs_tbf_id_list_parse(cmd->u.tc_start.ts_ids_str,
2626                                    strlen(cmd->u.tc_start.ts_ids_str),
2627                                    &cmd->u.tc_start.ts_ids, tif);
2628         if (rc)
2629                 nrs_tbf_id_cmd_fini(cmd);
2630
2631         return rc;
2632 }
2633
2634 static int
2635 nrs_tbf_id_rule_init(struct ptlrpc_nrs_policy *policy,
2636                      struct nrs_tbf_rule *rule,
2637                      struct nrs_tbf_cmd *start)
2638 {
2639         struct nrs_tbf_head *head = rule->tr_head;
2640         int rc = 0;
2641         enum nrs_tbf_flag tif = head->th_type_flag;
2642         int ids_len = strlen(start->u.tc_start.ts_ids_str) + 1;
2643
2644         LASSERT(start->u.tc_start.ts_ids_str);
2645         INIT_LIST_HEAD(&rule->tr_ids);
2646
2647         OBD_ALLOC(rule->tr_ids_str, ids_len);
2648         if (rule->tr_ids_str == NULL)
2649                 return -ENOMEM;
2650
2651         strlcpy(rule->tr_ids_str, start->u.tc_start.ts_ids_str,
2652                 ids_len);
2653
2654         if (!list_empty(&start->u.tc_start.ts_ids)) {
2655                 rc = nrs_tbf_id_list_parse(rule->tr_ids_str,
2656                                            strlen(rule->tr_ids_str),
2657                                            &rule->tr_ids, tif);
2658                 if (rc)
2659                         CERROR("%ss {%s} illegal\n",
2660                                tif == NRS_TBF_FLAG_UID ? "uid" : "gid",
2661                                rule->tr_ids_str);
2662         }
2663         if (rc) {
2664                 OBD_FREE(rule->tr_ids_str, ids_len);
2665                 rule->tr_ids_str = NULL;
2666         }
2667         return rc;
2668 }
2669
2670 static int
2671 nrs_tbf_id_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2672 {
2673         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2674                    rule->tr_ids_str, rule->tr_rpc_rate,
2675                    atomic_read(&rule->tr_ref) - 1);
2676         return 0;
2677 }
2678
2679 static void nrs_tbf_id_rule_fini(struct nrs_tbf_rule *rule)
2680 {
2681         nrs_tbf_id_list_free(&rule->tr_ids);
2682         if (rule->tr_ids_str != NULL)
2683                 OBD_FREE(rule->tr_ids_str, strlen(rule->tr_ids_str) + 1);
2684 }
2685
2686 struct nrs_tbf_ops nrs_tbf_uid_ops = {
2687         .o_name = NRS_TBF_TYPE_UID,
2688         .o_startup = nrs_tbf_id_startup,
2689         .o_cli_find = nrs_tbf_id_cli_find,
2690         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2691         .o_cli_put = nrs_tbf_nid_cli_put,
2692         .o_cli_init = nrs_tbf_uid_cli_init,
2693         .o_rule_init = nrs_tbf_id_rule_init,
2694         .o_rule_dump = nrs_tbf_id_rule_dump,
2695         .o_rule_match = nrs_tbf_id_rule_match,
2696         .o_rule_fini = nrs_tbf_id_rule_fini,
2697 };
2698
2699 struct nrs_tbf_ops nrs_tbf_gid_ops = {
2700         .o_name = NRS_TBF_TYPE_GID,
2701         .o_startup = nrs_tbf_id_startup,
2702         .o_cli_find = nrs_tbf_id_cli_find,
2703         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2704         .o_cli_put = nrs_tbf_nid_cli_put,
2705         .o_cli_init = nrs_tbf_gid_cli_init,
2706         .o_rule_init = nrs_tbf_id_rule_init,
2707         .o_rule_dump = nrs_tbf_id_rule_dump,
2708         .o_rule_match = nrs_tbf_id_rule_match,
2709         .o_rule_fini = nrs_tbf_id_rule_fini,
2710 };
2711
2712 static struct nrs_tbf_type nrs_tbf_types[] = {
2713         {
2714                 .ntt_name = NRS_TBF_TYPE_JOBID,
2715                 .ntt_flag = NRS_TBF_FLAG_JOBID,
2716                 .ntt_ops = &nrs_tbf_jobid_ops,
2717         },
2718         {
2719                 .ntt_name = NRS_TBF_TYPE_NID,
2720                 .ntt_flag = NRS_TBF_FLAG_NID,
2721                 .ntt_ops = &nrs_tbf_nid_ops,
2722         },
2723         {
2724                 .ntt_name = NRS_TBF_TYPE_OPCODE,
2725                 .ntt_flag = NRS_TBF_FLAG_OPCODE,
2726                 .ntt_ops = &nrs_tbf_opcode_ops,
2727         },
2728         {
2729                 .ntt_name = NRS_TBF_TYPE_GENERIC,
2730                 .ntt_flag = NRS_TBF_FLAG_GENERIC,
2731                 .ntt_ops = &nrs_tbf_generic_ops,
2732         },
2733         {
2734                 .ntt_name = NRS_TBF_TYPE_UID,
2735                 .ntt_flag = NRS_TBF_FLAG_UID,
2736                 .ntt_ops = &nrs_tbf_uid_ops,
2737         },
2738         {
2739                 .ntt_name = NRS_TBF_TYPE_GID,
2740                 .ntt_flag = NRS_TBF_FLAG_GID,
2741                 .ntt_ops = &nrs_tbf_gid_ops,
2742         },
2743 };
2744
2745 /**
2746  * Is called before the policy transitions into
2747  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
2748  * policy-specific private data structure.
2749  *
2750  * \param[in] policy The policy to start
2751  *
2752  * \retval -ENOMEM OOM error
2753  * \retval  0      success
2754  *
2755  * \see nrs_policy_register()
2756  * \see nrs_policy_ctl()
2757  */
2758 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
2759 {
2760         struct nrs_tbf_head     *head;
2761         struct nrs_tbf_ops      *ops;
2762         __u32                    type;
2763         char                    *name;
2764         int found = 0;
2765         int i;
2766         int rc = 0;
2767
2768         if (arg == NULL)
2769                 name = NRS_TBF_TYPE_GENERIC;
2770         else if (strlen(arg) < NRS_TBF_TYPE_MAX_LEN)
2771                 name = arg;
2772         else
2773                 GOTO(out, rc = -EINVAL);
2774
2775         for (i = 0; i < ARRAY_SIZE(nrs_tbf_types); i++) {
2776                 if (strcmp(name, nrs_tbf_types[i].ntt_name) == 0) {
2777                         ops = nrs_tbf_types[i].ntt_ops;
2778                         type = nrs_tbf_types[i].ntt_flag;
2779                         found = 1;
2780                         break;
2781                 }
2782         }
2783         if (found == 0)
2784                 GOTO(out, rc = -ENOTSUPP);
2785
2786         OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
2787         if (head == NULL)
2788                 GOTO(out, rc = -ENOMEM);
2789
2790         memcpy(head->th_type, name, strlen(name));
2791         head->th_type[strlen(name)] = '\0';
2792         head->th_ops = ops;
2793         head->th_type_flag = type;
2794
2795         head->th_binheap = cfs_binheap_create(&nrs_tbf_heap_ops,
2796                                               CBH_FLAG_ATOMIC_GROW, 4096, NULL,
2797                                               nrs_pol2cptab(policy),
2798                                               nrs_pol2cptid(policy));
2799         if (head->th_binheap == NULL)
2800                 GOTO(out_free_head, rc = -ENOMEM);
2801
2802         atomic_set(&head->th_rule_sequence, 0);
2803         spin_lock_init(&head->th_rule_lock);
2804         INIT_LIST_HEAD(&head->th_list);
2805         hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
2806         head->th_timer.function = nrs_tbf_timer_cb;
2807         rc = head->th_ops->o_startup(policy, head);
2808         if (rc)
2809                 GOTO(out_free_heap, rc);
2810
2811         policy->pol_private = head;
2812         return 0;
2813 out_free_heap:
2814         cfs_binheap_destroy(head->th_binheap);
2815 out_free_head:
2816         OBD_FREE_PTR(head);
2817 out:
2818         return rc;
2819 }
2820
2821 /**
2822  * Is called before the policy transitions into
2823  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
2824  * private data structure.
2825  *
2826  * \param[in] policy The policy to stop
2827  *
2828  * \see nrs_policy_stop0()
2829  */
2830 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
2831 {
2832         struct nrs_tbf_head *head = policy->pol_private;
2833         struct ptlrpc_nrs *nrs = policy->pol_nrs;
2834         struct nrs_tbf_rule *rule, *n;
2835
2836         LASSERT(head != NULL);
2837         LASSERT(head->th_cli_hash != NULL);
2838         hrtimer_cancel(&head->th_timer);
2839         /* Should cleanup hash first before free rules */
2840         cfs_hash_putref(head->th_cli_hash);
2841         list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
2842                 list_del_init(&rule->tr_linkage);
2843                 nrs_tbf_rule_put(rule);
2844         }
2845         LASSERT(list_empty(&head->th_list));
2846         LASSERT(head->th_binheap != NULL);
2847         LASSERT(cfs_binheap_is_empty(head->th_binheap));
2848         cfs_binheap_destroy(head->th_binheap);
2849         OBD_FREE_PTR(head);
2850         nrs->nrs_throttling = 0;
2851         wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
2852 }
2853
2854 /**
2855  * Performs a policy-specific ctl function on TBF policy instances; similar
2856  * to ioctl.
2857  *
2858  * \param[in]     policy the policy instance
2859  * \param[in]     opc    the opcode
2860  * \param[in,out] arg    used for passing parameters and information
2861  *
2862  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2863  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2864  *
2865  * \retval 0   operation carried out successfully
2866  * \retval -ve error
2867  */
2868 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
2869                        enum ptlrpc_nrs_ctl opc,
2870                        void *arg)
2871 {
2872         int rc = 0;
2873         ENTRY;
2874
2875         assert_spin_locked(&policy->pol_nrs->nrs_lock);
2876
2877         switch ((enum nrs_ctl_tbf)opc) {
2878         default:
2879                 RETURN(-EINVAL);
2880
2881         /**
2882          * Read RPC rate size of a policy instance.
2883          */
2884         case NRS_CTL_TBF_RD_RULE: {
2885                 struct nrs_tbf_head *head = policy->pol_private;
2886                 struct seq_file *m = arg;
2887                 struct ptlrpc_service_part *svcpt;
2888
2889                 svcpt = policy->pol_nrs->nrs_svcpt;
2890                 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
2891
2892                 rc = nrs_tbf_rule_dump_all(head, m);
2893                 }
2894                 break;
2895
2896         /**
2897          * Write RPC rate of a policy instance.
2898          */
2899         case NRS_CTL_TBF_WR_RULE: {
2900                 struct nrs_tbf_head *head = policy->pol_private;
2901                 struct nrs_tbf_cmd *cmd;
2902
2903                 cmd = (struct nrs_tbf_cmd *)arg;
2904                 rc = nrs_tbf_command(policy,
2905                                      head,
2906                                      cmd);
2907                 }
2908                 break;
2909         /**
2910          * Read the TBF policy type of a policy instance.
2911          */
2912         case NRS_CTL_TBF_RD_TYPE_FLAG: {
2913                 struct nrs_tbf_head *head = policy->pol_private;
2914
2915                 *(__u32 *)arg = head->th_type_flag;
2916                 }
2917                 break;
2918         }
2919
2920         RETURN(rc);
2921 }
2922
2923 /**
2924  * Is called for obtaining a TBF policy resource.
2925  *
2926  * \param[in]  policy     The policy on which the request is being asked for
2927  * \param[in]  nrq        The request for which resources are being taken
2928  * \param[in]  parent     Parent resource, unused in this policy
2929  * \param[out] resp       Resources references are placed in this array
2930  * \param[in]  moving_req Signifies limited caller context; unused in this
2931  *                        policy
2932  *
2933  *
2934  * \see nrs_resource_get_safe()
2935  */
2936 static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
2937                            struct ptlrpc_nrs_request *nrq,
2938                            const struct ptlrpc_nrs_resource *parent,
2939                            struct ptlrpc_nrs_resource **resp,
2940                            bool moving_req)
2941 {
2942         struct nrs_tbf_head   *head;
2943         struct nrs_tbf_client *cli;
2944         struct nrs_tbf_client *tmp;
2945         struct ptlrpc_request *req;
2946
2947         if (parent == NULL) {
2948                 *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
2949                 return 0;
2950         }
2951
2952         head = container_of(parent, struct nrs_tbf_head, th_res);
2953         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
2954         cli = head->th_ops->o_cli_find(head, req);
2955         if (cli != NULL) {
2956                 spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2957                 LASSERT(cli->tc_rule);
2958                 if (cli->tc_rule_sequence !=
2959                     atomic_read(&head->th_rule_sequence) ||
2960                     cli->tc_rule->tr_flags & NTRS_STOPPING) {
2961                         struct nrs_tbf_rule *rule;
2962
2963                         CDEBUG(D_RPCTRACE,
2964                                "TBF class@%p rate %llu sequence %d, "
2965                                "rule flags %d, head sequence %d\n",
2966                                cli, cli->tc_rpc_rate,
2967                                cli->tc_rule_sequence,
2968                                cli->tc_rule->tr_flags,
2969                                atomic_read(&head->th_rule_sequence));
2970                         rule = nrs_tbf_rule_match(head, cli);
2971                         if (rule != cli->tc_rule) {
2972                                 nrs_tbf_cli_reset(head, rule, cli);
2973                         } else {
2974                                 if (cli->tc_rule_generation != rule->tr_generation)
2975                                         nrs_tbf_cli_reset_value(head, cli);
2976                                 nrs_tbf_rule_put(rule);
2977                         }
2978                 } else if (cli->tc_rule_generation !=
2979                            cli->tc_rule->tr_generation) {
2980                         nrs_tbf_cli_reset_value(head, cli);
2981                 }
2982                 spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2983                 goto out;
2984         }
2985
2986         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
2987                           sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
2988         if (cli == NULL)
2989                 return -ENOMEM;
2990
2991         nrs_tbf_cli_init(head, cli, req);
2992         tmp = head->th_ops->o_cli_findadd(head, cli);
2993         if (tmp != cli) {
2994                 atomic_dec(&cli->tc_ref);
2995                 nrs_tbf_cli_fini(cli);
2996                 cli = tmp;
2997         }
2998 out:
2999         *resp = &cli->tc_res;
3000
3001         return 1;
3002 }
3003
3004 /**
3005  * Called when releasing references to the resource hierachy obtained for a
3006  * request for scheduling using the TBF policy.
3007  *
3008  * \param[in] policy   the policy the resource belongs to
3009  * \param[in] res      the resource to be released
3010  */
3011 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
3012                             const struct ptlrpc_nrs_resource *res)
3013 {
3014         struct nrs_tbf_head   *head;
3015         struct nrs_tbf_client *cli;
3016
3017         /**
3018          * Do nothing for freeing parent, nrs_tbf_net resources
3019          */
3020         if (res->res_parent == NULL)
3021                 return;
3022
3023         cli = container_of(res, struct nrs_tbf_client, tc_res);
3024         head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
3025
3026         head->th_ops->o_cli_put(head, cli);
3027 }
3028
3029 /**
3030  * Called when getting a request from the TBF policy for handling, or just
3031  * peeking; removes the request from the policy when it is to be handled.
3032  *
3033  * \param[in] policy The policy
3034  * \param[in] peek   When set, signifies that we just want to examine the
3035  *                   request, and not handle it, so the request is not removed
3036  *                   from the policy.
3037  * \param[in] force  Force the policy to return a request; unused in this
3038  *                   policy
3039  *
3040  * \retval The request to be handled; this is the next request in the TBF
3041  *         rule
3042  *
3043  * \see ptlrpc_nrs_req_get_nolock()
3044  * \see nrs_request_get()
3045  */
3046 static
3047 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
3048                                            bool peek, bool force)
3049 {
3050         struct nrs_tbf_head       *head = policy->pol_private;
3051         struct ptlrpc_nrs_request *nrq = NULL;
3052         struct nrs_tbf_client     *cli;
3053         struct cfs_binheap_node   *node;
3054
3055         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3056
3057         if (!peek && policy->pol_nrs->nrs_throttling)
3058                 return NULL;
3059
3060         node = cfs_binheap_root(head->th_binheap);
3061         if (unlikely(node == NULL))
3062                 return NULL;
3063
3064         cli = container_of(node, struct nrs_tbf_client, tc_node);
3065         LASSERT(cli->tc_in_heap);
3066         if (peek) {
3067                 nrq = list_entry(cli->tc_list.next,
3068                                      struct ptlrpc_nrs_request,
3069                                      nr_u.tbf.tr_list);
3070         } else {
3071                 struct nrs_tbf_rule *rule = cli->tc_rule;
3072                 __u64 now = ktime_to_ns(ktime_get());
3073                 __u64 passed;
3074                 __u64 ntoken;
3075                 __u64 deadline;
3076                 __u64 old_resid = 0;
3077
3078                 deadline = cli->tc_check_time +
3079                           cli->tc_nsecs;
3080                 LASSERT(now >= cli->tc_check_time);
3081                 passed = now - cli->tc_check_time;
3082                 ntoken = passed * cli->tc_rpc_rate;
3083                 do_div(ntoken, NSEC_PER_SEC);
3084
3085                 ntoken += cli->tc_ntoken;
3086                 if (rule->tr_flags & NTRS_REALTIME) {
3087                         LASSERT(cli->tc_nsecs_resid < cli->tc_nsecs);
3088                         old_resid = cli->tc_nsecs_resid;
3089                         cli->tc_nsecs_resid += passed % cli->tc_nsecs;
3090                         if (cli->tc_nsecs_resid > cli->tc_nsecs) {
3091                                 ntoken++;
3092                                 cli->tc_nsecs_resid -= cli->tc_nsecs;
3093                         }
3094                 } else if (ntoken > cli->tc_depth)
3095                         ntoken = cli->tc_depth;
3096
3097                 if (ntoken > 0) {
3098                         struct ptlrpc_request *req;
3099                         nrq = list_entry(cli->tc_list.next,
3100                                              struct ptlrpc_nrs_request,
3101                                              nr_u.tbf.tr_list);
3102                         req = container_of(nrq,
3103                                            struct ptlrpc_request,
3104                                            rq_nrq);
3105                         ntoken--;
3106                         cli->tc_ntoken = ntoken;
3107                         cli->tc_check_time = now;
3108                         list_del_init(&nrq->nr_u.tbf.tr_list);
3109                         if (list_empty(&cli->tc_list)) {
3110                                 cfs_binheap_remove(head->th_binheap,
3111                                                    &cli->tc_node);
3112                                 cli->tc_in_heap = false;
3113                         } else {
3114                                 if (!(rule->tr_flags & NTRS_REALTIME))
3115                                         cli->tc_deadline = now + cli->tc_nsecs;
3116                                 cfs_binheap_relocate(head->th_binheap,
3117                                                      &cli->tc_node);
3118                         }
3119                         CDEBUG(D_RPCTRACE,
3120                                "TBF dequeues: class@%p rate %llu gen %llu "
3121                                "token %llu, rule@%p rate %llu gen %llu\n",
3122                                cli, cli->tc_rpc_rate,
3123                                cli->tc_rule_generation, cli->tc_ntoken,
3124                                cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3125                                cli->tc_rule->tr_generation);
3126                 } else {
3127                         ktime_t time;
3128
3129                         if (rule->tr_flags & NTRS_REALTIME) {
3130                                 cli->tc_deadline = deadline;
3131                                 cli->tc_nsecs_resid = old_resid;
3132                                 cfs_binheap_relocate(head->th_binheap,
3133                                                      &cli->tc_node);
3134                                 if (node != cfs_binheap_root(head->th_binheap))
3135                                         return nrs_tbf_req_get(policy,
3136                                                                peek, force);
3137                         }
3138                         policy->pol_nrs->nrs_throttling = 1;
3139                         head->th_deadline = deadline;
3140                         time = ktime_set(0, 0);
3141                         time = ktime_add_ns(time, deadline);
3142                         hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
3143                 }
3144         }
3145
3146         return nrq;
3147 }
3148
3149 /**
3150  * Adds request \a nrq to \a policy's list of queued requests
3151  *
3152  * \param[in] policy The policy
3153  * \param[in] nrq    The request to add
3154  *
3155  * \retval 0 success; nrs_request_enqueue() assumes this function will always
3156  *                    succeed
3157  */
3158 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
3159                            struct ptlrpc_nrs_request *nrq)
3160 {
3161         struct nrs_tbf_head   *head;
3162         struct nrs_tbf_client *cli;
3163         int                    rc = 0;
3164
3165         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3166
3167         cli = container_of(nrs_request_resource(nrq),
3168                            struct nrs_tbf_client, tc_res);
3169         head = container_of(nrs_request_resource(nrq)->res_parent,
3170                             struct nrs_tbf_head, th_res);
3171         if (list_empty(&cli->tc_list)) {
3172                 LASSERT(!cli->tc_in_heap);
3173                 cli->tc_deadline = cli->tc_check_time + cli->tc_nsecs;
3174                 rc = cfs_binheap_insert(head->th_binheap, &cli->tc_node);
3175                 if (rc == 0) {
3176                         cli->tc_in_heap = true;
3177                         nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3178                         list_add_tail(&nrq->nr_u.tbf.tr_list,
3179                                           &cli->tc_list);
3180                         if (policy->pol_nrs->nrs_throttling) {
3181                                 __u64 deadline = cli->tc_deadline;
3182                                 if ((head->th_deadline > deadline) &&
3183                                     (hrtimer_try_to_cancel(&head->th_timer)
3184                                      >= 0)) {
3185                                         ktime_t time;
3186                                         head->th_deadline = deadline;
3187                                         time = ktime_set(0, 0);
3188                                         time = ktime_add_ns(time, deadline);
3189                                         hrtimer_start(&head->th_timer, time,
3190                                                       HRTIMER_MODE_ABS);
3191                                 }
3192                         }
3193                 }
3194         } else {
3195                 LASSERT(cli->tc_in_heap);
3196                 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3197                 list_add_tail(&nrq->nr_u.tbf.tr_list,
3198                                   &cli->tc_list);
3199         }
3200
3201         if (rc == 0)
3202                 CDEBUG(D_RPCTRACE,
3203                        "TBF enqueues: class@%p rate %llu gen %llu "
3204                        "token %llu, rule@%p rate %llu gen %llu\n",
3205                        cli, cli->tc_rpc_rate,
3206                        cli->tc_rule_generation, cli->tc_ntoken,
3207                        cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3208                        cli->tc_rule->tr_generation);
3209
3210         return rc;
3211 }
3212
3213 /**
3214  * Removes request \a nrq from \a policy's list of queued requests.
3215  *
3216  * \param[in] policy The policy
3217  * \param[in] nrq    The request to remove
3218  */
3219 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
3220                              struct ptlrpc_nrs_request *nrq)
3221 {
3222         struct nrs_tbf_head   *head;
3223         struct nrs_tbf_client *cli;
3224
3225         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3226
3227         cli = container_of(nrs_request_resource(nrq),
3228                            struct nrs_tbf_client, tc_res);
3229         head = container_of(nrs_request_resource(nrq)->res_parent,
3230                             struct nrs_tbf_head, th_res);
3231
3232         LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
3233         list_del_init(&nrq->nr_u.tbf.tr_list);
3234         if (list_empty(&cli->tc_list)) {
3235                 cfs_binheap_remove(head->th_binheap,
3236                                    &cli->tc_node);
3237                 cli->tc_in_heap = false;
3238         } else {
3239                 cfs_binheap_relocate(head->th_binheap,
3240                                      &cli->tc_node);
3241         }
3242 }
3243
3244 /**
3245  * Prints a debug statement right before the request \a nrq stops being
3246  * handled.
3247  *
3248  * \param[in] policy The policy handling the request
3249  * \param[in] nrq    The request being handled
3250  *
3251  * \see ptlrpc_server_finish_request()
3252  * \see ptlrpc_nrs_req_stop_nolock()
3253  */
3254 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
3255                               struct ptlrpc_nrs_request *nrq)
3256 {
3257         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
3258                                                   rq_nrq);
3259
3260         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3261
3262         CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: %llu\n",
3263                policy->pol_desc->pd_name, libcfs_id2str(req->rq_peer),
3264                nrq->nr_u.tbf.tr_sequence);
3265 }
3266
3267 /**
3268  * debugfs interface
3269  */
3270
3271 /**
3272  * The maximum RPC rate.
3273  */
3274 #define LPROCFS_NRS_RATE_MAX            65535
3275
3276 static int
3277 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
3278 {
3279         struct ptlrpc_service       *svc = m->private;
3280         int                          rc;
3281
3282         seq_printf(m, "regular_requests:\n");
3283         /**
3284          * Perform two separate calls to this as only one of the NRS heads'
3285          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
3286          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
3287          */
3288         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
3289                                        NRS_POL_NAME_TBF,
3290                                        NRS_CTL_TBF_RD_RULE,
3291                                        false, m);
3292         if (rc == 0) {
3293                 /**
3294                  * -ENOSPC means buf in the parameter m is overflow, return 0
3295                  * here to let upper layer function seq_read alloc a larger
3296                  * memory area and do this process again.
3297                  */
3298         } else if (rc == -ENOSPC) {
3299                 return 0;
3300
3301                 /**
3302                  * Ignore -ENODEV as the regular NRS head's policy may be in the
3303                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
3304                  */
3305         } else if (rc != -ENODEV) {
3306                 return rc;
3307         }
3308
3309         if (!nrs_svc_has_hp(svc))
3310                 goto no_hp;
3311
3312         seq_printf(m, "high_priority_requests:\n");
3313         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
3314                                        NRS_POL_NAME_TBF,
3315                                        NRS_CTL_TBF_RD_RULE,
3316                                        false, m);
3317         if (rc == 0) {
3318                 /**
3319                  * -ENOSPC means buf in the parameter m is overflow, return 0
3320                  * here to let upper layer function seq_read alloc a larger
3321                  * memory area and do this process again.
3322                  */
3323         } else if (rc == -ENOSPC) {
3324                 return 0;
3325         }
3326
3327 no_hp:
3328
3329         return rc;
3330 }
3331
3332 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char *token)
3333 {
3334         int rc;
3335         ENTRY;
3336
3337         switch (cmd->u.tc_start.ts_valid_type) {
3338         case NRS_TBF_FLAG_JOBID:
3339                 rc = nrs_tbf_jobid_parse(cmd, token);
3340                 break;
3341         case NRS_TBF_FLAG_NID:
3342                 rc = nrs_tbf_nid_parse(cmd, token);
3343                 break;
3344         case NRS_TBF_FLAG_OPCODE:
3345                 rc = nrs_tbf_opcode_parse(cmd, token);
3346                 break;
3347         case NRS_TBF_FLAG_GENERIC:
3348                 rc = nrs_tbf_generic_parse(cmd, token);
3349                 break;
3350         case NRS_TBF_FLAG_UID:
3351         case NRS_TBF_FLAG_GID:
3352                 rc = nrs_tbf_ug_id_parse(cmd, token);
3353                 break;
3354         default:
3355                 RETURN(-EINVAL);
3356         }
3357
3358         RETURN(rc);
3359 }
3360
3361 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
3362 {
3363         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3364                 switch (cmd->u.tc_start.ts_valid_type) {
3365                 case NRS_TBF_FLAG_JOBID:
3366                         nrs_tbf_jobid_cmd_fini(cmd);
3367                         break;
3368          &nb