Whamcloud - gitweb
9fb43e4d6f407000161dd8689b196894ab477292
[fs/lustre-release.git] / lustre / ptlrpc / nrs_tbf.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2013 DataDirect Networks, Inc.
24  *
25  * Copyright (c) 2014, 2016, Intel Corporation.
26  */
27 /*
28  * lustre/ptlrpc/nrs_tbf.c
29  *
30  * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
31  *
32  */
33
34 #ifdef HAVE_SERVER_SUPPORT
35
36 /**
37  * \addtogroup nrs
38  * @{
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #include <obd_support.h>
43 #include <obd_class.h>
44 #include <libcfs/libcfs.h>
45 #include "ptlrpc_internal.h"
46
47 /**
48  * \name tbf
49  *
50  * Token Bucket Filter over client NIDs
51  *
52  * @{
53  */
54
/* Name string of the TBF policy. */
#define NRS_POL_NAME_TBF        "tbf"

/*
 * Module parameter: size of the jobid cache. In this file it determines the
 * order of the jobid client hash (nrs_tbf_jobid_hash_order()) and the
 * per-bucket LRU purge threshold (nrs_tbf_jobid_cli_put()).
 */
static int tbf_jobid_cache_size = 8192;
module_param(tbf_jobid_cache_size, int, 0644);
MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");

/*
 * Module parameter: RPC rate given to the default rule created at policy
 * startup (see nrs_tbf_jobid_startup()).
 */
static int tbf_rate = 10000;
module_param(tbf_rate, int, 0644);
MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");

/*
 * Module parameter: token bucket depth copied into every rule created by
 * nrs_tbf_rule_start().
 */
static int tbf_depth = 3;
module_param(tbf_depth, int, 0644);
MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
68
69 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
70 {
71         struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
72                                                  th_timer);
73         struct ptlrpc_nrs   *nrs = head->th_res.res_policy->pol_nrs;
74         struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
75
76         nrs->nrs_throttling = 0;
77         wake_up(&svcpt->scp_waitq);
78
79         return HRTIMER_NORESTART;
80 }
81
/*
 * Name of the default rule. nrs_tbf_rule_stop() refuses to stop a rule
 * carrying this name.
 */
#define NRS_TBF_DEFAULT_RULE "default"
83
84 static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule)
85 {
86         LASSERT(atomic_read(&rule->tr_ref) == 0);
87         LASSERT(list_empty(&rule->tr_cli_list));
88         LASSERT(list_empty(&rule->tr_linkage));
89
90         rule->tr_head->th_ops->o_rule_fini(rule);
91         OBD_FREE_PTR(rule);
92 }
93
94 /**
95  * Decreases the rule's usage reference count, and stops the rule in case it
96  * was already stopping and have no more outstanding usage references (which
97  * indicates it has no more queued or started requests, and can be safely
98  * stopped).
99  */
100 static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule)
101 {
102         if (atomic_dec_and_test(&rule->tr_ref))
103                 nrs_tbf_rule_fini(rule);
104 }
105
106 /**
107  * Increases the rule's usage reference count.
108  */
109 static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule)
110 {
111         atomic_inc(&rule->tr_ref);
112 }
113
114 static void
115 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
116 {
117         LASSERT(!list_empty(&cli->tc_linkage));
118         LASSERT(cli->tc_rule);
119         spin_lock(&cli->tc_rule->tr_rule_lock);
120         list_del_init(&cli->tc_linkage);
121         spin_unlock(&cli->tc_rule->tr_rule_lock);
122         nrs_tbf_rule_put(cli->tc_rule);
123         cli->tc_rule = NULL;
124 }
125
126 static void
127 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
128                         struct nrs_tbf_client *cli)
129
130 {
131         struct nrs_tbf_rule *rule = cli->tc_rule;
132
133         cli->tc_rpc_rate = rule->tr_rpc_rate;
134         cli->tc_nsecs = rule->tr_nsecs;
135         cli->tc_depth = rule->tr_depth;
136         cli->tc_ntoken = rule->tr_depth;
137         cli->tc_check_time = ktime_to_ns(ktime_get());
138         cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
139         cli->tc_rule_generation = rule->tr_generation;
140
141         if (cli->tc_in_heap)
142                 cfs_binheap_relocate(head->th_binheap,
143                                      &cli->tc_node);
144 }
145
146 static void
147 nrs_tbf_cli_reset(struct nrs_tbf_head *head,
148                   struct nrs_tbf_rule *rule,
149                   struct nrs_tbf_client *cli)
150 {
151         spin_lock(&cli->tc_rule_lock);
152         if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
153                 LASSERT(rule != cli->tc_rule);
154                 nrs_tbf_cli_rule_put(cli);
155         }
156         LASSERT(cli->tc_rule == NULL);
157         LASSERT(list_empty(&cli->tc_linkage));
158         /* Rule's ref is added before called */
159         cli->tc_rule = rule;
160         spin_lock(&rule->tr_rule_lock);
161         list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
162         spin_unlock(&rule->tr_rule_lock);
163         spin_unlock(&cli->tc_rule_lock);
164         nrs_tbf_cli_reset_value(head, cli);
165 }
166
167 static int
168 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
169 {
170         return rule->tr_head->th_ops->o_rule_dump(rule, m);
171 }
172
173 static int
174 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
175 {
176         struct nrs_tbf_rule *rule;
177         int rc = 0;
178
179         LASSERT(head != NULL);
180         spin_lock(&head->th_rule_lock);
181         /* List the rules from newest to oldest */
182         list_for_each_entry(rule, &head->th_list, tr_linkage) {
183                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
184                 rc = nrs_tbf_rule_dump(rule, m);
185                 if (rc) {
186                         rc = -ENOSPC;
187                         break;
188                 }
189         }
190         spin_unlock(&head->th_rule_lock);
191
192         return rc;
193 }
194
195 static struct nrs_tbf_rule *
196 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
197                          const char *name)
198 {
199         struct nrs_tbf_rule *rule;
200
201         LASSERT(head != NULL);
202         list_for_each_entry(rule, &head->th_list, tr_linkage) {
203                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
204                 if (strcmp(rule->tr_name, name) == 0) {
205                         nrs_tbf_rule_get(rule);
206                         return rule;
207                 }
208         }
209         return NULL;
210 }
211
212 static struct nrs_tbf_rule *
213 nrs_tbf_rule_find(struct nrs_tbf_head *head,
214                   const char *name)
215 {
216         struct nrs_tbf_rule *rule;
217
218         LASSERT(head != NULL);
219         spin_lock(&head->th_rule_lock);
220         rule = nrs_tbf_rule_find_nolock(head, name);
221         spin_unlock(&head->th_rule_lock);
222         return rule;
223 }
224
225 static struct nrs_tbf_rule *
226 nrs_tbf_rule_match(struct nrs_tbf_head *head,
227                    struct nrs_tbf_client *cli)
228 {
229         struct nrs_tbf_rule *rule = NULL;
230         struct nrs_tbf_rule *tmp_rule;
231
232         spin_lock(&head->th_rule_lock);
233         /* Match the newest rule in the list */
234         list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
235                 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
236                 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
237                         rule = tmp_rule;
238                         break;
239                 }
240         }
241
242         if (rule == NULL)
243                 rule = head->th_rule;
244
245         nrs_tbf_rule_get(rule);
246         spin_unlock(&head->th_rule_lock);
247         return rule;
248 }
249
250 static void
251 nrs_tbf_cli_init(struct nrs_tbf_head *head,
252                  struct nrs_tbf_client *cli,
253                  struct ptlrpc_request *req)
254 {
255         struct nrs_tbf_rule *rule;
256
257         memset(cli, 0, sizeof(*cli));
258         cli->tc_in_heap = false;
259         head->th_ops->o_cli_init(cli, req);
260         INIT_LIST_HEAD(&cli->tc_list);
261         INIT_LIST_HEAD(&cli->tc_linkage);
262         spin_lock_init(&cli->tc_rule_lock);
263         atomic_set(&cli->tc_ref, 1);
264         rule = nrs_tbf_rule_match(head, cli);
265         nrs_tbf_cli_reset(head, rule, cli);
266 }
267
268 static void
269 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
270 {
271         LASSERT(list_empty(&cli->tc_list));
272         LASSERT(!cli->tc_in_heap);
273         LASSERT(atomic_read(&cli->tc_ref) == 0);
274         spin_lock(&cli->tc_rule_lock);
275         nrs_tbf_cli_rule_put(cli);
276         spin_unlock(&cli->tc_rule_lock);
277         OBD_FREE_PTR(cli);
278 }
279
280 static int
281 nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
282                    struct nrs_tbf_head *head,
283                    struct nrs_tbf_cmd *start)
284 {
285         struct nrs_tbf_rule     *rule;
286         struct nrs_tbf_rule     *tmp_rule;
287         struct nrs_tbf_rule     *next_rule;
288         char                    *next_name = start->u.tc_start.ts_next_name;
289         int                      rc;
290
291         rule = nrs_tbf_rule_find(head, start->tc_name);
292         if (rule) {
293                 nrs_tbf_rule_put(rule);
294                 return -EEXIST;
295         }
296
297         OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
298         if (rule == NULL)
299                 return -ENOMEM;
300
301         memcpy(rule->tr_name, start->tc_name, strlen(start->tc_name));
302         rule->tr_rpc_rate = start->u.tc_start.ts_rpc_rate;
303         rule->tr_nsecs = NSEC_PER_SEC;
304         do_div(rule->tr_nsecs, rule->tr_rpc_rate);
305         rule->tr_depth = tbf_depth;
306         atomic_set(&rule->tr_ref, 1);
307         INIT_LIST_HEAD(&rule->tr_cli_list);
308         INIT_LIST_HEAD(&rule->tr_nids);
309         INIT_LIST_HEAD(&rule->tr_linkage);
310         spin_lock_init(&rule->tr_rule_lock);
311         rule->tr_head = head;
312
313         rc = head->th_ops->o_rule_init(policy, rule, start);
314         if (rc) {
315                 OBD_FREE_PTR(rule);
316                 return rc;
317         }
318
319         /* Add as the newest rule */
320         spin_lock(&head->th_rule_lock);
321         tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
322         if (tmp_rule) {
323                 spin_unlock(&head->th_rule_lock);
324                 nrs_tbf_rule_put(tmp_rule);
325                 nrs_tbf_rule_put(rule);
326                 return -EEXIST;
327         }
328
329         if (next_name) {
330                 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
331                 if (!next_rule) {
332                         spin_unlock(&head->th_rule_lock);
333                         nrs_tbf_rule_put(rule);
334                         return -ENOENT;
335                 }
336
337                 list_add(&rule->tr_linkage, next_rule->tr_linkage.prev);
338                 nrs_tbf_rule_put(next_rule);
339         } else {
340                 /* Add on the top of the rule list */
341                 list_add(&rule->tr_linkage, &head->th_list);
342         }
343         spin_unlock(&head->th_rule_lock);
344         atomic_inc(&head->th_rule_sequence);
345         if (start->u.tc_start.ts_rule_flags & NTRS_DEFAULT) {
346                 rule->tr_flags |= NTRS_DEFAULT;
347                 LASSERT(head->th_rule == NULL);
348                 head->th_rule = rule;
349         }
350
351         return 0;
352 }
353
354 /**
355  * Change the rank of a rule in the rule list
356  *
357  * The matched rule will be moved to the position right before another
358  * given rule.
359  *
360  * \param[in] policy    the policy instance
361  * \param[in] head      the TBF policy instance
362  * \param[in] name      the rule name to be moved
363  * \param[in] next_name the rule name before which the matched rule will be
364  *                      moved
365  *
366  */
367 static int
368 nrs_tbf_rule_change_rank(struct ptlrpc_nrs_policy *policy,
369                          struct nrs_tbf_head *head,
370                          char *name,
371                          char *next_name)
372 {
373         struct nrs_tbf_rule     *rule = NULL;
374         struct nrs_tbf_rule     *next_rule = NULL;
375         int                      rc = 0;
376
377         LASSERT(head != NULL);
378
379         spin_lock(&head->th_rule_lock);
380         rule = nrs_tbf_rule_find_nolock(head, name);
381         if (!rule)
382                 GOTO(out, rc = -ENOENT);
383
384         if (strcmp(name, next_name) == 0)
385                 GOTO(out_put, rc);
386
387         next_rule = nrs_tbf_rule_find_nolock(head, next_name);
388         if (!next_rule)
389                 GOTO(out_put, rc = -ENOENT);
390
391         list_move(&rule->tr_linkage, next_rule->tr_linkage.prev);
392         nrs_tbf_rule_put(next_rule);
393 out_put:
394         nrs_tbf_rule_put(rule);
395 out:
396         spin_unlock(&head->th_rule_lock);
397         return rc;
398 }
399
400 static int
401 nrs_tbf_rule_change_rate(struct ptlrpc_nrs_policy *policy,
402                          struct nrs_tbf_head *head,
403                          char *name,
404                          __u64 rate)
405 {
406         struct nrs_tbf_rule *rule;
407
408         assert_spin_locked(&policy->pol_nrs->nrs_lock);
409
410         rule = nrs_tbf_rule_find(head, name);
411         if (rule == NULL)
412                 return -ENOENT;
413
414         rule->tr_rpc_rate = rate;
415         rule->tr_nsecs = NSEC_PER_SEC;
416         do_div(rule->tr_nsecs, rule->tr_rpc_rate);
417         rule->tr_generation++;
418         nrs_tbf_rule_put(rule);
419
420         return 0;
421 }
422
423 static int
424 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
425                     struct nrs_tbf_head *head,
426                     struct nrs_tbf_cmd *change)
427 {
428         __u64    rate = change->u.tc_change.tc_rpc_rate;
429         char    *next_name = change->u.tc_change.tc_next_name;
430         int      rc;
431
432         if (rate != 0) {
433                 rc = nrs_tbf_rule_change_rate(policy, head, change->tc_name,
434                                               rate);
435                 if (rc)
436                         return rc;
437         }
438
439         if (next_name) {
440                 rc = nrs_tbf_rule_change_rank(policy, head, change->tc_name,
441                                               next_name);
442                 if (rc)
443                         return rc;
444         }
445
446         return 0;
447 }
448
449 static int
450 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
451                   struct nrs_tbf_head *head,
452                   struct nrs_tbf_cmd *stop)
453 {
454         struct nrs_tbf_rule *rule;
455
456         assert_spin_locked(&policy->pol_nrs->nrs_lock);
457
458         if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
459                 return -EPERM;
460
461         rule = nrs_tbf_rule_find(head, stop->tc_name);
462         if (rule == NULL)
463                 return -ENOENT;
464
465         list_del_init(&rule->tr_linkage);
466         rule->tr_flags |= NTRS_STOPPING;
467         nrs_tbf_rule_put(rule);
468         nrs_tbf_rule_put(rule);
469
470         return 0;
471 }
472
473 static int
474 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
475                 struct nrs_tbf_head *head,
476                 struct nrs_tbf_cmd *cmd)
477 {
478         int rc;
479
480         assert_spin_locked(&policy->pol_nrs->nrs_lock);
481
482         switch (cmd->tc_cmd) {
483         case NRS_CTL_TBF_START_RULE:
484                 if (cmd->u.tc_start.ts_valid_type != head->th_type_flag)
485                         return -EINVAL;
486
487                 spin_unlock(&policy->pol_nrs->nrs_lock);
488                 rc = nrs_tbf_rule_start(policy, head, cmd);
489                 spin_lock(&policy->pol_nrs->nrs_lock);
490                 return rc;
491         case NRS_CTL_TBF_CHANGE_RULE:
492                 rc = nrs_tbf_rule_change(policy, head, cmd);
493                 return rc;
494         case NRS_CTL_TBF_STOP_RULE:
495                 rc = nrs_tbf_rule_stop(policy, head, cmd);
496                 /* Take it as a success, if not exists at all */
497                 return rc == -ENOENT ? 0 : rc;
498         default:
499                 return -EFAULT;
500         }
501 }
502
503 /**
504  * Binary heap predicate.
505  *
506  * \param[in] e1 the first binheap node to compare
507  * \param[in] e2 the second binheap node to compare
508  *
509  * \retval 0 e1 > e2
510  * \retval 1 e1 < e2
511  */
512 static int
513 tbf_cli_compare(struct cfs_binheap_node *e1, struct cfs_binheap_node *e2)
514 {
515         struct nrs_tbf_client *cli1;
516         struct nrs_tbf_client *cli2;
517
518         cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
519         cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
520
521         if (cli1->tc_check_time + cli1->tc_nsecs <
522             cli2->tc_check_time + cli2->tc_nsecs)
523                 return 1;
524         else if (cli1->tc_check_time + cli1->tc_nsecs >
525                  cli2->tc_check_time + cli2->tc_nsecs)
526                 return 0;
527
528         if (cli1->tc_check_time < cli2->tc_check_time)
529                 return 1;
530         else if (cli1->tc_check_time > cli2->tc_check_time)
531                 return 0;
532
533         /* Maybe need more comparasion, e.g. request number in the rules */
534         return 1;
535 }
536
537 /**
538  * TBF binary heap operations
539  */
540 static struct cfs_binheap_ops nrs_tbf_heap_ops = {
541         .hop_enter      = NULL,
542         .hop_exit       = NULL,
543         .hop_compare    = tbf_cli_compare,
544 };
545
/**
 * Hash callback of the jobid client hash.
 *
 * \param[in] hs   the hash table (unused)
 * \param[in] key  the jobid, a NUL-terminated string
 * \param[in] mask the mask passed on to the hash function
 *
 * \retval the djb2 hash of the jobid string
 */
