Whamcloud - gitweb
LU-19098 hsm: don't print progname twice with lhsmtool
[fs/lustre-release.git] / lustre / ptlrpc / nrs_tbf.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * Copyright (C) 2013 DataDirect Networks, Inc.
5  *
6  * Copyright (c) 2014, 2016, Intel Corporation.
7  */
8
9 /*
10  * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
11  */
12
13 #define DEBUG_SUBSYSTEM S_RPC
14 #include <linux/delay.h>
15 #include <cfs_hash.h>
16 #include <obd_support.h>
17 #include <obd_class.h>
18 #include <libcfs/libcfs.h>
19 #include <lustre_req_layout.h>
20 #include "ptlrpc_internal.h"
21
22 /**
23  * \name tbf
24  *
25  * Token Bucket Filter over client NIDs
26  *
27  * @{
28  */
29
/* Name string of the TBF policy. */
#define NRS_POL_NAME_TBF        "tbf"

/*
 * Module parameter: size of the jobid client cache. It is used to size the
 * jobid hash table (nrs_tbf_jobid_hash_order()) and to derive the per-bucket
 * threshold above which idle clients are purged (nrs_tbf_jobid_cli_put()).
 */
static int tbf_jobid_cache_size = 8192;
module_param(tbf_jobid_cache_size, int, 0644);
MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");

/* Module parameter: RPC rate (RPCs per second) given to the default rule. */
static int tbf_rate = 10000;
module_param(tbf_rate, int, 0644);
MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");

/* Module parameter: bucket depth copied into each rule when it is started. */
static int tbf_depth = 3;
module_param(tbf_depth, int, 0644);
MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
43
44 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
45 {
46         struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
47                                                  th_timer);
48         struct ptlrpc_nrs   *nrs = head->th_res.res_policy->pol_nrs;
49         struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
50
51         nrs->nrs_throttling = 0;
52         wake_up(&svcpt->scp_waitq);
53
54         return HRTIMER_NORESTART;
55 }
56
/*
 * Name of the rule started with NTRS_DEFAULT at policy startup.
 * nrs_tbf_rule_stop() refuses to stop a rule carrying this name.
 */
#define NRS_TBF_DEFAULT_RULE "default"
58
59 /* rule's usage reference count is now dropped below one. There is no more
60  * outstanding usage references left. Stops the rule in case it was already
61  * stopping.
62  */
63 static void nrs_tbf_rule_fini(struct kref *kref)
64 {
65         struct nrs_tbf_rule *rule = container_of(kref, struct nrs_tbf_rule,
66                                                  tr_ref);
67
68         LASSERT(list_empty(&rule->tr_cli_list));
69         LASSERT(list_empty(&rule->tr_linkage));
70
71         rule->tr_head->th_ops->o_rule_fini(rule);
72         OBD_FREE_PTR(rule);
73 }
74
75 static void
76 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
77 {
78         LASSERT(!list_empty(&cli->tc_linkage));
79         LASSERT(cli->tc_rule);
80         spin_lock(&cli->tc_rule->tr_rule_lock);
81         list_del_init(&cli->tc_linkage);
82         spin_unlock(&cli->tc_rule->tr_rule_lock);
83         kref_put(&cli->tc_rule->tr_ref, nrs_tbf_rule_fini);
84         cli->tc_rule = NULL;
85 }
86
87 static void
88 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
89                         struct nrs_tbf_client *cli)
90
91 {
92         struct nrs_tbf_rule *rule = cli->tc_rule;
93
94         cli->tc_rpc_rate = rule->tr_rpc_rate;
95         cli->tc_nsecs = rule->tr_nsecs_per_rpc;
96         cli->tc_nsecs_resid = 0;
97         cli->tc_depth = rule->tr_depth;
98         cli->tc_ntoken = rule->tr_depth;
99         cli->tc_check_time = ktime_to_ns(ktime_get());
100         cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
101         cli->tc_rule_generation = rule->tr_generation;
102
103         if (cli->tc_in_heap)
104                 binheap_relocate(head->th_binheap,
105                                  &cli->tc_node);
106 }
107
/*
 * Attach \a cli to \a rule and reload the client's limits from it.
 *
 * If the client is currently linked to a rule, that rule is released first;
 * it must differ from \a rule. The caller must already hold a reference on
 * \a rule; it is kept by the client until nrs_tbf_cli_rule_put() drops it.
 *
 * Lock nesting: cli->tc_rule_lock is the outer lock, the rule's
 * tr_rule_lock is taken inside it.
 */
static void
nrs_tbf_cli_reset(struct nrs_tbf_head *head,
                  struct nrs_tbf_rule *rule,
                  struct nrs_tbf_client *cli)
{
        spin_lock(&cli->tc_rule_lock);
        /* Release the rule the client is currently attached to, if any */
        if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
                LASSERT(rule != cli->tc_rule);
                nrs_tbf_cli_rule_put(cli);
        }
        LASSERT(cli->tc_rule == NULL);
        LASSERT(list_empty(&cli->tc_linkage));
        /* Rule's ref is added before called */
        cli->tc_rule = rule;
        spin_lock(&rule->tr_rule_lock);
        list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
        spin_unlock(&rule->tr_rule_lock);
        spin_unlock(&cli->tc_rule_lock);
        /* Limits are reloaded after both locks have been dropped */
        nrs_tbf_cli_reset_value(head, cli);
}
128
129 static int
130 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
131 {
132         return rule->tr_head->th_ops->o_rule_dump(rule, m);
133 }
134
135 static int
136 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
137 {
138         struct nrs_tbf_rule *rule;
139         int rc = 0;
140
141         LASSERT(head != NULL);
142         spin_lock(&head->th_rule_lock);
143         /* List the rules from newest to oldest */
144         list_for_each_entry(rule, &head->th_list, tr_linkage) {
145                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
146                 rc = nrs_tbf_rule_dump(rule, m);
147                 if (rc) {
148                         rc = -ENOSPC;
149                         break;
150                 }
151         }
152         spin_unlock(&head->th_rule_lock);
153
154         return rc;
155 }
156
157 static struct nrs_tbf_rule *
158 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
159                          const char *name)
160 {
161         struct nrs_tbf_rule *rule;
162
163         LASSERT(head != NULL);
164         list_for_each_entry(rule, &head->th_list, tr_linkage) {
165                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
166                 if (strcmp(rule->tr_name, name) == 0) {
167                         kref_get(&rule->tr_ref);
168                         return rule;
169                 }
170         }
171         return NULL;
172 }
173
174 static struct nrs_tbf_rule *
175 nrs_tbf_rule_find(struct nrs_tbf_head *head,
176                   const char *name)
177 {
178         struct nrs_tbf_rule *rule;
179
180         LASSERT(head != NULL);
181         spin_lock(&head->th_rule_lock);
182         rule = nrs_tbf_rule_find_nolock(head, name);
183         spin_unlock(&head->th_rule_lock);
184         return rule;
185 }
186
187 static struct nrs_tbf_rule *
188 nrs_tbf_rule_match(struct nrs_tbf_head *head,
189                    struct nrs_tbf_client *cli)
190 {
191         struct nrs_tbf_rule *rule = NULL;
192         struct nrs_tbf_rule *tmp_rule;
193
194         spin_lock(&head->th_rule_lock);
195         /* Match the newest rule in the list */
196         list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
197                 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
198                 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
199                         rule = tmp_rule;
200                         break;
201                 }
202         }
203
204         if (rule == NULL)
205                 rule = head->th_rule;
206
207         kref_get(&rule->tr_ref);
208         spin_unlock(&head->th_rule_lock);
209         return rule;
210 }
211
/*
 * Initialise a freshly allocated client for request \a req.
 *
 * The structure is zeroed, the type specific o_cli_init() hook fills in the
 * client identity from the request, and the client is then attached to the
 * rule selected by nrs_tbf_rule_match(). The client starts with a reference
 * count of one.
 */
static void
nrs_tbf_cli_init(struct nrs_tbf_head *head,
                 struct nrs_tbf_client *cli,
                 struct ptlrpc_request *req)
{
        struct nrs_tbf_rule *rule;

        memset(cli, 0, sizeof(*cli));
        cli->tc_in_heap = false;
        /* Identity must be set before rule matching below looks at it */
        head->th_ops->o_cli_init(cli, req);
        INIT_LIST_HEAD(&cli->tc_list);
        INIT_LIST_HEAD(&cli->tc_linkage);
        spin_lock_init(&cli->tc_rule_lock);
        refcount_set(&cli->tc_ref, 1);
        /* nrs_tbf_rule_match() returns the rule with a reference taken */
        rule = nrs_tbf_rule_match(head, cli);
        nrs_tbf_cli_reset(head, rule, cli);
}
229
230 static void nrs_tbf_cli_free(struct rcu_head *head)
231 {
232         struct nrs_tbf_client *cli = container_of(head, struct nrs_tbf_client,
233                                                   tc_rcu_head);
234         OBD_FREE_PTR(cli);
235 }
236
237 static void
238 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
239 {
240         LASSERT(list_empty(&cli->tc_list));
241         LASSERT(!cli->tc_in_heap);
242         spin_lock(&cli->tc_rule_lock);
243         nrs_tbf_cli_rule_put(cli);
244         spin_unlock(&cli->tc_rule_lock);
245
246         if (cli->tc_id.ti_type & NRS_TBF_FLAG_NID)
247                 call_rcu(&cli->tc_rcu_head, nrs_tbf_cli_free);
248         else
249                 OBD_FREE_PTR(cli);
250 }
251
/*
 * Create a new rule from the START command \a start and link it into the
 * head's rule list.
 *
 * The rule is inserted right before the rule named by ts_next_name when one
 * is given, otherwise at the front of the list. When the command carries
 * NTRS_DEFAULT the rule also becomes head->th_rule.
 *
 * \retval 0            rule started
 * \retval -EEXIST      a rule with this name already exists
 * \retval -ENOMEM      the rule could not be allocated
 * \retval -ENOENT      ts_next_name does not name an existing rule
 * \retval other        error returned by the o_rule_init() hook
 */
static int
nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
                   struct nrs_tbf_head *head,
                   struct nrs_tbf_cmd *start)
{
        struct nrs_tbf_rule     *rule;
        struct nrs_tbf_rule     *tmp_rule;
        struct nrs_tbf_rule     *next_rule;
        char                    *next_name = start->u.tc_start.ts_next_name;
        int                      rc;

        /* Cheap early check; repeated under the lock before insertion */
        rule = nrs_tbf_rule_find(head, start->tc_name);
        if (rule) {
                kref_put(&rule->tr_ref, nrs_tbf_rule_fini);
                return -EEXIST;
        }

        OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
        if (rule == NULL)
                return -ENOMEM;

        strscpy(rule->tr_name, start->tc_name, sizeof(rule->tr_name));
        rule->tr_rpc_rate = start->u.tc_start.ts_rpc_rate;
        rule->tr_flags = start->u.tc_start.ts_rule_flags;
        /* NOTE(review): divides by ts_rpc_rate; assumes callers never pass 0 */
        rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
        rule->tr_depth = tbf_depth;
        kref_init(&rule->tr_ref);
        INIT_LIST_HEAD(&rule->tr_cli_list);
        INIT_LIST_HEAD(&rule->tr_nids);
        INIT_LIST_HEAD(&rule->tr_linkage);
        spin_lock_init(&rule->tr_rule_lock);
        rule->tr_head = head;

        rc = head->th_ops->o_rule_init(policy, rule, start);
        if (rc) {
                /* Type specific init failed: free directly, no o_rule_fini() */
                OBD_FREE_PTR(rule);
                return rc;
        }

        /* Add as the newest rule */
        spin_lock(&head->th_rule_lock);
        /* Re-check for a duplicate now that the list cannot change */
        tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
        if (tmp_rule) {
                spin_unlock(&head->th_rule_lock);
                kref_put(&tmp_rule->tr_ref, nrs_tbf_rule_fini);
                /* Drops the initial reference, which finalises the new rule */
                kref_put(&rule->tr_ref, nrs_tbf_rule_fini);
                return -EEXIST;
        }

        if (next_name) {
                next_rule = nrs_tbf_rule_find_nolock(head, next_name);
                if (!next_rule) {
                        spin_unlock(&head->th_rule_lock);
                        kref_put(&rule->tr_ref, nrs_tbf_rule_fini);
                        return -ENOENT;
                }

                /* Adding after next_rule's predecessor puts it before next_rule */
                list_add(&rule->tr_linkage, next_rule->tr_linkage.prev);
                kref_put(&next_rule->tr_ref, nrs_tbf_rule_fini);
        } else {
                /* Add on the top of the rule list */
                list_add(&rule->tr_linkage, &head->th_list);
        }
        spin_unlock(&head->th_rule_lock);
        /* Bump the sequence that clients record in tc_rule_sequence */
        atomic_inc(&head->th_rule_sequence);
        if (start->u.tc_start.ts_rule_flags & NTRS_DEFAULT) {
                rule->tr_flags |= NTRS_DEFAULT;
                LASSERT(head->th_rule == NULL);
                head->th_rule = rule;
        }

        CDEBUG(D_RPCTRACE, "TBF starts rule@%p rate %llu gen %llu\n",
               rule, rule->tr_rpc_rate, rule->tr_generation);

        return 0;
}
328
/**
 * Change the rank of a rule in the rule list
 *
 * The matched rule will be moved to the position right before another
 * given rule. The whole operation runs with th_rule_lock held.
 *
 * \param[in] policy    the policy instance (not used by this function)
 * \param[in] head      the TBF policy instance
 * \param[in] name      the rule name to be moved
 * \param[in] next_name the rule name before which the matched rule will be
 *                      moved
 *
 * \retval 0            the rule was moved, or \a name equals \a next_name
 * \retval -ENOENT      either rule does not exist
 */
static int
nrs_tbf_rule_change_rank(struct ptlrpc_nrs_policy *policy,
                         struct nrs_tbf_head *head,
                         char *name,
                         char *next_name)
{
        struct nrs_tbf_rule     *rule = NULL;
        struct nrs_tbf_rule     *next_rule = NULL;
        int                      rc = 0;

        LASSERT(head != NULL);

        spin_lock(&head->th_rule_lock);
        rule = nrs_tbf_rule_find_nolock(head, name);
        if (!rule)
                GOTO(out, rc = -ENOENT);

        /* Moving a rule before itself is a no-op */
        if (strcmp(name, next_name) == 0)
                GOTO(out_put, rc);

        next_rule = nrs_tbf_rule_find_nolock(head, next_name);
        if (!next_rule)
                GOTO(out_put, rc = -ENOENT);

        /* rules may be adjacent in same list, so list_move() isn't safe here */
        list_move_tail(&rule->tr_linkage, &next_rule->tr_linkage);
        kref_put(&next_rule->tr_ref, nrs_tbf_rule_fini);
out_put:
        /* Drop the lookup reference taken on the moved rule */
        kref_put(&rule->tr_ref, nrs_tbf_rule_fini);
out:
        spin_unlock(&head->th_rule_lock);
        return rc;
}
375
376 static int
377 nrs_tbf_rule_change_rate(struct ptlrpc_nrs_policy *policy,
378                          struct nrs_tbf_head *head,
379                          char *name,
380                          __u64 rate)
381 {
382         struct nrs_tbf_rule *rule;
383
384         assert_spin_locked(&policy->pol_nrs->nrs_lock);
385
386         rule = nrs_tbf_rule_find(head, name);
387         if (rule == NULL)
388                 return -ENOENT;
389
390         rule->tr_rpc_rate = rate;
391         rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
392         rule->tr_generation++;
393         kref_put(&rule->tr_ref, nrs_tbf_rule_fini);
394
395         return 0;
396 }
397
398 static int
399 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
400                     struct nrs_tbf_head *head,
401                     struct nrs_tbf_cmd *change)
402 {
403         __u64    rate = change->u.tc_change.tc_rpc_rate;
404         char    *next_name = change->u.tc_change.tc_next_name;
405         int      rc;
406
407         if (rate != 0) {
408                 rc = nrs_tbf_rule_change_rate(policy, head, change->tc_name,
409                                               rate);
410                 if (rc)
411                         return rc;
412         }
413
414         if (next_name) {
415                 rc = nrs_tbf_rule_change_rank(policy, head, change->tc_name,
416                                               next_name);
417                 if (rc)
418                         return rc;
419         }
420
421         return 0;
422 }
423
424 static int
425 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
426                   struct nrs_tbf_head *head,
427                   struct nrs_tbf_cmd *stop)
428 {
429         struct nrs_tbf_rule *rule;
430
431         assert_spin_locked(&policy->pol_nrs->nrs_lock);
432
433         if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
434                 return -EPERM;
435
436         rule = nrs_tbf_rule_find(head, stop->tc_name);
437         if (rule == NULL)
438                 return -ENOENT;
439
440         list_del_init(&rule->tr_linkage);
441         rule->tr_flags |= NTRS_STOPPING;
442         kref_put(&rule->tr_ref, nrs_tbf_rule_fini);
443         kref_put(&rule->tr_ref, nrs_tbf_rule_fini);
444
445         return 0;
446 }
447
448 static int
449 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
450                 struct nrs_tbf_head *head,
451                 struct nrs_tbf_cmd *cmd)
452 {
453         int rc;
454
455         assert_spin_locked(&policy->pol_nrs->nrs_lock);
456
457         switch (cmd->tc_cmd) {
458         case NRS_CTL_TBF_START_RULE:
459                 if (cmd->u.tc_start.ts_valid_type != head->th_type_flag)
460                         return -EINVAL;
461
462                 spin_unlock(&policy->pol_nrs->nrs_lock);
463                 rc = nrs_tbf_rule_start(policy, head, cmd);
464                 spin_lock(&policy->pol_nrs->nrs_lock);
465                 return rc;
466         case NRS_CTL_TBF_CHANGE_RULE:
467                 rc = nrs_tbf_rule_change(policy, head, cmd);
468                 return rc;
469         case NRS_CTL_TBF_STOP_RULE:
470                 rc = nrs_tbf_rule_stop(policy, head, cmd);
471                 /* Take it as a success, if not exists at all */
472                 return rc == -ENOENT ? 0 : rc;
473         default:
474                 return -EFAULT;
475         }
476 }
477
478 /**
479  * Binary heap predicate.
480  *
481  * \param[in] e1 the first binheap node to compare
482  * \param[in] e2 the second binheap node to compare
483  *
484  * \retval 0 e1 > e2
485  * \retval 1 e1 < e2
486  */
487 static int
488 tbf_cli_compare(struct binheap_node *e1, struct binheap_node *e2)
489 {
490         struct nrs_tbf_client *cli1;
491         struct nrs_tbf_client *cli2;
492
493         cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
494         cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
495
496         if (cli1->tc_deadline < cli2->tc_deadline)
497                 return 1;
498         else if (cli1->tc_deadline > cli2->tc_deadline)
499                 return 0;
500
501         if (cli1->tc_check_time < cli2->tc_check_time)
502                 return 1;
503         else if (cli1->tc_check_time > cli2->tc_check_time)
504                 return 0;
505
506         /* Maybe need more comparasion, e.g. request number in the rules */
507         return 1;
508 }
509
/**
 * TBF binary heap operations
 *
 * Only the compare hook is provided; the enter and exit hooks are left
 * unset.
 */
