From: Li Xi Date: Sun, 3 Nov 2013 17:49:54 +0000 (-0800) Subject: LU-3558 ptlrpc: Add the NRS TBF policy X-Git-Tag: 2.5.54~10 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=33e35c0bf214c2565b8b0ce6b2235b9262aa1b54 LU-3558 ptlrpc: Add the NRS TBF policy The TBF (Token Bucket Filter) policy schedules and throttles all types of RPCs for traffic control purposes. It divides RPCs into different types according to their NIDs or job IDs, and enforces an RPC rate limit on every type. The handling of an RPC is delayed until there are enough tokens for its type. Different types are scheduled according to their deadlines, so that none of them starves even when the service cannot satisfy the RPC rate requirements of all types. RPCs of the same type are queued in FIFO order. Signed-off-by: Li Xi Change-Id: I3f73dfbfb451cc44dfe5e0a575ec7ab5b90ac47e Reviewed-on: http://review.whamcloud.com/6901 Reviewed-by: Oleg Drokin Tested-by: Oleg Drokin --- diff --git a/lustre/include/Makefile.am b/lustre/include/Makefile.am index 125669d..831a63d 100644 --- a/lustre/include/Makefile.am +++ b/lustre/include/Makefile.am @@ -75,6 +75,7 @@ EXTRA_DIST = \ lustre_mds.h \ lustre_net.h \ lustre_nodemap.h \ + lustre_nrs_tbf.h \ lustre_param.h \ lustre_quota.h \ lustre_req_layout.h \ diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 17b017c..e7a5d1c 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -822,10 +822,12 @@ struct ptlrpc_nrs_pol_ops { * initialize their resources here; this operation is optional. * * \param[in,out] policy The policy being started + * \param[in,out] arg A generic char buffer * * \see nrs_policy_start_locked() */ - int (*op_policy_start) (struct ptlrpc_nrs_policy *policy); + int (*op_policy_start) (struct ptlrpc_nrs_policy *policy, + char *arg); /** * Called when deactivating a policy via lprocfs; policies deallocate * their resources here; this operation is optional @@ -1103,6 +1105,10 @@ struct ptlrpc_nrs { * unregistration */ unsigned nrs_stopping:1; + /** + * NRS policy is throttling requests + */ + unsigned nrs_throttling:1; }; #define NRS_POL_NAME_MAX 16 @@ -1694,6 +1700,8 @@ struct nrs_orr_req { /** @} ORR/TRR */ +#include <lustre_nrs_tbf.h> + /** * NRS request * @@ -1735,6 +1743,10 @@ struct ptlrpc_nrs_request { struct nrs_crrn_req crr; /** ORR and TRR share the same request definition */ struct nrs_orr_req orr; + /** + * TBF request definition + */ + struct nrs_tbf_req tbf; } nr_u; /** * Externally-registering policies may want to use this to allocate diff --git a/lustre/include/lustre_nrs_tbf.h b/lustre/include/lustre_nrs_tbf.h new file mode 100644 index 0000000..57c0a99 --- /dev/null +++ b/lustre/include/lustre_nrs_tbf.h @@ -0,0 +1,277 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License version 2 for more details. A copy is + * included in the COPYING file that accompanied this code.
+ + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * GPL HEADER END + */ +/* + * Copyright (C) 2013 DataDirect Networks, Inc. + * + */ +/* + * + * Network Request Scheduler (NRS) Token Bucket Filter (TBF) policy + * + */ + +#ifndef _LUSTRE_NRS_TBF_H +#define _LUSTRE_NRS_TBF_H +#include + +/* \name tbf + * + * TBF policy + * + * @{ + */ + +struct nrs_tbf_head; +struct nrs_tbf_cmd; + +struct nrs_tbf_jobid { + char *tj_id; + cfs_list_t tj_linkage; +}; + +struct nrs_tbf_client { + /** Resource object for policy instance. */ + struct ptlrpc_nrs_resource tc_res; + /** Node in the hash table. */ + cfs_hlist_node_t tc_hnode; + /** NID of the client. */ + lnet_nid_t tc_nid; + /** Jobid of the client. */ + char tc_jobid[JOBSTATS_JOBID_SIZE]; + /** Reference count of the client. */ + cfs_atomic_t tc_ref; + /** Linkage to rule. */ + cfs_list_t tc_linkage; + /** Pointer to rule. */ + struct nrs_tbf_rule *tc_rule; + /** Generation of the rule matched. */ + __u64 tc_rule_generation; + /** Limit of RPC rate. */ + __u64 tc_rpc_rate; + /** Time to wait for next token. */ + __u64 tc_nsecs; + /** RPC token number. */ + __u64 tc_ntoken; + /** Token bucket depth. */ + __u64 tc_depth; + /** Time check-point. */ + __u64 tc_check_time; + /** List of queued requests. */ + cfs_list_t tc_list; + /** Node in binary heap. */ + cfs_binheap_node_t tc_node; + /** Whether the client is in heap. */ + bool tc_in_heap; + /** Sequence of the newest rule. */ + __u32 tc_rule_sequence; + /** + * Linkage into LRU list. Protected by bucket lock of + * nrs_tbf_head::th_cli_hash. + */ + cfs_list_t tc_lru; +}; + +#define MAX_TBF_NAME (16) + +#define NTRS_STOPPING 0x0000001 +#define NTRS_DEFAULT 0x0000002 + +struct nrs_tbf_rule { + /** Name of the rule. */ + char tr_name[MAX_TBF_NAME]; + /** Head this rule belongs to. */ + struct nrs_tbf_head *tr_head; + /** Linkage to head. */ + cfs_list_t tr_linkage; + /** Nid list of the rule. */ + cfs_list_t tr_nids; + /** Nid list string of the rule.*/ + char *tr_nids_str; + /** Jobid list of the rule. */ + cfs_list_t tr_jobids; + /** Jobid list string of the rule.*/ + char *tr_jobids_str; + /** RPC/s limit. */ + __u64 tr_rpc_rate; + /** Time to wait for next token. */ + __u64 tr_nsecs; + /** Token bucket depth. */ + __u64 tr_depth; + /** List of clients. */ + cfs_list_t tr_cli_list; + /** Flags of the rule. */ + __u32 tr_flags; + /** Usage reference count taken on the rule. */ + cfs_atomic_t tr_ref; + /** Generation of the rule.
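+ * Incremented by nrs_tbf_rule_change() whenever the rule's rate is + * updated, so that stale cached values in matched clients can be + * detected and refreshed.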
*/ + __u64 tr_generation; +}; + +struct nrs_tbf_ops { + char *o_name; + int (*o_startup)(struct ptlrpc_nrs_policy *, struct nrs_tbf_head *); + struct nrs_tbf_client *(*o_cli_find)(struct nrs_tbf_head *, + struct ptlrpc_request *); + struct nrs_tbf_client *(*o_cli_findadd)(struct nrs_tbf_head *, + struct nrs_tbf_client *); + void (*o_cli_put)(struct nrs_tbf_head *, struct nrs_tbf_client *); + void (*o_cli_init)(struct nrs_tbf_client *, struct ptlrpc_request *); + int (*o_rule_init)(struct ptlrpc_nrs_policy *, + struct nrs_tbf_rule *, + struct nrs_tbf_cmd *); + int (*o_rule_dump)(struct nrs_tbf_rule *, + char *, + int); + int (*o_rule_match)(struct nrs_tbf_rule *, + struct nrs_tbf_client *); + void (*o_rule_fini)(struct nrs_tbf_rule *); +}; + +struct nrs_tbf_dump { + char *td_buff; + int td_size; + int td_length; +}; + +#define NRS_TBF_TYPE_JOBID "jobid" +#define NRS_TBF_TYPE_NID "nid" +#define NRS_TBF_TYPE_MAX_LEN 20 +#define NRS_TBF_FLAG_JOBID 0x0000001 +#define NRS_TBF_FLAG_NID 0x0000002 + +struct nrs_tbf_bucket { + /** + * LRU list, updated on each access to client. Protected by + * bucket lock of nrs_tbf_head::th_cli_hash. + */ + cfs_list_t ntb_lru; +}; + +/** + * Private data structure for the TBF policy + */ +struct nrs_tbf_head { + /** + * Resource object for policy instance. + */ + struct ptlrpc_nrs_resource th_res; + /** + * List of rules. + */ + cfs_list_t th_list; + /** + * Lock to protect the list of rules. + */ + spinlock_t th_rule_lock; + /** + * Generation of rules. + */ + cfs_atomic_t th_rule_sequence; + /** + * Default rule. + */ + struct nrs_tbf_rule *th_rule; + /** + * Timer for next token. + */ +#if defined(__KERNEL__) && defined(__linux__) + struct hrtimer th_timer; +#endif + /** + * Deadline of the timer. + */ + __u64 th_deadline; + /** + * Sequence of requests. + */ + __u64 th_sequence; + /** + * Heap of queues. + */ + cfs_binheap_t *th_binheap; + /** + * Hash of clients. + */ + cfs_hash_t *th_cli_hash; + /** + * Type of TBF policy. + */ + char th_type[NRS_TBF_TYPE_MAX_LEN + 1]; + /** + * Rule operations. + */ + struct nrs_tbf_ops *th_ops; + /** + * Flag of type. + */ + __u32 th_type_flag; + /** + * Index of bucket on hash table while purging. + */ + int th_purge_start; +}; + +enum nrs_tbf_cmd_type { + NRS_CTL_TBF_START_RULE = 0, + NRS_CTL_TBF_STOP_RULE, + NRS_CTL_TBF_CHANGE_RATE, +}; + +struct nrs_tbf_cmd { + enum nrs_tbf_cmd_type tc_cmd; + char *tc_name; + __u64 tc_rpc_rate; + cfs_list_t tc_nids; + char *tc_nids_str; + cfs_list_t tc_jobids; + char *tc_jobids_str; + __u32 tc_valid_types; + __u32 tc_rule_flags; +}; + +struct nrs_tbf_req { + /** + * Linkage to queue. + */ + cfs_list_t tr_list; + /** + * Sequence of the request. + */ + __u64 tr_sequence; +}; + +/** + * TBF policy operations. + */ +enum nrs_ctl_tbf { + /** + * Read the data of a TBF policy. + */ + NRS_CTL_TBF_RD_RULE = PTLRPC_NRS_CTL_1ST_POL_SPEC, + /** + * Write the data of a TBF policy.
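+ * Dispatched to nrs_tbf_command() to start, stop or change a rule.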
+ */ + NRS_CTL_TBF_WR_RULE, +}; + +/** @} tbf */ +#endif diff --git a/lustre/ptlrpc/Makefile.in b/lustre/ptlrpc/Makefile.in index 3735e6d..ceb4cb0 100644 --- a/lustre/ptlrpc/Makefile.in +++ b/lustre/ptlrpc/Makefile.in @@ -15,7 +15,7 @@ ptlrpc_objs += llog_net.o llog_client.o llog_server.o import.o ptlrpcd.o ptlrpc_objs += pers.o lproc_ptlrpc.o wiretest.o layout.o ptlrpc_objs += sec.o sec_ctx.o sec_bulk.o sec_gc.o sec_config.o sec_lproc.o ptlrpc_objs += sec_null.o sec_plain.o nrs.o nrs_fifo.o nrs_crr.o nrs_orr.o -ptlrpc_objs += errno.o +ptlrpc_objs += nrs_tbf.o errno.o target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o target_objs += $(TARGET)tgt_handler.o $(TARGET)out_handler.o diff --git a/lustre/ptlrpc/autoMakefile.am b/lustre/ptlrpc/autoMakefile.am index 8db5c73..06975a5 100644 --- a/lustre/ptlrpc/autoMakefile.am +++ b/lustre/ptlrpc/autoMakefile.am @@ -96,6 +96,7 @@ ptlrpc_SOURCES = \ nrs_fifo.c \ nrs_crr.c \ nrs_orr.c \ + nrs_tbf.c \ wiretest.c \ sec.c \ sec_bulk.c \ diff --git a/lustre/ptlrpc/lproc_ptlrpc.c b/lustre/ptlrpc/lproc_ptlrpc.c index 024169e..fe6491a1 100644 --- a/lustre/ptlrpc/lproc_ptlrpc.c +++ b/lustre/ptlrpc/lproc_ptlrpc.c @@ -624,11 +624,14 @@ out: RETURN(rc); } + +#define LPROCFS_NRS_WR_MAX_ARG (1024) /** * The longest valid command string is the maxium policy name size, plus the - * length of the " reg" substring + * length of the " reg" substring, plus the length of the argument */ -#define LPROCFS_NRS_WR_MAX_CMD (NRS_POL_NAME_MAX + sizeof(" reg") - 1) +#define LPROCFS_NRS_WR_MAX_CMD (NRS_POL_NAME_MAX + sizeof(" reg") - 1 \ + + LPROCFS_NRS_WR_MAX_ARG) /** * Starts and stops a given policy on a PTLRPC service. @@ -646,7 +649,8 @@ ptlrpc_lprocfs_nrs_seq_write(struct file *file, const char *buffer, enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH; char *cmd; char *cmd_copy = NULL; - char *token; + char *policy_name; + char *queue_name; int rc = 0; ENTRY; @@ -666,9 +670,9 @@ ptlrpc_lprocfs_nrs_seq_write(struct file *file, const char *buffer, cmd[count] = '\0'; - token = strsep(&cmd, " "); + policy_name = strsep(&cmd, " "); - if (strlen(token) > NRS_POL_NAME_MAX - 1) + if (strlen(policy_name) > NRS_POL_NAME_MAX - 1) GOTO(out, rc = -EINVAL); /** @@ -677,15 +681,20 @@ if (cmd == NULL) goto default_queue; + queue_name = strsep(&cmd, " "); /** - * The second token is either NULL, or an optional [reg|hp] string + * The second token is either an optional [reg|hp] string, + * or arguments */ - if (strcmp(cmd, "reg") == 0) + if (strcmp(queue_name, "reg") == 0) queue = PTLRPC_NRS_QUEUE_REG; - else if (strcmp(cmd, "hp") == 0) + else if (strcmp(queue_name, "hp") == 0) queue = PTLRPC_NRS_QUEUE_HP; - else - GOTO(out, rc = -EINVAL); + else { + if (cmd != NULL) + *(cmd - 1) = ' '; + cmd = queue_name; + } default_queue: @@ -700,8 +709,9 @@ default_queue: */ mutex_lock(&nrs_core.nrs_mutex); - rc = ptlrpc_nrs_policy_control(svc, queue, token, PTLRPC_NRS_CTL_START, - false, NULL); + rc = ptlrpc_nrs_policy_control(svc, queue, policy_name, + PTLRPC_NRS_CTL_START, + false, cmd); mutex_unlock(&nrs_core.nrs_mutex); out: diff --git a/lustre/ptlrpc/nrs.c b/lustre/ptlrpc/nrs.c index dc3dabd..4a3c976 100644 --- a/lustre/ptlrpc/nrs.c +++ b/lustre/ptlrpc/nrs.c @@ -200,7 +200,7 @@ static void nrs_policy_stop_primary(struct ptlrpc_nrs *nrs) * references on the policy to ptlrpc_nrs_pol_stae::NRS_POL_STATE_STOPPED. In * this case, the fallback policy is only left active in the NRS head.
*/ -static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy) +static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy, char *arg) { struct ptlrpc_nrs *nrs = policy->pol_nrs; int rc = 0; @@ -270,7 +270,7 @@ static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy) if (policy->pol_desc->pd_ops->op_policy_start) { spin_unlock(&nrs->nrs_lock); - rc = policy->pol_desc->pd_ops->op_policy_start(policy); + rc = policy->pol_desc->pd_ops->op_policy_start(policy, arg); spin_lock(&nrs->nrs_lock); if (rc != 0) { @@ -666,7 +666,7 @@ static int nrs_policy_ctl(struct ptlrpc_nrs *nrs, char *name, * Start \e policy */ case PTLRPC_NRS_CTL_START: - rc = nrs_policy_start_locked(policy); + rc = nrs_policy_start_locked(policy, arg); break; } out: @@ -801,7 +801,7 @@ static int nrs_policy_register(struct ptlrpc_nrs *nrs, nrs->nrs_num_pols++; if (policy->pol_flags & PTLRPC_NRS_FL_REG_START) - rc = nrs_policy_start_locked(policy); + rc = nrs_policy_start_locked(policy, NULL); spin_unlock(&nrs->nrs_lock); @@ -948,6 +948,7 @@ static int nrs_svcpt_setup_locked0(struct ptlrpc_nrs *nrs, spin_lock_init(&nrs->nrs_lock); CFS_INIT_LIST_HEAD(&nrs->nrs_policy_list); CFS_INIT_LIST_HEAD(&nrs->nrs_policy_queued); + nrs->nrs_throttling = 0; rc = nrs_register_policies_locked(nrs); @@ -1625,6 +1626,24 @@ bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp) }; /** + * Returns whether the NRS policy is throttling requests + * + * \param[in] svcpt the service partition to enquire. + * \param[in] hp whether the regular or high-priority NRS head is to be + * enquired. + * + * \retval false the indicated NRS head is not throttling requests. + * \retval true the indicated NRS head is throttling requests. + */ +bool ptlrpc_nrs_req_throttling_nolock(struct ptlrpc_service_part *svcpt, + bool hp) +{ + struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp); + + return !!nrs->nrs_throttling; +}; + +/** + * Moves request \a req from the regular to the high-priority NRS head.
* * \param[in] req the request to move @@ -1745,6 +1764,7 @@ extern struct ptlrpc_nrs_pol_conf nrs_conf_crrn; /* ptlrpc/nrs_orr.c */ extern struct ptlrpc_nrs_pol_conf nrs_conf_orr; extern struct ptlrpc_nrs_pol_conf nrs_conf_trr; +extern struct ptlrpc_nrs_pol_conf nrs_conf_tbf; #endif /** @@ -1778,6 +1798,9 @@ int ptlrpc_nrs_init(void) rc = ptlrpc_nrs_policy_register(&nrs_conf_trr); if (rc != 0) GOTO(fail, rc); + rc = ptlrpc_nrs_policy_register(&nrs_conf_tbf); + if (rc != 0) + GOTO(fail, rc); #endif RETURN(rc); diff --git a/lustre/ptlrpc/nrs_crr.c b/lustre/ptlrpc/nrs_crr.c index 9b11547..b64a3c8 100644 --- a/lustre/ptlrpc/nrs_crr.c +++ b/lustre/ptlrpc/nrs_crr.c @@ -181,7 +181,7 @@ static cfs_hash_ops_t nrs_crrn_hash_ops = { * \retval -ENOMEM OOM error * \retval 0 success */ -static int nrs_crrn_start(struct ptlrpc_nrs_policy *policy) +static int nrs_crrn_start(struct ptlrpc_nrs_policy *policy, char *arg) { struct nrs_crrn_net *net; int rc = 0; diff --git a/lustre/ptlrpc/nrs_fifo.c b/lustre/ptlrpc/nrs_fifo.c index a77d533..94fafee 100644 --- a/lustre/ptlrpc/nrs_fifo.c +++ b/lustre/ptlrpc/nrs_fifo.c @@ -79,7 +79,7 @@ * \see nrs_policy_register() * \see nrs_policy_ctl() */ -static int nrs_fifo_start(struct ptlrpc_nrs_policy *policy) +static int nrs_fifo_start(struct ptlrpc_nrs_policy *policy, char *arg) { struct nrs_fifo_head *head; diff --git a/lustre/ptlrpc/nrs_orr.c b/lustre/ptlrpc/nrs_orr.c index 17577b6..ce15db2 100644 --- a/lustre/ptlrpc/nrs_orr.c +++ b/lustre/ptlrpc/nrs_orr.c @@ -609,7 +609,7 @@ static int nrs_orr_init(struct ptlrpc_nrs_policy *policy) * \retval -ENOMEM OOM error * \retval 0 success */ -static int nrs_orr_start(struct ptlrpc_nrs_policy *policy) +static int nrs_orr_start(struct ptlrpc_nrs_policy *policy, char *arg) { struct nrs_orr_data *orrd; cfs_hash_ops_t *ops; diff --git a/lustre/ptlrpc/nrs_tbf.c b/lustre/ptlrpc/nrs_tbf.c new file mode 100644 index 0000000..eb5d955 --- /dev/null +++ b/lustre/ptlrpc/nrs_tbf.c @@ -0,0 +1,1909 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License version 2 for more details. A copy is + * included in the COPYING file that accompanied this code. + + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * GPL HEADER END + */ +/* + * Copyright (C) 2013 DataDirect Networks, Inc. 
+ * + */ +/* + * lustre/ptlrpc/nrs_tbf.c + * + * Network Request Scheduler (NRS) Token Bucket Filter (TBF) policy + * + */ + +#ifdef HAVE_SERVER_SUPPORT + +/** + * \addtogroup nrs + * @{ + */ + +#define DEBUG_SUBSYSTEM S_RPC +#ifndef __KERNEL__ +#include <liblustre.h> +#endif +#include <obd_support.h> +#include <obd_class.h> +#include <libcfs/libcfs.h> +#include "ptlrpc_internal.h" + +/** + * \name tbf + * + * Token Bucket Filter over client NIDs + * + * @{ + */ + +#define NRS_POL_NAME_TBF "tbf" + +int tbf_jobid_cache_size = 8192; +CFS_MODULE_PARM(tbf_jobid_cache_size, "i", int, 0644, + "The size of jobid cache"); + +int tbf_rate = 10000; +CFS_MODULE_PARM(tbf_rate, "i", int, 0644, + "Default rate limit in RPCs/s"); + +int tbf_depth = 3; +CFS_MODULE_PARM(tbf_depth, "i", int, 0644, + "How many tokens a client can save up"); + +static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer) +{ + struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head, + th_timer); + struct ptlrpc_nrs *nrs = head->th_res.res_policy->pol_nrs; + struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt; + + spin_lock(&nrs->nrs_lock); + nrs->nrs_throttling = 0; + spin_unlock(&nrs->nrs_lock); + wake_up(&svcpt->scp_waitq); + + return HRTIMER_NORESTART; +} + +#define NRS_TBF_DEFAULT_RULE "default" + +static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule) +{ + LASSERT(cfs_atomic_read(&rule->tr_ref) == 0); + LASSERT(cfs_list_empty(&rule->tr_cli_list)); + LASSERT(cfs_list_empty(&rule->tr_linkage)); + + rule->tr_head->th_ops->o_rule_fini(rule); + OBD_FREE_PTR(rule); +} + +/** + * Decreases the rule's usage reference count, and stops the rule in case it + * was already stopping and has no more outstanding usage references (which + * indicates it has no more queued or started requests, and can be safely + * stopped). + */ +static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule) +{ + if (cfs_atomic_dec_and_test(&rule->tr_ref)) + nrs_tbf_rule_fini(rule); +} + +/** + * Increases the rule's usage reference count.
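+ * Each get must be balanced by a later call to nrs_tbf_rule_put().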
+ */ +static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule) +{ + cfs_atomic_inc(&rule->tr_ref); +} + +static void +nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli) +{ + LASSERT(!cfs_list_empty(&cli->tc_linkage)); + LASSERT(cli->tc_rule); + cfs_list_del_init(&cli->tc_linkage); + nrs_tbf_rule_put(cli->tc_rule); + cli->tc_rule = NULL; +} + +static void +nrs_tbf_cli_reset_value(struct nrs_tbf_head *head, + struct nrs_tbf_client *cli) + +{ + struct nrs_tbf_rule *rule = cli->tc_rule; + + cli->tc_rpc_rate = rule->tr_rpc_rate; + cli->tc_nsecs = rule->tr_nsecs; + cli->tc_depth = rule->tr_depth; + cli->tc_ntoken = rule->tr_depth; + cli->tc_check_time = ktime_to_ns(ktime_get()); + cli->tc_rule_sequence = cfs_atomic_read(&head->th_rule_sequence); + cli->tc_rule_generation = rule->tr_generation; + + if (cli->tc_in_heap) + cfs_binheap_relocate(head->th_binheap, + &cli->tc_node); +} + +static void +nrs_tbf_cli_reset(struct nrs_tbf_head *head, + struct nrs_tbf_rule *rule, + struct nrs_tbf_client *cli) +{ + if (!cfs_list_empty(&cli->tc_linkage)) { + LASSERT(rule != cli->tc_rule); + nrs_tbf_cli_rule_put(cli); + } + LASSERT(cli->tc_rule == NULL); + LASSERT(cfs_list_empty(&cli->tc_linkage)); + /* Rule's ref is added before called */ + cli->tc_rule = rule; + cfs_list_add_tail(&cli->tc_linkage, &rule->tr_cli_list); + nrs_tbf_cli_reset_value(head, cli); +} + +static int +nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, char *buff, int length) +{ + return rule->tr_head->th_ops->o_rule_dump(rule, buff, length); +} + +static int +nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, char *buff, int length) +{ + struct nrs_tbf_rule *rule; + int rc = 0; + + LASSERT(head != NULL); + spin_lock(&head->th_rule_lock); + /* List the rules from newest to oldest */ + cfs_list_for_each_entry(rule, &head->th_list, tr_linkage) { + LASSERT((rule->tr_flags & NTRS_STOPPING) == 0); + rc += nrs_tbf_rule_dump(rule, buff + rc, length - rc); + } + spin_unlock(&head->th_rule_lock); + + return rc; +} + +static struct nrs_tbf_rule * +nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head, + const char *name) +{ + struct nrs_tbf_rule *rule; + + LASSERT(head != NULL); + cfs_list_for_each_entry(rule, &head->th_list, tr_linkage) { + LASSERT((rule->tr_flags & NTRS_STOPPING) == 0); + if (strcmp(rule->tr_name, name) == 0) { + nrs_tbf_rule_get(rule); + return rule; + } + } + return NULL; +} + +static struct nrs_tbf_rule * +nrs_tbf_rule_find(struct nrs_tbf_head *head, + const char *name) +{ + struct nrs_tbf_rule *rule; + + LASSERT(head != NULL); + spin_lock(&head->th_rule_lock); + rule = nrs_tbf_rule_find_nolock(head, name); + spin_unlock(&head->th_rule_lock); + return rule; +} + +static struct nrs_tbf_rule * +nrs_tbf_rule_match(struct nrs_tbf_head *head, + struct nrs_tbf_client *cli) +{ + struct nrs_tbf_rule *rule = NULL; + struct nrs_tbf_rule *tmp_rule; + + spin_lock(&head->th_rule_lock); + /* Match the newest rule in the list */ + cfs_list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) { + LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0); + if (head->th_ops->o_rule_match(tmp_rule, cli)) { + rule = tmp_rule; + break; + } + } + + if (rule == NULL) + rule = head->th_rule; + + nrs_tbf_rule_get(rule); + spin_unlock(&head->th_rule_lock); + return rule; +} + +static void +nrs_tbf_cli_init(struct nrs_tbf_head *head, + struct nrs_tbf_client *cli, + struct ptlrpc_request *req) +{ + struct nrs_tbf_rule *rule; + + cli->tc_in_heap = false; + head->th_ops->o_cli_init(cli, req); + CFS_INIT_LIST_HEAD(&cli->tc_list); + CFS_INIT_LIST_HEAD(&cli->tc_linkage); 
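+ /* The caller owns the initial reference; hash lookups take their + * own references via the hash's hs_get operation. */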
+ cfs_atomic_set(&cli->tc_ref, 1); + rule = nrs_tbf_rule_match(head, cli); + nrs_tbf_cli_reset(head, rule, cli); +} + +static void +nrs_tbf_cli_fini(struct nrs_tbf_client *cli) +{ + LASSERT(cfs_list_empty(&cli->tc_list)); + LASSERT(!cli->tc_in_heap); + LASSERT(cfs_atomic_read(&cli->tc_ref) == 0); + nrs_tbf_cli_rule_put(cli); + OBD_FREE_PTR(cli); +} + +static int +nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy, + struct nrs_tbf_head *head, + struct nrs_tbf_cmd *start) +{ + struct nrs_tbf_rule *rule, *tmp_rule; + int rc; + + rule = nrs_tbf_rule_find(head, start->tc_name); + if (rule) { + nrs_tbf_rule_put(rule); + return -EEXIST; + } + + OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy)); + if (rule == NULL) + return -ENOMEM; + + memcpy(rule->tr_name, start->tc_name, strlen(start->tc_name)); + rule->tr_rpc_rate = start->tc_rpc_rate; + rule->tr_nsecs = NSEC_PER_SEC / rule->tr_rpc_rate; + rule->tr_depth = tbf_depth; + cfs_atomic_set(&rule->tr_ref, 1); + CFS_INIT_LIST_HEAD(&rule->tr_cli_list); + CFS_INIT_LIST_HEAD(&rule->tr_nids); + + rc = head->th_ops->o_rule_init(policy, rule, start); + if (rc) { + OBD_FREE_PTR(rule); + return rc; + } + + /* Add as the newest rule */ + spin_lock(&head->th_rule_lock); + tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name); + if (tmp_rule) { + nrs_tbf_rule_put(tmp_rule); + nrs_tbf_rule_put(rule); + return -EEXIST; + } + cfs_list_add(&rule->tr_linkage, &head->th_list); + rule->tr_head = head; + spin_unlock(&head->th_rule_lock); + cfs_atomic_inc(&head->th_rule_sequence); + if (start->tc_rule_flags & NTRS_DEFAULT) { + rule->tr_flags |= NTRS_DEFAULT; + LASSERT(head->th_rule == NULL); + head->th_rule = rule; + } + + return 0; +} + +static int +nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy, + struct nrs_tbf_head *head, + struct nrs_tbf_cmd *change) +{ + struct nrs_tbf_rule *rule; + + LASSERT(spin_is_locked(&policy->pol_nrs->nrs_lock)); + + rule = nrs_tbf_rule_find(head, change->tc_name); + if (rule == NULL) + return -ENOENT; + + rule->tr_rpc_rate = change->tc_rpc_rate; + rule->tr_nsecs = NSEC_PER_SEC / rule->tr_rpc_rate; + rule->tr_generation++; + nrs_tbf_rule_put(rule); + + return 0; +} + +static int +nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy, + struct nrs_tbf_head *head, + struct nrs_tbf_cmd *stop) +{ + struct nrs_tbf_rule *rule; + + LASSERT(spin_is_locked(&policy->pol_nrs->nrs_lock)); + + if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0) + return -EPERM; + + rule = nrs_tbf_rule_find(head, stop->tc_name); + if (rule == NULL) + return -ENOENT; + + cfs_list_del_init(&rule->tr_linkage); + rule->tr_flags |= NTRS_STOPPING; + nrs_tbf_rule_put(rule); + nrs_tbf_rule_put(rule); + + return 0; +} + +static int +nrs_tbf_command(struct ptlrpc_nrs_policy *policy, + struct nrs_tbf_head *head, + struct nrs_tbf_cmd *cmd) +{ + int rc; + + LASSERT(spin_is_locked(&policy->pol_nrs->nrs_lock)); + + switch (cmd->tc_cmd) { + case NRS_CTL_TBF_START_RULE: + if (!(cmd->tc_valid_types & head->th_type_flag)) + return -EINVAL; + + spin_unlock(&policy->pol_nrs->nrs_lock); + rc = nrs_tbf_rule_start(policy, head, cmd); + spin_lock(&policy->pol_nrs->nrs_lock); + return rc; + case NRS_CTL_TBF_CHANGE_RATE: + rc = nrs_tbf_rule_change(policy, head, cmd); + return rc; + case NRS_CTL_TBF_STOP_RULE: + rc = nrs_tbf_rule_stop(policy, head, cmd); + /* Take it as a success, if not exists at all */ + return rc == -ENOENT ? 0 : rc; + default: + return -EFAULT; + } +} + +/** + * Binary heap predicate. 
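+ * Orders clients by the deadline at which their next token becomes + * available, i.e. tc_check_time + tc_nsecs, with the earlier deadline + * ranked first. For example, at a rate of 100 RPC/s a client has + * tc_nsecs = NSEC_PER_SEC / 100 = 10,000,000 ns, so a client last + * checked at time T is ranked by the deadline T + 10 ms.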
+ * + * \param[in] e1 the first binheap node to compare + * \param[in] e2 the second binheap node to compare + * + * \retval 0 e1 > e2 + * \retval 1 e1 < e2 + */ +static int tbf_cli_compare(cfs_binheap_node_t *e1, cfs_binheap_node_t *e2) +{ + struct nrs_tbf_client *cli1; + struct nrs_tbf_client *cli2; + + cli1 = container_of(e1, struct nrs_tbf_client, tc_node); + cli2 = container_of(e2, struct nrs_tbf_client, tc_node); + + if (cli1->tc_check_time + cli1->tc_nsecs < + cli2->tc_check_time + cli2->tc_nsecs) + return 1; + else if (cli1->tc_check_time + cli1->tc_nsecs > + cli2->tc_check_time + cli2->tc_nsecs) + return 0; + + if (cli1->tc_check_time < cli2->tc_check_time) + return 1; + else if (cli1->tc_check_time > cli2->tc_check_time) + return 0; + + /* More tie-breaking may be needed, e.g. on the number of requests in the rules */ + return 1; +} + +/** + * TBF binary heap operations + */ +static cfs_binheap_ops_t nrs_tbf_heap_ops = { + .hop_enter = NULL, + .hop_exit = NULL, + .hop_compare = tbf_cli_compare, +}; + +static unsigned nrs_tbf_jobid_hop_hash(cfs_hash_t *hs, const void *key, + unsigned mask) +{ + return cfs_hash_djb2_hash(key, strlen(key), mask); +} + +static int nrs_tbf_jobid_hop_keycmp(const void *key, cfs_hlist_node_t *hnode) +{ + struct nrs_tbf_client *cli = cfs_hlist_entry(hnode, + struct nrs_tbf_client, + tc_hnode); + + return (strcmp(cli->tc_jobid, key) == 0); +} + +static void *nrs_tbf_jobid_hop_key(cfs_hlist_node_t *hnode) +{ + struct nrs_tbf_client *cli = cfs_hlist_entry(hnode, + struct nrs_tbf_client, + tc_hnode); + + return cli->tc_jobid; +} + +static void *nrs_tbf_jobid_hop_object(cfs_hlist_node_t *hnode) +{ + return cfs_hlist_entry(hnode, struct nrs_tbf_client, tc_hnode); +} + +static void nrs_tbf_jobid_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + struct nrs_tbf_client *cli = cfs_hlist_entry(hnode, + struct nrs_tbf_client, + tc_hnode); + + cfs_atomic_inc(&cli->tc_ref); +} + +static void nrs_tbf_jobid_hop_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + struct nrs_tbf_client *cli = cfs_hlist_entry(hnode, + struct nrs_tbf_client, + tc_hnode); + + cfs_atomic_dec(&cli->tc_ref); +} + +static void nrs_tbf_jobid_hop_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode) + +{ + struct nrs_tbf_client *cli = cfs_hlist_entry(hnode, + struct nrs_tbf_client, + tc_hnode); + + LASSERT(cfs_atomic_read(&cli->tc_ref) == 0); + nrs_tbf_cli_fini(cli); +} + +static cfs_hash_ops_t nrs_tbf_jobid_hash_ops = { + .hs_hash = nrs_tbf_jobid_hop_hash, + .hs_keycmp = nrs_tbf_jobid_hop_keycmp, + .hs_key = nrs_tbf_jobid_hop_key, + .hs_object = nrs_tbf_jobid_hop_object, + .hs_get = nrs_tbf_jobid_hop_get, + .hs_put = nrs_tbf_jobid_hop_put, + .hs_put_locked = nrs_tbf_jobid_hop_put, + .hs_exit = nrs_tbf_jobid_hop_exit, +}; + +#define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \ + CFS_HASH_NO_ITEMREF | \ + CFS_HASH_DEPTH) + +static struct nrs_tbf_client * +nrs_tbf_jobid_hash_lookup(cfs_hash_t *hs, + cfs_hash_bd_t *bd, + const char *jobid) +{ + cfs_hlist_node_t *hnode; + struct nrs_tbf_client *cli; + + /* cfs_hash_bd_peek_locked() is somewhat of an "internal" function + * of cfs_hash; it does not take a refcount on the object.
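+ * The lookup below therefore takes an explicit cfs_hash_get() and + * removes a found client from the bucket LRU while it is in use.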
*/ + hnode = cfs_hash_bd_peek_locked(hs, bd, (void *)jobid); + if (hnode == NULL) + return NULL; + + cfs_hash_get(hs, hnode); + cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode); + if (!cfs_list_empty(&cli->tc_lru)) + cfs_list_del_init(&cli->tc_lru); + return cli; +} + +#define NRS_TBF_JOBID_NULL "" + +static struct nrs_tbf_client * +nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head, + struct ptlrpc_request *req) +{ + const char *jobid; + struct nrs_tbf_client *cli; + cfs_hash_t *hs = head->th_cli_hash; + cfs_hash_bd_t bd; + + jobid = lustre_msg_get_jobid(req->rq_reqmsg); + if (jobid == NULL) + jobid = NRS_TBF_JOBID_NULL; + cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1); + cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid); + cfs_hash_bd_unlock(hs, &bd, 1); + + return cli; +} + +static struct nrs_tbf_client * +nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head, + struct nrs_tbf_client *cli) +{ + const char *jobid; + struct nrs_tbf_client *ret; + cfs_hash_t *hs = head->th_cli_hash; + cfs_hash_bd_t bd; + + jobid = cli->tc_jobid; + cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1); + ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid); + if (ret == NULL) { + cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode); + ret = cli; + } + cfs_hash_bd_unlock(hs, &bd, 1); + + return ret; +} + +static void +nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head, + struct nrs_tbf_client *cli) +{ + cfs_hash_bd_t bd; + cfs_hash_t *hs = head->th_cli_hash; + struct nrs_tbf_bucket *bkt; + int hw; + CFS_LIST_HEAD (zombies); + + cfs_hash_bd_get(hs, &cli->tc_jobid, &bd); + bkt = cfs_hash_bd_extra_get(hs, &bd); + if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref)) + return; + LASSERT(cfs_list_empty(&cli->tc_lru)); + cfs_list_add_tail(&cli->tc_lru, &bkt->ntb_lru); + + /* + * Check and purge the LRU, there is at least one client in the LRU. 
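+ * The high-water mark below is this bucket's share of + * tbf_jobid_cache_size; idle clients beyond it are unhashed onto a + * local zombie list and freed after the bucket lock is dropped.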
+ */ + hw = tbf_jobid_cache_size >> + (hs->hs_cur_bits - hs->hs_bkt_bits); + while (cfs_hash_bd_count_get(&bd) > hw) { + if (unlikely(cfs_list_empty(&bkt->ntb_lru))) + break; + cli = cfs_list_entry(bkt->ntb_lru.next, + struct nrs_tbf_client, + tc_lru); + LASSERT(cfs_atomic_read(&cli->tc_ref) == 0); + cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode); + cfs_list_move(&cli->tc_lru, &zombies); + } + cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1); + + while (!cfs_list_empty(&zombies)) { + cli = container_of0(zombies.next, + struct nrs_tbf_client, tc_lru); + cfs_list_del_init(&cli->tc_lru); + nrs_tbf_cli_fini(cli); + } +} + +static void +nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli, + struct ptlrpc_request *req) +{ + char *jobid = lustre_msg_get_jobid(req->rq_reqmsg); + + if (jobid == NULL) + jobid = NRS_TBF_JOBID_NULL; + LASSERT(strlen(jobid) < JOBSTATS_JOBID_SIZE); + CFS_INIT_LIST_HEAD(&cli->tc_lru); + memcpy(cli->tc_jobid, jobid, strlen(jobid)); +} + +static int nrs_tbf_jobid_hash_order(void) +{ + int bits; + + for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits) + ; + + return bits; +} + +#define NRS_TBF_JOBID_BKT_BITS 10 + +static int +nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy, + struct nrs_tbf_head *head) +{ + struct nrs_tbf_cmd start; + struct nrs_tbf_bucket *bkt; + int bits; + int i; + int rc; + cfs_hash_bd_t bd; + + bits = nrs_tbf_jobid_hash_order(); + if (bits < NRS_TBF_JOBID_BKT_BITS) + bits = NRS_TBF_JOBID_BKT_BITS; + head->th_cli_hash = cfs_hash_create("nrs_tbf_hash", + bits, + bits, + NRS_TBF_JOBID_BKT_BITS, + sizeof(*bkt), + 0, + 0, + &nrs_tbf_jobid_hash_ops, + NRS_TBF_JOBID_HASH_FLAGS); + if (head->th_cli_hash == NULL) + return -ENOMEM; + + cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) { + bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd); + CFS_INIT_LIST_HEAD(&bkt->ntb_lru); + } + + memset(&start, 0, sizeof(start)); + start.tc_jobids_str = "*"; + + start.tc_rpc_rate = tbf_rate; + start.tc_rule_flags = NTRS_DEFAULT; + start.tc_name = NRS_TBF_DEFAULT_RULE; + CFS_INIT_LIST_HEAD(&start.tc_jobids); + rc = nrs_tbf_rule_start(policy, head, &start); + + return rc; +} + +/** + * Frees jobid of \a list. 
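+ * Both the jobid strings and the list nodes themselves are released.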
+ * + */ +static void +nrs_tbf_jobid_list_free(cfs_list_t *jobid_list) +{ + struct nrs_tbf_jobid *jobid, *n; + + cfs_list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) { + OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1); + cfs_list_del(&jobid->tj_linkage); + OBD_FREE(jobid, sizeof(struct nrs_tbf_jobid)); + } +} + +static int +nrs_tbf_jobid_list_add(const struct cfs_lstr *id, cfs_list_t *jobid_list) +{ + struct nrs_tbf_jobid *jobid; + + OBD_ALLOC(jobid, sizeof(struct nrs_tbf_jobid)); + if (jobid == NULL) + return -ENOMEM; + + OBD_ALLOC(jobid->tj_id, id->ls_len + 1); + if (jobid->tj_id == NULL) { + OBD_FREE(jobid, sizeof(struct nrs_tbf_jobid)); + return -ENOMEM; + } + + memcpy(jobid->tj_id, id->ls_str, id->ls_len); + cfs_list_add_tail(&jobid->tj_linkage, jobid_list); + return 0; +} + +static int +nrs_tbf_jobid_list_match(cfs_list_t *jobid_list, char *id) +{ + struct nrs_tbf_jobid *jobid; + + cfs_list_for_each_entry(jobid, jobid_list, tj_linkage) { + if (strcmp(id, jobid->tj_id) == 0) + return 1; + } + return 0; +} + +static int +nrs_tbf_jobid_list_parse(char *str, int len, cfs_list_t *jobid_list) +{ + struct cfs_lstr src; + struct cfs_lstr res; + int rc = 0; + ENTRY; + + src.ls_str = str; + src.ls_len = len; + CFS_INIT_LIST_HEAD(jobid_list); + while (src.ls_str) { + rc = cfs_gettok(&src, ' ', &res); + if (rc == 0) { + rc = -EINVAL; + break; + } + rc = nrs_tbf_jobid_list_add(&res, jobid_list); + if (rc) + break; + } + if (rc) + nrs_tbf_jobid_list_free(jobid_list); + RETURN(rc); +} + +static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd) +{ + if (!cfs_list_empty(&cmd->tc_jobids)) + nrs_tbf_jobid_list_free(&cmd->tc_jobids); + if (cmd->tc_jobids_str) + OBD_FREE(cmd->tc_jobids_str, strlen(cmd->tc_jobids_str) + 1); +} + +static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, const char *id) +{ + int rc; + + OBD_ALLOC(cmd->tc_jobids_str, strlen(id) + 1); + if (cmd->tc_jobids_str == NULL) + return -ENOMEM; + + memcpy(cmd->tc_jobids_str, id, strlen(id)); + + /* parse jobid list */ + rc = nrs_tbf_jobid_list_parse(cmd->tc_jobids_str, + strlen(cmd->tc_jobids_str), + &cmd->tc_jobids); + if (rc) + nrs_tbf_jobid_cmd_fini(cmd); + + return rc; +} + +static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy, + struct nrs_tbf_rule *rule, + struct nrs_tbf_cmd *start) +{ + int rc = 0; + + LASSERT(start->tc_jobids_str); + OBD_ALLOC(rule->tr_jobids_str, + strlen(start->tc_jobids_str) + 1); + if (rule->tr_jobids_str == NULL) + return -ENOMEM; + + memcpy(rule->tr_jobids_str, + start->tc_jobids_str, + strlen(start->tc_jobids_str)); + + CFS_INIT_LIST_HEAD(&rule->tr_jobids); + if (!cfs_list_empty(&start->tc_jobids)) { + rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str, + strlen(rule->tr_jobids_str), + &rule->tr_jobids); + if (rc) + CERROR("jobids {%s} illegal\n", rule->tr_jobids_str); + } + if (rc) + OBD_FREE(rule->tr_jobids_str, + strlen(start->tc_jobids_str) + 1); + return rc; +} + +static int +nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, char *buff, int length) +{ + return snprintf(buff, length, "%s {%s} %llu, ref %d\n", + rule->tr_name, + rule->tr_jobids_str, + rule->tr_rpc_rate, + cfs_atomic_read(&rule->tr_ref) - 1); +} + +static int +nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule, + struct nrs_tbf_client *cli) +{ + return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid); +} + +static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule) +{ + if (!cfs_list_empty(&rule->tr_jobids)) + nrs_tbf_jobid_list_free(&rule->tr_jobids); + LASSERT(rule->tr_jobids_str != NULL); + 
OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1); +} + +struct nrs_tbf_ops nrs_tbf_jobid_ops = { + .o_name = NRS_TBF_TYPE_JOBID, + .o_startup = nrs_tbf_jobid_startup, + .o_cli_find = nrs_tbf_jobid_cli_find, + .o_cli_findadd = nrs_tbf_jobid_cli_findadd, + .o_cli_put = nrs_tbf_jobid_cli_put, + .o_cli_init = nrs_tbf_jobid_cli_init, + .o_rule_init = nrs_tbf_jobid_rule_init, + .o_rule_dump = nrs_tbf_jobid_rule_dump, + .o_rule_match = nrs_tbf_jobid_rule_match, + .o_rule_fini = nrs_tbf_jobid_rule_fini, +}; + +/** + * libcfs_hash operations for nrs_tbf_net::cn_cli_hash + * + * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash + * nrs_tbf_client objects. + */ +#define NRS_TBF_NID_BKT_BITS 8 +#define NRS_TBF_NID_BITS 16 + +static unsigned nrs_tbf_nid_hop_hash(cfs_hash_t *hs, const void *key, + unsigned mask) +{ + return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask); +} + +static int nrs_tbf_nid_hop_keycmp(const void *key, cfs_hlist_node_t *hnode) +{ + lnet_nid_t *nid = (lnet_nid_t *)key; + struct nrs_tbf_client *cli = cfs_hlist_entry(hnode, + struct nrs_tbf_client, + tc_hnode); + + return *nid == cli->tc_nid; +} + +static void *nrs_tbf_nid_hop_key(cfs_hlist_node_t *hnode) +{ + struct nrs_tbf_client *cli = cfs_hlist_entry(hnode, + struct nrs_tbf_client, + tc_hnode); + + return &cli->tc_nid; +} + +static void *nrs_tbf_nid_hop_object(cfs_hlist_node_t *hnode) +{ + return cfs_hlist_entry(hnode, struct nrs_tbf_client, tc_hnode); +} + +static void nrs_tbf_nid_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + struct nrs_tbf_client *cli = cfs_hlist_entry(hnode, + struct nrs_tbf_client, + tc_hnode); + + cfs_atomic_inc(&cli->tc_ref); +} + +static void nrs_tbf_nid_hop_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + struct nrs_tbf_client *cli = cfs_hlist_entry(hnode, + struct nrs_tbf_client, + tc_hnode); + + cfs_atomic_dec(&cli->tc_ref); +} + +static void nrs_tbf_nid_hop_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + struct nrs_tbf_client *cli = cfs_hlist_entry(hnode, + struct nrs_tbf_client, + tc_hnode); + + LASSERTF(cfs_atomic_read(&cli->tc_ref) == 0, + "Busy TBF object from client with NID %s, with %d refs\n", + libcfs_nid2str(cli->tc_nid), cfs_atomic_read(&cli->tc_ref)); + + nrs_tbf_cli_fini(cli); +} + +static cfs_hash_ops_t nrs_tbf_nid_hash_ops = { + .hs_hash = nrs_tbf_nid_hop_hash, + .hs_keycmp = nrs_tbf_nid_hop_keycmp, + .hs_key = nrs_tbf_nid_hop_key, + .hs_object = nrs_tbf_nid_hop_object, + .hs_get = nrs_tbf_nid_hop_get, + .hs_put = nrs_tbf_nid_hop_put, + .hs_put_locked = nrs_tbf_nid_hop_put, + .hs_exit = nrs_tbf_nid_hop_exit, +}; + +static struct nrs_tbf_client * +nrs_tbf_nid_cli_find(struct nrs_tbf_head *head, + struct ptlrpc_request *req) +{ + return cfs_hash_lookup(head->th_cli_hash, &req->rq_peer.nid); +} + +static struct nrs_tbf_client * +nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head, + struct nrs_tbf_client *cli) +{ + return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid, + &cli->tc_hnode); +} + +static void +nrs_tbf_nid_cli_put(struct nrs_tbf_head *head, + struct nrs_tbf_client *cli) +{ + cfs_hash_put(head->th_cli_hash, &cli->tc_hnode); +} + +static int +nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy, + struct nrs_tbf_head *head) +{ + struct nrs_tbf_cmd start; + int rc; + + head->th_cli_hash = cfs_hash_create("nrs_tbf_hash", + NRS_TBF_NID_BITS, + NRS_TBF_NID_BITS, + NRS_TBF_NID_BKT_BITS, 0, + CFS_HASH_MIN_THETA, + CFS_HASH_MAX_THETA, + &nrs_tbf_nid_hash_ops, + CFS_HASH_RW_BKTLOCK); + if (head->th_cli_hash == NULL) + return -ENOMEM; + 
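+ /* Start the catch-all default rule, which matches any NID ("*") at + * the module-default rate. */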
+ memset(&start, 0, sizeof(start)); + start.tc_nids_str = "*"; + + start.tc_rpc_rate = tbf_rate; + start.tc_rule_flags = NTRS_DEFAULT; + start.tc_name = NRS_TBF_DEFAULT_RULE; + CFS_INIT_LIST_HEAD(&start.tc_nids); + rc = nrs_tbf_rule_start(policy, head, &start); + + return rc; +} + +static void +nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli, + struct ptlrpc_request *req) +{ + cli->tc_nid = req->rq_peer.nid; +} + +static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy, + struct nrs_tbf_rule *rule, + struct nrs_tbf_cmd *start) +{ + LASSERT(start->tc_nids_str); + OBD_ALLOC(rule->tr_nids_str, + strlen(start->tc_nids_str) + 1); + if (rule->tr_nids_str == NULL) + return -ENOMEM; + + memcpy(rule->tr_nids_str, + start->tc_nids_str, + strlen(start->tc_nids_str)); + + CFS_INIT_LIST_HEAD(&rule->tr_nids); + if (!cfs_list_empty(&start->tc_nids)) { + if (cfs_parse_nidlist(rule->tr_nids_str, + strlen(rule->tr_nids_str), + &rule->tr_nids) <= 0) { + CERROR("nids {%s} illegal\n", + rule->tr_nids_str); + OBD_FREE(rule->tr_nids_str, + strlen(start->tc_nids_str) + 1); + return -EINVAL; + } + } + return 0; +} + +static int +nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, char *buff, int length) +{ + return snprintf(buff, length, "%s {%s} %llu, ref %d\n", + rule->tr_name, + rule->tr_nids_str, + rule->tr_rpc_rate, + cfs_atomic_read(&rule->tr_ref) - 1); +} + +static int +nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule, + struct nrs_tbf_client *cli) +{ + return cfs_match_nid(cli->tc_nid, &rule->tr_nids); +} + +static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule) +{ + if (!cfs_list_empty(&rule->tr_nids)) + cfs_free_nidlist(&rule->tr_nids); + LASSERT(rule->tr_nids_str != NULL); + OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1); +} + +static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd) +{ + if (!cfs_list_empty(&cmd->tc_nids)) + cfs_free_nidlist(&cmd->tc_nids); + if (cmd->tc_nids_str) + OBD_FREE(cmd->tc_nids_str, strlen(cmd->tc_nids_str) + 1); +} + +static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, const char *id) +{ + OBD_ALLOC(cmd->tc_nids_str, strlen(id) + 1); + if (cmd->tc_nids_str == NULL) + return -ENOMEM; + + memcpy(cmd->tc_nids_str, id, strlen(id)); + + /* parse NID list */ + if (cfs_parse_nidlist(cmd->tc_nids_str, + strlen(cmd->tc_nids_str), + &cmd->tc_nids) <= 0) { + nrs_tbf_nid_cmd_fini(cmd); + return -EINVAL; + } + + return 0; +} + +struct nrs_tbf_ops nrs_tbf_nid_ops = { + .o_name = NRS_TBF_TYPE_NID, + .o_startup = nrs_tbf_nid_startup, + .o_cli_find = nrs_tbf_nid_cli_find, + .o_cli_findadd = nrs_tbf_nid_cli_findadd, + .o_cli_put = nrs_tbf_nid_cli_put, + .o_cli_init = nrs_tbf_nid_cli_init, + .o_rule_init = nrs_tbf_nid_rule_init, + .o_rule_dump = nrs_tbf_nid_rule_dump, + .o_rule_match = nrs_tbf_nid_rule_match, + .o_rule_fini = nrs_tbf_nid_rule_fini, +}; + +/** + * Is called before the policy transitions into + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a + * policy-specific private data structure. 
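+ * The \a arg string selects the TBF type, either "nid" or "jobid".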
+ * + * \param[in] policy The policy to start + * + * \retval -ENOMEM OOM error + * \retval 0 success + * + * \see nrs_policy_register() + * \see nrs_policy_ctl() + */ +static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg) +{ + struct nrs_tbf_head *head; + struct nrs_tbf_ops *ops; + __u32 type; + int rc = 0; + + if (arg == NULL || strlen(arg) > NRS_TBF_TYPE_MAX_LEN) + GOTO(out, rc = -EINVAL); + + if (strcmp(arg, NRS_TBF_TYPE_NID) == 0) { + ops = &nrs_tbf_nid_ops; + type = NRS_TBF_FLAG_NID; + } else if (strcmp(arg, NRS_TBF_TYPE_JOBID) == 0) { + ops = &nrs_tbf_jobid_ops; + type = NRS_TBF_FLAG_JOBID; + } else + GOTO(out, rc = -ENOTSUPP); + + OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy)); + if (head == NULL) + GOTO(out, rc = -ENOMEM); + + memcpy(head->th_type, arg, strlen(arg)); + head->th_type[strlen(arg)] = '\0'; + head->th_ops = ops; + head->th_type_flag = type; + + head->th_binheap = cfs_binheap_create(&nrs_tbf_heap_ops, + CBH_FLAG_ATOMIC_GROW, 4096, NULL, + nrs_pol2cptab(policy), + nrs_pol2cptid(policy)); + if (head->th_binheap == NULL) + GOTO(out_free_head, rc = -ENOMEM); + + cfs_atomic_set(&head->th_rule_sequence, 0); + spin_lock_init(&head->th_rule_lock); + CFS_INIT_LIST_HEAD(&head->th_list); + hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS); + head->th_timer.function = nrs_tbf_timer_cb; + rc = head->th_ops->o_startup(policy, head); + if (rc) + GOTO(out_free_heap, rc); + + policy->pol_private = head; + return 0; +out_free_heap: + cfs_binheap_destroy(head->th_binheap); +out_free_head: + OBD_FREE_PTR(head); +out: + return rc; +} + +/** + * Is called before the policy transitions into + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific + * private data structure. + * + * \param[in] policy The policy to stop + * + * \see nrs_policy_stop0() + */ +static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy) +{ + struct nrs_tbf_head *head = policy->pol_private; + struct ptlrpc_nrs *nrs = policy->pol_nrs; + struct nrs_tbf_rule *rule, *n; + + LASSERT(head != NULL); + LASSERT(head->th_cli_hash != NULL); + hrtimer_cancel(&head->th_timer); + /* The hash should be cleaned up first, before the rules are freed */ + cfs_hash_putref(head->th_cli_hash); + cfs_list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) { + cfs_list_del_init(&rule->tr_linkage); + nrs_tbf_rule_put(rule); + } + LASSERT(cfs_list_empty(&head->th_list)); + LASSERT(head->th_binheap != NULL); + LASSERT(cfs_binheap_is_empty(head->th_binheap)); + cfs_binheap_destroy(head->th_binheap); + OBD_FREE_PTR(head); + spin_lock(&nrs->nrs_lock); + nrs->nrs_throttling = 0; + spin_unlock(&nrs->nrs_lock); + wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq); +} + +/** + * Performs a policy-specific ctl function on TBF policy instances; similar + * to ioctl. + * + * \param[in] policy the policy instance + * \param[in] opc the opcode + * \param[in,out] arg used for passing parameters and information + * + * \pre spin_is_locked(&policy->pol_nrs->nrs_lock) + * \post spin_is_locked(&policy->pol_nrs->nrs_lock) + * + * \retval 0 operation carried out successfully + * \retval -ve error + */ +int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy, enum ptlrpc_nrs_ctl opc, + void *arg) +{ + int rc = 0; + ENTRY; + + LASSERT(spin_is_locked(&policy->pol_nrs->nrs_lock)); + + switch (opc) { + default: + RETURN(-EINVAL); + + /** + * Read the RPC rate of a policy instance.
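+ * All rules of this head are dumped via nrs_tbf_rule_dump_all(), + * one line per rule.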
+ */ + case NRS_CTL_TBF_RD_RULE: { + struct nrs_tbf_head *head = policy->pol_private; + struct ptlrpc_service_part *svcpt; + struct nrs_tbf_dump *dump; + int length; + + dump = (struct nrs_tbf_dump *)arg; + + svcpt = policy->pol_nrs->nrs_svcpt; + length = snprintf(dump->td_buff, dump->td_size, + "CPT %d:\n", + svcpt->scp_cpt); + dump->td_length += length; + dump->td_buff += length; + dump->td_size -= length; + + length = nrs_tbf_rule_dump_all(head, + dump->td_buff, + dump->td_size); + dump->td_length += length; + dump->td_buff += length; + dump->td_size -= length; + } + break; + + /** + * Write RPC rate of a policy instance. + */ + case NRS_CTL_TBF_WR_RULE: { + struct nrs_tbf_head *head = policy->pol_private; + struct nrs_tbf_cmd *cmd; + + cmd = (struct nrs_tbf_cmd *)arg; + rc = nrs_tbf_command(policy, + head, + cmd); + } + break; + } + + RETURN(rc); +} + +/** + * Is called for obtaining a TBF policy resource. + * + * \param[in] policy The policy on which the request is being asked for + * \param[in] nrq The request for which resources are being taken + * \param[in] parent Parent resource, unused in this policy + * \param[out] resp Resources references are placed in this array + * \param[in] moving_req Signifies limited caller context; unused in this + * policy + * + * + * \see nrs_resource_get_safe() + */ +static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq, + const struct ptlrpc_nrs_resource *parent, + struct ptlrpc_nrs_resource **resp, + bool moving_req) +{ + struct nrs_tbf_head *head; + struct nrs_tbf_client *cli; + struct nrs_tbf_client *tmp; + struct ptlrpc_request *req; + + if (parent == NULL) { + *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res; + return 0; + } + + head = container_of(parent, struct nrs_tbf_head, th_res); + req = container_of(nrq, struct ptlrpc_request, rq_nrq); + cli = head->th_ops->o_cli_find(head, req); + if (cli != NULL) { + spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock); + LASSERT(cli->tc_rule); + if (cli->tc_rule_sequence != + cfs_atomic_read(&head->th_rule_sequence) || + cli->tc_rule->tr_flags & NTRS_STOPPING) { + struct nrs_tbf_rule *rule; + + rule = nrs_tbf_rule_match(head, cli); + if (rule != cli->tc_rule) + nrs_tbf_cli_reset(head, rule, cli); + else + nrs_tbf_rule_put(rule); + } else if (cli->tc_rule_generation != + cli->tc_rule->tr_generation) { + nrs_tbf_cli_reset_value(head, cli); + } + spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock); + goto out; + } + + OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy), + sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO); + if (cli == NULL) + return -ENOMEM; + nrs_tbf_cli_init(head, cli, req); + tmp = head->th_ops->o_cli_findadd(head, cli); + if (tmp != cli) { + cfs_atomic_dec(&cli->tc_ref); + nrs_tbf_cli_fini(cli); + cli = tmp; + } +out: + *resp = &cli->tc_res; + + return 1; +} + +/** + * Called when releasing references to the resource hierarchy obtained for a + * request for scheduling using the TBF policy.
+ * + * \param[in] policy the policy the resource belongs to + * \param[in] res the resource to be released + */ +static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy, + const struct ptlrpc_nrs_resource *res) +{ + struct nrs_tbf_head *head; + struct nrs_tbf_client *cli; + + /** + * Do nothing for freeing parent, nrs_tbf_net resources + */ + if (res->res_parent == NULL) + return; + + cli = container_of(res, struct nrs_tbf_client, tc_res); + head = container_of(res->res_parent, struct nrs_tbf_head, th_res); + + head->th_ops->o_cli_put(head, cli); +} + +/** + * Called when getting a request from the TBF policy for handling, or just + * peeking; removes the request from the policy when it is to be handled. + * + * \param[in] policy The policy + * \param[in] peek When set, signifies that we just want to examine the + * request, and not handle it, so the request is not removed + * from the policy. + * \param[in] force Force the policy to return a request; unused in this + * policy + * + * \retval The request to be handled; this is the next request in the TBF + * rule + * + * \see ptlrpc_nrs_req_get_nolock() + * \see nrs_request_get() + */ +static +struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy, + bool peek, bool force) +{ + struct nrs_tbf_head *head = policy->pol_private; + struct ptlrpc_nrs_request *nrq = NULL; + struct nrs_tbf_client *cli; + cfs_binheap_node_t *node; + + LASSERT(spin_is_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock)); + + if (!peek && policy->pol_nrs->nrs_throttling) + return NULL; + + node = cfs_binheap_root(head->th_binheap); + if (unlikely(node == NULL)) + return NULL; + + cli = container_of(node, struct nrs_tbf_client, tc_node); + LASSERT(cli->tc_in_heap); + if (peek) { + nrq = cfs_list_entry(cli->tc_list.next, + struct ptlrpc_nrs_request, + nr_u.tbf.tr_list); + } else { + __u64 now = ktime_to_ns(ktime_get()); + __u64 passed; + long ntoken; + __u64 deadline; + + deadline = cli->tc_check_time + + cli->tc_nsecs; + LASSERT(now >= cli->tc_check_time); + passed = now - cli->tc_check_time; + ntoken = (passed * cli->tc_rpc_rate) / NSEC_PER_SEC; + ntoken += cli->tc_ntoken; + if (ntoken > cli->tc_depth) + ntoken = cli->tc_depth; + if (ntoken > 0) { + struct ptlrpc_request *req; + nrq = cfs_list_entry(cli->tc_list.next, + struct ptlrpc_nrs_request, + nr_u.tbf.tr_list); + req = container_of(nrq, + struct ptlrpc_request, + rq_nrq); + ntoken--; + cli->tc_ntoken = ntoken; + cli->tc_check_time = now; + cfs_list_del_init(&nrq->nr_u.tbf.tr_list); + if (cfs_list_empty(&cli->tc_list)) { + cfs_binheap_remove(head->th_binheap, + &cli->tc_node); + cli->tc_in_heap = false; + } else { + cfs_binheap_relocate(head->th_binheap, + &cli->tc_node); + } + CDEBUG(D_RPCTRACE, + "NRS start %s request from %s, " + "seq: "LPU64"\n", + policy->pol_desc->pd_name, + libcfs_id2str(req->rq_peer), + nrq->nr_u.tbf.tr_sequence); + } else { + ktime_t time; + + spin_lock(&policy->pol_nrs->nrs_lock); + policy->pol_nrs->nrs_throttling = 1; + spin_unlock(&policy->pol_nrs->nrs_lock); + head->th_deadline = deadline; + time = ktime_set(0, 0); + time = ktime_add_ns(time, deadline); + hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS); + } + } + + return nrq; +} + +/** + * Adds request \a nrq to \a policy's list of queued requests + * + * \param[in] policy The policy + * \param[in] nrq The request to add + * + * \retval 0 success; nrs_request_enqueue() assumes this function will always + * succeed + */ +static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy, + struct 
ptlrpc_nrs_request *nrq) +{ + struct nrs_tbf_head *head; + struct nrs_tbf_client *cli; + int rc = 0; + + LASSERT(spin_is_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock)); + + cli = container_of(nrs_request_resource(nrq), + struct nrs_tbf_client, tc_res); + head = container_of(nrs_request_resource(nrq)->res_parent, + struct nrs_tbf_head, th_res); + if (cfs_list_empty(&cli->tc_list)) { + LASSERT(!cli->tc_in_heap); + rc = cfs_binheap_insert(head->th_binheap, &cli->tc_node); + if (rc == 0) { + cli->tc_in_heap = true; + nrq->nr_u.tbf.tr_sequence = head->th_sequence++; + cfs_list_add_tail(&nrq->nr_u.tbf.tr_list, + &cli->tc_list); + if (policy->pol_nrs->nrs_throttling) { + __u64 deadline = cli->tc_check_time + + cli->tc_nsecs; + if ((head->th_deadline > deadline) && + (hrtimer_try_to_cancel(&head->th_timer) + >= 0)) { + ktime_t time; + head->th_deadline = deadline; + time = ktime_set(0, 0); + time = ktime_add_ns(time, deadline); + hrtimer_start(&head->th_timer, time, + HRTIMER_MODE_ABS); + } + } + } + } else { + LASSERT(cli->tc_in_heap); + nrq->nr_u.tbf.tr_sequence = head->th_sequence++; + cfs_list_add_tail(&nrq->nr_u.tbf.tr_list, + &cli->tc_list); + } + return rc; +} + +/** + * Removes request \a nrq from \a policy's list of queued requests. + * + * \param[in] policy The policy + * \param[in] nrq The request to remove + */ +static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq) +{ + struct nrs_tbf_head *head; + struct nrs_tbf_client *cli; + + LASSERT(spin_is_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock)); + + cli = container_of(nrs_request_resource(nrq), + struct nrs_tbf_client, tc_res); + head = container_of(nrs_request_resource(nrq)->res_parent, + struct nrs_tbf_head, th_res); + + LASSERT(!cfs_list_empty(&nrq->nr_u.tbf.tr_list)); + cfs_list_del_init(&nrq->nr_u.tbf.tr_list); + if (cfs_list_empty(&cli->tc_list)) { + cfs_binheap_remove(head->th_binheap, + &cli->tc_node); + cli->tc_in_heap = false; + } else { + cfs_binheap_relocate(head->th_binheap, + &cli->tc_node); + } +} + +/** + * Prints a debug statement right before the request \a nrq stops being + * handled. + * + * \param[in] policy The policy handling the request + * \param[in] nrq The request being handled + * + * \see ptlrpc_server_finish_request() + * \see ptlrpc_nrs_req_stop_nolock() + */ +static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq) +{ + struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request, + rq_nrq); + + LASSERT(spin_is_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock)); + + CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: "LPU64"\n", + policy->pol_desc->pd_name, libcfs_id2str(req->rq_peer), + nrq->nr_u.tbf.tr_sequence); +} + +#ifdef LPROCFS + +/** + * lprocfs interface + */ + +/** + * The maximum RPC rate. + */ +#define LPROCFS_NRS_RATE_MAX 65535 + +static int ptlrpc_lprocfs_rd_nrs_tbf_rule(char *page, char **start, + off_t off, int count, int *eof, + void *data) +{ + struct ptlrpc_service *svc = data; + int rc; + int rc2; + struct nrs_tbf_dump dump; + + rc2 = snprintf(page, count, "regular_requests:\n"); + /** + * Perform two separate calls to this as only one of the NRS heads' + * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state. 
+#ifdef LPROCFS
+
+/**
+ * lprocfs interface
+ */
+
+/**
+ * The maximum RPC rate.
+ */
+#define LPROCFS_NRS_RATE_MAX 65535
+
+static int ptlrpc_lprocfs_rd_nrs_tbf_rule(char *page, char **start,
+                                          off_t off, int count, int *eof,
+                                          void *data)
+{
+        struct ptlrpc_service *svc = data;
+        int                    rc;
+        int                    rc2;
+        struct nrs_tbf_dump    dump;
+
+        rc2 = snprintf(page, count, "regular_requests:\n");
+        /**
+         * Perform two separate calls to this as only one of the NRS heads'
+         * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
+         * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
+         */
+        dump.td_length = 0;
+        dump.td_buff = page + rc2;
+        dump.td_size = count - rc2;
+        rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
+                                       NRS_POL_NAME_TBF,
+                                       NRS_CTL_TBF_RD_RULE,
+                                       false, &dump);
+        if (rc == 0) {
+                *eof = 1;
+                rc2 += dump.td_length;
+                /**
+                 * Ignore -ENODEV as the regular NRS head's policy may be in
+                 * the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
+                 */
+        } else if (rc != -ENODEV) {
+                return rc;
+        }
+
+        if (!nrs_svc_has_hp(svc))
+                goto no_hp;
+
+        rc2 += snprintf(page + rc2, count - rc2, "high_priority_requests:\n");
+        dump.td_length = 0;
+        dump.td_buff = page + rc2;
+        dump.td_size = count - rc2;
+        rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
+                                       NRS_POL_NAME_TBF,
+                                       NRS_CTL_TBF_RD_RULE,
+                                       false, &dump);
+        if (rc == 0) {
+                *eof = 1;
+                rc2 += dump.td_length;
+                /**
+                 * Ignore -ENODEV as the high priority NRS head's policy may
+                 * be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
+                 */
+        } else if (rc != -ENODEV) {
+                return rc;
+        }
+
+no_hp:
+
+        return rc2 ? : rc;
+}
+
+static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char **val)
+{
+        int   rc;
+        char *token;
+
+        token = strsep(val, "}");
+        if (*val == NULL)
+                GOTO(out, rc = -EINVAL);
+
+        if (strlen(token) <= 1 ||
+            token[0] != '{')
+                GOTO(out, rc = -EINVAL);
+        /* Skip '{' */
+        token++;
+
+        /* Should be followed by ' ' or nothing */
+        if ((*val)[0] == '\0')
+                *val = NULL;
+        else if ((*val)[0] == ' ')
+                (*val)++;
+        else
+                GOTO(out, rc = -EINVAL);
+
+        rc = nrs_tbf_jobid_parse(cmd, token);
+        if (!rc)
+                cmd->tc_valid_types |= NRS_TBF_FLAG_JOBID;
+
+        rc = nrs_tbf_nid_parse(cmd, token);
+        if (!rc)
+                cmd->tc_valid_types |= NRS_TBF_FLAG_NID;
+
+        if (!cmd->tc_valid_types)
+                rc = -EINVAL;
+        else
+                rc = 0;
+out:
+        return rc;
+}
+
+static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
+{
+        if (cmd->tc_valid_types & NRS_TBF_FLAG_JOBID)
+                nrs_tbf_jobid_cmd_fini(cmd);
+        if (cmd->tc_valid_types & NRS_TBF_FLAG_NID)
+                nrs_tbf_nid_cmd_fini(cmd);
+}
+
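nrs_tbf_id_parse() above expects the ID list as one brace-delimited token ("{...}") followed by a space or end of string, and tries the jobid and NID interpretations in turn. A small userspace sketch of the same strsep()-based extraction (hypothetical helper, simplified error handling):

    #define _GNU_SOURCE
    #include <stdio.h>
    #include <string.h>

    /* Extract the "{...}" ID list from a rule string, mirroring the
     * strsep() steps of nrs_tbf_id_parse(); returns the list body, or
     * NULL on malformed input. Advances *val past the list. */
    static char *tbf_extract_ids(char **val)
    {
            char *token = strsep(val, "}");

            if (*val == NULL || strlen(token) <= 1 || token[0] != '{')
                    return NULL;
            token++;                        /* skip '{' */
            if ((*val)[0] == '\0')
                    *val = NULL;            /* nothing follows the ID list */
            else if ((*val)[0] == ' ')
                    (*val)++;               /* skip the separating space */
            else
                    return NULL;
            return token;
    }

    int main(void)
    {
            char  buf[] = "{192.168.1.[1-10]@tcp} 100";
            char *val = buf;
            char *ids = tbf_extract_ids(&val);

            /* Prints: ids=192.168.1.[1-10]@tcp rest=100 */
            printf("ids=%s rest=%s\n", ids, val);
            return 0;
    }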
+static struct nrs_tbf_cmd *
+nrs_tbf_parse_cmd(char *buffer, unsigned long count)
+{
+        struct nrs_tbf_cmd *cmd;
+        char               *token;
+        char               *val;
+        int                 i;
+        int                 rc = 0;
+
+        OBD_ALLOC_PTR(cmd);
+        if (cmd == NULL)
+                GOTO(out, rc = -ENOMEM);
+
+        val = buffer;
+        token = strsep(&val, " ");
+        if (val == NULL || strlen(val) == 0)
+                GOTO(out_free_cmd, rc = -EINVAL);
+
+        /* Type of the command */
+        if (strcmp(token, "start") == 0)
+                cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
+        else if (strcmp(token, "stop") == 0)
+                cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
+        else if (strcmp(token, "change") == 0)
+                cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RATE;
+        else
+                GOTO(out_free_cmd, rc = -EINVAL);
+
+        /* Name of the rule */
+        token = strsep(&val, " ");
+        if (val == NULL) {
+                /**
+                 * The stop command needs only the name argument,
+                 * but the other commands also need an ID or rate argument.
+                 */
+                if (cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE)
+                        GOTO(out_free_cmd, rc = -EINVAL);
+        }
+
+        for (i = 0; i < strlen(token); i++) {
+                if ((!isalnum(token[i])) &&
+                    (token[i] != '_'))
+                        GOTO(out_free_cmd, rc = -EINVAL);
+        }
+        cmd->tc_name = token;
+
+        if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
+                /* List of IDs */
+                LASSERT(val);
+                rc = nrs_tbf_id_parse(cmd, &val);
+                if (rc)
+                        GOTO(out_free_cmd, rc);
+        }
+
+        if (val != NULL) {
+                if (cmd->tc_cmd == NRS_CTL_TBF_STOP_RULE ||
+                    strlen(val) == 0 || !isdigit(val[0]))
+                        GOTO(out_free_nid, rc = -EINVAL);
+
+                cmd->tc_rpc_rate = simple_strtoull(val, NULL, 10);
+                if (cmd->tc_rpc_rate <= 0 ||
+                    cmd->tc_rpc_rate >= LPROCFS_NRS_RATE_MAX)
+                        GOTO(out_free_nid, rc = -EINVAL);
+        } else {
+                if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RATE)
+                        GOTO(out_free_nid, rc = -EINVAL);
+                /* No RPC rate given */
+                cmd->tc_rpc_rate = tbf_rate;
+        }
+        goto out;
+out_free_nid:
+        nrs_tbf_cmd_fini(cmd);
+out_free_cmd:
+        OBD_FREE_PTR(cmd);
+out:
+        if (rc)
+                cmd = ERR_PTR(rc);
+        return cmd;
+}
+
+extern struct nrs_core nrs_core;
+#define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
+static int ptlrpc_lprocfs_wr_nrs_tbf_rule(struct file *file,
+                                          const char *buffer,
+                                          unsigned long count, void *data)
+{
+        struct ptlrpc_service     *svc = data;
+        char                      *kernbuf;
+        char                      *val;
+        int                        rc;
+        struct nrs_tbf_cmd        *cmd;
+        enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
+        unsigned long              length;
+        char                      *token;
+
+        OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
+        if (kernbuf == NULL)
+                GOTO(out, rc = -ENOMEM);
+
+        if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
+                GOTO(out_free_kernbuff, rc = -EINVAL);
+
+        if (copy_from_user(kernbuf, buffer, count))
+                GOTO(out_free_kernbuff, rc = -EFAULT);
+
+        val = kernbuf;
+        token = strsep(&val, " ");
+        if (val == NULL)
+                GOTO(out_free_kernbuff, rc = -EINVAL);
+
+        if (strcmp(token, "reg") == 0) {
+                queue = PTLRPC_NRS_QUEUE_REG;
+        } else if (strcmp(token, "hp") == 0) {
+                queue = PTLRPC_NRS_QUEUE_HP;
+        } else {
+                kernbuf[strlen(token)] = ' ';
+                val = kernbuf;
+        }
+        length = strlen(val);
+
+        if (length == 0)
+                GOTO(out_free_kernbuff, rc = -EINVAL);
+
+        if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
+                GOTO(out_free_kernbuff, rc = -ENODEV);
+        else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
+                queue = PTLRPC_NRS_QUEUE_REG;
+
+        cmd = nrs_tbf_parse_cmd(val, length);
+        if (IS_ERR(cmd))
+                GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
+
+        /**
+         * Serialize NRS core lprocfs operations with policy registration/
+         * unregistration.
+         */
+        mutex_lock(&nrs_core.nrs_mutex);
+        rc = ptlrpc_nrs_policy_control(svc, queue,
+                                       NRS_POL_NAME_TBF,
+                                       NRS_CTL_TBF_WR_RULE,
+                                       false, cmd);
+        mutex_unlock(&nrs_core.nrs_mutex);
+
+        nrs_tbf_cmd_fini(cmd);
+        OBD_FREE_PTR(cmd);
+out_free_kernbuff:
+        OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
+out:
+        return rc ? rc : count;
+}
+
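Putting the pieces together, the proc handler above accepts lines of the form "[reg|hp] start|stop|change <rule> [{ids}] [rate]". A hedged end-to-end sketch of its first tokenization step, the optional queue selector (userspace approximation; the real handler adds the NRS mutex and policy control call):

    #define _GNU_SOURCE
    #include <stdio.h>
    #include <string.h>

    /* Approximate the first strsep() steps of
     * ptlrpc_lprocfs_wr_nrs_tbf_rule(): peel off an optional "reg"/"hp"
     * queue selector, then leave the command string for the parser. */
    int main(void)
    {
            char        buf[] = "hp start iozone_rule {iozone.500} 100";
            char       *val = buf;
            const char *queue = "both";
            char       *token = strsep(&val, " ");

            if (strcmp(token, "reg") == 0 || strcmp(token, "hp") == 0) {
                    queue = token;
            } else {
                    buf[strlen(token)] = ' ';   /* no selector: undo the split */
                    val = buf;
            }
            /* Prints: queue=hp command+args=start iozone_rule {iozone.500} 100 */
            printf("queue=%s command+args=%s\n", queue, val);
            return 0;
    }

On a live system the same string would be written through the proc file, e.g. lctl set_param ost.OSS.ost_io.nrs_tbf_rule="start iozone_rule {iozone.500} 100" (the service path shown here is illustrative).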
+/**
+ * Initializes a TBF policy's lprocfs interface for service \a svc
+ *
+ * \param[in] svc the service
+ *
+ * \retval 0    success
+ * \retval != 0 error
+ */
+int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
+{
+        int rc;
+        struct lprocfs_vars nrs_tbf_lprocfs_vars[] = {
+                { .name       = "nrs_tbf_rule",
+                  .read_fptr  = ptlrpc_lprocfs_rd_nrs_tbf_rule,
+                  .write_fptr = ptlrpc_lprocfs_wr_nrs_tbf_rule,
+                  .data       = svc },
+                { NULL }
+        };
+
+        if (svc->srv_procroot == NULL)
+                return 0;
+
+        rc = lprocfs_add_vars(svc->srv_procroot, nrs_tbf_lprocfs_vars, NULL);
+
+        return rc;
+}
+
+/**
+ * Cleans up a TBF policy's lprocfs interface for service \a svc
+ *
+ * \param[in] svc the service
+ */
+void nrs_tbf_lprocfs_fini(struct ptlrpc_service *svc)
+{
+        if (svc->srv_procroot == NULL)
+                return;
+
+        lprocfs_remove_proc_entry("nrs_tbf_rule", svc->srv_procroot);
+}
+
+#endif /* LPROCFS */
+
+/**
+ * TBF policy operations
+ */
+static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
+        .op_policy_start    = nrs_tbf_start,
+        .op_policy_stop     = nrs_tbf_stop,
+        .op_policy_ctl      = nrs_tbf_ctl,
+        .op_res_get         = nrs_tbf_res_get,
+        .op_res_put         = nrs_tbf_res_put,
+        .op_req_get         = nrs_tbf_req_get,
+        .op_req_enqueue     = nrs_tbf_req_add,
+        .op_req_dequeue     = nrs_tbf_req_del,
+        .op_req_stop        = nrs_tbf_req_stop,
+#ifdef LPROCFS
+        .op_lprocfs_init    = nrs_tbf_lprocfs_init,
+        .op_lprocfs_fini    = nrs_tbf_lprocfs_fini,
+#endif
+};
+
+/**
+ * TBF policy configuration
+ */
+struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
+        .nc_name   = NRS_POL_NAME_TBF,
+        .nc_ops    = &nrs_tbf_ops,
+        .nc_compat = nrs_policy_compat_all,
+};
+
+/** @} tbf */
+
+/** @} nrs */
+
+#endif /* HAVE_SERVER_SUPPORT */
diff --git a/lustre/ptlrpc/ptlrpc_internal.h b/lustre/ptlrpc/ptlrpc_internal.h
index 2cb3a72..d0c58d8 100644
--- a/lustre/ptlrpc/ptlrpc_internal.h
+++ b/lustre/ptlrpc/ptlrpc_internal.h
@@ -139,6 +139,8 @@ ptlrpc_nrs_req_peek_nolock(struct ptlrpc_service_part *svcpt, bool hp)
 void ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req);
 bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp);
+bool ptlrpc_nrs_req_throttling_nolock(struct ptlrpc_service_part *svcpt,
+                                      bool hp);
 int ptlrpc_nrs_policy_control(const struct ptlrpc_service *svc,
                               enum ptlrpc_nrs_queue_type queue, char *name,
diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index b26dd07..f6a134b 100644
--- a/lustre/ptlrpc/service.c
+++ b/lustre/ptlrpc/service.c
@@ -1704,6 +1704,9 @@ static bool ptlrpc_server_allow_high(struct ptlrpc_service_part *svcpt,
         if (force)
                 return true;
 
+        if (ptlrpc_nrs_req_throttling_nolock(svcpt, true))
+                return false;
+
         if (unlikely(svcpt->scp_service->srv_req_portal == MDS_REQUEST_PORTAL &&
                      CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))) {
                 /* leave just 1 thread for normal RPCs */
@@ -1754,8 +1757,13 @@ static bool ptlrpc_server_allow_normal(struct ptlrpc_service_part *svcpt,
                 running += 1;
         }
 
-        if (force ||
-            svcpt->scp_nreqs_active < running - 2)
+        if (force)
+                return true;
+
+        if (ptlrpc_nrs_req_throttling_nolock(svcpt, false))
+                return false;
+
+        if (svcpt->scp_nreqs_active < running - 2)
                 return true;
 
         if (svcpt->scp_nreqs_active >= running - 1)