static unsigned
nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key, unsigned mask)
{
        const char *jobid = key;

        return cfs_hash_djb2_hash(key, strlen(jobid), mask);
}
551
552 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
553 {
554         struct nrs_tbf_client *cli = hlist_entry(hnode,
555                                                      struct nrs_tbf_client,
556                                                      tc_hnode);
557
558         return (strcmp(cli->tc_jobid, key) == 0);
559 }
560
561 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
562 {
563         struct nrs_tbf_client *cli = hlist_entry(hnode,
564                                                      struct nrs_tbf_client,
565                                                      tc_hnode);
566
567         return cli->tc_jobid;
568 }
569
570 static void *nrs_tbf_jobid_hop_object(struct hlist_node *hnode)
571 {
572         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
573 }
574
575 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
576 {
577         struct nrs_tbf_client *cli = hlist_entry(hnode,
578                                                      struct nrs_tbf_client,
579                                                      tc_hnode);
580
581         atomic_inc(&cli->tc_ref);
582 }
583
584 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
585 {
586         struct nrs_tbf_client *cli = hlist_entry(hnode,
587                                                      struct nrs_tbf_client,
588                                                      tc_hnode);
589
590         atomic_dec(&cli->tc_ref);
591 }
592
593 static void
594 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
595
596 {
597         struct nrs_tbf_client *cli = hlist_entry(hnode,
598                                                  struct nrs_tbf_client,
599                                                  tc_hnode);
600
601         LASSERT(atomic_read(&cli->tc_ref) == 0);
602         nrs_tbf_cli_fini(cli);
603 }
604
605 static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
606         .hs_hash        = nrs_tbf_jobid_hop_hash,
607         .hs_keycmp      = nrs_tbf_jobid_hop_keycmp,
608         .hs_key         = nrs_tbf_jobid_hop_key,
609         .hs_object      = nrs_tbf_jobid_hop_object,
610         .hs_get         = nrs_tbf_jobid_hop_get,
611         .hs_put         = nrs_tbf_jobid_hop_put,
612         .hs_put_locked  = nrs_tbf_jobid_hop_put,
613         .hs_exit        = nrs_tbf_jobid_hop_exit,
614 };
615
/*
 * Flags handed to cfs_hash_create() for the jobid client hash (see
 * nrs_tbf_jobid_startup()).
 */
#define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
                                  CFS_HASH_NO_ITEMREF | \
                                  CFS_HASH_DEPTH)
619
620 static struct nrs_tbf_client *
621 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
622                           struct cfs_hash_bd *bd,
623                           const char *jobid)
624 {
625         struct hlist_node *hnode;
626         struct nrs_tbf_client *cli;
627
628         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)jobid);
629         if (hnode == NULL)
630                 return NULL;
631
632         cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode);
633         if (!list_empty(&cli->tc_lru))
634                 list_del_init(&cli->tc_lru);
635         return cli;
636 }
637
/* Jobid used in place of a NULL jobid returned by lustre_msg_get_jobid(). */
#define NRS_TBF_JOBID_NULL ""
639
640 static struct nrs_tbf_client *
641 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
642                        struct ptlrpc_request *req)
643 {
644         const char              *jobid;
645         struct nrs_tbf_client   *cli;
646         struct cfs_hash         *hs = head->th_cli_hash;
647         struct cfs_hash_bd               bd;
648
649         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
650         if (jobid == NULL)
651                 jobid = NRS_TBF_JOBID_NULL;
652         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
653         cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
654         cfs_hash_bd_unlock(hs, &bd, 1);
655
656         return cli;
657 }
658
659 static struct nrs_tbf_client *
660 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
661                           struct nrs_tbf_client *cli)
662 {
663         const char              *jobid;
664         struct nrs_tbf_client   *ret;
665         struct cfs_hash         *hs = head->th_cli_hash;
666         struct cfs_hash_bd               bd;
667
668         jobid = cli->tc_jobid;
669         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
670         ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
671         if (ret == NULL) {
672                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
673                 ret = cli;
674         }
675         cfs_hash_bd_unlock(hs, &bd, 1);
676
677         return ret;
678 }
679
680 static void
681 nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
682                       struct nrs_tbf_client *cli)
683 {
684         struct cfs_hash_bd               bd;
685         struct cfs_hash         *hs = head->th_cli_hash;
686         struct nrs_tbf_bucket   *bkt;
687         int                      hw;
688         struct list_head        zombies;
689
690         INIT_LIST_HEAD(&zombies);
691         cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
692         bkt = cfs_hash_bd_extra_get(hs, &bd);
693         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
694                 return;
695         LASSERT(list_empty(&cli->tc_lru));
696         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
697
698         /*
699          * Check and purge the LRU, there is at least one client in the LRU.
700          */
701         hw = tbf_jobid_cache_size >>
702              (hs->hs_cur_bits - hs->hs_bkt_bits);
703         while (cfs_hash_bd_count_get(&bd) > hw) {
704                 if (unlikely(list_empty(&bkt->ntb_lru)))
705                         break;
706                 cli = list_entry(bkt->ntb_lru.next,
707                                      struct nrs_tbf_client,
708                                      tc_lru);
709                 LASSERT(atomic_read(&cli->tc_ref) == 0);
710                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
711                 list_move(&cli->tc_lru, &zombies);
712         }
713         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
714
715         while (!list_empty(&zombies)) {
716                 cli = container_of0(zombies.next,
717                                     struct nrs_tbf_client, tc_lru);
718                 list_del_init(&cli->tc_lru);
719                 nrs_tbf_cli_fini(cli);
720         }
721 }
722
723 static void
724 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
725                        struct ptlrpc_request *req)
726 {
727         char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
728
729         if (jobid == NULL)
730                 jobid = NRS_TBF_JOBID_NULL;
731         LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
732         INIT_LIST_HEAD(&cli->tc_lru);
733         memcpy(cli->tc_jobid, jobid, strlen(jobid));
734 }
735
736 static int nrs_tbf_jobid_hash_order(void)
737 {
738         int bits;
739
740         for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
741                 ;
742
743         return bits;
744 }
745
/*
 * Bucket bits passed to cfs_hash_create() for the jobid client hash; also
 * the lower bound applied to the hash order in nrs_tbf_jobid_startup().
 */