static struct binheap_ops nrs_tbf_heap_ops = {
        .hop_enter      = NULL,
        .hop_exit       = NULL,
        .hop_compare    = tbf_cli_compare,
};
518
/*
 * Hash callback of the jobid table: djb2 hash over the NUL-terminated jobid
 * string passed as \a key.
 */
static unsigned int
nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key,
                       const unsigned int bits)
{
        const char *jobid = key;

        return cfs_hash_djb2_hash(key, strlen(jobid), bits);
}
525
526 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
527 {
528         struct nrs_tbf_client *cli = hlist_entry(hnode,
529                                                      struct nrs_tbf_client,
530                                                      tc_hnode);
531
532         return (strcmp(cli->tc_jobid, key) == 0);
533 }
534
535 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
536 {
537         struct nrs_tbf_client *cli = hlist_entry(hnode,
538                                                      struct nrs_tbf_client,
539                                                      tc_hnode);
540
541         return cli->tc_jobid;
542 }
543
544 static void *nrs_tbf_hop_object(struct hlist_node *hnode)
545 {
546         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
547 }
548
549 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
550 {
551         struct nrs_tbf_client *cli = hlist_entry(hnode,
552                                                      struct nrs_tbf_client,
553                                                      tc_hnode);
554
555         refcount_inc(&cli->tc_ref);
556 }
557
558 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
559 {
560         struct nrs_tbf_client *cli = hlist_entry(hnode,
561                                                      struct nrs_tbf_client,
562                                                      tc_hnode);
563
564         refcount_dec(&cli->tc_ref);
565 }
566
567 static void
568 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
569
570 {
571         struct nrs_tbf_client *cli = hlist_entry(hnode,
572                                                  struct nrs_tbf_client,
573                                                  tc_hnode);
574
575         nrs_tbf_cli_fini(cli);
576 }
577
/*
 * Callbacks of the jobid client hash table. The locked and unlocked put
 * hooks share a single handler.
 */
static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
        .hs_hash        = nrs_tbf_jobid_hop_hash,
        .hs_keycmp      = nrs_tbf_jobid_hop_keycmp,
        .hs_key         = nrs_tbf_jobid_hop_key,
        .hs_object      = nrs_tbf_hop_object,
        .hs_get         = nrs_tbf_jobid_hop_get,
        .hs_put         = nrs_tbf_jobid_hop_put,
        .hs_put_locked  = nrs_tbf_jobid_hop_put,
        .hs_exit        = nrs_tbf_jobid_hop_exit,
};
588
/* Flags passed to cfs_hash_create() for the jobid client hash table. */
#define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
                                  CFS_HASH_NO_ITEMREF | \
                                  CFS_HASH_DEPTH)
592
593 static struct nrs_tbf_client *
594 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
595                           struct cfs_hash_bd *bd,
596                           const char *jobid)
597 {
598         struct hlist_node *hnode;
599         struct nrs_tbf_client *cli;
600
601         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)jobid);
602         if (hnode == NULL)
603                 return NULL;
604
605         cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
606         if (!list_empty(&cli->tc_lru))
607                 list_del_init(&cli->tc_lru);
608         return cli;
609 }
610
/* Jobid used for requests whose message carries no jobid. */
#define NRS_TBF_JOBID_NULL ""
612
613 static struct nrs_tbf_client *
614 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
615                        struct ptlrpc_request *req)
616 {
617         const char              *jobid;
618         struct nrs_tbf_client   *cli;
619         struct cfs_hash         *hs = head->th_cli_hash;
620         struct cfs_hash_bd               bd;
621
622         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
623         if (jobid == NULL)
624                 jobid = NRS_TBF_JOBID_NULL;
625         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
626         cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
627         cfs_hash_bd_unlock(hs, &bd, 1);
628
629         return cli;
630 }
631
632 static struct nrs_tbf_client *
633 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
634                           struct nrs_tbf_client *cli)
635 {
636         const char              *jobid;
637         struct nrs_tbf_client   *ret;
638         struct cfs_hash         *hs = head->th_cli_hash;
639         struct cfs_hash_bd               bd;
640
641         jobid = cli->tc_jobid;
642         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
643         ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
644         if (ret == NULL) {
645                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
646                 ret = cli;
647         }
648         cfs_hash_bd_unlock(hs, &bd, 1);
649
650         return ret;
651 }
652
/*
 * Release a reference on a jobid client.
 *
 * When cfs_hash_bd_dec_and_lock() reports success the client is parked at
 * the tail of its bucket's LRU list. The bucket is then trimmed: while it
 * holds more entries than its share of tbf_jobid_cache_size, clients are
 * taken from the head of the LRU, removed from the hash and finalised once
 * the bucket lock has been dropped.
 */
static void
nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
                      struct nrs_tbf_client *cli)
{
        struct cfs_hash_bd               bd;
        struct cfs_hash         *hs = head->th_cli_hash;
        struct nrs_tbf_bucket   *bkt;
        int                      hw;
        LIST_HEAD(zombies);

        cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
        bkt = cfs_hash_bd_extra_get(hs, &bd);
        /*
         * NOTE(review): presumably this drops tc_ref and returns with the
         * bucket locked only when the count reached zero -- confirm against
         * cfs_hash_bd_dec_and_lock().
         */
        if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
                return;
        LASSERT(list_empty(&cli->tc_lru));
        list_add_tail(&cli->tc_lru, &bkt->ntb_lru);

        /*
         * Check and purge the LRU, there is at least one client in the LRU.
         */
        /* Per-bucket limit: cache size scaled down by 2^(cur_bits - bkt_bits) */
        hw = tbf_jobid_cache_size >>
             (hs->hs_cur_bits - hs->hs_bkt_bits);
        while (cfs_hash_bd_count_get(&bd) > hw) {
                if (unlikely(list_empty(&bkt->ntb_lru)))
                        break;
                cli = list_first_entry(&bkt->ntb_lru,
                                       struct nrs_tbf_client,
                                       tc_lru);
                cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
                /* Collect victims; they are finalised after the unlock below */
                list_move(&cli->tc_lru, &zombies);
        }
        cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);

        while (!list_empty(&zombies)) {
                cli = container_of(zombies.next,
                                   struct nrs_tbf_client, tc_lru);
                list_del_init(&cli->tc_lru);
                nrs_tbf_cli_fini(cli);
        }
}
693
694 static void
695 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
696                        struct ptlrpc_request *req)
697 {
698         char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
699
700         if (jobid == NULL)
701                 jobid = NRS_TBF_JOBID_NULL;
702         LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
703         INIT_LIST_HEAD(&cli->tc_lru);
704         memcpy(cli->tc_jobid, jobid, strlen(jobid));
705 }
706
707 static int nrs_tbf_jobid_hash_order(void)
708 {
709         int bits;
710
711         for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
712                 ;
713
714         return bits;
715 }
716
/*
 * Bucket bits handed to cfs_hash_create() for the jobid table; also the
 * lower bound applied to the table's bit count in nrs_tbf_jobid_startup().
 */
#define NRS_TBF_JOBID_BKT_BITS 10
718
/*
 * Start up the jobid flavour of TBF: create the client hash table,
 * initialise the LRU list of every bucket and start the default rule, which
 * covers every jobid ("*") at tbf_rate RPCs per second.
 *
 * \retval 0            success
 * \retval -ENOMEM      the hash table could not be created
 * \retval other        error from nrs_tbf_rule_start(); the hash table is
 *                      released again in that case
 */
static int
nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
                      struct nrs_tbf_head *head)
{
        struct nrs_tbf_cmd       start;
        struct nrs_tbf_bucket   *bkt;
        int                      bits;
        int                      i;
        int                      rc;
        struct cfs_hash_bd       bd;

        /* Never use fewer bits than the bucket bits */
        bits = nrs_tbf_jobid_hash_order();
        if (bits < NRS_TBF_JOBID_BKT_BITS)
                bits = NRS_TBF_JOBID_BKT_BITS;
        head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
                                            bits,
                                            bits,
                                            NRS_TBF_JOBID_BKT_BITS,
                                            sizeof(*bkt),
                                            0,
                                            0,
                                            &nrs_tbf_jobid_hash_ops,
                                            NRS_TBF_JOBID_HASH_FLAGS);
        if (head->th_cli_hash == NULL)
                return -ENOMEM;

        /* Each bucket carries its own LRU list in its extra data */
        cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
                bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
                INIT_LIST_HEAD(&bkt->ntb_lru);
        }

        /* Build the START command for the default rule on the stack */
        memset(&start, 0, sizeof(start));
        start.u.tc_start.ts_jobids_str = "*";

        start.u.tc_start.ts_rpc_rate = tbf_rate;
        start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
        start.tc_name = NRS_TBF_DEFAULT_RULE;
        /* Left empty: nrs_tbf_jobid_rule_init() then skips jobid parsing */
        INIT_LIST_HEAD(&start.u.tc_start.ts_jobids);
        rc = nrs_tbf_rule_start(policy, head, &start);
        if (rc) {
                cfs_hash_putref(head->th_cli_hash);
                head->th_cli_hash = NULL;
        }

        return rc;
}
765
766 /**
767  * Frees jobid of \a list.
768  *
769  */
770 static void
771 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
772 {
773         struct nrs_tbf_jobid *jobid, *n;
774
775         list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
776                 OBD_FREE_STR(jobid->tj_id);
777                 list_del(&jobid->tj_linkage);
778                 OBD_FREE_PTR(jobid);
779         }
780 }
781
782 static int
783 nrs_tbf_jobid_list_add(char *id, struct list_head *jobid_list)
784 {
785         struct nrs_tbf_jobid *jobid;
786         char *ptr;
787
788         OBD_ALLOC_PTR(jobid);
789         if (jobid == NULL)
790                 return -ENOMEM;
791
792         OBD_STRNDUP(jobid->tj_id, id, strlen(id));
793         if (jobid->tj_id == NULL) {
794                 OBD_FREE_PTR(jobid);
795                 return -ENOMEM;
796         }
797
798         ptr = strchr(id, '*');
799         if (ptr == NULL)
800                 jobid->tj_match_flag = NRS_TBF_MATCH_FULL;
801         else
802                 jobid->tj_match_flag = NRS_TBF_MATCH_WILDCARD;
803
804         list_add_tail(&jobid->tj_linkage, jobid_list);
805         return 0;
806 }
807
/**
 * Match a string against a pattern in which '*' stands for any run of
 * characters, including an empty one. Every other pattern character must
 * match the content literally.
 *
 * A '*' in the pattern is always a wildcard, even when the content has a
 * literal '*' at the same position. The match is iterative: on a mismatch
 * it resumes after the most recent '*' with one more content character
 * absorbed by it, so no recursion is needed and the cost is bounded by
 * strlen(pattern) * strlen(content).
 *
 * \param[in] pattern   NUL-terminated pattern, may contain '*'
 * \param[in] content   NUL-terminated string to test
 *
 * \retval true         \a content matches \a pattern in full
 * \retval false        otherwise
 */
static bool
cfs_match_wildcard(const char *pattern, const char *content)
{
        const char *star = NULL;        /* most recent '*' in the pattern */
        const char *resume = NULL;      /* content position tied to it */

        while (*content != '\0') {
                if (*pattern == '*') {
                        /* Start by letting the '*' match nothing */
                        star = pattern++;
                        resume = content;
                } else if (*pattern == *content) {
                        pattern++;
                        content++;
                } else if (star != NULL) {
                        /* Backtrack: the last '*' absorbs one more char */
                        pattern = star + 1;
                        content = ++resume;
                } else {
                        return false;
                }
        }

        /* Content is exhausted: only trailing '*' may remain */
        while (*pattern == '*')
                pattern++;

        return *pattern == '\0';
}
834
835 static inline bool
836 nrs_tbf_jobid_match(const struct nrs_tbf_jobid *jobid, const char *id)
837 {
838         if (jobid->tj_match_flag == NRS_TBF_MATCH_FULL)
839                 return strcmp(jobid->tj_id, id) == 0;
840
841         if (jobid->tj_match_flag == NRS_TBF_MATCH_WILDCARD)
842                 return cfs_match_wildcard(jobid->tj_id, id);
843
844         return false;
845 }
846
847 static int
848 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
849 {
850         struct nrs_tbf_jobid *jobid;
851
852         list_for_each_entry(jobid, jobid_list, tj_linkage) {
853                 if (nrs_tbf_jobid_match(jobid, id))
854                         return 1;
855         }
856         return 0;
857 }
858
859 static int
860 nrs_tbf_jobid_list_parse(char *orig, struct list_head *jobid_list)
861 {
862         char *str, *copy;
863         int rc = 0;
864         ENTRY;
865
866         copy = kstrdup(orig, GFP_KERNEL);
867         if (!copy)
868                 return -ENOMEM;
869         str = copy;
870         INIT_LIST_HEAD(jobid_list);
871         while (str && rc == 0) {
872                 char *tok = strsep(&str, " ");
873
874                 if (*tok)
875                         rc = nrs_tbf_jobid_list_add(tok, jobid_list);
876         }
877         if (list_empty(jobid_list))
878                 rc = -EINVAL;
879         if (rc)
880                 nrs_tbf_jobid_list_free(jobid_list);
881         kfree(copy);
882         RETURN(rc);
883 }
884
885 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
886 {
887         if (!list_empty(&cmd->u.tc_start.ts_jobids))
888                 nrs_tbf_jobid_list_free(&cmd->u.tc_start.ts_jobids);
889         OBD_FREE_STR(cmd->u.tc_start.ts_jobids_str);
890 }
891
892 static int nrs_tbf_check_id_value(char **strp, char *key)
893 {
894         char *str = *strp;
895         char *tok;
896         int len;
897
898         tok = strim(strsep(&str, "="));
899         if (!*tok || !str)
900                 /* No LHS or no '=' */
901                 return -EINVAL;
902         str = strim(str);
903         len = strlen(str);
904         if (strcmp(tok, key) != 0 ||
905             str[0] != '{' || str[len-1] != '}')
906                 /* Wrong key, or RHS missing {} */
907                 return -EINVAL;
908
909         /* Skip '{' and '}' */
910         str[len-1] = '\0';
911         str += 1;
912         *strp = str;
913         return 0;
914 }
915
916 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, char *id)
917 {
918         int rc;
919
920         rc = nrs_tbf_check_id_value(&id, "jobid");
921         if (rc)
922                 return rc;
923
924         OBD_STRNDUP(cmd->u.tc_start.ts_jobids_str, id, strlen(id));
925         if (cmd->u.tc_start.ts_jobids_str == NULL)
926                 return -ENOMEM;
927
928         /* parse jobid list */
929         rc = nrs_tbf_jobid_list_parse(cmd->u.tc_start.ts_jobids_str,
930                                       &cmd->u.tc_start.ts_jobids);
931         if (rc)
932                 nrs_tbf_jobid_cmd_fini(cmd);
933
934         return rc;
935 }
936
937 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
938                                    struct nrs_tbf_rule *rule,
939                                    struct nrs_tbf_cmd *start)
940 {
941         int rc = 0;
942
943         LASSERT(start->u.tc_start.ts_jobids_str);
944         OBD_STRNDUP(rule->tr_jobids_str,
945                     start->u.tc_start.ts_jobids_str,
946                     strlen(start->u.tc_start.ts_jobids_str));
947         if (rule->tr_jobids_str == NULL)
948                 return -ENOMEM;
949
950         INIT_LIST_HEAD(&rule->tr_jobids);
951         if (!list_empty(&start->u.tc_start.ts_jobids)) {
952                 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
953                                               &rule->tr_jobids);
954                 if (rc)
955                         CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
956         }
957         if (rc)
958                 OBD_FREE_STR(rule->tr_jobids_str);
959         return rc;
960 }
961
962 static int
963 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
964 {
965         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
966                    rule->tr_jobids_str, rule->tr_rpc_rate,
967                    kref_read(&rule->tr_ref) - 1);
968         return 0;
969 }
970
971 static int
972 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
973                          struct nrs_tbf_client *cli)
974 {
975         return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
976 }
977
978 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
979 {
980         if (!list_empty(&rule->tr_jobids))
981                 nrs_tbf_jobid_list_free(&rule->tr_jobids);
982         LASSERT(rule->tr_jobids_str != NULL);
983         OBD_FREE_STR(rule->tr_jobids_str);
984 }
985
986 static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
987         .o_name = NRS_TBF_TYPE_JOBID,
988         .o_startup = nrs_tbf_jobid_startup,
989         .o_cli_find = nrs_tbf_jobid_cli_find,
990         .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
991         .o_cli_put = nrs_tbf_jobid_cli_put,
992         .o_cli_init = nrs_tbf_jobid_cli_init,
993         .o_rule_init = nrs_tbf_jobid_rule_init,
994         .o_rule_dump = nrs_tbf_jobid_rule_dump,
995         .o_rule_match = nrs_tbf_jobid_rule_match,
996         .o_rule_fini = nrs_tbf_jobid_rule_fini,
997 };
998
/**
 * rhashtable operations for nrs_tbf_head::th_cli_rhash
 *
 * This uses the client NID (nrs_tbf_client::tc_nid, a struct lnet_nid copied
 * from ptlrpc_request::rq_peer.nid) as its key, in order to hash
 * nrs_tbf_client objects.
 */
