From aedb20f5caf2f2a28e33da70d837415a5082c0af Mon Sep 17 00:00:00 2001 From: Nikitas Angelinas Date: Wed, 20 Jun 2012 10:31:16 +0100 Subject: [PATCH] LU-398 ptlrpc: Add the NRS framework and FIFO policy The Network Request Scheduler (NRS) allows a user to control the way in which RPCs are dispatched from PTLRPC services. This can be used to various effects, most importantly improved fairness across the cluster, better I/O performance under certain workloads, and allowing for QoS semantics to be used at the filesystem level. Central to the operation of NRS are NRS policies, each of which implements handling of RPCs in a different way, aiming to achieve a particular goal. This patch adds the core NRS framework and the NRS FIFO policy, which is a logical wrapper around previous, non-NRS functionality, and is used as the default policy for handling all types of RPCs. Signed-off-by: Nikitas Angelinas Co-authored-by: Liang Zhen Change-Id: I7fcd7885cc89b653d9bc482da533aae2f129bdf9 Oracle-bug-id: b=13634 Xyratex-bug-id: MRP-73 Reviewed-on: http://review.whamcloud.com/4411 Tested-by: Hudson Tested-by: Maloo Reviewed-by: Mike Pershin Reviewed-by: Liang Zhen Reviewed-by: Oleg Drokin --- lustre/include/lustre_net.h | 745 +++++++++++++++- lustre/ldlm/ldlm_lockd.c | 35 +- lustre/ptlrpc/Makefile.in | 2 +- lustre/ptlrpc/autoMakefile.am | 15 +- lustre/ptlrpc/lproc_ptlrpc.c | 330 ++++++- lustre/ptlrpc/nrs.c | 1859 +++++++++++++++++++++++++++++++++++++++ lustre/ptlrpc/nrs_fifo.c | 276 ++++++ lustre/ptlrpc/ptlrpc_internal.h | 120 +++ lustre/ptlrpc/ptlrpc_module.c | 15 +- lustre/ptlrpc/service.c | 186 ++-- 10 files changed, 3438 insertions(+), 145 deletions(-) create mode 100644 lustre/ptlrpc/nrs.c create mode 100644 lustre/ptlrpc/nrs_fifo.c diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index b1e0682..b87cf91 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -502,6 +502,7 @@ struct ptlrpc_set_cbdata { struct ptlrpc_bulk_desc; struct ptlrpc_service_part; +struct ptlrpc_service; /** * ptlrpc callback & work item stuff @@ -626,6 +627,658 @@ struct lu_env; struct ldlm_lock; /** + * \defgroup nrs Network Request Scheduler + * @{ + */ +struct ptlrpc_nrs_policy; +struct ptlrpc_nrs_resource; +struct ptlrpc_nrs_request; + +/** + * NRS control operations. + * + * These are common for all policies. + */ +enum ptlrpc_nrs_ctl { + /** + * Activate the policy. + */ + PTLRPC_NRS_CTL_START, + /** + * Reserved for multiple primary policies, which may be a possibility + * in the future. + */ + PTLRPC_NRS_CTL_STOP, + /** + * Recycle resources for inactive policies. + */ + PTLRPC_NRS_CTL_SHRINK, + /** + * Not a valid opcode. + */ + PTLRPC_NRS_CTL_INVALID, + /** + * Policies can start using opcodes from this value and onwards for + * their own purposes; the assigned value itself is arbitrary. + */ + PTLRPC_NRS_CTL_1ST_POL_SPEC = 0x20, +}; + +/** + * NRS policy operations. + * + * These determine the behaviour of a policy, and are called in response to + * NRS core events. + */ +struct ptlrpc_nrs_pol_ops { + /** + * Called during policy registration; this operation is optional. + * + * \param[in] policy The policy being initialized + */ + int (*op_policy_init) (struct ptlrpc_nrs_policy *policy); + /** + * Called during policy unregistration; this operation is optional. + * + * \param[in] policy The policy being unregistered/finalized + */ + void (*op_policy_fini) (struct ptlrpc_nrs_policy *policy); + /** + * Called when activating a policy via lprocfs; policies allocate and + * initialize their resources here; this operation is optional. + * + * \param[in] policy The policy being started + * + * \see nrs_policy_start_locked() + */ + int (*op_policy_start) (struct ptlrpc_nrs_policy *policy); + /** + * Called when deactivating a policy via lprocfs; policies deallocate + * their resources here; this operation is optional + * + * \param[in] policy The policy being stopped + * + * \see nrs_policy_stop_final() + */ + void (*op_policy_stop) (struct ptlrpc_nrs_policy *policy); + /** + * Used for policy-specific operations; i.e. not generic ones like + * \e PTLRPC_NRS_CTL_START and \e PTLRPC_NRS_CTL_GET_INFO; analogous + * to an ioctl; this operation is optional. + * + * \param[in] policy The policy carrying out operation \a opc + * \param[in] opc The command operation being carried out + * \param[in,out] arg An generic buffer for communication between the + * user and the control operation + * + * \retval -ve error + * \retval 0 success + * + * \see ptlrpc_nrs_policy_control() + */ + int (*op_policy_ctl) (struct ptlrpc_nrs_policy *policy, + enum ptlrpc_nrs_ctl opc, void *arg); + + /** + * Called when obtaining references to the resources of the resource + * hierarchy for a request that has arrived for handling at the PTLRPC + * service. Policies should return -ve for requests they do not wish + * to handle. This operation is mandatory. + * + * \param[in] policy The policy we're getting resources for. + * \param[in] nrq The request we are getting resources for. + * \param[in] parent The parent resource of the resource being + * requested; set to NULL if none. + * \param[out] resp The resource is to be returned here; the + * fallback policy in an NRS head should + * \e always return a non-NULL pointer value. + * \param[in] moving_req When set, signifies that this is an attempt + * to obtain resources for a request being moved + * to the high-priority NRS head by + * ldlm_lock_reorder_req(). + * This implies two things: + * 1. We are under obd_export::exp_rpc_lock and + * so should not sleep. + * 2. We should not perform non-idempotent or can + * skip performing idempotent operations that + * were carried out when resources were first + * taken for the request when it was initialized + * in ptlrpc_nrs_req_initialize(). + * + * \retval 0, +ve The level of the returned resource in the resource + * hierarchy; currently only 0 (for a non-leaf resource) + * and 1 (for a leaf resource) are supported by the + * framework. + * \retval -ve error + * + * \see ptlrpc_nrs_req_initialize() + * \see ptlrpc_nrs_hpreq_add_nolock() + * \see ptlrpc_nrs_req_hp_move() + */ + int (*op_res_get) (struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq, + struct ptlrpc_nrs_resource *parent, + struct ptlrpc_nrs_resource **resp, + bool moving_req); + /** + * Called when releasing references taken for resources in the resource + * hierarchy for the request; this operation is optional. + * + * \param[in] policy The policy the resource belongs to + * \param[in] res The resource to be freed + * + * \see ptlrpc_nrs_req_finalize() + * \see ptlrpc_nrs_hpreq_add_nolock() + * \see ptlrpc_nrs_req_hp_move() + */ + void (*op_res_put) (struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_resource *res); + + /** + * Obtain a request for handling from the policy via polling; this + * operation is mandatory. + * + * \param[in] policy The policy to poll + * + * \retval NULL No erquest available for handling + * \retval valid-pointer The request polled for handling + * + * \see ptlrpc_nrs_req_poll_nolock() + */ + struct ptlrpc_nrs_request * + (*op_req_poll) (struct ptlrpc_nrs_policy *policy); + /** + * Called when attempting to add a request to a policy for later + * handling; this operation is mandatory. + * + * \param[in] policy The policy on which to enqueue \a nrq + * \param[in] nrq The request to enqueue + * + * \retval 0 success + * \retval != 0 error + * + * \see ptlrpc_nrs_req_add_nolock() + */ + int (*op_req_enqueue) (struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq); + /** + * Removes a request from the policy's set of pending requests. Normally + * called after a request has been polled successfully from the policy + * for handling; this operation is mandatory. + * + * \param[in] policy The policy the request \a nrq belongs to + * \param[in] nrq The request to dequeue + * + * \see ptlrpc_nrs_req_del_nolock() + */ + void (*op_req_dequeue) (struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq); + /** + * Called before carrying out the request; should not block. Could be + * used for job/resource control; this operation is optional. + * + * \param[in] policy The policy which is starting to handle request + * \a nrq + * \param[in] nrq The request + * + * \pre spin_is_locked(&svcpt->scp_req_lock) + * + * \see ptlrpc_nrs_req_start_nolock() + */ + void (*op_req_start) (struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq); + /** + * Called after the request being carried out. Could be used for + * job/resource control; this operation is optional. + * + * \param[in] policy The policy which is stopping to handle request + * \a nrq + * \param[in] nrq The request + * + * \pre spin_is_locked(&svcpt->scp_req_lock) + * + * \see ptlrpc_nrs_req_stop_nolock() + */ + void (*op_req_stop) (struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq); + /** + * Registers the policy's lprocfs interface with a PTLRPC service. + * + * \param[in] svc The service + * + * \retval 0 success + * \retval != 0 error + */ + int (*op_lprocfs_init) (struct ptlrpc_service *svc); + /** + * Unegisters the policy's lprocfs interface with a PTLRPC service. + * + * \param[in] svc The service + */ + void (*op_lprocfs_fini) (struct ptlrpc_service *svc); +}; + +/** + * Policy flags + */ +enum nrs_policy_flags { + /** + * Fallback policy, use this flag only on a single supported policy per + * service. Do not use this flag for policies registering using + * ptlrpc_nrs_policy_register() (i.e. ones that are not in + * \e nrs_pols_builtin). + */ + PTLRPC_NRS_FL_FALLBACK = (1 << 0), + /** + * Start policy immediately after registering. + */ + PTLRPC_NRS_FL_REG_START = (1 << 1), + /** + * This is a polciy registering externally with NRS core, via + * ptlrpc_nrs_policy_register(), (i.e. one that is not in + * \e nrs_pols_builtin. Used to avoid ptlrpc_nrs_policy_register() + * racing with a policy start operation issued by the user via lprocfs. + */ + PTLRPC_NRS_FL_REG_EXTERN = (1 << 2), +}; + +/** + * NRS queue type. + * + * Denotes whether an NRS instance is for handling normal or high-priority + * RPCs, or whether an operation pertains to one or both of the NRS instances + * in a service. + */ +enum ptlrpc_nrs_queue_type { + PTLRPC_NRS_QUEUE_REG, + PTLRPC_NRS_QUEUE_HP, + PTLRPC_NRS_QUEUE_BOTH, +}; + +/** + * NRS head + * + * A PTLRPC service has at least one NRS head instance for handling normal + * priority RPCs, and may optionally have a second NRS head instance for + * handling high-priority RPCs. Each NRS head maintains a list of available + * policies, of which one and only one policy is acting as the fallback policy, + * and optionally a different policy may be acting as the primary policy. For + * all RPCs handled by this NRS head instance, NRS core will first attempt to + * enqueue the RPC using the primary policy (if any). The fallback policy is + * used in the following cases: + * - when there was no primary policy in the + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED state at the time the request + * was initialized. + * - when the primary policy that was at the + * ptlrpc_nrs_pol_state::PTLRPC_NRS_POL_STATE_STARTED state at the time the + * RPC was initialized, denoted it did not wish, or for some other reason was + * not able to handle the request, by returning a non-valid NRS resource + * reference. + * - when the primary policy that was at the + * ptlrpc_nrs_pol_state::PTLRPC_NRS_POL_STATE_STARTED state at the time the + * RPC was initialized, fails later during the request enqueueing stage. + * + * \see nrs_resource_get_safe() + * \see nrs_request_enqueue() + */ +struct ptlrpc_nrs { + spinlock_t nrs_lock; + /** XXX Possibly replace svcpt->scp_req_lock with another lock here. */ + /** + * Linkage into nrs_core_heads_list + */ + cfs_list_t nrs_heads; + /** + * List of registered policies + */ + cfs_list_t nrs_policy_list; + /** + * List of policies with queued requests. Policies that have any + * outstanding requests are queued here, and this list is queried + * in a round-robin manner from NRS core when obtaining a request + * for handling. This ensures that requests from policies that at some + * point transition away from the + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED state are drained. + */ + cfs_list_t nrs_policy_queued; + /** + * Service partition for this NRS head + */ + struct ptlrpc_service_part *nrs_svcpt; + /** + * Primary policy, which is the preferred policy for handling RPCs + */ + struct ptlrpc_nrs_policy *nrs_policy_primary; + /** + * Fallback policy, which is the backup policy for handling RPCs + */ + struct ptlrpc_nrs_policy *nrs_policy_fallback; + /** + * This NRS head handles either HP or regular requests + */ + enum ptlrpc_nrs_queue_type nrs_queue_type; + /** + * # queued requests from all policies in this NRS head + */ + unsigned long nrs_req_queued; + /** + * # scheduled requests from all policies in this NRS head + */ + unsigned long nrs_req_started; + /** + * # policies on this NRS + * TODO: Can we avoid having this? + */ + unsigned nrs_num_pols; + /** + * This NRS head is in progress of starting a policy + */ + unsigned nrs_policy_starting:1; + /** + * In progress of shutting down the whole NRS head; used during + * unregistration + */ + unsigned nrs_stopping:1; +}; + +#define NRS_POL_NAME_MAX 16 + +/** + * NRS policy registering descriptor + * + * Is used to hold a description of a policy that can be passed to NRS core in + * order to register the policy with NRS heads in different PTLRPC services. + */ +struct ptlrpc_nrs_pol_desc { + /** + * Human-readable policy name + */ + char pd_name[NRS_POL_NAME_MAX]; + /** + * NRS operations for this policy + */ + struct ptlrpc_nrs_pol_ops *pd_ops; + /** + * Service Compatibility function; this determines whether a policy is + * adequate for handling RPCs of a particular PTLRPC service. + * + * XXX:This should give the same result during policy + * registration and unregistration, and for all partitions of a + * service; so the result should not depend on temporal service + * or other properties, that may influence the result. + */ + bool (*pd_compat) (struct ptlrpc_service *svc, + const struct ptlrpc_nrs_pol_desc *desc); + /** + * Optionally set for policies that support a single ptlrpc service, + * i.e. ones that have \a pd_compat set to nrs_policy_compat_one() + */ + char *pd_compat_svc_name; + /** + * Bitmask of nrs_policy_flags + */ + unsigned pd_flags; + /** + * Link into nrs_core::nrs_policies + */ + cfs_list_t pd_list; +}; + +/** + * NRS policy state + * + * Policies transition from one state to the other during their lifetime + */ +enum ptlrpc_nrs_pol_state { + /** + * Not a valid policy state. + */ + NRS_POL_STATE_INVALID, + /** + * For now, this state is used exclusively for policies that register + * externally to NRS core, i.e. ones that do so via + * ptlrpc_nrs_policy_register() and are not part of nrs_pols_builtin; + * it is used to prevent a race condition between the policy registering + * with more than one service partition while service is operational, + * and the user starting the policy via lprocfs. + * + * \see nrs_pol_make_avail() + */ + NRS_POL_STATE_UNAVAIL, + /** + * Policies are at this state either at the start of their life, or + * transition here when the user selects a different policy to act + * as the primary one. + */ + NRS_POL_STATE_STOPPED, + /** + * Policy is progress of stopping + */ + NRS_POL_STATE_STOPPING, + /** + * Policy is in progress of starting + */ + NRS_POL_STATE_STARTING, + /** + * A policy is in this state in two cases: + * - it is the fallback policy, which is always in this state. + * - it has been activated by the user; i.e. it is the primary policy, + */ + NRS_POL_STATE_STARTED, +}; + +/** + * NRS policy information + * + * Used for obtaining information for the status of a policy via lprocfs + */ +struct ptlrpc_nrs_pol_info { + /** + * Policy name + */ + char pi_name[NRS_POL_NAME_MAX]; + /** + * Current policy state + */ + enum ptlrpc_nrs_pol_state pi_state; + /** + * # RPCs enqueued for later dispatching by the policy + */ + long pi_req_queued; + /** + * # RPCs started for dispatch by the policy + */ + long pi_req_started; + /** + * Is this a fallback policy? + */ + unsigned pi_fallback:1; +}; + +/** + * NRS policy + * + * There is one instance of this for each policy in each NRS head of each + * PTLRPC service partition. + */ +struct ptlrpc_nrs_policy { + /** + * Linkage into the NRS head's list of policies, + * ptlrpc_nrs:nrs_policy_list + */ + cfs_list_t pol_list; + /** + * Linkage into the NRS head's list of policies with enqueued + * requests ptlrpc_nrs:nrs_policy_queued + */ + cfs_list_t pol_list_queued; + /** + * Current state of this policy + */ + enum ptlrpc_nrs_pol_state pol_state; + /** + * Bitmask of nrs_policy_flags + */ + unsigned pol_flags; + /** + * # RPCs enqueued for later dispatching by the policy + */ + long pol_req_queued; + /** + * # RPCs started for dispatch by the policy + */ + long pol_req_started; + /** + * Usage Reference count taken on the policy instance + */ + long pol_ref; + /** + * The NRS head this policy has been created at + */ + struct ptlrpc_nrs *pol_nrs; + /** + * NRS operations for this policy; points to ptlrpc_nrs_pol_desc::pd_ops + */ + struct ptlrpc_nrs_pol_ops *pol_ops; + /** + * Private policy data; varies by policy type + */ + void *pol_private; + /** + * Human-readable policy name; point to ptlrpc_nrs_pol_desc::pd_name + */ + char *pol_name; +}; + +/** + * NRS resource + * + * Resources are embedded into two types of NRS entities: + * - Inside NRS policies, in the policy's private data in + * ptlrpc_nrs_policy::pol_private + * - In objects that act as prime-level scheduling entities in different NRS + * policies; e.g. on a policy that performs round robin or similar order + * scheduling across client NIDs, there would be one NRS resource per unique + * client NID. On a policy which performs round robin scheduling across + * backend filesystem objects, there would be one resource associated with + * each of the backend filesystem objects partaking in the scheduling + * performed by the policy. + * + * NRS resources share a parent-child relationship, in which resources embedded + * in policy instances are the parent entities, with all scheduling entities + * a policy schedules across being the children, thus forming a simple resource + * hierarchy. This hierarchy may be extended with one or more levels in the + * future if the ability to have more than one primary policy is added. + * + * Upon request initialization, references to the then active NRS policies are + * taken and used to later handle the dispatching of the request with one of + * these policies. + * + * \see nrs_resource_get_safe() + * \see ptlrpc_nrs_req_add() + */ +struct ptlrpc_nrs_resource { + /** + * This NRS resource's parent; is NULL for resources embedded in NRS + * policy instances; i.e. those are top-level ones. + */ + struct ptlrpc_nrs_resource *res_parent; + /** + * The policy associated with this resource. + */ + struct ptlrpc_nrs_policy *res_policy; +}; + +enum { + NRS_RES_FALLBACK, + NRS_RES_PRIMARY, + NRS_RES_MAX +}; + +/* \name fifo + * + * FIFO policy + * + * This policy is a logical wrapper around previous, non-NRS functionality. + * It dispatches RPCs in the same order as they arrive from the network. This + * policy is currently used as the fallback policy, and the only enabled policy + * on all NRS heads of all PTLRPC service partitions. + * @{ + */ + +/** + * Private data structure for the FIFO policy + */ +struct nrs_fifo_head { + /** + * Resource object for policy instance. + */ + struct ptlrpc_nrs_resource fh_res; + /** + * List of queued requests. + */ + cfs_list_t fh_list; + /** + * For debugging purposes. + */ + __u64 fh_sequence; +}; + +struct nrs_fifo_req { + /** request header, must be the first member of structure */ + cfs_list_t fr_list; + __u64 fr_sequence; +}; + +/** @} fifo */ + +/** + * NRS request + * + * Instances of this object exist embedded within ptlrpc_request; the main + * purpose of this object is to hold references to the request's resources + * for the lifetime of the request, and to hold properties that policies use + * use for determining the request's scheduling priority. + * */ +struct ptlrpc_nrs_request { + /** + * The request's resource hierarchy. + */ + struct ptlrpc_nrs_resource *nr_res_ptrs[NRS_RES_MAX]; + /** + * Index into ptlrpc_nrs_request::nr_res_ptrs of the resource of the + * policy that was used to enqueue the request. + * + * \see nrs_request_enqueue() + */ + unsigned nr_res_idx; + unsigned nr_initialized:1; + unsigned nr_enqueued:1; + unsigned nr_dequeued:1; + unsigned nr_started:1; + unsigned nr_finalized:1; + cfs_binheap_node_t nr_node; + + /** + * Policy-specific fields, used for determining a request's scheduling + * priority, and other supporting functionality. + */ + union { + /** + * Fields for the FIFO policy + */ + struct nrs_fifo_req fifo; + } nr_u; + /** + * Externally-registering policies may want to use this to allocate + * their own request properties. + */ + void *ext; +}; + +/** @} nrs */ + +/** * Basic request prioritization operations structure. * The whole idea is centered around locks and RPCs that might affect locks. * When a lock is contended we try to give priority to RPCs that might lead @@ -686,6 +1339,12 @@ struct ptlrpc_request { /** history sequence # */ __u64 rq_history_seq; + /** \addtogroup nrs + * @{ + */ + /** stub for NRS request */ + struct ptlrpc_nrs_request rq_nrq; + /** @} nrs */ /** the index of service's srv_at_array into which request is linked */ time_t rq_at_index; /** Lock to protect request flags and some other important bits, like @@ -927,6 +1586,36 @@ static inline int ptlrpc_req_interpret(const struct lu_env *env, return rc; } +/** \addtogroup nrs + * @{ + */ +int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_desc *desc); +int ptlrpc_nrs_policy_unregister(struct ptlrpc_nrs_pol_desc *desc); +void ptlrpc_nrs_req_hp_move(struct ptlrpc_request *req); +void nrs_policy_get_info_locked(struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_pol_info *info); + +/* + * Can the request be moved from the regular NRS head to the high-priority NRS + * head (of the same PTLRPC service partition), if any? + * + * For a reliable result, this should be checked under svcpt->scp_req lock. + */ +static inline bool +ptlrpc_nrs_req_can_move(struct ptlrpc_request *req) +{ + struct ptlrpc_nrs_request *nrq = &req->rq_nrq; + + /** + * LU-898: Check ptlrpc_nrs_request::nr_enqueued to make sure the + * request has been enqueued first, and ptlrpc_nrs_request::nr_started + * to make sure it has not been scheduled yet (analogous to previous + * (non-NRS) checking of !list_empty(&ptlrpc_request::rq_list). + */ + return nrq->nr_enqueued && !nrq->nr_started && !req->rq_hp; +} +/** @} nrs */ + /** * Returns 1 if request buffer at offset \a index was already swabbed */ @@ -1463,11 +2152,7 @@ struct ptlrpc_service_part { * sent to this portal */ spinlock_t scp_req_lock __cfs_cacheline_aligned; - /** # reqs in either of the queues below */ - /** reqs waiting for service */ - cfs_list_t scp_req_pending; - /** high priority queue */ - cfs_list_t scp_hreq_pending; + /** # reqs in either of the NRS heads below */ /** # reqs being served */ int scp_nreqs_active; /** # HPreqs being served */ @@ -1475,6 +2160,12 @@ struct ptlrpc_service_part { /** # hp requests handled */ int scp_hreq_count; + /** NRS head for regular requests */ + struct ptlrpc_nrs scp_nrs_reg; + /** NRS head for HP requests; this is only valid for services that can + * handle HP requests */ + struct ptlrpc_nrs *scp_nrs_hp; + /** AT stuff */ /** @{ */ /** @@ -1614,6 +2305,49 @@ enum ptlrpcd_ctl_flags { LIOD_BIND = 1 << 4, }; +/** + * \addtogroup nrs + * @{ + * + * Service compatibility function; policy is compatible with all services. + * + * \param[in] svc The service the policy is attempting to register with. + * \param[in] desc The policy descriptor + * + * \retval true The policy is compatible with the NRS head + * + * \see ptlrpc_nrs_pol_desc::pd_compat() + */ +static inline bool +nrs_policy_compat_all(struct ptlrpc_service *svc, + const struct ptlrpc_nrs_pol_desc *desc) +{ + return true; +} + +/** + * Service compatibility function; policy is compatible with only a specific + * service which is identified by its human-readable name at + * ptlrpc_service::srv_name. + * + * \param[in] svc The service the policy is attempting to register with. + * \param[in] desc The policy descriptor + * + * \retval false The policy is not compatible with the NRS head + * \retval true The policy is compatible with the NRS head + * + * \see ptlrpc_nrs_pol_desc::pd_compat() + */ +static inline bool +nrs_policy_compat_one(struct ptlrpc_service *svc, + const struct ptlrpc_nrs_pol_desc *desc) +{ + LASSERT(desc->pd_compat_svc_name != NULL); + return strcmp(svc->srv_name, desc->pd_compat_svc_name) == 0; +} + +/** @} nrs */ + /* ptlrpc/events.c */ extern lnet_handle_eq_t ptlrpc_eq_h; extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid, @@ -1894,7 +2628,6 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service); int liblustre_check_services(void *arg); void ptlrpc_daemonize(char *name); int ptlrpc_service_health_check(struct ptlrpc_service *); -void ptlrpc_hpreq_reorder(struct ptlrpc_request *req); void ptlrpc_server_drop_request(struct ptlrpc_request *req); #ifdef __KERNEL__ diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 464a19a..4010c45 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -797,25 +797,28 @@ static inline int ldlm_ast_fini(struct ptlrpc_request *req, */ static void ldlm_lock_reorder_req(struct ldlm_lock *lock) { - struct ptlrpc_request *req; - ENTRY; + struct ptlrpc_request *req; + ENTRY; - if (lock->l_export == NULL) { - LDLM_DEBUG(lock, "client lock: no-op"); - RETURN_EXIT; - } + if (lock->l_export == NULL) { + LDLM_DEBUG(lock, "client lock: no-op"); + RETURN_EXIT; + } spin_lock_bh(&lock->l_export->exp_rpc_lock); - cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs, - rq_exp_list) { - /* Do not process requests that were not yet added to there - * incoming queue or were already removed from there for - * processing */ - if (!req->rq_hp && !cfs_list_empty(&req->rq_list) && - req->rq_ops->hpreq_lock_match && - req->rq_ops->hpreq_lock_match(req, lock)) - ptlrpc_hpreq_reorder(req); - } + cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs, + rq_exp_list) { + /* Do not process requests that were not yet added to there + * incoming queue or were already removed from there for + * processing. We evaluate ptlrpc_request_reorderable() without + * holding svcpt->scp_req_lock, and then redo the checks with + * the lock held once we need to obtain a reliable result. + */ + if (ptlrpc_nrs_req_can_move(req) && + req->rq_ops->hpreq_lock_match && + req->rq_ops->hpreq_lock_match(req, lock)) + ptlrpc_nrs_req_hp_move(req); + } spin_unlock_bh(&lock->l_export->exp_rpc_lock); EXIT; } diff --git a/lustre/ptlrpc/Makefile.in b/lustre/ptlrpc/Makefile.in index 86da7be..0ea5fbf 100644 --- a/lustre/ptlrpc/Makefile.in +++ b/lustre/ptlrpc/Makefile.in @@ -14,7 +14,7 @@ ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o recov_thread.o ptlrpc_objs += llog_net.o llog_client.o llog_server.o import.o ptlrpcd.o ptlrpc_objs += pers.o lproc_ptlrpc.o wiretest.o layout.o ptlrpc_objs += sec.o sec_bulk.o sec_gc.o sec_config.o sec_lproc.o -ptlrpc_objs += sec_null.o sec_plain.o +ptlrpc_objs += sec_null.o sec_plain.o nrs.o nrs_fifo.o target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o diff --git a/lustre/ptlrpc/autoMakefile.am b/lustre/ptlrpc/autoMakefile.am index 44b9ce1..2b1d88a 100644 --- a/lustre/ptlrpc/autoMakefile.am +++ b/lustre/ptlrpc/autoMakefile.am @@ -47,14 +47,15 @@ LDLM_COMM_SOURCES= $(top_srcdir)/lustre/ldlm/l_lock.c \ $(top_srcdir)/lustre/ldlm/ldlm_lockd.c \ $(top_srcdir)/lustre/ldlm/ldlm_internal.h \ $(top_srcdir)/lustre/ldlm/ldlm_inodebits.c \ - $(top_srcdir)/lustre/ldlm/ldlm_flock.c \ + $(top_srcdir)/lustre/ldlm/ldlm_flock.c \ $(top_srcdir)/lustre/ldlm/ldlm_pool.c -COMMON_SOURCES = client.c recover.c connection.c niobuf.c pack_generic.c \ - events.c ptlrpc_module.c service.c pinger.c recov_thread.c llog_net.c \ - llog_client.c llog_server.c import.c ptlrpcd.c pers.c wiretest.c \ - ptlrpc_internal.h layout.c sec.c sec_bulk.c sec_gc.c sec_config.c \ - sec_lproc.c sec_null.c sec_plain.c lproc_ptlrpc.c $(LDLM_COMM_SOURCES) +COMMON_SOURCES = client.c recover.c connection.c niobuf.c pack_generic.c \ + events.c ptlrpc_module.c service.c pinger.c recov_thread.c llog_net.c \ + llog_client.c llog_server.c import.c ptlrpcd.c pers.c wiretest.c \ + ptlrpc_internal.h layout.c sec.c sec_bulk.c sec_gc.c sec_config.c \ + sec_lproc.c sec_null.c sec_plain.c lproc_ptlrpc.c nrs.c nrs_fifo.c \ + $(LDLM_COMM_SOURCES) if LIBLUSTRE @@ -92,6 +93,8 @@ ptlrpc_SOURCES = \ recover.c \ recov_thread.c \ service.c \ + nrs.c \ + nrs_fifo.c \ wiretest.c \ sec.c \ sec_bulk.c \ diff --git a/lustre/ptlrpc/lproc_ptlrpc.c b/lustre/ptlrpc/lproc_ptlrpc.c index 5596213..69ef3bf 100644 --- a/lustre/ptlrpc/lproc_ptlrpc.c +++ b/lustre/ptlrpc/lproc_ptlrpc.c @@ -405,6 +405,330 @@ ptlrpc_lprocfs_wr_threads_max(struct file *file, const char *buffer, return count; } +/** + * \addtogoup nrs + * @{ + */ +extern struct nrs_core nrs_core; + +/** + * Translates \e ptlrpc_nrs_pol_state values to human-readable strings. + * + * \param[in] state The policy state + */ +static const char * +nrs_state2str(enum ptlrpc_nrs_pol_state state) +{ + switch (state) { + default: + LBUG(); + case NRS_POL_STATE_INVALID: + return "invalid"; + case NRS_POL_STATE_UNAVAIL: + return "unavail"; + case NRS_POL_STATE_STOPPED: + return "stopped"; + case NRS_POL_STATE_STOPPING: + return "stopping"; + case NRS_POL_STATE_STARTING: + return "starting"; + case NRS_POL_STATE_STARTED: + return "started"; + } +} + +/** + * Obtains status information for \a policy. + * + * Information is copied in \a info. + * + * \param[in] policy The policy + * \param[out] info Holds returned status information + */ +void +nrs_policy_get_info_locked(struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_pol_info *info) +{ + LASSERT(policy != NULL); + LASSERT(info != NULL); + LASSERT(spin_is_locked(&policy->pol_nrs->nrs_lock)); + + memcpy(info->pi_name, policy->pol_name, NRS_POL_NAME_MAX); + + info->pi_fallback = !!(policy->pol_flags & PTLRPC_NRS_FL_FALLBACK); + info->pi_state = policy->pol_state; + /** + * XXX: These are accessed without holding + * ptlrpc_service_part::scp_req_lock. + */ + info->pi_req_queued = policy->pol_req_queued; + info->pi_req_started = policy->pol_req_started; +} + +/** + * Reads and prints policy status information for all policies of a PTLRPC + * service. + */ +static int +ptlrpc_lprocfs_rd_nrs(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct ptlrpc_service *svc = data; + struct ptlrpc_service_part *svcpt; + struct ptlrpc_nrs *nrs; + struct ptlrpc_nrs_policy *policy; + struct ptlrpc_nrs_pol_info *infos; + struct ptlrpc_nrs_pol_info tmp; + unsigned num_pols; + unsigned pol_idx = 0; + bool hp = false; + int i; + int rc = 0; + int rc2 = 0; + ENTRY; + + /** + * Serialize NRS core lprocfs operations with policy registration/ + * unregistration. + */ + mutex_lock(&nrs_core.nrs_mutex); + + /** + * Use the first service partition's regular NRS head in order to obtain + * the number of policies registered with NRS heads of this service. All + * service partitions will have the same number of policies. + */ + nrs = nrs_svcpt2nrs(svc->srv_parts[0], false); + + spin_lock(&nrs->nrs_lock); + num_pols = svc->srv_parts[0]->scp_nrs_reg.nrs_num_pols; + spin_unlock(&nrs->nrs_lock); + + OBD_ALLOC(infos, num_pols * sizeof(*infos)); + if (infos == NULL) + GOTO(out, rc = -ENOMEM); +again: + + ptlrpc_service_for_each_part(svcpt, i, svc) { + nrs = nrs_svcpt2nrs(svcpt, hp); + spin_lock(&nrs->nrs_lock); + + pol_idx = 0; + + cfs_list_for_each_entry(policy, &nrs->nrs_policy_list, + pol_list) { + LASSERT(pol_idx < num_pols); + + nrs_policy_get_info_locked(policy, &tmp); + /** + * Copy values when handling the first service + * partition. + */ + if (i == 0) { + memcpy(infos[pol_idx].pi_name, tmp.pi_name, + NRS_POL_NAME_MAX); + memcpy(&infos[pol_idx].pi_state, &tmp.pi_state, + sizeof(tmp.pi_state)); + infos[pol_idx].pi_fallback = tmp.pi_fallback; + /** + * For the rest of the service partitions + * sanity-check the values we get. + */ + } else { + LASSERT(strncmp(infos[pol_idx].pi_name, + tmp.pi_name, + NRS_POL_NAME_MAX) == 0); + /** + * Not asserting ptlrpc_nrs_pol_info::pi_state, + * because it may be different between + * instances of the same policy in different + * service partitions. + */ + LASSERT(infos[pol_idx].pi_fallback == + tmp.pi_fallback); + } + + infos[pol_idx].pi_req_queued += tmp.pi_req_queued; + infos[pol_idx].pi_req_started += tmp.pi_req_started; + + pol_idx++; + } + spin_unlock(&nrs->nrs_lock); + } + + /** + * Policy status information output is in YAML format. + * For example: + * + * regular_requests: + * - name: fifo + * state: started + * fallback: yes + * queued: 0 + * active: 0 + * + * - name: crrn + * state: started + * fallback: no + * queued: 2015 + * active: 384 + * + * high_priority_requests: + * - name: fifo + * state: started + * fallback: yes + * queued: 0 + * active: 2 + * + * - name: crrn + * state: stopped + * fallback: no + * queued: 0 + * active: 0 + */ + rc2 = snprintf(page + rc, count - rc, + "%s\n", !hp ? + "\nregular_requests:" : + "high_priority_requests:"); + + if (rc2 >= count - rc) { + /** Output was truncated */ + GOTO(out, rc = -EFBIG); + } + + rc += rc2; + + for (pol_idx = 0; pol_idx < num_pols; pol_idx++) { + rc2 = snprintf(page + rc, count - rc, + " - name: %s\n" + " state: %s\n" + " fallback: %s\n" + " queued: %-20d\n" + " active: %-20d\n\n", + infos[pol_idx].pi_name, + nrs_state2str(infos[pol_idx].pi_state), + infos[pol_idx].pi_fallback ? "yes" : "no", + (int)infos[pol_idx].pi_req_queued, + (int)infos[pol_idx].pi_req_started); + + + if (rc2 >= count - rc) { + /** Output was truncated */ + GOTO(out, rc = -EFBIG); + } + + rc += rc2; + } + + if (!hp && nrs_svc_has_hp(svc)) { + memset(infos, 0, num_pols * sizeof(*infos)); + + /** + * Redo the processing for the service's HP NRS heads' policies. + */ + hp = true; + goto again; + } + + *eof = 1; + +out: + if (infos) + OBD_FREE(infos, num_pols * sizeof(*infos)); + + mutex_unlock(&nrs_core.nrs_mutex); + + RETURN(rc); +} + +/** + * The longest valid command string is the maxium policy name size, plus the + * length of the " reg" substring + */ +#define LPROCFS_NRS_WR_MAX_CMD (NRS_POL_NAME_MAX + sizeof(" reg") - 1) + +/** + * Starts and stops a given policy on a PTLRPC service. + * + * Commands consist of the policy name, followed by an optional [reg|hp] token; + * if the optional token is omitted, the operation is performed on both the + * regular and high-priority (if the service has one) NRS head. + */ +static int +ptlrpc_lprocfs_wr_nrs(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct ptlrpc_service *svc = data; + enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH; + char *cmd; + char *cmd_copy = NULL; + char *token; + int rc = 0; + ENTRY; + + if (count >= LPROCFS_NRS_WR_MAX_CMD) + GOTO(out, rc = -EINVAL); + + OBD_ALLOC(cmd, LPROCFS_NRS_WR_MAX_CMD); + if (cmd == NULL) + GOTO(out, rc = -ENOMEM); + /** + * strsep() modifies its argument, so keep a copy + */ + cmd_copy = cmd; + + if (cfs_copy_from_user(cmd, buffer, count)) + GOTO(out, rc = -EFAULT); + + cmd[count] = '\0'; + + token = strsep(&cmd, " "); + + if (strlen(token) > NRS_POL_NAME_MAX - 1) + GOTO(out, rc = -EINVAL); + + /** + * No [reg|hp] token has been specified + */ + if (cmd == NULL) + goto default_queue; + + /** + * The second token is either NULL, or an optional [reg|hp] string + */ + if (strcmp(cmd, "reg") == 0) + queue = PTLRPC_NRS_QUEUE_REG; + else if (strcmp(cmd, "hp") == 0) + queue = PTLRPC_NRS_QUEUE_HP; + else + GOTO(out, rc = -EINVAL); + +default_queue: + + if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc)) + GOTO(out, rc = -ENODEV); + else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc)) + queue = PTLRPC_NRS_QUEUE_REG; + + /** + * Serialize NRS core lprocfs operations with policy registration/ + * unregistration. + */ + mutex_lock(&nrs_core.nrs_mutex); + + rc = ptlrpc_nrs_policy_control(svc, queue, token, PTLRPC_NRS_CTL_START, + false, NULL); + + mutex_unlock(&nrs_core.nrs_mutex); +out: + if (cmd_copy) + OBD_FREE(cmd_copy, LPROCFS_NRS_WR_MAX_CMD); + + RETURN(rc < 0 ? rc : count); +} + +/** @} nrs */ + struct ptlrpc_srh_iterator { int srhi_idx; __u64 srhi_seq; @@ -792,7 +1116,11 @@ void ptlrpc_lprocfs_register_service(struct proc_dir_entry *entry, {.name = "timeouts", .read_fptr = ptlrpc_lprocfs_rd_timeouts, .data = svc}, - {NULL} + {.name = "nrs_policies", + .read_fptr = ptlrpc_lprocfs_rd_nrs, + .write_fptr = ptlrpc_lprocfs_wr_nrs, + .data = svc}, + {NULL} }; static struct file_operations req_history_fops = { .owner = THIS_MODULE, diff --git a/lustre/ptlrpc/nrs.c b/lustre/ptlrpc/nrs.c new file mode 100644 index 0000000..31c1dc2 --- /dev/null +++ b/lustre/ptlrpc/nrs.c @@ -0,0 +1,1859 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License version 2 for more details. A copy is + * included in the COPYING file that accompanied this code. + + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * GPL HEADER END + */ +/* + * Copyright (c) 2011 Intel Corporation + * + * Copyright 2012 Xyratex Technology Limited + */ +/* + * lustre/ptlrpc/nrs.c + * + * Network Request Scheduler (NRS) + * + * Allows to reorder the handling of RPCs at servers. + * + * Author: Liang Zhen + * Author: Nikitas Angelinas + */ +/** + * \addtogoup nrs + * @{ + */ + +#define DEBUG_SUBSYSTEM S_RPC +#ifndef __KERNEL__ +#include +#endif +#include +#include +#include +#include +#include +#include "ptlrpc_internal.h" + +/* XXX: This is just for liblustre. Remove the #if defined directive when the + * "cfs_" prefix is dropped from cfs_list_head. */ +#if defined (__linux__) && defined(__KERNEL__) +extern struct list_head ptlrpc_all_services; +#else +extern struct cfs_list_head ptlrpc_all_services; +#endif + +/** + * NRS core object. + */ +struct nrs_core nrs_core; + +static int +nrs_policy_init(struct ptlrpc_nrs_policy *policy) +{ + return policy->pol_ops->op_policy_init != NULL ? + policy->pol_ops->op_policy_init(policy) : 0; +} + +static void +nrs_policy_fini(struct ptlrpc_nrs_policy *policy) +{ + LASSERT(policy->pol_ref == 0); + LASSERT(policy->pol_req_queued == 0); + + if (policy->pol_ops->op_policy_fini != NULL) + policy->pol_ops->op_policy_fini(policy); +} + +static int +nrs_policy_ctl_locked(struct ptlrpc_nrs_policy *policy, enum ptlrpc_nrs_ctl opc, + void *arg) +{ + return policy->pol_ops->op_policy_ctl != NULL ? + policy->pol_ops->op_policy_ctl(policy, opc, arg) : -ENOSYS; +} + +static void +nrs_policy_stop0(struct ptlrpc_nrs_policy *policy) +{ + struct ptlrpc_nrs *nrs = policy->pol_nrs; + ENTRY; + + if (policy->pol_ops->op_policy_stop != NULL) { + spin_unlock(&nrs->nrs_lock); + + policy->pol_ops->op_policy_stop(policy); + + spin_lock(&nrs->nrs_lock); + } + + LASSERT(cfs_list_empty(&policy->pol_list_queued)); + LASSERT(policy->pol_req_queued == 0 && + policy->pol_req_started == 0); + + policy->pol_private = NULL; + + policy->pol_state = NRS_POL_STATE_STOPPED; + EXIT; +} + +static int +nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy) +{ + struct ptlrpc_nrs *nrs = policy->pol_nrs; + ENTRY; + + if (nrs->nrs_policy_fallback == policy && !nrs->nrs_stopping) + RETURN(-EPERM); + + if (policy->pol_state == NRS_POL_STATE_STARTING) + RETURN(-EAGAIN); + + /* In progress or already stopped */ + if (policy->pol_state != NRS_POL_STATE_STARTED) + RETURN(0); + + policy->pol_state = NRS_POL_STATE_STOPPING; + + /* Immediately make it invisible */ + if (nrs->nrs_policy_primary == policy) { + nrs->nrs_policy_primary = NULL; + + } else { + LASSERT(nrs->nrs_policy_fallback == policy); + nrs->nrs_policy_fallback = NULL; + } + + /* I have the only refcount */ + if (policy->pol_ref == 1) + nrs_policy_stop0(policy); + + RETURN(0); +} + +/** + * Transitions the \a nrs NRS head's primary policy to + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING and if the policy has no + * pending usage references, to ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED. + * + * \param[in] nrs The NRS head to carry out this operation on + */ +static void +nrs_policy_stop_primary(struct ptlrpc_nrs *nrs) +{ + struct ptlrpc_nrs_policy *tmp = nrs->nrs_policy_primary; + ENTRY; + + if (tmp == NULL) { + /** + * XXX: This should really be RETURN_EXIT, but the latter does + * not currently print anything out, and possibly should be + * fixed to do so. + */ + EXIT; + return; + } + + nrs->nrs_policy_primary = NULL; + + LASSERT(tmp->pol_state == NRS_POL_STATE_STARTED); + tmp->pol_state = NRS_POL_STATE_STOPPING; + + if (tmp->pol_ref == 0) + nrs_policy_stop0(tmp); + EXIT; +} + +/** + * Transitions a policy across the ptlrpc_nrs_pol_state range of values, in + * response to an lprocfs command to start a policy. + * + * If a primary policy different to the current one is specified, this function + * will transition the new policy to the + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTING and then to + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED, and will then transition + * the old primary policy (if there is one) to + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING, and if there are no outstanding + * references on the policy to ptlrpc_nrs_pol_stae::NRS_POL_STATE_STOPPED. + * + * If the fallback policy is specified, this is taken to indicate an instruction + * to stop the current primary policy, without substituting it with another + * primary policy, so the primary policy (if any) is transitioned to + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING, and if there are no outstanding + * references on the policy to ptlrpc_nrs_pol_stae::NRS_POL_STATE_STOPPED. In + * this case, the fallback policy is only left active in the NRS head. + */ +static int +nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy) +{ + struct ptlrpc_nrs *nrs = policy->pol_nrs; + int rc = 0; + ENTRY; + + /** + * Don't allow multiple starting which is too complex, and has no real + * benefit. + */ + if (nrs->nrs_policy_starting) + RETURN(-EAGAIN); + + LASSERT(policy->pol_state != NRS_POL_STATE_STARTING); + + if (policy->pol_state == NRS_POL_STATE_STOPPING || + policy->pol_state == NRS_POL_STATE_UNAVAIL) + RETURN(-EAGAIN); + + if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) { + /** + * This is for cases in which the user sets the policy to the + * fallback policy (currently fifo for all services); i.e. the + * user is resetting the policy to the default; so we stop the + * primary policy, if any. + */ + if (policy == nrs->nrs_policy_fallback) { + nrs_policy_stop_primary(nrs); + + RETURN(0); + } + + /** + * If we reach here, we must be setting up the fallback policy + * at service startup time, and only a single policy with the + * nrs_policy_flags::PTLRPC_NRS_FL_FALLBACK flag set can + * register with NRS core. + */ + LASSERT(nrs->nrs_policy_fallback == NULL); + } else { + /** + * Shouldn't start primary policy if w/o fallback policy. + */ + if (nrs->nrs_policy_fallback == NULL) + RETURN(-EPERM); + + if (policy->pol_state == NRS_POL_STATE_STARTED) + RETURN(0); + } + + /** + * Serialize policy starting.across the NRS head + */ + nrs->nrs_policy_starting = 1; + + policy->pol_state = NRS_POL_STATE_STARTING; + + if (policy->pol_ops->op_policy_start) { + spin_unlock(&nrs->nrs_lock); + + rc = policy->pol_ops->op_policy_start(policy); + + spin_lock(&nrs->nrs_lock); + if (rc != 0) { + policy->pol_state = NRS_POL_STATE_STOPPED; + GOTO(out, rc); + } + } + + policy->pol_state = NRS_POL_STATE_STARTED; + + if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) { + /** + * This path is only used at PTLRPC service setup time. + */ + nrs->nrs_policy_fallback = policy; + } else { + /* + * Try to stop the current primary policy if there is one. + */ + nrs_policy_stop_primary(nrs); + + /** + * And set the newly-started policy as the primary one. + */ + nrs->nrs_policy_primary = policy; + } + +out: + nrs->nrs_policy_starting = 0; + + RETURN(rc); +} + +/** + * Increases the policy's usage reference count. + */ +static void +nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy) +{ + policy->pol_ref++; +} + +/** + * Decreases the policy's usage reference count, and stops the policy in case it + * was already stopping and have no more outstanding usage references (which + * indicates it has no more queued or started requests, and can be safely + * stopped). + */ +static void +nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy) +{ + LASSERT(policy->pol_ref > 0); + + policy->pol_ref--; + if (unlikely(policy->pol_ref == 0 && + policy->pol_state == NRS_POL_STATE_STOPPING)) + nrs_policy_stop0(policy); +} + +static void +nrs_policy_put(struct ptlrpc_nrs_policy *policy) +{ + spin_lock(&policy->pol_nrs->nrs_lock); + nrs_policy_put_locked(policy); + spin_unlock(&policy->pol_nrs->nrs_lock); +} + +/** + * Find and return a policy by name. + */ +static struct ptlrpc_nrs_policy * +nrs_policy_find_locked(struct ptlrpc_nrs *nrs, char *name) +{ + struct ptlrpc_nrs_policy *tmp; + + cfs_list_for_each_entry(tmp, &(nrs)->nrs_policy_list, pol_list) { + if (strncmp(tmp->pol_name, name, NRS_POL_NAME_MAX) == 0) { + nrs_policy_get_locked(tmp); + return tmp; + } + } + return NULL; +} + +/** + * Release references for the resource hierarchy moving upwards towards the + * policy instance resource. + */ +static void +nrs_resource_put(struct ptlrpc_nrs_resource *res) +{ + struct ptlrpc_nrs_policy *policy = res->res_policy; + + if (policy->pol_ops->op_res_put != NULL) { + struct ptlrpc_nrs_resource *parent; + + for (; res != NULL; res = parent) { + parent = res->res_parent; + policy->pol_ops->op_res_put(policy, res); + } + } +} + +/** + * Obtains references for each resource in the resource hierarchy for request + * \a nrq if it is to be handled by \a policy. + * + * \param[in] policy The policy + * \param[in] nrq The request + * \param[in] moving_req Denotes whether this is a call to the function by + * ldlm_lock_reorder_req(), in order to move \a nrq to + * the high-priority NRS head; we should not sleep when + * set. + * + * \retval NULL Resource hierarchy references not obtained + * \retval valid-pointer The bottom level of the resource hierarchy + * + * \see ptlrpc_nrs_pol_ops::op_res_get() + */ +static struct ptlrpc_nrs_resource * +nrs_resource_get(struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq, bool moving_req) +{ + /** + * Set to NULL to traverse the resource hierarchy from the top. + */ + struct ptlrpc_nrs_resource *res = NULL; + struct ptlrpc_nrs_resource *tmp = NULL; + int rc; + + while (1) { + rc = policy->pol_ops->op_res_get(policy, nrq, res, &tmp, + moving_req); + if (rc < 0) { + if (res != NULL) + nrs_resource_put(res); + return NULL; + } + + LASSERT(tmp != NULL); + tmp->res_parent = res; + tmp->res_policy = policy; + res = tmp; + tmp = NULL; + /** + * Return once we have obtained a reference to the bottom level + * of the resource hierarchy. + */ + if (rc > 0) + return res; + } +} + +/** + * Obtains resources for the resource hierarchies and policy references for + * the fallback and current primary policy (if any), that will later be used + * to handle request \a nrq. + * + * \param[in] nrs The NRS head instance that will be handling request \a nrq. + * \param[in] nrq The request that is being handled. + * \param[out] resp The array where references to the resource hierarchy are + * stored. + * \param[in] moving_req Is set when obtaining resources while moving a + * request from a policy on the regular NRS head to a + * policy on the HP NRS head (via + * ldlm_lock_reorder_req()). It signifies that + * allocations to get resources should be atomic; for + * a full explanation, see comment in + * ptlrpc_nrs_pol_ops::op_res_get(). + */ +static void +nrs_resource_get_safe(struct ptlrpc_nrs *nrs, struct ptlrpc_nrs_request *nrq, + struct ptlrpc_nrs_resource **resp, bool moving_req) +{ + struct ptlrpc_nrs_policy *primary = NULL; + struct ptlrpc_nrs_policy *fallback = NULL; + + memset(resp, 0, sizeof(resp[0]) * NRS_RES_MAX); + + /** + * Obtain policy references. + */ + spin_lock(&nrs->nrs_lock); + + fallback = nrs->nrs_policy_fallback; + nrs_policy_get_locked(fallback); + + primary = nrs->nrs_policy_primary; + if (primary != NULL) + nrs_policy_get_locked(primary); + + spin_unlock(&nrs->nrs_lock); + + /** + * Obtain resource hierarchy references. + */ + resp[NRS_RES_FALLBACK] = nrs_resource_get(fallback, nrq, moving_req); + LASSERT(resp[NRS_RES_FALLBACK] != NULL); + + if (primary != NULL) { + resp[NRS_RES_PRIMARY] = nrs_resource_get(primary, nrq, + moving_req); + /** + * A primary policy may exist which may not wish to serve a + * particular request for different reasons; release the + * reference on the policy as it will not be used for this + * request. + */ + if (resp[NRS_RES_PRIMARY] == NULL) + nrs_policy_put(primary); + } +} + +/** + * Releases references to resource hierarchies and policies, because they are no + * longer required; used when request handling has been completed, ot the + * request is moving to the high priority NRS head. + * + * \param resp The resource hierarchy that is being released + * + * \see ptlrpcnrs_req_hp_move() + * \see ptlrpc_nrs_req_finalize() + */ +static void +nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp) +{ + struct ptlrpc_nrs_policy *pols[NRS_RES_MAX]; + struct ptlrpc_nrs *nrs = NULL; + int i; + + for (i = 0; i < NRS_RES_MAX; i++) { + if (resp[i] != NULL) { + pols[i] = resp[i]->res_policy; + nrs_resource_put(resp[i]); + resp[i] = NULL; + } else { + pols[i] = NULL; + } + } + + for (i = 0; i < NRS_RES_MAX; i++) { + if (pols[i] == NULL) + continue; + + if (nrs == NULL) { + nrs = pols[i]->pol_nrs; + spin_lock(&nrs->nrs_lock); + } + nrs_policy_put_locked(pols[i]); + } + + if (nrs != NULL) + spin_unlock(&nrs->nrs_lock); +} + +/** + * Obtains an NRS request from \a policy for handling via polling. + * + * \param[in] policy The policy being polled + * \param[in,out] arg Reserved parameter + */ +static struct ptlrpc_nrs_request * +nrs_request_poll(struct ptlrpc_nrs_policy *policy) +{ + struct ptlrpc_nrs_request *nrq; + + LASSERT(policy->pol_req_queued > 0); + + nrq = policy->pol_ops->op_req_poll(policy); + + LASSERT(nrq != NULL); + LASSERT(nrs_request_policy(nrq) == policy); + + return nrq; +} + +/** + * Enqueues request \a nrq for later handling, via one one the policies for + * which resources where earlier obtained via nrs_resource_get_safe(). The + * function attempts to enqueue the request first on the primary policy + * (if any), since this is the preferred choice. + * + * \param nrq The request being enqueued + * + * \see nrs_resource_get_safe() + */ +static void +nrs_request_enqueue(struct ptlrpc_nrs_request *nrq) +{ + struct ptlrpc_nrs_policy *policy; + int rc; + int i; + + /** + * Try in descending order, because the primary policy (if any) is + * the preferred choice. + */ + for (i = NRS_RES_MAX - 1; i >= 0; i--) { + if (nrq->nr_res_ptrs[i] == NULL) + continue; + + nrq->nr_res_idx = i; + policy = nrq->nr_res_ptrs[i]->res_policy; + + rc = policy->pol_ops->op_req_enqueue(policy, nrq); + if (rc == 0) { + policy->pol_nrs->nrs_req_queued++; + policy->pol_req_queued++; + return; + } + } + /** + * Should never get here, as at least the primary policy's + * ptlrpc_nrs_pol_ops::op_req_enqueue() implementation should always + * succeed. + */ + LBUG(); +} + +/** + * Dequeues request \a nrq from the policy which was used for handling it + * + * \param nrq The request being dequeued + * + * \see ptlrpc_nrs_req_del_nolock() + */ +static void +nrs_request_dequeue(struct ptlrpc_nrs_request *nrq) +{ + struct ptlrpc_nrs_policy *policy; + + policy = nrs_request_policy(nrq); + + policy->pol_ops->op_req_dequeue(policy, nrq); + + LASSERT(policy->pol_nrs->nrs_req_queued > 0); + LASSERT(policy->pol_req_queued > 0); + + policy->pol_nrs->nrs_req_queued--; + policy->pol_req_queued--; +} + +/** + * Is called when the request starts being handled, after it has been enqueued, + * polled and dequeued. + * + * \param[in] nrs The NRS request that is starting to be handled; can be used + * for job/resource control. + * + * \see ptlrpc_nrs_req_start_nolock() + */ +static void +nrs_request_start(struct ptlrpc_nrs_request *nrq) +{ + struct ptlrpc_nrs_policy *policy = nrs_request_policy(nrq); + + policy->pol_req_started++; + policy->pol_nrs->nrs_req_started++; + if (policy->pol_ops->op_req_start) + policy->pol_ops->op_req_start(policy, nrq); +} + +/** + * Called when a request has been handled + * + * \param[in] nrs The request that has been handled; can be used for + * job/resource control. + * + * \see ptlrpc_nrs_req_stop_nolock() + */ +static void +nrs_request_stop(struct ptlrpc_nrs_request *nrq) +{ + struct ptlrpc_nrs_policy *policy = nrs_request_policy(nrq); + + if (policy->pol_ops->op_req_stop) + policy->pol_ops->op_req_stop(policy, nrq); + + LASSERT(policy->pol_nrs->nrs_req_started > 0); + LASSERT(policy->pol_req_started > 0); + + policy->pol_nrs->nrs_req_started--; + policy->pol_req_started--; +} + +/** + * Handler for operations that can be carried out on policies. + * + * Handles opcodes that are common to all policy types within NRS core, and + * passes any unknown opcodes to the policy-specific control function. + * + * \param[in] nrs The NRS head this policy belongs to. + * \param[in] name The human-readable policy name; should be the same as + * ptlrpc_nrs_pol_desc::pd_name. + * \param[in] opc The opcode of the operation being carried out. + * \param[in,out] arg Can be used to pass information in and out between when + * carrying an operation; usually data that is private to + * the policy at some level, or generic policy status + * information. + * + * \retval -ve error condition + * \retval 0 operation was carried out successfully + */ +static int +nrs_policy_ctl(struct ptlrpc_nrs *nrs, char *name, enum ptlrpc_nrs_ctl opc, + void *arg) +{ + struct ptlrpc_nrs_policy *policy; + int rc = 0; + + spin_lock(&nrs->nrs_lock); + + policy = nrs_policy_find_locked(nrs, name); + if (policy == NULL) + GOTO(out, rc = -ENOENT); + + switch (opc) { + /** + * Unknown opcode, pass it down to the policy-specific control + * function for handling. + */ + default: + rc = nrs_policy_ctl_locked(policy, opc, arg); + break; + + /** + * Start \e policy + */ + case PTLRPC_NRS_CTL_START: + rc = nrs_policy_start_locked(policy); + break; + + /** + * TODO: This may need to be augmented for resource deallocation + * used by the policies. + */ + case PTLRPC_NRS_CTL_SHRINK: + rc = -ENOSYS; + break; + } +out: + if (policy != NULL) + nrs_policy_put_locked(policy); + + spin_unlock(&nrs->nrs_lock); + + return rc; +} + +/** + * Unregisters a policy by name. + * + * \param[in] nrs The NRS head this policy belongs to. + * \param[in] name The human-readable policy name; should be the same as + * ptlrpc_nrs_pol_desc::pd_name + * + * \retval -ve error + * \retval 0 success + */ +static int +nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name) +{ + struct ptlrpc_nrs_policy *policy = NULL; + ENTRY; + + spin_lock(&nrs->nrs_lock); + + policy = nrs_policy_find_locked(nrs, name); + if (policy == NULL) { + spin_unlock(&nrs->nrs_lock); + + CERROR("Can't find NRS policy %s\n", name); + RETURN(-ENOENT); + } + + if (policy->pol_ref > 1) { + CERROR("Policy %s is busy with %d references\n", name, + (int)policy->pol_ref); + nrs_policy_put_locked(policy); + + spin_unlock(&nrs->nrs_lock); + RETURN(-EBUSY); + } + + LASSERT(policy->pol_req_queued == 0); + LASSERT(policy->pol_req_started == 0); + + if (policy->pol_state != NRS_POL_STATE_STOPPED) { + nrs_policy_stop_locked(policy); + LASSERT(policy->pol_state == NRS_POL_STATE_STOPPED); + } + + cfs_list_del(&policy->pol_list); + nrs->nrs_num_pols--; + + nrs_policy_put_locked(policy); + + spin_unlock(&nrs->nrs_lock); + + nrs_policy_fini(policy); + + LASSERT(policy->pol_private == NULL); + OBD_FREE_PTR(policy); + + RETURN(0); +} + +/** + * Register a policy from \policy descriptor \a desc with NRS head \a nrs. + * + * \param[in] nrs The NRS head on which the policy will be registered. + * \param[in] desc The policy descriptor from which the information will be + * obtained to register the policy. + * + * \retval -ve error + * \retval 0 success + */ +static int +nrs_policy_register(struct ptlrpc_nrs *nrs, + struct ptlrpc_nrs_pol_desc *desc) +{ + struct ptlrpc_nrs_policy *policy; + struct ptlrpc_nrs_policy *tmp; + struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt; + int rc; + ENTRY; + + LASSERT(svcpt != NULL); + LASSERT(desc->pd_ops != NULL); + LASSERT(desc->pd_ops->op_res_get != NULL); + LASSERT(desc->pd_ops->op_req_poll != NULL); + LASSERT(desc->pd_ops->op_req_enqueue != NULL); + LASSERT(desc->pd_ops->op_req_dequeue != NULL); + LASSERT(desc->pd_compat != NULL); + + OBD_CPT_ALLOC_GFP(policy, svcpt->scp_service->srv_cptable, + svcpt->scp_cpt, sizeof(*policy), CFS_ALLOC_IO); + if (policy == NULL) + RETURN(-ENOMEM); + + policy->pol_nrs = nrs; + policy->pol_name = desc->pd_name; + policy->pol_ops = desc->pd_ops; + policy->pol_state = desc->pd_flags & PTLRPC_NRS_FL_REG_EXTERN ? + NRS_POL_STATE_UNAVAIL : NRS_POL_STATE_STOPPED; + policy->pol_flags = desc->pd_flags & ~PTLRPC_NRS_FL_REG_EXTERN; + + CFS_INIT_LIST_HEAD(&policy->pol_list); + CFS_INIT_LIST_HEAD(&policy->pol_list_queued); + + rc = nrs_policy_init(policy); + if (rc != 0) { + OBD_FREE_PTR(policy); + RETURN(rc); + } + + spin_lock(&nrs->nrs_lock); + + tmp = nrs_policy_find_locked(nrs, policy->pol_name); + if (tmp != NULL) { + CERROR("NRS policy %s has been registered, can't register it " + "for %s\n", + policy->pol_name, svcpt->scp_service->srv_name); + nrs_policy_put_locked(tmp); + + spin_unlock(&nrs->nrs_lock); + nrs_policy_fini(policy); + OBD_FREE_PTR(policy); + + RETURN(-EEXIST); + } + + cfs_list_add_tail(&policy->pol_list, &nrs->nrs_policy_list); + nrs->nrs_num_pols++; + + if (policy->pol_flags & PTLRPC_NRS_FL_REG_START) + rc = nrs_policy_start_locked(policy); + + spin_unlock(&nrs->nrs_lock); + + if (rc != 0) + (void) nrs_policy_unregister(nrs, policy->pol_name); + + RETURN(rc); +} + +/** + * Enqueue request \a req using one of the policies its resources are referring + * to. + * + * \param[in] req The request to enqueue. + */ +static void +ptlrpc_nrs_req_add_nolock(struct ptlrpc_request *req) +{ + struct ptlrpc_nrs_policy *policy; + + LASSERT(req->rq_nrq.nr_initialized); + LASSERT(!req->rq_nrq.nr_enqueued); + + nrs_request_enqueue(&req->rq_nrq); + req->rq_nrq.nr_enqueued = 1; + + policy = nrs_request_policy(&req->rq_nrq); + /** + * Add the policy to the NRS head's list of policies with enqueued + * requests, if it has not been added there. + */ + if (cfs_list_empty(&policy->pol_list_queued)) + cfs_list_add_tail(&policy->pol_list_queued, + &policy->pol_nrs->nrs_policy_queued); +} + +/** + * Enqueue a request on the high priority NRS head. + * + * \param req The request to enqueue. + */ +static void +ptlrpc_nrs_hpreq_add_nolock(struct ptlrpc_request *req) +{ + int opc = lustre_msg_get_opc(req->rq_reqmsg); + ENTRY; + + spin_lock(&req->rq_lock); + req->rq_hp = 1; + ptlrpc_nrs_req_add_nolock(req); + if (opc != OBD_PING) + DEBUG_REQ(D_NET, req, "high priority req"); + spin_unlock(&req->rq_lock); + EXIT; +} + +/* ptlrpc/nrs_fifo.c */ +extern struct ptlrpc_nrs_pol_desc ptlrpc_nrs_fifo_desc; + +/** + * Array of policies that ship alongside NRS core; i.e. ones that do not + * register externally using ptlrpc_nrs_policy_register(). + */ +static struct ptlrpc_nrs_pol_desc *nrs_pols_builtin[] = { + &ptlrpc_nrs_fifo_desc, +}; + +/** + * Returns a boolean predicate indicating whether the policy described by + * \a desc is adequate for use with service \a svc. + * + * \param[in] nrs The service + * \param[in] desc The policy descriptor + * + * \retval false The policy is not compatible with the service partition + * \retval true The policy is compatible with the service partition + */ +static inline bool +nrs_policy_compatible(struct ptlrpc_service *svc, + const struct ptlrpc_nrs_pol_desc *desc) +{ + return desc->pd_compat(svc, desc); +} + +/** + * Registers all compatible policies in nrs_core.nrs_policies, for NRS head + * \a nrs. + * + * \param[in] nrs The NRS head + * + * \retval -ve error + * \retval 0 success + * + * \pre mutex_is_locked(&nrs_core.nrs_mutex) + * + * \see ptlrpc_service_nrs_setup() + */ +static int +nrs_register_policies_locked(struct ptlrpc_nrs *nrs) +{ + struct ptlrpc_nrs_pol_desc *desc; + /* For convenience */ + struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt; + struct ptlrpc_service *svc = svcpt->scp_service; + int rc = -EINVAL; + ENTRY; + + LASSERT(mutex_is_locked(&nrs_core.nrs_mutex)); + + cfs_list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) { + if (nrs_policy_compatible(svc, desc)) { + rc = nrs_policy_register(nrs, desc); + if (rc != 0) { + CERROR("Failed to register NRS policy %s for " + "partition %d of service %s: %d\n", + desc->pd_name, svcpt->scp_cpt, + svc->srv_name, rc); + /** + * Fail registration if any of the policies' + * registration fails. + */ + break; + } + } + } + + RETURN(rc); +} + +/** + * Initializes NRS head \a nrs of service partition \a svcpt, and registers all + * compatible policies in NRS core, with the NRS head. + * + * \param[in] nrs The NRS head + * \param[in] svcpt The PTLRPC service partition to setup + * + * \pre mutex_is_locked(&nrs_core.nrs_mutex) + */ +static int +nrs_svcpt_setup_locked0(struct ptlrpc_nrs *nrs, + struct ptlrpc_service_part *svcpt) +{ + int rc; + enum ptlrpc_nrs_queue_type queue; + + LASSERT(mutex_is_locked(&nrs_core.nrs_mutex)); + + if (nrs == &svcpt->scp_nrs_reg) + queue = PTLRPC_NRS_QUEUE_REG; + else if (nrs == svcpt->scp_nrs_hp) + queue = PTLRPC_NRS_QUEUE_HP; + else + LBUG(); + + nrs->nrs_svcpt = svcpt; + nrs->nrs_queue_type = queue; + spin_lock_init(&nrs->nrs_lock); + CFS_INIT_LIST_HEAD(&nrs->nrs_heads); + CFS_INIT_LIST_HEAD(&nrs->nrs_policy_list); + CFS_INIT_LIST_HEAD(&nrs->nrs_policy_queued); + + cfs_list_add_tail(&nrs->nrs_heads, &nrs_core.nrs_heads); + + rc = nrs_register_policies_locked(nrs); + + RETURN(rc); +} + +/** + * Allocates a regular and optionally a high-priority NRS head (if the service + * handles high-priority RPCs), and then registers all available compatible + * policies on those NRS heads. + * + * \param[n] svcpt The PTLRPC service partition to setup + * + * \pre mutex_is_locked(&nrs_core.nrs_mutex) + */ +static int +nrs_svcpt_setup_locked(struct ptlrpc_service_part *svcpt) +{ + struct ptlrpc_nrs *nrs; + int rc; + ENTRY; + + LASSERT(mutex_is_locked(&nrs_core.nrs_mutex)); + + /** + * Initialize the regular NRS head. + */ + nrs = nrs_svcpt2nrs(svcpt, false); + rc = nrs_svcpt_setup_locked0(nrs, svcpt); + if (rc) + GOTO(out, rc); + + /** + * Optionally allocate a high-priority NRS head. + */ + if (svcpt->scp_service->srv_ops.so_hpreq_handler == NULL) + GOTO(out, rc); + + OBD_CPT_ALLOC_PTR(svcpt->scp_nrs_hp, + svcpt->scp_service->srv_cptable, + svcpt->scp_cpt); + if (svcpt->scp_nrs_hp == NULL) + GOTO(out, rc = -ENOMEM); + + nrs = nrs_svcpt2nrs(svcpt, true); + rc = nrs_svcpt_setup_locked0(nrs, svcpt); + +out: + RETURN(rc); +} + +/** + * Unregisters all policies on all available NRS heads in a service partition; + * called at PTLRPC service unregistration time. + * + * \param[in] svcpt The PTLRPC service partition + * + * \pre mutex_is_locked(&nrs_core.nrs_mutex) + */ +static void +nrs_svcpt_cleanup_locked(struct ptlrpc_service_part *svcpt) +{ + struct ptlrpc_nrs *nrs; + struct ptlrpc_nrs_policy *policy; + struct ptlrpc_nrs_policy *tmp; + int rc; + bool hp = false; + ENTRY; + + LASSERT(mutex_is_locked(&nrs_core.nrs_mutex)); + +again: + nrs = nrs_svcpt2nrs(svcpt, hp); + nrs->nrs_stopping = 1; + + cfs_list_for_each_entry_safe(policy, tmp, &nrs->nrs_policy_list, + pol_list) { + rc = nrs_policy_unregister(nrs, policy->pol_name); + LASSERT(rc == 0); + } + + cfs_list_del(&nrs->nrs_heads); + + /** + * If the service partition has an HP NRS head, clean that up as well. + */ + if (!hp && nrs_svcpt_has_hp(svcpt)) { + hp = true; + goto again; + } + + if (hp) + OBD_FREE_PTR(nrs); + + EXIT; +} + +/** + * Checks whether the policy in \a desc has been added to NRS core's list of + * policies, \e nrs_core.nrs_policies. + * + * \param[in] desc The policy descriptor + * + * \retval true The policy is present + * \retval false The policy is not present + */ +static bool +nrs_policy_exists_locked(const struct ptlrpc_nrs_pol_desc *desc) +{ + struct ptlrpc_nrs_pol_desc *tmp; + ENTRY; + + cfs_list_for_each_entry(tmp, &nrs_core.nrs_policies, pd_list) { + if (strncmp(tmp->pd_name, desc->pd_name, NRS_POL_NAME_MAX) == 0) + RETURN(true); + } + RETURN(false); +} + +/** + * Removes the policy from all supported NRS heads. + * + * \param[in] desc The policy descriptor to unregister + * + * \retval -ve error + * \retval 0 successfully unregistered policy on all supported NRS heads + * + * \pre mutex_is_locked(&nrs_core.nrs_mutex) + */ +static int +nrs_policy_unregister_locked(struct ptlrpc_nrs_pol_desc *desc) +{ + struct ptlrpc_nrs *nrs; + int rc = 0; + ENTRY; + + LASSERT(mutex_is_locked(&nrs_core.nrs_mutex)); + + cfs_list_for_each_entry(nrs, &nrs_core.nrs_heads, nrs_heads) { + if (!nrs_policy_compatible(nrs->nrs_svcpt->scp_service, desc)) { + /** + * The policy may only have registered on compatible + * NRS heads. + */ + continue; + } + + rc = nrs_policy_unregister(nrs, desc->pd_name); + + /** + * Ignore -ENOENT as the policy may not have registered + * successfully on all service partitions. + */ + if (rc == -ENOENT) { + rc = 0; + } else if (rc != 0) { + CERROR("Failed to unregister NRS policy %s for " + "partition %d of service %s: %d\n", + desc->pd_name, nrs->nrs_svcpt->scp_cpt, + nrs->nrs_svcpt->scp_service->srv_name, rc); + break; + } + } + RETURN(rc); +} + +/** + * Transitions a policy from ptlrpc_nrs_pol_state::NRS_POL_STATE_UNAVAIL to + * ptlrpc_nrs_pol_state::STOPPED; is used to prevent policies that are + * registering externally using ptlrpc_nrs_policy_register from starting + * before they have successfully registered on all compatible service + * partitions. + * + * \param[in] nrs The NRS head that the policy belongs to + * \param[in] name The human-readable policy name + */ +static void +nrs_pol_make_available0(struct ptlrpc_nrs *nrs, char *name) +{ + struct ptlrpc_nrs_policy *pol; + + LASSERT(nrs); + LASSERT(name); + + spin_lock(&nrs->nrs_lock); + pol = nrs_policy_find_locked(nrs, name); + if (pol) { + LASSERT(pol->pol_state == NRS_POL_STATE_UNAVAIL); + pol->pol_state = NRS_POL_STATE_STOPPED; + nrs_policy_put_locked(pol); + } + spin_unlock(&nrs->nrs_lock); +} + +/** + * Make the policy available on all compatible service partitions of all PTLRPC + * services. + * + * \param[in] desc The descriptor for the policy that is to be made available + * + * \pre mutex_is_locked(&nrs_core.nrs_mutex) + * + * \see nrs_pol_make_available0() + */ +static void +nrs_pol_make_available_locked(struct ptlrpc_nrs_pol_desc *desc) +{ + struct ptlrpc_nrs *nrs; + ENTRY; + + LASSERT(mutex_is_locked(&nrs_core.nrs_mutex)); + + /** + * Cycle through all registered instances of the policy and place them + * at the STOPPED state. + */ + cfs_list_for_each_entry(nrs, &nrs_core.nrs_heads, nrs_heads) { + if (!nrs_policy_compatible(nrs->nrs_svcpt->scp_service, desc)) + continue; + nrs_pol_make_available0(nrs, desc->pd_name); + } + EXIT; +} + +/** + * Registers a new policy with NRS core. + * + * Used for policies that register externally with NRS core, i.e. ones that are + * not part of \e nrs_pols_builtin[]. The function will only succeed if policy + * registration with all compatible service partitions is successful. + * + * \param[in] desc The policy descriptor to register + * + * \retval -ve error + * \retval 0 success + */ +int +ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_desc *desc) +{ + struct ptlrpc_nrs *nrs; + struct ptlrpc_service *svc; + struct ptlrpc_service_part *svcpt; + int i; + int rc; + int rc2; + ENTRY; + + LASSERT(desc != NULL); + + desc->pd_name[NRS_POL_NAME_MAX - 1] = '\0'; + + if (desc->pd_flags & (PTLRPC_NRS_FL_FALLBACK | + PTLRPC_NRS_FL_REG_START)) { + CERROR("Failing to register NRS policy %s; re-check policy " + "flags, externally-registered policies cannot act as " + "fallback policies or be started immediately without " + "interaction with lprocfs.\n", desc->pd_name); + RETURN(-EINVAL); + } + + desc->pd_flags |= PTLRPC_NRS_FL_REG_EXTERN; + + mutex_lock(&nrs_core.nrs_mutex); + + rc = nrs_policy_exists_locked(desc); + if (rc) { + CERROR("Failing to register NRS policy %s which has " + "already been registered with NRS core!\n", + desc->pd_name); + GOTO(fail, rc = -EEXIST); + } + + /** + * Register the new policy on all compatible services + */ + mutex_lock(&ptlrpc_all_services_mutex); + + cfs_list_for_each_entry(svc, &ptlrpc_all_services, srv_list) { + + if (unlikely(svc->srv_is_stopping)) { + mutex_unlock(&ptlrpc_all_services_mutex); + GOTO(fail, rc = -ESRCH); + } + + if (!nrs_policy_compatible(svc, desc)) { + /** + * Attempt to register the policy if it is + * compatible, otherwise try the next service. + */ + continue; + } + ptlrpc_service_for_each_part(svcpt, i, svc) { + bool hp = false; + +again: + nrs = nrs_svcpt2nrs(svcpt, hp); + rc = nrs_policy_register(nrs, desc); + if (rc != 0) { + CERROR("Failed to register NRS policy %s for " + "partition %d of service %s: %d\n", + desc->pd_name, nrs->nrs_svcpt->scp_cpt, + nrs->nrs_svcpt->scp_service->srv_name, + rc); + + rc2 = nrs_policy_unregister_locked(desc); + /** + * Should not fail at this point + */ + LASSERT(rc2 == 0); + mutex_unlock(&ptlrpc_all_services_mutex); + GOTO(fail, rc); + } + + if (!hp && nrs_svc_has_hp(svc)) { + hp = true; + goto again; + } + } + if (desc->pd_ops->op_lprocfs_init != NULL) { + rc = desc->pd_ops->op_lprocfs_init(svc); + if (rc != 0) { + rc2 = nrs_policy_unregister_locked(desc); + /** + * Should not fail at this point + */ + LASSERT(rc2 == 0); + mutex_unlock(&ptlrpc_all_services_mutex); + GOTO(fail, rc); + } + } + } + + mutex_unlock(&ptlrpc_all_services_mutex); + + /** + * The policy has successfully registered with all service partitions, + * so mark the policy instances at the NRS heads as available. + */ + nrs_pol_make_available_locked(desc); + + cfs_list_add_tail(&desc->pd_list, &nrs_core.nrs_policies); +fail: + mutex_unlock(&nrs_core.nrs_mutex); + + RETURN(rc); +} +EXPORT_SYMBOL(ptlrpc_nrs_policy_register); + +/** + * Unregisters a previously registered policy with NRS core. All instances of + * the policy on all NRS heads of all supported services are removed. + * + * \param[in] desc The descriptor of the policy to unregister + * + * \retval -ve error + * \retval 0 success + */ +int +ptlrpc_nrs_policy_unregister(struct ptlrpc_nrs_pol_desc *desc) +{ + int rc; + struct ptlrpc_service *svc; + ENTRY; + + LASSERT(desc != NULL); + + if (desc->pd_flags & PTLRPC_NRS_FL_FALLBACK) { + CERROR("Unable to unregister a fallback policy, unless the " + "PTLRPC service is stopping.\n"); + RETURN(-EPERM); + } + + desc->pd_name[NRS_POL_NAME_MAX - 1] = '\0'; + + mutex_lock(&nrs_core.nrs_mutex); + + rc = nrs_policy_exists_locked(desc); + if (!rc) { + CERROR("Failing to unregister NRS policy %s which has " + "not been registered with NRS core!\n", + desc->pd_name); + GOTO(fail, rc = -ENOENT); + } + + rc = nrs_policy_unregister_locked(desc); + if (rc == -EBUSY) { + CERROR("Please first stop policy %s on all service partitions " + "and then retry to unregister the policy.\n", + desc->pd_name); + GOTO(fail, rc); + } + CDEBUG(D_INFO, "Unregistering policy %s from NRS core.\n", + desc->pd_name); + + cfs_list_del(&desc->pd_list); + + /** + * Unregister the policy's lprocfs interface from all compatible + * services. + */ + mutex_lock(&ptlrpc_all_services_mutex); + + cfs_list_for_each_entry(svc, &ptlrpc_all_services, srv_list) { + if (!nrs_policy_compatible(svc, desc)) + continue; + + if (desc->pd_ops->op_lprocfs_fini != NULL) + desc->pd_ops->op_lprocfs_fini(svc); + } + + mutex_unlock(&ptlrpc_all_services_mutex); + +fail: + mutex_unlock(&nrs_core.nrs_mutex); + + RETURN(rc); +} +EXPORT_SYMBOL(ptlrpc_nrs_policy_unregister); + +/** + * Setup NRS heads on all service partitions of service \a svc, and register + * all compatible policies on those NRS heads. + * + * \param[in] svc The service to setup + * + * \retval -ve error, the calling logic should eventually call + * ptlrpc_service_nrs_cleanup() to undo any work performed + * by this function. + * + * \see ptlrpc_register_service() + * \see ptlrpc_service_nrs_cleanup() + */ +int +ptlrpc_service_nrs_setup(struct ptlrpc_service *svc) +{ + struct ptlrpc_service_part *svcpt; + const struct ptlrpc_nrs_pol_desc *desc; + int i; + int rc = 0; + + mutex_lock(&nrs_core.nrs_mutex); + + /** + * Initialize NRS heads on all service CPTs. + */ + ptlrpc_service_for_each_part(svcpt, i, svc) { + rc = nrs_svcpt_setup_locked(svcpt); + if (rc != 0) + GOTO(failed, rc); + } + + /* + * Set up lprocfs interfaces for all supported policies for the + * service. + */ + cfs_list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) { + if (!nrs_policy_compatible(svc, desc)) + continue; + + if (desc->pd_ops->op_lprocfs_init != NULL) { + rc = desc->pd_ops->op_lprocfs_init(svc); + if (rc != 0) + GOTO(failed, rc); + } + } + +failed: + + mutex_unlock(&nrs_core.nrs_mutex); + + RETURN(rc); +} + +/** + * Unregisters all policies on all service partitions of service \a svc. + * + * \param[in] svc The PTLRPC service to unregister + */ +void +ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc) +{ + struct ptlrpc_service_part *svcpt; + const struct ptlrpc_nrs_pol_desc *desc; + int i; + + mutex_lock(&nrs_core.nrs_mutex); + + /** + * Clean up NRS heads on all service partitions + */ + ptlrpc_service_for_each_part(svcpt, i, svc) + nrs_svcpt_cleanup_locked(svcpt); + + /** + * Clean up lprocfs interfaces for all supported policies for the + * service. + */ + cfs_list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) { + if (!nrs_policy_compatible(svc, desc)) + continue; + + if (desc->pd_ops->op_lprocfs_fini != NULL) + desc->pd_ops->op_lprocfs_fini(svc); + } + + mutex_unlock(&nrs_core.nrs_mutex); +} + +/** + * Obtains NRS head resources for request \a req. + * + * These could be either on the regular or HP NRS head of \a svcpt; resources + * taken on the regular head can later be swapped for HP head resources by + * ldlm_lock_reorder_req(). + * + * \param[in] svcpt The service partition + * \param[in] req The request + * \param[in] hp Which NRS head of \a svcpt to use + */ +void +ptlrpc_nrs_req_initialize(struct ptlrpc_service_part *svcpt, + struct ptlrpc_request *req, bool hp) +{ + struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp); + + memset(&req->rq_nrq, 0, sizeof(req->rq_nrq)); + nrs_resource_get_safe(nrs, &req->rq_nrq, req->rq_nrq.nr_res_ptrs, + false); + + /** + * It is fine to access \e nr_initialized without locking as there is + * no contention at this early stage. + */ + req->rq_nrq.nr_initialized = 1; +} + +/** + * Releases resources for a request; is called after the request has been + * handled. + * + * \param[in] req The request + * + * \see ptlrpc_server_finish_request() + */ +void +ptlrpc_nrs_req_finalize(struct ptlrpc_request *req) +{ + if (req->rq_nrq.nr_initialized) { + nrs_resource_put_safe(req->rq_nrq.nr_res_ptrs); + /* no protection on bit nr_initialized because no + * contention at this late stage */ + req->rq_nrq.nr_finalized = 1; + } +} + +void +ptlrpc_nrs_req_start_nolock(struct ptlrpc_request *req) +{ + req->rq_nrq.nr_started = 1; + nrs_request_start(&req->rq_nrq); +} + +void +ptlrpc_nrs_req_stop_nolock(struct ptlrpc_request *req) +{ + if (req->rq_nrq.nr_started) + nrs_request_stop(&req->rq_nrq); +} + +/** + * Enqueues request \a req on either the regular or high-priority NRS head + * of service partition \a svcpt. + * + * \param[in] svcpt The service partition + * \param[in] req The request to be enqueued + * \param[in] hp Whether to enqueue the request on the regular or + * high-priority NRS head. + */ +void +ptlrpc_nrs_req_add(struct ptlrpc_service_part *svcpt, + struct ptlrpc_request *req, bool hp) +{ + spin_lock(&svcpt->scp_req_lock); + + if (hp) + ptlrpc_nrs_hpreq_add_nolock(req); + else + ptlrpc_nrs_req_add_nolock(req); + + spin_unlock(&svcpt->scp_req_lock); +} + +/** + * Obtains a request for handling from an NRS head of service partition + * \a svcpt. + * + * \param[in] svcpt The service partition + * \param[in] hp Whether to obtain a request from the regular or + * high-priority NRS head. + * + * \retval the request to be handled + * \retval NULL on failure + */ +struct ptlrpc_request * +ptlrpc_nrs_req_poll_nolock(struct ptlrpc_service_part *svcpt, bool hp) +{ + struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp); + struct ptlrpc_nrs_policy *policy; + struct ptlrpc_nrs_request *nrq; + + if (unlikely(nrs->nrs_req_queued == 0)) + return NULL; + + /** + * Always try to drain requests from all NRS polices even if they are + * inactive, because the user can change policy status at runtime. + */ + cfs_list_for_each_entry(policy, &(nrs)->nrs_policy_queued, + pol_list_queued) { + nrq = nrs_request_poll(policy); + if (likely(nrq != NULL)) + return container_of(nrq, struct ptlrpc_request, rq_nrq); + } + + return NULL; +} + +/** + * Dequeues a request that was previously obtained via ptlrpc_nrs_req_poll() and + * is about to be handled. + * + * \param[in] req The request + */ +void +ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req) +{ + struct ptlrpc_nrs_policy *policy; + + LASSERT(req->rq_nrq.nr_enqueued); + LASSERT(!req->rq_nrq.nr_dequeued); + + policy = nrs_request_policy(&req->rq_nrq); + nrs_request_dequeue(&req->rq_nrq); + req->rq_nrq.nr_dequeued = 1; + + /** + * If the policy has no more requests queued, remove it from + * ptlrpc_nrs::nrs_policy_queued. + */ + if (policy->pol_req_queued == 0) { + cfs_list_del_init(&policy->pol_list_queued); + + /** + * If there are other policies with queued requests, move the + * current policy to the end so that we can round robin over + * all policies and drain the requests. + */ + } else if (policy->pol_req_queued != policy->pol_nrs->nrs_req_queued) { + LASSERT(policy->pol_req_queued < + policy->pol_nrs->nrs_req_queued); + + cfs_list_move_tail(&policy->pol_list_queued, + &policy->pol_nrs->nrs_policy_queued); + } +} + +/** + * Returns whether there are any requests currently enqueued on any of the + * policies of service partition's \a svcpt NRS head specified by \a hp. Should + * be called while holding ptlrpc_service_part::scp_req_lock to get a reliable + * result. + * + * \param[in] svcpt The service partition to enquire. + * \param[in] hp Whether the regular or high-priority NRS head is to be + * enquired. + * + * \retval false The indicated NRS head has no enqueued requests. + * \retval true The indicated NRS head has some enqueued requests. + */ +bool +ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp) +{ + struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp); + + return nrs->nrs_req_queued > 0; +}; + +/** + * Moves request \a req from the regular to the high-priority NRS head. + * + * \param[in] req The request to move + */ +void +ptlrpc_nrs_req_hp_move(struct ptlrpc_request *req) +{ + struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt; + struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, true); + struct ptlrpc_nrs_request *nrq = &req->rq_nrq; + struct ptlrpc_nrs_resource *res1[NRS_RES_MAX]; + struct ptlrpc_nrs_resource *res2[NRS_RES_MAX]; + ENTRY; + + /** + * Obtain the high-priority NRS head resources. + * XXX: Maybe want to remove nrs_resource[get|put]_safe() dance + * when request cannot actually move; move this further down? + */ + nrs_resource_get_safe(nrs, nrq, res1, true); + + spin_lock(&svcpt->scp_req_lock); + + if (!ptlrpc_nrs_req_can_move(req)) + goto out; + + ptlrpc_nrs_req_del_nolock(req); + nrq->nr_enqueued = nrq->nr_dequeued = 0; + + memcpy(res2, nrq->nr_res_ptrs, NRS_RES_MAX * sizeof(res2[0])); + memcpy(nrq->nr_res_ptrs, res1, NRS_RES_MAX * sizeof(res1[0])); + + ptlrpc_nrs_hpreq_add_nolock(req); + + memcpy(res1, res2, NRS_RES_MAX * sizeof(res1[0])); +out: + spin_unlock(&svcpt->scp_req_lock); + + /** + * Release either the regular NRS head resources if we moved the + * request, or the high-priority NRS head resources if we took a + * reference earlier in this function and ptlrpc_nrs_req_can_move() + * returned false. + */ + nrs_resource_put_safe(res1); + EXIT; +} + +/** + * Carries out a control operation \a opc on the policy identified by the + * human-readable \a name, on either all partitions, or only on the first + * partition of service \a svc. + * + * \param[in] svc The service the policy belongs to. + * \param[in] queue Whether to carry out the command on the policy which + * belongs to the regular, high-priority, or both NRS + * heads of service partitions of \a svc. + * \param[in] name The policy to act upon, by human-readable name + * \param[in] opc The opcode of the operation to carry out + * \param[in] single When set, the operation will only be carried out on the + * NRS heads of the first service partition of \a svc. + * This is useful for some policies which e.g. share + * identical values on the same parameters of different + * service partitions; when reading these parameters via + * lprocfs, these policies may just want to obtain and + * print out the values from the first service partition. + * Storing these values centrally elsewhere then could be + * another solution for this. + * \param[in,out] arg Can be used as a generic in/out buffer between control + * operations and the user environment. + * + *\retval -ve error condition + *\retval 0 operation was carried out successfully + */ +int +ptlrpc_nrs_policy_control(struct ptlrpc_service *svc, + enum ptlrpc_nrs_queue_type queue, char *name, + enum ptlrpc_nrs_ctl opc, bool single, void *arg) +{ + struct ptlrpc_service_part *svcpt; + int i; + int rc = 0; + ENTRY; + + ptlrpc_service_for_each_part(svcpt, i, svc) { + switch (queue) { + default: + return -EINVAL; + + case PTLRPC_NRS_QUEUE_BOTH: + case PTLRPC_NRS_QUEUE_REG: + rc = nrs_policy_ctl(nrs_svcpt2nrs(svcpt, false), name, + opc, arg); + if (rc != 0 || (queue == PTLRPC_NRS_QUEUE_REG && + single)) + GOTO(out, rc); + + if (queue == PTLRPC_NRS_QUEUE_REG) + break; + + /* fallthrough */ + + case PTLRPC_NRS_QUEUE_HP: + /** + * XXX: We could optionally check for + * nrs_svc_has_hp(svc) here, and return an error if it + * is false. Right now we rely on the policies' lprocfs + * handlers that call the present function to make this + * check; if they fail to do so, they might hit the + * assertion inside nrs_svcpt2nrs() below. + */ + rc = nrs_policy_ctl(nrs_svcpt2nrs(svcpt, true), name, + opc, arg); + if (rc != 0 || single) + GOTO(out, rc); + + break; + } + } +out: + RETURN(rc); +} + +/** + * Adds all policies that ship with NRS, i.e. those in the \e nrs_pols_builtin + * array, to NRS core's list of policies \e nrs_core.nrs_policies. + * + * \retval 0 All policy descriptors in \e nrs_pols_builtin have been added + * successfully to \e nrs_core.nrs_policies + */ +int +ptlrpc_nrs_init(void) +{ + int rc = -EINVAL; + int i; + ENTRY; + + /** + * Initialize the NRS core object. + */ + mutex_init(&nrs_core.nrs_mutex); + CFS_INIT_LIST_HEAD(&nrs_core.nrs_heads); + CFS_INIT_LIST_HEAD(&nrs_core.nrs_policies); + + for (i = 0; i < ARRAY_SIZE(nrs_pols_builtin); i++) { + /** + * No need to take nrs_core.nrs_mutex as there is no contention at + * this early stage. + */ + rc = nrs_policy_exists_locked(nrs_pols_builtin[i]); + /** + * This should not fail for in-tree policies. + */ + LASSERT(rc == false); + cfs_list_add_tail(&nrs_pols_builtin[i]->pd_list, + &nrs_core.nrs_policies); + } + + RETURN(rc); +} + +/** + * Stub finalization function + */ +void +ptlrpc_nrs_fini(void) +{ +} + +/** @} nrs */ diff --git a/lustre/ptlrpc/nrs_fifo.c b/lustre/ptlrpc/nrs_fifo.c new file mode 100644 index 0000000..1f4de96 --- /dev/null +++ b/lustre/ptlrpc/nrs_fifo.c @@ -0,0 +1,276 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License version 2 for more details. A copy is + * included in the COPYING file that accompanied this code. + + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * GPL HEADER END + */ +/* + * Copyright (c) 2011 Intel Corporation + * + * Copyright 2012 Xyratex Technology Limited + */ +/* + * lustre/ptlrpc/nrs_fifo.c + * + * Network Request Scheduler (NRS) FIFO policy + * + * Handles RPCs in a FIFO manner, as received from the network. This policy is + * a logical wrapper around previous, non-NRS functionality. It is used as the + * default and fallback policy for all types of RPCs on all PTLRPC service + * partitions, for both regular and high-priority NRS heads. Default here means + * the policy is the one enabled at PTLRPC service partition startup time, and + * fallback means the policy is used to handle RPCs that are not handled + * successfully or are not handled at all by any primary policy that may be + * enabled on a given NRS head. + * + * Author: Liang Zhen + * Author: Nikitas Angelinas + */ +/** + * \addtogoup nrs + * @{ + */ + +#define DEBUG_SUBSYSTEM S_RPC +#ifndef __KERNEL__ +#include +#endif +#include +#include +#include +#include "ptlrpc_internal.h" + +/** + * \name fifo + * + * The FIFO policy is a logical wrapper around previous, non-NRS functionality. + * It schedules RPCs in the same order as they are queued from LNet. + * + * @{ + */ + +/** + * Is called before the policy transitions into + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a + * policy-specific private data structure. + * + * \param[in] policy The policy to start + * + * \retval -ENOMEM OOM error + * \retval 0 success + * + * \see nrs_policy_register() + * \see nrs_policy_ctl() + */ +static int +nrs_fifo_start(struct ptlrpc_nrs_policy *policy) +{ + struct nrs_fifo_head *head; + + OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy)); + if (head == NULL) + return -ENOMEM; + + CFS_INIT_LIST_HEAD(&head->fh_list); + policy->pol_private = head; + return 0; +} + +/** + * Is called before the policy transitions into + * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific + * private data structure. + * + * \param[in] policy The policy to stop + * + * \see nrs_policy_stop0() + */ +static void +nrs_fifo_stop(struct ptlrpc_nrs_policy *policy) +{ + struct nrs_fifo_head *head = policy->pol_private; + + LASSERT(head != NULL); + LASSERT(cfs_list_empty(&head->fh_list)); + + OBD_FREE_PTR(head); +} + +/** + * Is called for obtaining a FIFO policy resource. + * + * \param[in] policy The policy on which the request is being asked for + * \param[in] nrq The request for which resources are being taken + * \param[in] parent Parent resource, unused in this policy + * \param[out] resp Resources references are placed in this array + * \param[in] moving_req Signifies limited caller context; unused in this + * policy + * + * \retval 1 The FIFO policy only has a one-level resource hierarchy, as since + * it implements a simple scheduling algorithm in which request + * priority is determined on the request arrival order, it does not + * need to maintain a set of resources that would otherwise be used + * to calculate a request's priority. + * + * \see nrs_resource_get_safe() + */ +static int +nrs_fifo_res_get(struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq, + struct ptlrpc_nrs_resource *parent, + struct ptlrpc_nrs_resource **resp, bool moving_req) +{ + /** + * Just return the resource embedded inside nrs_fifo_head, and end this + * resource hierarchy reference request. + */ + *resp = &((struct nrs_fifo_head *)policy->pol_private)->fh_res; + return 1; +} + +/** + * Called when polling the fifo policy for a request. + * + * \param[in] policy The policy being polled + * + * \retval The request to be handled; this is the next request in the FIFO + * queue + * \see ptlrpc_nrs_req_poll_nolock() + */ +static struct ptlrpc_nrs_request * +nrs_fifo_req_poll(struct ptlrpc_nrs_policy *policy) +{ + struct nrs_fifo_head *head = policy->pol_private; + + LASSERT(head != NULL); + + return cfs_list_empty(&head->fh_list) ? NULL : + cfs_list_entry(head->fh_list.next, struct ptlrpc_nrs_request, + nr_u.fifo.fr_list); +} + +/** + * Adds request \a nrq to \a policy's list of queued requests + * + * \param[in] policy The policy + * \param[in] nrq The request to add + * + * \retval 0 success; nrs_request_enqueue() assumes this function will always + * succeed + */ +static int +nrs_fifo_req_add(struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq) +{ + struct nrs_fifo_head *head; + + head = container_of(nrs_request_resource(nrq), struct nrs_fifo_head, + fh_res); + /** + * Only used for debugging + */ + nrq->nr_u.fifo.fr_sequence = head->fh_sequence++; + cfs_list_add_tail(&nrq->nr_u.fifo.fr_list, &head->fh_list); + + return 0; +} + +/** + * Removes request \a nrq from \a policy's list of queued requests. + * + * \param[in] policy The policy + * \param[in] nrq The request to remove + */ +static void +nrs_fifo_req_del(struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq) +{ + LASSERT(!cfs_list_empty(&nrq->nr_u.fifo.fr_list)); + cfs_list_del_init(&nrq->nr_u.fifo.fr_list); +} + +/** + * Prints a debug statement right before the request \a nrq starts being + * handled. + * + * \param[in] policy The policy handling the request + * \param[in] nrq The request being handled + */ +static void +nrs_fifo_req_start(struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq) +{ + struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request, + rq_nrq); + + CDEBUG(D_RPCTRACE, "NRS start %s request from %s, seq: "LPU64"\n", + nrs_request_policy(nrq)->pol_name, libcfs_id2str(req->rq_peer), + nrq->nr_u.fifo.fr_sequence); +} + +/** + * Prints a debug statement right before the request \a nrq stops being + * handled. + * + * \param[in] policy The policy handling the request + * \param[in] nrq The request being handled + * + * \see ptlrpc_server_finish_request() + * \see ptlrpc_nrs_req_stop_nolock() + */ +static void +nrs_fifo_req_stop(struct ptlrpc_nrs_policy *policy, + struct ptlrpc_nrs_request *nrq) +{ + struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request, + rq_nrq); + + CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: "LPU64"\n", + nrs_request_policy(nrq)->pol_name, libcfs_id2str(req->rq_peer), + nrq->nr_u.fifo.fr_sequence); +} + +/** + * FIFO policy operations + */ +static struct ptlrpc_nrs_pol_ops nrs_fifo_ops = { + .op_policy_start = nrs_fifo_start, + .op_policy_stop = nrs_fifo_stop, + .op_res_get = nrs_fifo_res_get, + .op_req_poll = nrs_fifo_req_poll, + .op_req_enqueue = nrs_fifo_req_add, + .op_req_dequeue = nrs_fifo_req_del, + .op_req_start = nrs_fifo_req_start, + .op_req_stop = nrs_fifo_req_stop, +}; + +/** + * FIFO policy descriptor + */ +struct ptlrpc_nrs_pol_desc ptlrpc_nrs_fifo_desc = { + .pd_name = "fifo", + .pd_ops = &nrs_fifo_ops, + .pd_compat = nrs_policy_compat_all, + .pd_flags = PTLRPC_NRS_FL_FALLBACK | + PTLRPC_NRS_FL_REG_START +}; + +/** @} fifo */ + +/** @} nrs */ + diff --git a/lustre/ptlrpc/ptlrpc_internal.h b/lustre/ptlrpc/ptlrpc_internal.h index 7c10574..606e333 100644 --- a/lustre/ptlrpc/ptlrpc_internal.h +++ b/lustre/ptlrpc/ptlrpc_internal.h @@ -46,6 +46,7 @@ struct obd_import; struct ldlm_res_id; struct ptlrpc_request_set; extern int test_req_buffer_pressure; +extern struct mutex ptlrpc_all_services_mutex; int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait); /* ptlrpcd.c */ @@ -84,6 +85,125 @@ void ptlrpc_lprocfs_do_request_stat (struct ptlrpc_request *req, #define ptlrpc_lprocfs_do_request_stat(params...) do{}while(0) #endif /* LPROCFS */ +/* NRS */ + +/** + * NRS core object. + * + * Holds NRS core fields. + */ +struct nrs_core { + /** + * Protects nrs_core::nrs_heads, nrs_core::nrs_policies, serializes + * external policy registration/unregistration, and NRS core lprocfs + * operations. + */ + struct mutex nrs_mutex; + /* XXX: This is just for liblustre. Remove the #if defined directive + * when the * "cfs_" prefix is dropped from cfs_list_head. */ +#if defined (__linux__) && defined(__KERNEL__) + /** + * List of all NRS heads on all service partitions of all services; + * protected by nrs_core::nrs_mutex. + */ + struct list_head nrs_heads; + /** + * List of all policy descriptors registered with NRS core; protected + * by nrs_core::nrs_mutex. + */ + struct list_head nrs_policies; +#else + struct cfs_list_head nrs_heads; + struct cfs_list_head nrs_policies; +#endif + +}; + +int ptlrpc_service_nrs_setup(struct ptlrpc_service *svc); +void ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc); + +void ptlrpc_nrs_req_initialize(struct ptlrpc_service_part *svcpt, + struct ptlrpc_request *req, bool hp); +void ptlrpc_nrs_req_finalize(struct ptlrpc_request *req); +void ptlrpc_nrs_req_start_nolock(struct ptlrpc_request *req); +void ptlrpc_nrs_req_stop_nolock(struct ptlrpc_request *req); +void ptlrpc_nrs_req_add(struct ptlrpc_service_part *svcpt, + struct ptlrpc_request *req, bool hp); +struct ptlrpc_request * +ptlrpc_nrs_req_poll_nolock(struct ptlrpc_service_part *svcpt, + bool hp); +void ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req); +bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp); + +int ptlrpc_nrs_policy_control(struct ptlrpc_service *svc, + enum ptlrpc_nrs_queue_type queue, char *name, + enum ptlrpc_nrs_ctl opc, bool single, void *arg); + +int ptlrpc_nrs_init(void); +void ptlrpc_nrs_fini(void); + +static inline int +nrs_svcpt_has_hp(struct ptlrpc_service_part *svcpt) +{ + return svcpt->scp_nrs_hp != NULL; +} + +static inline int +nrs_svc_has_hp(struct ptlrpc_service *svc) +{ + /** + * If the first service partition has an HP NRS head, all service + * partitions will. + */ + return nrs_svcpt_has_hp(svc->srv_parts[0]); +} + +static inline struct ptlrpc_nrs * +nrs_svcpt2nrs(struct ptlrpc_service_part *svcpt, bool hp) +{ + LASSERT(ergo(hp, nrs_svcpt_has_hp(svcpt))); + return hp ? svcpt->scp_nrs_hp : &svcpt->scp_nrs_reg; +} + +static inline int +nrs_pol2cptid(struct ptlrpc_nrs_policy *policy) +{ + return policy->pol_nrs->nrs_svcpt->scp_cpt; +} + +static inline struct ptlrpc_service * +nrs_pol2svc(struct ptlrpc_nrs_policy *policy) +{ + return policy->pol_nrs->nrs_svcpt->scp_service; +} + +static inline struct ptlrpc_service_part * +nrs_pol2svcpt(struct ptlrpc_nrs_policy *policy) +{ + return policy->pol_nrs->nrs_svcpt; +} + +static inline struct cfs_cpt_table * +nrs_pol2cptab(struct ptlrpc_nrs_policy *policy) +{ + return nrs_pol2svc(policy)->srv_cptable; +} + +static inline struct ptlrpc_nrs_resource * +nrs_request_resource(struct ptlrpc_nrs_request *nrq) +{ + LASSERT(nrq->nr_initialized); + LASSERT(!nrq->nr_finalized); + + return nrq->nr_res_ptrs[nrq->nr_res_idx]; +} + +static inline struct ptlrpc_nrs_policy * +nrs_request_policy(struct ptlrpc_nrs_request *nrq) +{ + return nrs_request_resource(nrq)->res_policy; +} + /* recovd_thread.c */ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink); diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index 1026e48..51869db 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -51,7 +51,6 @@ extern spinlock_t ptlrpc_last_xid_lock; #if RS_DEBUG extern spinlock_t ptlrpc_rs_debug_lock; #endif -extern spinlock_t ptlrpc_all_services_lock; extern struct mutex pinger_mutex; extern struct mutex ptlrpcd_mutex; @@ -64,7 +63,7 @@ __init int ptlrpc_init(void) #if RS_DEBUG spin_lock_init(&ptlrpc_rs_debug_lock); #endif - spin_lock_init(&ptlrpc_all_services_lock); + mutex_init(&ptlrpc_all_services_mutex); mutex_init(&pinger_mutex); mutex_init(&ptlrpcd_mutex); ptlrpc_init_xid(); @@ -110,8 +109,13 @@ __init int ptlrpc_init(void) if (rc) GOTO(cleanup, rc); -#ifdef __KERNEL__ cleanup_phase = 7; + rc = ptlrpc_nrs_init(); + if (rc) + GOTO(cleanup, rc); + +#ifdef __KERNEL__ + cleanup_phase = 8; rc = tgt_mod_init(); if (rc) GOTO(cleanup, rc); @@ -121,9 +125,11 @@ __init int ptlrpc_init(void) cleanup: switch(cleanup_phase) { #ifdef __KERNEL__ + case 8: + ptlrpc_nrs_fini(); +#endif case 7: llog_recov_fini(); -#endif case 6: sptlrpc_fini(); case 5: @@ -147,6 +153,7 @@ cleanup: static void __exit ptlrpc_exit(void) { tgt_mod_exit(); + ptlrpc_nrs_fini(); llog_recov_fini(); sptlrpc_fini(); ldlm_exit(); diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 09b32fa..574cd63 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -67,8 +67,10 @@ static int ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt); static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req); static void ptlrpc_at_remove_timed(struct ptlrpc_request *req); -static CFS_LIST_HEAD(ptlrpc_all_services); -spinlock_t ptlrpc_all_services_lock; +/** Holds a list of all PTLRPC services */ +CFS_LIST_HEAD(ptlrpc_all_services); +/** Used to protect the \e ptlrpc_all_services list */ +struct mutex ptlrpc_all_services_mutex; struct ptlrpc_request_buffer_desc * ptlrpc_alloc_rqbd(struct ptlrpc_service_part *svcpt) @@ -636,8 +638,6 @@ ptlrpc_service_part_init(struct ptlrpc_service *svc, /* acitve requests and hp requests */ spin_lock_init(&svcpt->scp_req_lock); - CFS_INIT_LIST_HEAD(&svcpt->scp_req_pending); - CFS_INIT_LIST_HEAD(&svcpt->scp_hreq_pending); /* reply states */ spin_lock_init(&svcpt->scp_rep_lock); @@ -824,15 +824,19 @@ ptlrpc_register_service(struct ptlrpc_service_conf *conf, rc = LNetSetLazyPortal(service->srv_req_portal); LASSERT(rc == 0); - spin_lock(&ptlrpc_all_services_lock); - cfs_list_add (&service->srv_list, &ptlrpc_all_services); - spin_unlock(&ptlrpc_all_services_lock); + mutex_lock(&ptlrpc_all_services_mutex); + cfs_list_add (&service->srv_list, &ptlrpc_all_services); + mutex_unlock(&ptlrpc_all_services_mutex); - if (proc_entry != NULL) - ptlrpc_lprocfs_register_service(proc_entry, service); + if (proc_entry != NULL) + ptlrpc_lprocfs_register_service(proc_entry, service); - CDEBUG(D_NET, "%s: Started, listening on portal %d\n", - service->srv_name, service->srv_req_portal); + rc = ptlrpc_service_nrs_setup(service); + if (rc != 0) + GOTO(failed, rc); + + CDEBUG(D_NET, "%s: Started, listening on portal %d\n", + service->srv_name, service->srv_req_portal); #ifdef __KERNEL__ rc = ptlrpc_start_threads(service); @@ -991,11 +995,14 @@ static void ptlrpc_server_finish_request(struct ptlrpc_service_part *svcpt, ptlrpc_server_hpreq_fini(req); spin_lock(&svcpt->scp_req_lock); + ptlrpc_nrs_req_stop_nolock(req); svcpt->scp_nreqs_active--; if (req->rq_hp) svcpt->scp_nhreqs_active--; spin_unlock(&svcpt->scp_req_lock); + ptlrpc_nrs_req_finalize(req); + ptlrpc_server_drop_request(req); } @@ -1471,23 +1478,39 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt) * Put the request to the export list if the request may become * a high priority one. */ -static int ptlrpc_server_hpreq_init(struct ptlrpc_service *svc, +static int ptlrpc_server_hpreq_init(struct ptlrpc_service_part *svcpt, struct ptlrpc_request *req) { - int rc = 0; - ENTRY; + int rc = 0; + ENTRY; - if (svc->srv_ops.so_hpreq_handler) { - rc = svc->srv_ops.so_hpreq_handler(req); - if (rc) - RETURN(rc); - } - if (req->rq_export && req->rq_ops) { - /* Perform request specific check. We should do this check - * before the request is added into exp_hp_rpcs list otherwise - * it may hit swab race at LU-1044. */ - if (req->rq_ops->hpreq_check) - rc = req->rq_ops->hpreq_check(req); + if (svcpt->scp_service->srv_ops.so_hpreq_handler) { + rc = svcpt->scp_service->srv_ops.so_hpreq_handler(req); + if (rc < 0) + RETURN(rc); + LASSERT(rc == 0); + } + if (req->rq_export && req->rq_ops) { + /* Perform request specific check. We should do this check + * before the request is added into exp_hp_rpcs list otherwise + * it may hit swab race at LU-1044. */ + if (req->rq_ops->hpreq_check) { + rc = req->rq_ops->hpreq_check(req); + /** + * XXX: Out of all current + * ptlrpc_hpreq_ops::hpreq_check(), only + * ldlm_cancel_hpreq_check() can return an error code; + * other functions assert in similar places, which seems + * odd. What also does not seem right is that handlers + * for those RPCs do not assert on the same checks, but + * rather handle the error cases. e.g. see + * ost_rw_hpreq_check(), and ost_brw_read(), + * ost_brw_write(). + */ + if (rc < 0) + RETURN(rc); + LASSERT(rc == 0 || rc == 1); + } spin_lock_bh(&req->rq_export->exp_rpc_lock); cfs_list_add(&req->rq_exp_list, @@ -1495,6 +1518,8 @@ static int ptlrpc_server_hpreq_init(struct ptlrpc_service *svc, spin_unlock_bh(&req->rq_export->exp_rpc_lock); } + ptlrpc_nrs_req_initialize(svcpt, req, rc); + RETURN(rc); } @@ -1539,77 +1564,17 @@ int ptlrpc_hpreq_handler(struct ptlrpc_request *req) } EXPORT_SYMBOL(ptlrpc_hpreq_handler); -/** - * Make the request a high priority one. - * - * All the high priority requests are queued in a separate FIFO - * ptlrpc_service_part::scp_hpreq_pending list which is parallel to - * ptlrpc_service_part::scp_req_pending list but has a higher priority - * for handling. - * - * \see ptlrpc_server_handle_request(). - */ -static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service_part *svcpt, - struct ptlrpc_request *req) -{ - ENTRY; - - spin_lock(&req->rq_lock); - if (req->rq_hp == 0) { - int opc = lustre_msg_get_opc(req->rq_reqmsg); - - /* Add to the high priority queue. */ - cfs_list_move_tail(&req->rq_list, &svcpt->scp_hreq_pending); - req->rq_hp = 1; - if (opc != OBD_PING) - DEBUG_REQ(D_RPCTRACE, req, "high priority req"); - } - spin_unlock(&req->rq_lock); - EXIT; -} - -/** - * \see ptlrpc_hpreq_reorder_nolock - */ -void ptlrpc_hpreq_reorder(struct ptlrpc_request *req) -{ - struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt; - ENTRY; - - spin_lock(&svcpt->scp_req_lock); - /* It may happen that the request is already taken for the processing - * but still in the export list, or the request is not in the request - * queue but in the export list already, do not add it into the - * HP list. */ - if (!cfs_list_empty(&req->rq_list)) - ptlrpc_hpreq_reorder_nolock(svcpt, req); - spin_unlock(&svcpt->scp_req_lock); - EXIT; -} -EXPORT_SYMBOL(ptlrpc_hpreq_reorder); - -/** - * Add a request to the regular or HP queue; optionally perform HP request - * initialization. - */ static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt, struct ptlrpc_request *req) { int rc; ENTRY; - rc = ptlrpc_server_hpreq_init(svcpt->scp_service, req); + rc = ptlrpc_server_hpreq_init(svcpt, req); if (rc < 0) RETURN(rc); - spin_lock(&svcpt->scp_req_lock); - - if (rc) - ptlrpc_hpreq_reorder_nolock(svcpt, req); - else - cfs_list_add_tail(&req->rq_list, &svcpt->scp_req_pending); - - spin_unlock(&svcpt->scp_req_lock); + ptlrpc_nrs_req_add(svcpt, req, !!rc); RETURN(0); } @@ -1624,6 +1589,9 @@ static int ptlrpc_server_allow_high(struct ptlrpc_service_part *svcpt, { int running = svcpt->scp_nthrs_running; + if (!nrs_svcpt_has_hp(svcpt)) + return 0; + if (force) return 1; @@ -1641,7 +1609,7 @@ static int ptlrpc_server_allow_high(struct ptlrpc_service_part *svcpt, if (svcpt->scp_nhreqs_active == 0) return 1; - return cfs_list_empty(&svcpt->scp_req_pending) || + return !ptlrpc_nrs_req_pending_nolock(svcpt, false) || svcpt->scp_hreq_count < svcpt->scp_service->srv_hpreq_ratio; } @@ -1649,7 +1617,7 @@ static int ptlrpc_server_high_pending(struct ptlrpc_service_part *svcpt, int force) { return ptlrpc_server_allow_high(svcpt, force) && - !cfs_list_empty(&svcpt->scp_hreq_pending); + ptlrpc_nrs_req_pending_nolock(svcpt, true); } /** @@ -1685,14 +1653,14 @@ static int ptlrpc_server_allow_normal(struct ptlrpc_service_part *svcpt, return 0; return svcpt->scp_nhreqs_active > 0 || - svcpt->scp_service->srv_ops.so_hpreq_handler == NULL; + !nrs_svcpt_has_hp(svcpt); } static int ptlrpc_server_normal_pending(struct ptlrpc_service_part *svcpt, int force) { return ptlrpc_server_allow_normal(svcpt, force) && - !cfs_list_empty(&svcpt->scp_req_pending); + ptlrpc_nrs_req_pending_nolock(svcpt, false); } /** @@ -1722,15 +1690,13 @@ ptlrpc_server_request_get(struct ptlrpc_service_part *svcpt, int force) ENTRY; if (ptlrpc_server_high_pending(svcpt, force)) { - req = cfs_list_entry(svcpt->scp_hreq_pending.next, - struct ptlrpc_request, rq_list); + req = ptlrpc_nrs_req_poll_nolock(svcpt, true); svcpt->scp_hreq_count++; RETURN(req); } if (ptlrpc_server_normal_pending(svcpt, force)) { - req = cfs_list_entry(svcpt->scp_req_pending.next, - struct ptlrpc_request, rq_list); + req = ptlrpc_nrs_req_poll_nolock(svcpt, false); svcpt->scp_hreq_count = 0; RETURN(req); } @@ -1938,12 +1904,12 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt, } } } - - cfs_list_del_init(&request->rq_list); + ptlrpc_nrs_req_del_nolock(request); svcpt->scp_nreqs_active++; if (request->rq_hp) svcpt->scp_nhreqs_active++; + ptlrpc_nrs_req_start_nolock(request); spin_unlock(&svcpt->scp_req_lock); ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET); @@ -2032,7 +1998,7 @@ put_conn: cfs_gettimeofday(&work_end); timediff = cfs_timeval_sub(&work_end, &work_start, NULL); - CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc " + CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc " "%s:%s+%d:%d:x"LPU64":%s:%d Request procesed in " "%ldus (%ldus total) trans "LPU64" rc %d/%d\n", cfs_curproc_comm(), @@ -3083,7 +3049,7 @@ ptlrpc_service_purge_all(struct ptlrpc_service *svc) while (ptlrpc_server_request_pending(svcpt, 1)) { req = ptlrpc_server_request_get(svcpt, 1); - cfs_list_del(&req->rq_list); + ptlrpc_nrs_req_del_nolock(req); svcpt->scp_nreqs_active++; ptlrpc_server_hpreq_fini(req); @@ -3166,17 +3132,19 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) service->srv_is_stopping = 1; - spin_lock(&ptlrpc_all_services_lock); + mutex_lock(&ptlrpc_all_services_mutex); cfs_list_del_init(&service->srv_list); - spin_unlock(&ptlrpc_all_services_lock); - - ptlrpc_lprocfs_unregister_service(service); + mutex_unlock(&ptlrpc_all_services_mutex); ptlrpc_service_del_atimer(service); ptlrpc_stop_all_threads(service); ptlrpc_service_unlink_rqbd(service); ptlrpc_service_purge_all(service); + ptlrpc_service_nrs_cleanup(service); + + ptlrpc_lprocfs_unregister_service(service); + ptlrpc_service_free(service); RETURN(0); @@ -3203,14 +3171,10 @@ int ptlrpc_svcpt_health_check(struct ptlrpc_service_part *svcpt) return 0; } - /* How long has the next entry been waiting? */ - if (cfs_list_empty(&svcpt->scp_req_pending)) { - request = cfs_list_entry(svcpt->scp_hreq_pending.next, - struct ptlrpc_request, rq_list); - } else { - request = cfs_list_entry(svcpt->scp_req_pending.next, - struct ptlrpc_request, rq_list); - } + /* How long has the next entry been waiting? */ + request = ptlrpc_nrs_req_poll_nolock(svcpt, true); + if (request == NULL) + request = ptlrpc_nrs_req_poll_nolock(svcpt, false); timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL); spin_unlock(&svcpt->scp_req_lock); -- 1.8.3.1