#define NRS_TBF_JOBID_BKT_BITS 10
747
748 static int
749 nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
750                       struct nrs_tbf_head *head)
751 {
752         struct nrs_tbf_cmd       start;
753         struct nrs_tbf_bucket   *bkt;
754         int                      bits;
755         int                      i;
756         int                      rc;
757         struct cfs_hash_bd       bd;
758
759         bits = nrs_tbf_jobid_hash_order();
760         if (bits < NRS_TBF_JOBID_BKT_BITS)
761                 bits = NRS_TBF_JOBID_BKT_BITS;
762         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
763                                             bits,
764                                             bits,
765                                             NRS_TBF_JOBID_BKT_BITS,
766                                             sizeof(*bkt),
767                                             0,
768                                             0,
769                                             &nrs_tbf_jobid_hash_ops,
770                                             NRS_TBF_JOBID_HASH_FLAGS);
771         if (head->th_cli_hash == NULL)
772                 return -ENOMEM;
773
774         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
775                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
776                 INIT_LIST_HEAD(&bkt->ntb_lru);
777         }
778
779         memset(&start, 0, sizeof(start));
780         start.u.tc_start.ts_jobids_str = "*";
781
782         start.u.tc_start.ts_rpc_rate = tbf_rate;
783         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
784         start.tc_name = NRS_TBF_DEFAULT_RULE;
785         INIT_LIST_HEAD(&start.u.tc_start.ts_jobids);
786         rc = nrs_tbf_rule_start(policy, head, &start);
787         if (rc) {
788                 cfs_hash_putref(head->th_cli_hash);
789                 head->th_cli_hash = NULL;
790         }
791
792         return rc;
793 }
794
795 /**
796  * Frees jobid of \a list.
797  *
798  */
799 static void
800 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
801 {
802         struct nrs_tbf_jobid *jobid, *n;
803
804         list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
805                 OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1);
806                 list_del(&jobid->tj_linkage);
807                 OBD_FREE(jobid, sizeof(struct nrs_tbf_jobid));
808         }
809 }
810
811 static int
812 nrs_tbf_jobid_list_add(const struct cfs_lstr *id, struct list_head *jobid_list)
813 {
814         struct nrs_tbf_jobid *jobid;
815
816         OBD_ALLOC(jobid, sizeof(struct nrs_tbf_jobid));
817         if (jobid == NULL)
818                 return -ENOMEM;
819
820         OBD_ALLOC(jobid->tj_id, id->ls_len + 1);
821         if (jobid->tj_id == NULL) {
822                 OBD_FREE(jobid, sizeof(struct nrs_tbf_jobid));
823                 return -ENOMEM;
824         }
825
826         memcpy(jobid->tj_id, id->ls_str, id->ls_len);
827         list_add_tail(&jobid->tj_linkage, jobid_list);
828         return 0;
829 }
830
831 static int
832 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
833 {
834         struct nrs_tbf_jobid *jobid;
835
836         list_for_each_entry(jobid, jobid_list, tj_linkage) {
837                 if (strcmp(id, jobid->tj_id) == 0)
838                         return 1;
839         }
840         return 0;
841 }
842
843 static int
844 nrs_tbf_jobid_list_parse(char *str, int len, struct list_head *jobid_list)
845 {
846         struct cfs_lstr src;
847         struct cfs_lstr res;
848         int rc = 0;
849         ENTRY;
850
851         src.ls_str = str;
852         src.ls_len = len;
853         INIT_LIST_HEAD(jobid_list);
854         while (src.ls_str) {
855                 rc = cfs_gettok(&src, ' ', &res);
856                 if (rc == 0) {
857                         rc = -EINVAL;
858                         break;
859                 }
860                 rc = nrs_tbf_jobid_list_add(&res, jobid_list);
861                 if (rc)
862                         break;
863         }
864         if (rc)
865                 nrs_tbf_jobid_list_free(jobid_list);
866         RETURN(rc);
867 }
868
869 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
870 {
871         if (!list_empty(&cmd->u.tc_start.ts_jobids))
872                 nrs_tbf_jobid_list_free(&cmd->u.tc_start.ts_jobids);
873         if (cmd->u.tc_start.ts_jobids_str)
874                 OBD_FREE(cmd->u.tc_start.ts_jobids_str,
875                          strlen(cmd->u.tc_start.ts_jobids_str) + 1);
876 }
877
878 static int nrs_tbf_check_id_value(struct cfs_lstr *src, char *key)
879 {
880         struct cfs_lstr res;
881         int keylen = strlen(key);
882         int rc;
883
884         rc = cfs_gettok(src, '=', &res);
885         if (rc == 0 || res.ls_len != keylen ||
886             strncmp(res.ls_str, key, keylen) != 0 ||
887             src->ls_len <= 2 || src->ls_str[0] != '{' ||
888             src->ls_str[src->ls_len - 1] != '}')
889                 return -EINVAL;
890
891         /* Skip '{' and '}' */
892         src->ls_str++;
893         src->ls_len -= 2;
894         return 0;
895 }
896
897 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, char *id)
898 {
899         struct cfs_lstr src;
900         int rc;
901
902         src.ls_str = id;
903         src.ls_len = strlen(id);
904         rc = nrs_tbf_check_id_value(&src, "jobid");
905         if (rc)
906                 return rc;
907
908         OBD_ALLOC(cmd->u.tc_start.ts_jobids_str, src.ls_len + 1);
909         if (cmd->u.tc_start.ts_jobids_str == NULL)
910                 return -ENOMEM;
911
912         memcpy(cmd->u.tc_start.ts_jobids_str, src.ls_str, src.ls_len);
913
914         /* parse jobid list */
915         rc = nrs_tbf_jobid_list_parse(cmd->u.tc_start.ts_jobids_str,
916                                       strlen(cmd->u.tc_start.ts_jobids_str),
917                                       &cmd->u.tc_start.ts_jobids);
918         if (rc)
919                 nrs_tbf_jobid_cmd_fini(cmd);
920
921         return rc;
922 }
923
924 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
925                                    struct nrs_tbf_rule *rule,
926                                    struct nrs_tbf_cmd *start)
927 {
928         int rc = 0;
929
930         LASSERT(start->u.tc_start.ts_jobids_str);
931         OBD_ALLOC(rule->tr_jobids_str,
932                   strlen(start->u.tc_start.ts_jobids_str) + 1);
933         if (rule->tr_jobids_str == NULL)
934                 return -ENOMEM;
935
936         memcpy(rule->tr_jobids_str,
937                start->u.tc_start.ts_jobids_str,
938                strlen(start->u.tc_start.ts_jobids_str));
939
940         INIT_LIST_HEAD(&rule->tr_jobids);
941         if (!list_empty(&start->u.tc_start.ts_jobids)) {
942                 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
943                                               strlen(rule->tr_jobids_str),
944                                               &rule->tr_jobids);
945                 if (rc)
946                         CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
947         }
948         if (rc)
949                 OBD_FREE(rule->tr_jobids_str,
950                          strlen(start->u.tc_start.ts_jobids_str) + 1);
951         return rc;
952 }
953
954 static int
955 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
956 {
957         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
958                    rule->tr_jobids_str, rule->tr_rpc_rate,
959                    atomic_read(&rule->tr_ref) - 1);
960         return 0;
961 }
962
963 static int
964 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
965                          struct nrs_tbf_client *cli)
966 {
967         return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
968 }
969
970 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
971 {
972         if (!list_empty(&rule->tr_jobids))
973                 nrs_tbf_jobid_list_free(&rule->tr_jobids);
974         LASSERT(rule->tr_jobids_str != NULL);
975         OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1);
976 }
977
/**
 * Operations table for the jobid TBF type: client hash handling
 * (find/findadd/put/init) and rule handling (init/dump/match/fini), each
 * bound to its nrs_tbf_jobid_*() implementation.
 */
static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
        .o_name = NRS_TBF_TYPE_JOBID,
        .o_startup = nrs_tbf_jobid_startup,
        .o_cli_find = nrs_tbf_jobid_cli_find,
        .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
        .o_cli_put = nrs_tbf_jobid_cli_put,
        .o_cli_init = nrs_tbf_jobid_cli_init,
        .o_rule_init = nrs_tbf_jobid_rule_init,
        .o_rule_dump = nrs_tbf_jobid_rule_dump,
        .o_rule_match = nrs_tbf_jobid_rule_match,
        .o_rule_fini = nrs_tbf_jobid_rule_fini,
};
990
/**
 * libcfs_hash operations for nrs_tbf_head::th_cli_hash (NID TBF type)
 *
 * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
 * nrs_tbf_client objects.
 *
 * Both macros below are passed to cfs_hash_create() in
 * nrs_tbf_nid_startup().
 */
#define NRS_TBF_NID_BKT_BITS    8
#define NRS_TBF_NID_BITS        16
999
1000 static unsigned nrs_tbf_nid_hop_hash(struct cfs_hash *hs, const void *key,
1001                                   unsigned mask)
1002 {
1003         return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
1004 }
1005
1006 static int nrs_tbf_nid_hop_keycmp(const void *key, struct hlist_node *hnode)
1007 {
1008         lnet_nid_t            *nid = (lnet_nid_t *)key;
1009         struct nrs_tbf_client *cli = hlist_entry(hnode,
1010                                                      struct nrs_tbf_client,
1011                                                      tc_hnode);
1012
1013         return *nid == cli->tc_nid;
1014 }
1015
1016 static void *nrs_tbf_nid_hop_key(struct hlist_node *hnode)
1017 {
1018         struct nrs_tbf_client *cli = hlist_entry(hnode,
1019                                                      struct nrs_tbf_client,
1020                                                      tc_hnode);
1021
1022         return &cli->tc_nid;
1023 }
1024
1025 static void *nrs_tbf_nid_hop_object(struct hlist_node *hnode)
1026 {
1027         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
1028 }
1029
1030 static void nrs_tbf_nid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1031 {
1032         struct nrs_tbf_client *cli = hlist_entry(hnode,
1033                                                      struct nrs_tbf_client,
1034                                                      tc_hnode);
1035
1036         atomic_inc(&cli->tc_ref);
1037 }
1038
1039 static void nrs_tbf_nid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1040 {
1041         struct nrs_tbf_client *cli = hlist_entry(hnode,
1042                                                      struct nrs_tbf_client,
1043                                                      tc_hnode);
1044
1045         atomic_dec(&cli->tc_ref);
1046 }
1047
1048 static void nrs_tbf_nid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1049 {
1050         struct nrs_tbf_client *cli = hlist_entry(hnode,
1051                                                      struct nrs_tbf_client,
1052                                                      tc_hnode);
1053
1054         LASSERTF(atomic_read(&cli->tc_ref) == 0,
1055                  "Busy TBF object from client with NID %s, with %d refs\n",
1056                  libcfs_nid2str(cli->tc_nid), atomic_read(&cli->tc_ref));
1057
1058         nrs_tbf_cli_fini(cli);
1059 }
1060
/**
 * Callback table for the NID-keyed client hash created in
 * nrs_tbf_nid_startup(). The same function serves both hs_put and
 * hs_put_locked.
 */
static struct cfs_hash_ops nrs_tbf_nid_hash_ops = {
        .hs_hash        = nrs_tbf_nid_hop_hash,
        .hs_keycmp      = nrs_tbf_nid_hop_keycmp,
        .hs_key         = nrs_tbf_nid_hop_key,
        .hs_object      = nrs_tbf_nid_hop_object,
        .hs_get         = nrs_tbf_nid_hop_get,
        .hs_put         = nrs_tbf_nid_hop_put,
        .hs_put_locked  = nrs_tbf_nid_hop_put,
        .hs_exit        = nrs_tbf_nid_hop_exit,
};
1071
1072 static struct nrs_tbf_client *
1073 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
1074                      struct ptlrpc_request *req)
1075 {
1076         return cfs_hash_lookup(head->th_cli_hash, &req->rq_peer.nid);
1077 }
1078
1079 static struct nrs_tbf_client *
1080 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
1081                         struct nrs_tbf_client *cli)
1082 {
1083         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid,
1084                                        &cli->tc_hnode);
1085 }
1086
1087 static void
1088 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
1089                       struct nrs_tbf_client *cli)
1090 {
1091         cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
1092 }
1093
1094 static int
1095 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
1096                     struct nrs_tbf_head *head)
1097 {
1098         struct nrs_tbf_cmd      start;
1099         int rc;
1100
1101         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1102                                             NRS_TBF_NID_BITS,
1103                                             NRS_TBF_NID_BITS,
1104                                             NRS_TBF_NID_BKT_BITS, 0,
1105                                             CFS_HASH_MIN_THETA,
1106                                             CFS_HASH_MAX_THETA,
1107                                             &nrs_tbf_nid_hash_ops,
1108                                             CFS_HASH_RW_BKTLOCK);
1109         if (head->th_cli_hash == NULL)
1110                 return -ENOMEM;
1111
1112         memset(&start, 0, sizeof(start));
1113         start.u.tc_start.ts_nids_str = "*";
1114
1115         start.u.tc_start.ts_rpc_rate = tbf_rate;
1116         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1117         start.tc_name = NRS_TBF_DEFAULT_RULE;
1118         INIT_LIST_HEAD(&start.u.tc_start.ts_nids);
1119         rc = nrs_tbf_rule_start(policy, head, &start);
1120         if (rc) {
1121                 cfs_hash_putref(head->th_cli_hash);
1122                 head->th_cli_hash = NULL;
1123         }
1124
1125         return rc;
1126 }
1127
1128 static void
1129 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1130                              struct ptlrpc_request *req)
1131 {
1132         cli->tc_nid = req->rq_peer.nid;
1133 }
1134
1135 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1136                                  struct nrs_tbf_rule *rule,
1137                                  struct nrs_tbf_cmd *start)
1138 {
1139         LASSERT(start->u.tc_start.ts_nids_str);
1140         OBD_ALLOC(rule->tr_nids_str,
1141                   strlen(start->u.tc_start.ts_nids_str) + 1);
1142         if (rule->tr_nids_str == NULL)
1143                 return -ENOMEM;
1144
1145         memcpy(rule->tr_nids_str,
1146                start->u.tc_start.ts_nids_str,
1147                strlen(start->u.tc_start.ts_nids_str));
1148
1149         INIT_LIST_HEAD(&rule->tr_nids);
1150         if (!list_empty(&start->u.tc_start.ts_nids)) {
1151                 if (cfs_parse_nidlist(rule->tr_nids_str,
1152                                       strlen(rule->tr_nids_str),
1153                                       &rule->tr_nids) <= 0) {
1154                         CERROR("nids {%s} illegal\n",
1155                                rule->tr_nids_str);
1156                         OBD_FREE(rule->tr_nids_str,
1157                                  strlen(start->u.tc_start.ts_nids_str) + 1);
1158                         return -EINVAL;
1159                 }
1160         }
1161         return 0;
1162 }
1163
1164 static int
1165 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1166 {
1167         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1168                    rule->tr_nids_str, rule->tr_rpc_rate,
1169                    atomic_read(&rule->tr_ref) - 1);
1170         return 0;
1171 }
1172
1173 static int
1174 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1175                        struct nrs_tbf_client *cli)
1176 {
1177         return cfs_match_nid(cli->tc_nid, &rule->tr_nids);
1178 }
1179
1180 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1181 {
1182         if (!list_empty(&rule->tr_nids))
1183                 cfs_free_nidlist(&rule->tr_nids);
1184         LASSERT(rule->tr_nids_str != NULL);
1185         OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1);
1186 }
1187
1188 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1189 {
1190         if (!list_empty(&cmd->u.tc_start.ts_nids))
1191                 cfs_free_nidlist(&cmd->u.tc_start.ts_nids);
1192         if (cmd->u.tc_start.ts_nids_str)
1193                 OBD_FREE(cmd->u.tc_start.ts_nids_str,
1194                          strlen(cmd->u.tc_start.ts_nids_str) + 1);
1195 }
1196
1197 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, char *id)
1198 {
1199         struct cfs_lstr src;
1200         int rc;
1201
1202         src.ls_str = id;
1203         src.ls_len = strlen(id);
1204         rc = nrs_tbf_check_id_value(&src, "nid");
1205         if (rc)
1206                 return rc;
1207
1208         OBD_ALLOC(cmd->u.tc_start.ts_nids_str, src.ls_len + 1);
1209         if (cmd->u.tc_start.ts_nids_str == NULL)
1210                 return -ENOMEM;
1211
1212         memcpy(cmd->u.tc_start.ts_nids_str, src.ls_str, src.ls_len);
1213
1214         /* parse NID list */
1215         if (cfs_parse_nidlist(cmd->u.tc_start.ts_nids_str,
1216                               strlen(cmd->u.tc_start.ts_nids_str),
1217                               &cmd->u.tc_start.ts_nids) <= 0) {
1218                 nrs_tbf_nid_cmd_fini(cmd);
1219                 return -EINVAL;
1220         }
1221
1222         return 0;
1223 }
1224
/**
 * Operations table for the NID TBF type: client hash handling
 * (find/findadd/put/init) and rule handling (init/dump/match/fini), each
 * bound to its nrs_tbf_nid_*() implementation.
 */