1005 #define NRS_TBF_NID_BKT_BITS    8
1006 #define NRS_TBF_NID_BITS        16
1007
1008 static u32 nrs_tbf_nid_hashfn(const void *data, u32 len, u32 seed)
1009 {
1010         const struct lnet_nid *nid = data;
1011
1012         return cfs_hash_32(nidhash(nid) ^ seed, 32);
1013 }
1014
1015 static int nrs_tbf_nid_cmpfn(struct rhashtable_compare_arg *arg, const void *obj)
1016 {
1017         const struct nrs_tbf_client *cli = obj;
1018         const struct lnet_nid *nid = arg->key;
1019
1020         if (!refcount_read(&cli->tc_ref))
1021                 return -ENXIO;
1022
1023         return nid_same(nid, &cli->tc_nid) ? 0 : -ESRCH;
1024 }
1025
1026 static const struct rhashtable_params tbf_nid_hash_params = {
1027         .key_len        = sizeof(struct lnet_nid),
1028         .key_offset     = offsetof(struct nrs_tbf_client, tc_nid),
1029         .head_offset    = offsetof(struct nrs_tbf_client, tc_rhash),
1030         .hashfn         = nrs_tbf_nid_hashfn,
1031         .obj_cmpfn      = nrs_tbf_nid_cmpfn,
1032         .automatic_shrinking = true,
1033 };
1034
1035 static void nrs_tbf_nid_exit(void *vcli, void *data)
1036 {
1037         struct nrs_tbf_client *cli = vcli;
1038
1039         CDEBUG(D_RPCTRACE,
1040                "Busy TBF object from client with NID %s, with %d refs\n",
1041                libcfs_nidstr(&cli->tc_nid), refcount_read(&cli->tc_ref));
1042
1043         nrs_tbf_cli_fini(cli);
1044 }
1045
1046 static struct nrs_tbf_client *
1047 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
1048                      struct ptlrpc_request *req)
1049 {
1050         struct nrs_tbf_client *cli;
1051
1052         rcu_read_lock();
1053         cli = rhashtable_lookup(&head->th_cli_rhash, &req->rq_peer.nid,
1054                                 tbf_nid_hash_params);
1055         if (cli && !refcount_inc_not_zero(&cli->tc_ref))
1056                 cli = NULL;
1057         rcu_read_unlock();
1058
1059         return cli;
1060 }
1061
1062 static struct nrs_tbf_client *
1063 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
1064                         struct nrs_tbf_client *cli)
1065 {
1066         struct nrs_tbf_client *cli2 = NULL;
1067
1068         rcu_read_lock();
1069 try_again:
1070         cli2 = rhashtable_lookup_get_insert_fast(&head->th_cli_rhash,
1071                                                  &cli->tc_rhash,
1072                                                  tbf_nid_hash_params);
1073         if (cli2) {
1074                 /* Insertion failed. */
1075                 if (IS_ERR(cli2)) {
1076                         /* hash table could be resizing. */
1077                         if (PTR_ERR(cli2) == -ENOMEM ||
1078                             PTR_ERR(cli2) == -EBUSY) {
1079                                 rcu_read_unlock();
1080                                 msleep(20);
1081                                 rcu_read_lock();
1082                                 goto try_again;
1083                         }
1084                         /* return ERR_PTR */
1085                 } else {
1086                         /* lost race. Use new cli2 */
1087                         if (!refcount_inc_not_zero(&cli2->tc_ref))
1088                                 goto try_again;
1089                 }
1090         } else {
1091                 /* New cli has been inserted */
1092                 cli2 = cli;
1093         }
1094         if (!IS_ERR(cli2))
1095                 cli2->tc_id.ti_type = head->th_type_flag;
1096         rcu_read_unlock();
1097
1098         return cli2;
1099 }
1100
1101 static void
1102 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
1103                       struct nrs_tbf_client *cli)
1104 {
1105         if (!refcount_dec_and_test(&cli->tc_ref))
1106                 return;
1107
1108         rhashtable_remove_fast(&head->th_cli_rhash,
1109                                &cli->tc_rhash,
1110                                tbf_nid_hash_params);
1111         nrs_tbf_cli_fini(cli);
1112 }
1113
1114 static int
1115 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
1116                     struct nrs_tbf_head *head)
1117 {
1118         struct nrs_tbf_cmd start;
1119         int rc;
1120
1121         rc = rhashtable_init(&head->th_cli_rhash, &tbf_nid_hash_params);
1122         if (rc < 0)
1123                 return rc;
1124
1125         memset(&start, 0, sizeof(start));
1126         start.u.tc_start.ts_nids_str = "*";
1127
1128         start.u.tc_start.ts_rpc_rate = tbf_rate;
1129         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1130         start.tc_name = NRS_TBF_DEFAULT_RULE;
1131         INIT_LIST_HEAD(&start.u.tc_start.ts_nids);
1132         rc = nrs_tbf_rule_start(policy, head, &start);
1133         if (rc < 0)
1134                 rhashtable_free_and_destroy(&head->th_cli_rhash,
1135                                             nrs_tbf_nid_exit, NULL);
1136
1137         return rc;
1138 }
1139
1140 static void
1141 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1142                              struct ptlrpc_request *req)
1143 {
1144         cli->tc_nid = req->rq_peer.nid;
1145 }
1146
1147 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1148                                  struct nrs_tbf_rule *rule,
1149                                  struct nrs_tbf_cmd *start)
1150 {
1151         size_t len = strlen(start->u.tc_start.ts_nids_str);
1152
1153         LASSERT(start->u.tc_start.ts_nids_str);
1154
1155         OBD_STRNDUP(rule->tr_nids_str, start->u.tc_start.ts_nids_str, len);
1156         if (!rule->tr_nids_str)
1157                 return -ENOMEM;
1158
1159         INIT_LIST_HEAD(&rule->tr_nids);
1160         if (!list_empty(&start->u.tc_start.ts_nids)) {
1161                 if (cfs_parse_nidlist(rule->tr_nids_str, len, &rule->tr_nids)) {
1162                         CERROR("nids {%s} illegal\n",
1163                                rule->tr_nids_str);
1164                         OBD_FREE_STR(rule->tr_nids_str);
1165                         return -EINVAL;
1166                 }
1167         }
1168         return 0;
1169 }
1170
1171 static int
1172 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1173 {
1174         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1175                    rule->tr_nids_str, rule->tr_rpc_rate,
1176                    kref_read(&rule->tr_ref) - 1);
1177         return 0;
1178 }
1179
1180 static int
1181 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1182                        struct nrs_tbf_client *cli)
1183 {
1184         return cfs_match_nid(&cli->tc_nid, &rule->tr_nids);
1185 }
1186
1187 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1188 {
1189         if (!list_empty(&rule->tr_nids))
1190                 cfs_free_nidlist(&rule->tr_nids);
1191         LASSERT(rule->tr_nids_str != NULL);
1192         OBD_FREE_STR(rule->tr_nids_str);
1193 }
1194
1195 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1196 {
1197         if (!list_empty(&cmd->u.tc_start.ts_nids))
1198                 cfs_free_nidlist(&cmd->u.tc_start.ts_nids);
1199         OBD_FREE_STR(cmd->u.tc_start.ts_nids_str);
1200 }
1201
1202 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, char *id)
1203 {
1204         int rc;
1205         size_t len;
1206
1207         rc = nrs_tbf_check_id_value(&id, "nid");
1208         if (rc)
1209                 return rc;
1210
1211         len = strlen(id);
1212
1213         OBD_STRNDUP(cmd->u.tc_start.ts_nids_str, id, len);
1214         if (!cmd->u.tc_start.ts_nids_str)
1215                 return -ENOMEM;
1216
1217         /* parse NID list */
1218         if (cfs_parse_nidlist(cmd->u.tc_start.ts_nids_str, len,
1219                               &cmd->u.tc_start.ts_nids)) {
1220                 nrs_tbf_nid_cmd_fini(cmd);
1221                 return -EINVAL;
1222         }
1223
1224         return 0;
1225 }
1226
1227 static struct nrs_tbf_ops nrs_tbf_nid_ops = {
1228         .o_name         = NRS_TBF_TYPE_NID,
1229         .o_startup      = nrs_tbf_nid_startup,
1230         .o_cli_find     = nrs_tbf_nid_cli_find,
1231         .o_cli_findadd  = nrs_tbf_nid_cli_findadd,
1232         .o_cli_put      = nrs_tbf_nid_cli_put,
1233         .o_cli_init     = nrs_tbf_nid_cli_init,
1234         .o_rule_init    = nrs_tbf_nid_rule_init,
1235         .o_rule_dump    = nrs_tbf_nid_rule_dump,
1236         .o_rule_match   = nrs_tbf_nid_rule_match,
1237         .o_rule_fini    = nrs_tbf_nid_rule_fini,
1238 };
1239
1240 static unsigned int
1241 nrs_tbf_hop_hash(struct cfs_hash *hs, const void *key,
1242                  const unsigned int bits)
1243 {
1244         return cfs_hash_djb2_hash(key, sizeof(struct nrs_tbf_key), bits);
1245 }
1246
1247 static int nrs_tbf_hop_keycmp(const void *data, struct hlist_node *hnode)
1248 {
1249         struct nrs_tbf_key *key = (struct nrs_tbf_key *)data;
1250         struct nrs_tbf_client *cli = hlist_entry(hnode,
1251                                                  struct nrs_tbf_client,
1252                                                  tc_hnode);
1253
1254         return nid_same(&cli->tc_nid, &key->tk_nid) &&
1255                cli->tc_opcode == key->tk_opcode &&
1256                cli->tc_id.ti_uid == key->tk_id.ti_uid &&
1257                cli->tc_id.ti_gid == key->tk_id.ti_gid &&
1258                strcmp(cli->tc_jobid, key->tk_jobid) == 0;
1259 }
1260
1261 static void *nrs_tbf_hop_key(struct hlist_node *hnode)
1262 {
1263         struct nrs_tbf_client *cli = hlist_entry(hnode,
1264                                                  struct nrs_tbf_client,
1265                                                  tc_hnode);
1266         return &cli->tc_key;
1267 }
1268
1269 static void nrs_tbf_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1270 {
1271         struct nrs_tbf_client *cli = hlist_entry(hnode,
1272                                                  struct nrs_tbf_client,
1273                                                  tc_hnode);
1274
1275         refcount_inc(&cli->tc_ref);
1276 }
1277
1278 static void nrs_tbf_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1279 {
1280         struct nrs_tbf_client *cli = hlist_entry(hnode,
1281                                                  struct nrs_tbf_client,
1282                                                  tc_hnode);
1283
1284         refcount_dec(&cli->tc_ref);
1285 }
1286
1287 static void nrs_tbf_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1288
1289 {
1290         struct nrs_tbf_client *cli = hlist_entry(hnode,
1291                                                  struct nrs_tbf_client,
1292                                                  tc_hnode);
1293
1294         nrs_tbf_cli_fini(cli);
1295 }
1296
1297 static struct cfs_hash_ops nrs_tbf_hash_ops = {
1298         .hs_hash        = nrs_tbf_hop_hash,
1299         .hs_keycmp      = nrs_tbf_hop_keycmp,
1300         .hs_key         = nrs_tbf_hop_key,
1301         .hs_object      = nrs_tbf_hop_object,
1302         .hs_get         = nrs_tbf_hop_get,
1303         .hs_put         = nrs_tbf_hop_put,
1304         .hs_put_locked  = nrs_tbf_hop_put,
1305         .hs_exit        = nrs_tbf_hop_exit,
1306 };
1307
1308 #define NRS_TBF_GENERIC_BKT_BITS        10
1309 #define NRS_TBF_GENERIC_HASH_FLAGS      (CFS_HASH_SPIN_BKTLOCK | \
1310                                         CFS_HASH_NO_ITEMREF | \
1311                                         CFS_HASH_DEPTH)
1312
1313 static int
1314 nrs_tbf_startup(struct ptlrpc_nrs_policy *policy, struct nrs_tbf_head *head)
1315 {
1316         struct nrs_tbf_cmd       start;
1317         struct nrs_tbf_bucket   *bkt;
1318         int                      bits;
1319         int                      i;
1320         int                      rc;
1321         struct cfs_hash_bd       bd;
1322
1323         bits = nrs_tbf_jobid_hash_order();
1324         if (bits < NRS_TBF_GENERIC_BKT_BITS)
1325                 bits = NRS_TBF_GENERIC_BKT_BITS;
1326         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1327                                             bits, bits,
1328                                             NRS_TBF_GENERIC_BKT_BITS,
1329                                             sizeof(*bkt), 0, 0,
1330                                             &nrs_tbf_hash_ops,
1331                                             NRS_TBF_GENERIC_HASH_FLAGS);
1332         if (head->th_cli_hash == NULL)
1333                 return -ENOMEM;
1334
1335         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
1336                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
1337                 INIT_LIST_HEAD(&bkt->ntb_lru);
1338         }
1339
1340         memset(&start, 0, sizeof(start));
1341         start.u.tc_start.ts_conds_str = "*";
1342
1343         start.u.tc_start.ts_rpc_rate = tbf_rate;
1344         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1345         start.tc_name = NRS_TBF_DEFAULT_RULE;
1346         INIT_LIST_HEAD(&start.u.tc_start.ts_conds);
1347         rc = nrs_tbf_rule_start(policy, head, &start);
1348         if (rc)
1349                 cfs_hash_putref(head->th_cli_hash);
1350
1351         return rc;
1352 }
1353
1354 static struct nrs_tbf_client *
1355 nrs_tbf_cli_hash_lookup(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1356                         struct nrs_tbf_key *key)
1357 {
1358         struct hlist_node *hnode;
1359         struct nrs_tbf_client *cli;
1360
1361         hnode = cfs_hash_bd_lookup_locked(hs, bd, key);
1362         if (hnode == NULL)
1363                 return NULL;
1364
1365         cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
1366         if (!list_empty(&cli->tc_lru))
1367                 list_del_init(&cli->tc_lru);
1368         return cli;
1369 }
1370
1371 /**
1372  * ONLY opcode presented in this function will be checked in
1373  * nrs_tbf_id_cli_set(). That means, we can add or remove an
1374  * opcode to enable or disable requests handled in nrs_tbf
1375  */
1376 static struct req_format *req_fmt(__u32 opcode)
1377 {
1378         switch (opcode) {
1379         case OST_GETATTR:
1380                 return &RQF_OST_GETATTR;
1381         case OST_SETATTR:
1382                 return &RQF_OST_SETATTR;
1383         case OST_READ:
1384                 return &RQF_OST_BRW_READ;
1385         case OST_WRITE:
1386                 return &RQF_OST_BRW_WRITE;
1387         /* FIXME: OST_CREATE and OST_DESTROY comes from MDS
1388          * in most case. Should they be removed? */
1389         case OST_CREATE:
1390                 return &RQF_OST_CREATE;
1391         case OST_DESTROY:
1392                 return &RQF_OST_DESTROY;
1393         case OST_PUNCH:
1394                 return &RQF_OST_PUNCH;
1395         case OST_SYNC:
1396                 return &RQF_OST_SYNC;
1397         case OST_LADVISE:
1398                 return &RQF_OST_LADVISE;
1399         case MDS_GETATTR:
1400                 return &RQF_MDS_GETATTR;
1401         case MDS_GETATTR_NAME:
1402                 return &RQF_MDS_GETATTR_NAME;
1403         /* close is skipped to avoid LDLM cancel slowness */
1404 #if 0
1405         case MDS_CLOSE:
1406                 return &RQF_MDS_CLOSE;
1407 #endif
1408         case MDS_REINT:
1409                 return &RQF_MDS_REINT;
1410         case MDS_READPAGE:
1411                 return &RQF_MDS_READPAGE;
1412         case MDS_GET_ROOT:
1413                 return &RQF_MDS_GET_ROOT;
1414         case MDS_STATFS:
1415                 return &RQF_MDS_STATFS;
1416         case MDS_SYNC:
1417                 return &RQF_MDS_SYNC;
1418         case MDS_QUOTACTL:
1419                 return &RQF_MDS_QUOTACTL;
1420         case MDS_GETXATTR:
1421                 return &RQF_MDS_GETXATTR;
1422         case MDS_GET_INFO:
1423                 return &RQF_MDS_GET_INFO;
1424         /* HSM op is skipped */
1425 #if 0 
1426         case MDS_HSM_STATE_GET:
1427                 return &RQF_MDS_HSM_STATE_GET;
1428         case MDS_HSM_STATE_SET:
1429                 return &RQF_MDS_HSM_STATE_SET;
1430         case MDS_HSM_ACTION:
1431                 return &RQF_MDS_HSM_ACTION;
1432         case MDS_HSM_CT_REGISTER:
1433                 return &RQF_MDS_HSM_CT_REGISTER;
1434         case MDS_HSM_CT_UNREGISTER:
1435                 return &RQF_MDS_HSM_CT_UNREGISTER;
1436 #endif
1437         case MDS_SWAP_LAYOUTS:
1438                 return &RQF_MDS_SWAP_LAYOUTS;
1439         case LDLM_ENQUEUE:
1440                 return &RQF_LDLM_ENQUEUE;
1441         default:
1442                 return NULL;
1443         }
1444 }
1445
1446 static struct req_format *intent_req_fmt(__u32 it_opc)
1447 {
1448         if (it_opc & (IT_OPEN | IT_CREAT))
1449                 return &RQF_LDLM_INTENT_OPEN;
1450         else if (it_opc & (IT_GETATTR | IT_LOOKUP))
1451                 return &RQF_LDLM_INTENT_GETATTR;
1452         else if (it_opc & IT_GETXATTR)
1453                 return &RQF_LDLM_INTENT_GETXATTR;
1454         else if (it_opc & (IT_GLIMPSE | IT_BRW))
1455                 return &RQF_LDLM_INTENT;
1456         else
1457                 return NULL;
1458 }
1459
1460 static int ost_tbf_id_cli_set(struct ptlrpc_request *req,
1461                               struct tbf_id *id)
1462 {
1463         struct ost_body *body;
1464
1465         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1466         if (body != NULL) {
1467                 id->ti_uid = body->oa.o_uid;
1468                 id->ti_gid = body->oa.o_gid;
1469                 return 0;
1470         }
1471
1472         return -EINVAL;
1473 }
1474
1475 static void unpack_ugid_from_mdt_body(struct ptlrpc_request *req,
1476                                       struct tbf_id *id)
1477 {
1478         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
1479                                                     &RMF_MDT_BODY);
1480         LASSERT(b != NULL);
1481
1482         /* TODO: nodemaping feature converts {ug}id from individual
1483          * clients to the actual ones of the file system. Some work
1484          * may be needed to fix this. */
1485         id->ti_uid = b->mbo_uid;
1486         id->ti_gid = b->mbo_gid;
1487 }
1488
1489 static void unpack_ugid_from_mdt_rec_reint(struct ptlrpc_request *req,
1490                                            struct tbf_id *id)
1491 {
1492         struct mdt_rec_reint *rec;
1493
1494         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
1495         LASSERT(rec != NULL);
1496
1497         /* use the fs{ug}id as {ug}id of the process */
1498         id->ti_uid = rec->rr_fsuid;
1499         id->ti_gid = rec->rr_fsgid;
1500 }
1501
1502 static int mdt_tbf_id_cli_set(struct ptlrpc_request *req,
1503                               struct tbf_id *id)
1504 {
1505         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1506         int rc = 0;
1507
1508         switch (opc) {
1509         case MDS_GETATTR:
1510         case MDS_GETATTR_NAME:
1511         case MDS_GET_ROOT:
1512         case MDS_READPAGE:
1513         case MDS_SYNC:
1514         case MDS_GETXATTR:
1515         case MDS_HSM_STATE_GET ... MDS_SWAP_LAYOUTS:
1516                 unpack_ugid_from_mdt_body(req, id);
1517                 break;
1518         case MDS_CLOSE:
1519         case MDS_REINT:
1520                 unpack_ugid_from_mdt_rec_reint(req, id);
1521                 break;
1522         default:
1523                 rc = -EINVAL;
1524                 break;
1525         }
1526         return rc;
1527 }
1528
1529 static int ldlm_tbf_id_cli_set(struct ptlrpc_request *req,
1530                               struct tbf_id *id)
1531 {
1532         struct ldlm_intent *lit;
1533         struct req_format *fmt;
1534
1535         if (req->rq_reqmsg->lm_bufcount <= DLM_INTENT_IT_OFF)
1536                 return -EINVAL;
1537
1538         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_BASIC);
1539         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
1540         if (lit == NULL)
1541                 return -EINVAL;
1542
1543         fmt = intent_req_fmt(lit->opc);
1544         if (fmt == NULL)
1545                 return -EINVAL;
1546
1547         req_capsule_extend(&req->rq_pill, fmt);
1548
1549         if (lit->opc & (IT_GETXATTR | IT_GETATTR | IT_LOOKUP))
1550                 unpack_ugid_from_mdt_body(req, id);
1551         else if (lit->opc & (IT_OPEN | IT_OPEN | IT_GLIMPSE | IT_BRW))
1552                 unpack_ugid_from_mdt_rec_reint(req, id);
1553         else
1554                 return -EINVAL;
1555         return 0;
1556 }
1557
1558 static int nrs_tbf_id_cli_set(struct ptlrpc_request *req, struct tbf_id *id,
1559                               enum nrs_tbf_flag ti_type)
1560 {
1561         u32 opc;
1562         struct req_format *fmt;
1563         const struct req_format *old_fmt;
1564         int rc;
1565
1566         memset(id, 0, sizeof(struct tbf_id));
1567         id->ti_type = ti_type;
1568
1569         rc = lustre_msg_get_uid_gid(req->rq_reqmsg, &id->ti_uid, &id->ti_gid);
1570         if (!rc && id->ti_uid != (u32) -1 && id->ti_gid != (u32) -1)
1571                 return 0;
1572
1573         /* client req doesn't have uid/gid pack in ptlrpc_body
1574          * --> fallback to the old method
1575          */
1576         opc = lustre_msg_get_opc(req->rq_reqmsg);
1577         fmt = req_fmt(opc);
1578         if (fmt == NULL)
1579                 return -EINVAL;
1580
1581         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1582         old_fmt = req->rq_pill.rc_fmt;
1583         if (old_fmt == NULL)
1584                 req_capsule_set(&req->rq_pill, fmt);
1585
1586         if (opc < OST_LAST_OPC)
1587                 rc = ost_tbf_id_cli_set(req, id);
1588         else if (opc >= MDS_FIRST_OPC && opc < MDS_LAST_OPC)
1589                 rc = mdt_tbf_id_cli_set(req, id);
1590         else if (opc == LDLM_ENQUEUE)
1591                 rc = ldlm_tbf_id_cli_set(req, id);
1592         else
1593                 rc = -EINVAL;
1594
1595         /* restore it to the original state */
1596         if (req->rq_pill.rc_fmt != old_fmt)
1597                 req->rq_pill.rc_fmt = old_fmt;
1598         return rc;
1599 }
1600
1601 static inline void nrs_tbf_cli_gen_key(struct ptlrpc_request *req,
1602                                        struct nrs_tbf_key *key)
1603 {
1604         const char *jobid;
1605
1606         key->tk_nid = req->rq_peer.nid;
1607         key->tk_opcode = lustre_msg_get_opc(req->rq_reqmsg);
1608         nrs_tbf_id_cli_set(req, &key->tk_id, NRS_TBF_FLAG_UID | NRS_TBF_FLAG_GID);
1609
1610         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1611         if (jobid == NULL)
1612                 jobid = NRS_TBF_JOBID_NULL;
1613         strscpy(key->tk_jobid, jobid, sizeof(key->tk_jobid));
1614 }
1615
1616 static struct nrs_tbf_client *
1617 nrs_tbf_cli_find(struct nrs_tbf_head *head, struct ptlrpc_request *req)
1618 {
1619         struct nrs_tbf_client *cli;
1620         struct cfs_hash *hs = head->th_cli_hash;
1621         struct cfs_hash_bd bd;
1622         struct nrs_tbf_key key;
1623
1624         memset(&key, 0, sizeof(key));
1625         nrs_tbf_cli_gen_key(req, &key);
1626         cfs_hash_bd_get_and_lock(hs, &key, &bd, 1);
1627         cli = nrs_tbf_cli_hash_lookup(hs, &bd, &key);
1628         cfs_hash_bd_unlock(hs, &bd, 1);
1629
1630         return cli;
1631 }
1632
1633 static struct nrs_tbf_client *
1634 nrs_tbf_cli_findadd(struct nrs_tbf_head *head,
1635                     struct nrs_tbf_client *cli)
1636 {
1637         struct nrs_tbf_key      *key;
1638         struct nrs_tbf_client   *ret;
1639         struct cfs_hash         *hs = head->th_cli_hash;
1640         struct cfs_hash_bd       bd;
1641
1642         key = &cli->tc_key;
1643         cfs_hash_bd_get_and_lock(hs, key, &bd, 1);
1644         ret = nrs_tbf_cli_hash_lookup(hs, &bd, key);
1645         if (ret == NULL) {
1646                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
1647                 ret = cli;
1648         }
1649         cfs_hash_bd_unlock(hs, &bd, 1);
1650
1651         return ret;
1652 }
1653
1654 static void
1655 nrs_tbf_cli_put(struct nrs_tbf_head *head, struct nrs_tbf_client *cli)
1656 {
1657         struct cfs_hash_bd       bd;
1658         struct cfs_hash         *hs = head->th_cli_hash;
1659         struct nrs_tbf_bucket   *bkt;
1660         int                      hw;
1661         LIST_HEAD(zombies);
1662
1663         cfs_hash_bd_get(hs, &cli->tc_key, &bd);
1664         bkt = cfs_hash_bd_extra_get(hs, &bd);
1665         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
1666                 return;
1667         LASSERT(list_empty(&cli->tc_lru));
1668         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
1669
1670         /**
1671          * Check and purge the LRU, there is at least one client in the LRU.
1672          */
1673         hw = tbf_jobid_cache_size >> (hs->hs_cur_bits - hs->hs_bkt_bits);
1674         while (cfs_hash_bd_count_get(&bd) > hw) {
1675                 if (unlikely(list_empty(&bkt->ntb_lru)))
1676                         break;
1677                 cli = list_first_entry(&bkt->ntb_lru,
1678                                        struct nrs_tbf_client,
1679                                        tc_lru);
1680                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
1681                 list_move(&cli->tc_lru, &zombies);
1682         }
1683         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
1684
1685         while (!list_empty(&zombies)) {
1686                 cli = container_of(zombies.next,
1687                                    struct nrs_tbf_client, tc_lru);
1688                 list_del_init(&cli->tc_lru);
1689                 nrs_tbf_cli_fini(cli);
1690         }
1691 }
1692
1693 static void
1694 nrs_tbf_generic_cli_init(struct nrs_tbf_client *cli,
1695                          struct ptlrpc_request *req)
1696 {
1697         nrs_tbf_cli_gen_key(req, &cli->tc_key);
1698         INIT_LIST_HEAD(&cli->tc_lru);
1699 }
1700
1701 static void
1702 nrs_tbf_id_list_free(struct list_head *uid_list)
1703 {
1704         struct nrs_tbf_id *nti_id, *n;
1705
1706         list_for_each_entry_safe(nti_id, n, uid_list, nti_linkage) {
1707                 list_del_init(&nti_id->nti_linkage);
1708                 OBD_FREE_PTR(nti_id);
1709         }
1710 }
1711
1712 static void
1713 nrs_tbf_expression_free(struct nrs_tbf_expression *expr)
1714 {
1715         LASSERT(expr->te_field >= NRS_TBF_FIELD_NID &&
1716                 expr->te_field < NRS_TBF_FIELD_MAX);
1717         switch (expr->te_field) {
1718         case NRS_TBF_FIELD_NID:
1719                 cfs_free_nidlist(&expr->te_cond);
1720                 break;
1721         case NRS_TBF_FIELD_JOBID:
1722                 nrs_tbf_jobid_list_free(&expr->te_cond);
1723                 break;
1724         case NRS_TBF_FIELD_OPCODE:
1725                 bitmap_free(expr->te_opcodes);
1726                 break;
1727         case NRS_TBF_FIELD_UID:
1728         case NRS_TBF_FIELD_GID:
1729                 nrs_tbf_id_list_free(&expr->te_cond);
1730                 break;
1731         default:
1732                 LBUG();
1733         }
1734         OBD_FREE_PTR(expr);
1735 }
1736
1737 static void
1738 nrs_tbf_conjunction_free(struct nrs_tbf_conjunction *conjunction)
1739 {
1740         struct nrs_tbf_expression *expression;
1741         struct nrs_tbf_expression *n;
1742
1743         LASSERT(list_empty(&conjunction->tc_linkage));
1744         list_for_each_entry_safe(expression, n,
1745                                  &conjunction->tc_expressions,
1746                                  te_linkage) {
1747                 list_del_init(&expression->te_linkage);
1748                 nrs_tbf_expression_free(expression);
1749         }
1750         OBD_FREE_PTR(conjunction);
1751 }
1752
1753 static void
1754 nrs_tbf_conds_free(struct list_head *cond_list)
1755 {
1756         struct nrs_tbf_conjunction *conjunction;
1757         struct nrs_tbf_conjunction *n;
1758
1759         list_for_each_entry_safe(conjunction, n, cond_list, tc_linkage) {
1760                 list_del_init(&conjunction->tc_linkage);
1761                 nrs_tbf_conjunction_free(conjunction);
1762         }
1763 }
1764
1765 static void
1766 nrs_tbf_generic_cmd_fini(struct nrs_tbf_cmd *cmd)
1767 {
1768         if (!list_empty(&cmd->u.tc_start.ts_conds))
1769                 nrs_tbf_conds_free(&cmd->u.tc_start.ts_conds);
1770         OBD_FREE_STR(cmd->u.tc_start.ts_conds_str);
1771 }
1772
1773 #define NRS_TBF_DISJUNCTION_DELIM       (",")
1774 #define NRS_TBF_CONJUNCTION_DELIM       ("&")
1775 #define NRS_TBF_EXPRESSION_DELIM        ("=")
1776
1777 static int
1778 nrs_tbf_opcode_list_parse(char *str, unsigned long **bitmaptr);
1779 static int
1780 nrs_tbf_id_list_parse(char *str, struct list_head *id_list,
1781                       enum nrs_tbf_flag tif);
1782
1783 static int
1784 nrs_tbf_expression_parse(char *str, struct list_head *cond_list)
1785 {
1786         struct nrs_tbf_expression *expr;
1787         char *field;
1788         int rc = 0;
1789         int len;
1790
1791         OBD_ALLOC_PTR(expr);
1792         if (expr == NULL)
1793                 return -ENOMEM;
1794
1795         field = strim(strsep(&str, NRS_TBF_EXPRESSION_DELIM));
1796         if (!*field || !str)
1797                 /* No LHS or no '=' sign */
1798                 GOTO(out, rc = -EINVAL);
1799         str = strim(str);
1800         len = strlen(str);
1801         if (len < 2 || str[0] != '{' || str[len-1] != '}')
1802                 /* No {} around RHS */
1803                 GOTO(out, rc = -EINVAL);
1804
1805         /* Skip '{' and '}' */
1806         str[len-1] = '\0';
1807         str += 1;
1808         len -= 2;
1809
1810         if (strcmp(field, "nid") == 0) {
1811                 if (cfs_parse_nidlist(str, len, &expr->te_cond) < 0)
1812                         GOTO(out, rc = -EINVAL);
1813                 expr->te_field = NRS_TBF_FIELD_NID;
1814         } else if (strcmp(field, "jobid") == 0) {
1815                 if (nrs_tbf_jobid_list_parse(str, &expr->te_cond) < 0)
1816                         GOTO(out, rc = -EINVAL);
1817                 expr->te_field = NRS_TBF_FIELD_JOBID;
1818         } else if (strcmp(field, "opcode") == 0) {
1819                 if (nrs_tbf_opcode_list_parse(str, &expr->te_opcodes) < 0)
1820                         GOTO(out, rc = -EINVAL);
1821                 expr->te_field = NRS_TBF_FIELD_OPCODE;
1822         } else if (strcmp(field, "uid") == 0) {
1823                 if (nrs_tbf_id_list_parse(str, &expr->te_cond,
1824                                           NRS_TBF_FLAG_UID) < 0)
1825                         GOTO(out, rc = -EINVAL);
1826                 expr->te_field = NRS_TBF_FIELD_UID;
1827         } else if (strcmp(field, "gid") == 0) {
1828                 if (nrs_tbf_id_list_parse(str, &expr->te_cond,
1829                                           NRS_TBF_FLAG_GID) < 0)
1830                         GOTO(out, rc = -EINVAL);
1831                 expr->te_field = NRS_TBF_FIELD_GID;
1832         } else {
1833                 GOTO(out, rc = -EINVAL);
1834         }
1835
1836         list_add_tail(&expr->te_linkage, cond_list);
1837         return 0;
1838 out:
1839         OBD_FREE_PTR(expr);
1840         return rc;
1841 }
1842
1843 static int
1844 nrs_tbf_conjunction_parse(char *str, struct list_head *cond_list)
1845 {
1846         struct nrs_tbf_conjunction *conjunction;
1847         int rc = 0;
1848
1849         OBD_ALLOC_PTR(conjunction);
1850         if (conjunction == NULL)
1851                 return -ENOMEM;
1852
1853         INIT_LIST_HEAD(&conjunction->tc_expressions);
1854         list_add_tail(&conjunction->tc_linkage, cond_list);
1855
1856         while (str && !rc) {
1857                 char *expr = strsep(&str, NRS_TBF_CONJUNCTION_DELIM);
1858
1859                 rc = nrs_tbf_expression_parse(expr,
1860                                               &conjunction->tc_expressions);
1861         }
1862         return rc;
1863 }
1864
1865 static int
1866 nrs_tbf_conds_parse(char *orig, struct list_head *cond_list)
1867 {
1868         char *str;
1869         int rc = 0;
1870
1871         orig = kstrdup(orig, GFP_KERNEL);
1872         if (!orig)
1873                 return -ENOMEM;
1874         str = orig;
1875
1876         INIT_LIST_HEAD(cond_list);
1877         while (str && !rc) {
1878                 char *term = strsep(&str, NRS_TBF_DISJUNCTION_DELIM);
1879
1880                 rc = nrs_tbf_conjunction_parse(term, cond_list);
1881         }
1882         kfree(orig);
1883
1884         return rc;
1885 }
1886
1887 static int
1888 nrs_tbf_generic_parse(struct nrs_tbf_cmd *cmd, const char *id)
1889 {
1890         int rc;
1891
1892         OBD_STRNDUP(cmd->u.tc_start.ts_conds_str, id, strlen(id));
1893         if (cmd->u.tc_start.ts_conds_str == NULL)
1894                 return -ENOMEM;
1895
1896         /* Parse hybird NID and JOBID conditions */
1897         rc = nrs_tbf_conds_parse(cmd->u.tc_start.ts_conds_str,
1898                                  &cmd->u.tc_start.ts_conds);
1899         if (rc)
1900                 nrs_tbf_generic_cmd_fini(cmd);
1901
1902         return rc;
1903 }
1904
/* Forward declaration: defined with the UID/GID code further down. */
static int
nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id);
1907
1908 static int
1909 nrs_tbf_expression_match(struct nrs_tbf_expression *expr,
1910                          struct nrs_tbf_rule *rule,
1911                          struct nrs_tbf_client *cli)
1912 {
1913         switch (expr->te_field) {
1914         case NRS_TBF_FIELD_NID:
1915                 return cfs_match_nid(&cli->tc_nid, &expr->te_cond);
1916         case NRS_TBF_FIELD_JOBID:
1917                 return nrs_tbf_jobid_list_match(&expr->te_cond, cli->tc_jobid);
1918         case NRS_TBF_FIELD_OPCODE:
1919                 return test_bit(cli->tc_opcode, expr->te_opcodes);
1920         case NRS_TBF_FIELD_UID:
1921         case NRS_TBF_FIELD_GID:
1922                 return nrs_tbf_id_list_match(&expr->te_cond, cli->tc_id);
1923         default:
1924                 return 0;
1925         }
1926 }
1927
1928 static int
1929 nrs_tbf_conjunction_match(struct nrs_tbf_conjunction *conjunction,
1930                           struct nrs_tbf_rule *rule,
1931                           struct nrs_tbf_client *cli)
1932 {
1933         struct nrs_tbf_expression *expr;
1934         int matched;
1935
1936         list_for_each_entry(expr, &conjunction->tc_expressions, te_linkage) {
1937                 matched = nrs_tbf_expression_match(expr, rule, cli);
1938                 if (!matched)
1939                         return 0;
1940         }
1941
1942         return 1;
1943 }
1944
1945 static int
1946 nrs_tbf_cond_match(struct nrs_tbf_rule *rule, struct nrs_tbf_client *cli)
1947 {
1948         struct nrs_tbf_conjunction *conjunction;
1949         int matched;
1950
1951         list_for_each_entry(conjunction, &rule->tr_conds, tc_linkage) {
1952                 matched = nrs_tbf_conjunction_match(conjunction, rule, cli);
1953                 if (matched)
1954                         return 1;
1955         }
1956
1957         return 0;
1958 }
1959
1960 static void
1961 nrs_tbf_generic_rule_fini(struct nrs_tbf_rule *rule)
1962 {
1963         if (!list_empty(&rule->tr_conds))
1964                 nrs_tbf_conds_free(&rule->tr_conds);
1965         LASSERT(rule->tr_conds_str != NULL);
1966         OBD_FREE_STR(rule->tr_conds_str);
1967 }
1968
1969 static int
1970 nrs_tbf_rule_init(struct ptlrpc_nrs_policy *policy,
1971                   struct nrs_tbf_rule *rule, struct nrs_tbf_cmd *start)
1972 {
1973         int rc = 0;
1974
1975         LASSERT(start->u.tc_start.ts_conds_str);
1976         OBD_STRNDUP(rule->tr_conds_str,
1977                     start->u.tc_start.ts_conds_str,
1978                     strlen(start->u.tc_start.ts_conds_str));
1979         if (rule->tr_conds_str == NULL)
1980                 return -ENOMEM;
1981
1982         INIT_LIST_HEAD(&rule->tr_conds);
1983         if (!list_empty(&start->u.tc_start.ts_conds)) {
1984                 rc = nrs_tbf_conds_parse(rule->tr_conds_str,
1985                                          &rule->tr_conds);
1986         }
1987         if (rc)
1988                 nrs_tbf_generic_rule_fini(rule);
1989
1990         return rc;
1991 }
1992
1993 static int
1994 nrs_tbf_generic_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1995 {
1996         seq_printf(m, "%s %s %llu, ref %d\n", rule->tr_name,
1997                    rule->tr_conds_str, rule->tr_rpc_rate,
1998                    kref_read(&rule->tr_ref) - 1);
1999         return 0;
2000 }
2001
/*
 * Rule-match operation of the generic TBF type; simply forwards to
 * nrs_tbf_cond_match() and returns its result.
 */
