Whamcloud - gitweb
Revert "LU-15969 llite: add support for ->fileattr_get/set"
[fs/lustre-release.git] / lustre / ptlrpc / nrs_tbf.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2013 DataDirect Networks, Inc.
24  *
25  * Copyright (c) 2014, 2016, Intel Corporation.
26  */
27 /*
28  * lustre/ptlrpc/nrs_tbf.c
29  *
30  * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
31  *
32  */
33
34 /**
35  * \addtogoup nrs
36  * @{
37  */
38
39 #define DEBUG_SUBSYSTEM S_RPC
40 #include <obd_support.h>
41 #include <obd_class.h>
42 #include <libcfs/libcfs.h>
43 #include <lustre_req_layout.h>
44 #include "ptlrpc_internal.h"
45
46 /**
47  * \name tbf
48  *
49  * Token Bucket Filter over client NIDs
50  *
51  * @{
52  */
53
54 #define NRS_POL_NAME_TBF        "tbf"
55
56 static int tbf_jobid_cache_size = 8192;
57 module_param(tbf_jobid_cache_size, int, 0644);
58 MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");
59
60 static int tbf_rate = 10000;
61 module_param(tbf_rate, int, 0644);
62 MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");
63
64 static int tbf_depth = 3;
65 module_param(tbf_depth, int, 0644);
66 MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
67
68 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
69 {
70         struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
71                                                  th_timer);
72         struct ptlrpc_nrs   *nrs = head->th_res.res_policy->pol_nrs;
73         struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
74
75         nrs->nrs_throttling = 0;
76         wake_up(&svcpt->scp_waitq);
77
78         return HRTIMER_NORESTART;
79 }
80
81 #define NRS_TBF_DEFAULT_RULE "default"
82
83 static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule)
84 {
85         LASSERT(atomic_read(&rule->tr_ref) == 0);
86         LASSERT(list_empty(&rule->tr_cli_list));
87         LASSERT(list_empty(&rule->tr_linkage));
88
89         rule->tr_head->th_ops->o_rule_fini(rule);
90         OBD_FREE_PTR(rule);
91 }
92
93 /**
94  * Decreases the rule's usage reference count, and stops the rule in case it
95  * was already stopping and have no more outstanding usage references (which
96  * indicates it has no more queued or started requests, and can be safely
97  * stopped).
98  */
99 static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule)
100 {
101         if (atomic_dec_and_test(&rule->tr_ref))
102                 nrs_tbf_rule_fini(rule);
103 }
104
105 /**
106  * Increases the rule's usage reference count.
107  */
108 static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule)
109 {
110         atomic_inc(&rule->tr_ref);
111 }
112
113 static void
114 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
115 {
116         LASSERT(!list_empty(&cli->tc_linkage));
117         LASSERT(cli->tc_rule);
118         spin_lock(&cli->tc_rule->tr_rule_lock);
119         list_del_init(&cli->tc_linkage);
120         spin_unlock(&cli->tc_rule->tr_rule_lock);
121         nrs_tbf_rule_put(cli->tc_rule);
122         cli->tc_rule = NULL;
123 }
124
125 static void
126 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
127                         struct nrs_tbf_client *cli)
128
129 {
130         struct nrs_tbf_rule *rule = cli->tc_rule;
131
132         cli->tc_rpc_rate = rule->tr_rpc_rate;
133         cli->tc_nsecs = rule->tr_nsecs_per_rpc;
134         cli->tc_nsecs_resid = 0;
135         cli->tc_depth = rule->tr_depth;
136         cli->tc_ntoken = rule->tr_depth;
137         cli->tc_check_time = ktime_to_ns(ktime_get());
138         cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
139         cli->tc_rule_generation = rule->tr_generation;
140
141         if (cli->tc_in_heap)
142                 binheap_relocate(head->th_binheap,
143                                  &cli->tc_node);
144 }
145
146 static void
147 nrs_tbf_cli_reset(struct nrs_tbf_head *head,
148                   struct nrs_tbf_rule *rule,
149                   struct nrs_tbf_client *cli)
150 {
151         spin_lock(&cli->tc_rule_lock);
152         if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
153                 LASSERT(rule != cli->tc_rule);
154                 nrs_tbf_cli_rule_put(cli);
155         }
156         LASSERT(cli->tc_rule == NULL);
157         LASSERT(list_empty(&cli->tc_linkage));
158         /* Rule's ref is added before called */
159         cli->tc_rule = rule;
160         spin_lock(&rule->tr_rule_lock);
161         list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
162         spin_unlock(&rule->tr_rule_lock);
163         spin_unlock(&cli->tc_rule_lock);
164         nrs_tbf_cli_reset_value(head, cli);
165 }
166
167 static int
168 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
169 {
170         return rule->tr_head->th_ops->o_rule_dump(rule, m);
171 }
172
173 static int
174 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
175 {
176         struct nrs_tbf_rule *rule;
177         int rc = 0;
178
179         LASSERT(head != NULL);
180         spin_lock(&head->th_rule_lock);
181         /* List the rules from newest to oldest */
182         list_for_each_entry(rule, &head->th_list, tr_linkage) {
183                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
184                 rc = nrs_tbf_rule_dump(rule, m);
185                 if (rc) {
186                         rc = -ENOSPC;
187                         break;
188                 }
189         }
190         spin_unlock(&head->th_rule_lock);
191
192         return rc;
193 }
194
195 static struct nrs_tbf_rule *
196 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
197                          const char *name)
198 {
199         struct nrs_tbf_rule *rule;
200
201         LASSERT(head != NULL);
202         list_for_each_entry(rule, &head->th_list, tr_linkage) {
203                 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
204                 if (strcmp(rule->tr_name, name) == 0) {
205                         nrs_tbf_rule_get(rule);
206                         return rule;
207                 }
208         }
209         return NULL;
210 }
211
212 static struct nrs_tbf_rule *
213 nrs_tbf_rule_find(struct nrs_tbf_head *head,
214                   const char *name)
215 {
216         struct nrs_tbf_rule *rule;
217
218         LASSERT(head != NULL);
219         spin_lock(&head->th_rule_lock);
220         rule = nrs_tbf_rule_find_nolock(head, name);
221         spin_unlock(&head->th_rule_lock);
222         return rule;
223 }
224
225 static struct nrs_tbf_rule *
226 nrs_tbf_rule_match(struct nrs_tbf_head *head,
227                    struct nrs_tbf_client *cli)
228 {
229         struct nrs_tbf_rule *rule = NULL;
230         struct nrs_tbf_rule *tmp_rule;
231
232         spin_lock(&head->th_rule_lock);
233         /* Match the newest rule in the list */
234         list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
235                 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
236                 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
237                         rule = tmp_rule;
238                         break;
239                 }
240         }
241
242         if (rule == NULL)
243                 rule = head->th_rule;
244
245         nrs_tbf_rule_get(rule);
246         spin_unlock(&head->th_rule_lock);
247         return rule;
248 }
249
250 static void
251 nrs_tbf_cli_init(struct nrs_tbf_head *head,
252                  struct nrs_tbf_client *cli,
253                  struct ptlrpc_request *req)
254 {
255         struct nrs_tbf_rule *rule;
256
257         memset(cli, 0, sizeof(*cli));
258         cli->tc_in_heap = false;
259         head->th_ops->o_cli_init(cli, req);
260         INIT_LIST_HEAD(&cli->tc_list);
261         INIT_LIST_HEAD(&cli->tc_linkage);
262         spin_lock_init(&cli->tc_rule_lock);
263         atomic_set(&cli->tc_ref, 1);
264         rule = nrs_tbf_rule_match(head, cli);
265         nrs_tbf_cli_reset(head, rule, cli);
266 }
267
268 static void
269 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
270 {
271         LASSERT(list_empty(&cli->tc_list));
272         LASSERT(!cli->tc_in_heap);
273         LASSERT(atomic_read(&cli->tc_ref) == 0);
274         spin_lock(&cli->tc_rule_lock);
275         nrs_tbf_cli_rule_put(cli);
276         spin_unlock(&cli->tc_rule_lock);
277         OBD_FREE_PTR(cli);
278 }
279
280 static int
281 nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
282                    struct nrs_tbf_head *head,
283                    struct nrs_tbf_cmd *start)
284 {
285         struct nrs_tbf_rule     *rule;
286         struct nrs_tbf_rule     *tmp_rule;
287         struct nrs_tbf_rule     *next_rule;
288         char                    *next_name = start->u.tc_start.ts_next_name;
289         int                      rc;
290
291         rule = nrs_tbf_rule_find(head, start->tc_name);
292         if (rule) {
293                 nrs_tbf_rule_put(rule);
294                 return -EEXIST;
295         }
296
297         OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
298         if (rule == NULL)
299                 return -ENOMEM;
300
301         strlcpy(rule->tr_name, start->tc_name, sizeof(rule->tr_name));
302         rule->tr_rpc_rate = start->u.tc_start.ts_rpc_rate;
303         rule->tr_flags = start->u.tc_start.ts_rule_flags;
304         rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
305         rule->tr_depth = tbf_depth;
306         atomic_set(&rule->tr_ref, 1);
307         INIT_LIST_HEAD(&rule->tr_cli_list);
308         INIT_LIST_HEAD(&rule->tr_nids);
309         INIT_LIST_HEAD(&rule->tr_linkage);
310         spin_lock_init(&rule->tr_rule_lock);
311         rule->tr_head = head;
312
313         rc = head->th_ops->o_rule_init(policy, rule, start);
314         if (rc) {
315                 OBD_FREE_PTR(rule);
316                 return rc;
317         }
318
319         /* Add as the newest rule */
320         spin_lock(&head->th_rule_lock);
321         tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
322         if (tmp_rule) {
323                 spin_unlock(&head->th_rule_lock);
324                 nrs_tbf_rule_put(tmp_rule);
325                 nrs_tbf_rule_put(rule);
326                 return -EEXIST;
327         }
328
329         if (next_name) {
330                 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
331                 if (!next_rule) {
332                         spin_unlock(&head->th_rule_lock);
333                         nrs_tbf_rule_put(rule);
334                         return -ENOENT;
335                 }
336
337                 list_add(&rule->tr_linkage, next_rule->tr_linkage.prev);
338                 nrs_tbf_rule_put(next_rule);
339         } else {
340                 /* Add on the top of the rule list */
341                 list_add(&rule->tr_linkage, &head->th_list);
342         }
343         spin_unlock(&head->th_rule_lock);
344         atomic_inc(&head->th_rule_sequence);
345         if (start->u.tc_start.ts_rule_flags & NTRS_DEFAULT) {
346                 rule->tr_flags |= NTRS_DEFAULT;
347                 LASSERT(head->th_rule == NULL);
348                 head->th_rule = rule;
349         }
350
351         CDEBUG(D_RPCTRACE, "TBF starts rule@%p rate %llu gen %llu\n",
352                rule, rule->tr_rpc_rate, rule->tr_generation);
353
354         return 0;
355 }
356
357 /**
358  * Change the rank of a rule in the rule list
359  *
360  * The matched rule will be moved to the position right before another
361  * given rule.
362  *
363  * \param[in] policy    the policy instance
364  * \param[in] head      the TBF policy instance
365  * \param[in] name      the rule name to be moved
366  * \param[in] next_name the rule name before which the matched rule will be
367  *                      moved
368  *
369  */
370 static int
371 nrs_tbf_rule_change_rank(struct ptlrpc_nrs_policy *policy,
372                          struct nrs_tbf_head *head,
373                          char *name,
374                          char *next_name)
375 {
376         struct nrs_tbf_rule     *rule = NULL;
377         struct nrs_tbf_rule     *next_rule = NULL;
378         int                      rc = 0;
379
380         LASSERT(head != NULL);
381
382         spin_lock(&head->th_rule_lock);
383         rule = nrs_tbf_rule_find_nolock(head, name);
384         if (!rule)
385                 GOTO(out, rc = -ENOENT);
386
387         if (strcmp(name, next_name) == 0)
388                 GOTO(out_put, rc);
389
390         next_rule = nrs_tbf_rule_find_nolock(head, next_name);
391         if (!next_rule)
392                 GOTO(out_put, rc = -ENOENT);
393
394         /* rules may be adjacent in same list, so list_move() isn't safe here */
395         list_move_tail(&rule->tr_linkage, &next_rule->tr_linkage);
396         nrs_tbf_rule_put(next_rule);
397 out_put:
398         nrs_tbf_rule_put(rule);
399 out:
400         spin_unlock(&head->th_rule_lock);
401         return rc;
402 }
403
404 static int
405 nrs_tbf_rule_change_rate(struct ptlrpc_nrs_policy *policy,
406                          struct nrs_tbf_head *head,
407                          char *name,
408                          __u64 rate)
409 {
410         struct nrs_tbf_rule *rule;
411
412         assert_spin_locked(&policy->pol_nrs->nrs_lock);
413
414         rule = nrs_tbf_rule_find(head, name);
415         if (rule == NULL)
416                 return -ENOENT;
417
418         rule->tr_rpc_rate = rate;
419         rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
420         rule->tr_generation++;
421         nrs_tbf_rule_put(rule);
422
423         return 0;
424 }
425
426 static int
427 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
428                     struct nrs_tbf_head *head,
429                     struct nrs_tbf_cmd *change)
430 {
431         __u64    rate = change->u.tc_change.tc_rpc_rate;
432         char    *next_name = change->u.tc_change.tc_next_name;
433         int      rc;
434
435         if (rate != 0) {
436                 rc = nrs_tbf_rule_change_rate(policy, head, change->tc_name,
437                                               rate);
438                 if (rc)
439                         return rc;
440         }
441
442         if (next_name) {
443                 rc = nrs_tbf_rule_change_rank(policy, head, change->tc_name,
444                                               next_name);
445                 if (rc)
446                         return rc;
447         }
448
449         return 0;
450 }
451
452 static int
453 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
454                   struct nrs_tbf_head *head,
455                   struct nrs_tbf_cmd *stop)
456 {
457         struct nrs_tbf_rule *rule;
458
459         assert_spin_locked(&policy->pol_nrs->nrs_lock);
460
461         if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
462                 return -EPERM;
463
464         rule = nrs_tbf_rule_find(head, stop->tc_name);
465         if (rule == NULL)
466                 return -ENOENT;
467
468         list_del_init(&rule->tr_linkage);
469         rule->tr_flags |= NTRS_STOPPING;
470         nrs_tbf_rule_put(rule);
471         nrs_tbf_rule_put(rule);
472
473         return 0;
474 }
475
476 static int
477 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
478                 struct nrs_tbf_head *head,
479                 struct nrs_tbf_cmd *cmd)
480 {
481         int rc;
482
483         assert_spin_locked(&policy->pol_nrs->nrs_lock);
484
485         switch (cmd->tc_cmd) {
486         case NRS_CTL_TBF_START_RULE:
487                 if (cmd->u.tc_start.ts_valid_type != head->th_type_flag)
488                         return -EINVAL;
489
490                 spin_unlock(&policy->pol_nrs->nrs_lock);
491                 rc = nrs_tbf_rule_start(policy, head, cmd);
492                 spin_lock(&policy->pol_nrs->nrs_lock);
493                 return rc;
494         case NRS_CTL_TBF_CHANGE_RULE:
495                 rc = nrs_tbf_rule_change(policy, head, cmd);
496                 return rc;
497         case NRS_CTL_TBF_STOP_RULE:
498                 rc = nrs_tbf_rule_stop(policy, head, cmd);
499                 /* Take it as a success, if not exists at all */
500                 return rc == -ENOENT ? 0 : rc;
501         default:
502                 return -EFAULT;
503         }
504 }
505
506 /**
507  * Binary heap predicate.
508  *
509  * \param[in] e1 the first binheap node to compare
510  * \param[in] e2 the second binheap node to compare
511  *
512  * \retval 0 e1 > e2
513  * \retval 1 e1 < e2
514  */
515 static int
516 tbf_cli_compare(struct binheap_node *e1, struct binheap_node *e2)
517 {
518         struct nrs_tbf_client *cli1;
519         struct nrs_tbf_client *cli2;
520
521         cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
522         cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
523
524         if (cli1->tc_deadline < cli2->tc_deadline)
525                 return 1;
526         else if (cli1->tc_deadline > cli2->tc_deadline)
527                 return 0;
528
529         if (cli1->tc_check_time < cli2->tc_check_time)
530                 return 1;
531         else if (cli1->tc_check_time > cli2->tc_check_time)
532                 return 0;
533
534         /* Maybe need more comparasion, e.g. request number in the rules */
535         return 1;
536 }
537
538 /**
539  * TBF binary heap operations
540  */
541 static struct binheap_ops nrs_tbf_heap_ops = {
542         .hop_enter      = NULL,
543         .hop_exit       = NULL,
544         .hop_compare    = tbf_cli_compare,
545 };
546
547 static unsigned nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key,
548                                   unsigned mask)
549 {
550         return cfs_hash_djb2_hash(key, strlen(key), mask);
551 }
552
553 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
554 {
555         struct nrs_tbf_client *cli = hlist_entry(hnode,
556                                                      struct nrs_tbf_client,
557                                                      tc_hnode);
558
559         return (strcmp(cli->tc_jobid, key) == 0);
560 }
561
562 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
563 {
564         struct nrs_tbf_client *cli = hlist_entry(hnode,
565                                                      struct nrs_tbf_client,
566                                                      tc_hnode);
567
568         return cli->tc_jobid;
569 }
570
571 static void *nrs_tbf_hop_object(struct hlist_node *hnode)
572 {
573         return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
574 }
575
576 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
577 {
578         struct nrs_tbf_client *cli = hlist_entry(hnode,
579                                                      struct nrs_tbf_client,
580                                                      tc_hnode);
581
582         atomic_inc(&cli->tc_ref);
583 }
584
585 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
586 {
587         struct nrs_tbf_client *cli = hlist_entry(hnode,
588                                                      struct nrs_tbf_client,
589                                                      tc_hnode);
590
591         atomic_dec(&cli->tc_ref);
592 }
593
594 static void
595 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
596
597 {
598         struct nrs_tbf_client *cli = hlist_entry(hnode,
599                                                  struct nrs_tbf_client,
600                                                  tc_hnode);
601
602         LASSERT(atomic_read(&cli->tc_ref) == 0);
603         nrs_tbf_cli_fini(cli);
604 }
605
606 static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
607         .hs_hash        = nrs_tbf_jobid_hop_hash,
608         .hs_keycmp      = nrs_tbf_jobid_hop_keycmp,
609         .hs_key         = nrs_tbf_jobid_hop_key,
610         .hs_object      = nrs_tbf_hop_object,
611         .hs_get         = nrs_tbf_jobid_hop_get,
612         .hs_put         = nrs_tbf_jobid_hop_put,
613         .hs_put_locked  = nrs_tbf_jobid_hop_put,
614         .hs_exit        = nrs_tbf_jobid_hop_exit,
615 };
616
617 #define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
618                                   CFS_HASH_NO_ITEMREF | \
619                                   CFS_HASH_DEPTH)
620
621 static struct nrs_tbf_client *
622 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
623                           struct cfs_hash_bd *bd,
624                           const char *jobid)
625 {
626         struct hlist_node *hnode;
627         struct nrs_tbf_client *cli;
628
629         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)jobid);
630         if (hnode == NULL)
631                 return NULL;
632
633         cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
634         if (!list_empty(&cli->tc_lru))
635                 list_del_init(&cli->tc_lru);
636         return cli;
637 }
638
639 #define NRS_TBF_JOBID_NULL ""
640
641 static struct nrs_tbf_client *
642 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
643                        struct ptlrpc_request *req)
644 {
645         const char              *jobid;
646         struct nrs_tbf_client   *cli;
647         struct cfs_hash         *hs = head->th_cli_hash;
648         struct cfs_hash_bd               bd;
649
650         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
651         if (jobid == NULL)
652                 jobid = NRS_TBF_JOBID_NULL;
653         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
654         cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
655         cfs_hash_bd_unlock(hs, &bd, 1);
656
657         return cli;
658 }
659
660 static struct nrs_tbf_client *
661 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
662                           struct nrs_tbf_client *cli)
663 {
664         const char              *jobid;
665         struct nrs_tbf_client   *ret;
666         struct cfs_hash         *hs = head->th_cli_hash;
667         struct cfs_hash_bd               bd;
668
669         jobid = cli->tc_jobid;
670         cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
671         ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
672         if (ret == NULL) {
673                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
674                 ret = cli;
675         }
676         cfs_hash_bd_unlock(hs, &bd, 1);
677
678         return ret;
679 }
680
681 static void
682 nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
683                       struct nrs_tbf_client *cli)
684 {
685         struct cfs_hash_bd               bd;
686         struct cfs_hash         *hs = head->th_cli_hash;
687         struct nrs_tbf_bucket   *bkt;
688         int                      hw;
689         LIST_HEAD(zombies);
690
691         cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
692         bkt = cfs_hash_bd_extra_get(hs, &bd);
693         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
694                 return;
695         LASSERT(list_empty(&cli->tc_lru));
696         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
697
698         /*
699          * Check and purge the LRU, there is at least one client in the LRU.
700          */
701         hw = tbf_jobid_cache_size >>
702              (hs->hs_cur_bits - hs->hs_bkt_bits);
703         while (cfs_hash_bd_count_get(&bd) > hw) {
704                 if (unlikely(list_empty(&bkt->ntb_lru)))
705                         break;
706                 cli = list_entry(bkt->ntb_lru.next,
707                                      struct nrs_tbf_client,
708                                      tc_lru);
709                 LASSERT(atomic_read(&cli->tc_ref) == 0);
710                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
711                 list_move(&cli->tc_lru, &zombies);
712         }
713         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
714
715         while (!list_empty(&zombies)) {
716                 cli = container_of(zombies.next,
717                                    struct nrs_tbf_client, tc_lru);
718                 list_del_init(&cli->tc_lru);
719                 nrs_tbf_cli_fini(cli);
720         }
721 }
722
723 static void
724 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
725                        struct ptlrpc_request *req)
726 {
727         char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
728
729         if (jobid == NULL)
730                 jobid = NRS_TBF_JOBID_NULL;
731         LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
732         INIT_LIST_HEAD(&cli->tc_lru);
733         memcpy(cli->tc_jobid, jobid, strlen(jobid));
734 }
735
736 static int nrs_tbf_jobid_hash_order(void)
737 {
738         int bits;
739
740         for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
741                 ;
742
743         return bits;
744 }
745
746 #define NRS_TBF_JOBID_BKT_BITS 10
747
748 static int
749 nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
750                       struct nrs_tbf_head *head)
751 {
752         struct nrs_tbf_cmd       start;
753         struct nrs_tbf_bucket   *bkt;
754         int                      bits;
755         int                      i;
756         int                      rc;
757         struct cfs_hash_bd       bd;
758
759         bits = nrs_tbf_jobid_hash_order();
760         if (bits < NRS_TBF_JOBID_BKT_BITS)
761                 bits = NRS_TBF_JOBID_BKT_BITS;
762         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
763                                             bits,
764                                             bits,
765                                             NRS_TBF_JOBID_BKT_BITS,
766                                             sizeof(*bkt),
767                                             0,
768                                             0,
769                                             &nrs_tbf_jobid_hash_ops,
770                                             NRS_TBF_JOBID_HASH_FLAGS);
771         if (head->th_cli_hash == NULL)
772                 return -ENOMEM;
773
774         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
775                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
776                 INIT_LIST_HEAD(&bkt->ntb_lru);
777         }
778
779         memset(&start, 0, sizeof(start));
780         start.u.tc_start.ts_jobids_str = "*";
781
782         start.u.tc_start.ts_rpc_rate = tbf_rate;
783         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
784         start.tc_name = NRS_TBF_DEFAULT_RULE;
785         INIT_LIST_HEAD(&start.u.tc_start.ts_jobids);
786         rc = nrs_tbf_rule_start(policy, head, &start);
787         if (rc) {
788                 cfs_hash_putref(head->th_cli_hash);
789                 head->th_cli_hash = NULL;
790         }
791
792         return rc;
793 }
794
795 /**
796  * Frees jobid of \a list.
797  *
798  */
799 static void
800 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
801 {
802         struct nrs_tbf_jobid *jobid, *n;
803
804         list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
805                 OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1);
806                 list_del(&jobid->tj_linkage);
807                 OBD_FREE_PTR(jobid);
808         }
809 }
810
811 static int
812 nrs_tbf_jobid_list_add(struct cfs_lstr *id, struct list_head *jobid_list)
813 {
814         struct nrs_tbf_jobid *jobid;
815         char *ptr;
816
817         OBD_ALLOC_PTR(jobid);
818         if (jobid == NULL)
819                 return -ENOMEM;
820
821         OBD_ALLOC(jobid->tj_id, id->ls_len + 1);
822         if (jobid->tj_id == NULL) {
823                 OBD_FREE_PTR(jobid);
824                 return -ENOMEM;
825         }
826
827         memcpy(jobid->tj_id, id->ls_str, id->ls_len);
828         ptr = lprocfs_strnstr(id->ls_str, "*", id->ls_len);
829         if (ptr == NULL)
830                 jobid->tj_match_flag = NRS_TBF_MATCH_FULL;
831         else
832                 jobid->tj_match_flag = NRS_TBF_MATCH_WILDCARD;
833
834         list_add_tail(&jobid->tj_linkage, jobid_list);
835         return 0;
836 }
837
838 static bool
839 cfs_match_wildcard(const char *pattern, const char *content)
840 {
841         if (*pattern == '\0' && *content == '\0')
842                 return true;
843
844         if (*pattern == '*' && *(pattern + 1) != '\0' && *content == '\0')
845                 return false;
846
847         while (*pattern == *content) {
848                 pattern++;
849                 content++;
850                 if (*pattern == '\0' && *content == '\0')
851                         return true;
852
853                 if (*pattern == '*' && *(pattern + 1) != '\0' &&
854                     *content == '\0')
855                         return false;
856         }
857
858         if (*pattern == '*')
859                 return (cfs_match_wildcard(pattern + 1, content) ||
860                         cfs_match_wildcard(pattern, content + 1));
861
862         return false;
863 }
864
865 static inline bool
866 nrs_tbf_jobid_match(const struct nrs_tbf_jobid *jobid, const char *id)
867 {
868         if (jobid->tj_match_flag == NRS_TBF_MATCH_FULL)
869                 return strcmp(jobid->tj_id, id) == 0;
870
871         if (jobid->tj_match_flag == NRS_TBF_MATCH_WILDCARD)
872                 return cfs_match_wildcard(jobid->tj_id, id);
873
874         return false;
875 }
876
877 static int
878 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
879 {
880         struct nrs_tbf_jobid *jobid;
881
882         list_for_each_entry(jobid, jobid_list, tj_linkage) {
883                 if (nrs_tbf_jobid_match(jobid, id))
884                         return 1;
885         }
886         return 0;
887 }
888
889 static int
890 nrs_tbf_jobid_list_parse(char *str, int len, struct list_head *jobid_list)
891 {
892         struct cfs_lstr src;
893         struct cfs_lstr res;
894         int rc = 0;
895         ENTRY;
896
897         src.ls_str = str;
898         src.ls_len = len;
899         INIT_LIST_HEAD(jobid_list);
900         while (src.ls_str) {
901                 rc = cfs_gettok(&src, ' ', &res);
902                 if (rc == 0) {
903                         rc = -EINVAL;
904                         break;
905                 }
906                 rc = nrs_tbf_jobid_list_add(&res, jobid_list);
907                 if (rc)
908                         break;
909         }
910         if (rc)
911                 nrs_tbf_jobid_list_free(jobid_list);
912         RETURN(rc);
913 }
914
915 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
916 {
917         if (!list_empty(&cmd->u.tc_start.ts_jobids))
918                 nrs_tbf_jobid_list_free(&cmd->u.tc_start.ts_jobids);
919         if (cmd->u.tc_start.ts_jobids_str)
920                 OBD_FREE(cmd->u.tc_start.ts_jobids_str,
921                          strlen(cmd->u.tc_start.ts_jobids_str) + 1);
922 }
923
924 static int nrs_tbf_check_id_value(struct cfs_lstr *src, char *key)
925 {
926         struct cfs_lstr res;
927         int keylen = strlen(key);
928         int rc;
929
930         rc = cfs_gettok(src, '=', &res);
931         if (rc == 0 || res.ls_len != keylen ||
932             strncmp(res.ls_str, key, keylen) != 0 ||
933             !src->ls_str || src->ls_len <= 2 ||
934             src->ls_str[0] != '{' || src->ls_str[src->ls_len - 1] != '}')
935                 return -EINVAL;
936
937         /* Skip '{' and '}' */
938         src->ls_str++;
939         src->ls_len -= 2;
940         return 0;
941 }
942
943 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, char *id)
944 {
945         struct cfs_lstr src;
946         int rc;
947
948         src.ls_str = id;
949         src.ls_len = strlen(id);
950         rc = nrs_tbf_check_id_value(&src, "jobid");
951         if (rc)
952                 return rc;
953
954         OBD_ALLOC(cmd->u.tc_start.ts_jobids_str, src.ls_len + 1);
955         if (cmd->u.tc_start.ts_jobids_str == NULL)
956                 return -ENOMEM;
957
958         memcpy(cmd->u.tc_start.ts_jobids_str, src.ls_str, src.ls_len);
959
960         /* parse jobid list */
961         rc = nrs_tbf_jobid_list_parse(cmd->u.tc_start.ts_jobids_str,
962                                       strlen(cmd->u.tc_start.ts_jobids_str),
963                                       &cmd->u.tc_start.ts_jobids);
964         if (rc)
965                 nrs_tbf_jobid_cmd_fini(cmd);
966
967         return rc;
968 }
969
970 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
971                                    struct nrs_tbf_rule *rule,
972                                    struct nrs_tbf_cmd *start)
973 {
974         int rc = 0;
975
976         LASSERT(start->u.tc_start.ts_jobids_str);
977         OBD_ALLOC(rule->tr_jobids_str,
978                   strlen(start->u.tc_start.ts_jobids_str) + 1);
979         if (rule->tr_jobids_str == NULL)
980                 return -ENOMEM;
981
982         memcpy(rule->tr_jobids_str,
983                start->u.tc_start.ts_jobids_str,
984                strlen(start->u.tc_start.ts_jobids_str));
985
986         INIT_LIST_HEAD(&rule->tr_jobids);
987         if (!list_empty(&start->u.tc_start.ts_jobids)) {
988                 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
989                                               strlen(rule->tr_jobids_str),
990                                               &rule->tr_jobids);
991                 if (rc)
992                         CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
993         }
994         if (rc)
995                 OBD_FREE(rule->tr_jobids_str,
996                          strlen(start->u.tc_start.ts_jobids_str) + 1);
997         return rc;
998 }
999
1000 static int
1001 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1002 {
1003         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1004                    rule->tr_jobids_str, rule->tr_rpc_rate,
1005                    atomic_read(&rule->tr_ref) - 1);
1006         return 0;
1007 }
1008
1009 static int
1010 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
1011                          struct nrs_tbf_client *cli)
1012 {
1013         return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
1014 }
1015
1016 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
1017 {
1018         if (!list_empty(&rule->tr_jobids))
1019                 nrs_tbf_jobid_list_free(&rule->tr_jobids);
1020         LASSERT(rule->tr_jobids_str != NULL);
1021         OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1);
1022 }
1023
1024 static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
1025         .o_name = NRS_TBF_TYPE_JOBID,
1026         .o_startup = nrs_tbf_jobid_startup,
1027         .o_cli_find = nrs_tbf_jobid_cli_find,
1028         .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
1029         .o_cli_put = nrs_tbf_jobid_cli_put,
1030         .o_cli_init = nrs_tbf_jobid_cli_init,
1031         .o_rule_init = nrs_tbf_jobid_rule_init,
1032         .o_rule_dump = nrs_tbf_jobid_rule_dump,
1033         .o_rule_match = nrs_tbf_jobid_rule_match,
1034         .o_rule_fini = nrs_tbf_jobid_rule_fini,
1035 };
1036
1037 /**
1038  * libcfs_hash operations for nrs_tbf_net::cn_cli_hash
1039  *
1040  * This uses ptlrpc_request::rq_peer.nid (as nid4) as its key, in order to hash
1041  * nrs_tbf_client objects.
1042  */
1043 #define NRS_TBF_NID_BKT_BITS    8
1044 #define NRS_TBF_NID_BITS        16
1045
1046 static unsigned nrs_tbf_nid_hop_hash(struct cfs_hash *hs, const void *key,
1047                                   unsigned mask)
1048 {
1049         return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
1050 }
1051
1052 static int nrs_tbf_nid_hop_keycmp(const void *key, struct hlist_node *hnode)
1053 {
1054         const struct lnet_nid *nid = key;
1055         struct nrs_tbf_client *cli = hlist_entry(hnode,
1056                                                  struct nrs_tbf_client,
1057                                                  tc_hnode);
1058
1059         return nid_same(nid, &cli->tc_nid);
1060 }
1061
1062 static void *nrs_tbf_nid_hop_key(struct hlist_node *hnode)
1063 {
1064         struct nrs_tbf_client *cli = hlist_entry(hnode,
1065                                                  struct nrs_tbf_client,
1066                                                  tc_hnode);
1067
1068         return &cli->tc_nid;
1069 }
1070
1071 static void nrs_tbf_nid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1072 {
1073         struct nrs_tbf_client *cli = hlist_entry(hnode,
1074                                                  struct nrs_tbf_client,
1075                                                  tc_hnode);
1076
1077         atomic_inc(&cli->tc_ref);
1078 }
1079
1080 static void nrs_tbf_nid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1081 {
1082         struct nrs_tbf_client *cli = hlist_entry(hnode,
1083                                                  struct nrs_tbf_client,
1084                                                  tc_hnode);
1085
1086         atomic_dec(&cli->tc_ref);
1087 }
1088
1089 static void nrs_tbf_nid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1090 {
1091         struct nrs_tbf_client *cli = hlist_entry(hnode,
1092                                                  struct nrs_tbf_client,
1093                                                  tc_hnode);
1094
1095         LASSERTF(atomic_read(&cli->tc_ref) == 0,
1096                  "Busy TBF object from client with NID %s, with %d refs\n",
1097                  libcfs_nidstr(&cli->tc_nid), atomic_read(&cli->tc_ref));
1098
1099         nrs_tbf_cli_fini(cli);
1100 }
1101
1102 static struct cfs_hash_ops nrs_tbf_nid_hash_ops = {
1103         .hs_hash        = nrs_tbf_nid_hop_hash,
1104         .hs_keycmp      = nrs_tbf_nid_hop_keycmp,
1105         .hs_key         = nrs_tbf_nid_hop_key,
1106         .hs_object      = nrs_tbf_hop_object,
1107         .hs_get         = nrs_tbf_nid_hop_get,
1108         .hs_put         = nrs_tbf_nid_hop_put,
1109         .hs_put_locked  = nrs_tbf_nid_hop_put,
1110         .hs_exit        = nrs_tbf_nid_hop_exit,
1111 };
1112
1113 static struct nrs_tbf_client *
1114 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
1115                      struct ptlrpc_request *req)
1116 {
1117         lnet_nid_t nid4 = lnet_nid_to_nid4(&req->rq_peer.nid);
1118
1119         return cfs_hash_lookup(head->th_cli_hash, &nid4);
1120 }
1121
1122 static struct nrs_tbf_client *
1123 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
1124                         struct nrs_tbf_client *cli)
1125 {
1126         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid,
1127                                        &cli->tc_hnode);
1128 }
1129
1130 static void
1131 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
1132                       struct nrs_tbf_client *cli)
1133 {
1134         cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
1135 }
1136
1137 static int
1138 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
1139                     struct nrs_tbf_head *head)
1140 {
1141         struct nrs_tbf_cmd      start;
1142         int rc;
1143
1144         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1145                                             NRS_TBF_NID_BITS,
1146                                             NRS_TBF_NID_BITS,
1147                                             NRS_TBF_NID_BKT_BITS, 0,
1148                                             CFS_HASH_MIN_THETA,
1149                                             CFS_HASH_MAX_THETA,
1150                                             &nrs_tbf_nid_hash_ops,
1151                                             CFS_HASH_RW_BKTLOCK);
1152         if (head->th_cli_hash == NULL)
1153                 return -ENOMEM;
1154
1155         memset(&start, 0, sizeof(start));
1156         start.u.tc_start.ts_nids_str = "*";
1157
1158         start.u.tc_start.ts_rpc_rate = tbf_rate;
1159         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1160         start.tc_name = NRS_TBF_DEFAULT_RULE;
1161         INIT_LIST_HEAD(&start.u.tc_start.ts_nids);
1162         rc = nrs_tbf_rule_start(policy, head, &start);
1163         if (rc) {
1164                 cfs_hash_putref(head->th_cli_hash);
1165                 head->th_cli_hash = NULL;
1166         }
1167
1168         return rc;
1169 }
1170
1171 static void
1172 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1173                              struct ptlrpc_request *req)
1174 {
1175         cli->tc_nid = req->rq_peer.nid;
1176 }
1177
1178 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1179                                  struct nrs_tbf_rule *rule,
1180                                  struct nrs_tbf_cmd *start)
1181 {
1182         LASSERT(start->u.tc_start.ts_nids_str);
1183         OBD_ALLOC(rule->tr_nids_str,
1184                   strlen(start->u.tc_start.ts_nids_str) + 1);
1185         if (rule->tr_nids_str == NULL)
1186                 return -ENOMEM;
1187
1188         memcpy(rule->tr_nids_str,
1189                start->u.tc_start.ts_nids_str,
1190                strlen(start->u.tc_start.ts_nids_str));
1191
1192         INIT_LIST_HEAD(&rule->tr_nids);
1193         if (!list_empty(&start->u.tc_start.ts_nids)) {
1194                 if (cfs_parse_nidlist(rule->tr_nids_str,
1195                                       strlen(rule->tr_nids_str),
1196                                       &rule->tr_nids) <= 0) {
1197                         CERROR("nids {%s} illegal\n",
1198                                rule->tr_nids_str);
1199                         OBD_FREE(rule->tr_nids_str,
1200                                  strlen(start->u.tc_start.ts_nids_str) + 1);
1201                         return -EINVAL;
1202                 }
1203         }
1204         return 0;
1205 }
1206
1207 static int
1208 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1209 {
1210         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1211                    rule->tr_nids_str, rule->tr_rpc_rate,
1212                    atomic_read(&rule->tr_ref) - 1);
1213         return 0;
1214 }
1215
1216 static int
1217 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1218                        struct nrs_tbf_client *cli)
1219 {
1220         return cfs_match_nid(&cli->tc_nid, &rule->tr_nids);
1221 }
1222
1223 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1224 {
1225         if (!list_empty(&rule->tr_nids))
1226                 cfs_free_nidlist(&rule->tr_nids);
1227         LASSERT(rule->tr_nids_str != NULL);
1228         OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1);
1229 }
1230
1231 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1232 {
1233         if (!list_empty(&cmd->u.tc_start.ts_nids))
1234                 cfs_free_nidlist(&cmd->u.tc_start.ts_nids);
1235         if (cmd->u.tc_start.ts_nids_str)
1236                 OBD_FREE(cmd->u.tc_start.ts_nids_str,
1237                          strlen(cmd->u.tc_start.ts_nids_str) + 1);
1238 }
1239
1240 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, char *id)
1241 {
1242         struct cfs_lstr src;
1243         int rc;
1244
1245         src.ls_str = id;
1246         src.ls_len = strlen(id);
1247         rc = nrs_tbf_check_id_value(&src, "nid");
1248         if (rc)
1249                 return rc;
1250
1251         OBD_ALLOC(cmd->u.tc_start.ts_nids_str, src.ls_len + 1);
1252         if (cmd->u.tc_start.ts_nids_str == NULL)
1253                 return -ENOMEM;
1254
1255         memcpy(cmd->u.tc_start.ts_nids_str, src.ls_str, src.ls_len);
1256
1257         /* parse NID list */
1258         if (cfs_parse_nidlist(cmd->u.tc_start.ts_nids_str,
1259                               strlen(cmd->u.tc_start.ts_nids_str),
1260                               &cmd->u.tc_start.ts_nids) <= 0) {
1261                 nrs_tbf_nid_cmd_fini(cmd);
1262                 return -EINVAL;
1263         }
1264
1265         return 0;
1266 }
1267
1268 static struct nrs_tbf_ops nrs_tbf_nid_ops = {
1269         .o_name = NRS_TBF_TYPE_NID,
1270         .o_startup = nrs_tbf_nid_startup,
1271         .o_cli_find = nrs_tbf_nid_cli_find,
1272         .o_cli_findadd = nrs_tbf_nid_cli_findadd,
1273         .o_cli_put = nrs_tbf_nid_cli_put,
1274         .o_cli_init = nrs_tbf_nid_cli_init,
1275         .o_rule_init = nrs_tbf_nid_rule_init,
1276         .o_rule_dump = nrs_tbf_nid_rule_dump,
1277         .o_rule_match = nrs_tbf_nid_rule_match,
1278         .o_rule_fini = nrs_tbf_nid_rule_fini,
1279 };
1280
1281 static unsigned nrs_tbf_hop_hash(struct cfs_hash *hs, const void *key,
1282                                  unsigned mask)
1283 {
1284         return cfs_hash_djb2_hash(key, strlen(key), mask);
1285 }
1286
1287 static int nrs_tbf_hop_keycmp(const void *key, struct hlist_node *hnode)
1288 {
1289         struct nrs_tbf_client *cli = hlist_entry(hnode,
1290                                                  struct nrs_tbf_client,
1291                                                  tc_hnode);
1292
1293         return (strcmp(cli->tc_key, key) == 0);
1294 }
1295
1296 static void *nrs_tbf_hop_key(struct hlist_node *hnode)
1297 {
1298         struct nrs_tbf_client *cli = hlist_entry(hnode,
1299                                                  struct nrs_tbf_client,
1300                                                  tc_hnode);
1301         return cli->tc_key;
1302 }
1303
1304 static void nrs_tbf_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1305 {
1306         struct nrs_tbf_client *cli = hlist_entry(hnode,
1307                                                  struct nrs_tbf_client,
1308                                                  tc_hnode);
1309
1310         atomic_inc(&cli->tc_ref);
1311 }
1312
1313 static void nrs_tbf_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1314 {
1315         struct nrs_tbf_client *cli = hlist_entry(hnode,
1316                                                  struct nrs_tbf_client,
1317                                                  tc_hnode);
1318
1319         atomic_dec(&cli->tc_ref);
1320 }
1321
1322 static void nrs_tbf_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1323
1324 {
1325         struct nrs_tbf_client *cli = hlist_entry(hnode,
1326                                                  struct nrs_tbf_client,
1327                                                  tc_hnode);
1328
1329         LASSERT(atomic_read(&cli->tc_ref) == 0);
1330         nrs_tbf_cli_fini(cli);
1331 }
1332
1333 static struct cfs_hash_ops nrs_tbf_hash_ops = {
1334         .hs_hash        = nrs_tbf_hop_hash,
1335         .hs_keycmp      = nrs_tbf_hop_keycmp,
1336         .hs_key         = nrs_tbf_hop_key,
1337         .hs_object      = nrs_tbf_hop_object,
1338         .hs_get         = nrs_tbf_hop_get,
1339         .hs_put         = nrs_tbf_hop_put,
1340         .hs_put_locked  = nrs_tbf_hop_put,
1341         .hs_exit        = nrs_tbf_hop_exit,
1342 };
1343
1344 #define NRS_TBF_GENERIC_BKT_BITS        10
1345 #define NRS_TBF_GENERIC_HASH_FLAGS      (CFS_HASH_SPIN_BKTLOCK | \
1346                                         CFS_HASH_NO_ITEMREF | \
1347                                         CFS_HASH_DEPTH)
1348
1349 static int
1350 nrs_tbf_startup(struct ptlrpc_nrs_policy *policy, struct nrs_tbf_head *head)
1351 {
1352         struct nrs_tbf_cmd       start;
1353         struct nrs_tbf_bucket   *bkt;
1354         int                      bits;
1355         int                      i;
1356         int                      rc;
1357         struct cfs_hash_bd       bd;
1358
1359         bits = nrs_tbf_jobid_hash_order();
1360         if (bits < NRS_TBF_GENERIC_BKT_BITS)
1361                 bits = NRS_TBF_GENERIC_BKT_BITS;
1362         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1363                                             bits, bits,
1364                                             NRS_TBF_GENERIC_BKT_BITS,
1365                                             sizeof(*bkt), 0, 0,
1366                                             &nrs_tbf_hash_ops,
1367                                             NRS_TBF_GENERIC_HASH_FLAGS);
1368         if (head->th_cli_hash == NULL)
1369                 return -ENOMEM;
1370
1371         cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
1372                 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
1373                 INIT_LIST_HEAD(&bkt->ntb_lru);
1374         }
1375
1376         memset(&start, 0, sizeof(start));
1377         start.u.tc_start.ts_conds_str = "*";
1378
1379         start.u.tc_start.ts_rpc_rate = tbf_rate;
1380         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1381         start.tc_name = NRS_TBF_DEFAULT_RULE;
1382         INIT_LIST_HEAD(&start.u.tc_start.ts_conds);
1383         rc = nrs_tbf_rule_start(policy, head, &start);
1384         if (rc)
1385                 cfs_hash_putref(head->th_cli_hash);
1386
1387         return rc;
1388 }
1389
1390 static struct nrs_tbf_client *
1391 nrs_tbf_cli_hash_lookup(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1392                         const char *key)
1393 {
1394         struct hlist_node *hnode;
1395         struct nrs_tbf_client *cli;
1396
1397         hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)key);
1398         if (hnode == NULL)
1399                 return NULL;
1400
1401         cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
1402         if (!list_empty(&cli->tc_lru))
1403                 list_del_init(&cli->tc_lru);
1404         return cli;
1405 }
1406
1407 /**
1408  * ONLY opcode presented in this function will be checked in
1409  * nrs_tbf_id_cli_set(). That means, we can add or remove an
1410  * opcode to enable or disable requests handled in nrs_tbf
1411  */
1412 static struct req_format *req_fmt(__u32 opcode)
1413 {
1414         switch (opcode) {
1415         case OST_GETATTR:
1416                 return &RQF_OST_GETATTR;
1417         case OST_SETATTR:
1418                 return &RQF_OST_SETATTR;
1419         case OST_READ:
1420                 return &RQF_OST_BRW_READ;
1421         case OST_WRITE:
1422                 return &RQF_OST_BRW_WRITE;
1423         /* FIXME: OST_CREATE and OST_DESTROY comes from MDS
1424          * in most case. Should they be removed? */
1425         case OST_CREATE:
1426                 return &RQF_OST_CREATE;
1427         case OST_DESTROY:
1428                 return &RQF_OST_DESTROY;
1429         case OST_PUNCH:
1430                 return &RQF_OST_PUNCH;
1431         case OST_SYNC:
1432                 return &RQF_OST_SYNC;
1433         case OST_LADVISE:
1434                 return &RQF_OST_LADVISE;
1435         case MDS_GETATTR:
1436                 return &RQF_MDS_GETATTR;
1437         case MDS_GETATTR_NAME:
1438                 return &RQF_MDS_GETATTR_NAME;
1439         /* close is skipped to avoid LDLM cancel slowness */
1440 #if 0
1441         case MDS_CLOSE:
1442                 return &RQF_MDS_CLOSE;
1443 #endif
1444         case MDS_REINT:
1445                 return &RQF_MDS_REINT;
1446         case MDS_READPAGE:
1447                 return &RQF_MDS_READPAGE;
1448         case MDS_GET_ROOT:
1449                 return &RQF_MDS_GET_ROOT;
1450         case MDS_STATFS:
1451                 return &RQF_MDS_STATFS;
1452         case MDS_SYNC:
1453                 return &RQF_MDS_SYNC;
1454         case MDS_QUOTACTL:
1455                 return &RQF_MDS_QUOTACTL;
1456         case MDS_GETXATTR:
1457                 return &RQF_MDS_GETXATTR;
1458         case MDS_GET_INFO:
1459                 return &RQF_MDS_GET_INFO;
1460         /* HSM op is skipped */
1461 #if 0 
1462         case MDS_HSM_STATE_GET:
1463                 return &RQF_MDS_HSM_STATE_GET;
1464         case MDS_HSM_STATE_SET:
1465                 return &RQF_MDS_HSM_STATE_SET;
1466         case MDS_HSM_ACTION:
1467                 return &RQF_MDS_HSM_ACTION;
1468         case MDS_HSM_CT_REGISTER:
1469                 return &RQF_MDS_HSM_CT_REGISTER;
1470         case MDS_HSM_CT_UNREGISTER:
1471                 return &RQF_MDS_HSM_CT_UNREGISTER;
1472 #endif
1473         case MDS_SWAP_LAYOUTS:
1474                 return &RQF_MDS_SWAP_LAYOUTS;
1475         case LDLM_ENQUEUE:
1476                 return &RQF_LDLM_ENQUEUE;
1477         default:
1478                 return NULL;
1479         }
1480 }
1481
1482 static struct req_format *intent_req_fmt(__u32 it_opc)
1483 {
1484         if (it_opc & (IT_OPEN | IT_CREAT))
1485                 return &RQF_LDLM_INTENT_OPEN;
1486         else if (it_opc & (IT_GETATTR | IT_LOOKUP))
1487                 return &RQF_LDLM_INTENT_GETATTR;
1488         else if (it_opc & IT_GETXATTR)
1489                 return &RQF_LDLM_INTENT_GETXATTR;
1490         else if (it_opc & (IT_GLIMPSE | IT_BRW))
1491                 return &RQF_LDLM_INTENT;
1492         else
1493                 return NULL;
1494 }
1495
1496 static int ost_tbf_id_cli_set(struct ptlrpc_request *req,
1497                               struct tbf_id *id)
1498 {
1499         struct ost_body *body;
1500
1501         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1502         if (body != NULL) {
1503                 id->ti_uid = body->oa.o_uid;
1504                 id->ti_gid = body->oa.o_gid;
1505                 return 0;
1506         }
1507
1508         return -EINVAL;
1509 }
1510
1511 static void unpack_ugid_from_mdt_body(struct ptlrpc_request *req,
1512                                       struct tbf_id *id)
1513 {
1514         struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
1515                                                     &RMF_MDT_BODY);
1516         LASSERT(b != NULL);
1517
1518         /* TODO: nodemaping feature converts {ug}id from individual
1519          * clients to the actual ones of the file system. Some work
1520          * may be needed to fix this. */
1521         id->ti_uid = b->mbo_uid;
1522         id->ti_gid = b->mbo_gid;
1523 }
1524
1525 static void unpack_ugid_from_mdt_rec_reint(struct ptlrpc_request *req,
1526                                            struct tbf_id *id)
1527 {
1528         struct mdt_rec_reint *rec;
1529
1530         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
1531         LASSERT(rec != NULL);
1532
1533         /* use the fs{ug}id as {ug}id of the process */
1534         id->ti_uid = rec->rr_fsuid;
1535         id->ti_gid = rec->rr_fsgid;
1536 }
1537
1538 static int mdt_tbf_id_cli_set(struct ptlrpc_request *req,
1539                               struct tbf_id *id)
1540 {
1541         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1542         int rc = 0;
1543
1544         switch (opc) {
1545         case MDS_GETATTR:
1546         case MDS_GETATTR_NAME:
1547         case MDS_GET_ROOT:
1548         case MDS_READPAGE:
1549         case MDS_SYNC:
1550         case MDS_GETXATTR:
1551         case MDS_HSM_STATE_GET ... MDS_SWAP_LAYOUTS:
1552                 unpack_ugid_from_mdt_body(req, id);
1553                 break;
1554         case MDS_CLOSE:
1555         case MDS_REINT:
1556                 unpack_ugid_from_mdt_rec_reint(req, id);
1557                 break;
1558         default:
1559                 rc = -EINVAL;
1560                 break;
1561         }
1562         return rc;
1563 }
1564
1565 static int ldlm_tbf_id_cli_set(struct ptlrpc_request *req,
1566                               struct tbf_id *id)
1567 {
1568         struct ldlm_intent *lit;
1569         struct req_format *fmt;
1570
1571         if (req->rq_reqmsg->lm_bufcount <= DLM_INTENT_IT_OFF)
1572                 return -EINVAL;
1573
1574         req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_BASIC);
1575         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
1576         if (lit == NULL)
1577                 return -EINVAL;
1578
1579         fmt = intent_req_fmt(lit->opc);
1580         if (fmt == NULL)
1581                 return -EINVAL;
1582
1583         req_capsule_extend(&req->rq_pill, fmt);
1584
1585         if (lit->opc & (IT_GETXATTR | IT_GETATTR | IT_LOOKUP))
1586                 unpack_ugid_from_mdt_body(req, id);
1587         else if (lit->opc & (IT_OPEN | IT_OPEN | IT_GLIMPSE | IT_BRW))
1588                 unpack_ugid_from_mdt_rec_reint(req, id);
1589         else
1590                 return -EINVAL;
1591         return 0;
1592 }
1593
1594 static int nrs_tbf_id_cli_set(struct ptlrpc_request *req, struct tbf_id *id,
1595                               enum nrs_tbf_flag ti_type)
1596 {
1597         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1598         struct req_format *fmt = req_fmt(opc);
1599         bool fmt_unset = false;
1600         int rc;
1601
1602         memset(id, 0, sizeof(struct tbf_id));
1603         id->ti_type = ti_type;
1604
1605         rc = lustre_msg_get_uid_gid(req->rq_reqmsg, &id->ti_uid, &id->ti_gid);
1606         if (!rc && id->ti_uid != (u32) -1 && id->ti_gid != (u32) -1)
1607                 return 0;
1608
1609         /* client req doesn't have uid/gid pack in ptlrpc_body
1610          * --> fallback to the old method
1611          */
1612         if (fmt == NULL)
1613                 return -EINVAL;
1614         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1615         if (req->rq_pill.rc_fmt == NULL) {
1616                 req_capsule_set(&req->rq_pill, fmt);
1617                 fmt_unset = true;
1618         }
1619
1620         if (opc < OST_LAST_OPC)
1621                 rc = ost_tbf_id_cli_set(req, id);
1622         else if (opc >= MDS_FIRST_OPC && opc < MDS_LAST_OPC)
1623                 rc = mdt_tbf_id_cli_set(req, id);
1624         else if (opc == LDLM_ENQUEUE)
1625                 rc = ldlm_tbf_id_cli_set(req, id);
1626         else
1627                 rc = -EINVAL;
1628
1629         /* restore it to the initialized state */
1630         if (fmt_unset)
1631                 req->rq_pill.rc_fmt = NULL;
1632         return rc;
1633 }
1634
1635 static inline void nrs_tbf_cli_gen_key(struct nrs_tbf_client *cli,
1636                                        struct ptlrpc_request *req,
1637                                        char *keystr, size_t keystr_sz)
1638 {
1639         const char *jobid;
1640         u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1641         struct tbf_id id;
1642
1643         nrs_tbf_id_cli_set(req, &id, NRS_TBF_FLAG_UID | NRS_TBF_FLAG_GID);
1644         jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1645         if (jobid == NULL)
1646                 jobid = NRS_TBF_JOBID_NULL;
1647
1648         snprintf(keystr, keystr_sz, "%s_%s_%d_%u_%u", jobid,
1649                  libcfs_nidstr(&req->rq_peer.nid), opc, id.ti_uid,
1650                  id.ti_gid);
1651
1652         if (cli) {
1653                 INIT_LIST_HEAD(&cli->tc_lru);
1654                 strlcpy(cli->tc_key, keystr, sizeof(cli->tc_key));
1655                 strlcpy(cli->tc_jobid, jobid, sizeof(cli->tc_jobid));
1656                 cli->tc_nid = req->rq_peer.nid;
1657                 cli->tc_opcode = opc;
1658                 cli->tc_id = id;
1659         }
1660 }
1661
1662 static struct nrs_tbf_client *
1663 nrs_tbf_cli_find(struct nrs_tbf_head *head, struct ptlrpc_request *req)
1664 {
1665         struct nrs_tbf_client *cli;
1666         struct cfs_hash *hs = head->th_cli_hash;
1667         struct cfs_hash_bd bd;
1668         char keystr[NRS_TBF_KEY_LEN];
1669
1670         nrs_tbf_cli_gen_key(NULL, req, keystr, sizeof(keystr));
1671         cfs_hash_bd_get_and_lock(hs, (void *)keystr, &bd, 1);
1672         cli = nrs_tbf_cli_hash_lookup(hs, &bd, keystr);
1673         cfs_hash_bd_unlock(hs, &bd, 1);
1674
1675         return cli;
1676 }
1677
1678 static struct nrs_tbf_client *
1679 nrs_tbf_cli_findadd(struct nrs_tbf_head *head,
1680                     struct nrs_tbf_client *cli)
1681 {
1682         const char              *key;
1683         struct nrs_tbf_client   *ret;
1684         struct cfs_hash         *hs = head->th_cli_hash;
1685         struct cfs_hash_bd       bd;
1686
1687         key = cli->tc_key;
1688         cfs_hash_bd_get_and_lock(hs, (void *)key, &bd, 1);
1689         ret = nrs_tbf_cli_hash_lookup(hs, &bd, key);
1690         if (ret == NULL) {
1691                 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
1692                 ret = cli;
1693         }
1694         cfs_hash_bd_unlock(hs, &bd, 1);
1695
1696         return ret;
1697 }
1698
1699 static void
1700 nrs_tbf_cli_put(struct nrs_tbf_head *head, struct nrs_tbf_client *cli)
1701 {
1702         struct cfs_hash_bd       bd;
1703         struct cfs_hash         *hs = head->th_cli_hash;
1704         struct nrs_tbf_bucket   *bkt;
1705         int                      hw;
1706         LIST_HEAD(zombies);
1707
1708         cfs_hash_bd_get(hs, &cli->tc_key, &bd);
1709         bkt = cfs_hash_bd_extra_get(hs, &bd);
1710         if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
1711                 return;
1712         LASSERT(list_empty(&cli->tc_lru));
1713         list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
1714
1715         /**
1716          * Check and purge the LRU, there is at least one client in the LRU.
1717          */
1718         hw = tbf_jobid_cache_size >> (hs->hs_cur_bits - hs->hs_bkt_bits);
1719         while (cfs_hash_bd_count_get(&bd) > hw) {
1720                 if (unlikely(list_empty(&bkt->ntb_lru)))
1721                         break;
1722                 cli = list_entry(bkt->ntb_lru.next,
1723                                  struct nrs_tbf_client,
1724                                  tc_lru);
1725                 LASSERT(atomic_read(&cli->tc_ref) == 0);
1726                 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
1727                 list_move(&cli->tc_lru, &zombies);
1728         }
1729         cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
1730
1731         while (!list_empty(&zombies)) {
1732                 cli = container_of(zombies.next,
1733                                    struct nrs_tbf_client, tc_lru);
1734                 list_del_init(&cli->tc_lru);
1735                 nrs_tbf_cli_fini(cli);
1736         }
1737 }
1738
1739 static void
1740 nrs_tbf_generic_cli_init(struct nrs_tbf_client *cli,
1741                          struct ptlrpc_request *req)
1742 {
1743         char keystr[NRS_TBF_KEY_LEN];
1744
1745         nrs_tbf_cli_gen_key(cli, req, keystr, sizeof(keystr));
1746 }
1747
1748 static void
1749 nrs_tbf_id_list_free(struct list_head *uid_list)
1750 {
1751         struct nrs_tbf_id *nti_id, *n;
1752
1753         list_for_each_entry_safe(nti_id, n, uid_list, nti_linkage) {
1754                 list_del_init(&nti_id->nti_linkage);
1755                 OBD_FREE_PTR(nti_id);
1756         }
1757 }
1758
1759 static void
1760 nrs_tbf_expression_free(struct nrs_tbf_expression *expr)
1761 {
1762         LASSERT(expr->te_field >= NRS_TBF_FIELD_NID &&
1763                 expr->te_field < NRS_TBF_FIELD_MAX);
1764         switch (expr->te_field) {
1765         case NRS_TBF_FIELD_NID:
1766                 cfs_free_nidlist(&expr->te_cond);
1767                 break;
1768         case NRS_TBF_FIELD_JOBID:
1769                 nrs_tbf_jobid_list_free(&expr->te_cond);
1770                 break;
1771         case NRS_TBF_FIELD_OPCODE:
1772                 bitmap_free(expr->te_opcodes);
1773                 break;
1774         case NRS_TBF_FIELD_UID:
1775         case NRS_TBF_FIELD_GID:
1776                 nrs_tbf_id_list_free(&expr->te_cond);
1777                 break;
1778         default:
1779                 LBUG();
1780         }
1781         OBD_FREE_PTR(expr);
1782 }
1783
1784 static void
1785 nrs_tbf_conjunction_free(struct nrs_tbf_conjunction *conjunction)
1786 {
1787         struct nrs_tbf_expression *expression;
1788         struct nrs_tbf_expression *n;
1789
1790         LASSERT(list_empty(&conjunction->tc_linkage));
1791         list_for_each_entry_safe(expression, n,
1792                                  &conjunction->tc_expressions,
1793                                  te_linkage) {
1794                 list_del_init(&expression->te_linkage);
1795                 nrs_tbf_expression_free(expression);
1796         }
1797         OBD_FREE_PTR(conjunction);
1798 }
1799
1800 static void
1801 nrs_tbf_conds_free(struct list_head *cond_list)
1802 {
1803         struct nrs_tbf_conjunction *conjunction;
1804         struct nrs_tbf_conjunction *n;
1805
1806         list_for_each_entry_safe(conjunction, n, cond_list, tc_linkage) {
1807                 list_del_init(&conjunction->tc_linkage);
1808                 nrs_tbf_conjunction_free(conjunction);
1809         }
1810 }
1811
1812 static void
1813 nrs_tbf_generic_cmd_fini(struct nrs_tbf_cmd *cmd)
1814 {
1815         if (!list_empty(&cmd->u.tc_start.ts_conds))
1816                 nrs_tbf_conds_free(&cmd->u.tc_start.ts_conds);
1817         if (cmd->u.tc_start.ts_conds_str)
1818                 OBD_FREE(cmd->u.tc_start.ts_conds_str,
1819                          strlen(cmd->u.tc_start.ts_conds_str) + 1);
1820 }
1821
1822 #define NRS_TBF_DISJUNCTION_DELIM       (',')
1823 #define NRS_TBF_CONJUNCTION_DELIM       ('&')
1824 #define NRS_TBF_EXPRESSION_DELIM        ('=')
1825
1826 static inline bool
1827 nrs_tbf_check_field(struct cfs_lstr *field, char *str)
1828 {
1829         int len = strlen(str);
1830
1831         return (field->ls_len == len &&
1832                 strncmp(field->ls_str, str, len) == 0);
1833 }
1834
1835 static int
1836 nrs_tbf_opcode_list_parse(char *str, int len, unsigned long **bitmaptr);
1837 static int
1838 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
1839                       enum nrs_tbf_flag tif);
1840
1841 static int
1842 nrs_tbf_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
1843 {
1844         struct nrs_tbf_expression *expr;
1845         struct cfs_lstr field;
1846         int rc = 0;
1847
1848         OBD_ALLOC_PTR(expr);
1849         if (expr == NULL)
1850                 return -ENOMEM;
1851
1852         rc = cfs_gettok(src, NRS_TBF_EXPRESSION_DELIM, &field);
1853         if (rc == 0 || !src->ls_str || src->ls_len <= 2 ||
1854             src->ls_str[0] != '{' || src->ls_str[src->ls_len - 1] != '}')
1855                 GOTO(out, rc = -EINVAL);
1856
1857         /* Skip '{' and '}' */
1858         src->ls_str++;
1859         src->ls_len -= 2;
1860
1861         if (nrs_tbf_check_field(&field, "nid")) {
1862                 if (cfs_parse_nidlist(src->ls_str,
1863                                       src->ls_len,
1864                                       &expr->te_cond) <= 0)
1865                         GOTO(out, rc = -EINVAL);
1866                 expr->te_field = NRS_TBF_FIELD_NID;
1867         } else if (nrs_tbf_check_field(&field, "jobid")) {
1868                 if (nrs_tbf_jobid_list_parse(src->ls_str,
1869                                              src->ls_len,
1870                                              &expr->te_cond) < 0)
1871                         GOTO(out, rc = -EINVAL);
1872                 expr->te_field = NRS_TBF_FIELD_JOBID;
1873         } else if (nrs_tbf_check_field(&field, "opcode")) {
1874                 if (nrs_tbf_opcode_list_parse(src->ls_str,
1875                                               src->ls_len,
1876                                               &expr->te_opcodes) < 0)
1877                         GOTO(out, rc = -EINVAL);
1878                 expr->te_field = NRS_TBF_FIELD_OPCODE;
1879         } else if (nrs_tbf_check_field(&field, "uid")) {
1880                 if (nrs_tbf_id_list_parse(src->ls_str,
1881                                           src->ls_len,
1882                                           &expr->te_cond,
1883                                           NRS_TBF_FLAG_UID) < 0)
1884                         GOTO(out, rc = -EINVAL);
1885                 expr->te_field = NRS_TBF_FIELD_UID;
1886         } else if (nrs_tbf_check_field(&field, "gid")) {
1887                 if (nrs_tbf_id_list_parse(src->ls_str,
1888                                           src->ls_len,
1889                                           &expr->te_cond,
1890                                           NRS_TBF_FLAG_GID) < 0)
1891                         GOTO(out, rc = -EINVAL);
1892                 expr->te_field = NRS_TBF_FIELD_GID;
1893         } else {
1894                 GOTO(out, rc = -EINVAL);
1895         }
1896
1897         list_add_tail(&expr->te_linkage, cond_list);
1898         return 0;
1899 out:
1900         OBD_FREE_PTR(expr);
1901         return rc;
1902 }
1903
1904 static int
1905 nrs_tbf_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
1906 {
1907         struct nrs_tbf_conjunction *conjunction;
1908         struct cfs_lstr expr;
1909         int rc = 0;
1910
1911         OBD_ALLOC_PTR(conjunction);
1912         if (conjunction == NULL)
1913                 return -ENOMEM;
1914
1915         INIT_LIST_HEAD(&conjunction->tc_expressions);
1916         list_add_tail(&conjunction->tc_linkage, cond_list);
1917
1918         while (src->ls_str) {
1919                 rc = cfs_gettok(src, NRS_TBF_CONJUNCTION_DELIM, &expr);
1920                 if (rc == 0) {
1921                         rc = -EINVAL;
1922                         break;
1923                 }
1924                 rc = nrs_tbf_expression_parse(&expr,
1925                                               &conjunction->tc_expressions);
1926                 if (rc)
1927                         break;
1928         }
1929         return rc;
1930 }
1931
1932 static int
1933 nrs_tbf_conds_parse(char *str, int len, struct list_head *cond_list)
1934 {
1935         struct cfs_lstr src;
1936         struct cfs_lstr res;
1937         int rc = 0;
1938
1939         src.ls_str = str;
1940         src.ls_len = len;
1941         INIT_LIST_HEAD(cond_list);
1942         while (src.ls_str) {
1943                 rc = cfs_gettok(&src, NRS_TBF_DISJUNCTION_DELIM, &res);
1944                 if (rc == 0) {
1945                         rc = -EINVAL;
1946                         break;
1947                 }
1948                 rc = nrs_tbf_conjunction_parse(&res, cond_list);
1949                 if (rc)
1950                         break;
1951         }
1952         return rc;
1953 }
1954
1955 static int
1956 nrs_tbf_generic_parse(struct nrs_tbf_cmd *cmd, const char *id)
1957 {
1958         int rc;
1959
1960         OBD_ALLOC(cmd->u.tc_start.ts_conds_str, strlen(id) + 1);
1961         if (cmd->u.tc_start.ts_conds_str == NULL)
1962                 return -ENOMEM;
1963
1964         memcpy(cmd->u.tc_start.ts_conds_str, id, strlen(id));
1965
1966         /* Parse hybird NID and JOBID conditions */
1967         rc = nrs_tbf_conds_parse(cmd->u.tc_start.ts_conds_str,
1968                                  strlen(cmd->u.tc_start.ts_conds_str),
1969                                  &cmd->u.tc_start.ts_conds);
1970         if (rc)
1971                 nrs_tbf_generic_cmd_fini(cmd);
1972
1973         return rc;
1974 }
1975
1976 static int
1977 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id);
1978
1979 static int
1980 nrs_tbf_expression_match(struct nrs_tbf_expression *expr,
1981                          struct nrs_tbf_rule *rule,
1982                          struct nrs_tbf_client *cli)
1983 {
1984         switch (expr->te_field) {
1985         case NRS_TBF_FIELD_NID:
1986                 return cfs_match_nid(&cli->tc_nid, &expr->te_cond);
1987         case NRS_TBF_FIELD_JOBID:
1988                 return nrs_tbf_jobid_list_match(&expr->te_cond, cli->tc_jobid);
1989         case NRS_TBF_FIELD_OPCODE:
1990                 return test_bit(cli->tc_opcode, expr->te_opcodes);
1991         case NRS_TBF_FIELD_UID:
1992         case NRS_TBF_FIELD_GID:
1993                 return nrs_tbf_id_list_match(&expr->te_cond, cli->tc_id);
1994         default:
1995                 return 0;
1996         }
1997 }
1998
1999 static int
2000 nrs_tbf_conjunction_match(struct nrs_tbf_conjunction *conjunction,
2001                           struct nrs_tbf_rule *rule,
2002                           struct nrs_tbf_client *cli)
2003 {
2004         struct nrs_tbf_expression *expr;
2005         int matched;
2006
2007         list_for_each_entry(expr, &conjunction->tc_expressions, te_linkage) {
2008                 matched = nrs_tbf_expression_match(expr, rule, cli);
2009                 if (!matched)
2010                         return 0;
2011         }
2012
2013         return 1;
2014 }
2015
2016 static int
2017 nrs_tbf_cond_match(struct nrs_tbf_rule *rule, struct nrs_tbf_client *cli)
2018 {
2019         struct nrs_tbf_conjunction *conjunction;
2020         int matched;
2021
2022         list_for_each_entry(conjunction, &rule->tr_conds, tc_linkage) {
2023                 matched = nrs_tbf_conjunction_match(conjunction, rule, cli);
2024                 if (matched)
2025                         return 1;
2026         }
2027
2028         return 0;
2029 }
2030
2031 static void
2032 nrs_tbf_generic_rule_fini(struct nrs_tbf_rule *rule)
2033 {
2034         if (!list_empty(&rule->tr_conds))
2035                 nrs_tbf_conds_free(&rule->tr_conds);
2036         LASSERT(rule->tr_conds_str != NULL);
2037         OBD_FREE(rule->tr_conds_str, strlen(rule->tr_conds_str) + 1);
2038 }
2039
2040 static int
2041 nrs_tbf_rule_init(struct ptlrpc_nrs_policy *policy,
2042                   struct nrs_tbf_rule *rule, struct nrs_tbf_cmd *start)
2043 {
2044         int rc = 0;
2045
2046         LASSERT(start->u.tc_start.ts_conds_str);
2047         OBD_ALLOC(rule->tr_conds_str,
2048                   strlen(start->u.tc_start.ts_conds_str) + 1);
2049         if (rule->tr_conds_str == NULL)
2050                 return -ENOMEM;
2051
2052         memcpy(rule->tr_conds_str,
2053                start->u.tc_start.ts_conds_str,
2054                strlen(start->u.tc_start.ts_conds_str));
2055
2056         INIT_LIST_HEAD(&rule->tr_conds);
2057         if (!list_empty(&start->u.tc_start.ts_conds)) {
2058                 rc = nrs_tbf_conds_parse(rule->tr_conds_str,
2059                                          strlen(rule->tr_conds_str),
2060                                          &rule->tr_conds);
2061         }
2062         if (rc)
2063                 nrs_tbf_generic_rule_fini(rule);
2064
2065         return rc;
2066 }
2067
2068 static int
2069 nrs_tbf_generic_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2070 {
2071         seq_printf(m, "%s %s %llu, ref %d\n", rule->tr_name,
2072                    rule->tr_conds_str, rule->tr_rpc_rate,
2073                    atomic_read(&rule->tr_ref) - 1);
2074         return 0;
2075 }
2076
2077 static int
2078 nrs_tbf_generic_rule_match(struct nrs_tbf_rule *rule,
2079                            struct nrs_tbf_client *cli)
2080 {
2081         return nrs_tbf_cond_match(rule, cli);
2082 }
2083
2084 static struct nrs_tbf_ops nrs_tbf_generic_ops = {
2085         .o_name = NRS_TBF_TYPE_GENERIC,
2086         .o_startup = nrs_tbf_startup,
2087         .o_cli_find = nrs_tbf_cli_find,
2088         .o_cli_findadd = nrs_tbf_cli_findadd,
2089         .o_cli_put = nrs_tbf_cli_put,
2090         .o_cli_init = nrs_tbf_generic_cli_init,
2091         .o_rule_init = nrs_tbf_rule_init,
2092         .o_rule_dump = nrs_tbf_generic_rule_dump,
2093         .o_rule_match = nrs_tbf_generic_rule_match,
2094         .o_rule_fini = nrs_tbf_generic_rule_fini,
2095 };
2096
2097 static void nrs_tbf_opcode_rule_fini(struct nrs_tbf_rule *rule)
2098 {
2099         if (rule->tr_opcodes)
2100                 bitmap_free(rule->tr_opcodes);
2101
2102         LASSERT(rule->tr_opcodes_str != NULL);
2103         OBD_FREE(rule->tr_opcodes_str, strlen(rule->tr_opcodes_str) + 1);
2104 }
2105
2106 static unsigned nrs_tbf_opcode_hop_hash(struct cfs_hash *hs, const void *key,
2107                                         unsigned mask)
2108 {
2109         return cfs_hash_djb2_hash(key, sizeof(__u32), mask);
2110 }
2111
2112 static int nrs_tbf_opcode_hop_keycmp(const void *key, struct hlist_node *hnode)
2113 {
2114         const __u32     *opc = key;
2115         struct nrs_tbf_client *cli = hlist_entry(hnode,
2116                                                  struct nrs_tbf_client,
2117                                                  tc_hnode);
2118
2119         return *opc == cli->tc_opcode;
2120 }
2121
2122 static void *nrs_tbf_opcode_hop_key(struct hlist_node *hnode)
2123 {
2124         struct nrs_tbf_client *cli = hlist_entry(hnode,
2125                                                  struct nrs_tbf_client,
2126                                                  tc_hnode);
2127
2128         return &cli->tc_opcode;
2129 }
2130
2131 static void nrs_tbf_opcode_hop_get(struct cfs_hash *hs,
2132                                    struct hlist_node *hnode)
2133 {
2134         struct nrs_tbf_client *cli = hlist_entry(hnode,
2135                                                  struct nrs_tbf_client,
2136                                                  tc_hnode);
2137
2138         atomic_inc(&cli->tc_ref);
2139 }
2140
2141 static void nrs_tbf_opcode_hop_put(struct cfs_hash *hs,
2142                                    struct hlist_node *hnode)
2143 {
2144         struct nrs_tbf_client *cli = hlist_entry(hnode,
2145                                                  struct nrs_tbf_client,
2146                                                  tc_hnode);
2147
2148         atomic_dec(&cli->tc_ref);
2149 }
2150
2151 static void nrs_tbf_opcode_hop_exit(struct cfs_hash *hs,
2152                                     struct hlist_node *hnode)
2153 {
2154         struct nrs_tbf_client *cli = hlist_entry(hnode,
2155                                                  struct nrs_tbf_client,
2156                                                  tc_hnode);
2157
2158         LASSERTF(atomic_read(&cli->tc_ref) == 0,
2159                  "Busy TBF object from client with opcode %s, with %d refs\n",
2160                  ll_opcode2str(cli->tc_opcode),
2161                  atomic_read(&cli->tc_ref));
2162
2163         nrs_tbf_cli_fini(cli);
2164 }
2165 static struct cfs_hash_ops nrs_tbf_opcode_hash_ops = {
2166         .hs_hash        = nrs_tbf_opcode_hop_hash,
2167         .hs_keycmp      = nrs_tbf_opcode_hop_keycmp,
2168         .hs_key         = nrs_tbf_opcode_hop_key,
2169         .hs_object      = nrs_tbf_hop_object,
2170         .hs_get         = nrs_tbf_opcode_hop_get,
2171         .hs_put         = nrs_tbf_opcode_hop_put,
2172         .hs_put_locked  = nrs_tbf_opcode_hop_put,
2173         .hs_exit        = nrs_tbf_opcode_hop_exit,
2174 };
2175
2176 static int
2177 nrs_tbf_opcode_startup(struct ptlrpc_nrs_policy *policy,
2178                     struct nrs_tbf_head *head)
2179 {
2180         struct nrs_tbf_cmd      start = { 0 };
2181         int rc;
2182
2183         head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
2184                                             NRS_TBF_NID_BITS,
2185                                             NRS_TBF_NID_BITS,
2186                                             NRS_TBF_NID_BKT_BITS, 0,
2187                                             CFS_HASH_MIN_THETA,
2188                                             CFS_HASH_MAX_THETA,
2189                                             &nrs_tbf_opcode_hash_ops,
2190                                             CFS_HASH_RW_BKTLOCK);
2191         if (head->th_cli_hash == NULL)
2192                 return -ENOMEM;
2193
2194         start.u.tc_start.ts_opcodes_str = "*";
2195
2196         start.u.tc_start.ts_rpc_rate = tbf_rate;
2197         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2198         start.tc_name = NRS_TBF_DEFAULT_RULE;
2199         rc = nrs_tbf_rule_start(policy, head, &start);
2200
2201         return rc;
2202 }
2203
2204 static struct nrs_tbf_client *
2205 nrs_tbf_opcode_cli_find(struct nrs_tbf_head *head,
2206                         struct ptlrpc_request *req)
2207 {
2208         __u32 opc;
2209
2210         opc = lustre_msg_get_opc(req->rq_reqmsg);
2211         return cfs_hash_lookup(head->th_cli_hash, &opc);
2212 }
2213
2214 static struct nrs_tbf_client *
2215 nrs_tbf_opcode_cli_findadd(struct nrs_tbf_head *head,
2216                            struct nrs_tbf_client *cli)
2217 {
2218         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_opcode,
2219                                        &cli->tc_hnode);
2220 }
2221
2222 static void
2223 nrs_tbf_opcode_cli_init(struct nrs_tbf_client *cli,
2224                         struct ptlrpc_request *req)
2225 {
2226         cli->tc_opcode = lustre_msg_get_opc(req->rq_reqmsg);
2227 }
2228
2229 #define MAX_OPCODE_LEN  32
2230 static int
2231 nrs_tbf_opcode_set_bit(const struct cfs_lstr *id, unsigned long *opcodes)
2232 {
2233         int     op = 0;
2234         char    opcode_str[MAX_OPCODE_LEN];
2235
2236         if (id->ls_len + 1 > MAX_OPCODE_LEN)
2237                 return -EINVAL;
2238
2239         memcpy(opcode_str, id->ls_str, id->ls_len);
2240         opcode_str[id->ls_len] = '\0';
2241
2242         op = ll_str2opcode(opcode_str);
2243         if (op < 0)
2244                 return -EINVAL;
2245
2246         set_bit(op, opcodes);
2247         return 0;
2248 }
2249
2250 static int
2251 nrs_tbf_opcode_list_parse(char *str, int len, unsigned long **bitmaptr)
2252 {
2253         unsigned long *opcodes;
2254         struct cfs_lstr src;
2255         struct cfs_lstr res;
2256         int rc = 0;
2257
2258         ENTRY;
2259         opcodes = bitmap_zalloc(LUSTRE_MAX_OPCODES, GFP_KERNEL);
2260         if (!opcodes)
2261                 return -ENOMEM;
2262
2263         src.ls_str = str;
2264         src.ls_len = len;
2265         while (src.ls_str) {
2266                 rc = cfs_gettok(&src, ' ', &res);
2267                 if (rc == 0) {
2268                         rc = -EINVAL;
2269                         break;
2270                 }
2271                 rc = nrs_tbf_opcode_set_bit(&res, opcodes);
2272                 if (rc)
2273                         break;
2274         }
2275
2276         if (rc == 0 && bitmaptr)
2277                 *bitmaptr = opcodes;
2278         else
2279                 bitmap_free(opcodes);
2280
2281         RETURN(rc);
2282 }
2283
2284 static void nrs_tbf_opcode_cmd_fini(struct nrs_tbf_cmd *cmd)
2285 {
2286         if (cmd->u.tc_start.ts_opcodes_str)
2287                 OBD_FREE(cmd->u.tc_start.ts_opcodes_str,
2288                          strlen(cmd->u.tc_start.ts_opcodes_str) + 1);
2289
2290 }
2291
2292 static int nrs_tbf_opcode_parse(struct nrs_tbf_cmd *cmd, char *id)
2293 {
2294         struct cfs_lstr src;
2295         int rc;
2296
2297         src.ls_str = id;
2298         src.ls_len = strlen(id);
2299         rc = nrs_tbf_check_id_value(&src, "opcode");
2300         if (rc)
2301                 return rc;
2302
2303         OBD_ALLOC(cmd->u.tc_start.ts_opcodes_str, src.ls_len + 1);
2304         if (cmd->u.tc_start.ts_opcodes_str == NULL)
2305                 return -ENOMEM;
2306
2307         memcpy(cmd->u.tc_start.ts_opcodes_str, src.ls_str, src.ls_len);
2308
2309         /* parse opcode list */
2310         rc = nrs_tbf_opcode_list_parse(cmd->u.tc_start.ts_opcodes_str,
2311                                        strlen(cmd->u.tc_start.ts_opcodes_str),
2312                                        NULL);
2313         if (rc)
2314                 nrs_tbf_opcode_cmd_fini(cmd);
2315
2316         return rc;
2317 }
2318
2319 static int
2320 nrs_tbf_opcode_rule_match(struct nrs_tbf_rule *rule,
2321                           struct nrs_tbf_client *cli)
2322 {
2323         if (rule->tr_opcodes == NULL)
2324                 return 0;
2325
2326         return test_bit(cli->tc_opcode, rule->tr_opcodes);
2327 }
2328
2329 static int nrs_tbf_opcode_rule_init(struct ptlrpc_nrs_policy *policy,
2330                                     struct nrs_tbf_rule *rule,
2331                                     struct nrs_tbf_cmd *start)
2332 {
2333         int rc = 0;
2334
2335         LASSERT(start->u.tc_start.ts_opcodes_str != NULL);
2336         OBD_ALLOC(rule->tr_opcodes_str,
2337                   strlen(start->u.tc_start.ts_opcodes_str) + 1);
2338         if (rule->tr_opcodes_str == NULL)
2339                 return -ENOMEM;
2340
2341         strncpy(rule->tr_opcodes_str, start->u.tc_start.ts_opcodes_str,
2342                 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2343
2344         /* Default rule '*' */
2345         if (strcmp(start->u.tc_start.ts_opcodes_str, "*") == 0)
2346                 return 0;
2347
2348         rc = nrs_tbf_opcode_list_parse(rule->tr_opcodes_str,
2349                                        strlen(rule->tr_opcodes_str),
2350                                        &rule->tr_opcodes);
2351         if (rc)
2352                 OBD_FREE(rule->tr_opcodes_str,
2353                          strlen(start->u.tc_start.ts_opcodes_str) + 1);
2354
2355         return rc;
2356 }
2357
2358 static int
2359 nrs_tbf_opcode_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2360 {
2361         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2362                    rule->tr_opcodes_str, rule->tr_rpc_rate,
2363                    atomic_read(&rule->tr_ref) - 1);
2364         return 0;
2365 }
2366
2367
2368 struct nrs_tbf_ops nrs_tbf_opcode_ops = {
2369         .o_name = NRS_TBF_TYPE_OPCODE,
2370         .o_startup = nrs_tbf_opcode_startup,
2371         .o_cli_find = nrs_tbf_opcode_cli_find,
2372         .o_cli_findadd = nrs_tbf_opcode_cli_findadd,
2373         .o_cli_put = nrs_tbf_nid_cli_put,
2374         .o_cli_init = nrs_tbf_opcode_cli_init,
2375         .o_rule_init = nrs_tbf_opcode_rule_init,
2376         .o_rule_dump = nrs_tbf_opcode_rule_dump,
2377         .o_rule_match = nrs_tbf_opcode_rule_match,
2378         .o_rule_fini = nrs_tbf_opcode_rule_fini,
2379 };
2380
2381 static unsigned nrs_tbf_id_hop_hash(struct cfs_hash *hs, const void *key,
2382                                     unsigned mask)
2383 {
2384         return cfs_hash_djb2_hash(key, sizeof(struct tbf_id), mask);
2385 }
2386
2387 static int nrs_tbf_id_hop_keycmp(const void *key, struct hlist_node *hnode)
2388 {
2389         const struct tbf_id *opc = key;
2390         enum nrs_tbf_flag ntf;
2391         struct nrs_tbf_client *cli = hlist_entry(hnode, struct nrs_tbf_client,
2392                                                  tc_hnode);
2393         ntf = opc->ti_type & cli->tc_id.ti_type;
2394         if ((ntf & NRS_TBF_FLAG_UID) && opc->ti_uid != cli->tc_id.ti_uid)
2395                 return 0;
2396
2397         if ((ntf & NRS_TBF_FLAG_GID) && opc->ti_gid != cli->tc_id.ti_gid)
2398                 return 0;
2399
2400         return 1;
2401 }
2402
2403 static void *nrs_tbf_id_hop_key(struct hlist_node *hnode)
2404 {
2405         struct nrs_tbf_client *cli = hlist_entry(hnode,
2406                                                  struct nrs_tbf_client,
2407                                                  tc_hnode);
2408         return &cli->tc_id;
2409 }
2410
2411 static void nrs_tbf_id_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
2412 {
2413         struct nrs_tbf_client *cli = hlist_entry(hnode,
2414                                                  struct nrs_tbf_client,
2415                                                  tc_hnode);
2416
2417         atomic_inc(&cli->tc_ref);
2418 }
2419
2420 static void nrs_tbf_id_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
2421 {
2422         struct nrs_tbf_client *cli = hlist_entry(hnode,
2423                                                  struct nrs_tbf_client,
2424                                                  tc_hnode);
2425
2426         atomic_dec(&cli->tc_ref);
2427 }
2428
2429 static void
2430 nrs_tbf_id_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
2431
2432 {
2433         struct nrs_tbf_client *cli = hlist_entry(hnode,
2434                                                  struct nrs_tbf_client,
2435                                                  tc_hnode);
2436
2437         LASSERT(atomic_read(&cli->tc_ref) == 0);
2438         nrs_tbf_cli_fini(cli);
2439 }
2440
2441 static struct cfs_hash_ops nrs_tbf_id_hash_ops = {
2442         .hs_hash        = nrs_tbf_id_hop_hash,
2443         .hs_keycmp      = nrs_tbf_id_hop_keycmp,
2444         .hs_key         = nrs_tbf_id_hop_key,
2445         .hs_object      = nrs_tbf_hop_object,
2446         .hs_get         = nrs_tbf_id_hop_get,
2447         .hs_put         = nrs_tbf_id_hop_put,
2448         .hs_put_locked  = nrs_tbf_id_hop_put,
2449         .hs_exit        = nrs_tbf_id_hop_exit,
2450 };
2451
2452 static int
2453 nrs_tbf_id_startup(struct ptlrpc_nrs_policy *policy,
2454                    struct nrs_tbf_head *head)
2455 {
2456         struct nrs_tbf_cmd start;
2457         int rc;
2458
2459         head->th_cli_hash = cfs_hash_create("nrs_tbf_id_hash",
2460                                             NRS_TBF_NID_BITS,
2461                                             NRS_TBF_NID_BITS,
2462                                             NRS_TBF_NID_BKT_BITS, 0,
2463                                             CFS_HASH_MIN_THETA,
2464                                             CFS_HASH_MAX_THETA,
2465                                             &nrs_tbf_id_hash_ops,
2466                                             CFS_HASH_RW_BKTLOCK);
2467         if (head->th_cli_hash == NULL)
2468                 return -ENOMEM;
2469
2470         memset(&start, 0, sizeof(start));
2471         start.u.tc_start.ts_ids_str = "*";
2472         start.u.tc_start.ts_rpc_rate = tbf_rate;
2473         start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2474         start.tc_name = NRS_TBF_DEFAULT_RULE;
2475         INIT_LIST_HEAD(&start.u.tc_start.ts_ids);
2476         rc = nrs_tbf_rule_start(policy, head, &start);
2477         if (rc) {
2478                 cfs_hash_putref(head->th_cli_hash);
2479                 head->th_cli_hash = NULL;
2480         }
2481
2482         return rc;
2483 }
2484
2485 static struct nrs_tbf_client *
2486 nrs_tbf_id_cli_find(struct nrs_tbf_head *head,
2487                     struct ptlrpc_request *req)
2488 {
2489         struct tbf_id id;
2490
2491         LASSERT(head->th_type_flag == NRS_TBF_FLAG_UID ||
2492                 head->th_type_flag == NRS_TBF_FLAG_GID);
2493
2494         nrs_tbf_id_cli_set(req, &id, head->th_type_flag);
2495         return cfs_hash_lookup(head->th_cli_hash, &id);
2496 }
2497
2498 static struct nrs_tbf_client *
2499 nrs_tbf_id_cli_findadd(struct nrs_tbf_head *head,
2500                        struct nrs_tbf_client *cli)
2501 {
2502         return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_id,
2503                                        &cli->tc_hnode);
2504 }
2505
2506 static void
2507 nrs_tbf_uid_cli_init(struct nrs_tbf_client *cli,
2508                      struct ptlrpc_request *req)
2509 {
2510         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_UID);
2511 }
2512
2513 static void
2514 nrs_tbf_gid_cli_init(struct nrs_tbf_client *cli,
2515                      struct ptlrpc_request *req)
2516 {
2517         nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_GID);
2518 }
2519
2520 static int
2521 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id)
2522 {
2523         struct nrs_tbf_id *nti_id;
2524         enum nrs_tbf_flag flag;
2525
2526         list_for_each_entry(nti_id, id_list, nti_linkage) {
2527                 flag = id.ti_type & nti_id->nti_id.ti_type;
2528                 if (!flag)
2529                         continue;
2530
2531                 if ((flag & NRS_TBF_FLAG_UID) &&
2532                     (id.ti_uid != nti_id->nti_id.ti_uid))
2533                         continue;
2534
2535                 if ((flag & NRS_TBF_FLAG_GID) &&
2536                     (id.ti_gid != nti_id->nti_id.ti_gid))
2537                         continue;
2538
2539                 return 1;
2540         }
2541         return 0;
2542 }
2543
2544 static int
2545 nrs_tbf_id_rule_match(struct nrs_tbf_rule *rule,
2546                       struct nrs_tbf_client *cli)
2547 {
2548         return nrs_tbf_id_list_match(&rule->tr_ids, cli->tc_id);
2549 }
2550
2551 static void nrs_tbf_id_cmd_fini(struct nrs_tbf_cmd *cmd)
2552 {
2553         nrs_tbf_id_list_free(&cmd->u.tc_start.ts_ids);
2554
2555         if (cmd->u.tc_start.ts_ids_str)
2556                 OBD_FREE(cmd->u.tc_start.ts_ids_str,
2557                          strlen(cmd->u.tc_start.ts_ids_str) + 1);
2558 }
2559
2560 static int
2561 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
2562                       enum nrs_tbf_flag tif)
2563 {
2564         struct cfs_lstr src;
2565         struct cfs_lstr res;
2566         int rc = 0;
2567         struct tbf_id id = { 0 };
2568         ENTRY;
2569
2570         if (tif != NRS_TBF_FLAG_UID && tif != NRS_TBF_FLAG_GID)
2571                 RETURN(-EINVAL);
2572
2573         src.ls_str = str;
2574         src.ls_len = len;
2575         INIT_LIST_HEAD(id_list);
2576         while (src.ls_str) {
2577                 struct nrs_tbf_id *nti_id;
2578
2579                 if (cfs_gettok(&src, ' ', &res) == 0)
2580                         GOTO(out, rc = -EINVAL);
2581
2582                 id.ti_type = tif;
2583                 if (tif == NRS_TBF_FLAG_UID) {
2584                         if (!cfs_str2num_check(res.ls_str, res.ls_len,
2585                                                &id.ti_uid, 0, (u32)~0U))
2586                                 GOTO(out, rc = -EINVAL);
2587                 } else {
2588                         if (!cfs_str2num_check(res.ls_str, res.ls_len,
2589                                                &id.ti_gid, 0, (u32)~0U))
2590                                 GOTO(out, rc = -EINVAL);
2591                 }
2592
2593                 OBD_ALLOC_PTR(nti_id);
2594                 if (nti_id == NULL)
2595                         GOTO(out, rc = -ENOMEM);
2596
2597                 nti_id->nti_id = id;
2598                 list_add_tail(&nti_id->nti_linkage, id_list);
2599         }
2600 out:
2601         if (rc)
2602                 nrs_tbf_id_list_free(id_list);
2603         RETURN(rc);
2604 }
2605
2606 static int nrs_tbf_ug_id_parse(struct nrs_tbf_cmd *cmd, char *id)
2607 {
2608         struct cfs_lstr src;
2609         int rc;
2610         enum nrs_tbf_flag tif;
2611
2612         tif = cmd->u.tc_start.ts_valid_type;
2613
2614         src.ls_str = id;
2615         src.ls_len = strlen(id);
2616
2617         rc = nrs_tbf_check_id_value(&src,
2618                                     tif == NRS_TBF_FLAG_UID ? "uid" : "gid");
2619         if (rc)
2620                 return rc;
2621
2622         OBD_ALLOC(cmd->u.tc_start.ts_ids_str, src.ls_len + 1);
2623         if (cmd->u.tc_start.ts_ids_str == NULL)
2624                 return -ENOMEM;
2625
2626         strlcpy(cmd->u.tc_start.ts_ids_str, src.ls_str, src.ls_len + 1);
2627
2628         rc = nrs_tbf_id_list_parse(cmd->u.tc_start.ts_ids_str,
2629                                    strlen(cmd->u.tc_start.ts_ids_str),
2630                                    &cmd->u.tc_start.ts_ids, tif);
2631         if (rc)
2632                 nrs_tbf_id_cmd_fini(cmd);
2633
2634         return rc;
2635 }
2636
2637 static int
2638 nrs_tbf_id_rule_init(struct ptlrpc_nrs_policy *policy,
2639                      struct nrs_tbf_rule *rule,
2640                      struct nrs_tbf_cmd *start)
2641 {
2642         struct nrs_tbf_head *head = rule->tr_head;
2643         int rc = 0;
2644         enum nrs_tbf_flag tif = head->th_type_flag;
2645         int ids_len = strlen(start->u.tc_start.ts_ids_str) + 1;
2646
2647         LASSERT(start->u.tc_start.ts_ids_str);
2648         INIT_LIST_HEAD(&rule->tr_ids);
2649
2650         OBD_ALLOC(rule->tr_ids_str, ids_len);
2651         if (rule->tr_ids_str == NULL)
2652                 return -ENOMEM;
2653
2654         strlcpy(rule->tr_ids_str, start->u.tc_start.ts_ids_str,
2655                 ids_len);
2656
2657         if (!list_empty(&start->u.tc_start.ts_ids)) {
2658                 rc = nrs_tbf_id_list_parse(rule->tr_ids_str,
2659                                            strlen(rule->tr_ids_str),
2660                                            &rule->tr_ids, tif);
2661                 if (rc)
2662                         CERROR("%ss {%s} illegal\n",
2663                                tif == NRS_TBF_FLAG_UID ? "uid" : "gid",
2664                                rule->tr_ids_str);
2665         }
2666         if (rc) {
2667                 OBD_FREE(rule->tr_ids_str, ids_len);
2668                 rule->tr_ids_str = NULL;
2669         }
2670         return rc;
2671 }
2672
2673 static int
2674 nrs_tbf_id_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2675 {
2676         seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2677                    rule->tr_ids_str, rule->tr_rpc_rate,
2678                    atomic_read(&rule->tr_ref) - 1);
2679         return 0;
2680 }
2681
2682 static void nrs_tbf_id_rule_fini(struct nrs_tbf_rule *rule)
2683 {
2684         nrs_tbf_id_list_free(&rule->tr_ids);
2685         if (rule->tr_ids_str != NULL)
2686                 OBD_FREE(rule->tr_ids_str, strlen(rule->tr_ids_str) + 1);
2687 }
2688
2689 struct nrs_tbf_ops nrs_tbf_uid_ops = {
2690         .o_name = NRS_TBF_TYPE_UID,
2691         .o_startup = nrs_tbf_id_startup,
2692         .o_cli_find = nrs_tbf_id_cli_find,
2693         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2694         .o_cli_put = nrs_tbf_nid_cli_put,
2695         .o_cli_init = nrs_tbf_uid_cli_init,
2696         .o_rule_init = nrs_tbf_id_rule_init,
2697         .o_rule_dump = nrs_tbf_id_rule_dump,
2698         .o_rule_match = nrs_tbf_id_rule_match,
2699         .o_rule_fini = nrs_tbf_id_rule_fini,
2700 };
2701
2702 struct nrs_tbf_ops nrs_tbf_gid_ops = {
2703         .o_name = NRS_TBF_TYPE_GID,
2704         .o_startup = nrs_tbf_id_startup,
2705         .o_cli_find = nrs_tbf_id_cli_find,
2706         .o_cli_findadd = nrs_tbf_id_cli_findadd,
2707         .o_cli_put = nrs_tbf_nid_cli_put,
2708         .o_cli_init = nrs_tbf_gid_cli_init,
2709         .o_rule_init = nrs_tbf_id_rule_init,
2710         .o_rule_dump = nrs_tbf_id_rule_dump,
2711         .o_rule_match = nrs_tbf_id_rule_match,
2712         .o_rule_fini = nrs_tbf_id_rule_fini,
2713 };
2714
2715 static struct nrs_tbf_type nrs_tbf_types[] = {
2716         {
2717                 .ntt_name = NRS_TBF_TYPE_JOBID,
2718                 .ntt_flag = NRS_TBF_FLAG_JOBID,
2719                 .ntt_ops = &nrs_tbf_jobid_ops,
2720         },
2721         {
2722                 .ntt_name = NRS_TBF_TYPE_NID,
2723                 .ntt_flag = NRS_TBF_FLAG_NID,
2724                 .ntt_ops = &nrs_tbf_nid_ops,
2725         },
2726         {
2727                 .ntt_name = NRS_TBF_TYPE_OPCODE,
2728                 .ntt_flag = NRS_TBF_FLAG_OPCODE,
2729                 .ntt_ops = &nrs_tbf_opcode_ops,
2730         },
2731         {
2732                 .ntt_name = NRS_TBF_TYPE_GENERIC,
2733                 .ntt_flag = NRS_TBF_FLAG_GENERIC,
2734                 .ntt_ops = &nrs_tbf_generic_ops,
2735         },
2736         {
2737                 .ntt_name = NRS_TBF_TYPE_UID,
2738                 .ntt_flag = NRS_TBF_FLAG_UID,
2739                 .ntt_ops = &nrs_tbf_uid_ops,
2740         },
2741         {
2742                 .ntt_name = NRS_TBF_TYPE_GID,
2743                 .ntt_flag = NRS_TBF_FLAG_GID,
2744                 .ntt_ops = &nrs_tbf_gid_ops,
2745         },
2746 };
2747
2748 /**
2749  * Is called before the policy transitions into
2750  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
2751  * policy-specific private data structure.
2752  *
2753  * \param[in] policy The policy to start
2754  *
2755  * \retval -ENOMEM OOM error
2756  * \retval  0      success
2757  *
2758  * \see nrs_policy_register()
2759  * \see nrs_policy_ctl()
2760  */
2761 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
2762 {
2763         struct nrs_tbf_head     *head;
2764         struct nrs_tbf_ops      *ops;
2765         __u32                    type;
2766         char                    *name;
2767         int found = 0;
2768         int i;
2769         int rc = 0;
2770
2771         if (arg == NULL)
2772                 name = NRS_TBF_TYPE_GENERIC;
2773         else if (strlen(arg) < NRS_TBF_TYPE_MAX_LEN)
2774                 name = arg;
2775         else
2776                 GOTO(out, rc = -EINVAL);
2777
2778         for (i = 0; i < ARRAY_SIZE(nrs_tbf_types); i++) {
2779                 if (strcmp(name, nrs_tbf_types[i].ntt_name) == 0) {
2780                         ops = nrs_tbf_types[i].ntt_ops;
2781                         type = nrs_tbf_types[i].ntt_flag;
2782                         found = 1;
2783                         break;
2784                 }
2785         }
2786         if (found == 0)
2787                 GOTO(out, rc = -ENOTSUPP);
2788
2789         OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
2790         if (head == NULL)
2791                 GOTO(out, rc = -ENOMEM);
2792
2793         memcpy(head->th_type, name, strlen(name));
2794         head->th_type[strlen(name)] = '\0';
2795         head->th_ops = ops;
2796         head->th_type_flag = type;
2797
2798         head->th_binheap = binheap_create(&nrs_tbf_heap_ops,
2799                                           CBH_FLAG_ATOMIC_GROW, 4096, NULL,
2800                                           nrs_pol2cptab(policy),
2801                                           nrs_pol2cptid(policy));
2802         if (head->th_binheap == NULL)
2803                 GOTO(out_free_head, rc = -ENOMEM);
2804
2805         atomic_set(&head->th_rule_sequence, 0);
2806         spin_lock_init(&head->th_rule_lock);
2807         INIT_LIST_HEAD(&head->th_list);
2808         hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
2809         head->th_timer.function = nrs_tbf_timer_cb;
2810         rc = head->th_ops->o_startup(policy, head);
2811         if (rc)
2812                 GOTO(out_free_heap, rc);
2813
2814         policy->pol_private = head;
2815         return 0;
2816 out_free_heap:
2817         binheap_destroy(head->th_binheap);
2818 out_free_head:
2819         OBD_FREE_PTR(head);
2820 out:
2821         return rc;
2822 }
2823
2824 /**
2825  * Is called before the policy transitions into
2826  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
2827  * private data structure.
2828  *
2829  * \param[in] policy The policy to stop
2830  *
2831  * \see nrs_policy_stop0()
2832  */
2833 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
2834 {
2835         struct nrs_tbf_head *head = policy->pol_private;
2836         struct ptlrpc_nrs *nrs = policy->pol_nrs;
2837         struct nrs_tbf_rule *rule, *n;
2838
2839         LASSERT(head != NULL);
2840         LASSERT(head->th_cli_hash != NULL);
2841         hrtimer_cancel(&head->th_timer);
2842         /* Should cleanup hash first before free rules */
2843         cfs_hash_putref(head->th_cli_hash);
2844         list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
2845                 list_del_init(&rule->tr_linkage);
2846                 nrs_tbf_rule_put(rule);
2847         }
2848         LASSERT(list_empty(&head->th_list));
2849         LASSERT(head->th_binheap != NULL);
2850         LASSERT(binheap_is_empty(head->th_binheap));
2851         binheap_destroy(head->th_binheap);
2852         OBD_FREE_PTR(head);
2853         nrs->nrs_throttling = 0;
2854         wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
2855 }
2856
2857 /**
2858  * Performs a policy-specific ctl function on TBF policy instances; similar
2859  * to ioctl.
2860  *
2861  * \param[in]     policy the policy instance
2862  * \param[in]     opc    the opcode
2863  * \param[in,out] arg    used for passing parameters and information
2864  *
2865  * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2866  * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2867  *
2868  * \retval 0   operation carried out successfully
2869  * \retval -ve error
2870  */
2871 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
2872                        enum ptlrpc_nrs_ctl opc,
2873                        void *arg)
2874 {
2875         int rc = 0;
2876         ENTRY;
2877
2878         assert_spin_locked(&policy->pol_nrs->nrs_lock);
2879
2880         switch (opc) {
2881         default:
2882                 RETURN(-EINVAL);
2883
2884         /**
2885          * Read RPC rate size of a policy instance.
2886          */
2887         case NRS_CTL_TBF_RD_RULE: {
2888                 struct nrs_tbf_head *head = policy->pol_private;
2889                 struct seq_file *m = arg;
2890                 struct ptlrpc_service_part *svcpt;
2891
2892                 svcpt = policy->pol_nrs->nrs_svcpt;
2893                 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
2894
2895                 rc = nrs_tbf_rule_dump_all(head, m);
2896                 }
2897                 break;
2898
2899         /**
2900          * Write RPC rate of a policy instance.
2901          */
2902         case NRS_CTL_TBF_WR_RULE: {
2903                 struct nrs_tbf_head *head = policy->pol_private;
2904                 struct nrs_tbf_cmd *cmd;
2905
2906                 cmd = (struct nrs_tbf_cmd *)arg;
2907                 rc = nrs_tbf_command(policy,
2908                                      head,
2909                                      cmd);
2910                 }
2911                 break;
2912         /**
2913          * Read the TBF policy type of a policy instance.
2914          */
2915         case NRS_CTL_TBF_RD_TYPE_FLAG: {
2916                 struct nrs_tbf_head *head = policy->pol_private;
2917
2918                 *(__u32 *)arg = head->th_type_flag;
2919                 }
2920                 break;
2921         }
2922
2923         RETURN(rc);
2924 }
2925
2926 /**
2927  * Is called for obtaining a TBF policy resource.
2928  *
2929  * \param[in]  policy     The policy on which the request is being asked for
2930  * \param[in]  nrq        The request for which resources are being taken
2931  * \param[in]  parent     Parent resource, unused in this policy
2932  * \param[out] resp       Resources references are placed in this array
2933  * \param[in]  moving_req Signifies limited caller context; unused in this
2934  *                        policy
2935  *
2936  *
2937  * \see nrs_resource_get_safe()
2938  */
2939 static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
2940                            struct ptlrpc_nrs_request *nrq,
2941                            const struct ptlrpc_nrs_resource *parent,
2942                            struct ptlrpc_nrs_resource **resp,
2943                            bool moving_req)
2944 {
2945         struct nrs_tbf_head   *head;
2946         struct nrs_tbf_client *cli;
2947         struct nrs_tbf_client *tmp;
2948         struct ptlrpc_request *req;
2949
2950         if (parent == NULL) {
2951                 *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
2952                 return 0;
2953         }
2954
2955         head = container_of(parent, struct nrs_tbf_head, th_res);
2956         req = container_of(nrq, struct ptlrpc_request, rq_nrq);
2957         cli = head->th_ops->o_cli_find(head, req);
2958         if (cli != NULL) {
2959                 spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2960                 LASSERT(cli->tc_rule);
2961                 if (cli->tc_rule_sequence !=
2962                     atomic_read(&head->th_rule_sequence) ||
2963                     cli->tc_rule->tr_flags & NTRS_STOPPING) {
2964                         struct nrs_tbf_rule *rule;
2965
2966                         CDEBUG(D_RPCTRACE,
2967                                "TBF class@%p rate %llu sequence %d, "
2968                                "rule flags %d, head sequence %d\n",
2969                                cli, cli->tc_rpc_rate,
2970                                cli->tc_rule_sequence,
2971                                cli->tc_rule->tr_flags,
2972                                atomic_read(&head->th_rule_sequence));
2973                         rule = nrs_tbf_rule_match(head, cli);
2974                         if (rule != cli->tc_rule) {
2975                                 nrs_tbf_cli_reset(head, rule, cli);
2976                         } else {
2977                                 if (cli->tc_rule_generation != rule->tr_generation)
2978                                         nrs_tbf_cli_reset_value(head, cli);
2979                                 nrs_tbf_rule_put(rule);
2980                         }
2981                 } else if (cli->tc_rule_generation !=
2982                            cli->tc_rule->tr_generation) {
2983                         nrs_tbf_cli_reset_value(head, cli);
2984                 }
2985                 spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2986                 goto out;
2987         }
2988
2989         OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
2990                           sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
2991         if (cli == NULL)
2992                 return -ENOMEM;
2993
2994         nrs_tbf_cli_init(head, cli, req);
2995         tmp = head->th_ops->o_cli_findadd(head, cli);
2996         if (tmp != cli) {
2997                 atomic_dec(&cli->tc_ref);
2998                 nrs_tbf_cli_fini(cli);
2999                 cli = tmp;
3000         }
3001 out:
3002         *resp = &cli->tc_res;
3003
3004         return 1;
3005 }
3006
3007 /**
3008  * Called when releasing references to the resource hierachy obtained for a
3009  * request for scheduling using the TBF policy.
3010  *
3011  * \param[in] policy   the policy the resource belongs to
3012  * \param[in] res      the resource to be released
3013  */
3014 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
3015                             const struct ptlrpc_nrs_resource *res)
3016 {
3017         struct nrs_tbf_head   *head;
3018         struct nrs_tbf_client *cli;
3019
3020         /**
3021          * Do nothing for freeing parent, nrs_tbf_net resources
3022          */
3023         if (res->res_parent == NULL)
3024                 return;
3025
3026         cli = container_of(res, struct nrs_tbf_client, tc_res);
3027         head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
3028
3029         head->th_ops->o_cli_put(head, cli);
3030 }
3031
3032 /**
3033  * Called when getting a request from the TBF policy for handling, or just
3034  * peeking; removes the request from the policy when it is to be handled.
3035  *
3036  * \param[in] policy The policy
3037  * \param[in] peek   When set, signifies that we just want to examine the
3038  *                   request, and not handle it, so the request is not removed
3039  *                   from the policy.
3040  * \param[in] force  Force the policy to return a request
3041  *
3042  * \retval The request to be handled; this is the next request in the TBF
3043  *         rule
3044  *
3045  * \see ptlrpc_nrs_req_get_nolock()
3046  * \see nrs_request_get()
3047  */
3048 static
3049 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
3050                                            bool peek, bool force)
3051 {
3052         struct nrs_tbf_head       *head = policy->pol_private;
3053         struct ptlrpc_nrs_request *nrq = NULL;
3054         struct nrs_tbf_client     *cli;
3055         struct binheap_node       *node;
3056
3057         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3058
3059         if (likely(!peek && !force) && policy->pol_nrs->nrs_throttling)
3060                 return NULL;
3061
3062         node = binheap_root(head->th_binheap);
3063         if (unlikely(node == NULL))
3064                 return NULL;
3065
3066         cli = container_of(node, struct nrs_tbf_client, tc_node);
3067         LASSERT(cli->tc_in_heap);
3068         if (unlikely(peek)) {
3069                 nrq = list_entry(cli->tc_list.next,
3070                                      struct ptlrpc_nrs_request,
3071                                      nr_u.tbf.tr_list);
3072         } else {
3073                 struct nrs_tbf_rule *rule = cli->tc_rule;
3074                 __u64 now = ktime_to_ns(ktime_get());
3075                 __u64 passed;
3076                 __u64 ntoken;
3077                 __u64 deadline;
3078                 __u64 old_resid = 0;
3079
3080                 deadline = cli->tc_check_time +
3081                           cli->tc_nsecs;
3082                 LASSERT(now >= cli->tc_check_time);
3083                 passed = now - cli->tc_check_time;
3084                 ntoken = passed * cli->tc_rpc_rate;
3085                 do_div(ntoken, NSEC_PER_SEC);
3086
3087                 ntoken += cli->tc_ntoken;
3088                 if (rule->tr_flags & NTRS_REALTIME) {
3089                         LASSERT(cli->tc_nsecs_resid < cli->tc_nsecs);
3090                         old_resid = cli->tc_nsecs_resid;
3091                         cli->tc_nsecs_resid += passed % cli->tc_nsecs;
3092                         if (cli->tc_nsecs_resid > cli->tc_nsecs) {
3093                                 ntoken++;
3094                                 cli->tc_nsecs_resid -= cli->tc_nsecs;
3095                         }
3096                 } else if (ntoken > cli->tc_depth)
3097                         ntoken = cli->tc_depth;
3098
3099                 /* give an extra token with force mode */
3100                 if (unlikely(force) && ntoken == 0)
3101                         ntoken = 1;
3102
3103                 if (ntoken > 0) {
3104                         nrq = list_entry(cli->tc_list.next,
3105                                          struct ptlrpc_nrs_request,
3106                                          nr_u.tbf.tr_list);
3107                         ntoken--;
3108                         cli->tc_ntoken = ntoken;
3109                         cli->tc_check_time = now;
3110                         list_del_init(&nrq->nr_u.tbf.tr_list);
3111                         if (list_empty(&cli->tc_list)) {
3112                                 binheap_remove(head->th_binheap,
3113                                                &cli->tc_node);
3114                                 cli->tc_in_heap = false;
3115                         } else {
3116                                 if (!(rule->tr_flags & NTRS_REALTIME))
3117                                         cli->tc_deadline = now + cli->tc_nsecs;
3118                                 binheap_relocate(head->th_binheap,
3119                                                  &cli->tc_node);
3120                         }
3121                         CDEBUG(D_RPCTRACE,
3122                                "TBF dequeues: class@%p rate %llu gen %llu token %llu, rule@%p rate %llu gen %llu\n",
3123                                cli, cli->tc_rpc_rate,
3124                                cli->tc_rule_generation, cli->tc_ntoken,
3125                                cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3126                                cli->tc_rule->tr_generation);
3127                 } else {
3128                         ktime_t time;
3129
3130                         if (rule->tr_flags & NTRS_REALTIME) {
3131                                 cli->tc_deadline = deadline;
3132                                 cli->tc_nsecs_resid = old_resid;
3133                                 binheap_relocate(head->th_binheap,
3134                                                  &cli->tc_node);
3135                                 if (node != binheap_root(head->th_binheap))
3136                                         return nrs_tbf_req_get(policy,
3137                                                                peek, force);
3138                         }
3139                         policy->pol_nrs->nrs_throttling = 1;
3140                         head->th_deadline = deadline;
3141                         time = ktime_set(0, 0);
3142                         time = ktime_add_ns(time, deadline);
3143                         hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
3144                 }
3145         }
3146
3147         return nrq;
3148 }
3149
3150 /**
3151  * Adds request \a nrq to \a policy's list of queued requests
3152  *
3153  * \param[in] policy The policy
3154  * \param[in] nrq    The request to add
3155  *
3156  * \retval 0 success; nrs_request_enqueue() assumes this function will always
3157  *                    succeed
3158  */
3159 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
3160                            struct ptlrpc_nrs_request *nrq)
3161 {
3162         struct nrs_tbf_head   *head;
3163         struct nrs_tbf_client *cli;
3164         int                    rc = 0;
3165
3166         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3167
3168         cli = container_of(nrs_request_resource(nrq),
3169                            struct nrs_tbf_client, tc_res);
3170         head = container_of(nrs_request_resource(nrq)->res_parent,
3171                             struct nrs_tbf_head, th_res);
3172         if (list_empty(&cli->tc_list)) {
3173                 LASSERT(!cli->tc_in_heap);
3174                 cli->tc_deadline = cli->tc_check_time + cli->tc_nsecs;
3175                 rc = binheap_insert(head->th_binheap, &cli->tc_node);
3176                 if (rc == 0) {
3177                         cli->tc_in_heap = true;
3178                         nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3179                         list_add_tail(&nrq->nr_u.tbf.tr_list,
3180                                           &cli->tc_list);
3181                         if (policy->pol_nrs->nrs_throttling) {
3182                                 __u64 deadline = cli->tc_deadline;
3183                                 if ((head->th_deadline > deadline) &&
3184                                     (hrtimer_try_to_cancel(&head->th_timer)
3185                                      >= 0)) {
3186                                         ktime_t time;
3187                                         head->th_deadline = deadline;
3188                                         time = ktime_set(0, 0);
3189                                         time = ktime_add_ns(time, deadline);
3190                                         hrtimer_start(&head->th_timer, time,
3191                                                       HRTIMER_MODE_ABS);
3192                                 }
3193                         }
3194                 }
3195         } else {
3196                 LASSERT(cli->tc_in_heap);
3197                 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3198                 list_add_tail(&nrq->nr_u.tbf.tr_list,
3199                                   &cli->tc_list);
3200         }
3201
3202         if (rc == 0)
3203                 CDEBUG(D_RPCTRACE,
3204                        "TBF enqueues: class@%p rate %llu gen %llu token %llu, rule@%p rate %llu gen %llu\n",
3205                        cli, cli->tc_rpc_rate,
3206                        cli->tc_rule_generation, cli->tc_ntoken,
3207                        cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3208                        cli->tc_rule->tr_generation);
3209
3210         return rc;
3211 }
3212
3213 /**
3214  * Removes request \a nrq from \a policy's list of queued requests.
3215  *
3216  * \param[in] policy The policy
3217  * \param[in] nrq    The request to remove
3218  */
3219 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
3220                              struct ptlrpc_nrs_request *nrq)
3221 {
3222         struct nrs_tbf_head   *head;
3223         struct nrs_tbf_client *cli;
3224
3225         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3226
3227         cli = container_of(nrs_request_resource(nrq),
3228                            struct nrs_tbf_client, tc_res);
3229         head = container_of(nrs_request_resource(nrq)->res_parent,
3230                             struct nrs_tbf_head, th_res);
3231
3232         LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
3233         list_del_init(&nrq->nr_u.tbf.tr_list);
3234         if (list_empty(&cli->tc_list)) {
3235                 binheap_remove(head->th_binheap,
3236                                &cli->tc_node);
3237                 cli->tc_in_heap = false;
3238         } else {
3239                 binheap_relocate(head->th_binheap,
3240                                  &cli->tc_node);
3241         }
3242 }
3243
3244 /**
3245  * Prints a debug statement right before the request \a nrq stops being
3246  * handled.
3247  *
3248  * \param[in] policy The policy handling the request
3249  * \param[in] nrq    The request being handled
3250  *
3251  * \see ptlrpc_server_finish_request()
3252  * \see ptlrpc_nrs_req_stop_nolock()
3253  */
3254 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
3255                               struct ptlrpc_nrs_request *nrq)
3256 {
3257         struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
3258                                                   rq_nrq);
3259
3260         assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3261
3262         CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: %llu\n",
3263                policy->pol_desc->pd_name, libcfs_idstr(&req->rq_peer),
3264                nrq->nr_u.tbf.tr_sequence);
3265 }
3266
3267 /**
3268  * debugfs interface
3269  */
3270
3271 /**
3272  * The maximum RPC rate.
3273  */
3274 #define LPROCFS_NRS_RATE_MAX            1000000ULL      /* 1rpc/us */
3275
3276 static int
3277 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
3278 {
3279         struct ptlrpc_service       *svc = m->private;
3280         int                          rc;
3281
3282         seq_printf(m, "regular_requests:\n");
3283         /**
3284          * Perform two separate calls to this as only one of the NRS heads'
3285          * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
3286          * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
3287          */
3288         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
3289                                        NRS_POL_NAME_TBF,
3290                                        NRS_CTL_TBF_RD_RULE,
3291                                        false, m);
3292         if (rc == 0) {
3293                 /**
3294                  * -ENOSPC means buf in the parameter m is overflow, return 0
3295                  * here to let upper layer function seq_read alloc a larger
3296                  * memory area and do this process again.
3297                  */
3298         } else if (rc == -ENOSPC) {
3299                 return 0;
3300
3301                 /**
3302                  * Ignore -ENODEV as the regular NRS head's policy may be in the
3303                  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
3304                  */
3305         } else if (rc != -ENODEV) {
3306                 return rc;
3307         }
3308
3309         if (!nrs_svc_has_hp(svc))
3310                 goto no_hp;
3311
3312         seq_printf(m, "high_priority_requests:\n");
3313         rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
3314                                        NRS_POL_NAME_TBF,
3315                                        NRS_CTL_TBF_RD_RULE,
3316                                        false, m);
3317         if (rc == 0) {
3318                 /**
3319                  * -ENOSPC means buf in the parameter m is overflow, return 0
3320                  * here to let upper layer function seq_read alloc a larger
3321                  * memory area and do this process again.
3322                  */
3323         } else if (rc == -ENOSPC) {
3324                 return 0;
3325         }
3326
3327 no_hp:
3328
3329         return rc;
3330 }
3331
3332 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char *token)
3333 {
3334         int rc;
3335         ENTRY;
3336
3337         switch (cmd->u.tc_start.ts_valid_type) {
3338         case NRS_TBF_FLAG_JOBID:
3339                 rc = nrs_tbf_jobid_parse(cmd, token);
3340                 break;
3341         case NRS_TBF_FLAG_NID:
3342                 rc = nrs_tbf_nid_parse(cmd, token);
3343                 break;
3344         case NRS_TBF_FLAG_OPCODE:
3345                 rc = nrs_tbf_opcode_parse(cmd, token);
3346                 break;
3347         case NRS_TBF_FLAG_GENERIC:
3348                 rc = nrs_tbf_generic_parse(cmd, token);
3349                 break;
3350         case NRS_TBF_FLAG_UID:
3351         case NRS_TBF_FLAG_GID:
3352                 rc = nrs_tbf_ug_id_parse(cmd, token);
3353                 break;
3354         default:
3355                 RETURN(-EINVAL);
3356         }
3357
3358         RETURN(rc);
3359 }
3360
3361 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
3362 {
3363         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3364                 switch (cmd->u.tc_start.ts_valid_type) {
3365                 case NRS_TBF_FLAG_JOBID:
3366                         nrs_tbf_jobid_cmd_fini(cmd);
3367                         break;
3368                 case NRS_TBF_FLAG_NID:
3369                         nrs_tbf_nid_cmd_fini(cmd);
3370                         break;
3371                 case NRS_TBF_FLAG_OPCODE:
3372                         nrs_tbf_opcode_cmd_fini(cmd);
3373                         break;
3374                 case NRS_TBF_FLAG_GENERIC:
3375                         nrs_tbf_generic_cmd_fini(cmd);
3376                         break;
3377                 case NRS_TBF_FLAG_UID:
3378                 case NRS_TBF_FLAG_GID:
3379                         nrs_tbf_id_cmd_fini(cmd);
3380                         break;
3381                 default:
3382                         CWARN("unknown NRS_TBF_FLAGS:0x%x\n",
3383                               cmd->u.tc_start.ts_valid_type);
3384                 }
3385         }
3386 }
3387
3388 static int check_rule_name(const char *name)
3389 {
3390         int i;
3391
3392         if (name[0] == '\0')
3393                 return -EINVAL;
3394
3395         for (i = 0; name[i] != '\0' && i < MAX_TBF_NAME; i++) {
3396                 if (!isalnum(name[i]) && name[i] != '_')
3397                         return -EINVAL;
3398         }
3399
3400         if (i == MAX_TBF_NAME)
3401                 return -ENAMETOOLONG;
3402
3403         return 0;
3404 }
3405
3406 static int
3407 nrs_tbf_parse_value_pair(struct nrs_tbf_cmd *cmd, char *buffer)
3408 {
3409         char    *key;
3410         char    *val;
3411         int      rc;
3412         __u64    rate;
3413
3414         val = buffer;
3415         key = strsep(&val, "=");
3416         if (val == NULL || strlen(val) == 0)
3417                 return -EINVAL;
3418
3419         /* Key of the value pair */
3420         if (strcmp(key, "rate") == 0) {
3421                 rc = kstrtoull(val, 10, &rate);
3422                 if (rc)
3423                         return rc;
3424
3425                 if (rate <= 0 || rate >= LPROCFS_NRS_RATE_MAX)
3426                         return -EINVAL;
3427
3428                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3429                         cmd->u.tc_start.ts_rpc_rate = rate;
3430                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3431                         cmd->u.tc_change.tc_rpc_rate = rate;
3432                 else
3433                         return -EINVAL;
3434         }  else if (strcmp(key, "rank") == 0) {
3435                 rc = check_rule_name(val);
3436                 if (rc)
3437                         return rc;
3438
3439                 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3440                         cmd->u.tc_start.ts_next_name = val;
3441                 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3442                         cmd->u.tc_change.tc_next_name = val;
3443                 else
3444                         return -EINVAL;
3445         } else if (strcmp(key, "realtime") == 0) {
3446                 unsigned long realtime;
3447
3448                 rc = kstrtoul(val, 10, &realtime);
3449                 if (rc)
3450                         return rc;
3451
3452                 if (realtime > 0)
3453                         cmd->u.tc_start.ts_rule_flags |= NTRS_REALTIME;
3454         } else {
3455                 return -EINVAL;
3456         }
3457         return 0;
3458 }
3459
3460 static int
3461 nrs_tbf_parse_value_pairs(struct nrs_tbf_cmd *cmd, char *buffer)
3462 {
3463         char    *val;
3464         char    *token;
3465         int      rc;
3466
3467         val = buffer;
3468         while (val != NULL && strlen(val) != 0) {
3469                 token = strsep(&val, " ");
3470                 rc = nrs_tbf_parse_value_pair(cmd, token);
3471                 if (rc)
3472                         return rc;
3473         }
3474
3475         switch (cmd->tc_cmd) {
3476         case NRS_CTL_TBF_START_RULE:
3477                 if (cmd->u.tc_start.ts_rpc_rate == 0)
3478                         cmd->u.tc_start.ts_rpc_rate = tbf_rate;
3479                 break;
3480         case NRS_CTL_TBF_CHANGE_RULE:
3481                 if (cmd->u.tc_change.tc_rpc_rate == 0 &&
3482                     cmd->u.tc_change.tc_next_name == NULL)
3483                         return -EINVAL;
3484                 break;
3485         case NRS_CTL_TBF_STOP_RULE:
3486                 break;
3487         default:
3488                 return -EINVAL;
3489         }
3490         return 0;
3491 }
3492
3493 static struct nrs_tbf_cmd *
3494 nrs_tbf_parse_cmd(char *buffer, unsigned long count, __u32 type_flag)
3495 {
3496         struct nrs_tbf_cmd *cmd;
3497         char *token;
3498         char *val;
3499         int rc = 0;
3500
3501         OBD_ALLOC_PTR(cmd);
3502         if (cmd == NULL)
3503                 GOTO(out, rc = -ENOMEM);
3504         memset(cmd, 0, sizeof(*cmd));
3505
3506         val = buffer;
3507         token = strsep(&val, " ");
3508         if (val == NULL || strlen(val) == 0)
3509                 GOTO(out_free_cmd, rc = -EINVAL);
3510
3511         /* Type of the command */
3512         if (strcmp(token, "start") == 0) {
3513                 cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
3514                 cmd->u.tc_start.ts_valid_type = type_flag;
3515         } else if (strcmp(token, "stop") == 0)
3516                 cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
3517         else if (strcmp(token, "change") == 0)
3518                 cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RULE;
3519         else
3520                 GOTO(out_free_cmd, rc = -EINVAL);
3521
3522         /* Name of the rule */
3523         token = strsep(&val, " ");
3524         if ((val == NULL && cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE))
3525                 GOTO(out_free_cmd, rc = -EINVAL);
3526
3527         rc = check_rule_name(token);
3528         if (rc)
3529                 GOTO(out_free_cmd, rc);
3530
3531         cmd->tc_name = token;
3532
3533         if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3534                 /* List of ID */
3535                 LASSERT(val);
3536                 token = val;
3537                 val = strrchr(token, '}');
3538                 if (!val)
3539                         GOTO(out_free_cmd, rc = -EINVAL);
3540
3541                 /* Skip '}' */
3542                 val++;
3543                 if (*val == '\0') {
3544                         val = NULL;
3545                 } else if (*val == ' ') {
3546                         *val = '\0';
3547                         val++;
3548                 } else
3549                         GOTO(out_free_cmd, rc = -EINVAL);
3550
3551                 rc = nrs_tbf_id_parse(cmd, token);
3552                 if (rc)
3553                         GOTO(out_free_cmd, rc);
3554         }
3555
3556         rc = nrs_tbf_parse_value_pairs(cmd, val);
3557         if (rc)
3558                 GOTO(out_cmd_fini, rc = -EINVAL);
3559         goto out;
3560 out_cmd_fini:
3561         nrs_tbf_cmd_fini(cmd);
3562 out_free_cmd:
3563         OBD_FREE_PTR(cmd);
3564 out:
3565         if (rc)
3566                 cmd = ERR_PTR(rc);
3567         return cmd;
3568 }
3569
3570 /**
3571  * Get the TBF policy type (nid, jobid, etc) preset by
3572  * proc entry 'nrs_policies' for command buffer parsing.
3573  *
3574  * \param[in] svc the PTLRPC service
3575  * \param[in] queue the NRS queue type
3576  *
3577  * \retval the preset TBF policy type flag
3578  */
3579 static __u32
3580 nrs_tbf_type_flag(struct ptlrpc_service *svc, enum ptlrpc_nrs_queue_type queue)
3581 {
3582         __u32   type;
3583         int     rc;
3584
3585         rc = ptlrpc_nrs_policy_control(svc, queue,
3586                                        NRS_POL_NAME_TBF,
3587                                        NRS_CTL_TBF_RD_TYPE_FLAG,
3588                                        true, &type);
3589         if (rc != 0)
3590                 type = NRS_TBF_FLAG_INVALID;
3591
3592         return type;
3593 }
3594
3595 #define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
3596 static ssize_t
3597 ptlrpc_lprocfs_nrs_tbf_rule_seq_write(struct file *file,
3598                                       const char __user *buffer,
3599                                       size_t count, loff_t *off)
3600 {
3601         struct seq_file *m = file->private_data;
3602         struct ptlrpc_service *svc = m->private;
3603         char *kernbuf;
3604         char *val;
3605         int rc;
3606         struct nrs_tbf_cmd *cmd;
3607         enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
3608         unsigned long length;
3609         char *token;
3610
3611         OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3612         if (kernbuf == NULL)
3613                 GOTO(out, rc = -ENOMEM);
3614
3615         if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
3616                 GOTO(out_free_kernbuff, rc = -EINVAL);
3617
3618         if (copy_from_user(kernbuf, buffer, count))
3619                 GOTO(out_free_kernbuff, rc = -EFAULT);
3620
3621         val = kernbuf;
3622         token = strsep(&val, " ");
3623         if (val == NULL)
3624                 GOTO(out_free_kernbuff, rc = -EINVAL);
3625
3626         if (strcmp(token, "reg") == 0) {
3627                 queue = PTLRPC_NRS_QUEUE_REG;
3628         } else if (strcmp(token, "hp") == 0) {
3629                 queue = PTLRPC_NRS_QUEUE_HP;
3630         } else {
3631                 kernbuf[strlen(token)] = ' ';
3632                 val = kernbuf;
3633         }
3634         length = strlen(val);
3635
3636         if (length == 0)
3637                 GOTO(out_free_kernbuff, rc = -EINVAL);
3638
3639         if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
3640                 GOTO(out_free_kernbuff, rc = -ENODEV);
3641         else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
3642                 queue = PTLRPC_NRS_QUEUE_REG;
3643
3644         cmd = nrs_tbf_parse_cmd(val, length, nrs_tbf_type_flag(svc, queue));
3645         if (IS_ERR(cmd))
3646                 GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
3647
3648         /**
3649          * Serialize NRS core lprocfs operations with policy registration/
3650          * unregistration.
3651          */
3652         mutex_lock(&nrs_core.nrs_mutex);
3653         rc = ptlrpc_nrs_policy_control(svc, queue,
3654                                        NRS_POL_NAME_TBF,
3655                                        NRS_CTL_TBF_WR_RULE,
3656                                        false, cmd);
3657         mutex_unlock(&nrs_core.nrs_mutex);
3658
3659         nrs_tbf_cmd_fini(cmd);
3660         OBD_FREE_PTR(cmd);
3661 out_free_kernbuff:
3662         OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3663 out:
3664         return rc ? rc : count;
3665 }
3666
3667 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_tbf_rule);
3668
3669 /**
3670  * Initializes a TBF policy's lprocfs interface for service \a svc
3671  *
3672  * \param[in] svc the service
3673  *
3674  * \retval 0    success
3675  * \retval != 0 error
3676  */
3677 static int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
3678 {
3679         struct ldebugfs_vars nrs_tbf_lprocfs_vars[] = {
3680                 { .name         = "nrs_tbf_rule",
3681                   .fops         = &ptlrpc_lprocfs_nrs_tbf_rule_fops,
3682                   .data = svc },
3683                 { NULL }
3684         };
3685
3686         if (!svc->srv_debugfs_entry)
3687                 return 0;
3688
3689         ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_tbf_lprocfs_vars, NULL);
3690
3691         return 0;
3692 }
3693
3694 /**
3695  * TBF policy operations
3696  */
3697 static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
3698         .op_policy_start        = nrs_tbf_start,
3699         .op_policy_stop         = nrs_tbf_stop,
3700         .op_policy_ctl          = nrs_tbf_ctl,
3701         .op_res_get             = nrs_tbf_res_get,
3702         .op_res_put             = nrs_tbf_res_put,
3703         .op_req_get             = nrs_tbf_req_get,
3704         .op_req_enqueue         = nrs_tbf_req_add,
3705         .op_req_dequeue         = nrs_tbf_req_del,
3706         .op_req_stop            = nrs_tbf_req_stop,
3707         .op_lprocfs_init        = nrs_tbf_lprocfs_init,
3708 };
3709
3710 /**
3711  * TBF policy configuration
3712  */
3713 struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
3714         .nc_name                = NRS_POL_NAME_TBF,
3715         .nc_ops                 = &nrs_tbf_ops,
3716         .nc_compat              = nrs_policy_compat_all,
3717 };
3718
3719 /** @} tbf */
3720
3721 /** @} nrs */