static struct nrs_tbf_ops nrs_tbf_nid_ops = {
        .o_name = NRS_TBF_TYPE_NID,
        .o_startup = nrs_tbf_nid_startup,
        .o_cli_find = nrs_tbf_nid_cli_find,
        .o_cli_findadd = nrs_tbf_nid_cli_findadd,
        .o_cli_put = nrs_tbf_nid_cli_put,
        .o_cli_init = nrs_tbf_nid_cli_init,
        .o_rule_init = nrs_tbf_nid_rule_init,
        .o_rule_dump = nrs_tbf_nid_rule_dump,
        .o_rule_match = nrs_tbf_nid_rule_match,
        .o_rule_fini = nrs_tbf_nid_rule_fini,
};
1237
/**
 * Hash callback for the generic (string-keyed) client hash.
 *
 * \param[in] hs    hash table (not used here)
 * \param[in] key   NUL-terminated key string; its strlen() bytes are hashed
 * \param[in] mask  mask handed through to cfs_hash_djb2_hash()
 *
 * \return the value computed by cfs_hash_djb2_hash()
 */
static unsigned nrs_tbf_hop_hash(struct cfs_hash *hs, const void *key,
                                 unsigned mask)
{
        unsigned hash;

        hash = cfs_hash_djb2_hash(key, strlen(key), mask);
        return hash;
}
1243
1244 static int nrs_tbf_hop_keycmp(const void *key, struct hlist_node *hnode)
1245 {
1246         struct nrs_tbf_client *cli = hlist_entry(hnode,
1247                                                  struct nrs_tbf_client,
1248                                                  tc_hnode);
1249
1250         return (strcmp(cli->tc_key, key) == 0);
1251 }
1252
1253 static void *nrs_tbf_hop_key(struct hlist_node *hnode)
1254 {
1255         struct nrs_tbf_client *cli = hlist_entry(hnode,
1256                                                  struct nrs_tbf_client,
1257                                                  tc_hnode);
1258         return cli->tc_key;
1259 }
1260
1261 static void *nrs_tbf_hop_object(struct hlist_node *hnode)
1262 {
1263         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
1264 }
1265
1266 static void nrs_tbf_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1267 {
1268         struct nrs_tbf_client *cli = hlist_entry(hnode,
1269                                                  struct nrs_tbf_client,
1270                                                  tc_hnode);
1271
1272         atomic_inc(&cli->tc_ref);
1273 }
1274
1275 static void nrs_tbf_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1276 {
1277         struct nrs_tbf_client *cli = hlist_entry(hnode,
1278                                                  struct nrs_tbf_client,
1279                                                  tc_hnode);
1280
1281         atomic_dec(&cli->tc_ref);
1282 }
1283
1284 static void nrs_tbf_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1285
1286 {
1287         struct nrs_tbf_client *cli = hlist_entry(hnode,
1288                                                  struct nrs_tbf_client,
1289                                                  tc_hnode);
1290
1291         LASSERT(atomic_read(&cli->tc_ref) == 0);
1292         nrs_tbf_cli_fini(cli);
1293 }
1294
/**
 * Callback table for the string-keyed client hash created in
 * nrs_tbf_startup(); entries are keyed by nrs_tbf_client::tc_key. The same
 * function serves both hs_put and hs_put_locked.
 */
static struct cfs_hash_ops nrs_tbf_hash_ops = {
        .hs_hash        = nrs_tbf_hop_hash,
        .hs_keycmp      = nrs_tbf_hop_keycmp,
        .hs_key         = nrs_tbf_hop_key,
        .hs_object      = nrs_tbf_hop_object,
        .hs_get         = nrs_tbf_hop_get,
        .hs_put         = nrs_tbf_hop_put,
        .hs_put_locked  = nrs_tbf_hop_put,
        .hs_exit        = nrs_tbf_hop_exit,
};
1305
/*
 * Parameters of the generic client hash created in nrs_tbf_startup():
 * NRS_TBF_GENERIC_BKT_BITS is both the bucket-bits argument and the lower
 * bound for the hash order; NRS_TBF_GENERIC_HASH_FLAGS is the flags argument.
 */
#define NRS_TBF_GENERIC_BKT_BITS        10
#define NRS_TBF_GENERIC_HASH_FLAGS      (CFS_HASH_SPIN_BKTLOCK | \
                                        CFS_HASH_NO_ITEMREF | \
                                        CFS_HASH_DEPTH)
1310
1311 static int
1312 nrs_tbf_startup(struct ptlrpc_nrs_policy *policy, struct nrs_tbf_head *head)
1313 {
1314         struct nrs_tbf_cmd       start;
1315         struct nrs_tbf_bucket   *bkt;
1316         int                      bits;
1317         int                      i;
1318         int                      rc;
1319         struct cfs_hash_bd       bd;
1320
1321         bits = nrs_tbf_jobid_hash_order();
1322         if (bits < NRS_TBF_GENERIC_BKT_BITS)
1323                 bits = NRS_TBF_GENERIC_BKT_BITS;
1324         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1325                                             bits, bits,
1326                                             NRS_TBF_GENERIC_BKT_BITS,
1327                                             sizeof(*bkt), 0, 0,
1328                                             &nrs_tbf_hash_ops,
1329                                             NRS_TBF_GENERIC_HASH_FLAGS);
1330         if (head->th_cli_hash == NULL)
1331                 return -ENOMEM;
1332
1333         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
1334                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
1335                 INIT_LIST_HEAD(&bkt->ntb_lru);
1336         }
1337
1338         memset(&start, 0, sizeof(start));
1339         start.u.tc_start.ts_conds_str = "*";
1340
1341         start.u.tc_start.ts_rpc_rate = tbf_rate;
1342         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1343         start.tc_name = NRS_TBF_DEFAULT_RULE;
1344         INIT_LIST_HEAD(&start.u.tc_start.ts_conds);
1345         rc = nrs_tbf_rule_start(policy, head, &start);
1346         if (rc)
1347                 cfs_hash_putref(head->th_cli_hash);
1348
1349         return rc;
1350 }
1351
1352 static struct nrs_tbf_client *
1353 nrs_tbf_cli_hash_lookup(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1354                         const char *key)
1355 {
1356         struct hlist_node *hnode;
1357         struct nrs_tbf_client *cli;
1358
1359         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)key);
1360         if (hnode == NULL)
1361                 return NULL;
1362
1363         cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode);
1364         if (!list_empty(&cli->tc_lru))
1365                 list_del_init(&cli->tc_lru);
1366         return cli;
1367 }
1368
1369 static struct nrs_tbf_client *
1370 nrs_tbf_cli_find(struct nrs_tbf_head *head, struct ptlrpc_request *req)
1371 {
1372         struct nrs_tbf_client *cli;
1373         struct cfs_hash *hs = head->th_cli_hash;
1374         struct cfs_hash_bd bd;
1375         char keystr[NRS_TBF_KEY_LEN] = { '\0' };
1376         const char *jobid;
1377         __u32 opc;
1378
1379         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1380         if (jobid == NULL)
1381                 jobid = NRS_TBF_JOBID_NULL;
1382         opc = lustre_msg_get_opc(req->rq_reqmsg);
1383         snprintf(keystr, sizeof(keystr), "%s_%s_%d", jobid,
1384                  libcfs_nid2str(req->rq_peer.nid), opc);
1385         LASSERT(strlen(keystr) < NRS_TBF_KEY_LEN);
1386         cfs_hash_bd_get_and_lock(hs, (void *)keystr, &bd, 1);
1387         cli = nrs_tbf_cli_hash_lookup(hs, &bd, keystr);
1388         cfs_hash_bd_unlock(hs, &bd, 1);
1389
1390         return cli;
1391 }
1392
1393 static struct nrs_tbf_client *
1394 nrs_tbf_cli_findadd(struct nrs_tbf_head *head,
1395                     struct nrs_tbf_client *cli)
1396 {
1397         const char              *key;
1398         struct nrs_tbf_client   *ret;
1399         struct cfs_hash         *hs = head->th_cli_hash;
1400         struct cfs_hash_bd       bd;
1401
1402         key = cli->tc_key;
1403         cfs_hash_bd_get_and_lock(hs, (void *)key, &bd, 1);
1404         ret = nrs_tbf_cli_hash_lookup(hs, &bd, key);
1405         if (ret == NULL) {
1406                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
1407                 ret = cli;
1408         }
1409         cfs_hash_bd_unlock(hs, &bd, 1);
1410
1411         return ret;
1412 }
1413
1414 static void
1415 nrs_tbf_cli_put(struct nrs_tbf_head *head, struct nrs_tbf_client *cli)
1416 {
1417         struct cfs_hash_bd       bd;
1418         struct cfs_hash         *hs = head->th_cli_hash;
1419         struct nrs_tbf_bucket   *bkt;
1420         int                      hw;
1421         struct list_head         zombies;
1422
1423         INIT_LIST_HEAD(&zombies);
1424         cfs_hash_bd_get(hs, &cli->tc_key, &bd);
1425         bkt = cfs_hash_bd_extra_get(hs, &bd);
1426         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
1427                 return;
1428         LASSERT(list_empty(&cli->tc_lru));
1429         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
1430
1431         /**
1432          * Check and purge the LRU, there is at least one client in the LRU.
1433          */
1434         hw = tbf_jobid_cache_size >> (hs->hs_cur_bits - hs->hs_bkt_bits);
1435         while (cfs_hash_bd_count_get(&bd) > hw) {
1436                 if (unlikely(list_empty(&bkt->ntb_lru)))
1437                         break;
1438                 cli = list_entry(bkt->ntb_lru.next,
1439                                  struct nrs_tbf_client,
1440                                  tc_lru);
1441                 LASSERT(atomic_read(&cli->tc_ref) == 0);
1442                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
1443                 list_move(&cli->tc_lru, &zombies);
1444         }
1445         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
1446
1447         while (!list_empty(&zombies)) {
1448                 cli = container_of0(zombies.next,
1449                                     struct nrs_tbf_client, tc_lru);
1450                 list_del_init(&cli->tc_lru);
1451                 nrs_tbf_cli_fini(cli);
1452         }
1453 }
1454
1455 static void
1456 nrs_tbf_generic_cli_init(struct nrs_tbf_client *cli,
1457                          struct ptlrpc_request *req)
1458 {
1459         char keystr[NRS_TBF_KEY_LEN];
1460         const char *jobid;
1461         __u32 opc;
1462
1463         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1464         if (jobid == NULL)
1465                 jobid = NRS_TBF_JOBID_NULL;
1466         opc = lustre_msg_get_opc(req->rq_reqmsg);
1467         snprintf(keystr, sizeof(keystr), "%s_%s_%d", jobid,
1468                  libcfs_nid2str(req->rq_peer.nid), opc);
1469
1470         LASSERT(strlen(keystr) < NRS_TBF_KEY_LEN);
1471         INIT_LIST_HEAD(&cli->tc_lru);
1472         memcpy(cli->tc_key, keystr, strlen(keystr));
1473         memcpy(cli->tc_jobid, jobid, strlen(jobid));
1474         cli->tc_nid = req->rq_peer.nid;
1475         cli->tc_opcode = opc;
1476 }
1477
1478 static void
1479 nrs_tbf_expression_free(struct nrs_tbf_expression *expr)
1480 {
1481         LASSERT(expr->te_field >= NRS_TBF_FIELD_NID &&
1482                 expr->te_field < NRS_TBF_FIELD_MAX);
1483         switch (expr->te_field) {
1484         case NRS_TBF_FIELD_NID:
1485                 cfs_free_nidlist(&expr->te_cond);
1486                 break;
1487         case NRS_TBF_FIELD_JOBID:
1488                 nrs_tbf_jobid_list_free(&expr->te_cond);
1489                 break;
1490         case NRS_TBF_FIELD_OPCODE:
1491                 CFS_FREE_BITMAP(expr->te_opcodes);
1492                 break;
1493         default:
1494                 LBUG();
1495         }
1496         OBD_FREE_PTR(expr);
1497 }
1498
1499 static void
1500 nrs_tbf_conjunction_free(struct nrs_tbf_conjunction *conjunction)
1501 {
1502         struct nrs_tbf_expression *expression;
1503         struct nrs_tbf_expression *n;
1504
1505         LASSERT(list_empty(&conjunction->tc_linkage));
1506         list_for_each_entry_safe(expression, n,
1507                                  &conjunction->tc_expressions,
1508                                  te_linkage) {
1509                 list_del_init(&expression->te_linkage);
1510                 nrs_tbf_expression_free(expression);
1511         }
1512         OBD_FREE_PTR(conjunction);
1513 }
1514
1515 static void
1516 nrs_tbf_conds_free(struct list_head *cond_list)
1517 {
1518         struct nrs_tbf_conjunction *conjunction;
1519         struct nrs_tbf_conjunction *n;
1520
1521         list_for_each_entry_safe(conjunction, n, cond_list, tc_linkage) {
1522                 list_del_init(&conjunction->tc_linkage);
1523                 nrs_tbf_conjunction_free(conjunction);
1524         }
1525 }
1526
1527 static void
1528 nrs_tbf_generic_cmd_fini(struct nrs_tbf_cmd *cmd)
1529 {
1530         if (!list_empty(&cmd->u.tc_start.ts_conds))
1531                 nrs_tbf_conds_free(&cmd->u.tc_start.ts_conds);
1532         if (cmd->u.tc_start.ts_conds_str)
1533                 OBD_FREE(cmd->u.tc_start.ts_conds_str,
1534                          strlen(cmd->u.tc_start.ts_conds_str) + 1);
1535 }
1536
/*
 * Delimiters of the generic condition syntax: ',' separates conjunctions
 * (see nrs_tbf_conds_parse()), '&' separates expressions inside one
 * conjunction (see nrs_tbf_conjunction_parse()) and '=' separates an
 * expression's field name from its "{...}" value
 * (see nrs_tbf_expression_parse()).
 */
#define NRS_TBF_DISJUNCTION_DELIM       (',')
#define NRS_TBF_CONJUNCTION_DELIM       ('&')
#define NRS_TBF_EXPRESSION_DELIM        ('=')
1540
1541 static inline bool
1542 nrs_tbf_check_field(struct cfs_lstr *field, char *str)
1543 {
1544         int len = strlen(str);
1545
1546         return (field->ls_len == len &&
1547                 strncmp(field->ls_str, str, len) == 0);
1548 }
1549
/*
 * Forward declaration: nrs_tbf_expression_parse() below uses this to parse
 * the value of an "opcode" expression into a bitmap; the definition is not
 * in this part of the file.
 */