static int
nrs_tbf_generic_rule_match(struct nrs_tbf_rule *rule,
                           struct nrs_tbf_client *cli)
{
        return nrs_tbf_cond_match(rule, cli);
}
2008
2009 static struct nrs_tbf_ops nrs_tbf_generic_ops = {
2010         .o_name = NRS_TBF_TYPE_GENERIC,
2011         .o_startup = nrs_tbf_startup,
2012         .o_cli_find = nrs_tbf_cli_find,
2013         .o_cli_findadd = nrs_tbf_cli_findadd,
2014         .o_cli_put = nrs_tbf_cli_put,
2015         .o_cli_init = nrs_tbf_generic_cli_init,
2016         .o_rule_init = nrs_tbf_rule_init,
2017         .o_rule_dump = nrs_tbf_generic_rule_dump,
2018         .o_rule_match = nrs_tbf_generic_rule_match,
2019         .o_rule_fini = nrs_tbf_generic_rule_fini,
2020 };
2021
2022 static void nrs_tbf_opcode_rule_fini(struct nrs_tbf_rule *rule)
2023 {
2024         if (rule->tr_opcodes)
2025                 bitmap_free(rule->tr_opcodes);
2026
2027         LASSERT(rule->tr_opcodes_str != NULL);
2028         OBD_FREE_STR(rule->tr_opcodes_str);
2029 }
2030
/*
 * Hash operation of the opcode client hash: djb2 over sizeof(__u32) bytes
 * of the key.
 */
