struct ptlrpc_bulk_desc;
struct ptlrpc_service_part;
+struct ptlrpc_service;
/**
* ptlrpc callback & work item stuff
struct ldlm_lock;
/**
+ * \defgroup nrs Network Request Scheduler
+ * @{
+ */
+struct ptlrpc_nrs_policy;
+struct ptlrpc_nrs_resource;
+struct ptlrpc_nrs_request;
+
+/**
+ * NRS control operations.
+ *
+ * These are common for all policies.
+ */
+enum ptlrpc_nrs_ctl {
+ /**
+ * Activate the policy.
+ */
+ PTLRPC_NRS_CTL_START,
+ /**
+ * Reserved for multiple primary policies, which may be a possibility
+ * in the future.
+ */
+ PTLRPC_NRS_CTL_STOP,
+ /**
+ * Recycle resources for inactive policies.
+ */
+ PTLRPC_NRS_CTL_SHRINK,
+ /**
+ * Not a valid opcode.
+ */
+ PTLRPC_NRS_CTL_INVALID,
+ /**
+ * Policies can start using opcodes from this value and onwards for
+ * their own purposes; the assigned value itself is arbitrary.
+ */
+ PTLRPC_NRS_CTL_1ST_POL_SPEC = 0x20,
+};
+
+/**
+ * NRS policy operations.
+ *
+ * These determine the behaviour of a policy, and are called in response to
+ * NRS core events.
+ */
+struct ptlrpc_nrs_pol_ops {
+ /**
+ * Called during policy registration; this operation is optional.
+ *
+ * \param[in] policy The policy being initialized
+ */
+ int (*op_policy_init) (struct ptlrpc_nrs_policy *policy);
+ /**
+ * Called during policy unregistration; this operation is optional.
+ *
+ * \param[in] policy The policy being unregistered/finalized
+ */
+ void (*op_policy_fini) (struct ptlrpc_nrs_policy *policy);
+ /**
+ * Called when activating a policy via lprocfs; policies allocate and
+ * initialize their resources here; this operation is optional.
+ *
+ * \param[in] policy The policy being started
+ *
+ * \see nrs_policy_start_locked()
+ */
+ int (*op_policy_start) (struct ptlrpc_nrs_policy *policy);
+ /**
+ * Called when deactivating a policy via lprocfs; policies deallocate
+ * their resources here; this operation is optional
+ *
+ * \param[in] policy The policy being stopped
+ *
+ * \see nrs_policy_stop_final()
+ */
+ void (*op_policy_stop) (struct ptlrpc_nrs_policy *policy);
+ /**
+ * Used for policy-specific operations; i.e. not generic ones like
+ * \e PTLRPC_NRS_CTL_START and \e PTLRPC_NRS_CTL_GET_INFO; analogous
+ * to an ioctl; this operation is optional.
+ *
+ * \param[in] policy The policy carrying out operation \a opc
+ * \param[in] opc The command operation being carried out
+ * \param[in,out] arg An generic buffer for communication between the
+ * user and the control operation
+ *
+ * \retval -ve error
+ * \retval 0 success
+ *
+ * \see ptlrpc_nrs_policy_control()
+ */
+ int (*op_policy_ctl) (struct ptlrpc_nrs_policy *policy,
+ enum ptlrpc_nrs_ctl opc, void *arg);
+
+ /**
+ * Called when obtaining references to the resources of the resource
+ * hierarchy for a request that has arrived for handling at the PTLRPC
+ * service. Policies should return -ve for requests they do not wish
+ * to handle. This operation is mandatory.
+ *
+ * \param[in] policy The policy we're getting resources for.
+ * \param[in] nrq The request we are getting resources for.
+ * \param[in] parent The parent resource of the resource being
+ * requested; set to NULL if none.
+ * \param[out] resp The resource is to be returned here; the
+ * fallback policy in an NRS head should
+ * \e always return a non-NULL pointer value.
+ * \param[in] moving_req When set, signifies that this is an attempt
+ * to obtain resources for a request being moved
+ * to the high-priority NRS head by
+ * ldlm_lock_reorder_req().
+ * This implies two things:
+ * 1. We are under obd_export::exp_rpc_lock and
+ * so should not sleep.
+ * 2. We should not perform non-idempotent or can
+ * skip performing idempotent operations that
+ * were carried out when resources were first
+ * taken for the request when it was initialized
+ * in ptlrpc_nrs_req_initialize().
+ *
+ * \retval 0, +ve The level of the returned resource in the resource
+ * hierarchy; currently only 0 (for a non-leaf resource)
+ * and 1 (for a leaf resource) are supported by the
+ * framework.
+ * \retval -ve error
+ *
+ * \see ptlrpc_nrs_req_initialize()
+ * \see ptlrpc_nrs_hpreq_add_nolock()
+ * \see ptlrpc_nrs_req_hp_move()
+ */
+ int (*op_res_get) (struct ptlrpc_nrs_policy *policy,
+ struct ptlrpc_nrs_request *nrq,
+ struct ptlrpc_nrs_resource *parent,
+ struct ptlrpc_nrs_resource **resp,
+ bool moving_req);
+ /**
+ * Called when releasing references taken for resources in the resource
+ * hierarchy for the request; this operation is optional.
+ *
+ * \param[in] policy The policy the resource belongs to
+ * \param[in] res The resource to be freed
+ *
+ * \see ptlrpc_nrs_req_finalize()
+ * \see ptlrpc_nrs_hpreq_add_nolock()
+ * \see ptlrpc_nrs_req_hp_move()
+ */
+ void (*op_res_put) (struct ptlrpc_nrs_policy *policy,
+ struct ptlrpc_nrs_resource *res);
+
+ /**
+ * Obtain a request for handling from the policy via polling; this
+ * operation is mandatory.
+ *
+ * \param[in] policy The policy to poll
+ *
+ * \retval NULL No erquest available for handling
+ * \retval valid-pointer The request polled for handling
+ *
+ * \see ptlrpc_nrs_req_poll_nolock()
+ */
+ struct ptlrpc_nrs_request *
+ (*op_req_poll) (struct ptlrpc_nrs_policy *policy);
+ /**
+ * Called when attempting to add a request to a policy for later
+ * handling; this operation is mandatory.
+ *
+ * \param[in] policy The policy on which to enqueue \a nrq
+ * \param[in] nrq The request to enqueue
+ *
+ * \retval 0 success
+ * \retval != 0 error
+ *
+ * \see ptlrpc_nrs_req_add_nolock()
+ */
+ int (*op_req_enqueue) (struct ptlrpc_nrs_policy *policy,
+ struct ptlrpc_nrs_request *nrq);
+ /**
+ * Removes a request from the policy's set of pending requests. Normally
+ * called after a request has been polled successfully from the policy
+ * for handling; this operation is mandatory.
+ *
+ * \param[in] policy The policy the request \a nrq belongs to
+ * \param[in] nrq The request to dequeue
+ *
+ * \see ptlrpc_nrs_req_del_nolock()
+ */
+ void (*op_req_dequeue) (struct ptlrpc_nrs_policy *policy,
+ struct ptlrpc_nrs_request *nrq);
+ /**
+ * Called before carrying out the request; should not block. Could be
+ * used for job/resource control; this operation is optional.
+ *
+ * \param[in] policy The policy which is starting to handle request
+ * \a nrq
+ * \param[in] nrq The request
+ *
+ * \pre spin_is_locked(&svcpt->scp_req_lock)
+ *
+ * \see ptlrpc_nrs_req_start_nolock()
+ */
+ void (*op_req_start) (struct ptlrpc_nrs_policy *policy,
+ struct ptlrpc_nrs_request *nrq);
+ /**
+ * Called after the request being carried out. Could be used for
+ * job/resource control; this operation is optional.
+ *
+ * \param[in] policy The policy which is stopping to handle request
+ * \a nrq
+ * \param[in] nrq The request
+ *
+ * \pre spin_is_locked(&svcpt->scp_req_lock)
+ *
+ * \see ptlrpc_nrs_req_stop_nolock()
+ */
+ void (*op_req_stop) (struct ptlrpc_nrs_policy *policy,
+ struct ptlrpc_nrs_request *nrq);
+ /**
+ * Registers the policy's lprocfs interface with a PTLRPC service.
+ *
+ * \param[in] svc The service
+ *
+ * \retval 0 success
+ * \retval != 0 error
+ */
+ int (*op_lprocfs_init) (struct ptlrpc_service *svc);
+ /**
+ * Unegisters the policy's lprocfs interface with a PTLRPC service.
+ *
+ * \param[in] svc The service
+ */
+ void (*op_lprocfs_fini) (struct ptlrpc_service *svc);
+};
+
+/**
+ * Policy flags
+ */
+enum nrs_policy_flags {
+ /**
+ * Fallback policy, use this flag only on a single supported policy per
+ * service. Do not use this flag for policies registering using
+ * ptlrpc_nrs_policy_register() (i.e. ones that are not in
+ * \e nrs_pols_builtin).
+ */
+ PTLRPC_NRS_FL_FALLBACK = (1 << 0),
+ /**
+ * Start policy immediately after registering.
+ */
+ PTLRPC_NRS_FL_REG_START = (1 << 1),
+ /**
+ * This is a polciy registering externally with NRS core, via
+ * ptlrpc_nrs_policy_register(), (i.e. one that is not in
+ * \e nrs_pols_builtin. Used to avoid ptlrpc_nrs_policy_register()
+ * racing with a policy start operation issued by the user via lprocfs.
+ */
+ PTLRPC_NRS_FL_REG_EXTERN = (1 << 2),
+};
+
+/**
+ * NRS queue type.
+ *
+ * Denotes whether an NRS instance is for handling normal or high-priority
+ * RPCs, or whether an operation pertains to one or both of the NRS instances
+ * in a service.
+ */
+enum ptlrpc_nrs_queue_type {
+ PTLRPC_NRS_QUEUE_REG,
+ PTLRPC_NRS_QUEUE_HP,
+ PTLRPC_NRS_QUEUE_BOTH,
+};
+
+/**
+ * NRS head
+ *
+ * A PTLRPC service has at least one NRS head instance for handling normal
+ * priority RPCs, and may optionally have a second NRS head instance for
+ * handling high-priority RPCs. Each NRS head maintains a list of available
+ * policies, of which one and only one policy is acting as the fallback policy,
+ * and optionally a different policy may be acting as the primary policy. For
+ * all RPCs handled by this NRS head instance, NRS core will first attempt to
+ * enqueue the RPC using the primary policy (if any). The fallback policy is
+ * used in the following cases:
+ * - when there was no primary policy in the
+ * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED state at the time the request
+ * was initialized.
+ * - when the primary policy that was at the
+ * ptlrpc_nrs_pol_state::PTLRPC_NRS_POL_STATE_STARTED state at the time the
+ * RPC was initialized, denoted it did not wish, or for some other reason was
+ * not able to handle the request, by returning a non-valid NRS resource
+ * reference.
+ * - when the primary policy that was at the
+ * ptlrpc_nrs_pol_state::PTLRPC_NRS_POL_STATE_STARTED state at the time the
+ * RPC was initialized, fails later during the request enqueueing stage.
+ *
+ * \see nrs_resource_get_safe()
+ * \see nrs_request_enqueue()
+ */
+struct ptlrpc_nrs {
+ spinlock_t nrs_lock;
+ /** XXX Possibly replace svcpt->scp_req_lock with another lock here. */
+ /**
+ * Linkage into nrs_core_heads_list
+ */
+ cfs_list_t nrs_heads;
+ /**
+ * List of registered policies
+ */
+ cfs_list_t nrs_policy_list;
+ /**
+ * List of policies with queued requests. Policies that have any
+ * outstanding requests are queued here, and this list is queried
+ * in a round-robin manner from NRS core when obtaining a request
+ * for handling. This ensures that requests from policies that at some
+ * point transition away from the
+ * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED state are drained.
+ */
+ cfs_list_t nrs_policy_queued;
+ /**
+ * Service partition for this NRS head
+ */
+ struct ptlrpc_service_part *nrs_svcpt;
+ /**
+ * Primary policy, which is the preferred policy for handling RPCs
+ */
+ struct ptlrpc_nrs_policy *nrs_policy_primary;
+ /**
+ * Fallback policy, which is the backup policy for handling RPCs
+ */
+ struct ptlrpc_nrs_policy *nrs_policy_fallback;
+ /**
+ * This NRS head handles either HP or regular requests
+ */
+ enum ptlrpc_nrs_queue_type nrs_queue_type;
+ /**
+ * # queued requests from all policies in this NRS head
+ */
+ unsigned long nrs_req_queued;
+ /**
+ * # scheduled requests from all policies in this NRS head
+ */
+ unsigned long nrs_req_started;
+ /**
+ * # policies on this NRS
+ * TODO: Can we avoid having this?
+ */
+ unsigned nrs_num_pols;
+ /**
+ * This NRS head is in progress of starting a policy
+ */
+ unsigned nrs_policy_starting:1;
+ /**
+ * In progress of shutting down the whole NRS head; used during
+ * unregistration
+ */
+ unsigned nrs_stopping:1;
+};
+
+#define NRS_POL_NAME_MAX 16
+
+/**
+ * NRS policy registering descriptor
+ *
+ * Is used to hold a description of a policy that can be passed to NRS core in
+ * order to register the policy with NRS heads in different PTLRPC services.
+ */
+struct ptlrpc_nrs_pol_desc {
+ /**
+ * Human-readable policy name
+ */
+ char pd_name[NRS_POL_NAME_MAX];
+ /**
+ * NRS operations for this policy
+ */
+ struct ptlrpc_nrs_pol_ops *pd_ops;
+ /**
+ * Service Compatibility function; this determines whether a policy is
+ * adequate for handling RPCs of a particular PTLRPC service.
+ *
+ * XXX:This should give the same result during policy
+ * registration and unregistration, and for all partitions of a
+ * service; so the result should not depend on temporal service
+ * or other properties, that may influence the result.
+ */
+ bool (*pd_compat) (struct ptlrpc_service *svc,
+ const struct ptlrpc_nrs_pol_desc *desc);
+ /**
+ * Optionally set for policies that support a single ptlrpc service,
+ * i.e. ones that have \a pd_compat set to nrs_policy_compat_one()
+ */
+ char *pd_compat_svc_name;
+ /**
+ * Bitmask of nrs_policy_flags
+ */
+ unsigned pd_flags;
+ /**
+ * Link into nrs_core::nrs_policies
+ */
+ cfs_list_t pd_list;
+};
+
+/**
+ * NRS policy state
+ *
+ * Policies transition from one state to the other during their lifetime
+ */
+enum ptlrpc_nrs_pol_state {
+ /**
+ * Not a valid policy state.
+ */
+ NRS_POL_STATE_INVALID,
+ /**
+ * For now, this state is used exclusively for policies that register
+ * externally to NRS core, i.e. ones that do so via
+ * ptlrpc_nrs_policy_register() and are not part of nrs_pols_builtin;
+ * it is used to prevent a race condition between the policy registering
+ * with more than one service partition while service is operational,
+ * and the user starting the policy via lprocfs.
+ *
+ * \see nrs_pol_make_avail()
+ */
+ NRS_POL_STATE_UNAVAIL,
+ /**
+ * Policies are at this state either at the start of their life, or
+ * transition here when the user selects a different policy to act
+ * as the primary one.
+ */
+ NRS_POL_STATE_STOPPED,
+ /**
+ * Policy is progress of stopping
+ */
+ NRS_POL_STATE_STOPPING,
+ /**
+ * Policy is in progress of starting
+ */
+ NRS_POL_STATE_STARTING,
+ /**
+ * A policy is in this state in two cases:
+ * - it is the fallback policy, which is always in this state.
+ * - it has been activated by the user; i.e. it is the primary policy,
+ */
+ NRS_POL_STATE_STARTED,
+};
+
+/**
+ * NRS policy information
+ *
+ * Used for obtaining information for the status of a policy via lprocfs
+ */
+struct ptlrpc_nrs_pol_info {
+ /**
+ * Policy name
+ */
+ char pi_name[NRS_POL_NAME_MAX];
+ /**
+ * Current policy state
+ */
+ enum ptlrpc_nrs_pol_state pi_state;
+ /**
+ * # RPCs enqueued for later dispatching by the policy
+ */
+ long pi_req_queued;
+ /**
+ * # RPCs started for dispatch by the policy
+ */
+ long pi_req_started;
+ /**
+ * Is this a fallback policy?
+ */
+ unsigned pi_fallback:1;
+};
+
+/**
+ * NRS policy
+ *
+ * There is one instance of this for each policy in each NRS head of each
+ * PTLRPC service partition.
+ */
+struct ptlrpc_nrs_policy {
+ /**
+ * Linkage into the NRS head's list of policies,
+ * ptlrpc_nrs:nrs_policy_list
+ */
+ cfs_list_t pol_list;
+ /**
+ * Linkage into the NRS head's list of policies with enqueued
+ * requests ptlrpc_nrs:nrs_policy_queued
+ */
+ cfs_list_t pol_list_queued;
+ /**
+ * Current state of this policy
+ */
+ enum ptlrpc_nrs_pol_state pol_state;
+ /**
+ * Bitmask of nrs_policy_flags
+ */
+ unsigned pol_flags;
+ /**
+ * # RPCs enqueued for later dispatching by the policy
+ */
+ long pol_req_queued;
+ /**
+ * # RPCs started for dispatch by the policy
+ */
+ long pol_req_started;
+ /**
+ * Usage Reference count taken on the policy instance
+ */
+ long pol_ref;
+ /**
+ * The NRS head this policy has been created at
+ */
+ struct ptlrpc_nrs *pol_nrs;
+ /**
+ * NRS operations for this policy; points to ptlrpc_nrs_pol_desc::pd_ops
+ */
+ struct ptlrpc_nrs_pol_ops *pol_ops;
+ /**
+ * Private policy data; varies by policy type
+ */
+ void *pol_private;
+ /**
+ * Human-readable policy name; point to ptlrpc_nrs_pol_desc::pd_name
+ */
+ char *pol_name;
+};
+
+/**
+ * NRS resource
+ *
+ * Resources are embedded into two types of NRS entities:
+ * - Inside NRS policies, in the policy's private data in
+ * ptlrpc_nrs_policy::pol_private
+ * - In objects that act as prime-level scheduling entities in different NRS
+ * policies; e.g. on a policy that performs round robin or similar order
+ * scheduling across client NIDs, there would be one NRS resource per unique
+ * client NID. On a policy which performs round robin scheduling across
+ * backend filesystem objects, there would be one resource associated with
+ * each of the backend filesystem objects partaking in the scheduling
+ * performed by the policy.
+ *
+ * NRS resources share a parent-child relationship, in which resources embedded
+ * in policy instances are the parent entities, with all scheduling entities
+ * a policy schedules across being the children, thus forming a simple resource
+ * hierarchy. This hierarchy may be extended with one or more levels in the
+ * future if the ability to have more than one primary policy is added.
+ *
+ * Upon request initialization, references to the then active NRS policies are
+ * taken and used to later handle the dispatching of the request with one of
+ * these policies.
+ *
+ * \see nrs_resource_get_safe()
+ * \see ptlrpc_nrs_req_add()
+ */
+struct ptlrpc_nrs_resource {
+ /**
+ * This NRS resource's parent; is NULL for resources embedded in NRS
+ * policy instances; i.e. those are top-level ones.
+ */
+ struct ptlrpc_nrs_resource *res_parent;
+ /**
+ * The policy associated with this resource.
+ */
+ struct ptlrpc_nrs_policy *res_policy;
+};
+
+enum {
+ NRS_RES_FALLBACK,
+ NRS_RES_PRIMARY,
+ NRS_RES_MAX
+};
+
+/* \name fifo
+ *
+ * FIFO policy
+ *
+ * This policy is a logical wrapper around previous, non-NRS functionality.
+ * It dispatches RPCs in the same order as they arrive from the network. This
+ * policy is currently used as the fallback policy, and the only enabled policy
+ * on all NRS heads of all PTLRPC service partitions.
+ * @{
+ */
+
+/**
+ * Private data structure for the FIFO policy
+ */
+struct nrs_fifo_head {
+ /**
+ * Resource object for policy instance.
+ */
+ struct ptlrpc_nrs_resource fh_res;
+ /**
+ * List of queued requests.
+ */
+ cfs_list_t fh_list;
+ /**
+ * For debugging purposes.
+ */
+ __u64 fh_sequence;
+};
+
+struct nrs_fifo_req {
+ /** request header, must be the first member of structure */
+ cfs_list_t fr_list;
+ __u64 fr_sequence;
+};
+
+/** @} fifo */
+
+/**
+ * NRS request
+ *
+ * Instances of this object exist embedded within ptlrpc_request; the main
+ * purpose of this object is to hold references to the request's resources
+ * for the lifetime of the request, and to hold properties that policies use
+ * use for determining the request's scheduling priority.
+ * */
+struct ptlrpc_nrs_request {
+ /**
+ * The request's resource hierarchy.
+ */
+ struct ptlrpc_nrs_resource *nr_res_ptrs[NRS_RES_MAX];
+ /**
+ * Index into ptlrpc_nrs_request::nr_res_ptrs of the resource of the
+ * policy that was used to enqueue the request.
+ *
+ * \see nrs_request_enqueue()
+ */
+ unsigned nr_res_idx;
+ unsigned nr_initialized:1;
+ unsigned nr_enqueued:1;
+ unsigned nr_dequeued:1;
+ unsigned nr_started:1;
+ unsigned nr_finalized:1;
+ cfs_binheap_node_t nr_node;
+
+ /**
+ * Policy-specific fields, used for determining a request's scheduling
+ * priority, and other supporting functionality.
+ */
+ union {
+ /**
+ * Fields for the FIFO policy
+ */
+ struct nrs_fifo_req fifo;
+ } nr_u;
+ /**
+ * Externally-registering policies may want to use this to allocate
+ * their own request properties.
+ */
+ void *ext;
+};
+
+/** @} nrs */
+
+/**
* Basic request prioritization operations structure.
* The whole idea is centered around locks and RPCs that might affect locks.
* When a lock is contended we try to give priority to RPCs that might lead
/** history sequence # */
__u64 rq_history_seq;
+ /** \addtogroup nrs
+ * @{
+ */
+ /** stub for NRS request */
+ struct ptlrpc_nrs_request rq_nrq;
+ /** @} nrs */
/** the index of service's srv_at_array into which request is linked */
time_t rq_at_index;
/** Lock to protect request flags and some other important bits, like
return rc;
}
+/** \addtogroup nrs
+ * @{
+ */
+int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_desc *desc);
+int ptlrpc_nrs_policy_unregister(struct ptlrpc_nrs_pol_desc *desc);
+void ptlrpc_nrs_req_hp_move(struct ptlrpc_request *req);
+void nrs_policy_get_info_locked(struct ptlrpc_nrs_policy *policy,
+ struct ptlrpc_nrs_pol_info *info);
+
+/*
+ * Can the request be moved from the regular NRS head to the high-priority NRS
+ * head (of the same PTLRPC service partition), if any?
+ *
+ * For a reliable result, this should be checked under svcpt->scp_req lock.
+ */
+static inline bool
+ptlrpc_nrs_req_can_move(struct ptlrpc_request *req)
+{
+ struct ptlrpc_nrs_request *nrq = &req->rq_nrq;
+
+ /**
+ * LU-898: Check ptlrpc_nrs_request::nr_enqueued to make sure the
+ * request has been enqueued first, and ptlrpc_nrs_request::nr_started
+ * to make sure it has not been scheduled yet (analogous to previous
+ * (non-NRS) checking of !list_empty(&ptlrpc_request::rq_list).
+ */
+ return nrq->nr_enqueued && !nrq->nr_started && !req->rq_hp;
+}
+/** @} nrs */
+
/**
* Returns 1 if request buffer at offset \a index was already swabbed
*/
* sent to this portal
*/
spinlock_t scp_req_lock __cfs_cacheline_aligned;
- /** # reqs in either of the queues below */
- /** reqs waiting for service */
- cfs_list_t scp_req_pending;
- /** high priority queue */
- cfs_list_t scp_hreq_pending;
+ /** # reqs in either of the NRS heads below */
/** # reqs being served */
int scp_nreqs_active;
/** # HPreqs being served */
/** # hp requests handled */
int scp_hreq_count;
+ /** NRS head for regular requests */
+ struct ptlrpc_nrs scp_nrs_reg;
+ /** NRS head for HP requests; this is only valid for services that can
+ * handle HP requests */
+ struct ptlrpc_nrs *scp_nrs_hp;
+
/** AT stuff */
/** @{ */
/**
LIOD_BIND = 1 << 4,
};
+/**
+ * \addtogroup nrs
+ * @{
+ *
+ * Service compatibility function; policy is compatible with all services.
+ *
+ * \param[in] svc The service the policy is attempting to register with.
+ * \param[in] desc The policy descriptor
+ *
+ * \retval true The policy is compatible with the NRS head
+ *
+ * \see ptlrpc_nrs_pol_desc::pd_compat()
+ */
+static inline bool
+nrs_policy_compat_all(struct ptlrpc_service *svc,
+ const struct ptlrpc_nrs_pol_desc *desc)
+{
+ return true;
+}
+
+/**
+ * Service compatibility function; policy is compatible with only a specific
+ * service which is identified by its human-readable name at
+ * ptlrpc_service::srv_name.
+ *
+ * \param[in] svc The service the policy is attempting to register with.
+ * \param[in] desc The policy descriptor
+ *
+ * \retval false The policy is not compatible with the NRS head
+ * \retval true The policy is compatible with the NRS head
+ *
+ * \see ptlrpc_nrs_pol_desc::pd_compat()
+ */
+static inline bool
+nrs_policy_compat_one(struct ptlrpc_service *svc,
+ const struct ptlrpc_nrs_pol_desc *desc)
+{
+ LASSERT(desc->pd_compat_svc_name != NULL);
+ return strcmp(svc->srv_name, desc->pd_compat_svc_name) == 0;
+}
+
+/** @} nrs */
+
/* ptlrpc/events.c */
extern lnet_handle_eq_t ptlrpc_eq_h;
extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
int liblustre_check_services(void *arg);
void ptlrpc_daemonize(char *name);
int ptlrpc_service_health_check(struct ptlrpc_service *);
-void ptlrpc_hpreq_reorder(struct ptlrpc_request *req);
void ptlrpc_server_drop_request(struct ptlrpc_request *req);
#ifdef __KERNEL__
*/
static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
{
- struct ptlrpc_request *req;
- ENTRY;
+ struct ptlrpc_request *req;
+ ENTRY;
- if (lock->l_export == NULL) {
- LDLM_DEBUG(lock, "client lock: no-op");
- RETURN_EXIT;
- }
+ if (lock->l_export == NULL) {
+ LDLM_DEBUG(lock, "client lock: no-op");
+ RETURN_EXIT;
+ }
spin_lock_bh(&lock->l_export->exp_rpc_lock);
- cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
- rq_exp_list) {
- /* Do not process requests that were not yet added to there
- * incoming queue or were already removed from there for
- * processing */
- if (!req->rq_hp && !cfs_list_empty(&req->rq_list) &&
- req->rq_ops->hpreq_lock_match &&
- req->rq_ops->hpreq_lock_match(req, lock))
- ptlrpc_hpreq_reorder(req);
- }
+ cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
+ rq_exp_list) {
+ /* Do not process requests that were not yet added to there
+ * incoming queue or were already removed from there for
+ * processing. We evaluate ptlrpc_request_reorderable() without
+ * holding svcpt->scp_req_lock, and then redo the checks with
+ * the lock held once we need to obtain a reliable result.
+ */
+ if (ptlrpc_nrs_req_can_move(req) &&
+ req->rq_ops->hpreq_lock_match &&
+ req->rq_ops->hpreq_lock_match(req, lock))
+ ptlrpc_nrs_req_hp_move(req);
+ }
spin_unlock_bh(&lock->l_export->exp_rpc_lock);
EXIT;
}
ptlrpc_objs += llog_net.o llog_client.o llog_server.o import.o ptlrpcd.o
ptlrpc_objs += pers.o lproc_ptlrpc.o wiretest.o layout.o
ptlrpc_objs += sec.o sec_bulk.o sec_gc.o sec_config.o sec_lproc.o
-ptlrpc_objs += sec_null.o sec_plain.o
+ptlrpc_objs += sec_null.o sec_plain.o nrs.o nrs_fifo.o
target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o
$(top_srcdir)/lustre/ldlm/ldlm_lockd.c \
$(top_srcdir)/lustre/ldlm/ldlm_internal.h \
$(top_srcdir)/lustre/ldlm/ldlm_inodebits.c \
- $(top_srcdir)/lustre/ldlm/ldlm_flock.c \
+ $(top_srcdir)/lustre/ldlm/ldlm_flock.c \
$(top_srcdir)/lustre/ldlm/ldlm_pool.c
-COMMON_SOURCES = client.c recover.c connection.c niobuf.c pack_generic.c \
- events.c ptlrpc_module.c service.c pinger.c recov_thread.c llog_net.c \
- llog_client.c llog_server.c import.c ptlrpcd.c pers.c wiretest.c \
- ptlrpc_internal.h layout.c sec.c sec_bulk.c sec_gc.c sec_config.c \
- sec_lproc.c sec_null.c sec_plain.c lproc_ptlrpc.c $(LDLM_COMM_SOURCES)
+COMMON_SOURCES = client.c recover.c connection.c niobuf.c pack_generic.c \
+ events.c ptlrpc_module.c service.c pinger.c recov_thread.c llog_net.c \
+ llog_client.c llog_server.c import.c ptlrpcd.c pers.c wiretest.c \
+ ptlrpc_internal.h layout.c sec.c sec_bulk.c sec_gc.c sec_config.c \
+ sec_lproc.c sec_null.c sec_plain.c lproc_ptlrpc.c nrs.c nrs_fifo.c \
+ $(LDLM_COMM_SOURCES)
if LIBLUSTRE
recover.c \
recov_thread.c \
service.c \
+ nrs.c \
+ nrs_fifo.c \
wiretest.c \
sec.c \
sec_bulk.c \
return count;
}
+/**
+ * \addtogoup nrs
+ * @{
+ */
+extern struct nrs_core nrs_core;
+
+/**
+ * Translates \e ptlrpc_nrs_pol_state values to human-readable strings.
+ *
+ * \param[in] state The policy state
+ */
+static const char *
+nrs_state2str(enum ptlrpc_nrs_pol_state state)
+{
+ switch (state) {
+ default:
+ LBUG();
+ case NRS_POL_STATE_INVALID:
+ return "invalid";
+ case NRS_POL_STATE_UNAVAIL:
+ return "unavail";
+ case NRS_POL_STATE_STOPPED:
+ return "stopped";
+ case NRS_POL_STATE_STOPPING:
+ return "stopping";
+ case NRS_POL_STATE_STARTING:
+ return "starting";
+ case NRS_POL_STATE_STARTED:
+ return "started";
+ }
+}
+
+/**
+ * Obtains status information for \a policy.
+ *
+ * Information is copied in \a info.
+ *
+ * \param[in] policy The policy
+ * \param[out] info Holds returned status information
+ */
+void
+nrs_policy_get_info_locked(struct ptlrpc_nrs_policy *policy,
+ struct ptlrpc_nrs_pol_info *info)
+{
+ LASSERT(policy != NULL);
+ LASSERT(info != NULL);
+ LASSERT(spin_is_locked(&policy->pol_nrs->nrs_lock));
+
+ memcpy(info->pi_name, policy->pol_name, NRS_POL_NAME_MAX);
+
+ info->pi_fallback = !!(policy->pol_flags & PTLRPC_NRS_FL_FALLBACK);
+ info->pi_state = policy->pol_state;
+ /**
+ * XXX: These are accessed without holding
+ * ptlrpc_service_part::scp_req_lock.
+ */
+ info->pi_req_queued = policy->pol_req_queued;
+ info->pi_req_started = policy->pol_req_started;
+}
+
+/**
+ * Reads and prints policy status information for all policies of a PTLRPC
+ * service.
+ */
+static int
+ptlrpc_lprocfs_rd_nrs(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct ptlrpc_service *svc = data;
+ struct ptlrpc_service_part *svcpt;
+ struct ptlrpc_nrs *nrs;
+ struct ptlrpc_nrs_policy *policy;
+ struct ptlrpc_nrs_pol_info *infos;
+ struct ptlrpc_nrs_pol_info tmp;
+ unsigned num_pols;
+ unsigned pol_idx = 0;
+ bool hp = false;
+ int i;
+ int rc = 0;
+ int rc2 = 0;
+ ENTRY;
+
+ /**
+ * Serialize NRS core lprocfs operations with policy registration/
+ * unregistration.
+ */
+ mutex_lock(&nrs_core.nrs_mutex);
+
+ /**
+ * Use the first service partition's regular NRS head in order to obtain
+ * the number of policies registered with NRS heads of this service. All
+ * service partitions will have the same number of policies.
+ */
+ nrs = nrs_svcpt2nrs(svc->srv_parts[0], false);
+
+ spin_lock(&nrs->nrs_lock);
+ num_pols = svc->srv_parts[0]->scp_nrs_reg.nrs_num_pols;
+ spin_unlock(&nrs->nrs_lock);
+
+ OBD_ALLOC(infos, num_pols * sizeof(*infos));
+ if (infos == NULL)
+ GOTO(out, rc = -ENOMEM);
+again:
+
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ nrs = nrs_svcpt2nrs(svcpt, hp);
+ spin_lock(&nrs->nrs_lock);
+
+ pol_idx = 0;
+
+ cfs_list_for_each_entry(policy, &nrs->nrs_policy_list,
+ pol_list) {
+ LASSERT(pol_idx < num_pols);
+
+ nrs_policy_get_info_locked(policy, &tmp);
+ /**
+ * Copy values when handling the first service
+ * partition.
+ */
+ if (i == 0) {
+ memcpy(infos[pol_idx].pi_name, tmp.pi_name,
+ NRS_POL_NAME_MAX);
+ memcpy(&infos[pol_idx].pi_state, &tmp.pi_state,
+ sizeof(tmp.pi_state));
+ infos[pol_idx].pi_fallback = tmp.pi_fallback;
+ /**
+ * For the rest of the service partitions
+ * sanity-check the values we get.
+ */
+ } else {
+ LASSERT(strncmp(infos[pol_idx].pi_name,
+ tmp.pi_name,
+ NRS_POL_NAME_MAX) == 0);
+ /**
+ * Not asserting ptlrpc_nrs_pol_info::pi_state,
+ * because it may be different between
+ * instances of the same policy in different
+ * service partitions.
+ */
+ LASSERT(infos[pol_idx].pi_fallback ==
+ tmp.pi_fallback);
+ }
+
+ infos[pol_idx].pi_req_queued += tmp.pi_req_queued;
+ infos[pol_idx].pi_req_started += tmp.pi_req_started;
+
+ pol_idx++;
+ }
+ spin_unlock(&nrs->nrs_lock);
+ }
+
+ /**
+ * Policy status information output is in YAML format.
+ * For example:
+ *
+ * regular_requests:
+ * - name: fifo
+ * state: started
+ * fallback: yes
+ * queued: 0
+ * active: 0
+ *
+ * - name: crrn
+ * state: started
+ * fallback: no
+ * queued: 2015
+ * active: 384
+ *
+ * high_priority_requests:
+ * - name: fifo
+ * state: started
+ * fallback: yes
+ * queued: 0
+ * active: 2
+ *
+ * - name: crrn
+ * state: stopped
+ * fallback: no
+ * queued: 0
+ * active: 0
+ */
+ rc2 = snprintf(page + rc, count - rc,
+ "%s\n", !hp ?
+ "\nregular_requests:" :
+ "high_priority_requests:");
+
+ if (rc2 >= count - rc) {
+ /** Output was truncated */
+ GOTO(out, rc = -EFBIG);
+ }
+
+ rc += rc2;
+
+ for (pol_idx = 0; pol_idx < num_pols; pol_idx++) {
+ rc2 = snprintf(page + rc, count - rc,
+ " - name: %s\n"
+ " state: %s\n"
+ " fallback: %s\n"
+ " queued: %-20d\n"
+ " active: %-20d\n\n",
+ infos[pol_idx].pi_name,
+ nrs_state2str(infos[pol_idx].pi_state),
+ infos[pol_idx].pi_fallback ? "yes" : "no",
+ (int)infos[pol_idx].pi_req_queued,
+ (int)infos[pol_idx].pi_req_started);
+
+
+ if (rc2 >= count - rc) {
+ /** Output was truncated */
+ GOTO(out, rc = -EFBIG);
+ }
+
+ rc += rc2;
+ }
+
+ if (!hp && nrs_svc_has_hp(svc)) {
+ memset(infos, 0, num_pols * sizeof(*infos));
+
+ /**
+ * Redo the processing for the service's HP NRS heads' policies.
+ */
+ hp = true;
+ goto again;
+ }
+
+ *eof = 1;
+
+out:
+ if (infos)
+ OBD_FREE(infos, num_pols * sizeof(*infos));
+
+ mutex_unlock(&nrs_core.nrs_mutex);
+
+ RETURN(rc);
+}
+
+/**
+ * The longest valid command string is the maxium policy name size, plus the
+ * length of the " reg" substring
+ */
+#define LPROCFS_NRS_WR_MAX_CMD (NRS_POL_NAME_MAX + sizeof(" reg") - 1)
+
+/**
+ * Starts and stops a given policy on a PTLRPC service.
+ *
+ * Commands consist of the policy name, followed by an optional [reg|hp] token;
+ * if the optional token is omitted, the operation is performed on both the
+ * regular and high-priority (if the service has one) NRS head.
+ */
+static int
+ptlrpc_lprocfs_wr_nrs(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ struct ptlrpc_service *svc = data;
+ enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
+ char *cmd;
+ char *cmd_copy = NULL;
+ char *token;
+ int rc = 0;
+ ENTRY;
+
+ if (count >= LPROCFS_NRS_WR_MAX_CMD)
+ GOTO(out, rc = -EINVAL);
+
+ OBD_ALLOC(cmd, LPROCFS_NRS_WR_MAX_CMD);
+ if (cmd == NULL)
+ GOTO(out, rc = -ENOMEM);
+ /**
+ * strsep() modifies its argument, so keep a copy
+ */
+ cmd_copy = cmd;
+
+ if (cfs_copy_from_user(cmd, buffer, count))
+ GOTO(out, rc = -EFAULT);
+
+ cmd[count] = '\0';
+
+ token = strsep(&cmd, " ");
+
+ if (strlen(token) > NRS_POL_NAME_MAX - 1)
+ GOTO(out, rc = -EINVAL);
+
+ /**
+ * No [reg|hp] token has been specified
+ */
+ if (cmd == NULL)
+ goto default_queue;
+
+ /**
+ * The second token is either NULL, or an optional [reg|hp] string
+ */
+ if (strcmp(cmd, "reg") == 0)
+ queue = PTLRPC_NRS_QUEUE_REG;
+ else if (strcmp(cmd, "hp") == 0)
+ queue = PTLRPC_NRS_QUEUE_HP;
+ else
+ GOTO(out, rc = -EINVAL);
+
+default_queue:
+
+ if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
+ GOTO(out, rc = -ENODEV);
+ else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
+ queue = PTLRPC_NRS_QUEUE_REG;
+
+ /**
+ * Serialize NRS core lprocfs operations with policy registration/
+ * unregistration.
+ */
+ mutex_lock(&nrs_core.nrs_mutex);
+
+ rc = ptlrpc_nrs_policy_control(svc, queue, token, PTLRPC_NRS_CTL_START,
+ false, NULL);
+
+ mutex_unlock(&nrs_core.nrs_mutex);
+out:
+ if (cmd_copy)
+ OBD_FREE(cmd_copy, LPROCFS_NRS_WR_MAX_CMD);
+
+ RETURN(rc < 0 ? rc : count);
+}
+
+/** @} nrs */
+
struct ptlrpc_srh_iterator {
int srhi_idx;
__u64 srhi_seq;
{.name = "timeouts",
.read_fptr = ptlrpc_lprocfs_rd_timeouts,
.data = svc},
- {NULL}
+ {.name = "nrs_policies",
+ .read_fptr = ptlrpc_lprocfs_rd_nrs,
+ .write_fptr = ptlrpc_lprocfs_wr_nrs,
+ .data = svc},
+ {NULL}
};
static struct file_operations req_history_fops = {
.owner = THIS_MODULE,
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License version 2 for more details. A copy is
+ * included in the COPYING file that accompanied this code.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2011 Intel Corporation
+ *
+ * Copyright 2012 Xyratex Technology Limited
+ */
+/*
+ * lustre/ptlrpc/nrs.c
+ *
+ * Network Request Scheduler (NRS)
+ *
+ * Allows to reorder the handling of RPCs at servers.
+ *
+ * Author: Liang Zhen <liang@whamcloud.com>
+ * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
+ */
+/**
+ * \addtogoup nrs
+ * @{
+ */
+
+#define DEBUG_SUBSYSTEM S_RPC
+#ifndef __KERNEL__
+#include <liblustre.h>
+#endif
+#include <obd_support.h>
+#include <obd_class.h>
+#include <lustre_net.h>
+#include <lprocfs_status.h>
+#include <libcfs/libcfs.h>
+#include "ptlrpc_internal.h"
+
+/* XXX: This is just for liblustre. Remove the #if defined directive when the
+ * "cfs_" prefix is dropped from cfs_list_head. */
+#if defined (__linux__) && defined(__KERNEL__)
+extern struct list_head ptlrpc_all_services;
+#else
+extern struct cfs_list_head ptlrpc_all_services;
+#endif
+
+/**
+ * NRS core object.
+ */
+struct nrs_core nrs_core;
+
+static int
+nrs_policy_init(struct ptlrpc_nrs_policy *policy)
+{
+ return policy->pol_ops->op_policy_init != NULL ?
+ policy->pol_ops->op_policy_init(policy) : 0;
+}
+
+static void
+nrs_policy_fini(struct ptlrpc_nrs_policy *policy)
+{
+ LASSERT(policy->pol_ref == 0);
+ LASSERT(policy->pol_req_queued == 0);
+
+ if (policy->pol_ops->op_policy_fini != NULL)
+ policy->pol_ops->op_policy_fini(policy);
+}
+
+static int
+nrs_policy_ctl_locked(struct ptlrpc_nrs_policy *policy, enum ptlrpc_nrs_ctl opc,
+ void *arg)
+{
+ return policy->pol_ops->op_policy_ctl != NULL ?
+ policy->pol_ops->op_policy_ctl(policy, opc, arg) : -ENOSYS;
+}
+
+static void
+nrs_policy_stop0(struct ptlrpc_nrs_policy *policy)
+{
+ struct ptlrpc_nrs *nrs = policy->pol_nrs;
+ ENTRY;
+
+ if (policy->pol_ops->op_policy_stop != NULL) {
+ spin_unlock(&nrs->nrs_lock);
+
+ policy->pol_ops->op_policy_stop(policy);
+
+ spin_lock(&nrs->nrs_lock);
+ }
+
+ LASSERT(cfs_list_empty(&policy->pol_list_queued));
+ LASSERT(policy->pol_req_queued == 0 &&
+ policy->pol_req_started == 0);
+
+ policy->pol_private = NULL;
+
+ policy->pol_state = NRS_POL_STATE_STOPPED;
+ EXIT;
+}
+
+static int
+nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
+{
+ struct ptlrpc_nrs *nrs = policy->pol_nrs;
+ ENTRY;
+
+ if (nrs->nrs_policy_fallback == policy && !nrs->nrs_stopping)
+ RETURN(-EPERM);
+
+ if (policy->pol_state == NRS_POL_STATE_STARTING)
+ RETURN(-EAGAIN);
+
+ /* In progress or already stopped */
+ if (policy->pol_state != NRS_POL_STATE_STARTED)
+ RETURN(0);
+
+ policy->pol_state = NRS_POL_STATE_STOPPING;
+
+ /* Immediately make it invisible */
+ if (nrs->nrs_policy_primary == policy) {
+ nrs->nrs_policy_primary = NULL;
+
+ } else {
+ LASSERT(nrs->nrs_policy_fallback == policy);
+ nrs->nrs_policy_fallback = NULL;
+ }
+
+ /* I have the only refcount */
+ if (policy->pol_ref == 1)
+ nrs_policy_stop0(policy);
+
+ RETURN(0);
+}
+
+/**
+ * Transitions the \a nrs NRS head's primary policy to
+ * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING and if the policy has no
+ * pending usage references, to ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED.
+ *
+ * \param[in] nrs The NRS head to carry out this operation on
+ */
+static void
+nrs_policy_stop_primary(struct ptlrpc_nrs *nrs)
+{
+ struct ptlrpc_nrs_policy *tmp = nrs->nrs_policy_primary;
+ ENTRY;
+
+ if (tmp == NULL) {
+ /**
+ * XXX: This should really be RETURN_EXIT, but the latter does
+ * not currently print anything out, and possibly should be
+ * fixed to do so.
+ */
+ EXIT;
+ return;
+ }
+
+ nrs->nrs_policy_primary = NULL;
+
+ LASSERT(tmp->pol_state == NRS_POL_STATE_STARTED);
+ tmp->pol_state = NRS_POL_STATE_STOPPING;
+
+ if (tmp->pol_ref == 0)
+ nrs_policy_stop0(tmp);
+ EXIT;
+}
+
+/**
+ * Transitions a policy across the ptlrpc_nrs_pol_state range of values, in
+ * response to an lprocfs command to start a policy.
+ *
+ * If a primary policy different to the current one is specified, this function
+ * will transition the new policy to the
+ * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTING and then to
+ * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED, and will then transition
+ * the old primary policy (if there is one) to
+ * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING, and if there are no outstanding
+ * references on the policy to ptlrpc_nrs_pol_stae::NRS_POL_STATE_STOPPED.
+ *
+ * If the fallback policy is specified, this is taken to indicate an instruction
+ * to stop the current primary policy, without substituting it with another
+ * primary policy, so the primary policy (if any) is transitioned to
+ * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING, and if there are no outstanding
+ * references on the policy to ptlrpc_nrs_pol_stae::NRS_POL_STATE_STOPPED. In
+ * this case, the fallback policy is only left active in the NRS head.
+ */
+static int
+nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy)
+{
+ struct ptlrpc_nrs *nrs = policy->pol_nrs;
+ int rc = 0;
+ ENTRY;
+
+ /**
+ * Don't allow multiple starting which is too complex, and has no real
+ * benefit.
+ */
+ if (nrs->nrs_policy_starting)
+ RETURN(-EAGAIN);
+
+ LASSERT(policy->pol_state != NRS_POL_STATE_STARTING);
+
+ if (policy->pol_state == NRS_POL_STATE_STOPPING ||
+ policy->pol_state == NRS_POL_STATE_UNAVAIL)
+ RETURN(-EAGAIN);
+
+ if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
+ /**
+ * This is for cases in which the user sets the policy to the
+ * fallback policy (currently fifo for all services); i.e. the
+ * user is resetting the policy to the default; so we stop the
+ * primary policy, if any.
+ */
+ if (policy == nrs->nrs_policy_fallback) {
+ nrs_policy_stop_primary(nrs);
+
+ RETURN(0);
+ }
+
+ /**
+ * If we reach here, we must be setting up the fallback policy
+ * at service startup time, and only a single policy with the
+ * nrs_policy_flags::PTLRPC_NRS_FL_FALLBACK flag set can
+ * register with NRS core.
+ */
+ LASSERT(nrs->nrs_policy_fallback == NULL);
+ } else {
+ /**
+ * Shouldn't start primary policy if w/o fallback policy.
+ */
+ if (nrs->nrs_policy_fallback == NULL)
+ RETURN(-EPERM);
+
+ if (policy->pol_state == NRS_POL_STATE_STARTED)
+ RETURN(0);
+ }
+
+ /**
+ * Serialize policy starting.across the NRS head
+ */
+ nrs->nrs_policy_starting = 1;
+
+ policy->pol_state = NRS_POL_STATE_STARTING;
+
+ if (policy->pol_ops->op_policy_start) {
+ spin_unlock(&nrs->nrs_lock);
+
+ rc = policy->pol_ops->op_policy_start(policy);
+
+ spin_lock(&nrs->nrs_lock);
+ if (rc != 0) {
+ policy->pol_state = NRS_POL_STATE_STOPPED;
+ GOTO(out, rc);
+ }
+ }
+
+ policy->pol_state = NRS_POL_STATE_STARTED;
+
+ if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
+ /**
+ * This path is only used at PTLRPC service setup time.
+ */
+ nrs->nrs_policy_fallback = policy;
+ } else {
+ /*
+ * Try to stop the current primary policy if there is one.
+ */
+ nrs_policy_stop_primary(nrs);
+
+ /**
+ * And set the newly-started policy as the primary one.
+ */
+ nrs->nrs_policy_primary = policy;
+ }
+
+out:
+ nrs->nrs_policy_starting = 0;
+
+ RETURN(rc);
+}
+
+/**
+ * Increases the policy's usage reference count.
+ */
+static void
+nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy)
+{
+ policy->pol_ref++;
+}
+
+/**
+ * Decreases the policy's usage reference count, and stops the policy in case it
+ * was already stopping and have no more outstanding usage references (which
+ * indicates it has no more queued or started requests, and can be safely
+ * stopped).
+ */
+static void
+nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy)
+{
+ LASSERT(policy->pol_ref > 0);
+
+ policy->pol_ref--;
+ if (unlikely(policy->pol_ref == 0 &&
+ policy->pol_state == NRS_POL_STATE_STOPPING))
+ nrs_policy_stop0(policy);
+}
+
+static void
+nrs_policy_put(struct ptlrpc_nrs_policy *policy)
+{
+ spin_lock(&policy->pol_nrs->nrs_lock);
+ nrs_policy_put_locked(policy);
+ spin_unlock(&policy->pol_nrs->nrs_lock);
+}
+
+/**
+ * Find and return a policy by name.
+ */
+static struct ptlrpc_nrs_policy *
+nrs_policy_find_locked(struct ptlrpc_nrs *nrs, char *name)
+{
+ struct ptlrpc_nrs_policy *tmp;
+
+ cfs_list_for_each_entry(tmp, &(nrs)->nrs_policy_list, pol_list) {
+ if (strncmp(tmp->pol_name, name, NRS_POL_NAME_MAX) == 0) {
+ nrs_policy_get_locked(tmp);
+ return tmp;
+ }
+ }
+ return NULL;
+}
+
+/**
+ * Release references for the resource hierarchy moving upwards towards the
+ * policy instance resource.
+ */
+static void
+nrs_resource_put(struct ptlrpc_nrs_resource *res)
+{
+ struct ptlrpc_nrs_policy *policy = res->res_policy;
+
+ if (policy->pol_ops->op_res_put != NULL) {
+ struct ptlrpc_nrs_resource *parent;
+
+ for (; res != NULL; res = parent) {
+ parent = res->res_parent;
+ policy->pol_ops->op_res_put(policy, res);
+ }
+ }
+}
+
+/**
+ * Obtains references for each resource in the resource hierarchy for request
+ * \a nrq if it is to be handled by \a policy.
+ *
+ * \param[in] policy The policy
+ * \param[in] nrq The request
+ * \param[in] moving_req Denotes whether this is a call to the function by
+ * ldlm_lock_reorder_req(), in order to move \a nrq to
+ * the high-priority NRS head; we should not sleep when
+ * set.
+ *
+ * \retval NULL Resource hierarchy references not obtained
+ * \retval valid-pointer The bottom level of the resource hierarchy
+ *
+ * \see ptlrpc_nrs_pol_ops::op_res_get()
+ */
+static struct ptlrpc_nrs_resource *
+nrs_resource_get(struct ptlrpc_nrs_policy *policy,
+ struct ptlrpc_nrs_request *nrq, bool moving_req)
+{
+ /**
+ * Set to NULL to traverse the resource hierarchy from the top.
+ */
+ struct ptlrpc_nrs_resource *res = NULL;
+ struct ptlrpc_nrs_resource *tmp = NULL;
+ int rc;
+
+ while (1) {
+ rc = policy->pol_ops->op_res_get(policy, nrq, res, &tmp,
+ moving_req);
+ if (rc < 0) {
+ if (res != NULL)
+ nrs_resource_put(res);
+ return NULL;
+ }
+
+ LASSERT(tmp != NULL);
+ tmp->res_parent = res;
+ tmp->res_policy = policy;
+ res = tmp;
+ tmp = NULL;
+ /**
+ * Return once we have obtained a reference to the bottom level
+ * of the resource hierarchy.
+ */
+ if (rc > 0)
+ return res;
+ }
+}
+
+/**
+ * Obtains resources for the resource hierarchies and policy references for
+ * the fallback and current primary policy (if any), that will later be used
+ * to handle request \a nrq.
+ *
+ * \param[in] nrs The NRS head instance that will be handling request \a nrq.
+ * \param[in] nrq The request that is being handled.
+ * \param[out] resp The array where references to the resource hierarchy are
+ * stored.
+ * \param[in] moving_req Is set when obtaining resources while moving a
+ * request from a policy on the regular NRS head to a
+ * policy on the HP NRS head (via
+ * ldlm_lock_reorder_req()). It signifies that
+ * allocations to get resources should be atomic; for
+ * a full explanation, see comment in
+ * ptlrpc_nrs_pol_ops::op_res_get().
+ */
+static void
+nrs_resource_get_safe(struct ptlrpc_nrs *nrs, struct ptlrpc_nrs_request *nrq,
+ struct ptlrpc_nrs_resource **resp, bool moving_req)
+{
+ struct ptlrpc_nrs_policy *primary = NULL;
+ struct ptlrpc_nrs_policy *fallback = NULL;
+
+ memset(resp, 0, sizeof(resp[0]) * NRS_RES_MAX);
+
+ /**
+ * Obtain policy references.
+ */
+ spin_lock(&nrs->nrs_lock);
+
+ fallback = nrs->nrs_policy_fallback;
+ nrs_policy_get_locked(fallback);
+
+ primary = nrs->nrs_policy_primary;
+ if (primary != NULL)
+ nrs_policy_get_locked(primary);
+
+ spin_unlock(&nrs->nrs_lock);
+
+ /**
+ * Obtain resource hierarchy references.
+ */
+ resp[NRS_RES_FALLBACK] = nrs_resource_get(fallback, nrq, moving_req);
+ LASSERT(resp[NRS_RES_FALLBACK] != NULL);
+
+ if (primary != NULL) {
+ resp[NRS_RES_PRIMARY] = nrs_resource_get(primary, nrq,
+ moving_req);
+ /**
+ * A primary policy may exist which may not wish to serve a
+ * particular request for different reasons; release the
+ * reference on the policy as it will not be used for this
+ * request.
+ */
+ if (resp[NRS_RES_PRIMARY] == NULL)
+ nrs_policy_put(primary);
+ }
+}
+
+/**
+ * Releases references to resource hierarchies and policies, because they are no
+ * longer required; used when request handling has been completed, ot the
+ * request is moving to the high priority NRS head.
+ *
+ * \param resp The resource hierarchy that is being released
+ *
+ * \see ptlrpcnrs_req_hp_move()
+ * \see ptlrpc_nrs_req_finalize()
+ */
+static void
+nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
+{
+ struct ptlrpc_nrs_policy *pols[NRS_RES_MAX];
+ struct ptlrpc_nrs *nrs = NULL;
+ int i;
+
+ for (i = 0; i < NRS_RES_MAX; i++) {
+ if (resp[i] != NULL) {
+ pols[i] = resp[i]->res_policy;
+ nrs_resource_put(resp[i]);
+ resp[i] = NULL;
+ } else {
+ pols[i] = NULL;
+ }
+ }
+
+ for (i = 0; i < NRS_RES_MAX; i++) {
+ if (pols[i] == NULL)
+ continue;
+
+ if (nrs == NULL) {
+ nrs = pols[i]->pol_nrs;
+ spin_lock(&nrs->nrs_lock);
+ }
+ nrs_policy_put_locked(pols[i]);
+ }
+
+ if (nrs != NULL)
+ spin_unlock(&nrs->nrs_lock);
+}
+
+/**
+ * Obtains an NRS request from \a policy for handling via polling.
+ *
+ * \param[in] policy The policy being polled
+ * \param[in,out] arg Reserved parameter
+ */
+static struct ptlrpc_nrs_request *
+nrs_request_poll(struct ptlrpc_nrs_policy *policy)
+{
+ struct ptlrpc_nrs_request *nrq;
+
+ LASSERT(policy->pol_req_queued > 0);
+
+ nrq = policy->pol_ops->op_req_poll(policy);
+
+ LASSERT(nrq != NULL);
+ LASSERT(nrs_request_policy(nrq) == policy);
+
+ return nrq;
+}
+
+/**
+ * Enqueues request \a nrq for later handling, via one one the policies for
+ * which resources where earlier obtained via nrs_resource_get_safe(). The
+ * function attempts to enqueue the request first on the primary policy
+ * (if any), since this is the preferred choice.
+ *
+ * \param nrq The request being enqueued
+ *
+ * \see nrs_resource_get_safe()
+ */
+static void
+nrs_request_enqueue(struct ptlrpc_nrs_request *nrq)
+{
+ struct ptlrpc_nrs_policy *policy;
+ int rc;
+ int i;
+
+ /**
+ * Try in descending order, because the primary policy (if any) is
+ * the preferred choice.
+ */
+ for (i = NRS_RES_MAX - 1; i >= 0; i--) {
+ if (nrq->nr_res_ptrs[i] == NULL)
+ continue;
+
+ nrq->nr_res_idx = i;
+ policy = nrq->nr_res_ptrs[i]->res_policy;
+
+ rc = policy->pol_ops->op_req_enqueue(policy, nrq);
+ if (rc == 0) {
+ policy->pol_nrs->nrs_req_queued++;
+ policy->pol_req_queued++;
+ return;
+ }
+ }
+ /**
+ * Should never get here, as at least the primary policy's
+ * ptlrpc_nrs_pol_ops::op_req_enqueue() implementation should always
+ * succeed.
+ */
+ LBUG();
+}
+
+/**
+ * Dequeues request \a nrq from the policy which was used for handling it
+ *
+ * \param nrq The request being dequeued
+ *
+ * \see ptlrpc_nrs_req_del_nolock()
+ */
+static void
+nrs_request_dequeue(struct ptlrpc_nrs_request *nrq)
+{
+ struct ptlrpc_nrs_policy *policy;
+
+ policy = nrs_request_policy(nrq);
+
+ policy->pol_ops->op_req_dequeue(policy, nrq);
+
+ LASSERT(policy->pol_nrs->nrs_req_queued > 0);
+ LASSERT(policy->pol_req_queued > 0);
+
+ policy->pol_nrs->nrs_req_queued--;
+ policy->pol_req_queued--;
+}
+
+/**
+ * Is called when the request starts being handled, after it has been enqueued,
+ * polled and dequeued.
+ *
+ * \param[in] nrs The NRS request that is starting to be handled; can be used
+ * for job/resource control.
+ *
+ * \see ptlrpc_nrs_req_start_nolock()
+ */
+static void
+nrs_request_start(struct ptlrpc_nrs_request *nrq)
+{
+ struct ptlrpc_nrs_policy *policy = nrs_request_policy(nrq);
+
+ policy->pol_req_started++;
+ policy->pol_nrs->nrs_req_started++;
+ if (policy->pol_ops->op_req_start)
+ policy->pol_ops->op_req_start(policy, nrq);
+}
+
+/**
+ * Called when a request has been handled
+ *
+ * \param[in] nrs The request that has been handled; can be used for
+ * job/resource control.
+ *
+ * \see ptlrpc_nrs_req_stop_nolock()
+ */
+static void
+nrs_request_stop(struct ptlrpc_nrs_request *nrq)
+{
+ struct ptlrpc_nrs_policy *policy = nrs_request_policy(nrq);
+
+ if (policy->pol_ops->op_req_stop)
+ policy->pol_ops->op_req_stop(policy, nrq);
+
+ LASSERT(policy->pol_nrs->nrs_req_started > 0);
+ LASSERT(policy->pol_req_started > 0);
+
+ policy->pol_nrs->nrs_req_started--;
+ policy->pol_req_started--;
+}
+
+/**
+ * Handler for operations that can be carried out on policies.
+ *
+ * Handles opcodes that are common to all policy types within NRS core, and
+ * passes any unknown opcodes to the policy-specific control function.
+ *
+ * \param[in] nrs The NRS head this policy belongs to.
+ * \param[in] name The human-readable policy name; should be the same as
+ * ptlrpc_nrs_pol_desc::pd_name.
+ * \param[in] opc The opcode of the operation being carried out.
+ * \param[in,out] arg Can be used to pass information in and out between when
+ * carrying an operation; usually data that is private to
+ * the policy at some level, or generic policy status
+ * information.
+ *
+ * \retval -ve error condition
+ * \retval 0 operation was carried out successfully
+ */
+static int
+nrs_policy_ctl(struct ptlrpc_nrs *nrs, char *name, enum ptlrpc_nrs_ctl opc,
+ void *arg)
+{
+ struct ptlrpc_nrs_policy *policy;
+ int rc = 0;
+
+ spin_lock(&nrs->nrs_lock);
+
+ policy = nrs_policy_find_locked(nrs, name);
+ if (policy == NULL)
+ GOTO(out, rc = -ENOENT);
+
+ switch (opc) {
+ /**
+ * Unknown opcode, pass it down to the policy-specific control
+ * function for handling.
+ */
+ default:
+ rc = nrs_policy_ctl_locked(policy, opc, arg);
+ break;
+
+ /**
+ * Start \e policy
+ */
+ case PTLRPC_NRS_CTL_START:
+ rc = nrs_policy_start_locked(policy);
+ break;
+
+ /**
+ * TODO: This may need to be augmented for resource deallocation
+ * used by the policies.
+ */
+ case PTLRPC_NRS_CTL_SHRINK:
+ rc = -ENOSYS;
+ break;
+ }
+out:
+ if (policy != NULL)
+ nrs_policy_put_locked(policy);
+
+ spin_unlock(&nrs->nrs_lock);
+
+ return rc;
+}
+
+/**
+ * Unregisters a policy by name.
+ *
+ * \param[in] nrs The NRS head this policy belongs to.
+ * \param[in] name The human-readable policy name; should be the same as
+ * ptlrpc_nrs_pol_desc::pd_name
+ *
+ * \retval -ve error
+ * \retval 0 success
+ */
+static int
+nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name)
+{
+ struct ptlrpc_nrs_policy *policy = NULL;
+ ENTRY;
+
+ spin_lock(&nrs->nrs_lock);
+
+ policy = nrs_policy_find_locked(nrs, name);
+ if (policy == NULL) {
+ spin_unlock(&nrs->nrs_lock);
+
+ CERROR("Can't find NRS policy %s\n", name);
+ RETURN(-ENOENT);
+ }
+
+ if (policy->pol_ref > 1) {
+ CERROR("Policy %s is busy with %d references\n", name,
+ (int)policy->pol_ref);
+ nrs_policy_put_locked(policy);
+
+ spin_unlock(&nrs->nrs_lock);
+ RETURN(-EBUSY);
+ }
+
+ LASSERT(policy->pol_req_queued == 0);
+ LASSERT(policy->pol_req_started == 0);
+
+ if (policy->pol_state != NRS_POL_STATE_STOPPED) {
+ nrs_policy_stop_locked(policy);
+ LASSERT(policy->pol_state == NRS_POL_STATE_STOPPED);
+ }
+
+ cfs_list_del(&policy->pol_list);
+ nrs->nrs_num_pols--;
+
+ nrs_policy_put_locked(policy);
+
+ spin_unlock(&nrs->nrs_lock);
+
+ nrs_policy_fini(policy);
+
+ LASSERT(policy->pol_private == NULL);
+ OBD_FREE_PTR(policy);
+
+ RETURN(0);
+}
+
+/**
+ * Register a policy from \policy descriptor \a desc with NRS head \a nrs.
+ *
+ * \param[in] nrs The NRS head on which the policy will be registered.
+ * \param[in] desc The policy descriptor from which the information will be
+ * obtained to register the policy.
+ *
+ * \retval -ve error
+ * \retval 0 success
+ */
+static int
+nrs_policy_register(struct ptlrpc_nrs *nrs,
+ struct ptlrpc_nrs_pol_desc *desc)
+{
+ struct ptlrpc_nrs_policy *policy;
+ struct ptlrpc_nrs_policy *tmp;
+ struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
+ int rc;
+ ENTRY;
+
+ LASSERT(svcpt != NULL);
+ LASSERT(desc->pd_ops != NULL);
+ LASSERT(desc->pd_ops->op_res_get != NULL);
+ LASSERT(desc->pd_ops->op_req_poll != NULL);
+ LASSERT(desc->pd_ops->op_req_enqueue != NULL);
+ LASSERT(desc->pd_ops->op_req_dequeue != NULL);
+ LASSERT(desc->pd_compat != NULL);
+
+ OBD_CPT_ALLOC_GFP(policy, svcpt->scp_service->srv_cptable,
+ svcpt->scp_cpt, sizeof(*policy), CFS_ALLOC_IO);
+ if (policy == NULL)
+ RETURN(-ENOMEM);
+
+ policy->pol_nrs = nrs;
+ policy->pol_name = desc->pd_name;
+ policy->pol_ops = desc->pd_ops;
+ policy->pol_state = desc->pd_flags & PTLRPC_NRS_FL_REG_EXTERN ?
+ NRS_POL_STATE_UNAVAIL : NRS_POL_STATE_STOPPED;
+ policy->pol_flags = desc->pd_flags & ~PTLRPC_NRS_FL_REG_EXTERN;
+
+ CFS_INIT_LIST_HEAD(&policy->pol_list);
+ CFS_INIT_LIST_HEAD(&policy->pol_list_queued);
+
+ rc = nrs_policy_init(policy);
+ if (rc != 0) {
+ OBD_FREE_PTR(policy);
+ RETURN(rc);
+ }
+
+ spin_lock(&nrs->nrs_lock);
+
+ tmp = nrs_policy_find_locked(nrs, policy->pol_name);
+ if (tmp != NULL) {
+ CERROR("NRS policy %s has been registered, can't register it "
+ "for %s\n",
+ policy->pol_name, svcpt->scp_service->srv_name);
+ nrs_policy_put_locked(tmp);
+
+ spin_unlock(&nrs->nrs_lock);
+ nrs_policy_fini(policy);
+ OBD_FREE_PTR(policy);
+
+ RETURN(-EEXIST);
+ }
+
+ cfs_list_add_tail(&policy->pol_list, &nrs->nrs_policy_list);
+ nrs->nrs_num_pols++;
+
+ if (policy->pol_flags & PTLRPC_NRS_FL_REG_START)
+ rc = nrs_policy_start_locked(policy);
+
+ spin_unlock(&nrs->nrs_lock);
+
+ if (rc != 0)
+ (void) nrs_policy_unregister(nrs, policy->pol_name);
+
+ RETURN(rc);
+}
+
+/**
+ * Enqueue request \a req using one of the policies its resources are referring
+ * to.
+ *
+ * \param[in] req The request to enqueue.
+ */
+static void
+ptlrpc_nrs_req_add_nolock(struct ptlrpc_request *req)
+{
+ struct ptlrpc_nrs_policy *policy;
+
+ LASSERT(req->rq_nrq.nr_initialized);
+ LASSERT(!req->rq_nrq.nr_enqueued);
+
+ nrs_request_enqueue(&req->rq_nrq);
+ req->rq_nrq.nr_enqueued = 1;
+
+ policy = nrs_request_policy(&req->rq_nrq);
+ /**
+ * Add the policy to the NRS head's list of policies with enqueued
+ * requests, if it has not been added there.
+ */
+ if (cfs_list_empty(&policy->pol_list_queued))
+ cfs_list_add_tail(&policy->pol_list_queued,
+ &policy->pol_nrs->nrs_policy_queued);
+}
+
+/**
+ * Enqueue a request on the high priority NRS head.
+ *
+ * \param req The request to enqueue.
+ */
+static void
+ptlrpc_nrs_hpreq_add_nolock(struct ptlrpc_request *req)
+{
+ int opc = lustre_msg_get_opc(req->rq_reqmsg);
+ ENTRY;
+
+ spin_lock(&req->rq_lock);
+ req->rq_hp = 1;
+ ptlrpc_nrs_req_add_nolock(req);
+ if (opc != OBD_PING)
+ DEBUG_REQ(D_NET, req, "high priority req");
+ spin_unlock(&req->rq_lock);
+ EXIT;
+}
+
+/* ptlrpc/nrs_fifo.c */
+extern struct ptlrpc_nrs_pol_desc ptlrpc_nrs_fifo_desc;
+
+/**
+ * Array of policies that ship alongside NRS core; i.e. ones that do not
+ * register externally using ptlrpc_nrs_policy_register().
+ */
+static struct ptlrpc_nrs_pol_desc *nrs_pols_builtin[] = {
+ &ptlrpc_nrs_fifo_desc,
+};
+
+/**
+ * Returns a boolean predicate indicating whether the policy described by
+ * \a desc is adequate for use with service \a svc.
+ *
+ * \param[in] nrs The service
+ * \param[in] desc The policy descriptor
+ *
+ * \retval false The policy is not compatible with the service partition
+ * \retval true The policy is compatible with the service partition
+ */
+static inline bool
+nrs_policy_compatible(struct ptlrpc_service *svc,
+ const struct ptlrpc_nrs_pol_desc *desc)
+{
+ return desc->pd_compat(svc, desc);
+}
+
+/**
+ * Registers all compatible policies in nrs_core.nrs_policies, for NRS head
+ * \a nrs.
+ *
+ * \param[in] nrs The NRS head
+ *
+ * \retval -ve error
+ * \retval 0 success
+ *
+ * \pre mutex_is_locked(&nrs_core.nrs_mutex)
+ *
+ * \see ptlrpc_service_nrs_setup()
+ */
+static int
+nrs_register_policies_locked(struct ptlrpc_nrs *nrs)
+{
+ struct ptlrpc_nrs_pol_desc *desc;
+ /* For convenience */
+ struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
+ struct ptlrpc_service *svc = svcpt->scp_service;
+ int rc = -EINVAL;
+ ENTRY;
+
+ LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
+
+ cfs_list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
+ if (nrs_policy_compatible(svc, desc)) {
+ rc = nrs_policy_register(nrs, desc);
+ if (rc != 0) {
+ CERROR("Failed to register NRS policy %s for "
+ "partition %d of service %s: %d\n",
+ desc->pd_name, svcpt->scp_cpt,
+ svc->srv_name, rc);
+ /**
+ * Fail registration if any of the policies'
+ * registration fails.
+ */
+ break;
+ }
+ }
+ }
+
+ RETURN(rc);
+}
+
+/**
+ * Initializes NRS head \a nrs of service partition \a svcpt, and registers all
+ * compatible policies in NRS core, with the NRS head.
+ *
+ * \param[in] nrs The NRS head
+ * \param[in] svcpt The PTLRPC service partition to setup
+ *
+ * \pre mutex_is_locked(&nrs_core.nrs_mutex)
+ */
+static int
+nrs_svcpt_setup_locked0(struct ptlrpc_nrs *nrs,
+ struct ptlrpc_service_part *svcpt)
+{
+ int rc;
+ enum ptlrpc_nrs_queue_type queue;
+
+ LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
+
+ if (nrs == &svcpt->scp_nrs_reg)
+ queue = PTLRPC_NRS_QUEUE_REG;
+ else if (nrs == svcpt->scp_nrs_hp)
+ queue = PTLRPC_NRS_QUEUE_HP;
+ else
+ LBUG();
+
+ nrs->nrs_svcpt = svcpt;
+ nrs->nrs_queue_type = queue;
+ spin_lock_init(&nrs->nrs_lock);
+ CFS_INIT_LIST_HEAD(&nrs->nrs_heads);
+ CFS_INIT_LIST_HEAD(&nrs->nrs_policy_list);
+ CFS_INIT_LIST_HEAD(&nrs->nrs_policy_queued);
+
+ cfs_list_add_tail(&nrs->nrs_heads, &nrs_core.nrs_heads);
+
+ rc = nrs_register_policies_locked(nrs);
+
+ RETURN(rc);
+}
+
+/**
+ * Allocates a regular and optionally a high-priority NRS head (if the service
+ * handles high-priority RPCs), and then registers all available compatible
+ * policies on those NRS heads.
+ *
+ * \param[n] svcpt The PTLRPC service partition to setup
+ *
+ * \pre mutex_is_locked(&nrs_core.nrs_mutex)
+ */
+static int
+nrs_svcpt_setup_locked(struct ptlrpc_service_part *svcpt)
+{
+ struct ptlrpc_nrs *nrs;
+ int rc;
+ ENTRY;
+
+ LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
+
+ /**
+ * Initialize the regular NRS head.
+ */
+ nrs = nrs_svcpt2nrs(svcpt, false);
+ rc = nrs_svcpt_setup_locked0(nrs, svcpt);
+ if (rc)
+ GOTO(out, rc);
+
+ /**
+ * Optionally allocate a high-priority NRS head.
+ */
+ if (svcpt->scp_service->srv_ops.so_hpreq_handler == NULL)
+ GOTO(out, rc);
+
+ OBD_CPT_ALLOC_PTR(svcpt->scp_nrs_hp,
+ svcpt->scp_service->srv_cptable,
+ svcpt->scp_cpt);
+ if (svcpt->scp_nrs_hp == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ nrs = nrs_svcpt2nrs(svcpt, true);
+ rc = nrs_svcpt_setup_locked0(nrs, svcpt);
+
+out:
+ RETURN(rc);
+}
+
+/**
+ * Unregisters all policies on all available NRS heads in a service partition;
+ * called at PTLRPC service unregistration time.
+ *
+ * \param[in] svcpt The PTLRPC service partition
+ *
+ * \pre mutex_is_locked(&nrs_core.nrs_mutex)
+ */
+static void
+nrs_svcpt_cleanup_locked(struct ptlrpc_service_part *svcpt)
+{
+ struct ptlrpc_nrs *nrs;
+ struct ptlrpc_nrs_policy *policy;
+ struct ptlrpc_nrs_policy *tmp;
+ int rc;
+ bool hp = false;
+ ENTRY;
+
+ LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
+
+again:
+ nrs = nrs_svcpt2nrs(svcpt, hp);
+ nrs->nrs_stopping = 1;
+
+ cfs_list_for_each_entry_safe(policy, tmp, &nrs->nrs_policy_list,
+ pol_list) {
+ rc = nrs_policy_unregister(nrs, policy->pol_name);
+ LASSERT(rc == 0);
+ }
+
+ cfs_list_del(&nrs->nrs_heads);
+
+ /**
+ * If the service partition has an HP NRS head, clean that up as well.
+ */
+ if (!hp && nrs_svcpt_has_hp(svcpt)) {
+ hp = true;
+ goto again;
+ }
+
+ if (hp)
+ OBD_FREE_PTR(nrs);
+
+ EXIT;
+}
+
+/**
+ * Checks whether the policy in \a desc has been added to NRS core's list of
+ * policies, \e nrs_core.nrs_policies.
+ *
+ * \param[in] desc The policy descriptor
+ *
+ * \retval true The policy is present
+ * \retval false The policy is not present
+ */
+static bool
+nrs_policy_exists_locked(const struct ptlrpc_nrs_pol_desc *desc)
+{
+ struct ptlrpc_nrs_pol_desc *tmp;
+ ENTRY;
+
+ cfs_list_for_each_entry(tmp, &nrs_core.nrs_policies, pd_list) {
+ if (strncmp(tmp->pd_name, desc->pd_name, NRS_POL_NAME_MAX) == 0)
+ RETURN(true);
+ }
+ RETURN(false);
+}
+
+/**
+ * Removes the policy from all supported NRS heads.
+ *
+ * \param[in] desc The policy descriptor to unregister
+ *
+ * \retval -ve error
+ * \retval 0 successfully unregistered policy on all supported NRS heads
+ *
+ * \pre mutex_is_locked(&nrs_core.nrs_mutex)
+ */
+static int
+nrs_policy_unregister_locked(struct ptlrpc_nrs_pol_desc *desc)
+{
+ struct ptlrpc_nrs *nrs;
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
+
+ cfs_list_for_each_entry(nrs, &nrs_core.nrs_heads, nrs_heads) {
+ if (!nrs_policy_compatible(nrs->nrs_svcpt->scp_service, desc)) {
+ /**
+ * The policy may only have registered on compatible
+ * NRS heads.
+ */
+ continue;
+ }
+
+ rc = nrs_policy_unregister(nrs, desc->pd_name);
+
+ /**
+ * Ignore -ENOENT as the policy may not have registered
+ * successfully on all service partitions.
+ */
+ if (rc == -ENOENT) {
+ rc = 0;
+ } else if (rc != 0) {
+ CERROR("Failed to unregister NRS policy %s for "
+ "partition %d of service %s: %d\n",
+ desc->pd_name, nrs->nrs_svcpt->scp_cpt,
+ nrs->nrs_svcpt->scp_service->srv_name, rc);
+ break;
+ }
+ }
+ RETURN(rc);
+}
+
+/**
+ * Transitions a policy from ptlrpc_nrs_pol_state::NRS_POL_STATE_UNAVAIL to
+ * ptlrpc_nrs_pol_state::STOPPED; is used to prevent policies that are
+ * registering externally using ptlrpc_nrs_policy_register from starting
+ * before they have successfully registered on all compatible service
+ * partitions.
+ *
+ * \param[in] nrs The NRS head that the policy belongs to
+ * \param[in] name The human-readable policy name
+ */
+static void
+nrs_pol_make_available0(struct ptlrpc_nrs *nrs, char *name)
+{
+ struct ptlrpc_nrs_policy *pol;
+
+ LASSERT(nrs);
+ LASSERT(name);
+
+ spin_lock(&nrs->nrs_lock);
+ pol = nrs_policy_find_locked(nrs, name);
+ if (pol) {
+ LASSERT(pol->pol_state == NRS_POL_STATE_UNAVAIL);
+ pol->pol_state = NRS_POL_STATE_STOPPED;
+ nrs_policy_put_locked(pol);
+ }
+ spin_unlock(&nrs->nrs_lock);
+}
+
+/**
+ * Make the policy available on all compatible service partitions of all PTLRPC
+ * services.
+ *
+ * \param[in] desc The descriptor for the policy that is to be made available
+ *
+ * \pre mutex_is_locked(&nrs_core.nrs_mutex)
+ *
+ * \see nrs_pol_make_available0()
+ */
+static void
+nrs_pol_make_available_locked(struct ptlrpc_nrs_pol_desc *desc)
+{
+ struct ptlrpc_nrs *nrs;
+ ENTRY;
+
+ LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
+
+ /**
+ * Cycle through all registered instances of the policy and place them
+ * at the STOPPED state.
+ */
+ cfs_list_for_each_entry(nrs, &nrs_core.nrs_heads, nrs_heads) {
+ if (!nrs_policy_compatible(nrs->nrs_svcpt->scp_service, desc))
+ continue;
+ nrs_pol_make_available0(nrs, desc->pd_name);
+ }
+ EXIT;
+}
+
+/**
+ * Registers a new policy with NRS core.
+ *
+ * Used for policies that register externally with NRS core, i.e. ones that are
+ * not part of \e nrs_pols_builtin[]. The function will only succeed if policy
+ * registration with all compatible service partitions is successful.
+ *
+ * \param[in] desc The policy descriptor to register
+ *
+ * \retval -ve error
+ * \retval 0 success
+ */
+int
+ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_desc *desc)
+{
+ struct ptlrpc_nrs *nrs;
+ struct ptlrpc_service *svc;
+ struct ptlrpc_service_part *svcpt;
+ int i;
+ int rc;
+ int rc2;
+ ENTRY;
+
+ LASSERT(desc != NULL);
+
+ desc->pd_name[NRS_POL_NAME_MAX - 1] = '\0';
+
+ if (desc->pd_flags & (PTLRPC_NRS_FL_FALLBACK |
+ PTLRPC_NRS_FL_REG_START)) {
+ CERROR("Failing to register NRS policy %s; re-check policy "
+ "flags, externally-registered policies cannot act as "
+ "fallback policies or be started immediately without "
+ "interaction with lprocfs.\n", desc->pd_name);
+ RETURN(-EINVAL);
+ }
+
+ desc->pd_flags |= PTLRPC_NRS_FL_REG_EXTERN;
+
+ mutex_lock(&nrs_core.nrs_mutex);
+
+ rc = nrs_policy_exists_locked(desc);
+ if (rc) {
+ CERROR("Failing to register NRS policy %s which has "
+ "already been registered with NRS core!\n",
+ desc->pd_name);
+ GOTO(fail, rc = -EEXIST);
+ }
+
+ /**
+ * Register the new policy on all compatible services
+ */
+ mutex_lock(&ptlrpc_all_services_mutex);
+
+ cfs_list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
+
+ if (unlikely(svc->srv_is_stopping)) {
+ mutex_unlock(&ptlrpc_all_services_mutex);
+ GOTO(fail, rc = -ESRCH);
+ }
+
+ if (!nrs_policy_compatible(svc, desc)) {
+ /**
+ * Attempt to register the policy if it is
+ * compatible, otherwise try the next service.
+ */
+ continue;
+ }
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ bool hp = false;
+
+again:
+ nrs = nrs_svcpt2nrs(svcpt, hp);
+ rc = nrs_policy_register(nrs, desc);
+ if (rc != 0) {
+ CERROR("Failed to register NRS policy %s for "
+ "partition %d of service %s: %d\n",
+ desc->pd_name, nrs->nrs_svcpt->scp_cpt,
+ nrs->nrs_svcpt->scp_service->srv_name,
+ rc);
+
+ rc2 = nrs_policy_unregister_locked(desc);
+ /**
+ * Should not fail at this point
+ */
+ LASSERT(rc2 == 0);
+ mutex_unlock(&ptlrpc_all_services_mutex);
+ GOTO(fail, rc);
+ }
+
+ if (!hp && nrs_svc_has_hp(svc)) {
+ hp = true;
+ goto again;
+ }
+ }
+ if (desc->pd_ops->op_lprocfs_init != NULL) {
+ rc = desc->pd_ops->op_lprocfs_init(svc);
+ if (rc != 0) {
+ rc2 = nrs_policy_unregister_locked(desc);
+ /**
+ * Should not fail at this point
+ */
+ LASSERT(rc2 == 0);
+ mutex_unlock(&ptlrpc_all_services_mutex);
+ GOTO(fail, rc);
+ }
+ }
+ }
+
+ mutex_unlock(&ptlrpc_all_services_mutex);
+
+ /**
+ * The policy has successfully registered with all service partitions,
+ * so mark the policy instances at the NRS heads as available.
+ */
+ nrs_pol_make_available_locked(desc);
+
+ cfs_list_add_tail(&desc->pd_list, &nrs_core.nrs_policies);
+fail:
+ mutex_unlock(&nrs_core.nrs_mutex);
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(ptlrpc_nrs_policy_register);
+
+/**
+ * Unregisters a previously registered policy with NRS core. All instances of
+ * the policy on all NRS heads of all supported services are removed.
+ *
+ * \param[in] desc The descriptor of the policy to unregister
+ *
+ * \retval -ve error
+ * \retval 0 success
+ */
+int
+ptlrpc_nrs_policy_unregister(struct ptlrpc_nrs_pol_desc *desc)
+{
+ int rc;
+ struct ptlrpc_service *svc;
+ ENTRY;
+
+ LASSERT(desc != NULL);
+
+ if (desc->pd_flags & PTLRPC_NRS_FL_FALLBACK) {
+ CERROR("Unable to unregister a fallback policy, unless the "
+ "PTLRPC service is stopping.\n");
+ RETURN(-EPERM);
+ }
+
+ desc->pd_name[NRS_POL_NAME_MAX - 1] = '\0';
+
+ mutex_lock(&nrs_core.nrs_mutex);
+
+ rc = nrs_policy_exists_locked(desc);
+ if (!rc) {
+ CERROR("Failing to unregister NRS policy %s which has "
+ "not been registered with NRS core!\n",
+ desc->pd_name);
+ GOTO(fail, rc = -ENOENT);
+ }
+
+ rc = nrs_policy_unregister_locked(desc);
+ if (rc == -EBUSY) {
+ CERROR("Please first stop policy %s on all service partitions "
+ "and then retry to unregister the policy.\n",
+ desc->pd_name);
+ GOTO(fail, rc);
+ }
+ CDEBUG(D_INFO, "Unregistering policy %s from NRS core.\n",
+ desc->pd_name);
+
+ cfs_list_del(&desc->pd_list);
+
+ /**
+ * Unregister the policy's lprocfs interface from all compatible
+ * services.
+ */
+ mutex_lock(&ptlrpc_all_services_mutex);
+
+ cfs_list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
+ if (!nrs_policy_compatible(svc, desc))
+ continue;
+
+ if (desc->pd_ops->op_lprocfs_fini != NULL)
+ desc->pd_ops->op_lprocfs_fini(svc);
+ }
+
+ mutex_unlock(&ptlrpc_all_services_mutex);
+
+fail:
+ mutex_unlock(&nrs_core.nrs_mutex);
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(ptlrpc_nrs_policy_unregister);
+
+/**
+ * Setup NRS heads on all service partitions of service \a svc, and register
+ * all compatible policies on those NRS heads.
+ *
+ * \param[in] svc The service to setup
+ *
+ * \retval -ve error, the calling logic should eventually call
+ * ptlrpc_service_nrs_cleanup() to undo any work performed
+ * by this function.
+ *
+ * \see ptlrpc_register_service()
+ * \see ptlrpc_service_nrs_cleanup()
+ */
+int
+ptlrpc_service_nrs_setup(struct ptlrpc_service *svc)
+{
+ struct ptlrpc_service_part *svcpt;
+ const struct ptlrpc_nrs_pol_desc *desc;
+ int i;
+ int rc = 0;
+
+ mutex_lock(&nrs_core.nrs_mutex);
+
+ /**
+ * Initialize NRS heads on all service CPTs.
+ */
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ rc = nrs_svcpt_setup_locked(svcpt);
+ if (rc != 0)
+ GOTO(failed, rc);
+ }
+
+ /*
+ * Set up lprocfs interfaces for all supported policies for the
+ * service.
+ */
+ cfs_list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
+ if (!nrs_policy_compatible(svc, desc))
+ continue;
+
+ if (desc->pd_ops->op_lprocfs_init != NULL) {
+ rc = desc->pd_ops->op_lprocfs_init(svc);
+ if (rc != 0)
+ GOTO(failed, rc);
+ }
+ }
+
+failed:
+
+ mutex_unlock(&nrs_core.nrs_mutex);
+
+ RETURN(rc);
+}
+
+/**
+ * Unregisters all policies on all service partitions of service \a svc.
+ *
+ * \param[in] svc The PTLRPC service to unregister
+ */
+void
+ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc)
+{
+ struct ptlrpc_service_part *svcpt;
+ const struct ptlrpc_nrs_pol_desc *desc;
+ int i;
+
+ mutex_lock(&nrs_core.nrs_mutex);
+
+ /**
+ * Clean up NRS heads on all service partitions
+ */
+ ptlrpc_service_for_each_part(svcpt, i, svc)
+ nrs_svcpt_cleanup_locked(svcpt);
+
+ /**
+ * Clean up lprocfs interfaces for all supported policies for the
+ * service.
+ */
+ cfs_list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
+ if (!nrs_policy_compatible(svc, desc))
+ continue;
+
+ if (desc->pd_ops->op_lprocfs_fini != NULL)
+ desc->pd_ops->op_lprocfs_fini(svc);
+ }
+
+ mutex_unlock(&nrs_core.nrs_mutex);
+}
+
+/**
+ * Obtains NRS head resources for request \a req.
+ *
+ * These could be either on the regular or HP NRS head of \a svcpt; resources
+ * taken on the regular head can later be swapped for HP head resources by
+ * ldlm_lock_reorder_req().
+ *
+ * \param[in] svcpt The service partition
+ * \param[in] req The request
+ * \param[in] hp Which NRS head of \a svcpt to use
+ */
+void
+ptlrpc_nrs_req_initialize(struct ptlrpc_service_part *svcpt,
+ struct ptlrpc_request *req, bool hp)
+{
+ struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
+
+ memset(&req->rq_nrq, 0, sizeof(req->rq_nrq));
+ nrs_resource_get_safe(nrs, &req->rq_nrq, req->rq_nrq.nr_res_ptrs,
+ false);
+
+ /**
+ * It is fine to access \e nr_initialized without locking as there is
+ * no contention at this early stage.
+ */
+ req->rq_nrq.nr_initialized = 1;
+}
+
+/**
+ * Releases resources for a request; is called after the request has been
+ * handled.
+ *
+ * \param[in] req The request
+ *
+ * \see ptlrpc_server_finish_request()
+ */
+void
+ptlrpc_nrs_req_finalize(struct ptlrpc_request *req)
+{
+ if (req->rq_nrq.nr_initialized) {
+ nrs_resource_put_safe(req->rq_nrq.nr_res_ptrs);
+ /* no protection on bit nr_initialized because no
+ * contention at this late stage */
+ req->rq_nrq.nr_finalized = 1;
+ }
+}
+
+void
+ptlrpc_nrs_req_start_nolock(struct ptlrpc_request *req)
+{
+ req->rq_nrq.nr_started = 1;
+ nrs_request_start(&req->rq_nrq);
+}
+
+void
+ptlrpc_nrs_req_stop_nolock(struct ptlrpc_request *req)
+{
+ if (req->rq_nrq.nr_started)
+ nrs_request_stop(&req->rq_nrq);
+}
+
+/**
+ * Enqueues request \a req on either the regular or high-priority NRS head
+ * of service partition \a svcpt.
+ *
+ * \param[in] svcpt The service partition
+ * \param[in] req The request to be enqueued
+ * \param[in] hp Whether to enqueue the request on the regular or
+ * high-priority NRS head.
+ */
+void
+ptlrpc_nrs_req_add(struct ptlrpc_service_part *svcpt,
+ struct ptlrpc_request *req, bool hp)
+{
+ spin_lock(&svcpt->scp_req_lock);
+
+ if (hp)
+ ptlrpc_nrs_hpreq_add_nolock(req);
+ else
+ ptlrpc_nrs_req_add_nolock(req);
+
+ spin_unlock(&svcpt->scp_req_lock);
+}
+
+/**
+ * Obtains a request for handling from an NRS head of service partition
+ * \a svcpt.
+ *
+ * \param[in] svcpt The service partition
+ * \param[in] hp Whether to obtain a request from the regular or
+ * high-priority NRS head.
+ *
+ * \retval the request to be handled
+ * \retval NULL on failure
+ */
+struct ptlrpc_request *
+ptlrpc_nrs_req_poll_nolock(struct ptlrpc_service_part *svcpt, bool hp)
+{
+ struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
+ struct ptlrpc_nrs_policy *policy;
+ struct ptlrpc_nrs_request *nrq;
+
+ if (unlikely(nrs->nrs_req_queued == 0))
+ return NULL;
+
+ /**
+ * Always try to drain requests from all NRS polices even if they are
+ * inactive, because the user can change policy status at runtime.
+ */
+ cfs_list_for_each_entry(policy, &(nrs)->nrs_policy_queued,
+ pol_list_queued) {
+ nrq = nrs_request_poll(policy);
+ if (likely(nrq != NULL))
+ return container_of(nrq, struct ptlrpc_request, rq_nrq);
+ }
+
+ return NULL;
+}
+
+/**
+ * Dequeues a request that was previously obtained via ptlrpc_nrs_req_poll() and
+ * is about to be handled.
+ *
+ * \param[in] req The request
+ */
+void
+ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req)
+{
+ struct ptlrpc_nrs_policy *policy;
+
+ LASSERT(req->rq_nrq.nr_enqueued);
+ LASSERT(!req->rq_nrq.nr_dequeued);
+
+ policy = nrs_request_policy(&req->rq_nrq);
+ nrs_request_dequeue(&req->rq_nrq);
+ req->rq_nrq.nr_dequeued = 1;
+
+ /**
+ * If the policy has no more requests queued, remove it from
+ * ptlrpc_nrs::nrs_policy_queued.
+ */
+ if (policy->pol_req_queued == 0) {
+ cfs_list_del_init(&policy->pol_list_queued);
+
+ /**
+ * If there are other policies with queued requests, move the
+ * current policy to the end so that we can round robin over
+ * all policies and drain the requests.
+ */
+ } else if (policy->pol_req_queued != policy->pol_nrs->nrs_req_queued) {
+ LASSERT(policy->pol_req_queued <
+ policy->pol_nrs->nrs_req_queued);
+
+ cfs_list_move_tail(&policy->pol_list_queued,
+ &policy->pol_nrs->nrs_policy_queued);
+ }
+}
+
+/**
+ * Returns whether there are any requests currently enqueued on any of the
+ * policies of service partition's \a svcpt NRS head specified by \a hp. Should
+ * be called while holding ptlrpc_service_part::scp_req_lock to get a reliable
+ * result.
+ *
+ * \param[in] svcpt The service partition to enquire.
+ * \param[in] hp Whether the regular or high-priority NRS head is to be
+ * enquired.
+ *
+ * \retval false The indicated NRS head has no enqueued requests.
+ * \retval true The indicated NRS head has some enqueued requests.
+ */
+bool
+ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp)
+{
+ struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
+
+ return nrs->nrs_req_queued > 0;
+};
+
+/**
+ * Moves request \a req from the regular to the high-priority NRS head.
+ *
+ * \param[in] req The request to move
+ */
+void
+ptlrpc_nrs_req_hp_move(struct ptlrpc_request *req)
+{
+ struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
+ struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, true);
+ struct ptlrpc_nrs_request *nrq = &req->rq_nrq;
+ struct ptlrpc_nrs_resource *res1[NRS_RES_MAX];
+ struct ptlrpc_nrs_resource *res2[NRS_RES_MAX];
+ ENTRY;
+
+ /**
+ * Obtain the high-priority NRS head resources.
+ * XXX: Maybe want to remove nrs_resource[get|put]_safe() dance
+ * when request cannot actually move; move this further down?
+ */
+ nrs_resource_get_safe(nrs, nrq, res1, true);
+
+ spin_lock(&svcpt->scp_req_lock);
+
+ if (!ptlrpc_nrs_req_can_move(req))
+ goto out;
+
+ ptlrpc_nrs_req_del_nolock(req);
+ nrq->nr_enqueued = nrq->nr_dequeued = 0;
+
+ memcpy(res2, nrq->nr_res_ptrs, NRS_RES_MAX * sizeof(res2[0]));
+ memcpy(nrq->nr_res_ptrs, res1, NRS_RES_MAX * sizeof(res1[0]));
+
+ ptlrpc_nrs_hpreq_add_nolock(req);
+
+ memcpy(res1, res2, NRS_RES_MAX * sizeof(res1[0]));
+out:
+ spin_unlock(&svcpt->scp_req_lock);
+
+ /**
+ * Release either the regular NRS head resources if we moved the
+ * request, or the high-priority NRS head resources if we took a
+ * reference earlier in this function and ptlrpc_nrs_req_can_move()
+ * returned false.
+ */
+ nrs_resource_put_safe(res1);
+ EXIT;
+}
+
+/**
+ * Carries out a control operation \a opc on the policy identified by the
+ * human-readable \a name, on either all partitions, or only on the first
+ * partition of service \a svc.
+ *
+ * \param[in] svc The service the policy belongs to.
+ * \param[in] queue Whether to carry out the command on the policy which
+ * belongs to the regular, high-priority, or both NRS
+ * heads of service partitions of \a svc.
+ * \param[in] name The policy to act upon, by human-readable name
+ * \param[in] opc The opcode of the operation to carry out
+ * \param[in] single When set, the operation will only be carried out on the
+ * NRS heads of the first service partition of \a svc.
+ * This is useful for some policies which e.g. share
+ * identical values on the same parameters of different
+ * service partitions; when reading these parameters via
+ * lprocfs, these policies may just want to obtain and
+ * print out the values from the first service partition.
+ * Storing these values centrally elsewhere then could be
+ * another solution for this.
+ * \param[in,out] arg Can be used as a generic in/out buffer between control
+ * operations and the user environment.
+ *
+ *\retval -ve error condition
+ *\retval 0 operation was carried out successfully
+ */
+int
+ptlrpc_nrs_policy_control(struct ptlrpc_service *svc,
+ enum ptlrpc_nrs_queue_type queue, char *name,
+ enum ptlrpc_nrs_ctl opc, bool single, void *arg)
+{
+ struct ptlrpc_service_part *svcpt;
+ int i;
+ int rc = 0;
+ ENTRY;
+
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ switch (queue) {
+ default:
+ return -EINVAL;
+
+ case PTLRPC_NRS_QUEUE_BOTH:
+ case PTLRPC_NRS_QUEUE_REG:
+ rc = nrs_policy_ctl(nrs_svcpt2nrs(svcpt, false), name,
+ opc, arg);
+ if (rc != 0 || (queue == PTLRPC_NRS_QUEUE_REG &&
+ single))
+ GOTO(out, rc);
+
+ if (queue == PTLRPC_NRS_QUEUE_REG)
+ break;
+
+ /* fallthrough */
+
+ case PTLRPC_NRS_QUEUE_HP:
+ /**
+ * XXX: We could optionally check for
+ * nrs_svc_has_hp(svc) here, and return an error if it
+ * is false. Right now we rely on the policies' lprocfs
+ * handlers that call the present function to make this
+ * check; if they fail to do so, they might hit the
+ * assertion inside nrs_svcpt2nrs() below.
+ */
+ rc = nrs_policy_ctl(nrs_svcpt2nrs(svcpt, true), name,
+ opc, arg);
+ if (rc != 0 || single)
+ GOTO(out, rc);
+
+ break;
+ }
+ }
+out:
+ RETURN(rc);
+}
+
+/**
+ * Adds all policies that ship with NRS, i.e. those in the \e nrs_pols_builtin
+ * array, to NRS core's list of policies \e nrs_core.nrs_policies.
+ *
+ * \retval 0 All policy descriptors in \e nrs_pols_builtin have been added
+ * successfully to \e nrs_core.nrs_policies
+ */
+int
+ptlrpc_nrs_init(void)
+{
+ int rc = -EINVAL;
+ int i;
+ ENTRY;
+
+ /**
+ * Initialize the NRS core object.
+ */
+ mutex_init(&nrs_core.nrs_mutex);
+ CFS_INIT_LIST_HEAD(&nrs_core.nrs_heads);
+ CFS_INIT_LIST_HEAD(&nrs_core.nrs_policies);
+
+ for (i = 0; i < ARRAY_SIZE(nrs_pols_builtin); i++) {
+ /**
+ * No need to take nrs_core.nrs_mutex as there is no contention at
+ * this early stage.
+ */
+ rc = nrs_policy_exists_locked(nrs_pols_builtin[i]);
+ /**
+ * This should not fail for in-tree policies.
+ */
+ LASSERT(rc == false);
+ cfs_list_add_tail(&nrs_pols_builtin[i]->pd_list,
+ &nrs_core.nrs_policies);
+ }
+
+ RETURN(rc);
+}
+
+/**
+ * Stub finalization function
+ */
+void
+ptlrpc_nrs_fini(void)
+{
+}
+
+/** @} nrs */
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License version 2 for more details. A copy is
+ * included in the COPYING file that accompanied this code.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2011 Intel Corporation
+ *
+ * Copyright 2012 Xyratex Technology Limited
+ */
+/*
+ * lustre/ptlrpc/nrs_fifo.c
+ *
+ * Network Request Scheduler (NRS) FIFO policy
+ *
+ * Handles RPCs in a FIFO manner, as received from the network. This policy is
+ * a logical wrapper around previous, non-NRS functionality. It is used as the
+ * default and fallback policy for all types of RPCs on all PTLRPC service
+ * partitions, for both regular and high-priority NRS heads. Default here means
+ * the policy is the one enabled at PTLRPC service partition startup time, and
+ * fallback means the policy is used to handle RPCs that are not handled
+ * successfully or are not handled at all by any primary policy that may be
+ * enabled on a given NRS head.
+ *
+ * Author: Liang Zhen <liang@whamcloud.com>
+ * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
+ */
+/**
+ * \addtogoup nrs
+ * @{
+ */
+
+#define DEBUG_SUBSYSTEM S_RPC
+#ifndef __KERNEL__
+#include <liblustre.h>
+#endif
+#include <obd_support.h>
+#include <obd_class.h>
+#include <libcfs/libcfs.h>
+#include "ptlrpc_internal.h"
+
+/**
+ * \name fifo
+ *
+ * The FIFO policy is a logical wrapper around previous, non-NRS functionality.
+ * It schedules RPCs in the same order as they are queued from LNet.
+ *
+ * @{
+ */
+
+/**
+ * Is called before the policy transitions into
+ * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
+ * policy-specific private data structure.
+ *
+ * \param[in] policy The policy to start
+ *
+ * \retval -ENOMEM OOM error
+ * \retval 0 success
+ *
+ * \see nrs_policy_register()
+ * \see nrs_policy_ctl()
+ */
+static int
+nrs_fifo_start(struct ptlrpc_nrs_policy *policy)
+{
+ struct nrs_fifo_head *head;
+
+ OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
+ if (head == NULL)
+ return -ENOMEM;
+
+ CFS_INIT_LIST_HEAD(&head->fh_list);
+ policy->pol_private = head;
+ return 0;
+}
+
+/**
+ * Is called before the policy transitions into
+ * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
+ * private data structure.
+ *
+ * \param[in] policy The policy to stop
+ *
+ * \see nrs_policy_stop0()
+ */
+static void
+nrs_fifo_stop(struct ptlrpc_nrs_policy *policy)
+{
+ struct nrs_fifo_head *head = policy->pol_private;
+
+ LASSERT(head != NULL);
+ LASSERT(cfs_list_empty(&head->fh_list));
+
+ OBD_FREE_PTR(head);
+}
+
+/**
+ * Is called for obtaining a FIFO policy resource.
+ *
+ * \param[in] policy The policy on which the request is being asked for
+ * \param[in] nrq The request for which resources are being taken
+ * \param[in] parent Parent resource, unused in this policy
+ * \param[out] resp Resources references are placed in this array
+ * \param[in] moving_req Signifies limited caller context; unused in this
+ * policy
+ *
+ * \retval 1 The FIFO policy only has a one-level resource hierarchy, as since
+ * it implements a simple scheduling algorithm in which request
+ * priority is determined on the request arrival order, it does not
+ * need to maintain a set of resources that would otherwise be used
+ * to calculate a request's priority.
+ *
+ * \see nrs_resource_get_safe()
+ */
+static int
+nrs_fifo_res_get(struct ptlrpc_nrs_policy *policy,
+ struct ptlrpc_nrs_request *nrq,
+ struct ptlrpc_nrs_resource *parent,
+ struct ptlrpc_nrs_resource **resp, bool moving_req)
+{
+ /**
+ * Just return the resource embedded inside nrs_fifo_head, and end this
+ * resource hierarchy reference request.
+ */
+ *resp = &((struct nrs_fifo_head *)policy->pol_private)->fh_res;
+ return 1;
+}
+
+/**
+ * Called when polling the fifo policy for a request.
+ *
+ * \param[in] policy The policy being polled
+ *
+ * \retval The request to be handled; this is the next request in the FIFO
+ * queue
+ * \see ptlrpc_nrs_req_poll_nolock()
+ */
+static struct ptlrpc_nrs_request *
+nrs_fifo_req_poll(struct ptlrpc_nrs_policy *policy)
+{
+ struct nrs_fifo_head *head = policy->pol_private;
+
+ LASSERT(head != NULL);
+
+ return cfs_list_empty(&head->fh_list) ? NULL :
+ cfs_list_entry(head->fh_list.next, struct ptlrpc_nrs_request,
+ nr_u.fifo.fr_list);
+}
+
+/**
+ * Adds request \a nrq to \a policy's list of queued requests
+ *
+ * \param[in] policy The policy
+ * \param[in] nrq The request to add
+ *
+ * \retval 0 success; nrs_request_enqueue() assumes this function will always
+ * succeed
+ */
+static int
+nrs_fifo_req_add(struct ptlrpc_nrs_policy *policy,
+ struct ptlrpc_nrs_request *nrq)
+{
+ struct nrs_fifo_head *head;
+
+ head = container_of(nrs_request_resource(nrq), struct nrs_fifo_head,
+ fh_res);
+ /**
+ * Only used for debugging
+ */
+ nrq->nr_u.fifo.fr_sequence = head->fh_sequence++;
+ cfs_list_add_tail(&nrq->nr_u.fifo.fr_list, &head->fh_list);
+
+ return 0;
+}
+
+/**
+ * Removes request \a nrq from \a policy's list of queued requests.
+ *
+ * \param[in] policy The policy
+ * \param[in] nrq The request to remove
+ */
+static void
+nrs_fifo_req_del(struct ptlrpc_nrs_policy *policy,
+ struct ptlrpc_nrs_request *nrq)
+{
+ LASSERT(!cfs_list_empty(&nrq->nr_u.fifo.fr_list));
+ cfs_list_del_init(&nrq->nr_u.fifo.fr_list);
+}
+
+/**
+ * Prints a debug statement right before the request \a nrq starts being
+ * handled.
+ *
+ * \param[in] policy The policy handling the request
+ * \param[in] nrq The request being handled
+ */
+static void
+nrs_fifo_req_start(struct ptlrpc_nrs_policy *policy,
+ struct ptlrpc_nrs_request *nrq)
+{
+ struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
+ rq_nrq);
+
+ CDEBUG(D_RPCTRACE, "NRS start %s request from %s, seq: "LPU64"\n",
+ nrs_request_policy(nrq)->pol_name, libcfs_id2str(req->rq_peer),
+ nrq->nr_u.fifo.fr_sequence);
+}
+
+/**
+ * Prints a debug statement right before the request \a nrq stops being
+ * handled.
+ *
+ * \param[in] policy The policy handling the request
+ * \param[in] nrq The request being handled
+ *
+ * \see ptlrpc_server_finish_request()
+ * \see ptlrpc_nrs_req_stop_nolock()
+ */
+static void
+nrs_fifo_req_stop(struct ptlrpc_nrs_policy *policy,
+ struct ptlrpc_nrs_request *nrq)
+{
+ struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
+ rq_nrq);
+
+ CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: "LPU64"\n",
+ nrs_request_policy(nrq)->pol_name, libcfs_id2str(req->rq_peer),
+ nrq->nr_u.fifo.fr_sequence);
+}
+
+/**
+ * FIFO policy operations
+ */
+static struct ptlrpc_nrs_pol_ops nrs_fifo_ops = {
+ .op_policy_start = nrs_fifo_start,
+ .op_policy_stop = nrs_fifo_stop,
+ .op_res_get = nrs_fifo_res_get,
+ .op_req_poll = nrs_fifo_req_poll,
+ .op_req_enqueue = nrs_fifo_req_add,
+ .op_req_dequeue = nrs_fifo_req_del,
+ .op_req_start = nrs_fifo_req_start,
+ .op_req_stop = nrs_fifo_req_stop,
+};
+
+/**
+ * FIFO policy descriptor
+ */
+struct ptlrpc_nrs_pol_desc ptlrpc_nrs_fifo_desc = {
+ .pd_name = "fifo",
+ .pd_ops = &nrs_fifo_ops,
+ .pd_compat = nrs_policy_compat_all,
+ .pd_flags = PTLRPC_NRS_FL_FALLBACK |
+ PTLRPC_NRS_FL_REG_START
+};
+
+/** @} fifo */
+
+/** @} nrs */
+
struct ldlm_res_id;
struct ptlrpc_request_set;
extern int test_req_buffer_pressure;
+extern struct mutex ptlrpc_all_services_mutex;
int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait);
/* ptlrpcd.c */
#define ptlrpc_lprocfs_do_request_stat(params...) do{}while(0)
#endif /* LPROCFS */
+/* NRS */
+
+/**
+ * NRS core object.
+ *
+ * Holds NRS core fields.
+ */
+struct nrs_core {
+ /**
+ * Protects nrs_core::nrs_heads, nrs_core::nrs_policies, serializes
+ * external policy registration/unregistration, and NRS core lprocfs
+ * operations.
+ */
+ struct mutex nrs_mutex;
+ /* XXX: This is just for liblustre. Remove the #if defined directive
+ * when the * "cfs_" prefix is dropped from cfs_list_head. */
+#if defined (__linux__) && defined(__KERNEL__)
+ /**
+ * List of all NRS heads on all service partitions of all services;
+ * protected by nrs_core::nrs_mutex.
+ */
+ struct list_head nrs_heads;
+ /**
+ * List of all policy descriptors registered with NRS core; protected
+ * by nrs_core::nrs_mutex.
+ */
+ struct list_head nrs_policies;
+#else
+ struct cfs_list_head nrs_heads;
+ struct cfs_list_head nrs_policies;
+#endif
+
+};
+
+int ptlrpc_service_nrs_setup(struct ptlrpc_service *svc);
+void ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc);
+
+void ptlrpc_nrs_req_initialize(struct ptlrpc_service_part *svcpt,
+ struct ptlrpc_request *req, bool hp);
+void ptlrpc_nrs_req_finalize(struct ptlrpc_request *req);
+void ptlrpc_nrs_req_start_nolock(struct ptlrpc_request *req);
+void ptlrpc_nrs_req_stop_nolock(struct ptlrpc_request *req);
+void ptlrpc_nrs_req_add(struct ptlrpc_service_part *svcpt,
+ struct ptlrpc_request *req, bool hp);
+struct ptlrpc_request *
+ptlrpc_nrs_req_poll_nolock(struct ptlrpc_service_part *svcpt,
+ bool hp);
+void ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req);
+bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp);
+
+int ptlrpc_nrs_policy_control(struct ptlrpc_service *svc,
+ enum ptlrpc_nrs_queue_type queue, char *name,
+ enum ptlrpc_nrs_ctl opc, bool single, void *arg);
+
+int ptlrpc_nrs_init(void);
+void ptlrpc_nrs_fini(void);
+
+static inline int
+nrs_svcpt_has_hp(struct ptlrpc_service_part *svcpt)
+{
+ return svcpt->scp_nrs_hp != NULL;
+}
+
+static inline int
+nrs_svc_has_hp(struct ptlrpc_service *svc)
+{
+ /**
+ * If the first service partition has an HP NRS head, all service
+ * partitions will.
+ */
+ return nrs_svcpt_has_hp(svc->srv_parts[0]);
+}
+
+static inline struct ptlrpc_nrs *
+nrs_svcpt2nrs(struct ptlrpc_service_part *svcpt, bool hp)
+{
+ LASSERT(ergo(hp, nrs_svcpt_has_hp(svcpt)));
+ return hp ? svcpt->scp_nrs_hp : &svcpt->scp_nrs_reg;
+}
+
+static inline int
+nrs_pol2cptid(struct ptlrpc_nrs_policy *policy)
+{
+ return policy->pol_nrs->nrs_svcpt->scp_cpt;
+}
+
+static inline struct ptlrpc_service *
+nrs_pol2svc(struct ptlrpc_nrs_policy *policy)
+{
+ return policy->pol_nrs->nrs_svcpt->scp_service;
+}
+
+static inline struct ptlrpc_service_part *
+nrs_pol2svcpt(struct ptlrpc_nrs_policy *policy)
+{
+ return policy->pol_nrs->nrs_svcpt;
+}
+
+static inline struct cfs_cpt_table *
+nrs_pol2cptab(struct ptlrpc_nrs_policy *policy)
+{
+ return nrs_pol2svc(policy)->srv_cptable;
+}
+
+static inline struct ptlrpc_nrs_resource *
+nrs_request_resource(struct ptlrpc_nrs_request *nrq)
+{
+ LASSERT(nrq->nr_initialized);
+ LASSERT(!nrq->nr_finalized);
+
+ return nrq->nr_res_ptrs[nrq->nr_res_idx];
+}
+
+static inline struct ptlrpc_nrs_policy *
+nrs_request_policy(struct ptlrpc_nrs_request *nrq)
+{
+ return nrs_request_resource(nrq)->res_policy;
+}
+
/* recovd_thread.c */
int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink);
#if RS_DEBUG
extern spinlock_t ptlrpc_rs_debug_lock;
#endif
-extern spinlock_t ptlrpc_all_services_lock;
extern struct mutex pinger_mutex;
extern struct mutex ptlrpcd_mutex;
#if RS_DEBUG
spin_lock_init(&ptlrpc_rs_debug_lock);
#endif
- spin_lock_init(&ptlrpc_all_services_lock);
+ mutex_init(&ptlrpc_all_services_mutex);
mutex_init(&pinger_mutex);
mutex_init(&ptlrpcd_mutex);
ptlrpc_init_xid();
if (rc)
GOTO(cleanup, rc);
-#ifdef __KERNEL__
cleanup_phase = 7;
+ rc = ptlrpc_nrs_init();
+ if (rc)
+ GOTO(cleanup, rc);
+
+#ifdef __KERNEL__
+ cleanup_phase = 8;
rc = tgt_mod_init();
if (rc)
GOTO(cleanup, rc);
cleanup:
switch(cleanup_phase) {
#ifdef __KERNEL__
+ case 8:
+ ptlrpc_nrs_fini();
+#endif
case 7:
llog_recov_fini();
-#endif
case 6:
sptlrpc_fini();
case 5:
static void __exit ptlrpc_exit(void)
{
tgt_mod_exit();
+ ptlrpc_nrs_fini();
llog_recov_fini();
sptlrpc_fini();
ldlm_exit();
static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req);
static void ptlrpc_at_remove_timed(struct ptlrpc_request *req);
-static CFS_LIST_HEAD(ptlrpc_all_services);
-spinlock_t ptlrpc_all_services_lock;
+/** Holds a list of all PTLRPC services */
+CFS_LIST_HEAD(ptlrpc_all_services);
+/** Used to protect the \e ptlrpc_all_services list */
+struct mutex ptlrpc_all_services_mutex;
struct ptlrpc_request_buffer_desc *
ptlrpc_alloc_rqbd(struct ptlrpc_service_part *svcpt)
/* acitve requests and hp requests */
spin_lock_init(&svcpt->scp_req_lock);
- CFS_INIT_LIST_HEAD(&svcpt->scp_req_pending);
- CFS_INIT_LIST_HEAD(&svcpt->scp_hreq_pending);
/* reply states */
spin_lock_init(&svcpt->scp_rep_lock);
rc = LNetSetLazyPortal(service->srv_req_portal);
LASSERT(rc == 0);
- spin_lock(&ptlrpc_all_services_lock);
- cfs_list_add (&service->srv_list, &ptlrpc_all_services);
- spin_unlock(&ptlrpc_all_services_lock);
+ mutex_lock(&ptlrpc_all_services_mutex);
+ cfs_list_add (&service->srv_list, &ptlrpc_all_services);
+ mutex_unlock(&ptlrpc_all_services_mutex);
- if (proc_entry != NULL)
- ptlrpc_lprocfs_register_service(proc_entry, service);
+ if (proc_entry != NULL)
+ ptlrpc_lprocfs_register_service(proc_entry, service);
- CDEBUG(D_NET, "%s: Started, listening on portal %d\n",
- service->srv_name, service->srv_req_portal);
+ rc = ptlrpc_service_nrs_setup(service);
+ if (rc != 0)
+ GOTO(failed, rc);
+
+ CDEBUG(D_NET, "%s: Started, listening on portal %d\n",
+ service->srv_name, service->srv_req_portal);
#ifdef __KERNEL__
rc = ptlrpc_start_threads(service);
ptlrpc_server_hpreq_fini(req);
spin_lock(&svcpt->scp_req_lock);
+ ptlrpc_nrs_req_stop_nolock(req);
svcpt->scp_nreqs_active--;
if (req->rq_hp)
svcpt->scp_nhreqs_active--;
spin_unlock(&svcpt->scp_req_lock);
+ ptlrpc_nrs_req_finalize(req);
+
ptlrpc_server_drop_request(req);
}
* Put the request to the export list if the request may become
* a high priority one.
*/
-static int ptlrpc_server_hpreq_init(struct ptlrpc_service *svc,
+static int ptlrpc_server_hpreq_init(struct ptlrpc_service_part *svcpt,
struct ptlrpc_request *req)
{
- int rc = 0;
- ENTRY;
+ int rc = 0;
+ ENTRY;
- if (svc->srv_ops.so_hpreq_handler) {
- rc = svc->srv_ops.so_hpreq_handler(req);
- if (rc)
- RETURN(rc);
- }
- if (req->rq_export && req->rq_ops) {
- /* Perform request specific check. We should do this check
- * before the request is added into exp_hp_rpcs list otherwise
- * it may hit swab race at LU-1044. */
- if (req->rq_ops->hpreq_check)
- rc = req->rq_ops->hpreq_check(req);
+ if (svcpt->scp_service->srv_ops.so_hpreq_handler) {
+ rc = svcpt->scp_service->srv_ops.so_hpreq_handler(req);
+ if (rc < 0)
+ RETURN(rc);
+ LASSERT(rc == 0);
+ }
+ if (req->rq_export && req->rq_ops) {
+ /* Perform request specific check. We should do this check
+ * before the request is added into exp_hp_rpcs list otherwise
+ * it may hit swab race at LU-1044. */
+ if (req->rq_ops->hpreq_check) {
+ rc = req->rq_ops->hpreq_check(req);
+ /**
+ * XXX: Out of all current
+ * ptlrpc_hpreq_ops::hpreq_check(), only
+ * ldlm_cancel_hpreq_check() can return an error code;
+ * other functions assert in similar places, which seems
+ * odd. What also does not seem right is that handlers
+ * for those RPCs do not assert on the same checks, but
+ * rather handle the error cases. e.g. see
+ * ost_rw_hpreq_check(), and ost_brw_read(),
+ * ost_brw_write().
+ */
+ if (rc < 0)
+ RETURN(rc);
+ LASSERT(rc == 0 || rc == 1);
+ }
spin_lock_bh(&req->rq_export->exp_rpc_lock);
cfs_list_add(&req->rq_exp_list,
spin_unlock_bh(&req->rq_export->exp_rpc_lock);
}
+ ptlrpc_nrs_req_initialize(svcpt, req, rc);
+
RETURN(rc);
}
}
EXPORT_SYMBOL(ptlrpc_hpreq_handler);
-/**
- * Make the request a high priority one.
- *
- * All the high priority requests are queued in a separate FIFO
- * ptlrpc_service_part::scp_hpreq_pending list which is parallel to
- * ptlrpc_service_part::scp_req_pending list but has a higher priority
- * for handling.
- *
- * \see ptlrpc_server_handle_request().
- */
-static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service_part *svcpt,
- struct ptlrpc_request *req)
-{
- ENTRY;
-
- spin_lock(&req->rq_lock);
- if (req->rq_hp == 0) {
- int opc = lustre_msg_get_opc(req->rq_reqmsg);
-
- /* Add to the high priority queue. */
- cfs_list_move_tail(&req->rq_list, &svcpt->scp_hreq_pending);
- req->rq_hp = 1;
- if (opc != OBD_PING)
- DEBUG_REQ(D_RPCTRACE, req, "high priority req");
- }
- spin_unlock(&req->rq_lock);
- EXIT;
-}
-
-/**
- * \see ptlrpc_hpreq_reorder_nolock
- */
-void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
-{
- struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
- ENTRY;
-
- spin_lock(&svcpt->scp_req_lock);
- /* It may happen that the request is already taken for the processing
- * but still in the export list, or the request is not in the request
- * queue but in the export list already, do not add it into the
- * HP list. */
- if (!cfs_list_empty(&req->rq_list))
- ptlrpc_hpreq_reorder_nolock(svcpt, req);
- spin_unlock(&svcpt->scp_req_lock);
- EXIT;
-}
-EXPORT_SYMBOL(ptlrpc_hpreq_reorder);
-
-/**
- * Add a request to the regular or HP queue; optionally perform HP request
- * initialization.
- */
static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
struct ptlrpc_request *req)
{
int rc;
ENTRY;
- rc = ptlrpc_server_hpreq_init(svcpt->scp_service, req);
+ rc = ptlrpc_server_hpreq_init(svcpt, req);
if (rc < 0)
RETURN(rc);
- spin_lock(&svcpt->scp_req_lock);
-
- if (rc)
- ptlrpc_hpreq_reorder_nolock(svcpt, req);
- else
- cfs_list_add_tail(&req->rq_list, &svcpt->scp_req_pending);
-
- spin_unlock(&svcpt->scp_req_lock);
+ ptlrpc_nrs_req_add(svcpt, req, !!rc);
RETURN(0);
}
{
int running = svcpt->scp_nthrs_running;
+ if (!nrs_svcpt_has_hp(svcpt))
+ return 0;
+
if (force)
return 1;
if (svcpt->scp_nhreqs_active == 0)
return 1;
- return cfs_list_empty(&svcpt->scp_req_pending) ||
+ return !ptlrpc_nrs_req_pending_nolock(svcpt, false) ||
svcpt->scp_hreq_count < svcpt->scp_service->srv_hpreq_ratio;
}
int force)
{
return ptlrpc_server_allow_high(svcpt, force) &&
- !cfs_list_empty(&svcpt->scp_hreq_pending);
+ ptlrpc_nrs_req_pending_nolock(svcpt, true);
}
/**
return 0;
return svcpt->scp_nhreqs_active > 0 ||
- svcpt->scp_service->srv_ops.so_hpreq_handler == NULL;
+ !nrs_svcpt_has_hp(svcpt);
}
static int ptlrpc_server_normal_pending(struct ptlrpc_service_part *svcpt,
int force)
{
return ptlrpc_server_allow_normal(svcpt, force) &&
- !cfs_list_empty(&svcpt->scp_req_pending);
+ ptlrpc_nrs_req_pending_nolock(svcpt, false);
}
/**
ENTRY;
if (ptlrpc_server_high_pending(svcpt, force)) {
- req = cfs_list_entry(svcpt->scp_hreq_pending.next,
- struct ptlrpc_request, rq_list);
+ req = ptlrpc_nrs_req_poll_nolock(svcpt, true);
svcpt->scp_hreq_count++;
RETURN(req);
}
if (ptlrpc_server_normal_pending(svcpt, force)) {
- req = cfs_list_entry(svcpt->scp_req_pending.next,
- struct ptlrpc_request, rq_list);
+ req = ptlrpc_nrs_req_poll_nolock(svcpt, false);
svcpt->scp_hreq_count = 0;
RETURN(req);
}
}
}
}
-
- cfs_list_del_init(&request->rq_list);
+ ptlrpc_nrs_req_del_nolock(request);
svcpt->scp_nreqs_active++;
if (request->rq_hp)
svcpt->scp_nhreqs_active++;
+ ptlrpc_nrs_req_start_nolock(request);
spin_unlock(&svcpt->scp_req_lock);
ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
cfs_gettimeofday(&work_end);
timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
- CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
+ CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
"%s:%s+%d:%d:x"LPU64":%s:%d Request procesed in "
"%ldus (%ldus total) trans "LPU64" rc %d/%d\n",
cfs_curproc_comm(),
while (ptlrpc_server_request_pending(svcpt, 1)) {
req = ptlrpc_server_request_get(svcpt, 1);
- cfs_list_del(&req->rq_list);
+ ptlrpc_nrs_req_del_nolock(req);
svcpt->scp_nreqs_active++;
ptlrpc_server_hpreq_fini(req);
service->srv_is_stopping = 1;
- spin_lock(&ptlrpc_all_services_lock);
+ mutex_lock(&ptlrpc_all_services_mutex);
cfs_list_del_init(&service->srv_list);
- spin_unlock(&ptlrpc_all_services_lock);
-
- ptlrpc_lprocfs_unregister_service(service);
+ mutex_unlock(&ptlrpc_all_services_mutex);
ptlrpc_service_del_atimer(service);
ptlrpc_stop_all_threads(service);
ptlrpc_service_unlink_rqbd(service);
ptlrpc_service_purge_all(service);
+ ptlrpc_service_nrs_cleanup(service);
+
+ ptlrpc_lprocfs_unregister_service(service);
+
ptlrpc_service_free(service);
RETURN(0);
return 0;
}
- /* How long has the next entry been waiting? */
- if (cfs_list_empty(&svcpt->scp_req_pending)) {
- request = cfs_list_entry(svcpt->scp_hreq_pending.next,
- struct ptlrpc_request, rq_list);
- } else {
- request = cfs_list_entry(svcpt->scp_req_pending.next,
- struct ptlrpc_request, rq_list);
- }
+ /* How long has the next entry been waiting? */
+ request = ptlrpc_nrs_req_poll_nolock(svcpt, true);
+ if (request == NULL)
+ request = ptlrpc_nrs_req_poll_nolock(svcpt, false);
timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
spin_unlock(&svcpt->scp_req_lock);