static int
nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr);
1552
1553 static int
1554 nrs_tbf_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
1555 {
1556         struct nrs_tbf_expression *expr;
1557         struct cfs_lstr field;
1558         int rc = 0;
1559
1560         OBD_ALLOC(expr, sizeof(struct nrs_tbf_expression));
1561         if (expr == NULL)
1562                 return -ENOMEM;
1563
1564         rc = cfs_gettok(src, NRS_TBF_EXPRESSION_DELIM, &field);
1565         if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
1566             src->ls_str[src->ls_len - 1] != '}')
1567                 GOTO(out, rc = -EINVAL);
1568
1569         /* Skip '{' and '}' */
1570         src->ls_str++;
1571         src->ls_len -= 2;
1572
1573         if (nrs_tbf_check_field(&field, "nid")) {
1574                 if (cfs_parse_nidlist(src->ls_str,
1575                                       src->ls_len,
1576                                       &expr->te_cond) <= 0)
1577                         GOTO(out, rc = -EINVAL);
1578                 expr->te_field = NRS_TBF_FIELD_NID;
1579         } else if (nrs_tbf_check_field(&field, "jobid")) {
1580                 if (nrs_tbf_jobid_list_parse(src->ls_str,
1581                                              src->ls_len,
1582                                              &expr->te_cond) < 0)
1583                         GOTO(out, rc = -EINVAL);
1584                 expr->te_field = NRS_TBF_FIELD_JOBID;
1585         } else if (nrs_tbf_check_field(&field, "opcode")) {
1586                 if (nrs_tbf_opcode_list_parse(src->ls_str,
1587                                               src->ls_len,
1588                                               &expr->te_opcodes) < 0)
1589                         GOTO(out, rc = -EINVAL);
1590                 expr->te_field = NRS_TBF_FIELD_OPCODE;
1591         } else
1592                 GOTO(out, rc = -EINVAL);
1593
1594         list_add_tail(&expr->te_linkage, cond_list);
1595         return 0;
1596 out:
1597         OBD_FREE_PTR(expr);
1598         return rc;
1599 }
1600
1601 static int
1602 nrs_tbf_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
1603 {
1604         struct nrs_tbf_conjunction *conjunction;
1605         struct cfs_lstr expr;
1606         int rc = 0;
1607
1608         OBD_ALLOC(conjunction, sizeof(struct nrs_tbf_conjunction));
1609         if (conjunction == NULL)
1610                 return -ENOMEM;
1611
1612         INIT_LIST_HEAD(&conjunction->tc_expressions);
1613         list_add_tail(&conjunction->tc_linkage, cond_list);
1614
1615         while (src->ls_str) {
1616                 rc = cfs_gettok(src, NRS_TBF_CONJUNCTION_DELIM, &expr);
1617                 if (rc == 0) {
1618                         rc = -EINVAL;
1619                         break;
1620                 }
1621                 rc = nrs_tbf_expression_parse(&expr,
1622                                               &conjunction->tc_expressions);
1623                 if (rc)
1624                         break;
1625         }
1626         return rc;
1627 }
1628
1629 static int
1630 nrs_tbf_conds_parse(char *str, int len, struct list_head *cond_list)
1631 {
1632         struct cfs_lstr src;
1633         struct cfs_lstr res;
1634         int rc = 0;
1635
1636         src.ls_str = str;
1637         src.ls_len = len;
1638         INIT_LIST_HEAD(cond_list);
1639         while (src.ls_str) {
1640                 rc = cfs_gettok(&src, NRS_TBF_DISJUNCTION_DELIM, &res);
1641                 if (rc == 0) {
1642                         rc = -EINVAL;
1643                         break;
1644                 }
1645                 rc = nrs_tbf_conjunction_parse(&res, cond_list);
1646                 if (rc)
1647                         break;
1648         }
1649         return rc;
1650 }
1651
1652 static int
1653 nrs_tbf_generic_parse(struct nrs_tbf_cmd *cmd, const char *id)
1654 {
1655         int rc;
1656
1657         OBD_ALLOC(cmd->u.tc_start.ts_conds_str, strlen(id) + 1);
1658         if (cmd->u.tc_start.ts_conds_str == NULL)
1659                 return -ENOMEM;
1660
1661         memcpy(cmd->u.tc_start.ts_conds_str, id, strlen(id));
1662
1663         /* Parse hybird NID and JOBID conditions */
1664         rc = nrs_tbf_conds_parse(cmd->u.tc_start.ts_conds_str,
1665                                  strlen(cmd->u.tc_start.ts_conds_str),
1666                                  &cmd->u.tc_start.ts_conds);
1667         if (rc)
1668                 nrs_tbf_generic_cmd_fini(cmd);
1669
1670         return rc;
1671 }
1672
1673 static int
1674 nrs_tbf_expression_match(struct nrs_tbf_expression *expr,
1675                          struct nrs_tbf_rule *rule,
1676                          struct nrs_tbf_client *cli)
1677 {
1678         switch (expr->te_field) {
1679         case NRS_TBF_FIELD_NID:
1680                 return cfs_match_nid(cli->tc_nid, &expr->te_cond);
1681         case NRS_TBF_FIELD_JOBID:
1682                 return nrs_tbf_jobid_list_match(&expr->te_cond, cli->tc_jobid);
1683         case NRS_TBF_FIELD_OPCODE:
1684                 return cfs_bitmap_check(expr->te_opcodes, cli->tc_opcode);
1685         default:
1686                 return 0;
1687         }
1688 }
1689
1690 static int
1691 nrs_tbf_conjunction_match(struct nrs_tbf_conjunction *conjunction,
1692                           struct nrs_tbf_rule *rule,
1693                           struct nrs_tbf_client *cli)
1694 {
1695         struct nrs_tbf_expression *expr;
1696         int matched;
1697
1698         list_for_each_entry(expr, &conjunction->tc_expressions, te_linkage) {
1699                 matched = nrs_tbf_expression_match(expr, rule, cli);
1700                 if (!matched)
1701                         return 0;
1702         }
1703
1704         return 1;
1705 }
1706
1707 static int
1708 nrs_tbf_cond_match(struct nrs_tbf_rule *rule, struct nrs_tbf_client *cli)
1709 {
1710         struct nrs_tbf_conjunction *conjunction;
1711         int matched;
1712
1713         list_for_each_entry(conjunction, &rule->tr_conds, tc_linkage) {
1714                 matched = nrs_tbf_conjunction_match(conjunction, rule, cli);
1715                 if (matched)
1716                         return 1;
1717         }
1718
1719         return 0;
1720 }
1721
1722 static void
1723 nrs_tbf_generic_rule_fini(struct nrs_tbf_rule *rule)
1724 {
1725         if (!list_empty(&rule->tr_conds))
1726                 nrs_tbf_conds_free(&rule->tr_conds);
1727         LASSERT(rule->tr_conds_str != NULL);
1728         OBD_FREE(rule->tr_conds_str, strlen(rule->tr_conds_str) + 1);
1729 }
1730
1731 static int
1732 nrs_tbf_rule_init(struct ptlrpc_nrs_policy *policy,
1733                   struct nrs_tbf_rule *rule, struct nrs_tbf_cmd *start)
1734 {
1735         int rc = 0;
1736
1737         LASSERT(start->u.tc_start.ts_conds_str);
1738         OBD_ALLOC(rule->tr_conds_str,
1739                   strlen(start->u.tc_start.ts_conds_str) + 1);
1740         if (rule->tr_conds_str == NULL)
1741                 return -ENOMEM;
1742
1743         memcpy(rule->tr_conds_str,
1744                start->u.tc_start.ts_conds_str,
1745                strlen(start->u.tc_start.ts_conds_str));
1746
1747         INIT_LIST_HEAD(&rule->tr_conds);
1748         if (!list_empty(&start->u.tc_start.ts_conds)) {
1749                 rc = nrs_tbf_conds_parse(rule->tr_conds_str,
1750                                          strlen(rule->tr_conds_str),
1751                                          &rule->tr_conds);
1752         }
1753         if (rc)
1754                 nrs_tbf_generic_rule_fini(rule);
1755
1756         return rc;
1757 }
1758
1759 static int
1760 nrs_tbf_generic_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1761 {
1762         seq_printf(m, "%s %s %llu, ref %d\n", rule->tr_name,
1763                    rule->tr_conds_str, rule->tr_rpc_rate,
1764                    atomic_read(&rule->tr_ref) - 1);
1765         return 0;
1766 }
1767
/**
 * Rule-match hook of the generic TBF type; defers entirely to
 * nrs_tbf_cond_match().
 *
 * \retval 1 \a cli matches \a rule
 * \retval 0 otherwise
 */
static int
nrs_tbf_generic_rule_match(struct nrs_tbf_rule *rule,
                           struct nrs_tbf_client *cli)
{
        int matched = nrs_tbf_cond_match(rule, cli);