static unsigned int
nrs_tbf_opcode_hop_hash(struct cfs_hash *hs, const void *key,
                        const unsigned int bits)
{
        /* XXX (original author's note, intent unclear): is hashing needed? */
        return cfs_hash_djb2_hash(key, sizeof(__u32), bits);
}
2038
2039 static int nrs_tbf_opcode_hop_keycmp(const void *key, struct hlist_node *hnode)
2040 {
2041         const __u32     *opc = key;
2042         struct nrs_tbf_client *cli = hlist_entry(hnode,
2043                                                  struct nrs_tbf_client,
2044                                                  tc_hnode);
2045
2046         return *opc == cli->tc_opcode;
2047 }
2048
2049 static void *nrs_tbf_opcode_hop_key(struct hlist_node *hnode)
2050 {
2051         struct nrs_tbf_client *cli = hlist_entry(hnode,
2052                                                  struct nrs_tbf_client,
2053                                                  tc_hnode);
2054
2055         return &cli->tc_opcode;
2056 }
2057
2058 static void nrs_tbf_opcode_hop_get(struct cfs_hash *hs,
2059                                    struct hlist_node *hnode)
2060 {
2061         struct nrs_tbf_client *cli = hlist_entry(hnode,
2062                                                  struct nrs_tbf_client,
2063                                                  tc_hnode);
2064
2065         refcount_inc(&cli->tc_ref);
2066 }
2067
2068 static void nrs_tbf_opcode_hop_put(struct cfs_hash *hs,
2069                                    struct hlist_node *hnode)
2070 {
2071         struct nrs_tbf_client *cli = hlist_entry(hnode,
2072                                                  struct nrs_tbf_client,
2073                                                  tc_hnode);
2074
2075         refcount_dec(&cli->tc_ref);
2076 }
2077
2078 static void nrs_tbf_opcode_hop_exit(struct cfs_hash *hs,
2079                                     struct hlist_node *hnode)
2080 {
2081         struct nrs_tbf_client *cli = hlist_entry(hnode,
2082                                                  struct nrs_tbf_client,
2083                                                  tc_hnode);
2084
2085         CDEBUG(D_RPCTRACE,
2086                "Busy TBF object from client with opcode %s, with %d refs\n",
2087                ll_opcode2str(cli->tc_opcode), refcount_read(&cli->tc_ref));
2088
2089         nrs_tbf_cli_fini(cli);
2090 }
2091 static struct cfs_hash_ops nrs_tbf_opcode_hash_ops = {
2092         .hs_hash        = nrs_tbf_opcode_hop_hash,
2093         .hs_keycmp      = nrs_tbf_opcode_hop_keycmp,
2094         .hs_key         = nrs_tbf_opcode_hop_key,
2095         .hs_object      = nrs_tbf_hop_object,
2096         .hs_get         = nrs_tbf_opcode_hop_get,
2097         .hs_put         = nrs_tbf_opcode_hop_put,
2098         .hs_put_locked  = nrs_tbf_opcode_hop_put,
2099         .hs_exit        = nrs_tbf_opcode_hop_exit,
2100 };
2101
2102 static int
2103 nrs_tbf_opcode_startup(struct ptlrpc_nrs_policy *policy,
2104                     struct nrs_tbf_head *head)
2105 {
2106         struct nrs_tbf_cmd      start = { 0 };
2107         int rc;
2108
2109         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
2110                                             NRS_TBF_NID_BITS,
2111                                             NRS_TBF_NID_BITS,
2112                                             NRS_TBF_NID_BKT_BITS, 0,
2113                                             CFS_HASH_MIN_THETA,
2114                                             CFS_HASH_MAX_THETA,
2115                                             &nrs_tbf_opcode_hash_ops,
2116                                             CFS_HASH_RW_BKTLOCK);
2117         if (head->th_cli_hash == NULL)
2118                 return -ENOMEM;
2119
2120         start.u.tc_start.ts_opcodes_str = "*";
2121
2122         start.u.tc_start.ts_rpc_rate = tbf_rate;
2123         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2124         start.tc_name = NRS_TBF_DEFAULT_RULE;
2125         rc = nrs_tbf_rule_start(policy, head, &start);
2126
2127         return rc;
2128 }
2129
2130 static struct nrs_tbf_client *
2131 nrs_tbf_opcode_cli_find(struct nrs_tbf_head *head,
2132                         struct ptlrpc_request *req)
2133 {
2134         __u32 opc;
2135
2136         opc = lustre_msg_get_opc(req->rq_reqmsg);
2137         return cfs_hash_lookup(head->th_cli_hash, &opc);
2138 }
2139
2140 static struct nrs_tbf_client *
2141 nrs_tbf_opcode_cli_findadd(struct nrs_tbf_head *head,
2142                            struct nrs_tbf_client *cli)
2143 {
2144         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_opcode,
2145                                        &cli->tc_hnode);
2146 }
2147
/*
 * Client put operation shared by the cfs_hash based TBF types (opcode, UID
 * and GID): hands the client's hash node to cfs_hash_put().
 */
static void
nrs_tbf_cfs_hash_cli_put(struct nrs_tbf_head *head,
                        struct nrs_tbf_client *cli)
{
        cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
}
2154
2155 static void
2156 nrs_tbf_opcode_cli_init(struct nrs_tbf_client *cli,
2157                         struct ptlrpc_request *req)
2158 {
2159         cli->tc_opcode = lustre_msg_get_opc(req->rq_reqmsg);
2160 }
2161
2162 #define MAX_OPCODE_LEN  32
2163 static int
2164 nrs_tbf_opcode_set_bit(char *id, unsigned long *opcodes)
2165 {
2166         int op;
2167
2168         op = ll_str2opcode(id);
2169         if (op < 0)
2170                 return -EINVAL;
2171
2172         set_bit(op, opcodes);
2173         return 0;
2174 }
2175
2176 static int
2177 nrs_tbf_opcode_list_parse(char *orig, unsigned long **bitmaptr)
2178 {
2179         unsigned long *opcodes;
2180         char *str;
2181         int cnt = 0;
2182         int rc = 0;
2183
2184         ENTRY;
2185         orig = kstrdup(orig, GFP_KERNEL);
2186         if (!orig)
2187                 return -ENOMEM;
2188         opcodes = bitmap_zalloc(LUSTRE_MAX_OPCODES, GFP_KERNEL);
2189         if (!opcodes) {
2190                 kfree(orig);
2191                 return -ENOMEM;
2192         }
2193         str = orig;
2194         while (str && rc == 0) {
2195                 char *tok = strsep(&str, " ");
2196
2197                 if (*tok) {
2198                         rc = nrs_tbf_opcode_set_bit(tok, opcodes);
2199                         cnt += 1;
2200                 }
2201         }
2202         if (cnt == 0)
2203                 rc = -EINVAL;
2204
2205         kfree(orig);
2206         if (rc == 0 && bitmaptr)
2207                 *bitmaptr = opcodes;
2208         else
2209                 bitmap_free(opcodes);
2210
2211         RETURN(rc);
2212 }
2213
/* Free the opcode string held by a start command. */
static void nrs_tbf_opcode_cmd_fini(struct nrs_tbf_cmd *cmd)
{
        OBD_FREE_STR(cmd->u.tc_start.ts_opcodes_str);
}
2218
2219 static int nrs_tbf_opcode_parse(struct nrs_tbf_cmd *cmd, char *id)
2220 {
2221         int rc;
2222
2223         rc = nrs_tbf_check_id_value(&id, "opcode");
2224         if (rc)
2225                 return rc;
2226
2227         OBD_STRNDUP(cmd->u.tc_start.ts_opcodes_str, id, strlen(id));
2228         if (cmd->u.tc_start.ts_opcodes_str == NULL)
2229                 return -ENOMEM;
2230
2231         /* parse opcode list */
2232         rc = nrs_tbf_opcode_list_parse(cmd->u.tc_start.ts_opcodes_str, NULL);
2233         if (rc)
2234                 nrs_tbf_opcode_cmd_fini(cmd);
2235
2236         return rc;
2237 }
2238
2239 static int
2240 nrs_tbf_opcode_rule_match(struct nrs_tbf_rule *rule,
2241                           struct nrs_tbf_client *cli)
2242 {
2243         if (rule->tr_opcodes == NULL)
2244                 return 0;
2245
2246         return test_bit(cli->tc_opcode, rule->tr_opcodes);
2247 }
2248
2249 static int nrs_tbf_opcode_rule_init(struct ptlrpc_nrs_policy *policy,
2250                                     struct nrs_tbf_rule *rule,
2251                                     struct nrs_tbf_cmd *start)
2252 {
2253         int rc = 0;
2254
2255         LASSERT(start->u.tc_start.ts_opcodes_str != NULL);
2256         OBD_STRNDUP(rule->tr_opcodes_str,
2257                   start->u.tc_start.ts_opcodes_str,
2258                   strlen(start->u.tc_start.ts_opcodes_str));
2259         if (rule->tr_opcodes_str == NULL)
2260                 return -ENOMEM;
2261
2262         /* Default rule '*' */
2263         if (strcmp(start->u.tc_start.ts_opcodes_str, "*") == 0)
2264                 return 0;
2265
2266         rc = nrs_tbf_opcode_list_parse(rule->tr_opcodes_str,
2267                                        &rule->tr_opcodes);
2268         if (rc)
2269                 OBD_FREE_STR(rule->tr_opcodes_str);
2270
2271         return rc;
2272 }
2273
2274 static int
2275 nrs_tbf_opcode_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2276 {
2277         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2278                    rule->tr_opcodes_str, rule->tr_rpc_rate,
2279                    kref_read(&rule->tr_ref) - 1);
2280         return 0;
2281 }
2282
2283
2284 static struct nrs_tbf_ops nrs_tbf_opcode_ops = {
2285         .o_name = NRS_TBF_TYPE_OPCODE,
2286         .o_startup = nrs_tbf_opcode_startup,
2287         .o_cli_find = nrs_tbf_opcode_cli_find,
2288         .o_cli_findadd = nrs_tbf_opcode_cli_findadd,
2289         .o_cli_put = nrs_tbf_cfs_hash_cli_put,
2290         .o_cli_init = nrs_tbf_opcode_cli_init,
2291         .o_rule_init = nrs_tbf_opcode_rule_init,
2292         .o_rule_dump = nrs_tbf_opcode_rule_dump,
2293         .o_rule_match = nrs_tbf_opcode_rule_match,
2294         .o_rule_fini = nrs_tbf_opcode_rule_fini,
2295 };
2296
/*
 * Hash operation of the UID/GID client hash: djb2 over
 * sizeof(struct tbf_id) bytes of the key.
 */
