*/
struct nrs_core nrs_core;
-static int
-nrs_policy_init(struct ptlrpc_nrs_policy *policy)
+static int nrs_policy_init(struct ptlrpc_nrs_policy *policy)
{
- return policy->pol_ops->op_policy_init != NULL ?
- policy->pol_ops->op_policy_init(policy) : 0;
+ return policy->pol_desc->pd_ops->op_policy_init != NULL ?
+ policy->pol_desc->pd_ops->op_policy_init(policy) : 0;
}
-static void
-nrs_policy_fini(struct ptlrpc_nrs_policy *policy)
+static void nrs_policy_fini(struct ptlrpc_nrs_policy *policy)
{
LASSERT(policy->pol_ref == 0);
LASSERT(policy->pol_req_queued == 0);
- if (policy->pol_ops->op_policy_fini != NULL)
- policy->pol_ops->op_policy_fini(policy);
+ if (policy->pol_desc->pd_ops->op_policy_fini != NULL)
+ policy->pol_desc->pd_ops->op_policy_fini(policy);
}
-static int
-nrs_policy_ctl_locked(struct ptlrpc_nrs_policy *policy, enum ptlrpc_nrs_ctl opc,
- void *arg)
+static int nrs_policy_ctl_locked(struct ptlrpc_nrs_policy *policy,
+ enum ptlrpc_nrs_ctl opc, void *arg)
{
- return policy->pol_ops->op_policy_ctl != NULL ?
- policy->pol_ops->op_policy_ctl(policy, opc, arg) : -ENOSYS;
+ /**
+ * The policy may be stopped, but the lprocfs files and
+ * ptlrpc_nrs_policy instances remain present until unregistration time.
+ * Do not perform the ctl operation if the policy is stopped, as
+ * policy->pol_private will be NULL in such a case.
+ */
+ if (policy->pol_state == NRS_POL_STATE_STOPPED)
+ RETURN(-ENODEV);
+
+ RETURN(policy->pol_desc->pd_ops->op_policy_ctl != NULL ?
+ policy->pol_desc->pd_ops->op_policy_ctl(policy, opc, arg) :
+ -ENOSYS);
}
-static void
-nrs_policy_stop0(struct ptlrpc_nrs_policy *policy)
+static void nrs_policy_stop0(struct ptlrpc_nrs_policy *policy)
{
struct ptlrpc_nrs *nrs = policy->pol_nrs;
ENTRY;
- if (policy->pol_ops->op_policy_stop != NULL) {
+ if (policy->pol_desc->pd_ops->op_policy_stop != NULL) {
spin_unlock(&nrs->nrs_lock);
- policy->pol_ops->op_policy_stop(policy);
+ policy->pol_desc->pd_ops->op_policy_stop(policy);
spin_lock(&nrs->nrs_lock);
}
policy->pol_private = NULL;
policy->pol_state = NRS_POL_STATE_STOPPED;
+
+ if (cfs_atomic_dec_and_test(&policy->pol_desc->pd_refs))
+ cfs_module_put(policy->pol_desc->pd_owner);
+
EXIT;
}
-static int
-nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
+static int nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
{
struct ptlrpc_nrs *nrs = policy->pol_nrs;
ENTRY;
* ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING and if the policy has no
* pending usage references, to ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED.
*
- * \param[in] nrs The NRS head to carry out this operation on
+ * \param[in] nrs the NRS head to carry out this operation on
*/
-static void
-nrs_policy_stop_primary(struct ptlrpc_nrs *nrs)
+static void nrs_policy_stop_primary(struct ptlrpc_nrs *nrs)
{
struct ptlrpc_nrs_policy *tmp = nrs->nrs_policy_primary;
ENTRY;
* references on the policy to ptlrpc_nrs_pol_stae::NRS_POL_STATE_STOPPED. In
* this case, the fallback policy is only left active in the NRS head.
*/
-static int
-nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy)
+static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy)
{
struct ptlrpc_nrs *nrs = policy->pol_nrs;
int rc = 0;
LASSERT(policy->pol_state != NRS_POL_STATE_STARTING);
- if (policy->pol_state == NRS_POL_STATE_STOPPING ||
- policy->pol_state == NRS_POL_STATE_UNAVAIL)
+ if (policy->pol_state == NRS_POL_STATE_STOPPING)
RETURN(-EAGAIN);
if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
*/
if (policy == nrs->nrs_policy_fallback) {
nrs_policy_stop_primary(nrs);
-
RETURN(0);
}
}
/**
- * Serialize policy starting.across the NRS head
+ * Increase the module usage count for policies registering from other
+ * modules.
+ */
+ if (cfs_atomic_inc_return(&policy->pol_desc->pd_refs) == 1 &&
+ !cfs_try_module_get(policy->pol_desc->pd_owner)) {
+ cfs_atomic_dec(&policy->pol_desc->pd_refs);
+ CERROR("NRS: cannot get module for policy %s; is it alive?\n",
+ policy->pol_desc->pd_name);
+ RETURN(-ENODEV);
+ }
+
+ /**
+ * Serialize policy starting across the NRS head
*/
nrs->nrs_policy_starting = 1;
policy->pol_state = NRS_POL_STATE_STARTING;
- if (policy->pol_ops->op_policy_start) {
+ if (policy->pol_desc->pd_ops->op_policy_start) {
spin_unlock(&nrs->nrs_lock);
- rc = policy->pol_ops->op_policy_start(policy);
+ rc = policy->pol_desc->pd_ops->op_policy_start(policy);
spin_lock(&nrs->nrs_lock);
if (rc != 0) {
+ if (cfs_atomic_dec_and_test(&policy->pol_desc->pd_refs))
+ cfs_module_put(policy->pol_desc->pd_owner);
+
policy->pol_state = NRS_POL_STATE_STOPPED;
GOTO(out, rc);
}
/**
* Increases the policy's usage reference count.
*/
-static void
-nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy)
+static inline void nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy)
{
policy->pol_ref++;
}
* indicates it has no more queued or started requests, and can be safely
* stopped).
*/
-static void
-nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy)
+static void nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy)
{
LASSERT(policy->pol_ref > 0);
nrs_policy_stop0(policy);
}
-static void
-nrs_policy_put(struct ptlrpc_nrs_policy *policy)
+static void nrs_policy_put(struct ptlrpc_nrs_policy *policy)
{
spin_lock(&policy->pol_nrs->nrs_lock);
nrs_policy_put_locked(policy);
/**
* Find and return a policy by name.
*/
-static struct ptlrpc_nrs_policy *
-nrs_policy_find_locked(struct ptlrpc_nrs *nrs, char *name)
+static struct ptlrpc_nrs_policy * nrs_policy_find_locked(struct ptlrpc_nrs *nrs,
+ char *name)
{
struct ptlrpc_nrs_policy *tmp;
- cfs_list_for_each_entry(tmp, &(nrs)->nrs_policy_list, pol_list) {
- if (strncmp(tmp->pol_name, name, NRS_POL_NAME_MAX) == 0) {
+ cfs_list_for_each_entry(tmp, &nrs->nrs_policy_list, pol_list) {
+ if (strncmp(tmp->pol_desc->pd_name, name,
+ NRS_POL_NAME_MAX) == 0) {
nrs_policy_get_locked(tmp);
return tmp;
}
* Release references for the resource hierarchy moving upwards towards the
* policy instance resource.
*/
-static void
-nrs_resource_put(struct ptlrpc_nrs_resource *res)
+static void nrs_resource_put(struct ptlrpc_nrs_resource *res)
{
struct ptlrpc_nrs_policy *policy = res->res_policy;
- if (policy->pol_ops->op_res_put != NULL) {
+ if (policy->pol_desc->pd_ops->op_res_put != NULL) {
struct ptlrpc_nrs_resource *parent;
for (; res != NULL; res = parent) {
parent = res->res_parent;
- policy->pol_ops->op_res_put(policy, res);
+ policy->pol_desc->pd_ops->op_res_put(policy, res);
}
}
}
* Obtains references for each resource in the resource hierarchy for request
* \a nrq if it is to be handled by \a policy.
*
- * \param[in] policy The policy
- * \param[in] nrq The request
- * \param[in] moving_req Denotes whether this is a call to the function by
+ * \param[in] policy the policy
+ * \param[in] nrq the request
+ * \param[in] moving_req denotes whether this is a call to the function by
* ldlm_lock_reorder_req(), in order to move \a nrq to
* the high-priority NRS head; we should not sleep when
* set.
*
- * \retval NULL Resource hierarchy references not obtained
- * \retval valid-pointer The bottom level of the resource hierarchy
+ * \retval NULL resource hierarchy references not obtained
+ * \retval valid-pointer the bottom level of the resource hierarchy
*
* \see ptlrpc_nrs_pol_ops::op_res_get()
*/
-static struct ptlrpc_nrs_resource *
-nrs_resource_get(struct ptlrpc_nrs_policy *policy,
- struct ptlrpc_nrs_request *nrq, bool moving_req)
+static
+struct ptlrpc_nrs_resource * nrs_resource_get(struct ptlrpc_nrs_policy *policy,
+ struct ptlrpc_nrs_request *nrq,
+ bool moving_req)
{
/**
* Set to NULL to traverse the resource hierarchy from the top.
int rc;
while (1) {
- rc = policy->pol_ops->op_res_get(policy, nrq, res, &tmp,
- moving_req);
+ rc = policy->pol_desc->pd_ops->op_res_get(policy, nrq, res,
+ &tmp, moving_req);
if (rc < 0) {
if (res != NULL)
nrs_resource_put(res);
* the fallback and current primary policy (if any), that will later be used
* to handle request \a nrq.
*
- * \param[in] nrs The NRS head instance that will be handling request \a nrq.
- * \param[in] nrq The request that is being handled.
- * \param[out] resp The array where references to the resource hierarchy are
+ * \param[in] nrs the NRS head instance that will be handling request \a nrq.
+ * \param[in] nrq the request that is being handled.
+ * \param[out] resp the array where references to the resource hierarchy are
* stored.
- * \param[in] moving_req Is set when obtaining resources while moving a
+ * \param[in] moving_req is set when obtaining resources while moving a
* request from a policy on the regular NRS head to a
* policy on the HP NRS head (via
* ldlm_lock_reorder_req()). It signifies that
* a full explanation, see comment in
* ptlrpc_nrs_pol_ops::op_res_get().
*/
-static void
-nrs_resource_get_safe(struct ptlrpc_nrs *nrs, struct ptlrpc_nrs_request *nrq,
- struct ptlrpc_nrs_resource **resp, bool moving_req)
+static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs,
+ struct ptlrpc_nrs_request *nrq,
+ struct ptlrpc_nrs_resource **resp,
+ bool moving_req)
{
struct ptlrpc_nrs_policy *primary = NULL;
struct ptlrpc_nrs_policy *fallback = NULL;
/**
* Releases references to resource hierarchies and policies, because they are no
- * longer required; used when request handling has been completed, ot the
+ * longer required; used when request handling has been completed, or the
* request is moving to the high priority NRS head.
*
- * \param resp The resource hierarchy that is being released
+ * \param resp the resource hierarchy that is being released
*
* \see ptlrpcnrs_req_hp_move()
* \see ptlrpc_nrs_req_finalize()
*/
-static void
-nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
+static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
{
struct ptlrpc_nrs_policy *pols[NRS_RES_MAX];
struct ptlrpc_nrs *nrs = NULL;
}
/**
- * Obtains an NRS request from \a policy for handling via polling.
+ * Obtains an NRS request from \a policy for handling or examination; the
+ * request should be removed in the 'handling' case.
+ *
+ * Calling into this function implies we already know the policy has a request
+ * waiting to be handled.
*
- * \param[in] policy The policy being polled
- * \param[in,out] arg Reserved parameter
+ * \param[in] policy the policy from which a request
+ * \param[in] peek when set, signifies that we just want to examine the
+ * request, and not handle it, so the request is not removed
+ * from the policy.
+ * \param[in] force when set, it will force a policy to return a request if it
+ * has one pending
+ *
+ * \retval the NRS request to be handled
*/
-static struct ptlrpc_nrs_request *
-nrs_request_poll(struct ptlrpc_nrs_policy *policy)
+static inline
+struct ptlrpc_nrs_request * nrs_request_get(struct ptlrpc_nrs_policy *policy,
+ bool peek, bool force)
{
struct ptlrpc_nrs_request *nrq;
LASSERT(policy->pol_req_queued > 0);
- nrq = policy->pol_ops->op_req_poll(policy);
+ nrq = policy->pol_desc->pd_ops->op_req_get(policy, peek, force);
- LASSERT(nrq != NULL);
- LASSERT(nrs_request_policy(nrq) == policy);
+ LASSERT(ergo(nrq != NULL, nrs_request_policy(nrq) == policy));
return nrq;
}
* function attempts to enqueue the request first on the primary policy
* (if any), since this is the preferred choice.
*
- * \param nrq The request being enqueued
+ * \param nrq the request being enqueued
*
* \see nrs_resource_get_safe()
*/
-static void
-nrs_request_enqueue(struct ptlrpc_nrs_request *nrq)
+static inline void nrs_request_enqueue(struct ptlrpc_nrs_request *nrq)
{
struct ptlrpc_nrs_policy *policy;
int rc;
nrq->nr_res_idx = i;
policy = nrq->nr_res_ptrs[i]->res_policy;
- rc = policy->pol_ops->op_req_enqueue(policy, nrq);
+ rc = policy->pol_desc->pd_ops->op_req_enqueue(policy, nrq);
if (rc == 0) {
policy->pol_nrs->nrs_req_queued++;
policy->pol_req_queued++;
}
/**
- * Dequeues request \a nrq from the policy which was used for handling it
- *
- * \param nrq The request being dequeued
- *
- * \see ptlrpc_nrs_req_del_nolock()
- */
-static void
-nrs_request_dequeue(struct ptlrpc_nrs_request *nrq)
-{
- struct ptlrpc_nrs_policy *policy;
-
- policy = nrs_request_policy(nrq);
-
- policy->pol_ops->op_req_dequeue(policy, nrq);
-
- LASSERT(policy->pol_nrs->nrs_req_queued > 0);
- LASSERT(policy->pol_req_queued > 0);
-
- policy->pol_nrs->nrs_req_queued--;
- policy->pol_req_queued--;
-}
-
-/**
- * Is called when the request starts being handled, after it has been enqueued,
- * polled and dequeued.
- *
- * \param[in] nrs The NRS request that is starting to be handled; can be used
- * for job/resource control.
- *
- * \see ptlrpc_nrs_req_start_nolock()
- */
-static void
-nrs_request_start(struct ptlrpc_nrs_request *nrq)
-{
- struct ptlrpc_nrs_policy *policy = nrs_request_policy(nrq);
-
- policy->pol_req_started++;
- policy->pol_nrs->nrs_req_started++;
- if (policy->pol_ops->op_req_start)
- policy->pol_ops->op_req_start(policy, nrq);
-}
-
-/**
* Called when a request has been handled
*
- * \param[in] nrs The request that has been handled; can be used for
+ * \param[in] nrs the request that has been handled; can be used for
* job/resource control.
*
* \see ptlrpc_nrs_req_stop_nolock()
*/
-static void
-nrs_request_stop(struct ptlrpc_nrs_request *nrq)
+static inline void nrs_request_stop(struct ptlrpc_nrs_request *nrq)
{
struct ptlrpc_nrs_policy *policy = nrs_request_policy(nrq);
- if (policy->pol_ops->op_req_stop)
- policy->pol_ops->op_req_stop(policy, nrq);
+ if (policy->pol_desc->pd_ops->op_req_stop)
+ policy->pol_desc->pd_ops->op_req_stop(policy, nrq);
LASSERT(policy->pol_nrs->nrs_req_started > 0);
LASSERT(policy->pol_req_started > 0);
* Handles opcodes that are common to all policy types within NRS core, and
* passes any unknown opcodes to the policy-specific control function.
*
- * \param[in] nrs The NRS head this policy belongs to.
- * \param[in] name The human-readable policy name; should be the same as
+ * \param[in] nrs the NRS head this policy belongs to.
+ * \param[in] name the human-readable policy name; should be the same as
* ptlrpc_nrs_pol_desc::pd_name.
- * \param[in] opc The opcode of the operation being carried out.
- * \param[in,out] arg Can be used to pass information in and out between when
+ * \param[in] opc the opcode of the operation being carried out.
+ * \param[in,out] arg can be used to pass information in and out between when
* carrying an operation; usually data that is private to
* the policy at some level, or generic policy status
* information.
* \retval -ve error condition
* \retval 0 operation was carried out successfully
*/
-static int
-nrs_policy_ctl(struct ptlrpc_nrs *nrs, char *name, enum ptlrpc_nrs_ctl opc,
- void *arg)
+static int nrs_policy_ctl(struct ptlrpc_nrs *nrs, char *name,
+ enum ptlrpc_nrs_ctl opc, void *arg)
{
struct ptlrpc_nrs_policy *policy;
int rc = 0;
+ ENTRY;
spin_lock(&nrs->nrs_lock);
case PTLRPC_NRS_CTL_START:
rc = nrs_policy_start_locked(policy);
break;
-
- /**
- * TODO: This may need to be augmented for resource deallocation
- * used by the policies.
- */
- case PTLRPC_NRS_CTL_SHRINK:
- rc = -ENOSYS;
- break;
}
out:
if (policy != NULL)
spin_unlock(&nrs->nrs_lock);
- return rc;
+ RETURN(rc);
}
/**
* Unregisters a policy by name.
*
- * \param[in] nrs The NRS head this policy belongs to.
- * \param[in] name The human-readable policy name; should be the same as
+ * \param[in] nrs the NRS head this policy belongs to.
+ * \param[in] name the human-readable policy name; should be the same as
* ptlrpc_nrs_pol_desc::pd_name
*
* \retval -ve error
* \retval 0 success
*/
-static int
-nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name)
+static int nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name)
{
struct ptlrpc_nrs_policy *policy = NULL;
ENTRY;
/**
* Register a policy from \policy descriptor \a desc with NRS head \a nrs.
*
- * \param[in] nrs The NRS head on which the policy will be registered.
- * \param[in] desc The policy descriptor from which the information will be
+ * \param[in] nrs the NRS head on which the policy will be registered.
+ * \param[in] desc the policy descriptor from which the information will be
* obtained to register the policy.
*
* \retval -ve error
* \retval 0 success
*/
-static int
-nrs_policy_register(struct ptlrpc_nrs *nrs,
- struct ptlrpc_nrs_pol_desc *desc)
+static int nrs_policy_register(struct ptlrpc_nrs *nrs,
+ struct ptlrpc_nrs_pol_desc *desc)
{
struct ptlrpc_nrs_policy *policy;
struct ptlrpc_nrs_policy *tmp;
LASSERT(svcpt != NULL);
LASSERT(desc->pd_ops != NULL);
LASSERT(desc->pd_ops->op_res_get != NULL);
- LASSERT(desc->pd_ops->op_req_poll != NULL);
+ LASSERT(desc->pd_ops->op_req_get != NULL);
LASSERT(desc->pd_ops->op_req_enqueue != NULL);
LASSERT(desc->pd_ops->op_req_dequeue != NULL);
LASSERT(desc->pd_compat != NULL);
RETURN(-ENOMEM);
policy->pol_nrs = nrs;
- policy->pol_name = desc->pd_name;
- policy->pol_ops = desc->pd_ops;
- policy->pol_state = desc->pd_flags & PTLRPC_NRS_FL_REG_EXTERN ?
- NRS_POL_STATE_UNAVAIL : NRS_POL_STATE_STOPPED;
- policy->pol_flags = desc->pd_flags & ~PTLRPC_NRS_FL_REG_EXTERN;
+ policy->pol_desc = desc;
+ policy->pol_state = NRS_POL_STATE_STOPPED;
+ policy->pol_flags = desc->pd_flags;
CFS_INIT_LIST_HEAD(&policy->pol_list);
CFS_INIT_LIST_HEAD(&policy->pol_list_queued);
spin_lock(&nrs->nrs_lock);
- tmp = nrs_policy_find_locked(nrs, policy->pol_name);
+ tmp = nrs_policy_find_locked(nrs, policy->pol_desc->pd_name);
if (tmp != NULL) {
CERROR("NRS policy %s has been registered, can't register it "
- "for %s\n",
- policy->pol_name, svcpt->scp_service->srv_name);
+ "for %s\n", policy->pol_desc->pd_name,
+ svcpt->scp_service->srv_name);
nrs_policy_put_locked(tmp);
spin_unlock(&nrs->nrs_lock);
spin_unlock(&nrs->nrs_lock);
if (rc != 0)
- (void) nrs_policy_unregister(nrs, policy->pol_name);
+ (void) nrs_policy_unregister(nrs, policy->pol_desc->pd_name);
RETURN(rc);
}
* Enqueue request \a req using one of the policies its resources are referring
* to.
*
- * \param[in] req The request to enqueue.
+ * \param[in] req the request to enqueue.
*/
-static void
-ptlrpc_nrs_req_add_nolock(struct ptlrpc_request *req)
+static void ptlrpc_nrs_req_add_nolock(struct ptlrpc_request *req)
{
struct ptlrpc_nrs_policy *policy;
* Add the policy to the NRS head's list of policies with enqueued
* requests, if it has not been added there.
*/
- if (cfs_list_empty(&policy->pol_list_queued))
+ if (unlikely(cfs_list_empty(&policy->pol_list_queued)))
cfs_list_add_tail(&policy->pol_list_queued,
&policy->pol_nrs->nrs_policy_queued);
}
/**
* Enqueue a request on the high priority NRS head.
*
- * \param req The request to enqueue.
+ * \param req the request to enqueue.
*/
-static void
-ptlrpc_nrs_hpreq_add_nolock(struct ptlrpc_request *req)
+static void ptlrpc_nrs_hpreq_add_nolock(struct ptlrpc_request *req)
{
int opc = lustre_msg_get_opc(req->rq_reqmsg);
ENTRY;
EXIT;
}
-/* ptlrpc/nrs_fifo.c */
-extern struct ptlrpc_nrs_pol_desc ptlrpc_nrs_fifo_desc;
-
-/**
- * Array of policies that ship alongside NRS core; i.e. ones that do not
- * register externally using ptlrpc_nrs_policy_register().
- */
-static struct ptlrpc_nrs_pol_desc *nrs_pols_builtin[] = {
- &ptlrpc_nrs_fifo_desc,
-};
-
/**
* Returns a boolean predicate indicating whether the policy described by
* \a desc is adequate for use with service \a svc.
*
- * \param[in] nrs The service
- * \param[in] desc The policy descriptor
+ * \param[in] svc the service
+ * \param[in] desc the policy descriptor
*
- * \retval false The policy is not compatible with the service partition
- * \retval true The policy is compatible with the service partition
+ * \retval false the policy is not compatible with the service
+ * \retval true the policy is compatible with the service
*/
-static inline bool
-nrs_policy_compatible(struct ptlrpc_service *svc,
- const struct ptlrpc_nrs_pol_desc *desc)
+static inline bool nrs_policy_compatible(const struct ptlrpc_service *svc,
+ const struct ptlrpc_nrs_pol_desc *desc)
{
return desc->pd_compat(svc, desc);
}
* Registers all compatible policies in nrs_core.nrs_policies, for NRS head
* \a nrs.
*
- * \param[in] nrs The NRS head
+ * \param[in] nrs the NRS head
*
* \retval -ve error
* \retval 0 success
*
* \see ptlrpc_service_nrs_setup()
*/
-static int
-nrs_register_policies_locked(struct ptlrpc_nrs *nrs)
+static int nrs_register_policies_locked(struct ptlrpc_nrs *nrs)
{
struct ptlrpc_nrs_pol_desc *desc;
- /* For convenience */
+ /* for convenience */
struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
struct ptlrpc_service *svc = svcpt->scp_service;
int rc = -EINVAL;
* Initializes NRS head \a nrs of service partition \a svcpt, and registers all
* compatible policies in NRS core, with the NRS head.
*
- * \param[in] nrs The NRS head
- * \param[in] svcpt The PTLRPC service partition to setup
+ * \param[in] nrs the NRS head
+ * \param[in] svcpt the PTLRPC service partition to setup
+ *
+ * \retval -ve error
+ * \retval 0 success
*
* \pre mutex_is_locked(&nrs_core.nrs_mutex)
*/
-static int
-nrs_svcpt_setup_locked0(struct ptlrpc_nrs *nrs,
- struct ptlrpc_service_part *svcpt)
+static int nrs_svcpt_setup_locked0(struct ptlrpc_nrs *nrs,
+ struct ptlrpc_service_part *svcpt)
{
int rc;
enum ptlrpc_nrs_queue_type queue;
nrs->nrs_svcpt = svcpt;
nrs->nrs_queue_type = queue;
spin_lock_init(&nrs->nrs_lock);
- CFS_INIT_LIST_HEAD(&nrs->nrs_heads);
CFS_INIT_LIST_HEAD(&nrs->nrs_policy_list);
CFS_INIT_LIST_HEAD(&nrs->nrs_policy_queued);
- cfs_list_add_tail(&nrs->nrs_heads, &nrs_core.nrs_heads);
-
rc = nrs_register_policies_locked(nrs);
RETURN(rc);
* handles high-priority RPCs), and then registers all available compatible
* policies on those NRS heads.
*
- * \param[n] svcpt The PTLRPC service partition to setup
+ * \param[in,out] svcpt the PTLRPC service partition to setup
*
* \pre mutex_is_locked(&nrs_core.nrs_mutex)
*/
-static int
-nrs_svcpt_setup_locked(struct ptlrpc_service_part *svcpt)
+static int nrs_svcpt_setup_locked(struct ptlrpc_service_part *svcpt)
{
struct ptlrpc_nrs *nrs;
int rc;
*/
nrs = nrs_svcpt2nrs(svcpt, false);
rc = nrs_svcpt_setup_locked0(nrs, svcpt);
- if (rc)
+ if (rc < 0)
GOTO(out, rc);
/**
* Unregisters all policies on all available NRS heads in a service partition;
* called at PTLRPC service unregistration time.
*
- * \param[in] svcpt The PTLRPC service partition
+ * \param[in] svcpt the PTLRPC service partition
*
* \pre mutex_is_locked(&nrs_core.nrs_mutex)
*/
-static void
-nrs_svcpt_cleanup_locked(struct ptlrpc_service_part *svcpt)
+static void nrs_svcpt_cleanup_locked(struct ptlrpc_service_part *svcpt)
{
struct ptlrpc_nrs *nrs;
struct ptlrpc_nrs_policy *policy;
cfs_list_for_each_entry_safe(policy, tmp, &nrs->nrs_policy_list,
pol_list) {
- rc = nrs_policy_unregister(nrs, policy->pol_name);
+ rc = nrs_policy_unregister(nrs, policy->pol_desc->pd_name);
LASSERT(rc == 0);
}
- cfs_list_del(&nrs->nrs_heads);
-
/**
* If the service partition has an HP NRS head, clean that up as well.
*/
}
/**
- * Checks whether the policy in \a desc has been added to NRS core's list of
- * policies, \e nrs_core.nrs_policies.
+ * Returns the descriptor for a policy as identified by by \a name.
*
- * \param[in] desc The policy descriptor
+ * \param[in] name the policy name
*
- * \retval true The policy is present
- * \retval false The policy is not present
+ * \retval the policy descriptor
+ * \retval NULL
*/
-static bool
-nrs_policy_exists_locked(const struct ptlrpc_nrs_pol_desc *desc)
+static struct ptlrpc_nrs_pol_desc *nrs_policy_find_desc_locked(const char *name)
{
struct ptlrpc_nrs_pol_desc *tmp;
ENTRY;
cfs_list_for_each_entry(tmp, &nrs_core.nrs_policies, pd_list) {
- if (strncmp(tmp->pd_name, desc->pd_name, NRS_POL_NAME_MAX) == 0)
- RETURN(true);
+ if (strncmp(tmp->pd_name, name, NRS_POL_NAME_MAX) == 0)
+ RETURN(tmp);
}
- RETURN(false);
+ RETURN(NULL);
}
/**
- * Removes the policy from all supported NRS heads.
+ * Removes the policy from all supported NRS heads of all partitions of all
+ * PTLRPC services.
*
- * \param[in] desc The policy descriptor to unregister
+ * \param[in] desc the policy descriptor to unregister
*
* \retval -ve error
* \retval 0 successfully unregistered policy on all supported NRS heads
*
* \pre mutex_is_locked(&nrs_core.nrs_mutex)
+ * \pre mutex_is_locked(&ptlrpc_all_services_mutex)
*/
-static int
-nrs_policy_unregister_locked(struct ptlrpc_nrs_pol_desc *desc)
+static int nrs_policy_unregister_locked(struct ptlrpc_nrs_pol_desc *desc)
{
- struct ptlrpc_nrs *nrs;
- int rc = 0;
+ struct ptlrpc_nrs *nrs;
+ struct ptlrpc_service *svc;
+ struct ptlrpc_service_part *svcpt;
+ int i;
+ int rc = 0;
ENTRY;
LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
+ LASSERT(mutex_is_locked(&ptlrpc_all_services_mutex));
- cfs_list_for_each_entry(nrs, &nrs_core.nrs_heads, nrs_heads) {
- if (!nrs_policy_compatible(nrs->nrs_svcpt->scp_service, desc)) {
- /**
- * The policy may only have registered on compatible
- * NRS heads.
- */
- continue;
- }
+ cfs_list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
- rc = nrs_policy_unregister(nrs, desc->pd_name);
+ if (!nrs_policy_compatible(svc, desc) ||
+ unlikely(svc->srv_is_stopping))
+ continue;
- /**
- * Ignore -ENOENT as the policy may not have registered
- * successfully on all service partitions.
- */
- if (rc == -ENOENT) {
- rc = 0;
- } else if (rc != 0) {
- CERROR("Failed to unregister NRS policy %s for "
- "partition %d of service %s: %d\n",
- desc->pd_name, nrs->nrs_svcpt->scp_cpt,
- nrs->nrs_svcpt->scp_service->srv_name, rc);
- break;
- }
- }
- RETURN(rc);
-}
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ bool hp = false;
-/**
- * Transitions a policy from ptlrpc_nrs_pol_state::NRS_POL_STATE_UNAVAIL to
- * ptlrpc_nrs_pol_state::STOPPED; is used to prevent policies that are
- * registering externally using ptlrpc_nrs_policy_register from starting
- * before they have successfully registered on all compatible service
- * partitions.
- *
- * \param[in] nrs The NRS head that the policy belongs to
- * \param[in] name The human-readable policy name
- */
-static void
-nrs_pol_make_available0(struct ptlrpc_nrs *nrs, char *name)
-{
- struct ptlrpc_nrs_policy *pol;
+again:
+ nrs = nrs_svcpt2nrs(svcpt, hp);
+ rc = nrs_policy_unregister(nrs, desc->pd_name);
+ /**
+ * Ignore -ENOENT as the policy may not have registered
+ * successfully on all service partitions.
+ */
+ if (rc == -ENOENT) {
+ rc = 0;
+ } else if (rc != 0) {
+ CERROR("Failed to unregister NRS policy %s for "
+ "partition %d of service %s: %d\n",
+ desc->pd_name, svcpt->scp_cpt,
+ svcpt->scp_service->srv_name, rc);
+ RETURN(rc);
+ }
- LASSERT(nrs);
- LASSERT(name);
+ if (!hp && nrs_svc_has_hp(svc)) {
+ hp = true;
+ goto again;
+ }
+ }
- spin_lock(&nrs->nrs_lock);
- pol = nrs_policy_find_locked(nrs, name);
- if (pol) {
- LASSERT(pol->pol_state == NRS_POL_STATE_UNAVAIL);
- pol->pol_state = NRS_POL_STATE_STOPPED;
- nrs_policy_put_locked(pol);
+ if (desc->pd_ops->op_lprocfs_fini != NULL)
+ desc->pd_ops->op_lprocfs_fini(svc);
}
- spin_unlock(&nrs->nrs_lock);
-}
-
-/**
- * Make the policy available on all compatible service partitions of all PTLRPC
- * services.
- *
- * \param[in] desc The descriptor for the policy that is to be made available
- *
- * \pre mutex_is_locked(&nrs_core.nrs_mutex)
- *
- * \see nrs_pol_make_available0()
- */
-static void
-nrs_pol_make_available_locked(struct ptlrpc_nrs_pol_desc *desc)
-{
- struct ptlrpc_nrs *nrs;
- ENTRY;
-
- LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
- /**
- * Cycle through all registered instances of the policy and place them
- * at the STOPPED state.
- */
- cfs_list_for_each_entry(nrs, &nrs_core.nrs_heads, nrs_heads) {
- if (!nrs_policy_compatible(nrs->nrs_svcpt->scp_service, desc))
- continue;
- nrs_pol_make_available0(nrs, desc->pd_name);
- }
- EXIT;
+ RETURN(rc);
}
/**
* Registers a new policy with NRS core.
*
- * Used for policies that register externally with NRS core, i.e. ones that are
- * not part of \e nrs_pols_builtin[]. The function will only succeed if policy
- * registration with all compatible service partitions is successful.
+ * The function will only succeed if policy registration with all compatible
+ * service partitions (if any) is successful.
+ *
+ * N.B. This function should be called either at ptlrpc module initialization
+ * time when registering a policy that ships with NRS core, or in a
+ * module's init() function for policies registering from other modules.
*
- * \param[in] desc The policy descriptor to register
+ * \param[in] conf configuration information for the new policy to register
*
* \retval -ve error
* \retval 0 success
*/
-int
-ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_desc *desc)
+int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf)
{
- struct ptlrpc_nrs *nrs;
struct ptlrpc_service *svc;
- struct ptlrpc_service_part *svcpt;
- int i;
- int rc;
- int rc2;
+ struct ptlrpc_nrs_pol_desc *desc;
+ int rc = 0;
ENTRY;
- LASSERT(desc != NULL);
+ LASSERT(conf != NULL);
+ LASSERT(conf->nc_ops != NULL);
+ LASSERT(conf->nc_compat != NULL);
+ LASSERT(ergo(conf->nc_compat == nrs_policy_compat_one,
+ conf->nc_compat_svc_name != NULL));
+ LASSERT(ergo((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) != 0,
+ conf->nc_owner != NULL));
- desc->pd_name[NRS_POL_NAME_MAX - 1] = '\0';
+ conf->nc_name[NRS_POL_NAME_MAX - 1] = '\0';
- if (desc->pd_flags & (PTLRPC_NRS_FL_FALLBACK |
- PTLRPC_NRS_FL_REG_START)) {
- CERROR("Failing to register NRS policy %s; re-check policy "
- "flags, externally-registered policies cannot act as "
- "fallback policies or be started immediately without "
- "interaction with lprocfs.\n", desc->pd_name);
+ /**
+ * External policies are not allowed to start immediately upon
+ * registration, as there is a relatively higher chance that their
+ * registration might fail. In such a case, some policy instances may
+ * already have requests queued wen unregistration needs to happen as
+ * part o cleanup; since there is currently no way to drain requests
+ * from a policy unless the service is unregistering, we just disallow
+ * this.
+ */
+ if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) &&
+ (conf->nc_flags & (PTLRPC_NRS_FL_FALLBACK |
+ PTLRPC_NRS_FL_REG_START))) {
+ CERROR("NRS: failing to register policy %s. Please check "
+ "policy flags; external policies cannot act as fallback "
+ "policies, or be started immediately upon registration "
+ "without interaction with lprocfs\n", conf->nc_name);
RETURN(-EINVAL);
}
- desc->pd_flags |= PTLRPC_NRS_FL_REG_EXTERN;
-
mutex_lock(&nrs_core.nrs_mutex);
- rc = nrs_policy_exists_locked(desc);
- if (rc) {
- CERROR("Failing to register NRS policy %s which has "
- "already been registered with NRS core!\n",
- desc->pd_name);
+ if (nrs_policy_find_desc_locked(conf->nc_name) != NULL) {
+ CERROR("NRS: failing to register policy %s which has already "
+ "been registered with NRS core!\n",
+ conf->nc_name);
GOTO(fail, rc = -EEXIST);
}
+ OBD_ALLOC_PTR(desc);
+ if (desc == NULL)
+ GOTO(fail, rc = -ENOMEM);
+
+ strncpy(desc->pd_name, conf->nc_name, NRS_POL_NAME_MAX);
+ desc->pd_ops = conf->nc_ops;
+ desc->pd_compat = conf->nc_compat;
+ desc->pd_compat_svc_name = conf->nc_compat_svc_name;
+ if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) != 0)
+ desc->pd_owner = conf->nc_owner;
+ desc->pd_flags = conf->nc_flags;
+ cfs_atomic_set(&desc->pd_refs, 0);
+
+ /**
+ * For policies that are held in the same module as NRS (currently
+ * ptlrpc), do not register the policy with all compatible services,
+ * as the services will not have started at this point, since we are
+ * calling from ptlrpc module initialization code. In such cases each
+ * service will register all compatible policies later, via
+ * ptlrpc_service_nrs_setup().
+ */
+ if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) == 0)
+ goto internal;
+
/**
* Register the new policy on all compatible services
*/
mutex_lock(&ptlrpc_all_services_mutex);
cfs_list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
+ struct ptlrpc_service_part *svcpt;
+ int i;
+ int rc2;
- if (unlikely(svc->srv_is_stopping)) {
- mutex_unlock(&ptlrpc_all_services_mutex);
- GOTO(fail, rc = -ESRCH);
- }
-
- if (!nrs_policy_compatible(svc, desc)) {
- /**
- * Attempt to register the policy if it is
- * compatible, otherwise try the next service.
- */
+ if (!nrs_policy_compatible(svc, desc) ||
+ unlikely(svc->srv_is_stopping))
continue;
- }
- ptlrpc_service_for_each_part(svcpt, i, svc) {
- bool hp = false;
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ struct ptlrpc_nrs *nrs;
+ bool hp = false;
again:
nrs = nrs_svcpt2nrs(svcpt, hp);
rc = nrs_policy_register(nrs, desc);
if (rc != 0) {
CERROR("Failed to register NRS policy %s for "
"partition %d of service %s: %d\n",
- desc->pd_name, nrs->nrs_svcpt->scp_cpt,
- nrs->nrs_svcpt->scp_service->srv_name,
- rc);
+ desc->pd_name, svcpt->scp_cpt,
+ svcpt->scp_service->srv_name, rc);
rc2 = nrs_policy_unregister_locked(desc);
/**
*/
LASSERT(rc2 == 0);
mutex_unlock(&ptlrpc_all_services_mutex);
+ OBD_FREE_PTR(desc);
GOTO(fail, rc);
}
goto again;
}
}
+
+ /**
+ * No need to take a reference to other modules here, as we
+ * will be calling from the module's init() function.
+ */
if (desc->pd_ops->op_lprocfs_init != NULL) {
rc = desc->pd_ops->op_lprocfs_init(svc);
if (rc != 0) {
*/
LASSERT(rc2 == 0);
mutex_unlock(&ptlrpc_all_services_mutex);
+ OBD_FREE_PTR(desc);
GOTO(fail, rc);
}
}
}
mutex_unlock(&ptlrpc_all_services_mutex);
-
- /**
- * The policy has successfully registered with all service partitions,
- * so mark the policy instances at the NRS heads as available.
- */
- nrs_pol_make_available_locked(desc);
-
+internal:
cfs_list_add_tail(&desc->pd_list, &nrs_core.nrs_policies);
fail:
mutex_unlock(&nrs_core.nrs_mutex);
* Unregisters a previously registered policy with NRS core. All instances of
* the policy on all NRS heads of all supported services are removed.
*
- * \param[in] desc The descriptor of the policy to unregister
+ * N.B. This function should only be called from a module's exit() function.
+ * Although it can be used for policies that ship alongside NRS core, the
+ * function is primarily intended for policies that register externally,
+ * from other modules.
+ *
+ * \param[in] conf configuration information for the policy to unregister
*
* \retval -ve error
* \retval 0 success
*/
-int
-ptlrpc_nrs_policy_unregister(struct ptlrpc_nrs_pol_desc *desc)
+int ptlrpc_nrs_policy_unregister(struct ptlrpc_nrs_pol_conf *conf)
{
- int rc;
- struct ptlrpc_service *svc;
+ struct ptlrpc_nrs_pol_desc *desc;
+ int rc;
ENTRY;
- LASSERT(desc != NULL);
+ LASSERT(conf != NULL);
- if (desc->pd_flags & PTLRPC_NRS_FL_FALLBACK) {
+ if (conf->nc_flags & PTLRPC_NRS_FL_FALLBACK) {
CERROR("Unable to unregister a fallback policy, unless the "
"PTLRPC service is stopping.\n");
RETURN(-EPERM);
}
- desc->pd_name[NRS_POL_NAME_MAX - 1] = '\0';
+ conf->nc_name[NRS_POL_NAME_MAX - 1] = '\0';
mutex_lock(&nrs_core.nrs_mutex);
- rc = nrs_policy_exists_locked(desc);
- if (!rc) {
+ desc = nrs_policy_find_desc_locked(conf->nc_name);
+ if (desc == NULL) {
CERROR("Failing to unregister NRS policy %s which has "
"not been registered with NRS core!\n",
- desc->pd_name);
- GOTO(fail, rc = -ENOENT);
+ conf->nc_name);
+ GOTO(not_exist, rc = -ENOENT);
}
+ mutex_lock(&ptlrpc_all_services_mutex);
+
rc = nrs_policy_unregister_locked(desc);
- if (rc == -EBUSY) {
- CERROR("Please first stop policy %s on all service partitions "
- "and then retry to unregister the policy.\n",
- desc->pd_name);
+ if (rc < 0) {
+ if (rc == -EBUSY)
+ CERROR("Please first stop policy %s on all service "
+ "partitions and then retry to unregister the "
+ "policy.\n", conf->nc_name);
GOTO(fail, rc);
}
+
CDEBUG(D_INFO, "Unregistering policy %s from NRS core.\n",
- desc->pd_name);
+ conf->nc_name);
cfs_list_del(&desc->pd_list);
+ OBD_FREE_PTR(desc);
- /**
- * Unregister the policy's lprocfs interface from all compatible
- * services.
- */
- mutex_lock(&ptlrpc_all_services_mutex);
-
- cfs_list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
- if (!nrs_policy_compatible(svc, desc))
- continue;
-
- if (desc->pd_ops->op_lprocfs_fini != NULL)
- desc->pd_ops->op_lprocfs_fini(svc);
- }
-
+fail:
mutex_unlock(&ptlrpc_all_services_mutex);
-fail:
+not_exist:
mutex_unlock(&nrs_core.nrs_mutex);
RETURN(rc);
* Setup NRS heads on all service partitions of service \a svc, and register
* all compatible policies on those NRS heads.
*
- * \param[in] svc The service to setup
+ * To be called from withing ptl
+ * \param[in] svc the service to setup
*
* \retval -ve error, the calling logic should eventually call
* ptlrpc_service_nrs_cleanup() to undo any work performed
* \see ptlrpc_register_service()
* \see ptlrpc_service_nrs_cleanup()
*/
-int
-ptlrpc_service_nrs_setup(struct ptlrpc_service *svc)
+int ptlrpc_service_nrs_setup(struct ptlrpc_service *svc)
{
struct ptlrpc_service_part *svcpt;
const struct ptlrpc_nrs_pol_desc *desc;
GOTO(failed, rc);
}
- /*
+ /**
* Set up lprocfs interfaces for all supported policies for the
* service.
*/
/**
* Unregisters all policies on all service partitions of service \a svc.
*
- * \param[in] svc The PTLRPC service to unregister
+ * \param[in] svc the PTLRPC service to unregister
*/
-void
-ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc)
+void ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc)
{
struct ptlrpc_service_part *svcpt;
const struct ptlrpc_nrs_pol_desc *desc;
* taken on the regular head can later be swapped for HP head resources by
* ldlm_lock_reorder_req().
*
- * \param[in] svcpt The service partition
- * \param[in] req The request
- * \param[in] hp Which NRS head of \a svcpt to use
+ * \param[in] svcpt the service partition
+ * \param[in] req the request
+ * \param[in] hp which NRS head of \a svcpt to use
*/
-void
-ptlrpc_nrs_req_initialize(struct ptlrpc_service_part *svcpt,
- struct ptlrpc_request *req, bool hp)
+void ptlrpc_nrs_req_initialize(struct ptlrpc_service_part *svcpt,
+ struct ptlrpc_request *req, bool hp)
{
struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
* Releases resources for a request; is called after the request has been
* handled.
*
- * \param[in] req The request
+ * \param[in] req the request
*
* \see ptlrpc_server_finish_request()
*/
-void
-ptlrpc_nrs_req_finalize(struct ptlrpc_request *req)
+void ptlrpc_nrs_req_finalize(struct ptlrpc_request *req)
{
if (req->rq_nrq.nr_initialized) {
nrs_resource_put_safe(req->rq_nrq.nr_res_ptrs);
}
}
-void
-ptlrpc_nrs_req_start_nolock(struct ptlrpc_request *req)
-{
- req->rq_nrq.nr_started = 1;
- nrs_request_start(&req->rq_nrq);
-}
-
-void
-ptlrpc_nrs_req_stop_nolock(struct ptlrpc_request *req)
+void ptlrpc_nrs_req_stop_nolock(struct ptlrpc_request *req)
{
if (req->rq_nrq.nr_started)
nrs_request_stop(&req->rq_nrq);
* Enqueues request \a req on either the regular or high-priority NRS head
* of service partition \a svcpt.
*
- * \param[in] svcpt The service partition
- * \param[in] req The request to be enqueued
- * \param[in] hp Whether to enqueue the request on the regular or
+ * \param[in] svcpt the service partition
+ * \param[in] req the request to be enqueued
+ * \param[in] hp whether to enqueue the request on the regular or
* high-priority NRS head.
*/
-void
-ptlrpc_nrs_req_add(struct ptlrpc_service_part *svcpt,
- struct ptlrpc_request *req, bool hp)
+void ptlrpc_nrs_req_add(struct ptlrpc_service_part *svcpt,
+ struct ptlrpc_request *req, bool hp)
{
spin_lock(&svcpt->scp_req_lock);
spin_unlock(&svcpt->scp_req_lock);
}
+static void nrs_request_removed(struct ptlrpc_nrs_policy *policy)
+{
+ LASSERT(policy->pol_nrs->nrs_req_queued > 0);
+ LASSERT(policy->pol_req_queued > 0);
+
+ policy->pol_nrs->nrs_req_queued--;
+ policy->pol_req_queued--;
+
+ /**
+ * If the policy has no more requests queued, remove it from
+ * ptlrpc_nrs::nrs_policy_queued.
+ */
+ if (unlikely(policy->pol_req_queued == 0)) {
+ cfs_list_del_init(&policy->pol_list_queued);
+
+ /**
+ * If there are other policies with queued requests, move the
+ * current policy to the end so that we can round robin over
+ * all policies and drain the requests.
+ */
+ } else if (policy->pol_req_queued != policy->pol_nrs->nrs_req_queued) {
+ LASSERT(policy->pol_req_queued <
+ policy->pol_nrs->nrs_req_queued);
+
+ cfs_list_move_tail(&policy->pol_list_queued,
+ &policy->pol_nrs->nrs_policy_queued);
+ }
+}
+
/**
* Obtains a request for handling from an NRS head of service partition
* \a svcpt.
*
- * \param[in] svcpt The service partition
- * \param[in] hp Whether to obtain a request from the regular or
+ * \param[in] svcpt the service partition
+ * \param[in] hp whether to obtain a request from the regular or
* high-priority NRS head.
- *
- * \retval the request to be handled
- * \retval NULL on failure
+ * \param[in] peek when set, signifies that we just want to examine the
+ * request, and not handle it, so the request is not removed
+ * from the policy.
+ * \param[in] force when set, it will force a policy to return a request if it
+ * has one pending
+ *
+ * \retval the request to be handled
+ * \retval NULL the head has no requests to serve
*/
struct ptlrpc_request *
-ptlrpc_nrs_req_poll_nolock(struct ptlrpc_service_part *svcpt, bool hp)
+ptlrpc_nrs_req_get_nolock0(struct ptlrpc_service_part *svcpt, bool hp,
+ bool peek, bool force)
{
struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
struct ptlrpc_nrs_policy *policy;
struct ptlrpc_nrs_request *nrq;
- if (unlikely(nrs->nrs_req_queued == 0))
- return NULL;
-
/**
* Always try to drain requests from all NRS polices even if they are
* inactive, because the user can change policy status at runtime.
*/
- cfs_list_for_each_entry(policy, &(nrs)->nrs_policy_queued,
+ cfs_list_for_each_entry(policy, &nrs->nrs_policy_queued,
pol_list_queued) {
- nrq = nrs_request_poll(policy);
- if (likely(nrq != NULL))
+ nrq = nrs_request_get(policy, peek, force);
+ if (nrq != NULL) {
+ if (likely(!peek)) {
+ nrq->nr_started = 1;
+
+ policy->pol_req_started++;
+ policy->pol_nrs->nrs_req_started++;
+
+ nrs_request_removed(policy);
+ }
+
return container_of(nrq, struct ptlrpc_request, rq_nrq);
+ }
}
return NULL;
}
/**
- * Dequeues a request that was previously obtained via ptlrpc_nrs_req_poll() and
- * is about to be handled.
+ * Dequeues request \a req from the policy it has been enqueued on.
*
- * \param[in] req The request
+ * \param[in] req the request
*/
-void
-ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req)
+void ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req)
{
- struct ptlrpc_nrs_policy *policy;
+ struct ptlrpc_nrs_policy *policy = nrs_request_policy(&req->rq_nrq);
- LASSERT(req->rq_nrq.nr_enqueued);
- LASSERT(!req->rq_nrq.nr_dequeued);
-
- policy = nrs_request_policy(&req->rq_nrq);
- nrs_request_dequeue(&req->rq_nrq);
- req->rq_nrq.nr_dequeued = 1;
-
- /**
- * If the policy has no more requests queued, remove it from
- * ptlrpc_nrs::nrs_policy_queued.
- */
- if (policy->pol_req_queued == 0) {
- cfs_list_del_init(&policy->pol_list_queued);
+ policy->pol_desc->pd_ops->op_req_dequeue(policy, &req->rq_nrq);
- /**
- * If there are other policies with queued requests, move the
- * current policy to the end so that we can round robin over
- * all policies and drain the requests.
- */
- } else if (policy->pol_req_queued != policy->pol_nrs->nrs_req_queued) {
- LASSERT(policy->pol_req_queued <
- policy->pol_nrs->nrs_req_queued);
+ req->rq_nrq.nr_enqueued = 0;
- cfs_list_move_tail(&policy->pol_list_queued,
- &policy->pol_nrs->nrs_policy_queued);
- }
+ nrs_request_removed(policy);
}
/**
* be called while holding ptlrpc_service_part::scp_req_lock to get a reliable
* result.
*
- * \param[in] svcpt The service partition to enquire.
- * \param[in] hp Whether the regular or high-priority NRS head is to be
+ * \param[in] svcpt the service partition to enquire.
+ * \param[in] hp whether the regular or high-priority NRS head is to be
* enquired.
*
- * \retval false The indicated NRS head has no enqueued requests.
- * \retval true The indicated NRS head has some enqueued requests.
+ * \retval false the indicated NRS head has no enqueued requests.
+ * \retval true the indicated NRS head has some enqueued requests.
*/
-bool
-ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp)
+bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp)
{
struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
/**
* Moves request \a req from the regular to the high-priority NRS head.
*
- * \param[in] req The request to move
+ * \param[in] req the request to move
*/
-void
-ptlrpc_nrs_req_hp_move(struct ptlrpc_request *req)
+void ptlrpc_nrs_req_hp_move(struct ptlrpc_request *req)
{
struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
- struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, true);
struct ptlrpc_nrs_request *nrq = &req->rq_nrq;
struct ptlrpc_nrs_resource *res1[NRS_RES_MAX];
struct ptlrpc_nrs_resource *res2[NRS_RES_MAX];
/**
* Obtain the high-priority NRS head resources.
- * XXX: Maybe want to remove nrs_resource[get|put]_safe() dance
- * when request cannot actually move; move this further down?
*/
- nrs_resource_get_safe(nrs, nrq, res1, true);
+ nrs_resource_get_safe(nrs_svcpt2nrs(svcpt, true), nrq, res1, true);
spin_lock(&svcpt->scp_req_lock);
goto out;
ptlrpc_nrs_req_del_nolock(req);
- nrq->nr_enqueued = nrq->nr_dequeued = 0;
memcpy(res2, nrq->nr_res_ptrs, NRS_RES_MAX * sizeof(res2[0]));
memcpy(nrq->nr_res_ptrs, res1, NRS_RES_MAX * sizeof(res1[0]));
* human-readable \a name, on either all partitions, or only on the first
* partition of service \a svc.
*
- * \param[in] svc The service the policy belongs to.
- * \param[in] queue Whether to carry out the command on the policy which
+ * \param[in] svc the service the policy belongs to.
+ * \param[in] queue whether to carry out the command on the policy which
* belongs to the regular, high-priority, or both NRS
* heads of service partitions of \a svc.
- * \param[in] name The policy to act upon, by human-readable name
- * \param[in] opc The opcode of the operation to carry out
- * \param[in] single When set, the operation will only be carried out on the
+ * \param[in] name the policy to act upon, by human-readable name
+ * \param[in] opc the opcode of the operation to carry out
+ * \param[in] single when set, the operation will only be carried out on the
* NRS heads of the first service partition of \a svc.
* This is useful for some policies which e.g. share
* identical values on the same parameters of different
* print out the values from the first service partition.
* Storing these values centrally elsewhere then could be
* another solution for this.
- * \param[in,out] arg Can be used as a generic in/out buffer between control
+ * \param[in,out] arg can be used as a generic in/out buffer between control
* operations and the user environment.
*
*\retval -ve error condition
*\retval 0 operation was carried out successfully
*/
-int
-ptlrpc_nrs_policy_control(struct ptlrpc_service *svc,
- enum ptlrpc_nrs_queue_type queue, char *name,
- enum ptlrpc_nrs_ctl opc, bool single, void *arg)
+int ptlrpc_nrs_policy_control(const struct ptlrpc_service *svc,
+ enum ptlrpc_nrs_queue_type queue, char *name,
+ enum ptlrpc_nrs_ctl opc, bool single, void *arg)
{
struct ptlrpc_service_part *svcpt;
int i;
int rc = 0;
ENTRY;
- ptlrpc_service_for_each_part(svcpt, i, svc) {
- switch (queue) {
- default:
- return -EINVAL;
+ LASSERT(opc != PTLRPC_NRS_CTL_INVALID);
- case PTLRPC_NRS_QUEUE_BOTH:
- case PTLRPC_NRS_QUEUE_REG:
+ if ((queue & PTLRPC_NRS_QUEUE_BOTH) == 0)
+ return -EINVAL;
+
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
rc = nrs_policy_ctl(nrs_svcpt2nrs(svcpt, false), name,
opc, arg);
if (rc != 0 || (queue == PTLRPC_NRS_QUEUE_REG &&
single))
GOTO(out, rc);
+ }
- if (queue == PTLRPC_NRS_QUEUE_REG)
- break;
-
- /* fallthrough */
-
- case PTLRPC_NRS_QUEUE_HP:
+ if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
/**
* XXX: We could optionally check for
* nrs_svc_has_hp(svc) here, and return an error if it
opc, arg);
if (rc != 0 || single)
GOTO(out, rc);
-
- break;
}
}
out:
RETURN(rc);
}
+
+/* ptlrpc/nrs_fifo.c */
+extern struct ptlrpc_nrs_pol_conf nrs_conf_fifo;
+#if defined HAVE_SERVER_SUPPORT && defined(__KERNEL__)
+/* ptlrpc/nrs_crr.c */
+extern struct ptlrpc_nrs_pol_conf nrs_conf_crrn;
+/* ptlrpc/nrs_orr.c */
+extern struct ptlrpc_nrs_pol_conf nrs_conf_orr;
+extern struct ptlrpc_nrs_pol_conf nrs_conf_trr;
+#endif
+
/**
- * Adds all policies that ship with NRS, i.e. those in the \e nrs_pols_builtin
- * array, to NRS core's list of policies \e nrs_core.nrs_policies.
+ * Adds all policies that ship with the ptlrpc module, to NRS core's list of
+ * policies \e nrs_core.nrs_policies.
*
- * \retval 0 All policy descriptors in \e nrs_pols_builtin have been added
- * successfully to \e nrs_core.nrs_policies
+ * \retval 0 all policies have been registered successfully
+ * \retval -ve error
*/
-int
-ptlrpc_nrs_init(void)
+int ptlrpc_nrs_init(void)
{
- int rc = -EINVAL;
- int i;
+ int rc;
ENTRY;
- /**
- * Initialize the NRS core object.
- */
mutex_init(&nrs_core.nrs_mutex);
- CFS_INIT_LIST_HEAD(&nrs_core.nrs_heads);
CFS_INIT_LIST_HEAD(&nrs_core.nrs_policies);
- for (i = 0; i < ARRAY_SIZE(nrs_pols_builtin); i++) {
- /**
- * No need to take nrs_core.nrs_mutex as there is no contention at
- * this early stage.
- */
- rc = nrs_policy_exists_locked(nrs_pols_builtin[i]);
- /**
- * This should not fail for in-tree policies.
- */
- LASSERT(rc == false);
- cfs_list_add_tail(&nrs_pols_builtin[i]->pd_list,
- &nrs_core.nrs_policies);
- }
+ rc = ptlrpc_nrs_policy_register(&nrs_conf_fifo);
+ if (rc != 0)
+ GOTO(fail, rc);
+
+#if defined HAVE_SERVER_SUPPORT && defined(__KERNEL__)
+ rc = ptlrpc_nrs_policy_register(&nrs_conf_crrn);
+ if (rc != 0)
+ GOTO(fail, rc);
+
+ rc = ptlrpc_nrs_policy_register(&nrs_conf_orr);
+ if (rc != 0)
+ GOTO(fail, rc);
+
+ rc = ptlrpc_nrs_policy_register(&nrs_conf_trr);
+ if (rc != 0)
+ GOTO(fail, rc);
+#endif
+
+ RETURN(rc);
+fail:
+ /**
+ * Since no PTLRPC services have been started at this point, all we need
+ * to do for cleanup is to free the descriptors.
+ */
+ ptlrpc_nrs_fini();
RETURN(rc);
}
/**
- * Stub finalization function
+ * Removes all policy desciptors from nrs_core::nrs_policies, and frees the
+ * policy descriptors.
+ *
+ * Since all PTLRPC services are stopped at this point, there are no more
+ * instances of any policies, because each service will have stopped its policy
+ * instances in ptlrpc_service_nrs_cleanup(), so we just need to free the
+ * descriptors here.
*/
-void
-ptlrpc_nrs_fini(void)
+void ptlrpc_nrs_fini(void)
{
+ struct ptlrpc_nrs_pol_desc *desc;
+ struct ptlrpc_nrs_pol_desc *tmp;
+
+ cfs_list_for_each_entry_safe(desc, tmp, &nrs_core.nrs_policies,
+ pd_list) {
+ cfs_list_del_init(&desc->pd_list);
+ OBD_FREE_PTR(desc);
+ }
}
/** @} nrs */