        return matched;
}
1774
1775 static struct nrs_tbf_ops nrs_tbf_generic_ops = {
1776         .o_name = NRS_TBF_TYPE_GENERIC,
1777         .o_startup = nrs_tbf_startup,
1778         .o_cli_find = nrs_tbf_cli_find,
1779         .o_cli_findadd = nrs_tbf_cli_findadd,
1780         .o_cli_put = nrs_tbf_cli_put,
1781         .o_cli_init = nrs_tbf_generic_cli_init,
1782         .o_rule_init = nrs_tbf_rule_init,
1783         .o_rule_dump = nrs_tbf_generic_rule_dump,
1784         .o_rule_match = nrs_tbf_generic_rule_match,
1785         .o_rule_fini = nrs_tbf_generic_rule_fini,
1786 };
1787
1788 static void nrs_tbf_opcode_rule_fini(struct nrs_tbf_rule *rule)
1789 {
1790         if (rule->tr_opcodes != NULL)
1791                 CFS_FREE_BITMAP(rule->tr_opcodes);
1792
1793         LASSERT(rule->tr_opcodes_str != NULL);
1794         OBD_FREE(rule->tr_opcodes_str, strlen(rule->tr_opcodes_str) + 1);
1795 }
1796
1797 static unsigned nrs_tbf_opcode_hop_hash(struct cfs_hash *hs, const void *key,
1798                                         unsigned mask)
1799 {
1800         return cfs_hash_djb2_hash(key, sizeof(__u32), mask);
1801 }
1802
1803 static int nrs_tbf_opcode_hop_keycmp(const void *key, struct hlist_node *hnode)
1804 {
1805         const __u32     *opc = key;
1806         struct nrs_tbf_client *cli = hlist_entry(hnode,
1807                                                  struct nrs_tbf_client,
1808                                                  tc_hnode);
1809
1810         return *opc == cli->tc_opcode;
1811 }
1812
1813 static void *nrs_tbf_opcode_hop_key(struct hlist_node *hnode)
1814 {
1815         struct nrs_tbf_client *cli = hlist_entry(hnode,
1816                                                  struct nrs_tbf_client,
1817                                                  tc_hnode);
1818
1819         return &cli->tc_opcode;
1820 }
1821
1822 static void *nrs_tbf_opcode_hop_object(struct hlist_node *hnode)
1823 {
1824         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
1825 }
1826
1827 static void nrs_tbf_opcode_hop_get(struct cfs_hash *hs,
1828                                    struct hlist_node *hnode)
1829 {
1830         struct nrs_tbf_client *cli = hlist_entry(hnode,
1831                                                  struct nrs_tbf_client,
1832                                                  tc_hnode);
1833
1834         atomic_inc(&cli->tc_ref);
1835 }
1836
1837 static void nrs_tbf_opcode_hop_put(struct cfs_hash *hs,
1838                                    struct hlist_node *hnode)
1839 {
1840         struct nrs_tbf_client *cli = hlist_entry(hnode,
1841                                                  struct nrs_tbf_client,
1842                                                  tc_hnode);
1843
1844         atomic_dec(&cli->tc_ref);
1845 }
1846
1847 static void nrs_tbf_opcode_hop_exit(struct cfs_hash *hs,
1848                                     struct hlist_node *hnode)
1849 {
1850         struct nrs_tbf_client *cli = hlist_entry(hnode,
1851                                                  struct nrs_tbf_client,
1852                                                  tc_hnode);
1853
1854         LASSERTF(atomic_read(&cli->tc_ref) == 0,
1855                  "Busy TBF object from client with opcode %s, with %d refs\n",
1856                  ll_opcode2str(cli->tc_opcode),
1857                  atomic_read(&cli->tc_ref));
1858
1859         nrs_tbf_cli_fini(cli);
1860 }
1861 static struct cfs_hash_ops nrs_tbf_opcode_hash_ops = {
1862         .hs_hash        = nrs_tbf_opcode_hop_hash,
1863         .hs_keycmp      = nrs_tbf_opcode_hop_keycmp,
1864         .hs_key         = nrs_tbf_opcode_hop_key,
1865         .hs_object      = nrs_tbf_opcode_hop_object,
1866         .hs_get         = nrs_tbf_opcode_hop_get,
1867         .hs_put         = nrs_tbf_opcode_hop_put,
1868         .hs_put_locked  = nrs_tbf_opcode_hop_put,
1869         .hs_exit        = nrs_tbf_opcode_hop_exit,
1870 };
1871
1872 static int
1873 nrs_tbf_opcode_startup(struct ptlrpc_nrs_policy *policy,
1874                     struct nrs_tbf_head *head)
1875 {
1876         struct nrs_tbf_cmd      start = { 0 };
1877         int rc;
1878
1879         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1880                                             NRS_TBF_NID_BITS,
1881                                             NRS_TBF_NID_BITS,
1882                                             NRS_TBF_NID_BKT_BITS, 0,
1883                                             CFS_HASH_MIN_THETA,
1884                                             CFS_HASH_MAX_THETA,
1885                                             &nrs_tbf_opcode_hash_ops,
1886                                             CFS_HASH_RW_BKTLOCK);
1887         if (head->th_cli_hash == NULL)
1888                 return -ENOMEM;
1889
1890         start.u.tc_start.ts_opcodes = NULL;
1891         start.u.tc_start.ts_opcodes_str = "*";
1892
1893         start.u.tc_start.ts_rpc_rate = tbf_rate;
1894         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1895         start.tc_name = NRS_TBF_DEFAULT_RULE;
1896         rc = nrs_tbf_rule_start(policy, head, &start);
1897
1898         return rc;
1899 }
1900
1901 static struct nrs_tbf_client *
1902 nrs_tbf_opcode_cli_find(struct nrs_tbf_head *head,
1903                         struct ptlrpc_request *req)
1904 {
1905         __u32 opc;
1906
1907         opc = lustre_msg_get_opc(req->rq_reqmsg);
1908         return cfs_hash_lookup(head->th_cli_hash, &opc);
1909 }
1910
1911 static struct nrs_tbf_client *
1912 nrs_tbf_opcode_cli_findadd(struct nrs_tbf_head *head,
1913                            struct nrs_tbf_client *cli)
1914 {
1915         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_opcode,
1916                                        &cli->tc_hnode);
1917 }
1918
1919 static void
1920 nrs_tbf_opcode_cli_init(struct nrs_tbf_client *cli,
1921                         struct ptlrpc_request *req)
1922 {
1923         cli->tc_opcode = lustre_msg_get_opc(req->rq_reqmsg);
1924 }
1925
1926 #define MAX_OPCODE_LEN  32
1927 static int
1928 nrs_tbf_opcode_set_bit(const struct cfs_lstr *id, struct cfs_bitmap *opcodes)
1929 {
1930         int     op = 0;
1931         char    opcode_str[MAX_OPCODE_LEN];
1932
1933         if (id->ls_len + 1 > MAX_OPCODE_LEN)
1934                 return -EINVAL;
1935
1936         memcpy(opcode_str, id->ls_str, id->ls_len);
1937         opcode_str[id->ls_len] = '\0';
1938
1939         op = ll_str2opcode(opcode_str);
1940         if (op < 0)
1941                 return -EINVAL;
1942
1943         cfs_bitmap_set(opcodes, op);
1944         return 0;
1945 }
1946
1947 static int
1948 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr)
1949 {
1950         struct cfs_bitmap *opcodes;
1951         struct cfs_lstr src;
1952         struct cfs_lstr res;
1953         int rc = 0;
1954         ENTRY;
1955
1956         opcodes = CFS_ALLOCATE_BITMAP(LUSTRE_MAX_OPCODES);
1957         if (opcodes == NULL)
1958                 return -ENOMEM;
1959
1960         src.ls_str = str;
1961         src.ls_len = len;
1962         while (src.ls_str) {
1963                 rc = cfs_gettok(&src, ' ', &res);
1964                 if (rc == 0) {
1965                         rc = -EINVAL;
1966                         break;
1967                 }
1968                 rc = nrs_tbf_opcode_set_bit(&res, opcodes);
1969                 if (rc)
1970                         break;
1971         }
1972
1973         if (rc == 0)
1974                 *bitmaptr = opcodes;
1975         else
1976                 CFS_FREE_BITMAP(opcodes);
1977
1978         RETURN(rc);
1979 }
1980
1981 static void nrs_tbf_opcode_cmd_fini(struct nrs_tbf_cmd *cmd)
1982 {
1983         if (cmd->u.tc_start.ts_opcodes)
1984                 CFS_FREE_BITMAP(cmd->u.tc_start.ts_opcodes);
1985
1986         if (cmd->u.tc_start.ts_opcodes_str)
1987                 OBD_FREE(cmd->u.tc_start.ts_opcodes_str,
1988                          strlen(cmd->u.tc_start.ts_opcodes_str) + 1);
1989
1990 }
1991
1992 static int nrs_tbf_opcode_parse(struct nrs_tbf_cmd *cmd, char *id)
1993 {
1994         struct cfs_lstr src;
1995         int rc;
1996
1997         src.ls_str = id;
1998         src.ls_len = strlen(id);
1999         rc = nrs_tbf_check_id_value(&src, "opcode");
2000         if (rc)
2001                 return rc;
2002
2003         OBD_ALLOC(cmd->u.tc_start.ts_opcodes_str, src.ls_len + 1);
2004         if (cmd->u.tc_start.ts_opcodes_str == NULL)
2005                 return -ENOMEM;
2006
2007         memcpy(cmd->u.tc_start.ts_opcodes_str, src.ls_str, src.ls_len);
2008
2009         /* parse opcode list */
2010         rc = nrs_tbf_opcode_list_parse(cmd->u.tc_start.ts_opcodes_str,
2011                                        strlen(cmd->u.tc_start.ts_opcodes_str),
2012                                        &cmd->u.tc_start.ts_opcodes);
2013         if (rc)
2014                 nrs_tbf_opcode_cmd_fini(cmd);
2015
2016         return rc;
2017 }
2018
2019 static int
2020 nrs_tbf_opcode_rule_match(struct nrs_tbf_rule *rule,
2021                           struct nrs_tbf_client *cli)
2022 {
2023         if (rule->tr_opcodes == NULL)
2024                 return 0;
2025
2026         return cfs_bitmap_check(rule->tr_opcodes, cli->tc_opcode);
2027 }
2028
2029 static int nrs_tbf_opcode_rule_init(struct ptlrpc_nrs_policy *policy,
2030                                     struct nrs_tbf_rule *rule,
2031                                     struct nrs_tbf_cmd *start)
2032 {
2033         int rc = 0;
2034
2035         LASSERT(start->u.tc_start.ts_opcodes_str != NULL);
2036         OBD_ALLOC(rule->tr_opcodes_str,
2037                   strlen(start->u.tc_start.ts_opcodes_str) + 1);
2038         if (rule->tr_opcodes_str == NULL)
2039                 return -ENOMEM;
2040
2041         strncpy(rule->tr_opcodes_str, start->u.tc_start.ts_opcodes_str,
2042                 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2043
2044         /* Default rule '*' */
2045         if (start->u.tc_start.ts_opcodes == NULL)
2046                 return 0;
2047
2048         rc = nrs_tbf_opcode_list_parse(rule->tr_opcodes_str,
2049                                        strlen(rule->tr_opcodes_str),
2050                                        &rule->tr_opcodes);
2051         if (rc)
2052                 OBD_FREE(rule->tr_opcodes_str,
2053                          strlen(start->u.tc_start.ts_opcodes_str) + 1);
2054
2055         return rc;
2056 }
2057
2058 static int
2059 nrs_tbf_opcode_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2060 {
2061         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2062                    rule->tr_opcodes_str, rule->tr_rpc_rate,
2063                    atomic_read(&rule->tr_ref) - 1);
2064         return 0;
2065 }
2066
2067
2068 struct nrs_tbf_ops nrs_tbf_opcode_ops = {
2069         .o_name = NRS_TBF_TYPE_OPCODE,
2070         .o_startup = nrs_tbf_opcode_startup,
2071         .o_cli_find = nrs_tbf_opcode_cli_find,
2072         .o_cli_findadd = nrs_tbf_opcode_cli_findadd,
2073         .o_cli_put = nrs_tbf_nid_cli_put,
2074         .o_cli_init = nrs_tbf_opcode_cli_init,
2075         .o_rule_init = nrs_tbf_opcode_rule_init,
2076         .o_rule_dump = nrs_tbf_opcode_rule_dump,
2077         .o_rule_match = nrs_tbf_opcode_rule_match,
2078         .o_rule_fini = nrs_tbf_opcode_rule_fini,
2079 };
2080
2081 static struct nrs_tbf_type nrs_tbf_types[] = {
2082         {
2083                 .ntt_name = NRS_TBF_TYPE_JOBID,
2084                 .ntt_flag = NRS_TBF_FLAG_JOBID,
2085                 .ntt_ops = &nrs_tbf_jobid_ops,
2086         },
2087         {
2088                 .ntt_name = NRS_TBF_TYPE_NID,
2089                 .ntt_flag = NRS_TBF_FLAG_NID,
2090                 .ntt_ops = &nrs_tbf_nid_ops,
2091         },
2092         {
2093                 .ntt_name = NRS_TBF_TYPE_OPCODE,
2094                 .ntt_flag = NRS_TBF_FLAG_OPCODE,
2095                 .ntt_ops = &nrs_tbf_opcode_ops,
2096         },
2097         {
2098                 .ntt_name = NRS_TBF_TYPE_GENERIC,
2099                 .ntt_flag = NRS_TBF_FLAG_GENERIC,
2100                 .ntt_ops = &nrs_tbf_generic_ops,
2101         },
2102 };
2103
2104 /**
2105  * Is called before the policy transitions into
2106  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
2107  * policy-specific private data structure.
2108  *
2109  * \param[in] policy The policy to start
2110  *
2111  * \retval -ENOMEM OOM error
2112  * \retval  0      success
2113  *
2114  * \see nrs_policy_register()
2115  * \see nrs_policy_ctl()
2116  */
2117 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
2118 {
2119         struct nrs_tbf_head     *head;
2120         struct nrs_tbf_ops      *ops;
2121         __u32                    type;
2122         char                    *name;
2123         int found = 0;
2124         int i;
2125         int rc = 0;
2126
2127         if (arg == NULL)
2128                 name = NRS_TBF_TYPE_GENERIC;
2129         else if (strlen(arg) < NRS_TBF_TYPE_MAX_LEN)
2130                 name = arg;
2131         else
2132                 GOTO(out, rc = -EINVAL);
2133
2134         for (i = 0; i < ARRAY_SIZE(nrs_tbf_types); i++) {
2135                 if (strcmp(name, nrs_tbf_types[i].ntt_name) == 0) {
2136                         ops = nrs_tbf_types[i].ntt_ops;
2137                         type = nrs_tbf_types[i].ntt_flag;
2138                         found = 1;
2139                         break;
2140                 }
2141         }
2142         if (found == 0)
2143                 GOTO(out, rc = -ENOTSUPP);
2144
2145         OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
2146         if (head == NULL)
2147                 GOTO(out, rc = -ENOMEM);
2148
2149         memcpy(head->th_type, name, strlen(name));
2150         head->th_type[strlen(name)] = '\0';
2151         head->th_ops = ops;
2152         head->th_type_flag = type;
2153
2154         head->th_binheap = cfs_binheap_create(&nrs_tbf_heap_ops,
2155                                               CBH_FLAG_ATOMIC_GROW, 4096, NULL,
2156                                               nrs_pol2cptab(policy),
2157                                               nrs_pol2cptid(policy));
2158         if (head->th_binheap == NULL)
2159                 GOTO(out_free_head, rc = -ENOMEM);
2160
2161         atomic_set(&head->th_rule_sequence, 0);
2162         spin_lock_init(&head->th_rule_lock);
2163         INIT_LIST_HEAD(&head->th_list);
2164         hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
2165         head->th_timer.function = nrs_tbf_timer_cb;
2166         rc = head->th_ops->o_startup(policy, head);
2167         if (rc)
2168                 GOTO(out_free_heap, rc);
2169
2170         policy->pol_private = head;
2171         return 0;
2172 out_free_heap:
2173         cfs_binheap_destroy(head->th_binheap);
2174 out_free_head:
2175         OBD_FREE_PTR(head);
2176 out:
2177         return rc;
2178 }
2179
2180 /**
2181  * Is called before the policy transitions into
2182  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
2183  * private data structure.
2184  *
2185  * \param[in] policy The policy to stop
2186  *
2187  * \see nrs_policy_stop0()
2188  */
2189 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
2190 {
2191         struct nrs_tbf_head *head = policy->pol_private;
2192         struct ptlrpc_nrs *nrs = policy->pol_nrs;
2193         struct nrs_tbf_rule *rule, *n;
2194
2195         LASSERT(head != NULL);
2196         LASSERT(head->th_cli_hash != NULL);
2197         hrtimer_cancel(&head->th_timer);
2198         /* Should cleanup hash first before free rules */
2199         cfs_hash_putref(head->th_cli_hash);
2200         list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
2201                 list_del_init(&rule->tr_linkage);
2202                 nrs_tbf_rule_put(rule);
2203         }
2204         LASSERT(list_empty(&head->th_list));
2205         LASSERT(head->th_binheap != NULL);
2206         LASSERT(cfs_binheap_is_empty(head->th_binheap));
2207         cfs_binheap_destroy(head->th_binheap);
2208         OBD_FREE_PTR(head);
2209         nrs->nrs_throttling = 0;
2210         wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
2211 }
2212
2213 /**
2214  * Performs a policy-specific ctl function on TBF policy instances; similar
2215  * to ioctl.
2216  *
2217  * \param[in]     policy the policy instance
2218  * \param[in]     opc    the opcode
2219  * \param[in,out] arg    used for passing parameters and information
2220  *
2221  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2222  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2223  *
2224  * \retval 0   operation carried out successfully
2225  * \retval -ve error
2226  */
2227 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
2228                        enum ptlrpc_nrs_ctl opc,
2229                        void *arg)
2230 {
2231         int rc = 0;
2232         ENTRY;
2233
2234         assert_spin_locked(&policy->pol_nrs->nrs_lock);
2235
2236         switch ((enum nrs_ctl_tbf)opc) {
2237         default:
2238                 RETURN(-EINVAL);
2239
2240         /**
2241          * Read RPC rate size of a policy instance.
2242          */
2243         case NRS_CTL_TBF_RD_RULE: {
2244                 struct nrs_tbf_head *head = policy->pol_private;
2245                 struct seq_file *m = (struct seq_file *) arg;
2246                 struct ptlrpc_service_part *svcpt;
2247
2248                 svcpt = policy->pol_nrs->nrs_svcpt;
2249                 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
2250
2251                 rc = nrs_tbf_rule_dump_all(head, m);
2252                 }
2253                 break;
2254
2255         /**
2256          * Write RPC rate of a policy instance.
2257          */
2258         case NRS_CTL_TBF_WR_RULE: {
2259                 struct nrs_tbf_head *head = policy->pol_private;
2260                 struct nrs_tbf_cmd *cmd;
2261
2262                 cmd = (struct nrs_tbf_cmd *)arg;
2263                 rc = nrs_tbf_command(policy,
2264                                      head,
2265                                      cmd);
2266                 }
2267                 break;
2268         /**
2269          * Read the TBF policy type of a policy instance.
2270          */
2271         case NRS_CTL_TBF_RD_TYPE_FLAG: {
2272                 struct nrs_tbf_head *head = policy->pol_private;
2273
2274                 *(__u32 *)arg = head->th_type_flag;
2275                 }
2276                 break;
2277         }
2278
2279         RETURN(rc);
2280 }
2281
2282 /**
2283  * Is called for obtaining a TBF policy resource.
2284  *
2285  * \param[in]  policy     The policy on which the request is being asked for
2286  * \param[in]  nrq        The request for which resources are being taken
2287  * \param[in]  parent     Parent resource, unused in this policy
2288  * \param[out] resp       Resources references are placed in this array
2289  * \param[in]  moving_req Signifies limited caller context; unused in this
2290  *                        policy
2291  *
2292  *
2293  * \see nrs_resource_get_safe()
2294  */
2295 static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
2296                            struct ptlrpc_nrs_request *nrq,
2297                            const struct ptlrpc_nrs_resource *parent,
2298                            struct ptlrpc_nrs_resource **resp,
2299                            bool moving_req)
2300 {
2301         struct nrs_tbf_head   *head;
2302         struct nrs_tbf_client *cli;
2303         struct nrs_tbf_client *tmp;
2304         struct ptlrpc_request *req;
2305
2306         if (parent == NULL) {
2307                 *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
2308                 return 0;
2309         }
2310
2311         head = container_of(parent, struct nrs_tbf_head, th_res);
2312         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
2313         cli = head->th_ops->o_cli_find(head, req);
2314         if (cli != NULL) {
2315                 spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2316                 LASSERT(cli->tc_rule);
2317                 if (cli->tc_rule_sequence !=
2318                     atomic_read(&head->th_rule_sequence) ||
2319                     cli->tc_rule->tr_flags & NTRS_STOPPING) {
2320                         struct nrs_tbf_rule *rule;
2321
2322                         rule = nrs_tbf_rule_match(head, cli);
2323                         if (rule != cli->tc_rule)
2324                                 nrs_tbf_cli_reset(head, rule, cli);
2325                         else
2326                                 nrs_tbf_rule_put(rule);
2327                 } else if (cli->tc_rule_generation !=
2328                            cli->tc_rule->tr_generation) {
2329                         nrs_tbf_cli_reset_value(head, cli);
2330                 }
2331                 spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2332                 goto out;
2333         }
2334
2335         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
2336                           sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
2337         if (cli == NULL)
2338                 return -ENOMEM;
2339
2340         nrs_tbf_cli_init(head, cli, req);
2341         tmp = head->th_ops->o_cli_findadd(head, cli);
2342         if (tmp != cli) {
2343                 atomic_dec(&cli->tc_ref);
2344                 nrs_tbf_cli_fini(cli);
2345                 cli = tmp;
2346         }
2347 out:
2348         *resp = &cli->tc_res;
2349
2350         return 1;
2351 }
2352
2353 /**
2354  * Called when releasing references to the resource hierachy obtained for a
2355  * request for scheduling using the TBF policy.
2356  *
2357  * \param[in] policy   the policy the resource belongs to
2358  * \param[in] res      the resource to be released
2359  */
2360 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
2361                             const struct ptlrpc_nrs_resource *res)
2362 {
2363         struct nrs_tbf_head   *head;
2364         struct nrs_tbf_client *cli;
2365
2366         /**
2367          * Do nothing for freeing parent, nrs_tbf_net resources
2368          */
2369         if (res->res_parent == NULL)
2370                 return;
2371
2372         cli = container_of(res, struct nrs_tbf_client, tc_res);
2373         head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
2374
2375         head->th_ops->o_cli_put(head, cli);
2376 }
2377
2378 /**
2379  * Called when getting a request from the TBF policy for handling, or just
2380  * peeking; removes the request from the policy when it is to be handled.
2381  *
2382  * \param[in] policy The policy
2383  * \param[in] peek   When set, signifies that we just want to examine the
2384  *                   request, and not handle it, so the request is not removed
2385  *                   from the policy.
2386  * \param[in] force  Force the policy to return a request; unused in this
2387  *                   policy
2388  *
2389  * \retval The request to be handled; this is the next request in the TBF
2390  *         rule
2391  *
2392  * \see ptlrpc_nrs_req_get_nolock()
2393  * \see nrs_request_get()
2394  */
2395 static
2396 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
2397                                            bool peek, bool force)
2398 {
2399         struct nrs_tbf_head       *head = policy->pol_private;
2400         struct ptlrpc_nrs_request *nrq = NULL;
2401         struct nrs_tbf_client     *cli;
2402         struct cfs_binheap_node   *node;
2403
2404         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2405
2406         if (!peek && policy->pol_nrs->nrs_throttling)
2407                 return NULL;
2408
2409         node = cfs_binheap_root(head->th_binheap);
2410         if (unlikely(node == NULL))
2411                 return NULL;
2412
2413         cli = container_of(node, struct nrs_tbf_client, tc_node);
2414         LASSERT(cli->tc_in_heap);
2415         if (peek) {
2416                 nrq = list_entry(cli->tc_list.next,
2417                                      struct ptlrpc_nrs_request,
2418                                      nr_u.tbf.tr_list);
2419         } else {
2420                 __u64 now = ktime_to_ns(ktime_get());
2421                 __u64 passed;
2422                 __u64 ntoken;
2423                 __u64 deadline;
2424
2425                 deadline = cli->tc_check_time +
2426                           cli->tc_nsecs;
2427                 LASSERT(now >= cli->tc_check_time);
2428                 passed = now - cli->tc_check_time;
2429                 ntoken = passed * cli->tc_rpc_rate;
2430                 do_div(ntoken, NSEC_PER_SEC);
2431                 ntoken += cli->tc_ntoken;
2432                 if (ntoken > cli->tc_depth)
2433                         ntoken = cli->tc_depth;
2434                 if (ntoken > 0) {
2435                         struct ptlrpc_request *req;
2436                         nrq = list_entry(cli->tc_list.next,
2437                                              struct ptlrpc_nrs_request,
2438                                              nr_u.tbf.tr_list);
2439                         req = container_of(nrq,
2440                                            struct ptlrpc_request,
2441                                            rq_nrq);
2442                         ntoken--;
2443                         cli->tc_ntoken = ntoken;
2444                         cli->tc_check_time = now;
2445                         list_del_init(&nrq->nr_u.tbf.tr_list);
2446                         if (list_empty(&cli->tc_list)) {
2447                                 cfs_binheap_remove(head->th_binheap,
2448                                                    &cli->tc_node);
2449                                 cli->tc_in_heap = false;
2450                         } else {
2451                                 cfs_binheap_relocate(head->th_binheap,
2452                                                      &cli->tc_node);
2453                         }
2454                         CDEBUG(D_RPCTRACE,
2455                                "NRS start %s request from %s, "
2456                                "seq: %llu\n",
2457                                policy->pol_desc->pd_name,
2458                                libcfs_id2str(req->rq_peer),
2459                                nrq->nr_u.tbf.tr_sequence);
2460                 } else {
2461                         ktime_t time;
2462
2463                         policy->pol_nrs->nrs_throttling = 1;
2464                         head->th_deadline = deadline;
2465                         time = ktime_set(0, 0);
2466                         time = ktime_add_ns(time, deadline);
2467                         hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
2468                 }
2469         }
2470
2471         return nrq;
2472 }
2473
2474 /**
2475  * Adds request \a nrq to \a policy's list of queued requests
2476  *
2477  * \param[in] policy The policy
2478  * \param[in] nrq    The request to add
2479  *
2480  * \retval 0 success; nrs_request_enqueue() assumes this function will always
2481  *                    succeed
2482  */
2483 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
2484                            struct ptlrpc_nrs_request *nrq)
2485 {
2486         struct nrs_tbf_head   *head;
2487         struct nrs_tbf_client *cli;
2488         int                    rc = 0;
2489
2490         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2491
2492         cli = container_of(nrs_request_resource(nrq),
2493                            struct nrs_tbf_client, tc_res);
2494         head = container_of(nrs_request_resource(nrq)->res_parent,
2495                             struct nrs_tbf_head, th_res);
2496         if (list_empty(&cli->tc_list)) {
2497                 LASSERT(!cli->tc_in_heap);
2498                 rc = cfs_binheap_insert(head->th_binheap, &cli->tc_node);
2499                 if (rc == 0) {
2500                         cli->tc_in_heap = true;
2501                         nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
2502                         list_add_tail(&nrq->nr_u.tbf.tr_list,
2503                                           &cli->tc_list);
2504                         if (policy->pol_nrs->nrs_throttling) {
2505                                 __u64 deadline = cli->tc_check_time +
2506                                                  cli->tc_nsecs;
2507                                 if ((head->th_deadline > deadline) &&
2508                                     (hrtimer_try_to_cancel(&head->th_timer)
2509                                      >= 0)) {
2510                                         ktime_t time;
2511                                         head->th_deadline = deadline;
2512                                         time = ktime_set(0, 0);
2513                                         time = ktime_add_ns(time, deadline);
2514                                         hrtimer_start(&head->th_timer, time,
2515                                                       HRTIMER_MODE_ABS);
2516                                 }
2517                         }
2518                 }
2519         } else {
2520                 LASSERT(cli->tc_in_heap);
2521                 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
2522                 list_add_tail(&nrq->nr_u.tbf.tr_list,
2523                                   &cli->tc_list);
2524         }
2525         return rc;
2526 }
2527
2528 /**
2529  * Removes request \a nrq from \a policy's list of queued requests.
2530  *
2531  * \param[in] policy The policy
2532  * \param[in] nrq    The request to remove
2533  */
2534 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
2535                              struct ptlrpc_nrs_request *nrq)
2536 {
2537         struct nrs_tbf_head   *head;
2538         struct nrs_tbf_client *cli;
2539
2540         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2541
2542         cli = container_of(nrs_request_resource(nrq),
2543                            struct nrs_tbf_client, tc_res);
2544         head = container_of(nrs_request_resource(nrq)->res_parent,
2545                             struct nrs_tbf_head, th_res);
2546
2547         LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
2548         list_del_init(&nrq->nr_u.tbf.tr_list);
2549         if (list_empty(&cli->tc_list)) {
2550                 cfs_binheap_remove(head->th_binheap,
2551                                    &cli->tc_node);
2552                 cli->tc_in_heap = false;
2553         } else {
2554                 cfs_binheap_relocate(head->th_binheap,
2555                                      &cli->tc_node);
2556         }
2557 }
2558
2559 /**
2560  * Prints a debug statement right before the request \a nrq stops being
2561  * handled.
2562  *
2563  * \param[in] policy The policy handling the request
2564  * \param[in] nrq    The request being handled
2565  *
2566  * \see ptlrpc_server_finish_request()
2567  * \see ptlrpc_nrs_req_stop_nolock()
2568  */
2569 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
2570                               struct ptlrpc_nrs_request *nrq)
2571 {
2572         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
2573                                                   rq_nrq);
2574
2575         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2576
2577         CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: %llu\n",
2578                policy->pol_desc->pd_name, libcfs_id2str(req->rq_peer),
2579                nrq->nr_u.tbf.tr_sequence);
2580 }
2581
2582 #ifdef CONFIG_PROC_FS
2583
2584 /**
2585  * lprocfs interface
2586  */
2587
2588 /**
2589  * The maximum RPC rate.
2590  */
2591 #define LPROCFS_NRS_RATE_MAX            65535
2592
2593 static int
2594 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
2595 {
2596         struct ptlrpc_service       *svc = m->private;
2597         int                          rc;
2598
2599         seq_printf(m, "regular_requests:\n");
2600         /**
2601          * Perform two separate calls to this as only one of the NRS heads'
2602          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
2603          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
2604          */
2605         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
2606                                        NRS_POL_NAME_TBF,
2607                                        NRS_CTL_TBF_RD_RULE,
2608                                        false, m);
2609         if (rc == 0) {
2610                 /**
2611                  * -ENOSPC means buf in the parameter m is overflow, return 0
2612                  * here to let upper layer function seq_read alloc a larger
2613                  * memory area and do this process again.
2614                  */
2615         } else if (rc == -ENOSPC) {
2616                 return 0;
2617
2618                 /**
2619                  * Ignore -ENODEV as the regular NRS head's policy may be in the
2620                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
2621                  */
2622         } else if (rc != -ENODEV) {
2623                 return rc;
2624         }
2625
2626         if (!nrs_svc_has_hp(svc))
2627                 goto no_hp;
2628
2629         seq_printf(m, "high_priority_requests:\n");
2630         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
2631                                        NRS_POL_NAME_TBF,
2632                                        NRS_CTL_TBF_RD_RULE,
2633                                        false, m);
2634         if (rc == 0) {
2635                 /**
2636                  * -ENOSPC means buf in the parameter m is overflow, return 0
2637                  * here to let upper layer function seq_read alloc a larger
2638                  * memory area and do this process again.
2639                  */
2640         } else if (rc == -ENOSPC) {
2641                 return 0;
2642         }
2643
2644 no_hp:
2645
2646         return rc;
2647 }
2648
2649 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char *token)
2650 {
2651         int rc;
2652
2653         switch (cmd->u.tc_start.ts_valid_type) {
2654         case NRS_TBF_FLAG_JOBID:
2655                 rc = nrs_tbf_jobid_parse(cmd, token);
2656                 break;
2657         case NRS_TBF_FLAG_NID:
2658                 rc = nrs_tbf_nid_parse(cmd, token);
2659                 break;
2660         case NRS_TBF_FLAG_OPCODE:
2661                 rc = nrs_tbf_opcode_parse(cmd, token);
2662                 break;
2663         case NRS_TBF_FLAG_GENERIC:
2664                 rc = nrs_tbf_generic_parse(cmd, token);
2665                 break;
2666         default:
2667                 RETURN(-EINVAL);
2668         }
2669
2670         return rc;
2671 }
2672
2673 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
2674 {
2675         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
2676                 if (cmd->u.tc_start.ts_valid_type == NRS_TBF_FLAG_JOBID)
2677                         nrs_tbf_jobid_cmd_fini(cmd);
2678                 else if (cmd->u.tc_start.ts_valid_type == NRS_TBF_FLAG_NID)
2679                         nrs_tbf_nid_cmd_fini(cmd);
2680                 else if (cmd->u.tc_start.ts_valid_type == NRS_TBF_FLAG_OPCODE)
2681                         nrs_tbf_opcode_cmd_fini(cmd);
2682                 else if (cmd->u.tc_start.ts_valid_type == NRS_TBF_FLAG_GENERIC)
2683                         nrs_tbf_generic_cmd_fini(cmd);
2684         }
2685 }
2686
/**
 * Checks that \a name consists only of alphanumeric characters and '_'.
 *
 * The string is walked once up to its terminating NUL instead of calling
 * strlen() on every loop iteration.
 *
 * Note that an empty string contains no invalid character and is therefore
 * reported as valid; callers that need a non-empty name must check for that
 * themselves.
 *
 * \param[in] name NUL-terminated string to check, must not be NULL
 *
 * \retval true  every character is alphanumeric or '_'
 * \retval false at least one other character was found
 */