static unsigned int
nrs_tbf_id_hop_hash(struct cfs_hash *hs, const void *key,
                    const unsigned int bits)
{
        return cfs_hash_djb2_hash(key, sizeof(struct tbf_id), bits);
}
2303
2304 static int nrs_tbf_id_hop_keycmp(const void *key, struct hlist_node *hnode)
2305 {
2306         const struct tbf_id *opc = key;
2307         enum nrs_tbf_flag ntf;
2308         struct nrs_tbf_client *cli = hlist_entry(hnode, struct nrs_tbf_client,
2309                                                  tc_hnode);
2310         ntf = opc->ti_type & cli->tc_id.ti_type;
2311         if ((ntf & NRS_TBF_FLAG_UID) && opc->ti_uid != cli->tc_id.ti_uid)
2312                 return 0;
2313
2314         if ((ntf & NRS_TBF_FLAG_GID) && opc->ti_gid != cli->tc_id.ti_gid)
2315                 return 0;
2316
2317         return 1;
2318 }
2319
2320 static void *nrs_tbf_id_hop_key(struct hlist_node *hnode)
2321 {
2322         struct nrs_tbf_client *cli = hlist_entry(hnode,
2323                                                  struct nrs_tbf_client,
2324                                                  tc_hnode);
2325         return &cli->tc_id;
2326 }
2327
2328 static void nrs_tbf_id_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
2329 {
2330         struct nrs_tbf_client *cli = hlist_entry(hnode,
2331                                                  struct nrs_tbf_client,
2332                                                  tc_hnode);
2333
2334         refcount_inc(&cli->tc_ref);
2335 }
2336
2337 static void nrs_tbf_id_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
2338 {
2339         struct nrs_tbf_client *cli = hlist_entry(hnode,
2340                                                  struct nrs_tbf_client,
2341                                                  tc_hnode);
2342
2343         refcount_dec(&cli->tc_ref);
2344 }
2345
2346 static void
2347 nrs_tbf_id_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
2348
2349 {
2350         struct nrs_tbf_client *cli = hlist_entry(hnode,
2351                                                  struct nrs_tbf_client,
2352                                                  tc_hnode);
2353
2354         nrs_tbf_cli_fini(cli);
2355 }
2356
2357 static struct cfs_hash_ops nrs_tbf_id_hash_ops = {
2358         .hs_hash        = nrs_tbf_id_hop_hash,
2359         .hs_keycmp      = nrs_tbf_id_hop_keycmp,
2360         .hs_key         = nrs_tbf_id_hop_key,
2361         .hs_object      = nrs_tbf_hop_object,
2362         .hs_get         = nrs_tbf_id_hop_get,
2363         .hs_put         = nrs_tbf_id_hop_put,
2364         .hs_put_locked  = nrs_tbf_id_hop_put,
2365         .hs_exit        = nrs_tbf_id_hop_exit,
2366 };
2367
2368 static int
2369 nrs_tbf_id_startup(struct ptlrpc_nrs_policy *policy,
2370                    struct nrs_tbf_head *head)
2371 {
2372         struct nrs_tbf_cmd start;
2373         int rc;
2374
2375         head->th_cli_hash = cfs_hash_create("nrs_tbf_id_hash",
2376                                             NRS_TBF_NID_BITS,
2377                                             NRS_TBF_NID_BITS,
2378                                             NRS_TBF_NID_BKT_BITS, 0,
2379                                             CFS_HASH_MIN_THETA,
2380                                             CFS_HASH_MAX_THETA,
2381                                             &nrs_tbf_id_hash_ops,
2382                                             CFS_HASH_RW_BKTLOCK);
2383         if (head->th_cli_hash == NULL)
2384                 return -ENOMEM;
2385
2386         memset(&start, 0, sizeof(start));
2387         start.u.tc_start.ts_ids_str = "*";
2388         start.u.tc_start.ts_rpc_rate = tbf_rate;
2389         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2390         start.tc_name = NRS_TBF_DEFAULT_RULE;
2391         INIT_LIST_HEAD(&start.u.tc_start.ts_ids);
2392         rc = nrs_tbf_rule_start(policy, head, &start);
2393         if (rc) {
2394                 cfs_hash_putref(head->th_cli_hash);
2395                 head->th_cli_hash = NULL;
2396         }
2397
2398         return rc;
2399 }
2400
2401 static struct nrs_tbf_client *
2402 nrs_tbf_id_cli_find(struct nrs_tbf_head *head,
2403                     struct ptlrpc_request *req)
2404 {
2405         struct tbf_id id;
2406
2407         LASSERT(head->th_type_flag == NRS_TBF_FLAG_UID ||
2408                 head->th_type_flag == NRS_TBF_FLAG_GID);
2409
2410         nrs_tbf_id_cli_set(req, &id, head->th_type_flag);
2411         return cfs_hash_lookup(head->th_cli_hash, &id);
2412 }
2413
2414 static struct nrs_tbf_client *
2415 nrs_tbf_id_cli_findadd(struct nrs_tbf_head *head,
2416                        struct nrs_tbf_client *cli)
2417 {
2418         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_id,
2419                                        &cli->tc_hnode);
2420 }
2421
/*
 * Client initialiser of the UID TBF type: let nrs_tbf_id_cli_set() fill the
 * client's tc_id from the request with the UID flag.
 */
static void
nrs_tbf_uid_cli_init(struct nrs_tbf_client *cli,
                     struct ptlrpc_request *req)
{
        nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_UID);
}
2428
/*
 * Client initialiser of the GID TBF type: let nrs_tbf_id_cli_set() fill the
 * client's tc_id from the request with the GID flag.
 */
static void
nrs_tbf_gid_cli_init(struct nrs_tbf_client *cli,
                     struct ptlrpc_request *req)
{
        nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_GID);
}
2435
2436 static int
2437 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id)
2438 {
2439         struct nrs_tbf_id *nti_id;
2440         enum nrs_tbf_flag flag;
2441
2442         list_for_each_entry(nti_id, id_list, nti_linkage) {
2443                 flag = id.ti_type & nti_id->nti_id.ti_type;
2444                 if (!flag)
2445                         continue;
2446
2447                 if ((flag & NRS_TBF_FLAG_UID) &&
2448                     (id.ti_uid != nti_id->nti_id.ti_uid))
2449                         continue;
2450
2451                 if ((flag & NRS_TBF_FLAG_GID) &&
2452                     (id.ti_gid != nti_id->nti_id.ti_gid))
2453                         continue;
2454
2455                 return 1;
2456         }
2457         return 0;
2458 }
2459
/*
 * Rule-match operation of the UID and GID TBF types: match the client's
 * tc_id against the rule's ID list.
 */
static int
nrs_tbf_id_rule_match(struct nrs_tbf_rule *rule,
                      struct nrs_tbf_client *cli)
{
        return nrs_tbf_id_list_match(&rule->tr_ids, cli->tc_id);
}
2466
/*
 * Release what a UID/GID start command holds: the parsed ID list and the
 * ID string.
 */
static void nrs_tbf_id_cmd_fini(struct nrs_tbf_cmd *cmd)
{
        nrs_tbf_id_list_free(&cmd->u.tc_start.ts_ids);

        OBD_FREE_STR(cmd->u.tc_start.ts_ids_str);
}
2473
2474 static int
2475 nrs_tbf_id_list_parse(char *orig, struct list_head *id_list,
2476                       enum nrs_tbf_flag tif)
2477 {
2478         int rc = 0;
2479         unsigned long val;
2480         char *str;
2481         struct tbf_id id = { 0 };
2482         ENTRY;
2483
2484         if (tif != NRS_TBF_FLAG_UID && tif != NRS_TBF_FLAG_GID)
2485                 RETURN(-EINVAL);
2486
2487         orig = kstrdup(orig, GFP_KERNEL);
2488         if (!orig)
2489                 return -ENOMEM;
2490
2491         INIT_LIST_HEAD(id_list);
2492         for (str = orig; str ; ) {
2493                 struct nrs_tbf_id *nti_id;
2494                 char *tok;
2495
2496                 tok = strsep(&str, " ");
2497                 if (!*tok)
2498                         /* Empty token - leading, trailing, or
2499                          * multiple spaces in list
2500                          */
2501                         continue;
2502
2503                 id.ti_type = tif;
2504                 rc = kstrtoul(tok, 0, &val);
2505                 if (rc < 0)
2506                         GOTO(out, rc = -EINVAL);
2507                 if (tif == NRS_TBF_FLAG_UID)
2508                         id.ti_uid = val;
2509                 else
2510                         id.ti_gid = val;
2511
2512                 OBD_ALLOC_PTR(nti_id);
2513                 if (nti_id == NULL)
2514                         GOTO(out, rc = -ENOMEM);
2515
2516                 nti_id->nti_id = id;
2517                 list_add_tail(&nti_id->nti_linkage, id_list);
2518         }
2519         if (list_empty(id_list))
2520                 /* Only white space in the list */
2521                 GOTO(out, rc = -EINVAL);
2522 out:
2523         kfree(orig);
2524         if (rc)
2525                 nrs_tbf_id_list_free(id_list);
2526         RETURN(rc);
2527 }
2528
2529 static int nrs_tbf_ug_id_parse(struct nrs_tbf_cmd *cmd, char *id)
2530 {
2531         int rc;
2532         enum nrs_tbf_flag tif;
2533
2534         tif = cmd->u.tc_start.ts_valid_type;
2535
2536         rc = nrs_tbf_check_id_value(&id,
2537                                     tif == NRS_TBF_FLAG_UID ? "uid" : "gid");
2538         if (rc)
2539                 return rc;
2540
2541         OBD_STRNDUP(cmd->u.tc_start.ts_ids_str, id, strlen(id));
2542         if (cmd->u.tc_start.ts_ids_str == NULL)
2543                 return -ENOMEM;
2544
2545         rc = nrs_tbf_id_list_parse(cmd->u.tc_start.ts_ids_str,
2546                                    &cmd->u.tc_start.ts_ids, tif);
2547         if (rc)
2548                 nrs_tbf_id_cmd_fini(cmd);
2549
2550         return rc;
2551 }
2552
2553 static int
2554 nrs_tbf_id_rule_init(struct ptlrpc_nrs_policy *policy,
2555                      struct nrs_tbf_rule *rule,
2556                      struct nrs_tbf_cmd *start)
2557 {
2558         struct nrs_tbf_head *head = rule->tr_head;
2559         int rc = 0;
2560         enum nrs_tbf_flag tif = head->th_type_flag;
2561         int ids_len = strlen(start->u.tc_start.ts_ids_str);
2562
2563         LASSERT(start->u.tc_start.ts_ids_str);
2564         INIT_LIST_HEAD(&rule->tr_ids);
2565
2566         OBD_STRNDUP(rule->tr_ids_str, start->u.tc_start.ts_ids_str, ids_len);
2567         if (rule->tr_ids_str == NULL)
2568                 return -ENOMEM;
2569
2570         if (!list_empty(&start->u.tc_start.ts_ids)) {
2571                 rc = nrs_tbf_id_list_parse(rule->tr_ids_str,
2572                                            &rule->tr_ids, tif);
2573                 if (rc)
2574                         CERROR("%ss {%s} illegal\n",
2575                                tif == NRS_TBF_FLAG_UID ? "uid" : "gid",
2576                                rule->tr_ids_str);
2577         }
2578         if (rc)
2579                 OBD_FREE_STR(rule->tr_ids_str);
2580         return rc;
2581 }
2582
2583 static int
2584 nrs_tbf_id_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2585 {
2586         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2587                    rule->tr_ids_str, rule->tr_rpc_rate,
2588                    kref_read(&rule->tr_ref) - 1);
2589         return 0;
2590 }
2591
/* Release the ID data of a rule: its parsed ID list and its ID string. */
static void nrs_tbf_id_rule_fini(struct nrs_tbf_rule *rule)
{
        nrs_tbf_id_list_free(&rule->tr_ids);
        OBD_FREE_STR(rule->tr_ids_str);
}
2597
2598 static struct nrs_tbf_ops nrs_tbf_uid_ops = {
2599         .o_name = NRS_TBF_TYPE_UID,
2600         .o_startup = nrs_tbf_id_startup,
2601         .o_cli_find = nrs_tbf_id_cli_find,
2602         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2603         .o_cli_put = nrs_tbf_cfs_hash_cli_put,
2604         .o_cli_init = nrs_tbf_uid_cli_init,
2605         .o_rule_init = nrs_tbf_id_rule_init,
2606         .o_rule_dump = nrs_tbf_id_rule_dump,
2607         .o_rule_match = nrs_tbf_id_rule_match,
2608         .o_rule_fini = nrs_tbf_id_rule_fini,
2609 };
2610
2611 static struct nrs_tbf_ops nrs_tbf_gid_ops = {
2612         .o_name = NRS_TBF_TYPE_GID,
2613         .o_startup = nrs_tbf_id_startup,
2614         .o_cli_find = nrs_tbf_id_cli_find,
2615         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2616         .o_cli_put = nrs_tbf_cfs_hash_cli_put,
2617         .o_cli_init = nrs_tbf_gid_cli_init,
2618         .o_rule_init = nrs_tbf_id_rule_init,
2619         .o_rule_dump = nrs_tbf_id_rule_dump,
2620         .o_rule_match = nrs_tbf_id_rule_match,
2621         .o_rule_fini = nrs_tbf_id_rule_fini,
2622 };
2623
2624 static struct nrs_tbf_type nrs_tbf_types[] = {
2625         {
2626                 .ntt_name = NRS_TBF_TYPE_JOBID,
2627                 .ntt_flag = NRS_TBF_FLAG_JOBID,
2628                 .ntt_ops = &nrs_tbf_jobid_ops,
2629         },
2630         {
2631                 .ntt_name = NRS_TBF_TYPE_NID,
2632                 .ntt_flag = NRS_TBF_FLAG_NID,
2633                 .ntt_ops = &nrs_tbf_nid_ops,
2634         },
2635         {
2636                 .ntt_name = NRS_TBF_TYPE_OPCODE,
2637                 .ntt_flag = NRS_TBF_FLAG_OPCODE,
2638                 .ntt_ops = &nrs_tbf_opcode_ops,
2639         },
2640         {
2641                 .ntt_name = NRS_TBF_TYPE_GENERIC,
2642                 .ntt_flag = NRS_TBF_FLAG_GENERIC,
2643                 .ntt_ops = &nrs_tbf_generic_ops,
2644         },
2645         {
2646                 .ntt_name = NRS_TBF_TYPE_UID,
2647                 .ntt_flag = NRS_TBF_FLAG_UID,
2648                 .ntt_ops = &nrs_tbf_uid_ops,
2649         },
2650         {
2651                 .ntt_name = NRS_TBF_TYPE_GID,
2652                 .ntt_flag = NRS_TBF_FLAG_GID,
2653                 .ntt_ops = &nrs_tbf_gid_ops,
2654         },
2655 };
2656
2657 /**
2658  * Is called before the policy transitions into
2659  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
2660  * policy-specific private data structure.
2661  *
2662  * \param[in] policy The policy to start
2663  *
2664  * \retval -ENOMEM OOM error
2665  * \retval  0      success
2666  *
2667  * \see nrs_policy_register()
2668  * \see nrs_policy_ctl()
2669  */
2670 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
2671 {
2672         struct nrs_tbf_head     *head;
2673         struct nrs_tbf_ops      *ops;
2674         __u32                    type;
2675         char                    *name;
2676         int found = 0;
2677         int i;
2678         int rc = 0;
2679
2680         if (arg == NULL)
2681                 name = NRS_TBF_TYPE_GENERIC;
2682         else if (strlen(arg) < NRS_TBF_TYPE_MAX_LEN)
2683                 name = arg;
2684         else
2685                 GOTO(out, rc = -EINVAL);
2686
2687         for (i = 0; i < ARRAY_SIZE(nrs_tbf_types); i++) {
2688                 if (strcmp(name, nrs_tbf_types[i].ntt_name) == 0) {
2689                         ops = nrs_tbf_types[i].ntt_ops;
2690                         type = nrs_tbf_types[i].ntt_flag;
2691                         found = 1;
2692                         break;
2693                 }
2694         }
2695         if (found == 0)
2696                 GOTO(out, rc = -ENOTSUPP);
2697
2698         OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
2699         if (head == NULL)
2700                 GOTO(out, rc = -ENOMEM);
2701
2702         memcpy(head->th_type, name, strlen(name));
2703         head->th_type[strlen(name)] = '\0';
2704         head->th_ops = ops;
2705         head->th_type_flag = type;
2706
2707         head->th_binheap = binheap_create(&nrs_tbf_heap_ops,
2708                                           CBH_FLAG_ATOMIC_GROW, 4096, NULL,
2709                                           nrs_pol2cptab(policy),
2710                                           nrs_pol2cptid(policy));
2711         if (head->th_binheap == NULL)
2712                 GOTO(out_free_head, rc = -ENOMEM);
2713
2714         atomic_set(&head->th_rule_sequence, 0);
2715         spin_lock_init(&head->th_rule_lock);
2716         INIT_LIST_HEAD(&head->th_list);
2717         hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
2718         head->th_timer.function = nrs_tbf_timer_cb;
2719         rc = head->th_ops->o_startup(policy, head);
2720         if (rc)
2721                 GOTO(out_free_heap, rc);
2722
2723         policy->pol_private = head;
2724         return 0;
2725 out_free_heap:
2726         binheap_destroy(head->th_binheap);
2727 out_free_head:
2728         OBD_FREE_PTR(head);
2729 out:
2730         return rc;
2731 }
2732
2733 /**
2734  * Is called before the policy transitions into
2735  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
2736  * private data structure.
2737  *
2738  * \param[in] policy The policy to stop
2739  *
2740  * \see nrs_policy_stop0()
2741  */
2742 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
2743 {
2744         struct nrs_tbf_head *head = policy->pol_private;
2745         struct ptlrpc_nrs *nrs = policy->pol_nrs;
2746         struct nrs_tbf_rule *rule, *n;
2747
2748         LASSERT(head != NULL);
2749         hrtimer_cancel(&head->th_timer);
2750         /* Should cleanup hash first before free rules */
2751         if (head->th_type_flag == NRS_TBF_FLAG_NID) {
2752                 rhashtable_free_and_destroy(&head->th_cli_rhash,
2753                                             nrs_tbf_nid_exit, NULL);
2754         } else {
2755                 LASSERT(head->th_cli_hash);
2756                 cfs_hash_putref(head->th_cli_hash);
2757         }
2758         list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
2759                 list_del_init(&rule->tr_linkage);
2760                 kref_put(&rule->tr_ref, nrs_tbf_rule_fini);
2761         }
2762         LASSERT(list_empty(&head->th_list));
2763         LASSERT(head->th_binheap != NULL);
2764         LASSERT(binheap_is_empty(head->th_binheap));
2765         binheap_destroy(head->th_binheap);
2766         OBD_FREE_PTR(head);
2767         nrs->nrs_throttling = 0;
2768         wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
2769 }
2770
2771 /**
2772  * Performs a policy-specific ctl function on TBF policy instances; similar
2773  * to ioctl.
2774  *
2775  * \param[in]     policy the policy instance
2776  * \param[in]     opc    the opcode
2777  * \param[in,out] arg    used for passing parameters and information
2778  *
2779  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2780  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2781  *
2782  * \retval 0   operation carried out successfully
2783  * \retval -ve error
2784  */
2785 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
2786                        enum ptlrpc_nrs_ctl opc,
2787                        void *arg)
2788 {
2789         int rc = 0;
2790         ENTRY;
2791
2792         assert_spin_locked(&policy->pol_nrs->nrs_lock);
2793
2794         switch (opc) {
2795         default:
2796                 RETURN(-EINVAL);
2797
2798         /**
2799          * Read RPC rate size of a policy instance.
2800          */
2801         case NRS_CTL_TBF_RD_RULE: {
2802                 struct nrs_tbf_head *head = policy->pol_private;
2803                 struct seq_file *m = arg;
2804                 struct ptlrpc_service_part *svcpt;
2805
2806                 svcpt = policy->pol_nrs->nrs_svcpt;
2807                 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
2808
2809                 rc = nrs_tbf_rule_dump_all(head, m);
2810                 }
2811                 break;
2812
2813         /**
2814          * Write RPC rate of a policy instance.
2815          */
2816         case NRS_CTL_TBF_WR_RULE: {
2817                 struct nrs_tbf_head *head = policy->pol_private;
2818                 struct nrs_tbf_cmd *cmd;
2819
2820                 cmd = (struct nrs_tbf_cmd *)arg;
2821                 rc = nrs_tbf_command(policy,
2822                                      head,
2823                                      cmd);
2824                 }
2825                 break;
2826         /**
2827          * Read the TBF policy type of a policy instance.
2828          */
2829         case NRS_CTL_TBF_RD_TYPE_FLAG: {
2830                 struct nrs_tbf_head *head = policy->pol_private;
2831
2832                 *(__u32 *)arg = head->th_type_flag;
2833                 }
2834                 break;
2835         }
2836
2837         RETURN(rc);
2838 }
2839
2840 /**
2841  * Is called for obtaining a TBF policy resource.
2842  *
2843  * \param[in]  policy     The policy on which the request is being asked for
2844  * \param[in]  nrq        The request for which resources are being taken
2845  * \param[in]  parent     Parent resource, unused in this policy
2846  * \param[out] resp       Resources references are placed in this array
2847  * \param[in]  moving_req Signifies limited caller context; unused in this
2848  *                        policy
2849  *
2850  *
2851  * \see nrs_resource_get_safe()
2852  */
2853 static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
2854                            struct ptlrpc_nrs_request *nrq,
2855                            const struct ptlrpc_nrs_resource *parent,
2856                            struct ptlrpc_nrs_resource **resp,
2857                            bool moving_req)
2858 {
2859         struct nrs_tbf_head   *head;
2860         struct nrs_tbf_client *cli;
2861         struct nrs_tbf_client *tmp;
2862         struct ptlrpc_request *req;
2863
2864         if (parent == NULL) {
2865                 *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
2866                 return 0;
2867         }
2868
2869         head = container_of(parent, struct nrs_tbf_head, th_res);
2870         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
2871         cli = head->th_ops->o_cli_find(head, req);
2872         if (cli != NULL) {
2873                 spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2874                 LASSERT(cli->tc_rule);
2875                 if (cli->tc_rule_sequence !=
2876                     atomic_read(&head->th_rule_sequence) ||
2877                     cli->tc_rule->tr_flags & NTRS_STOPPING) {
2878                         struct nrs_tbf_rule *rule;
2879
2880                         CDEBUG(D_RPCTRACE,
2881                                "TBF class@%p rate %llu sequence %d, "
2882                                "rule flags %d, head sequence %d\n",
2883                                cli, cli->tc_rpc_rate,
2884                                cli->tc_rule_sequence,
2885                                cli->tc_rule->tr_flags,
2886                                atomic_read(&head->th_rule_sequence));
2887                         rule = nrs_tbf_rule_match(head, cli);
2888                         if (rule != cli->tc_rule) {
2889                                 nrs_tbf_cli_reset(head, rule, cli);
2890                         } else {
2891                                 if (cli->tc_rule_generation != rule->tr_generation)
2892                                         nrs_tbf_cli_reset_value(head, cli);
2893                                 kref_put(&rule->tr_ref, nrs_tbf_rule_fini);
2894                         }
2895                 } else if (cli->tc_rule_generation !=
2896                            cli->tc_rule->tr_generation) {
2897                         nrs_tbf_cli_reset_value(head, cli);
2898                 }
2899                 spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2900                 goto out;
2901         }
2902
2903         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
2904                           sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
2905         if (cli == NULL)
2906                 return -ENOMEM;
2907
2908         nrs_tbf_cli_init(head, cli, req);
2909         tmp = head->th_ops->o_cli_findadd(head, cli);
2910         if (tmp != cli) {
2911                 refcount_dec(&cli->tc_ref);
2912                 nrs_tbf_cli_fini(cli);
2913                 cli = tmp;
2914                 if (IS_ERR(cli))
2915                         return PTR_ERR(cli);
2916         }
2917 out:
2918         *resp = &cli->tc_res;
2919
2920         return 1;
2921 }
2922
2923 /**
2924  * Called when releasing references to the resource hierachy obtained for a
2925  * request for scheduling using the TBF policy.
2926  *
2927  * \param[in] policy   the policy the resource belongs to
2928  * \param[in] res      the resource to be released
2929  */
2930 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
2931                             const struct ptlrpc_nrs_resource *res)
2932 {
2933         struct nrs_tbf_head   *head;
2934         struct nrs_tbf_client *cli;
2935
2936         /**
2937          * Do nothing for freeing parent, nrs_tbf_net resources
2938          */
2939         if (res->res_parent == NULL)
2940                 return;
2941
2942         cli = container_of(res, struct nrs_tbf_client, tc_res);
2943         head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
2944
2945         head->th_ops->o_cli_put(head, cli);
2946 }
2947
2948 /**
2949  * Called when getting a request from the TBF policy for handling, or just
2950  * peeking; removes the request from the policy when it is to be handled.
2951  *
2952  * \param[in] policy The policy
2953  * \param[in] peek   When set, signifies that we just want to examine the
2954  *                   request, and not handle it, so the request is not removed
2955  *                   from the policy.
2956  * \param[in] force  Force the policy to return a request
2957  *
2958  * \retval The request to be handled; this is the next request in the TBF
2959  *         rule
2960  *
2961  * \see ptlrpc_nrs_req_get_nolock()
2962  * \see nrs_request_get()
2963  */
2964 static
2965 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
2966                                            bool peek, bool force)
2967 {
2968         struct nrs_tbf_head       *head = policy->pol_private;
2969         struct ptlrpc_nrs_request *nrq = NULL;
2970         struct nrs_tbf_client     *cli;
2971         struct binheap_node       *node;
2972
2973         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2974
2975         if (likely(!peek && !force) && policy->pol_nrs->nrs_throttling)
2976                 return NULL;
2977
2978         node = binheap_root(head->th_binheap);
2979         if (unlikely(node == NULL))
2980                 return NULL;
2981
2982         cli = container_of(node, struct nrs_tbf_client, tc_node);
2983         LASSERT(cli->tc_in_heap);
2984         if (unlikely(peek)) {
2985                 nrq = list_first_entry(&cli->tc_list,
2986                                        struct ptlrpc_nrs_request,
2987                                        nr_u.tbf.tr_list);
2988         } else {
2989                 struct nrs_tbf_rule *rule = cli->tc_rule;
2990                 __u64 now = ktime_to_ns(ktime_get());
2991                 __u64 passed;
2992                 __u64 ntoken;
2993                 __u64 deadline;
2994                 __u64 old_resid = 0;
2995
2996                 deadline = cli->tc_check_time +
2997                           cli->tc_nsecs;
2998                 LASSERT(now >= cli->tc_check_time);
2999                 passed = now - cli->tc_check_time;
3000                 ntoken = passed * cli->tc_rpc_rate;
3001                 do_div(ntoken, NSEC_PER_SEC);
3002
3003                 ntoken += cli->tc_ntoken;
3004                 if (rule->tr_flags & NTRS_REALTIME) {
3005                         LASSERT(cli->tc_nsecs_resid < cli->tc_nsecs);
3006                         old_resid = cli->tc_nsecs_resid;
3007                         cli->tc_nsecs_resid += passed % cli->tc_nsecs;
3008                         if (cli->tc_nsecs_resid > cli->tc_nsecs) {
3009                                 ntoken++;
3010                                 cli->tc_nsecs_resid -= cli->tc_nsecs;
3011                         }
3012                 } else if (ntoken > cli->tc_depth)
3013                         ntoken = cli->tc_depth;
3014
3015                 /* give an extra token with force mode */
3016                 if (unlikely(force) && ntoken == 0)
3017                         ntoken = 1;
3018
3019                 if (ntoken > 0) {
3020                         nrq = list_first_entry(&cli->tc_list,
3021                                          struct ptlrpc_nrs_request,
3022                                          nr_u.tbf.tr_list);
3023                         ntoken--;
3024                         cli->tc_ntoken = ntoken;
3025                         cli->tc_check_time = now;
3026                         list_del_init(&nrq->nr_u.tbf.tr_list);
3027                         if (list_empty(&cli->tc_list)) {
3028                                 binheap_remove(head->th_binheap,
3029                                                &cli->tc_node);
3030                                 cli->tc_in_heap = false;
3031                         } else {
3032                                 if (!(rule->tr_flags & NTRS_REALTIME))
3033                                         cli->tc_deadline = now + cli->tc_nsecs;
3034                                 binheap_relocate(head->th_binheap,
3035                                                  &cli->tc_node);
3036                         }
3037                         CDEBUG(D_RPCTRACE,
3038                                "TBF dequeues: class@%p rate %llu gen %llu token %llu, rule@%p rate %llu gen %llu\n",
3039                                cli, cli->tc_rpc_rate,
3040                                cli->tc_rule_generation, cli->tc_ntoken,
3041                                cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3042                                cli->tc_rule->tr_generation);
3043                 } else {
3044                         ktime_t time;
3045
3046                         if (rule->tr_flags & NTRS_REALTIME) {
3047                                 cli->tc_deadline = deadline;
3048                                 cli->tc_nsecs_resid = old_resid;
3049                                 binheap_relocate(head->th_binheap,
3050                                                  &cli->tc_node);
3051                                 if (node != binheap_root(head->th_binheap))
3052                                         return nrs_tbf_req_get(policy,
3053                                                                peek, force);
3054                         }
3055                         policy->pol_nrs->nrs_throttling = 1;
3056                         head->th_deadline = deadline;
3057                         time = ktime_set(0, 0);
3058                         time = ktime_add_ns(time, deadline);
3059                         hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
3060                 }
3061         }
3062
3063         return nrq;
3064 }
3065
3066 /**
3067  * Adds request \a nrq to \a policy's list of queued requests
3068  *
3069  * \param[in] policy The policy
3070  * \param[in] nrq    The request to add
3071  *
3072  * \retval 0 success; nrs_request_enqueue() assumes this function will always
3073  *                    succeed
3074  */
3075 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
3076                            struct ptlrpc_nrs_request *nrq)
3077 {
3078         struct nrs_tbf_head   *head;
3079         struct nrs_tbf_client *cli;
3080         int                    rc = 0;
3081
3082         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3083
3084         cli = container_of(nrs_request_resource(nrq),
3085                            struct nrs_tbf_client, tc_res);
3086         head = container_of(nrs_request_resource(nrq)->res_parent,
3087                             struct nrs_tbf_head, th_res);
3088         if (list_empty(&cli->tc_list)) {
3089                 LASSERT(!cli->tc_in_heap);
3090                 cli->tc_deadline = cli->tc_check_time + cli->tc_nsecs;
3091                 rc = binheap_insert(head->th_binheap, &cli->tc_node);
3092                 if (rc == 0) {
3093                         cli->tc_in_heap = true;
3094                         nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3095                         list_add_tail(&nrq->nr_u.tbf.tr_list,
3096                                           &cli->tc_list);
3097                         if (policy->pol_nrs->nrs_throttling) {
3098                                 __u64 deadline = cli->tc_deadline;
3099                                 if ((head->th_deadline > deadline) &&
3100                                     (hrtimer_try_to_cancel(&head->th_timer)
3101                                      >= 0)) {
3102                                         ktime_t time;
3103                                         head->th_deadline = deadline;
3104                                         time = ktime_set(0, 0);
3105                                         time = ktime_add_ns(time, deadline);
3106                                         hrtimer_start(&head->th_timer, time,
3107                                                       HRTIMER_MODE_ABS);
3108                                 }
3109                         }
3110                 }
3111         } else {
3112                 LASSERT(cli->tc_in_heap);
3113                 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3114                 list_add_tail(&nrq->nr_u.tbf.tr_list,
3115                                   &cli->tc_list);
3116         }
3117
3118         if (rc == 0)
3119                 CDEBUG(D_RPCTRACE,
3120                        "TBF enqueues: class@%p rate %llu gen %llu token %llu, rule@%p rate %llu gen %llu\n",
3121                        cli, cli->tc_rpc_rate,
3122                        cli->tc_rule_generation, cli->tc_ntoken,
3123                        cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3124                        cli->tc_rule->tr_generation);
3125
3126         return rc;
3127 }
3128
3129 /**
3130  * Removes request \a nrq from \a policy's list of queued requests.
3131  *
3132  * \param[in] policy The policy
3133  * \param[in] nrq    The request to remove
3134  */
3135 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
3136                              struct ptlrpc_nrs_request *nrq)
3137 {
3138         struct nrs_tbf_head   *head;
3139         struct nrs_tbf_client *cli;
3140
3141         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3142
3143         cli = container_of(nrs_request_resource(nrq),
3144                            struct nrs_tbf_client, tc_res);
3145         head = container_of(nrs_request_resource(nrq)->res_parent,
3146                             struct nrs_tbf_head, th_res);
3147
3148         LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
3149         list_del_init(&nrq->nr_u.tbf.tr_list);
3150         if (list_empty(&cli->tc_list)) {
3151                 binheap_remove(head->th_binheap,
3152                                &cli->tc_node);
3153                 cli->tc_in_heap = false;
3154         } else {
3155                 binheap_relocate(head->th_binheap,
3156                                  &cli->tc_node);
3157         }
3158 }
3159
3160 /**
3161  * Prints a debug statement right before the request \a nrq stops being
3162  * handled.
3163  *
3164  * \param[in] policy The policy handling the request
3165  * \param[in] nrq    The request being handled
3166  *
3167  * \see ptlrpc_server_finish_request()
3168  * \see ptlrpc_nrs_req_stop_nolock()
3169  */
3170 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
3171                               struct ptlrpc_nrs_request *nrq)
3172 {
3173         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
3174                                                   rq_nrq);
3175
3176         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3177
3178         CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: %llu\n",
3179                policy->pol_desc->pd_name, libcfs_idstr(&req->rq_peer),
3180                nrq->nr_u.tbf.tr_sequence);
3181 }
3182
3183 /**
3184  * debugfs interface
3185  */
3186
/**
 * The maximum RPC rate. This is an exclusive bound:
 * nrs_tbf_parse_value_pair() rejects a "rate=" value that is greater than or
 * equal to it.
 */