static bool name_is_valid(const char *name)
{
        const char *p;

        for (p = name; *p != '\0'; p++) {
                /* Cast keeps the ctype argument in unsigned char range. */
                if (!isalnum((unsigned char)*p) && *p != '_')
                        return false;
        }
        return true;
}
2698
2699 static int
2700 nrs_tbf_parse_value_pair(struct nrs_tbf_cmd *cmd, char *buffer)
2701 {
2702         char    *key;
2703         char    *val;
2704         int      rc;
2705         __u64    rate;
2706
2707         val = buffer;
2708         key = strsep(&val, "=");
2709         if (val == NULL || strlen(val) == 0)
2710                 return -EINVAL;
2711
2712         /* Key of the value pair */
2713         if (strcmp(key, "rate") == 0) {
2714                 rc = kstrtoull(val, 10, &rate);
2715                 if (rc)
2716                         return rc;
2717
2718                 if (rate <= 0 || rate >= LPROCFS_NRS_RATE_MAX)
2719                         return -EINVAL;
2720
2721                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
2722                         cmd->u.tc_start.ts_rpc_rate = rate;
2723                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
2724                         cmd->u.tc_change.tc_rpc_rate = rate;
2725                 else
2726                         return -EINVAL;
2727         }  else if (strcmp(key, "rank") == 0) {
2728                 if (!name_is_valid(val))
2729                         return -EINVAL;
2730
2731                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
2732                         cmd->u.tc_start.ts_next_name = val;
2733                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
2734                         cmd->u.tc_change.tc_next_name = val;
2735                 else
2736                         return -EINVAL;
2737         } else {
2738                 return -EINVAL;
2739         }
2740         return 0;
2741 }
2742
2743 static int
2744 nrs_tbf_parse_value_pairs(struct nrs_tbf_cmd *cmd, char *buffer)
2745 {
2746         char    *val;
2747         char    *token;
2748         int      rc;
2749
2750         val = buffer;
2751         while (val != NULL && strlen(val) != 0) {
2752                 token = strsep(&val, " ");
2753                 rc = nrs_tbf_parse_value_pair(cmd, token);
2754                 if (rc)
2755                         return rc;
2756         }
2757
2758         switch (cmd->tc_cmd) {
2759         case NRS_CTL_TBF_START_RULE:
2760                 if (cmd->u.tc_start.ts_rpc_rate == 0)
2761                         cmd->u.tc_start.ts_rpc_rate = tbf_rate;
2762                 break;
2763         case NRS_CTL_TBF_CHANGE_RULE:
2764                 if (cmd->u.tc_change.tc_rpc_rate == 0 &&
2765                     cmd->u.tc_change.tc_next_name == NULL)
2766                         return -EINVAL;
2767                 break;
2768         case NRS_CTL_TBF_STOP_RULE:
2769                 break;
2770         default:
2771                 return -EINVAL;
2772         }
2773         return 0;
2774 }
2775
2776 static struct nrs_tbf_cmd *
2777 nrs_tbf_parse_cmd(char *buffer, unsigned long count, __u32 type_flag)
2778 {
2779         static struct nrs_tbf_cmd       *cmd;
2780         char                            *token;
2781         char                            *val;
2782         int                              rc = 0;
2783
2784         OBD_ALLOC_PTR(cmd);
2785         if (cmd == NULL)
2786                 GOTO(out, rc = -ENOMEM);
2787         memset(cmd, 0, sizeof(*cmd));
2788
2789         val = buffer;
2790         token = strsep(&val, " ");
2791         if (val == NULL || strlen(val) == 0)
2792                 GOTO(out_free_cmd, rc = -EINVAL);
2793
2794         /* Type of the command */
2795         if (strcmp(token, "start") == 0) {
2796                 cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
2797                 cmd->u.tc_start.ts_valid_type = type_flag;
2798         } else if (strcmp(token, "stop") == 0)
2799                 cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
2800         else if (strcmp(token, "change") == 0)
2801                 cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RULE;
2802         else
2803                 GOTO(out_free_cmd, rc = -EINVAL);
2804
2805         /* Name of the rule */
2806         token = strsep(&val, " ");
2807         if ((val == NULL && cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE) ||
2808             !name_is_valid(token))
2809                 GOTO(out_free_cmd, rc = -EINVAL);
2810         cmd->tc_name = token;
2811
2812         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
2813                 /* List of ID */
2814                 LASSERT(val);
2815                 token = val;
2816                 val = strrchr(token, '}');
2817                 if (!val)
2818                         GOTO(out_free_cmd, rc = -EINVAL);
2819
2820                 /* Skip '}' */
2821                 val++;
2822                 if (*val == '\0') {
2823                         val = NULL;
2824                 } else if (*val == ' ') {
2825                         *val = '\0';
2826                         val++;
2827                 } else
2828                         GOTO(out_free_cmd, rc = -EINVAL);
2829
2830                 rc = nrs_tbf_id_parse(cmd, token);
2831                 if (rc)
2832                         GOTO(out_free_cmd, rc);
2833         }
2834
2835         rc = nrs_tbf_parse_value_pairs(cmd, val);
2836         if (rc)
2837                 GOTO(out_cmd_fini, rc = -EINVAL);
2838         goto out;
2839 out_cmd_fini:
2840         nrs_tbf_cmd_fini(cmd);
2841 out_free_cmd:
2842         OBD_FREE_PTR(cmd);
2843 out:
2844         if (rc)
2845                 cmd = ERR_PTR(rc);
2846         return cmd;
2847 }
2848
2849 /**
2850  * Get the TBF policy type (nid, jobid, etc) preset by
2851  * proc entry 'nrs_policies' for command buffer parsing.
2852  *
2853  * \param[in] svc the PTLRPC service
2854  * \param[in] queue the NRS queue type
2855  *
2856  * \retval the preset TBF policy type flag
2857  */
2858 static __u32
2859 nrs_tbf_type_flag(struct ptlrpc_service *svc, enum ptlrpc_nrs_queue_type queue)
2860 {
2861         __u32   type;
2862         int     rc;
2863
2864         rc = ptlrpc_nrs_policy_control(svc, queue,
2865                                        NRS_POL_NAME_TBF,
2866                                        NRS_CTL_TBF_RD_TYPE_FLAG,
2867                                        true, &type);
2868         if (rc != 0)
2869                 type = NRS_TBF_FLAG_INVALID;
2870
2871         return type;
2872 }
2873
2874 extern struct nrs_core nrs_core;
2875 #define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
2876 static ssize_t
2877 ptlrpc_lprocfs_nrs_tbf_rule_seq_write(struct file *file,
2878                                       const char __user *buffer,
2879                                       size_t count, loff_t *off)
2880 {
2881         struct seq_file           *m = file->private_data;
2882         struct ptlrpc_service     *svc = m->private;
2883         char                      *kernbuf;
2884         char                      *val;
2885         int                        rc;
2886         static struct nrs_tbf_cmd *cmd;
2887         enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
2888         unsigned long              length;
2889         char                      *token;
2890
2891         OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
2892         if (kernbuf == NULL)
2893                 GOTO(out, rc = -ENOMEM);
2894
2895         if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
2896                 GOTO(out_free_kernbuff, rc = -EINVAL);
2897
2898         if (copy_from_user(kernbuf, buffer, count))
2899                 GOTO(out_free_kernbuff, rc = -EFAULT);
2900
2901         val = kernbuf;
2902         token = strsep(&val, " ");
2903         if (val == NULL)
2904                 GOTO(out_free_kernbuff, rc = -EINVAL);
2905
2906         if (strcmp(token, "reg") == 0) {
2907                 queue = PTLRPC_NRS_QUEUE_REG;
2908         } else if (strcmp(token, "hp") == 0) {
2909                 queue = PTLRPC_NRS_QUEUE_HP;
2910         } else {
2911                 kernbuf[strlen(token)] = ' ';
2912                 val = kernbuf;
2913         }
2914         length = strlen(val);
2915
2916         if (length == 0)
2917                 GOTO(out_free_kernbuff, rc = -EINVAL);
2918
2919         if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
2920                 GOTO(out_free_kernbuff, rc = -ENODEV);
2921         else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
2922                 queue = PTLRPC_NRS_QUEUE_REG;
2923
2924         cmd = nrs_tbf_parse_cmd(val, length, nrs_tbf_type_flag(svc, queue));
2925         if (IS_ERR(cmd))
2926                 GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
2927
2928         /**
2929          * Serialize NRS core lprocfs operations with policy registration/
2930          * unregistration.
2931          */
2932         mutex_lock(&nrs_core.nrs_mutex);
2933         rc = ptlrpc_nrs_policy_control(svc, queue,
2934                                        NRS_POL_NAME_TBF,
2935                                        NRS_CTL_TBF_WR_RULE,
2936                                        false, cmd);
2937         mutex_unlock(&nrs_core.nrs_mutex);
2938
2939         nrs_tbf_cmd_fini(cmd);
2940         OBD_FREE_PTR(cmd);
2941 out_free_kernbuff:
2942         OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
2943 out:
2944         return rc ? rc : count;
2945 }
2946 LPROC_SEQ_FOPS(ptlrpc_lprocfs_nrs_tbf_rule);
2947
2948 /**
2949  * Initializes a TBF policy's lprocfs interface for service \a svc
2950  *
2951  * \param[in] svc the service
2952  *
2953  * \retval 0    success
2954  * \retval != 0 error
2955  */
2956 static int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
2957 {
2958         struct lprocfs_vars nrs_tbf_lprocfs_vars[] = {
2959                 { .name         = "nrs_tbf_rule",
2960                   .fops         = &ptlrpc_lprocfs_nrs_tbf_rule_fops,
2961                   .data = svc },
2962                 { NULL }
2963         };
2964
2965         if (svc->srv_procroot == NULL)
2966                 return 0;
2967
2968         return lprocfs_add_vars(svc->srv_procroot, nrs_tbf_lprocfs_vars, NULL);
2969 }
2970
2971 /**
2972  * Cleans up a TBF policy's lprocfs interface for service \a svc
2973  *
2974  * \param[in] svc the service
2975  */
2976 static void nrs_tbf_lprocfs_fini(struct ptlrpc_service *svc)
2977 {
2978         if (svc->srv_procroot == NULL)
2979                 return;
2980
2981         lprocfs_remove_proc_entry("nrs_tbf_rule", svc->srv_procroot);
2982 }
2983
2984 #endif /* CONFIG_PROC_FS */
2985
2986 /**
2987  * TBF policy operations
2988  */
2989 static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
2990         .op_policy_start        = nrs_tbf_start,
2991         .op_policy_stop         = nrs_tbf_stop,
2992         .op_policy_ctl          = nrs_tbf_ctl,
2993         .op_res_get             = nrs_tbf_res_get,
2994         .op_res_put             = nrs_tbf_res_put,
2995         .op_req_get             = nrs_tbf_req_get,
2996         .op_req_enqueue         = nrs_tbf_req_add,
2997         .op_req_dequeue         = nrs_tbf_req_del,
2998         .op_req_stop            = nrs_tbf_req_stop,
2999 #ifdef CONFIG_PROC_FS
3000         .op_lprocfs_init        = nrs_tbf_lprocfs_init,
3001         .op_lprocfs_fini        = nrs_tbf_lprocfs_fini,
3002 #endif
3003 };
3004
3005 /**
3006  * TBF policy configuration
3007  */
3008 struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
3009         .nc_name                = NRS_POL_NAME_TBF,
3010         .nc_ops                 = &nrs_tbf_ops,
3011         .nc_compat              = nrs_policy_compat_all,
3012 };
3013
3014 /** @} tbf */
3015
3016 /** @} nrs */
3017
3018 #endif /* HAVE_SERVER_SUPPORT */