#define LPROCFS_NRS_RATE_MAX            1000000ULL      /* 1rpc/us */
3191
3192 static int
3193 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
3194 {
3195         struct ptlrpc_service       *svc = m->private;
3196         int                          rc;
3197
3198         seq_printf(m, "regular_requests:\n");
3199         /**
3200          * Perform two separate calls to this as only one of the NRS heads'
3201          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
3202          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
3203          */
3204         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
3205                                        NRS_POL_NAME_TBF,
3206                                        NRS_CTL_TBF_RD_RULE,
3207                                        false, m);
3208         if (rc == 0) {
3209                 /**
3210                  * -ENOSPC means buf in the parameter m is overflow, return 0
3211                  * here to let upper layer function seq_read alloc a larger
3212                  * memory area and do this process again.
3213                  */
3214         } else if (rc == -ENOSPC) {
3215                 return 0;
3216
3217                 /**
3218                  * Ignore -ENODEV as the regular NRS head's policy may be in the
3219                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
3220                  */
3221         } else if (rc != -ENODEV) {
3222                 return rc;
3223         }
3224
3225         if (!nrs_svc_has_hp(svc))
3226                 goto no_hp;
3227
3228         seq_printf(m, "high_priority_requests:\n");
3229         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
3230                                        NRS_POL_NAME_TBF,
3231                                        NRS_CTL_TBF_RD_RULE,
3232                                        false, m);
3233         if (rc == 0) {
3234                 /**
3235                  * -ENOSPC means buf in the parameter m is overflow, return 0
3236                  * here to let upper layer function seq_read alloc a larger
3237                  * memory area and do this process again.
3238                  */
3239         } else if (rc == -ENOSPC) {
3240                 return 0;
3241         }
3242
3243 no_hp:
3244
3245         return rc;
3246 }
3247
3248 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char *token)
3249 {
3250         int rc;
3251         ENTRY;
3252
3253         switch (cmd->u.tc_start.ts_valid_type) {
3254         case NRS_TBF_FLAG_JOBID:
3255                 rc = nrs_tbf_jobid_parse(cmd, token);
3256                 break;
3257         case NRS_TBF_FLAG_NID:
3258                 rc = nrs_tbf_nid_parse(cmd, token);
3259                 break;
3260         case NRS_TBF_FLAG_OPCODE:
3261                 rc = nrs_tbf_opcode_parse(cmd, token);
3262                 break;
3263         case NRS_TBF_FLAG_GENERIC:
3264                 rc = nrs_tbf_generic_parse(cmd, token);
3265                 break;
3266         case NRS_TBF_FLAG_UID:
3267         case NRS_TBF_FLAG_GID:
3268                 rc = nrs_tbf_ug_id_parse(cmd, token);
3269                 break;
3270         default:
3271                 RETURN(-EINVAL);
3272         }
3273
3274         RETURN(rc);
3275 }
3276
3277 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
3278 {
3279         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3280                 switch (cmd->u.tc_start.ts_valid_type) {
3281                 case NRS_TBF_FLAG_JOBID:
3282                         nrs_tbf_jobid_cmd_fini(cmd);
3283                         break;
3284                 case NRS_TBF_FLAG_NID:
3285                         nrs_tbf_nid_cmd_fini(cmd);
3286                         break;
3287                 case NRS_TBF_FLAG_OPCODE:
3288                         nrs_tbf_opcode_cmd_fini(cmd);
3289                         break;
3290                 case NRS_TBF_FLAG_GENERIC:
3291                         nrs_tbf_generic_cmd_fini(cmd);
3292                         break;
3293                 case NRS_TBF_FLAG_UID:
3294                 case NRS_TBF_FLAG_GID:
3295                         nrs_tbf_id_cmd_fini(cmd);
3296                         break;
3297                 default:
3298                         CWARN("unknown NRS_TBF_FLAGS:0x%x\n",
3299                               cmd->u.tc_start.ts_valid_type);
3300                 }
3301         }
3302 }
3303
3304 static int check_rule_name(const char *name)
3305 {
3306         int i;
3307
3308         if (name[0] == '\0')
3309                 return -EINVAL;
3310
3311         for (i = 0; name[i] != '\0' && i < MAX_TBF_NAME; i++) {
3312                 if (!isalnum(name[i]) && name[i] != '_')
3313                         return -EINVAL;
3314         }
3315
3316         if (i == MAX_TBF_NAME)
3317                 return -ENAMETOOLONG;
3318
3319         return 0;
3320 }
3321
3322 static int
3323 nrs_tbf_parse_value_pair(struct nrs_tbf_cmd *cmd, char *buffer)
3324 {
3325         char    *key;
3326         char    *val;
3327         int      rc;
3328         __u64    rate;
3329
3330         val = buffer;
3331         key = strsep(&val, "=");
3332         if (val == NULL || strlen(val) == 0)
3333                 return -EINVAL;
3334
3335         /* Key of the value pair */
3336         if (strcmp(key, "rate") == 0) {
3337                 rc = kstrtoull(val, 10, &rate);
3338                 if (rc)
3339                         return rc;
3340
3341                 if (rate <= 0 || rate >= LPROCFS_NRS_RATE_MAX)
3342                         return -EINVAL;
3343
3344                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3345                         cmd->u.tc_start.ts_rpc_rate = rate;
3346                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3347                         cmd->u.tc_change.tc_rpc_rate = rate;
3348                 else
3349                         return -EINVAL;
3350         }  else if (strcmp(key, "rank") == 0) {
3351                 rc = check_rule_name(val);
3352                 if (rc)
3353                         return rc;
3354
3355                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3356                         cmd->u.tc_start.ts_next_name = val;
3357                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3358                         cmd->u.tc_change.tc_next_name = val;
3359                 else
3360                         return -EINVAL;
3361         } else if (strcmp(key, "realtime") == 0) {
3362                 unsigned long realtime;
3363
3364                 rc = kstrtoul(val, 10, &realtime);
3365                 if (rc)
3366                         return rc;
3367
3368                 if (realtime > 0)
3369                         cmd->u.tc_start.ts_rule_flags |= NTRS_REALTIME;
3370         } else {
3371                 return -EINVAL;
3372         }
3373         return 0;
3374 }
3375
3376 static int
3377 nrs_tbf_parse_value_pairs(struct nrs_tbf_cmd *cmd, char *buffer)
3378 {
3379         char    *val;
3380         char    *token;
3381         int      rc;
3382
3383         val = buffer;
3384         while (val != NULL && strlen(val) != 0) {
3385                 token = strsep(&val, " ");
3386                 rc = nrs_tbf_parse_value_pair(cmd, token);
3387                 if (rc)
3388                         return rc;
3389         }
3390
3391         switch (cmd->tc_cmd) {
3392         case NRS_CTL_TBF_START_RULE:
3393                 if (cmd->u.tc_start.ts_rpc_rate == 0)
3394                         cmd->u.tc_start.ts_rpc_rate = tbf_rate;
3395                 break;
3396         case NRS_CTL_TBF_CHANGE_RULE:
3397                 if (cmd->u.tc_change.tc_rpc_rate == 0 &&
3398                     cmd->u.tc_change.tc_next_name == NULL)
3399                         return -EINVAL;
3400                 break;
3401         case NRS_CTL_TBF_STOP_RULE:
3402                 break;
3403         default:
3404                 return -EINVAL;
3405         }
3406         return 0;
3407 }
3408
3409 static struct nrs_tbf_cmd *
3410 nrs_tbf_parse_cmd(char *buffer, unsigned long count, __u32 type_flag)
3411 {
3412         struct nrs_tbf_cmd *cmd;
3413         char *token;
3414         char *val;
3415         int rc = 0;
3416
3417         OBD_ALLOC_PTR(cmd);
3418         if (cmd == NULL)
3419                 GOTO(out, rc = -ENOMEM);
3420         memset(cmd, 0, sizeof(*cmd));
3421
3422         val = buffer;
3423         token = strsep(&val, " ");
3424         if (val == NULL || strlen(val) == 0)
3425                 GOTO(out_free_cmd, rc = -EINVAL);
3426
3427         /* Type of the command */
3428         if (strcmp(token, "start") == 0) {
3429                 cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
3430                 cmd->u.tc_start.ts_valid_type = type_flag;
3431         } else if (strcmp(token, "stop") == 0)
3432                 cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
3433         else if (strcmp(token, "change") == 0)
3434                 cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RULE;
3435         else
3436                 GOTO(out_free_cmd, rc = -EINVAL);
3437
3438         /* Name of the rule */
3439         token = strsep(&val, " ");
3440         if ((val == NULL && cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE))
3441                 GOTO(out_free_cmd, rc = -EINVAL);
3442
3443         rc = check_rule_name(token);
3444         if (rc)
3445                 GOTO(out_free_cmd, rc);
3446
3447         cmd->tc_name = token;
3448
3449         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3450                 /* List of ID */
3451                 LASSERT(val);
3452                 token = val;
3453                 val = strrchr(token, '}');
3454                 if (!val)
3455                         GOTO(out_free_cmd, rc = -EINVAL);
3456
3457                 /* Skip '}' */
3458                 val++;
3459                 if (*val == '\0') {
3460                         val = NULL;
3461                 } else if (*val == ' ') {
3462                         *val = '\0';
3463                         val++;
3464                 } else
3465                         GOTO(out_free_cmd, rc = -EINVAL);
3466
3467                 rc = nrs_tbf_id_parse(cmd, token);
3468                 if (rc)
3469                         GOTO(out_free_cmd, rc);
3470         }
3471
3472         rc = nrs_tbf_parse_value_pairs(cmd, val);
3473         if (rc)
3474                 GOTO(out_cmd_fini, rc = -EINVAL);
3475         goto out;
3476 out_cmd_fini:
3477         nrs_tbf_cmd_fini(cmd);
3478 out_free_cmd:
3479         OBD_FREE_PTR(cmd);
3480 out:
3481         if (rc)
3482                 cmd = ERR_PTR(rc);
3483         return cmd;
3484 }
3485
3486 /**
3487  * Get the TBF policy type (nid, jobid, etc) preset by
3488  * proc entry 'nrs_policies' for command buffer parsing.
3489  *
3490  * \param[in] svc the PTLRPC service
3491  * \param[in] queue the NRS queue type
3492  *
3493  * \retval the preset TBF policy type flag
3494  */
3495 static __u32
3496 nrs_tbf_type_flag(struct ptlrpc_service *svc, enum ptlrpc_nrs_queue_type queue)
3497 {
3498         __u32   type;
3499         int     rc;
3500
3501         rc = ptlrpc_nrs_policy_control(svc, queue,
3502                                        NRS_POL_NAME_TBF,
3503                                        NRS_CTL_TBF_RD_TYPE_FLAG,
3504                                        true, &type);
3505         if (rc != 0)
3506                 type = NRS_TBF_FLAG_INVALID;
3507
3508         return type;
3509 }
3510
3511 #define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
3512 static ssize_t
3513 ptlrpc_lprocfs_nrs_tbf_rule_seq_write(struct file *file,
3514                                       const char __user *buffer,
3515                                       size_t count, loff_t *off)
3516 {
3517         struct seq_file *m = file->private_data;
3518         struct ptlrpc_service *svc = m->private;
3519         char *kernbuf;
3520         char *val;
3521         int rc;
3522         struct nrs_tbf_cmd *cmd;
3523         enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
3524         unsigned long length;
3525         char *token;
3526
3527         OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3528         if (kernbuf == NULL)
3529                 GOTO(out, rc = -ENOMEM);
3530
3531         if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
3532                 GOTO(out_free_kernbuff, rc = -EINVAL);
3533
3534         if (copy_from_user(kernbuf, buffer, count))
3535                 GOTO(out_free_kernbuff, rc = -EFAULT);
3536
3537         val = kernbuf;
3538         token = strsep(&val, " ");
3539         if (val == NULL)
3540                 GOTO(out_free_kernbuff, rc = -EINVAL);
3541
3542         if (strcmp(token, "reg") == 0) {
3543                 queue = PTLRPC_NRS_QUEUE_REG;
3544         } else if (strcmp(token, "hp") == 0) {
3545                 queue = PTLRPC_NRS_QUEUE_HP;
3546         } else {
3547                 kernbuf[strlen(token)] = ' ';
3548                 val = kernbuf;
3549         }
3550         length = strlen(val);
3551
3552         if (length == 0)
3553                 GOTO(out_free_kernbuff, rc = -EINVAL);
3554
3555         if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
3556                 GOTO(out_free_kernbuff, rc = -ENODEV);
3557         else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
3558                 queue = PTLRPC_NRS_QUEUE_REG;
3559
3560         cmd = nrs_tbf_parse_cmd(val, length, nrs_tbf_type_flag(svc, queue));
3561         if (IS_ERR(cmd))
3562                 GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
3563
3564         /**
3565          * Serialize NRS core lprocfs operations with policy registration/
3566          * unregistration.
3567          */
3568         mutex_lock(&nrs_core.nrs_mutex);
3569         rc = ptlrpc_nrs_policy_control(svc, queue,
3570                                        NRS_POL_NAME_TBF,
3571                                        NRS_CTL_TBF_WR_RULE,
3572                                        false, cmd);
3573         mutex_unlock(&nrs_core.nrs_mutex);
3574
3575         nrs_tbf_cmd_fini(cmd);
3576         OBD_FREE_PTR(cmd);
3577 out_free_kernbuff:
3578         OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3579 out:
3580         return rc ? rc : count;
3581 }
3582
3583 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_tbf_rule);
3584
3585 /**
3586  * Initializes a TBF policy's lprocfs interface for service \a svc
3587  *
3588  * \param[in] svc the service
3589  *
3590  * \retval 0    success
3591  * \retval != 0 error
3592  */
3593 static int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
3594 {
3595         struct ldebugfs_vars nrs_tbf_lprocfs_vars[] = {
3596                 { .name         = "nrs_tbf_rule",
3597                   .fops         = &ptlrpc_lprocfs_nrs_tbf_rule_fops,
3598                   .data = svc },
3599                 { NULL }
3600         };
3601
3602         if (!svc->srv_debugfs_entry)
3603                 return 0;
3604
3605         ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_tbf_lprocfs_vars, NULL);
3606
3607         return 0;
3608 }
3609
3610 /**
3611  * TBF policy operations
3612  */
3613 static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
3614         .op_policy_start        = nrs_tbf_start,
3615         .op_policy_stop         = nrs_tbf_stop,
3616         .op_policy_ctl          = nrs_tbf_ctl,
3617         .op_res_get             = nrs_tbf_res_get,
3618         .op_res_put             = nrs_tbf_res_put,
3619         .op_req_get             = nrs_tbf_req_get,
3620         .op_req_enqueue         = nrs_tbf_req_add,
3621         .op_req_dequeue         = nrs_tbf_req_del,
3622         .op_req_stop            = nrs_tbf_req_stop,
3623         .op_lprocfs_init        = nrs_tbf_lprocfs_init,
3624 };
3625
3626 /**
3627  * TBF policy configuration
3628  */
3629 struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
3630         .nc_name                = NRS_POL_NAME_TBF,
3631         .nc_ops                 = &nrs_tbf_ops,
3632         .nc_compat              = nrs_policy_compat_all,
3633 };
3634
3635 /** @} tbf */
3636
3637 /** @} nrs */