Whamcloud - gitweb
LU-398 ptlrpc: NRS framework follow-up patch
authorNikitas Angelinas <nikitas_angelinas@xyratex.com>
Fri, 1 Feb 2013 10:58:58 +0000 (10:58 +0000)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 1 Apr 2013 15:59:36 +0000 (11:59 -0400)
This patch addresses some outstanding issues that had been raised
by reviewers of the "Add the NRS framework and FIFO policy" patch,
and include some other improvements, e.g. it reworks the API
slightly in order to optimize some frequently-used operations,
does not uanncessarily policies in liblustre and client-only
kernel builds, and makes sure we hold module references when
required, for policies registering from other modules.

Signed-off-by: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
Change-Id: I9306d43e2aef20aa64d6870a56ae99859ce40cd5
Oracle-bug-id: b=13634
Xyratex-bug-id: MRP-73
Reviewed-on: http://review.whamcloud.com/5274
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Liang Zhen <liang.zhen@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre_net.h
lustre/ldlm/ldlm_lockd.c
lustre/ptlrpc/lproc_ptlrpc.c
lustre/ptlrpc/nrs.c
lustre/ptlrpc/nrs_fifo.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/service.c

index 0c8a806..da2e1cc 100644 (file)
@@ -767,6 +767,10 @@ struct ptlrpc_nrs_request;
  */
 enum ptlrpc_nrs_ctl {
        /**
  */
 enum ptlrpc_nrs_ctl {
        /**
+        * Not a valid opcode.
+        */
+       PTLRPC_NRS_CTL_INVALID,
+       /**
         * Activate the policy.
         */
        PTLRPC_NRS_CTL_START,
         * Activate the policy.
         */
        PTLRPC_NRS_CTL_START,
@@ -776,14 +780,6 @@ enum ptlrpc_nrs_ctl {
         */
        PTLRPC_NRS_CTL_STOP,
        /**
         */
        PTLRPC_NRS_CTL_STOP,
        /**
-        * Recycle resources for inactive policies.
-        */
-       PTLRPC_NRS_CTL_SHRINK,
-       /**
-        * Not a valid opcode.
-        */
-       PTLRPC_NRS_CTL_INVALID,
-       /**
         * Policies can start using opcodes from this value and onwards for
         * their own purposes; the assigned value itself is arbitrary.
         */
         * Policies can start using opcodes from this value and onwards for
         * their own purposes; the assigned value itself is arbitrary.
         */
@@ -800,20 +796,20 @@ struct ptlrpc_nrs_pol_ops {
        /**
         * Called during policy registration; this operation is optional.
         *
        /**
         * Called during policy registration; this operation is optional.
         *
-        * \param[in] policy The policy being initialized
+        * \param[in,out] policy The policy being initialized
         */
        int     (*op_policy_init) (struct ptlrpc_nrs_policy *policy);
        /**
         * Called during policy unregistration; this operation is optional.
         *
         */
        int     (*op_policy_init) (struct ptlrpc_nrs_policy *policy);
        /**
         * Called during policy unregistration; this operation is optional.
         *
-        * \param[in] policy The policy being unregistered/finalized
+        * \param[in,out] policy The policy being unregistered/finalized
         */
        void    (*op_policy_fini) (struct ptlrpc_nrs_policy *policy);
        /**
         * Called when activating a policy via lprocfs; policies allocate and
         * initialize their resources here; this operation is optional.
         *
         */
        void    (*op_policy_fini) (struct ptlrpc_nrs_policy *policy);
        /**
         * Called when activating a policy via lprocfs; policies allocate and
         * initialize their resources here; this operation is optional.
         *
-        * \param[in] policy The policy being started
+        * \param[in,out] policy The policy being started
         *
         * \see nrs_policy_start_locked()
         */
         *
         * \see nrs_policy_start_locked()
         */
@@ -822,9 +818,9 @@ struct ptlrpc_nrs_pol_ops {
         * Called when deactivating a policy via lprocfs; policies deallocate
         * their resources here; this operation is optional
         *
         * Called when deactivating a policy via lprocfs; policies deallocate
         * their resources here; this operation is optional
         *
-        * \param[in] policy The policy being stopped
+        * \param[in,out] policy The policy being stopped
         *
         *
-        * \see nrs_policy_stop_final()
+        * \see nrs_policy_stop0()
         */
        void    (*op_policy_stop) (struct ptlrpc_nrs_policy *policy);
        /**
         */
        void    (*op_policy_stop) (struct ptlrpc_nrs_policy *policy);
        /**
@@ -832,7 +828,7 @@ struct ptlrpc_nrs_pol_ops {
         * \e PTLRPC_NRS_CTL_START and \e PTLRPC_NRS_CTL_GET_INFO; analogous
         * to an ioctl; this operation is optional.
         *
         * \e PTLRPC_NRS_CTL_START and \e PTLRPC_NRS_CTL_GET_INFO; analogous
         * to an ioctl; this operation is optional.
         *
-        * \param[in]     policy The policy carrying out operation \a opc
+        * \param[in,out]        policy The policy carrying out operation \a opc
         * \param[in]     opc    The command operation being carried out
         * \param[in,out] arg    An generic buffer for communication between the
         *                       user and the control operation
         * \param[in]     opc    The command operation being carried out
         * \param[in,out] arg    An generic buffer for communication between the
         *                       user and the control operation
@@ -851,11 +847,11 @@ struct ptlrpc_nrs_pol_ops {
         * service. Policies should return -ve for requests they do not wish
         * to handle. This operation is mandatory.
         *
         * service. Policies should return -ve for requests they do not wish
         * to handle. This operation is mandatory.
         *
-        * \param[in]  policy     The policy we're getting resources for.
-        * \param[in]  nrq        The request we are getting resources for.
-        * \param[in]  parent     The parent resource of the resource being
+        * \param[in,out] policy  The policy we're getting resources for.
+        * \param[in,out] nrq     The request we are getting resources for.
+        * \param[in]     parent  The parent resource of the resource being
         *                        requested; set to NULL if none.
         *                        requested; set to NULL if none.
-        * \param[out] resp       The resource is to be returned here; the
+        * \param[out]    resp    The resource is to be returned here; the
         *                        fallback policy in an NRS head should
         *                        \e always return a non-NULL pointer value.
         * \param[in]  moving_req When set, signifies that this is an attempt
         *                        fallback policy in an NRS head should
         *                        \e always return a non-NULL pointer value.
         * \param[in]  moving_req When set, signifies that this is an attempt
@@ -883,42 +879,48 @@ struct ptlrpc_nrs_pol_ops {
         */
        int     (*op_res_get) (struct ptlrpc_nrs_policy *policy,
                               struct ptlrpc_nrs_request *nrq,
         */
        int     (*op_res_get) (struct ptlrpc_nrs_policy *policy,
                               struct ptlrpc_nrs_request *nrq,
-                              struct ptlrpc_nrs_resource *parent,
+                              const struct ptlrpc_nrs_resource *parent,
                               struct ptlrpc_nrs_resource **resp,
                               bool moving_req);
        /**
         * Called when releasing references taken for resources in the resource
         * hierarchy for the request; this operation is optional.
         *
                               struct ptlrpc_nrs_resource **resp,
                               bool moving_req);
        /**
         * Called when releasing references taken for resources in the resource
         * hierarchy for the request; this operation is optional.
         *
-        * \param[in] policy   The policy the resource belongs to
-        * \param[in] res      The resource to be freed
+        * \param[in,out] policy The policy the resource belongs to
+        * \param[in] res        The resource to be freed
         *
         * \see ptlrpc_nrs_req_finalize()
         * \see ptlrpc_nrs_hpreq_add_nolock()
         * \see ptlrpc_nrs_req_hp_move()
         */
        void    (*op_res_put) (struct ptlrpc_nrs_policy *policy,
         *
         * \see ptlrpc_nrs_req_finalize()
         * \see ptlrpc_nrs_hpreq_add_nolock()
         * \see ptlrpc_nrs_req_hp_move()
         */
        void    (*op_res_put) (struct ptlrpc_nrs_policy *policy,
-                              struct ptlrpc_nrs_resource *res);
+                              const struct ptlrpc_nrs_resource *res);
 
        /**
 
        /**
-        * Obtain a request for handling from the policy via polling; this
-        * operation is mandatory.
+        * Obtains a request for handling from the policy, and optionally
+        * removes the request from the policy; this operation is mandatory.
         *
         *
-        * \param[in] policy The policy to poll
+        * \param[in,out] policy The policy to poll
+        * \param[in]     peek   When set, signifies that we just want to
+        *                       examine the request, and not handle it, so the
+        *                       request is not removed from the policy.
+        * \param[in]     force  When set, it will force a policy to return a
+        *                       request if it has one queued.
         *
         *
-        * \retval NULL No erquest available for handling
+        * \retval NULL No request available for handling
         * \retval valid-pointer The request polled for handling
         *
         * \retval valid-pointer The request polled for handling
         *
-        * \see ptlrpc_nrs_req_poll_nolock()
+        * \see ptlrpc_nrs_req_get_nolock()
         */
        struct ptlrpc_nrs_request *
         */
        struct ptlrpc_nrs_request *
-               (*op_req_poll) (struct ptlrpc_nrs_policy *policy);
+               (*op_req_get) (struct ptlrpc_nrs_policy *policy, bool peek,
+                              bool force);
        /**
         * Called when attempting to add a request to a policy for later
         * handling; this operation is mandatory.
         *
        /**
         * Called when attempting to add a request to a policy for later
         * handling; this operation is mandatory.
         *
-        * \param[in] policy The policy on which to enqueue \a nrq
-        * \param[in] nrq    The request to enqueue
+        * \param[in,out] policy  The policy on which to enqueue \a nrq
+        * \param[in,out] nrq The request to enqueue
         *
         * \retval 0    success
         * \retval != 0 error
         *
         * \retval 0    success
         * \retval != 0 error
@@ -932,34 +934,20 @@ struct ptlrpc_nrs_pol_ops {
         * called after a request has been polled successfully from the policy
         * for handling; this operation is mandatory.
         *
         * called after a request has been polled successfully from the policy
         * for handling; this operation is mandatory.
         *
-        * \param[in] policy The policy the request \a nrq belongs to
-        * \param[in] nrq    The request to dequeue
+        * \param[in,out] policy The policy the request \a nrq belongs to
+        * \param[in,out] nrq    The request to dequeue
         *
         * \see ptlrpc_nrs_req_del_nolock()
         */
        void    (*op_req_dequeue) (struct ptlrpc_nrs_policy *policy,
                                   struct ptlrpc_nrs_request *nrq);
        /**
         *
         * \see ptlrpc_nrs_req_del_nolock()
         */
        void    (*op_req_dequeue) (struct ptlrpc_nrs_policy *policy,
                                   struct ptlrpc_nrs_request *nrq);
        /**
-        * Called before carrying out the request; should not block. Could be
-        * used for job/resource control; this operation is optional.
-        *
-        * \param[in] policy The policy which is starting to handle request
-        *                   \a nrq
-        * \param[in] nrq    The request
-        *
-        * \pre spin_is_locked(&svcpt->scp_req_lock)
-        *
-        * \see ptlrpc_nrs_req_start_nolock()
-        */
-       void    (*op_req_start) (struct ptlrpc_nrs_policy *policy,
-                                struct ptlrpc_nrs_request *nrq);
-       /**
         * Called after the request being carried out. Could be used for
         * job/resource control; this operation is optional.
         *
         * Called after the request being carried out. Could be used for
         * job/resource control; this operation is optional.
         *
-        * \param[in] policy The policy which is stopping to handle request
-        *                   \a nrq
-        * \param[in] nrq    The request
+        * \param[in,out] policy The policy which is stopping to handle request
+        *                       \a nrq
+        * \param[in,out] nrq    The request
         *
         * \pre spin_is_locked(&svcpt->scp_req_lock)
         *
         *
         * \pre spin_is_locked(&svcpt->scp_req_lock)
         *
@@ -979,6 +967,12 @@ struct ptlrpc_nrs_pol_ops {
        /**
         * Unegisters the policy's lprocfs interface with a PTLRPC service.
         *
        /**
         * Unegisters the policy's lprocfs interface with a PTLRPC service.
         *
+        * In cases of failed policy registration in
+        * \e ptlrpc_nrs_policy_register(), this function may be called for a
+        * service which has not registered the policy successfully, so
+        * implementations of this method should make sure their operations are
+        * safe in such cases.
+        *
         * \param[in] svc The service
         */
        void    (*op_lprocfs_fini) (struct ptlrpc_service *svc);
         * \param[in] svc The service
         */
        void    (*op_lprocfs_fini) (struct ptlrpc_service *svc);
@@ -990,9 +984,8 @@ struct ptlrpc_nrs_pol_ops {
 enum nrs_policy_flags {
        /**
         * Fallback policy, use this flag only on a single supported policy per
 enum nrs_policy_flags {
        /**
         * Fallback policy, use this flag only on a single supported policy per
-        * service. Do not use this flag for policies registering using
-        * ptlrpc_nrs_policy_register() (i.e. ones that are not in
-        * \e nrs_pols_builtin).
+        * service. The flag cannot be used on policies that use
+        * \e PTLRPC_NRS_FL_REG_EXTERN
         */
        PTLRPC_NRS_FL_FALLBACK          = (1 << 0),
        /**
         */
        PTLRPC_NRS_FL_FALLBACK          = (1 << 0),
        /**
@@ -1000,10 +993,8 @@ enum nrs_policy_flags {
         */
        PTLRPC_NRS_FL_REG_START         = (1 << 1),
        /**
         */
        PTLRPC_NRS_FL_REG_START         = (1 << 1),
        /**
-        * This is a polciy registering externally with NRS core, via
-        * ptlrpc_nrs_policy_register(), (i.e. one that is not in
-        * \e nrs_pols_builtin. Used to avoid ptlrpc_nrs_policy_register()
-        * racing with a policy start operation issued by the user via lprocfs.
+        * This is a policy registering from a module different to the one NRS
+        * core ships in (currently ptlrpc).
         */
        PTLRPC_NRS_FL_REG_EXTERN        = (1 << 2),
 };
         */
        PTLRPC_NRS_FL_REG_EXTERN        = (1 << 2),
 };
@@ -1016,9 +1007,9 @@ enum nrs_policy_flags {
  * in a service.
  */
 enum ptlrpc_nrs_queue_type {
  * in a service.
  */
 enum ptlrpc_nrs_queue_type {
-       PTLRPC_NRS_QUEUE_REG,
-       PTLRPC_NRS_QUEUE_HP,
-       PTLRPC_NRS_QUEUE_BOTH,
+       PTLRPC_NRS_QUEUE_REG    = (1 << 0),
+       PTLRPC_NRS_QUEUE_HP     = (1 << 1),
+       PTLRPC_NRS_QUEUE_BOTH   = (PTLRPC_NRS_QUEUE_REG | PTLRPC_NRS_QUEUE_HP)
 };
 
 /**
 };
 
 /**
@@ -1051,10 +1042,6 @@ struct ptlrpc_nrs {
        spinlock_t                      nrs_lock;
        /** XXX Possibly replace svcpt->scp_req_lock with another lock here. */
        /**
        spinlock_t                      nrs_lock;
        /** XXX Possibly replace svcpt->scp_req_lock with another lock here. */
        /**
-        * Linkage into nrs_core_heads_list
-        */
-       cfs_list_t                      nrs_heads;
-       /**
         * List of registered policies
         */
        cfs_list_t                      nrs_policy_list;
         * List of registered policies
         */
        cfs_list_t                      nrs_policy_list;
@@ -1093,7 +1080,6 @@ struct ptlrpc_nrs {
        unsigned long                   nrs_req_started;
        /**
         * # policies on this NRS
        unsigned long                   nrs_req_started;
        /**
         * # policies on this NRS
-        * TODO: Can we avoid having this?
         */
        unsigned                        nrs_num_pols;
        /**
         */
        unsigned                        nrs_num_pols;
        /**
@@ -1109,6 +1095,52 @@ struct ptlrpc_nrs {
 
 #define NRS_POL_NAME_MAX               16
 
 
 #define NRS_POL_NAME_MAX               16
 
+struct ptlrpc_nrs_pol_desc;
+
+/**
+ * Service compatibility predicate; this determines whether a policy is adequate
+ * for handling RPCs of a particular PTLRPC service.
+ *
+ * XXX:This should give the same result during policy registration and
+ * unregistration, and for all partitions of a service; so the result should not
+ * depend on temporal service or other properties, that may influence the
+ * result.
+ */
+typedef bool (*nrs_pol_desc_compat_t) (const struct ptlrpc_service *svc,
+                                      const struct ptlrpc_nrs_pol_desc *desc);
+
+struct ptlrpc_nrs_pol_conf {
+       /**
+        * Human-readable policy name
+        */
+       char                               nc_name[NRS_POL_NAME_MAX];
+       /**
+        * NRS operations for this policy
+        */
+       const struct ptlrpc_nrs_pol_ops   *nc_ops;
+       /**
+        * Service compatibility predicate
+        */
+       nrs_pol_desc_compat_t              nc_compat;
+       /**
+        * Set for policies that support a single ptlrpc service, i.e. ones that
+        * have \a pd_compat set to nrs_policy_compat_one(). The variable value
+        * depicts the name of the single service that such policies are
+        * compatible with.
+        */
+       const char                        *nc_compat_svc_name;
+       /**
+        * Owner module for this policy descriptor; policies registering from a
+        * different module to the one the NRS framework is held within
+        * (currently ptlrpc), should set this field to THIS_MODULE.
+        */
+       cfs_module_t                      *nc_owner;
+       /**
+        * Policy registration flags; a bitmast of \e nrs_policy_flags
+        */
+       unsigned                           nc_flags;
+};
+
 /**
  * NRS policy registering descriptor
  *
 /**
  * NRS policy registering descriptor
  *
@@ -1119,35 +1151,73 @@ struct ptlrpc_nrs_pol_desc {
        /**
         * Human-readable policy name
         */
        /**
         * Human-readable policy name
         */
-       char                            pd_name[NRS_POL_NAME_MAX];
+       char                                    pd_name[NRS_POL_NAME_MAX];
        /**
        /**
-        * NRS operations for this policy
+        * Link into nrs_core::nrs_policies
         */
         */
-       struct ptlrpc_nrs_pol_ops      *pd_ops;
+       cfs_list_t                              pd_list;
        /**
        /**
-        * Service Compatibility function; this determines whether a policy is
-        * adequate for handling RPCs of a particular PTLRPC service.
-        *
-        * XXX:This should give the same result during policy
-        * registration and unregistration, and for all partitions of a
-        * service; so the result should not depend on temporal service
-        * or other properties, that may influence the result.
+        * NRS operations for this policy
         */
         */
-       bool    (*pd_compat) (struct ptlrpc_service *svc,
-                             const struct ptlrpc_nrs_pol_desc *desc);
+       const struct ptlrpc_nrs_pol_ops        *pd_ops;
        /**
        /**
-        * Optionally set for policies that support a single ptlrpc service,
-        * i.e. ones that have \a pd_compat set to nrs_policy_compat_one()
+        * Service compatibility predicate
         */
         */
-       char                           *pd_compat_svc_name;
+       nrs_pol_desc_compat_t                   pd_compat;
        /**
        /**
-        * Bitmask of nrs_policy_flags
+        * Set for policies that are compatible with only one PTLRPC service.
+        *
+        * \see ptlrpc_nrs_pol_conf::nc_compat_svc_name
         */
         */
-       unsigned                        pd_flags;
+       const char                             *pd_compat_svc_name;
        /**
        /**
-        * Link into nrs_core::nrs_policies
-        */
-       cfs_list_t                      pd_list;
+        * Owner module for this policy descriptor.
+        *
+        * We need to hold a reference to the module whenever we might make use
+        * of any of the module's contents, i.e.
+        * - If one or more instances of the policy are at a state where they
+        *   might be handling a request, i.e.
+        *   ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
+        *   ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING as we will have to
+        *   call into the policy's ptlrpc_nrs_pol_ops() handlers. A reference
+        *   is taken on the module when
+        *   \e ptlrpc_nrs_pol_desc::pd_refs becomes 1, and released when it
+        *   becomes 0, so that we hold only one reference to the module maximum
+        *   at any time.
+        *
+        *   We do not need to hold a reference to the module, even though we
+        *   might use code and data from the module, in the following cases:
+        * - During external policy registration, because this should happen in
+        *   the module's init() function, in which case the module is safe from
+        *   removal because a reference is being held on the module by the
+        *   kernel, and iirc kmod (and I guess module-init-tools also) will
+        *   serialize any racing processes properly anyway.
+        * - During external policy unregistration, because this should happen
+        *   in a module's exit() function, and any attempts to start a policy
+        *   instance would need to take a reference on the module, and this is
+        *   not possible once we have reached the point where the exit()
+        *   handler is called.
+        * - During service registration and unregistration, as service setup
+        *   and cleanup, and policy registration, unregistration and policy
+        *   instance starting, are serialized by \e nrs_core::nrs_mutex, so
+        *   as long as users adhere to the convention of registering policies
+        *   in init() and unregistering them in module exit() functions, there
+        *   should not be a race between these operations.
+        * - During any policy-specific lprocfs operations, because a reference
+        *   is held by the kernel on a proc entry that has been entered by a
+        *   syscall, so as long as proc entries are removed during unregistration time,
+        *   then unregistration and lprocfs operations will be properly
+        *   serialized.
+        */
+       cfs_module_t                           *pd_owner;
+       /**
+        * Bitmask of \e nrs_policy_flags
+        */
+       unsigned                                pd_flags;
+       /**
+        * # of references on this descriptor
+        */
+       cfs_atomic_t                            pd_refs;
 };
 
 /**
 };
 
 /**
@@ -1161,17 +1231,6 @@ enum ptlrpc_nrs_pol_state {
         */
        NRS_POL_STATE_INVALID,
        /**
         */
        NRS_POL_STATE_INVALID,
        /**
-        * For now, this state is used exclusively for policies that register
-        * externally to NRS core, i.e. ones that do so via
-        * ptlrpc_nrs_policy_register() and are not part of nrs_pols_builtin;
-        * it is used to prevent a race condition between the policy registering
-        * with more than one service partition while service is operational,
-        * and the user starting the policy via lprocfs.
-        *
-        * \see nrs_pol_make_avail()
-        */
-       NRS_POL_STATE_UNAVAIL,
-       /**
         * Policies are at this state either at the start of their life, or
         * transition here when the user selects a different policy to act
         * as the primary one.
         * Policies are at this state either at the start of their life, or
         * transition here when the user selects a different policy to act
         * as the primary one.
@@ -1263,17 +1322,13 @@ struct ptlrpc_nrs_policy {
         */
        struct ptlrpc_nrs              *pol_nrs;
        /**
         */
        struct ptlrpc_nrs              *pol_nrs;
        /**
-        * NRS operations for this policy; points to ptlrpc_nrs_pol_desc::pd_ops
-        */
-       struct ptlrpc_nrs_pol_ops      *pol_ops;
-       /**
         * Private policy data; varies by policy type
         */
        void                           *pol_private;
        /**
         * Private policy data; varies by policy type
         */
        void                           *pol_private;
        /**
-        * Human-readable policy name; point to ptlrpc_nrs_pol_desc::pd_name
+        * Policy descriptor for this policy instance.
         */
         */
-       char                           *pol_name;
+       struct ptlrpc_nrs_pol_desc     *pol_desc;
 };
 
 /**
 };
 
 /**
@@ -1351,7 +1406,6 @@ struct nrs_fifo_head {
 };
 
 struct nrs_fifo_req {
 };
 
 struct nrs_fifo_req {
-       /** request header, must be the first member of structure */
        cfs_list_t              fr_list;
        __u64                   fr_sequence;
 };
        cfs_list_t              fr_list;
        __u64                   fr_sequence;
 };
@@ -1380,7 +1434,6 @@ struct ptlrpc_nrs_request {
        unsigned                        nr_res_idx;
        unsigned                        nr_initialized:1;
        unsigned                        nr_enqueued:1;
        unsigned                        nr_res_idx;
        unsigned                        nr_initialized:1;
        unsigned                        nr_enqueued:1;
-       unsigned                        nr_dequeued:1;
        unsigned                        nr_started:1;
        unsigned                        nr_finalized:1;
        cfs_binheap_node_t              nr_node;
        unsigned                        nr_started:1;
        unsigned                        nr_finalized:1;
        cfs_binheap_node_t              nr_node;
@@ -1715,8 +1768,8 @@ static inline int ptlrpc_req_interpret(const struct lu_env *env,
 /** \addtogroup  nrs
  * @{
  */
 /** \addtogroup  nrs
  * @{
  */
-int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_desc *desc);
-int ptlrpc_nrs_policy_unregister(struct ptlrpc_nrs_pol_desc *desc);
+int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf);
+int ptlrpc_nrs_policy_unregister(struct ptlrpc_nrs_pol_conf *conf);
 void ptlrpc_nrs_req_hp_move(struct ptlrpc_request *req);
 void nrs_policy_get_info_locked(struct ptlrpc_nrs_policy *policy,
                                struct ptlrpc_nrs_pol_info *info);
 void ptlrpc_nrs_req_hp_move(struct ptlrpc_request *req);
 void nrs_policy_get_info_locked(struct ptlrpc_nrs_policy *policy,
                                struct ptlrpc_nrs_pol_info *info);
@@ -1727,8 +1780,7 @@ void nrs_policy_get_info_locked(struct ptlrpc_nrs_policy *policy,
  *
  * For a reliable result, this should be checked under svcpt->scp_req lock.
  */
  *
  * For a reliable result, this should be checked under svcpt->scp_req lock.
  */
-static inline bool
-ptlrpc_nrs_req_can_move(struct ptlrpc_request *req)
+static inline bool ptlrpc_nrs_req_can_move(struct ptlrpc_request *req)
 {
        struct ptlrpc_nrs_request *nrq = &req->rq_nrq;
 
 {
        struct ptlrpc_nrs_request *nrq = &req->rq_nrq;
 
@@ -2436,38 +2488,36 @@ enum ptlrpcd_ctl_flags {
  * \addtogroup nrs
  * @{
  *
  * \addtogroup nrs
  * @{
  *
- * Service compatibility function; policy is compatible with all services.
+ * Service compatibility function; the policy is compatible with all services.
  *
  * \param[in] svc  The service the policy is attempting to register with.
  * \param[in] desc The policy descriptor
  *
  *
  * \param[in] svc  The service the policy is attempting to register with.
  * \param[in] desc The policy descriptor
  *
- * \retval true The policy is compatible with the NRS head
+ * \retval true The policy is compatible with the service
  *
  * \see ptlrpc_nrs_pol_desc::pd_compat()
  */
  *
  * \see ptlrpc_nrs_pol_desc::pd_compat()
  */
-static inline bool
-nrs_policy_compat_all(struct ptlrpc_service *svc,
-                     const struct ptlrpc_nrs_pol_desc *desc)
+static inline bool nrs_policy_compat_all(const struct ptlrpc_service *svc,
+                                        const struct ptlrpc_nrs_pol_desc *desc)
 {
        return true;
 }
 
 /**
 {
        return true;
 }
 
 /**
- * Service compatibility function; policy is compatible with only a specific
+ * Service compatibility function; the policy is compatible with only a specific
  * service which is identified by its human-readable name at
  * ptlrpc_service::srv_name.
  *
  * \param[in] svc  The service the policy is attempting to register with.
  * \param[in] desc The policy descriptor
  *
  * service which is identified by its human-readable name at
  * ptlrpc_service::srv_name.
  *
  * \param[in] svc  The service the policy is attempting to register with.
  * \param[in] desc The policy descriptor
  *
- * \retval false The policy is not compatible with the NRS head
- * \retval true         The policy is compatible with the NRS head
+ * \retval false The policy is not compatible with the service
+ * \retval true         The policy is compatible with the service
  *
  * \see ptlrpc_nrs_pol_desc::pd_compat()
  */
  *
  * \see ptlrpc_nrs_pol_desc::pd_compat()
  */
-static inline bool
-nrs_policy_compat_one(struct ptlrpc_service *svc,
-                     const struct ptlrpc_nrs_pol_desc *desc)
+static inline bool nrs_policy_compat_one(const struct ptlrpc_service *svc,
+                                        const struct ptlrpc_nrs_pol_desc *desc)
 {
        LASSERT(desc->pd_compat_svc_name != NULL);
        return strcmp(svc->srv_name, desc->pd_compat_svc_name) == 0;
 {
        LASSERT(desc->pd_compat_svc_name != NULL);
        return strcmp(svc->srv_name, desc->pd_compat_svc_name) == 0;
index 8d2057f..31925ff 100644 (file)
@@ -810,8 +810,8 @@ static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
                                rq_exp_list) {
                /* Do not process requests that were not yet added to there
                 * incoming queue or were already removed from there for
                                rq_exp_list) {
                /* Do not process requests that were not yet added to there
                 * incoming queue or were already removed from there for
-                * processing. We evaluate ptlrpc_request_reorderable() without
-                * holding svcpt->scp_req_lock, and then redo the checks with
+                * processing. We evaluate ptlrpc_nrs_req_can_move() without
+                * holding svcpt->scp_req_lock, and then redo the check with
                 * the lock held once we need to obtain a reliable result.
                 */
                if (ptlrpc_nrs_req_can_move(req) &&
                 * the lock held once we need to obtain a reliable result.
                 */
                if (ptlrpc_nrs_req_can_move(req) &&
index c68bcaf..52af507 100644 (file)
@@ -424,16 +424,13 @@ extern struct nrs_core nrs_core;
  *
  * \param[in] state The policy state
  */
  *
  * \param[in] state The policy state
  */
-static const char *
-nrs_state2str(enum ptlrpc_nrs_pol_state state)
+static const char *nrs_state2str(enum ptlrpc_nrs_pol_state state)
 {
        switch (state) {
        default:
                LBUG();
        case NRS_POL_STATE_INVALID:
                return "invalid";
 {
        switch (state) {
        default:
                LBUG();
        case NRS_POL_STATE_INVALID:
                return "invalid";
-       case NRS_POL_STATE_UNAVAIL:
-               return "unavail";
        case NRS_POL_STATE_STOPPED:
                return "stopped";
        case NRS_POL_STATE_STOPPING:
        case NRS_POL_STATE_STOPPED:
                return "stopped";
        case NRS_POL_STATE_STOPPING:
@@ -453,15 +450,14 @@ nrs_state2str(enum ptlrpc_nrs_pol_state state)
  * \param[in] policy The policy
  * \param[out] info  Holds returned status information
  */
  * \param[in] policy The policy
  * \param[out] info  Holds returned status information
  */
-void
-nrs_policy_get_info_locked(struct ptlrpc_nrs_policy *policy,
-                          struct ptlrpc_nrs_pol_info *info)
+void nrs_policy_get_info_locked(struct ptlrpc_nrs_policy *policy,
+                               struct ptlrpc_nrs_pol_info *info)
 {
        LASSERT(policy != NULL);
        LASSERT(info != NULL);
        LASSERT(spin_is_locked(&policy->pol_nrs->nrs_lock));
 
 {
        LASSERT(policy != NULL);
        LASSERT(info != NULL);
        LASSERT(spin_is_locked(&policy->pol_nrs->nrs_lock));
 
-       memcpy(info->pi_name, policy->pol_name, NRS_POL_NAME_MAX);
+       memcpy(info->pi_name, policy->pol_desc->pd_name, NRS_POL_NAME_MAX);
 
        info->pi_fallback    = !!(policy->pol_flags & PTLRPC_NRS_FL_FALLBACK);
        info->pi_state       = policy->pol_state;
 
        info->pi_fallback    = !!(policy->pol_flags & PTLRPC_NRS_FL_FALLBACK);
        info->pi_state       = policy->pol_state;
@@ -477,9 +473,8 @@ nrs_policy_get_info_locked(struct ptlrpc_nrs_policy *policy,
  * Reads and prints policy status information for all policies of a PTLRPC
  * service.
  */
  * Reads and prints policy status information for all policies of a PTLRPC
  * service.
  */
-static int
-ptlrpc_lprocfs_rd_nrs(char *page, char **start, off_t off,
-                     int count, int *eof, void *data)
+static int ptlrpc_lprocfs_rd_nrs(char *page, char **start, off_t off,
+                                int count, int *eof, void *data)
 {
        struct ptlrpc_service          *svc = data;
        struct ptlrpc_service_part     *svcpt;
 {
        struct ptlrpc_service          *svc = data;
        struct ptlrpc_service_part     *svcpt;
@@ -662,9 +657,8 @@ out:
  * if the optional token is omitted, the operation is performed on both the
  * regular and high-priority (if the service has one) NRS head.
  */
  * if the optional token is omitted, the operation is performed on both the
  * regular and high-priority (if the service has one) NRS head.
  */
-static int
-ptlrpc_lprocfs_wr_nrs(struct file *file, const char *buffer,
-                     unsigned long count, void *data)
+static int ptlrpc_lprocfs_wr_nrs(struct file *file, const char *buffer,
+                                unsigned long count, void *data)
 {
        struct ptlrpc_service          *svc = data;
        enum ptlrpc_nrs_queue_type      queue = PTLRPC_NRS_QUEUE_BOTH;
 {
        struct ptlrpc_service          *svc = data;
        enum ptlrpc_nrs_queue_type      queue = PTLRPC_NRS_QUEUE_BOTH;
index 31c1dc2..a4c72c3 100644 (file)
@@ -63,41 +63,47 @@ extern struct cfs_list_head ptlrpc_all_services;
  */
 struct nrs_core nrs_core;
 
  */
 struct nrs_core nrs_core;
 
-static int
-nrs_policy_init(struct ptlrpc_nrs_policy *policy)
+static int nrs_policy_init(struct ptlrpc_nrs_policy *policy)
 {
 {
-       return policy->pol_ops->op_policy_init != NULL ?
-              policy->pol_ops->op_policy_init(policy) : 0;
+       return policy->pol_desc->pd_ops->op_policy_init != NULL ?
+              policy->pol_desc->pd_ops->op_policy_init(policy) : 0;
 }
 
 }
 
-static void
-nrs_policy_fini(struct ptlrpc_nrs_policy *policy)
+static void nrs_policy_fini(struct ptlrpc_nrs_policy *policy)
 {
        LASSERT(policy->pol_ref == 0);
        LASSERT(policy->pol_req_queued == 0);
 
 {
        LASSERT(policy->pol_ref == 0);
        LASSERT(policy->pol_req_queued == 0);
 
-       if (policy->pol_ops->op_policy_fini != NULL)
-               policy->pol_ops->op_policy_fini(policy);
+       if (policy->pol_desc->pd_ops->op_policy_fini != NULL)
+               policy->pol_desc->pd_ops->op_policy_fini(policy);
 }
 
 }
 
-static int
-nrs_policy_ctl_locked(struct ptlrpc_nrs_policy *policy, enum ptlrpc_nrs_ctl opc,
-                     void *arg)
+static int nrs_policy_ctl_locked(struct ptlrpc_nrs_policy *policy,
+                                enum ptlrpc_nrs_ctl opc, void *arg)
 {
 {
-       return policy->pol_ops->op_policy_ctl != NULL ?
-              policy->pol_ops->op_policy_ctl(policy, opc, arg) : -ENOSYS;
+       /**
+        * The policy may be stopped, but the lprocfs files and
+        * ptlrpc_nrs_policy instances remain present until unregistration time.
+        * Do not perform the ctl operation if the policy is stopped, as
+        * policy->pol_private will be NULL in such a case.
+        */
+       if (policy->pol_state == NRS_POL_STATE_STOPPED)
+               RETURN(-ENODEV);
+
+       RETURN(policy->pol_desc->pd_ops->op_policy_ctl != NULL ?
+              policy->pol_desc->pd_ops->op_policy_ctl(policy, opc, arg) :
+              -ENOSYS);
 }
 
 }
 
-static void
-nrs_policy_stop0(struct ptlrpc_nrs_policy *policy)
+static void nrs_policy_stop0(struct ptlrpc_nrs_policy *policy)
 {
        struct ptlrpc_nrs *nrs = policy->pol_nrs;
        ENTRY;
 
 {
        struct ptlrpc_nrs *nrs = policy->pol_nrs;
        ENTRY;
 
-       if (policy->pol_ops->op_policy_stop != NULL) {
+       if (policy->pol_desc->pd_ops->op_policy_stop != NULL) {
                spin_unlock(&nrs->nrs_lock);
 
                spin_unlock(&nrs->nrs_lock);
 
-               policy->pol_ops->op_policy_stop(policy);
+               policy->pol_desc->pd_ops->op_policy_stop(policy);
 
                spin_lock(&nrs->nrs_lock);
        }
 
                spin_lock(&nrs->nrs_lock);
        }
@@ -109,11 +115,14 @@ nrs_policy_stop0(struct ptlrpc_nrs_policy *policy)
        policy->pol_private = NULL;
 
        policy->pol_state = NRS_POL_STATE_STOPPED;
        policy->pol_private = NULL;
 
        policy->pol_state = NRS_POL_STATE_STOPPED;
+
+       if (cfs_atomic_dec_and_test(&policy->pol_desc->pd_refs))
+               cfs_module_put(policy->pol_desc->pd_owner);
+
        EXIT;
 }
 
        EXIT;
 }
 
-static int
-nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
+static int nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
 {
        struct ptlrpc_nrs *nrs = policy->pol_nrs;
        ENTRY;
 {
        struct ptlrpc_nrs *nrs = policy->pol_nrs;
        ENTRY;
@@ -151,10 +160,9 @@ nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING and if the policy has no
  * pending usage references, to ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED.
  *
  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING and if the policy has no
  * pending usage references, to ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED.
  *
- * \param[in] nrs The NRS head to carry out this operation on
+ * \param[in] nrs the NRS head to carry out this operation on
  */
  */
-static void
-nrs_policy_stop_primary(struct ptlrpc_nrs *nrs)
+static void nrs_policy_stop_primary(struct ptlrpc_nrs *nrs)
 {
        struct ptlrpc_nrs_policy *tmp = nrs->nrs_policy_primary;
        ENTRY;
 {
        struct ptlrpc_nrs_policy *tmp = nrs->nrs_policy_primary;
        ENTRY;
@@ -198,8 +206,7 @@ nrs_policy_stop_primary(struct ptlrpc_nrs *nrs)
  * references on the policy to ptlrpc_nrs_pol_stae::NRS_POL_STATE_STOPPED. In
  * this case, the fallback policy is only left active in the NRS head.
  */
  * references on the policy to ptlrpc_nrs_pol_stae::NRS_POL_STATE_STOPPED. In
  * this case, the fallback policy is only left active in the NRS head.
  */
-static int
-nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy)
+static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy)
 {
        struct ptlrpc_nrs      *nrs = policy->pol_nrs;
        int                     rc = 0;
 {
        struct ptlrpc_nrs      *nrs = policy->pol_nrs;
        int                     rc = 0;
@@ -214,8 +221,7 @@ nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy)
 
        LASSERT(policy->pol_state != NRS_POL_STATE_STARTING);
 
 
        LASSERT(policy->pol_state != NRS_POL_STATE_STARTING);
 
-       if (policy->pol_state == NRS_POL_STATE_STOPPING ||
-           policy->pol_state == NRS_POL_STATE_UNAVAIL)
+       if (policy->pol_state == NRS_POL_STATE_STOPPING)
                RETURN(-EAGAIN);
 
        if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
                RETURN(-EAGAIN);
 
        if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
@@ -227,7 +233,6 @@ nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy)
                 */
                if (policy == nrs->nrs_policy_fallback) {
                        nrs_policy_stop_primary(nrs);
                 */
                if (policy == nrs->nrs_policy_fallback) {
                        nrs_policy_stop_primary(nrs);
-
                        RETURN(0);
                }
 
                        RETURN(0);
                }
 
@@ -250,19 +255,34 @@ nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy)
        }
 
        /**
        }
 
        /**
-        * Serialize policy starting.across the NRS head
+        * Increase the module usage count for policies registering from other
+        * modules.
+        */
+       if (cfs_atomic_inc_return(&policy->pol_desc->pd_refs) == 1 &&
+           !cfs_try_module_get(policy->pol_desc->pd_owner)) {
+               cfs_atomic_dec(&policy->pol_desc->pd_refs);
+               CERROR("NRS: cannot get module for policy %s; is it alive?\n",
+                      policy->pol_desc->pd_name);
+               RETURN(-ENODEV);
+       }
+
+       /**
+        * Serialize policy starting across the NRS head
         */
        nrs->nrs_policy_starting = 1;
 
        policy->pol_state = NRS_POL_STATE_STARTING;
 
         */
        nrs->nrs_policy_starting = 1;
 
        policy->pol_state = NRS_POL_STATE_STARTING;
 
-       if (policy->pol_ops->op_policy_start) {
+       if (policy->pol_desc->pd_ops->op_policy_start) {
                spin_unlock(&nrs->nrs_lock);
 
                spin_unlock(&nrs->nrs_lock);
 
-               rc = policy->pol_ops->op_policy_start(policy);
+               rc = policy->pol_desc->pd_ops->op_policy_start(policy);
 
                spin_lock(&nrs->nrs_lock);
                if (rc != 0) {
 
                spin_lock(&nrs->nrs_lock);
                if (rc != 0) {
+                       if (cfs_atomic_dec_and_test(&policy->pol_desc->pd_refs))
+                               cfs_module_put(policy->pol_desc->pd_owner);
+
                        policy->pol_state = NRS_POL_STATE_STOPPED;
                        GOTO(out, rc);
                }
                        policy->pol_state = NRS_POL_STATE_STOPPED;
                        GOTO(out, rc);
                }
@@ -296,8 +316,7 @@ out:
 /**
  * Increases the policy's usage reference count.
  */
 /**
  * Increases the policy's usage reference count.
  */
-static void
-nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy)
+static inline void nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy)
 {
        policy->pol_ref++;
 }
 {
        policy->pol_ref++;
 }
@@ -308,8 +327,7 @@ nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy)
  * indicates it has no more queued or started requests, and can be safely
  * stopped).
  */
  * indicates it has no more queued or started requests, and can be safely
  * stopped).
  */
-static void
-nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy)
+static void nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy)
 {
        LASSERT(policy->pol_ref > 0);
 
 {
        LASSERT(policy->pol_ref > 0);
 
@@ -319,8 +337,7 @@ nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy)
                nrs_policy_stop0(policy);
 }
 
                nrs_policy_stop0(policy);
 }
 
-static void
-nrs_policy_put(struct ptlrpc_nrs_policy *policy)
+static void nrs_policy_put(struct ptlrpc_nrs_policy *policy)
 {
        spin_lock(&policy->pol_nrs->nrs_lock);
        nrs_policy_put_locked(policy);
 {
        spin_lock(&policy->pol_nrs->nrs_lock);
        nrs_policy_put_locked(policy);
@@ -330,13 +347,14 @@ nrs_policy_put(struct ptlrpc_nrs_policy *policy)
 /**
  * Find and return a policy by name.
  */
 /**
  * Find and return a policy by name.
  */
-static struct ptlrpc_nrs_policy *
-nrs_policy_find_locked(struct ptlrpc_nrs *nrs, char *name)
+static struct ptlrpc_nrs_policy * nrs_policy_find_locked(struct ptlrpc_nrs *nrs,
+                                                        char *name)
 {
        struct ptlrpc_nrs_policy *tmp;
 
 {
        struct ptlrpc_nrs_policy *tmp;
 
-       cfs_list_for_each_entry(tmp, &(nrs)->nrs_policy_list, pol_list) {
-               if (strncmp(tmp->pol_name, name, NRS_POL_NAME_MAX) == 0) {
+       cfs_list_for_each_entry(tmp, &nrs->nrs_policy_list, pol_list) {
+               if (strncmp(tmp->pol_desc->pd_name, name,
+                           NRS_POL_NAME_MAX) == 0) {
                        nrs_policy_get_locked(tmp);
                        return tmp;
                }
                        nrs_policy_get_locked(tmp);
                        return tmp;
                }
@@ -348,17 +366,16 @@ nrs_policy_find_locked(struct ptlrpc_nrs *nrs, char *name)
  * Release references for the resource hierarchy moving upwards towards the
  * policy instance resource.
  */
  * Release references for the resource hierarchy moving upwards towards the
  * policy instance resource.
  */
-static void
-nrs_resource_put(struct ptlrpc_nrs_resource *res)
+static void nrs_resource_put(struct ptlrpc_nrs_resource *res)
 {
        struct ptlrpc_nrs_policy *policy = res->res_policy;
 
 {
        struct ptlrpc_nrs_policy *policy = res->res_policy;
 
-       if (policy->pol_ops->op_res_put != NULL) {
+       if (policy->pol_desc->pd_ops->op_res_put != NULL) {
                struct ptlrpc_nrs_resource *parent;
 
                for (; res != NULL; res = parent) {
                        parent = res->res_parent;
                struct ptlrpc_nrs_resource *parent;
 
                for (; res != NULL; res = parent) {
                        parent = res->res_parent;
-                       policy->pol_ops->op_res_put(policy, res);
+                       policy->pol_desc->pd_ops->op_res_put(policy, res);
                }
        }
 }
                }
        }
 }
@@ -367,21 +384,22 @@ nrs_resource_put(struct ptlrpc_nrs_resource *res)
  * Obtains references for each resource in the resource hierarchy for request
  * \a nrq if it is to be handled by \a policy.
  *
  * Obtains references for each resource in the resource hierarchy for request
  * \a nrq if it is to be handled by \a policy.
  *
- * \param[in] policy     The policy
- * \param[in] nrq        The request
- * \param[in] moving_req  Denotes whether this is a call to the function by
+ * \param[in] policy     the policy
+ * \param[in] nrq        the request
+ * \param[in] moving_req  denotes whether this is a call to the function by
  *                       ldlm_lock_reorder_req(), in order to move \a nrq to
  *                       the high-priority NRS head; we should not sleep when
  *                       set.
  *
  *                       ldlm_lock_reorder_req(), in order to move \a nrq to
  *                       the high-priority NRS head; we should not sleep when
  *                       set.
  *
- * \retval NULL Resource hierarchy references not obtained
- * \retval valid-pointer  The bottom level of the resource hierarchy
+ * \retval NULL                  resource hierarchy references not obtained
+ * \retval valid-pointer  the bottom level of the resource hierarchy
  *
  * \see ptlrpc_nrs_pol_ops::op_res_get()
  */
  *
  * \see ptlrpc_nrs_pol_ops::op_res_get()
  */
-static struct ptlrpc_nrs_resource *
-nrs_resource_get(struct ptlrpc_nrs_policy *policy,
-                struct ptlrpc_nrs_request *nrq, bool moving_req)
+static
+struct ptlrpc_nrs_resource * nrs_resource_get(struct ptlrpc_nrs_policy *policy,
+                                             struct ptlrpc_nrs_request *nrq,
+                                             bool moving_req)
 {
        /**
         * Set to NULL to traverse the resource hierarchy from the top.
 {
        /**
         * Set to NULL to traverse the resource hierarchy from the top.
@@ -391,8 +409,8 @@ nrs_resource_get(struct ptlrpc_nrs_policy *policy,
        int                         rc;
 
        while (1) {
        int                         rc;
 
        while (1) {
-               rc = policy->pol_ops->op_res_get(policy, nrq, res, &tmp,
-                                                moving_req);
+               rc = policy->pol_desc->pd_ops->op_res_get(policy, nrq, res,
+                                                         &tmp, moving_req);
                if (rc < 0) {
                        if (res != NULL)
                                nrs_resource_put(res);
                if (rc < 0) {
                        if (res != NULL)
                                nrs_resource_put(res);
@@ -418,11 +436,11 @@ nrs_resource_get(struct ptlrpc_nrs_policy *policy,
  * the fallback and current primary policy (if any), that will later be used
  * to handle request \a nrq.
  *
  * the fallback and current primary policy (if any), that will later be used
  * to handle request \a nrq.
  *
- * \param[in]  nrs  The NRS head instance that will be handling request \a nrq.
- * \param[in]  nrq  The request that is being handled.
- * \param[out] resp The array where references to the resource hierarchy are
+ * \param[in]  nrs  the NRS head instance that will be handling request \a nrq.
+ * \param[in]  nrq  the request that is being handled.
+ * \param[out] resp the array where references to the resource hierarchy are
  *                 stored.
  *                 stored.
- * \param[in]  moving_req  Is set when obtaining resources while moving a
+ * \param[in]  moving_req  is set when obtaining resources while moving a
  *                        request from a policy on the regular NRS head to a
  *                        policy on the HP NRS head (via
  *                        ldlm_lock_reorder_req()). It signifies that
  *                        request from a policy on the regular NRS head to a
  *                        policy on the HP NRS head (via
  *                        ldlm_lock_reorder_req()). It signifies that
@@ -430,9 +448,10 @@ nrs_resource_get(struct ptlrpc_nrs_policy *policy,
  *                        a full explanation, see comment in
  *                        ptlrpc_nrs_pol_ops::op_res_get().
  */
  *                        a full explanation, see comment in
  *                        ptlrpc_nrs_pol_ops::op_res_get().
  */
-static void
-nrs_resource_get_safe(struct ptlrpc_nrs *nrs, struct ptlrpc_nrs_request *nrq,
-                     struct ptlrpc_nrs_resource **resp, bool moving_req)
+static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs,
+                                 struct ptlrpc_nrs_request *nrq,
+                                 struct ptlrpc_nrs_resource **resp,
+                                 bool moving_req)
 {
        struct ptlrpc_nrs_policy   *primary = NULL;
        struct ptlrpc_nrs_policy   *fallback = NULL;
 {
        struct ptlrpc_nrs_policy   *primary = NULL;
        struct ptlrpc_nrs_policy   *fallback = NULL;
@@ -475,16 +494,15 @@ nrs_resource_get_safe(struct ptlrpc_nrs *nrs, struct ptlrpc_nrs_request *nrq,
 
 /**
  * Releases references to resource hierarchies and policies, because they are no
 
 /**
  * Releases references to resource hierarchies and policies, because they are no
- * longer required; used when request handling has been completed, ot the
+ * longer required; used when request handling has been completed, or the
  * request is moving to the high priority NRS head.
  *
  * request is moving to the high priority NRS head.
  *
- * \param resp The resource hierarchy that is being released
+ * \param resp the resource hierarchy that is being released
  *
  * \see ptlrpcnrs_req_hp_move()
  * \see ptlrpc_nrs_req_finalize()
  */
  *
  * \see ptlrpcnrs_req_hp_move()
  * \see ptlrpc_nrs_req_finalize()
  */
-static void
-nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
+static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
 {
        struct ptlrpc_nrs_policy *pols[NRS_RES_MAX];
        struct ptlrpc_nrs        *nrs = NULL;
 {
        struct ptlrpc_nrs_policy *pols[NRS_RES_MAX];
        struct ptlrpc_nrs        *nrs = NULL;
@@ -516,22 +534,32 @@ nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
 }
 
 /**
 }
 
 /**
- * Obtains an NRS request from \a policy for handling via polling.
+ * Obtains an NRS request from \a policy for handling or examination; the
+ * request should be removed in the 'handling' case.
+ *
+ * Calling into this function implies we already know the policy has a request
+ * waiting to be handled.
  *
  *
- * \param[in] policy   The policy being polled
- * \param[in,out] arg   Reserved parameter
+ * \param[in] policy the policy from which a request
+ * \param[in] peek   when set, signifies that we just want to examine the
+ *                  request, and not handle it, so the request is not removed
+ *                  from the policy.
+ * \param[in] force  when set, it will force a policy to return a request if it
+ *                  has one pending
+ *
+ * \retval the NRS request to be handled
  */
  */
-static struct ptlrpc_nrs_request *
-nrs_request_poll(struct ptlrpc_nrs_policy *policy)
+static inline
+struct ptlrpc_nrs_request * nrs_request_get(struct ptlrpc_nrs_policy *policy,
+                                           bool peek, bool force)
 {
        struct ptlrpc_nrs_request *nrq;
 
        LASSERT(policy->pol_req_queued > 0);
 
 {
        struct ptlrpc_nrs_request *nrq;
 
        LASSERT(policy->pol_req_queued > 0);
 
-       nrq = policy->pol_ops->op_req_poll(policy);
+       nrq = policy->pol_desc->pd_ops->op_req_get(policy, peek, force);
 
 
-       LASSERT(nrq != NULL);
-       LASSERT(nrs_request_policy(nrq) == policy);
+       LASSERT(ergo(nrq != NULL, nrs_request_policy(nrq) == policy));
 
        return nrq;
 }
 
        return nrq;
 }
@@ -542,12 +570,11 @@ nrs_request_poll(struct ptlrpc_nrs_policy *policy)
  * function attempts to enqueue the request first on the primary policy
  * (if any), since this is the preferred choice.
  *
  * function attempts to enqueue the request first on the primary policy
  * (if any), since this is the preferred choice.
  *
- * \param nrq The request being enqueued
+ * \param nrq the request being enqueued
  *
  * \see nrs_resource_get_safe()
  */
  *
  * \see nrs_resource_get_safe()
  */
-static void
-nrs_request_enqueue(struct ptlrpc_nrs_request *nrq)
+static inline void nrs_request_enqueue(struct ptlrpc_nrs_request *nrq)
 {
        struct ptlrpc_nrs_policy *policy;
        int                       rc;
 {
        struct ptlrpc_nrs_policy *policy;
        int                       rc;
@@ -564,7 +591,7 @@ nrs_request_enqueue(struct ptlrpc_nrs_request *nrq)
                nrq->nr_res_idx = i;
                policy = nrq->nr_res_ptrs[i]->res_policy;
 
                nrq->nr_res_idx = i;
                policy = nrq->nr_res_ptrs[i]->res_policy;
 
-               rc = policy->pol_ops->op_req_enqueue(policy, nrq);
+               rc = policy->pol_desc->pd_ops->op_req_enqueue(policy, nrq);
                if (rc == 0) {
                        policy->pol_nrs->nrs_req_queued++;
                        policy->pol_req_queued++;
                if (rc == 0) {
                        policy->pol_nrs->nrs_req_queued++;
                        policy->pol_req_queued++;
@@ -580,63 +607,19 @@ nrs_request_enqueue(struct ptlrpc_nrs_request *nrq)
 }
 
 /**
 }
 
 /**
- * Dequeues request \a nrq from the policy which was used for handling it
- *
- * \param nrq The request being dequeued
- *
- * \see ptlrpc_nrs_req_del_nolock()
- */
-static void
-nrs_request_dequeue(struct ptlrpc_nrs_request *nrq)
-{
-       struct ptlrpc_nrs_policy *policy;
-
-       policy = nrs_request_policy(nrq);
-
-       policy->pol_ops->op_req_dequeue(policy, nrq);
-
-       LASSERT(policy->pol_nrs->nrs_req_queued > 0);
-       LASSERT(policy->pol_req_queued > 0);
-
-       policy->pol_nrs->nrs_req_queued--;
-       policy->pol_req_queued--;
-}
-
-/**
- * Is called when the request starts being handled, after it has been enqueued,
- * polled and dequeued.
- *
- * \param[in] nrs The NRS request that is starting to be handled; can be used
- *               for job/resource control.
- *
- * \see ptlrpc_nrs_req_start_nolock()
- */
-static void
-nrs_request_start(struct ptlrpc_nrs_request *nrq)
-{
-       struct ptlrpc_nrs_policy *policy = nrs_request_policy(nrq);
-
-       policy->pol_req_started++;
-       policy->pol_nrs->nrs_req_started++;
-       if (policy->pol_ops->op_req_start)
-               policy->pol_ops->op_req_start(policy, nrq);
-}
-
-/**
  * Called when a request has been handled
  *
  * Called when a request has been handled
  *
- * \param[in] nrs The request that has been handled; can be used for
+ * \param[in] nrs the request that has been handled; can be used for
  *               job/resource control.
  *
  * \see ptlrpc_nrs_req_stop_nolock()
  */
  *               job/resource control.
  *
  * \see ptlrpc_nrs_req_stop_nolock()
  */
-static void
-nrs_request_stop(struct ptlrpc_nrs_request *nrq)
+static inline void nrs_request_stop(struct ptlrpc_nrs_request *nrq)
 {
        struct ptlrpc_nrs_policy *policy = nrs_request_policy(nrq);
 
 {
        struct ptlrpc_nrs_policy *policy = nrs_request_policy(nrq);
 
-       if (policy->pol_ops->op_req_stop)
-               policy->pol_ops->op_req_stop(policy, nrq);
+       if (policy->pol_desc->pd_ops->op_req_stop)
+               policy->pol_desc->pd_ops->op_req_stop(policy, nrq);
 
        LASSERT(policy->pol_nrs->nrs_req_started > 0);
        LASSERT(policy->pol_req_started > 0);
 
        LASSERT(policy->pol_nrs->nrs_req_started > 0);
        LASSERT(policy->pol_req_started > 0);
@@ -651,11 +634,11 @@ nrs_request_stop(struct ptlrpc_nrs_request *nrq)
  * Handles opcodes that are common to all policy types within NRS core, and
  * passes any unknown opcodes to the policy-specific control function.
  *
  * Handles opcodes that are common to all policy types within NRS core, and
  * passes any unknown opcodes to the policy-specific control function.
  *
- * \param[in]    nrs  The NRS head this policy belongs to.
- * \param[in]    name The human-readable policy name; should be the same as
+ * \param[in]    nrs  the NRS head this policy belongs to.
+ * \param[in]    name the human-readable policy name; should be the same as
  *                    ptlrpc_nrs_pol_desc::pd_name.
  *                    ptlrpc_nrs_pol_desc::pd_name.
- * \param[in]    opc  The opcode of the operation being carried out.
- * \param[in,out] arg  Can be used to pass information in and out between when
+ * \param[in]    opc  the opcode of the operation being carried out.
+ * \param[in,out] arg  can be used to pass information in and out between when
  *                    carrying an operation; usually data that is private to
  *                    the policy at some level, or generic policy status
  *                    information.
  *                    carrying an operation; usually data that is private to
  *                    the policy at some level, or generic policy status
  *                    information.
@@ -663,12 +646,12 @@ nrs_request_stop(struct ptlrpc_nrs_request *nrq)
  * \retval -ve error condition
  * \retval   0 operation was carried out successfully
  */
  * \retval -ve error condition
  * \retval   0 operation was carried out successfully
  */
-static int
-nrs_policy_ctl(struct ptlrpc_nrs *nrs, char *name, enum ptlrpc_nrs_ctl opc,
-              void *arg)
+static int nrs_policy_ctl(struct ptlrpc_nrs *nrs, char *name,
+                         enum ptlrpc_nrs_ctl opc, void *arg)
 {
        struct ptlrpc_nrs_policy       *policy;
        int                             rc = 0;
 {
        struct ptlrpc_nrs_policy       *policy;
        int                             rc = 0;
+       ENTRY;
 
        spin_lock(&nrs->nrs_lock);
 
 
        spin_lock(&nrs->nrs_lock);
 
@@ -691,14 +674,6 @@ nrs_policy_ctl(struct ptlrpc_nrs *nrs, char *name, enum ptlrpc_nrs_ctl opc,
        case PTLRPC_NRS_CTL_START:
                rc = nrs_policy_start_locked(policy);
                break;
        case PTLRPC_NRS_CTL_START:
                rc = nrs_policy_start_locked(policy);
                break;
-
-               /**
-                * TODO: This may need to be augmented for resource deallocation
-                * used by the policies.
-                */
-       case PTLRPC_NRS_CTL_SHRINK:
-               rc = -ENOSYS;
-               break;
        }
 out:
        if (policy != NULL)
        }
 out:
        if (policy != NULL)
@@ -706,21 +681,20 @@ out:
 
        spin_unlock(&nrs->nrs_lock);
 
 
        spin_unlock(&nrs->nrs_lock);
 
-       return rc;
+       RETURN(rc);
 }
 
 /**
  * Unregisters a policy by name.
  *
 }
 
 /**
  * Unregisters a policy by name.
  *
- * \param[in] nrs  The NRS head this policy belongs to.
- * \param[in] name The human-readable policy name; should be the same as
+ * \param[in] nrs  the NRS head this policy belongs to.
+ * \param[in] name the human-readable policy name; should be the same as
  *                ptlrpc_nrs_pol_desc::pd_name
  *
  * \retval -ve error
  * \retval   0 success
  */
  *                ptlrpc_nrs_pol_desc::pd_name
  *
  * \retval -ve error
  * \retval   0 success
  */
-static int
-nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name)
+static int nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name)
 {
        struct ptlrpc_nrs_policy *policy = NULL;
        ENTRY;
 {
        struct ptlrpc_nrs_policy *policy = NULL;
        ENTRY;
@@ -770,16 +744,15 @@ nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name)
 /**
  * Register a policy from \policy descriptor \a desc with NRS head \a nrs.
  *
 /**
  * Register a policy from \policy descriptor \a desc with NRS head \a nrs.
  *
- * \param[in] nrs   The NRS head on which the policy will be registered.
- * \param[in] desc  The policy descriptor from which the information will be
+ * \param[in] nrs   the NRS head on which the policy will be registered.
+ * \param[in] desc  the policy descriptor from which the information will be
  *                 obtained to register the policy.
  *
  * \retval -ve error
  * \retval   0 success
  */
  *                 obtained to register the policy.
  *
  * \retval -ve error
  * \retval   0 success
  */
-static int
-nrs_policy_register(struct ptlrpc_nrs *nrs,
-                   struct ptlrpc_nrs_pol_desc *desc)
+static int nrs_policy_register(struct ptlrpc_nrs *nrs,
+                              struct ptlrpc_nrs_pol_desc *desc)
 {
        struct ptlrpc_nrs_policy       *policy;
        struct ptlrpc_nrs_policy       *tmp;
 {
        struct ptlrpc_nrs_policy       *policy;
        struct ptlrpc_nrs_policy       *tmp;
@@ -790,7 +763,7 @@ nrs_policy_register(struct ptlrpc_nrs *nrs,
        LASSERT(svcpt != NULL);
        LASSERT(desc->pd_ops != NULL);
        LASSERT(desc->pd_ops->op_res_get != NULL);
        LASSERT(svcpt != NULL);
        LASSERT(desc->pd_ops != NULL);
        LASSERT(desc->pd_ops->op_res_get != NULL);
-       LASSERT(desc->pd_ops->op_req_poll != NULL);
+       LASSERT(desc->pd_ops->op_req_get != NULL);
        LASSERT(desc->pd_ops->op_req_enqueue != NULL);
        LASSERT(desc->pd_ops->op_req_dequeue != NULL);
        LASSERT(desc->pd_compat != NULL);
        LASSERT(desc->pd_ops->op_req_enqueue != NULL);
        LASSERT(desc->pd_ops->op_req_dequeue != NULL);
        LASSERT(desc->pd_compat != NULL);
@@ -801,11 +774,9 @@ nrs_policy_register(struct ptlrpc_nrs *nrs,
                RETURN(-ENOMEM);
 
        policy->pol_nrs     = nrs;
                RETURN(-ENOMEM);
 
        policy->pol_nrs     = nrs;
-       policy->pol_name    = desc->pd_name;
-       policy->pol_ops     = desc->pd_ops;
-       policy->pol_state   = desc->pd_flags & PTLRPC_NRS_FL_REG_EXTERN ?
-                             NRS_POL_STATE_UNAVAIL : NRS_POL_STATE_STOPPED;
-       policy->pol_flags   = desc->pd_flags & ~PTLRPC_NRS_FL_REG_EXTERN;
+       policy->pol_desc    = desc;
+       policy->pol_state   = NRS_POL_STATE_STOPPED;
+       policy->pol_flags   = desc->pd_flags;
 
        CFS_INIT_LIST_HEAD(&policy->pol_list);
        CFS_INIT_LIST_HEAD(&policy->pol_list_queued);
 
        CFS_INIT_LIST_HEAD(&policy->pol_list);
        CFS_INIT_LIST_HEAD(&policy->pol_list_queued);
@@ -818,11 +789,11 @@ nrs_policy_register(struct ptlrpc_nrs *nrs,
 
        spin_lock(&nrs->nrs_lock);
 
 
        spin_lock(&nrs->nrs_lock);
 
-       tmp = nrs_policy_find_locked(nrs, policy->pol_name);
+       tmp = nrs_policy_find_locked(nrs, policy->pol_desc->pd_name);
        if (tmp != NULL) {
                CERROR("NRS policy %s has been registered, can't register it "
        if (tmp != NULL) {
                CERROR("NRS policy %s has been registered, can't register it "
-                      "for %s\n",
-                      policy->pol_name, svcpt->scp_service->srv_name);
+                      "for %s\n", policy->pol_desc->pd_name,
+                      svcpt->scp_service->srv_name);
                nrs_policy_put_locked(tmp);
 
                spin_unlock(&nrs->nrs_lock);
                nrs_policy_put_locked(tmp);
 
                spin_unlock(&nrs->nrs_lock);
@@ -841,7 +812,7 @@ nrs_policy_register(struct ptlrpc_nrs *nrs,
        spin_unlock(&nrs->nrs_lock);
 
        if (rc != 0)
        spin_unlock(&nrs->nrs_lock);
 
        if (rc != 0)
-               (void) nrs_policy_unregister(nrs, policy->pol_name);
+               (void) nrs_policy_unregister(nrs, policy->pol_desc->pd_name);
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
@@ -850,10 +821,9 @@ nrs_policy_register(struct ptlrpc_nrs *nrs,
  * Enqueue request \a req using one of the policies its resources are referring
  * to.
  *
  * Enqueue request \a req using one of the policies its resources are referring
  * to.
  *
- * \param[in] req The request to enqueue.
+ * \param[in] req the request to enqueue.
  */
  */
-static void
-ptlrpc_nrs_req_add_nolock(struct ptlrpc_request *req)
+static void ptlrpc_nrs_req_add_nolock(struct ptlrpc_request *req)
 {
        struct ptlrpc_nrs_policy       *policy;
 
 {
        struct ptlrpc_nrs_policy       *policy;
 
@@ -868,7 +838,7 @@ ptlrpc_nrs_req_add_nolock(struct ptlrpc_request *req)
         * Add the policy to the NRS head's list of policies with enqueued
         * requests, if it has not been added there.
         */
         * Add the policy to the NRS head's list of policies with enqueued
         * requests, if it has not been added there.
         */
-       if (cfs_list_empty(&policy->pol_list_queued))
+       if (unlikely(cfs_list_empty(&policy->pol_list_queued)))
                cfs_list_add_tail(&policy->pol_list_queued,
                                  &policy->pol_nrs->nrs_policy_queued);
 }
                cfs_list_add_tail(&policy->pol_list_queued,
                                  &policy->pol_nrs->nrs_policy_queued);
 }
@@ -876,10 +846,9 @@ ptlrpc_nrs_req_add_nolock(struct ptlrpc_request *req)
 /**
  * Enqueue a request on the high priority NRS head.
  *
 /**
  * Enqueue a request on the high priority NRS head.
  *
- * \param req The request to enqueue.
+ * \param req the request to enqueue.
  */
  */
-static void
-ptlrpc_nrs_hpreq_add_nolock(struct ptlrpc_request *req)
+static void ptlrpc_nrs_hpreq_add_nolock(struct ptlrpc_request *req)
 {
        int     opc = lustre_msg_get_opc(req->rq_reqmsg);
        ENTRY;
 {
        int     opc = lustre_msg_get_opc(req->rq_reqmsg);
        ENTRY;
@@ -893,30 +862,18 @@ ptlrpc_nrs_hpreq_add_nolock(struct ptlrpc_request *req)
        EXIT;
 }
 
        EXIT;
 }
 
-/* ptlrpc/nrs_fifo.c */
-extern struct ptlrpc_nrs_pol_desc ptlrpc_nrs_fifo_desc;
-
-/**
- * Array of policies that ship alongside NRS core; i.e. ones that do not
- * register externally using ptlrpc_nrs_policy_register().
- */
-static struct ptlrpc_nrs_pol_desc *nrs_pols_builtin[] = {
-       &ptlrpc_nrs_fifo_desc,
-};
-
 /**
  * Returns a boolean predicate indicating whether the policy described by
  * \a desc is adequate for use with service \a svc.
  *
 /**
  * Returns a boolean predicate indicating whether the policy described by
  * \a desc is adequate for use with service \a svc.
  *
- * \param[in] nrs    The service
- * \param[in] desc  The policy descriptor
+ * \param[in] svc  the service
+ * \param[in] desc the policy descriptor
  *
  *
- * \retval false The policy is not compatible with the service partition
- * \retval true         The policy is compatible with the service partition
+ * \retval false the policy is not compatible with the service
+ * \retval true         the policy is compatible with the service
  */
  */
-static inline bool
-nrs_policy_compatible(struct ptlrpc_service *svc,
-                     const struct ptlrpc_nrs_pol_desc *desc)
+static inline bool nrs_policy_compatible(const struct ptlrpc_service *svc,
+                                        const struct ptlrpc_nrs_pol_desc *desc)
 {
        return desc->pd_compat(svc, desc);
 }
 {
        return desc->pd_compat(svc, desc);
 }
@@ -925,7 +882,7 @@ nrs_policy_compatible(struct ptlrpc_service *svc,
  * Registers all compatible policies in nrs_core.nrs_policies, for NRS head
  * \a nrs.
  *
  * Registers all compatible policies in nrs_core.nrs_policies, for NRS head
  * \a nrs.
  *
- * \param[in] nrs The NRS head
+ * \param[in] nrs the NRS head
  *
  * \retval -ve error
  * \retval   0 success
  *
  * \retval -ve error
  * \retval   0 success
@@ -934,11 +891,10 @@ nrs_policy_compatible(struct ptlrpc_service *svc,
  *
  * \see ptlrpc_service_nrs_setup()
  */
  *
  * \see ptlrpc_service_nrs_setup()
  */
-static int
-nrs_register_policies_locked(struct ptlrpc_nrs *nrs)
+static int nrs_register_policies_locked(struct ptlrpc_nrs *nrs)
 {
        struct ptlrpc_nrs_pol_desc *desc;
 {
        struct ptlrpc_nrs_pol_desc *desc;
-       /* For convenience */
+       /* for convenience */
        struct ptlrpc_service_part       *svcpt = nrs->nrs_svcpt;
        struct ptlrpc_service            *svc = svcpt->scp_service;
        int                               rc = -EINVAL;
        struct ptlrpc_service_part       *svcpt = nrs->nrs_svcpt;
        struct ptlrpc_service            *svc = svcpt->scp_service;
        int                               rc = -EINVAL;
@@ -970,14 +926,16 @@ nrs_register_policies_locked(struct ptlrpc_nrs *nrs)
  * Initializes NRS head \a nrs of service partition \a svcpt, and registers all
  * compatible policies in NRS core, with the NRS head.
  *
  * Initializes NRS head \a nrs of service partition \a svcpt, and registers all
  * compatible policies in NRS core, with the NRS head.
  *
- * \param[in] nrs   The NRS head
- * \param[in] svcpt The PTLRPC service partition to setup
+ * \param[in] nrs   the NRS head
+ * \param[in] svcpt the PTLRPC service partition to setup
+ *
+ * \retval -ve error
+ * \retval   0 success
  *
  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
  */
  *
  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
  */
-static int
-nrs_svcpt_setup_locked0(struct ptlrpc_nrs *nrs,
-                       struct ptlrpc_service_part *svcpt)
+static int nrs_svcpt_setup_locked0(struct ptlrpc_nrs *nrs,
+                                  struct ptlrpc_service_part *svcpt)
 {
        int                             rc;
        enum ptlrpc_nrs_queue_type      queue;
 {
        int                             rc;
        enum ptlrpc_nrs_queue_type      queue;
@@ -994,12 +952,9 @@ nrs_svcpt_setup_locked0(struct ptlrpc_nrs *nrs,
        nrs->nrs_svcpt = svcpt;
        nrs->nrs_queue_type = queue;
        spin_lock_init(&nrs->nrs_lock);
        nrs->nrs_svcpt = svcpt;
        nrs->nrs_queue_type = queue;
        spin_lock_init(&nrs->nrs_lock);
-       CFS_INIT_LIST_HEAD(&nrs->nrs_heads);
        CFS_INIT_LIST_HEAD(&nrs->nrs_policy_list);
        CFS_INIT_LIST_HEAD(&nrs->nrs_policy_queued);
 
        CFS_INIT_LIST_HEAD(&nrs->nrs_policy_list);
        CFS_INIT_LIST_HEAD(&nrs->nrs_policy_queued);
 
-       cfs_list_add_tail(&nrs->nrs_heads, &nrs_core.nrs_heads);
-
        rc = nrs_register_policies_locked(nrs);
 
        RETURN(rc);
        rc = nrs_register_policies_locked(nrs);
 
        RETURN(rc);
@@ -1010,12 +965,11 @@ nrs_svcpt_setup_locked0(struct ptlrpc_nrs *nrs,
  * handles high-priority RPCs), and then registers all available compatible
  * policies on those NRS heads.
  *
  * handles high-priority RPCs), and then registers all available compatible
  * policies on those NRS heads.
  *
- * \param[n] svcpt The PTLRPC service partition to setup
+ * \param[in,out] svcpt the PTLRPC service partition to setup
  *
  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
  */
  *
  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
  */
-static int
-nrs_svcpt_setup_locked(struct ptlrpc_service_part *svcpt)
+static int nrs_svcpt_setup_locked(struct ptlrpc_service_part *svcpt)
 {
        struct ptlrpc_nrs              *nrs;
        int                             rc;
 {
        struct ptlrpc_nrs              *nrs;
        int                             rc;
@@ -1028,7 +982,7 @@ nrs_svcpt_setup_locked(struct ptlrpc_service_part *svcpt)
         */
        nrs = nrs_svcpt2nrs(svcpt, false);
        rc = nrs_svcpt_setup_locked0(nrs, svcpt);
         */
        nrs = nrs_svcpt2nrs(svcpt, false);
        rc = nrs_svcpt_setup_locked0(nrs, svcpt);
-       if (rc)
+       if (rc < 0)
                GOTO(out, rc);
 
        /**
                GOTO(out, rc);
 
        /**
@@ -1054,12 +1008,11 @@ out:
  * Unregisters all policies on all available NRS heads in a service partition;
  * called at PTLRPC service unregistration time.
  *
  * Unregisters all policies on all available NRS heads in a service partition;
  * called at PTLRPC service unregistration time.
  *
- * \param[in] svcpt The PTLRPC service partition
+ * \param[in] svcpt the PTLRPC service partition
  *
  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
  */
  *
  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
  */
-static void
-nrs_svcpt_cleanup_locked(struct ptlrpc_service_part *svcpt)
+static void nrs_svcpt_cleanup_locked(struct ptlrpc_service_part *svcpt)
 {
        struct ptlrpc_nrs              *nrs;
        struct ptlrpc_nrs_policy       *policy;
 {
        struct ptlrpc_nrs              *nrs;
        struct ptlrpc_nrs_policy       *policy;
@@ -1076,12 +1029,10 @@ again:
 
        cfs_list_for_each_entry_safe(policy, tmp, &nrs->nrs_policy_list,
                                     pol_list) {
 
        cfs_list_for_each_entry_safe(policy, tmp, &nrs->nrs_policy_list,
                                     pol_list) {
-               rc = nrs_policy_unregister(nrs, policy->pol_name);
+               rc = nrs_policy_unregister(nrs, policy->pol_desc->pd_name);
                LASSERT(rc == 0);
        }
 
                LASSERT(rc == 0);
        }
 
-       cfs_list_del(&nrs->nrs_heads);
-
        /**
         * If the service partition has an HP NRS head, clean that up as well.
         */
        /**
         * If the service partition has an HP NRS head, clean that up as well.
         */
@@ -1097,211 +1048,197 @@ again:
 }
 
 /**
 }
 
 /**
- * Checks whether the policy in \a desc has been added to NRS core's list of
- * policies, \e nrs_core.nrs_policies.
+ * Returns the descriptor for a policy as identified by by \a name.
  *
  *
- * \param[in] desc The policy descriptor
+ * \param[in] name the policy name
  *
  *
- * \retval true The policy is present
- * \retval false The policy is not present
+ * \retval the policy descriptor
+ * \retval NULL
  */
  */
-static bool
-nrs_policy_exists_locked(const struct ptlrpc_nrs_pol_desc *desc)
+static struct ptlrpc_nrs_pol_desc *nrs_policy_find_desc_locked(const char *name)
 {
        struct ptlrpc_nrs_pol_desc     *tmp;
        ENTRY;
 
        cfs_list_for_each_entry(tmp, &nrs_core.nrs_policies, pd_list) {
 {
        struct ptlrpc_nrs_pol_desc     *tmp;
        ENTRY;
 
        cfs_list_for_each_entry(tmp, &nrs_core.nrs_policies, pd_list) {
-               if (strncmp(tmp->pd_name, desc->pd_name, NRS_POL_NAME_MAX) == 0)
-                       RETURN(true);
+               if (strncmp(tmp->pd_name, name, NRS_POL_NAME_MAX) == 0)
+                       RETURN(tmp);
        }
        }
-       RETURN(false);
+       RETURN(NULL);
 }
 
 /**
 }
 
 /**
- * Removes the policy from all supported NRS heads.
+ * Removes the policy from all supported NRS heads of all partitions of all
+ * PTLRPC services.
  *
  *
- * \param[in] desc The policy descriptor to unregister
+ * \param[in] desc the policy descriptor to unregister
  *
  * \retval -ve error
  * \retval  0  successfully unregistered policy on all supported NRS heads
  *
  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
  *
  * \retval -ve error
  * \retval  0  successfully unregistered policy on all supported NRS heads
  *
  * \pre mutex_is_locked(&nrs_core.nrs_mutex)
+ * \pre mutex_is_locked(&ptlrpc_all_services_mutex)
  */
  */
-static int
-nrs_policy_unregister_locked(struct ptlrpc_nrs_pol_desc *desc)
+static int nrs_policy_unregister_locked(struct ptlrpc_nrs_pol_desc *desc)
 {
 {
-       struct ptlrpc_nrs      *nrs;
-       int                     rc = 0;
+       struct ptlrpc_nrs              *nrs;
+       struct ptlrpc_service          *svc;
+       struct ptlrpc_service_part     *svcpt;
+       int                             i;
+       int                             rc = 0;
        ENTRY;
 
        LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
        ENTRY;
 
        LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
+       LASSERT(mutex_is_locked(&ptlrpc_all_services_mutex));
 
 
-       cfs_list_for_each_entry(nrs, &nrs_core.nrs_heads, nrs_heads) {
-               if (!nrs_policy_compatible(nrs->nrs_svcpt->scp_service, desc)) {
-                       /**
-                        * The policy may only have registered on compatible
-                        * NRS heads.
-                        */
-                       continue;
-               }
+       cfs_list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
 
 
-               rc = nrs_policy_unregister(nrs, desc->pd_name);
+               if (!nrs_policy_compatible(svc, desc) ||
+                   unlikely(svc->srv_is_stopping))
+                       continue;
 
 
-               /**
-                * Ignore -ENOENT as the policy may not have registered
-                * successfully on all service partitions.
-                */
-               if (rc == -ENOENT) {
-                       rc = 0;
-               } else if (rc != 0) {
-                       CERROR("Failed to unregister NRS policy %s for "
-                              "partition %d of service %s: %d\n",
-                              desc->pd_name, nrs->nrs_svcpt->scp_cpt,
-                              nrs->nrs_svcpt->scp_service->srv_name, rc);
-                       break;
-               }
-       }
-       RETURN(rc);
-}
+               ptlrpc_service_for_each_part(svcpt, i, svc) {
+                       bool hp = false;
 
 
-/**
- * Transitions a policy from ptlrpc_nrs_pol_state::NRS_POL_STATE_UNAVAIL to
- * ptlrpc_nrs_pol_state::STOPPED; is used to prevent policies that are
- * registering externally using ptlrpc_nrs_policy_register from starting
- * before they have successfully registered on all compatible service
- * partitions.
- *
- * \param[in] nrs  The NRS head that the policy belongs to
- * \param[in] name The human-readable policy name
- */
-static void
-nrs_pol_make_available0(struct ptlrpc_nrs *nrs, char *name)
-{
-       struct ptlrpc_nrs_policy *pol;
+again:
+                       nrs = nrs_svcpt2nrs(svcpt, hp);
+                       rc = nrs_policy_unregister(nrs, desc->pd_name);
+                       /**
+                        * Ignore -ENOENT as the policy may not have registered
+                        * successfully on all service partitions.
+                        */
+                       if (rc == -ENOENT) {
+                               rc = 0;
+                       } else if (rc != 0) {
+                               CERROR("Failed to unregister NRS policy %s for "
+                                      "partition %d of service %s: %d\n",
+                                      desc->pd_name, svcpt->scp_cpt,
+                                      svcpt->scp_service->srv_name, rc);
+                               RETURN(rc);
+                       }
 
 
-       LASSERT(nrs);
-       LASSERT(name);
+                       if (!hp && nrs_svc_has_hp(svc)) {
+                               hp = true;
+                               goto again;
+                       }
+               }
 
 
-       spin_lock(&nrs->nrs_lock);
-       pol = nrs_policy_find_locked(nrs, name);
-       if (pol) {
-               LASSERT(pol->pol_state == NRS_POL_STATE_UNAVAIL);
-               pol->pol_state = NRS_POL_STATE_STOPPED;
-               nrs_policy_put_locked(pol);
+               if (desc->pd_ops->op_lprocfs_fini != NULL)
+                       desc->pd_ops->op_lprocfs_fini(svc);
        }
        }
-       spin_unlock(&nrs->nrs_lock);
-}
 
 
-/**
- * Make the policy available on all compatible service partitions of all PTLRPC
- * services.
- *
- * \param[in] desc The descriptor for the policy that is to be made available
- *
- * \pre mutex_is_locked(&nrs_core.nrs_mutex)
- *
- * \see nrs_pol_make_available0()
- */
-static void
-nrs_pol_make_available_locked(struct ptlrpc_nrs_pol_desc *desc)
-{
-       struct ptlrpc_nrs        *nrs;
-       ENTRY;
-
-       LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
-
-        /**
-         * Cycle through all registered instances of the policy and place them
-         * at the STOPPED state.
-         */
-       cfs_list_for_each_entry(nrs, &nrs_core.nrs_heads, nrs_heads) {
-               if (!nrs_policy_compatible(nrs->nrs_svcpt->scp_service, desc))
-                       continue;
-               nrs_pol_make_available0(nrs, desc->pd_name);
-       }
-       EXIT;
+       RETURN(rc);
 }
 
 /**
  * Registers a new policy with NRS core.
  *
 }
 
 /**
  * Registers a new policy with NRS core.
  *
- * Used for policies that register externally with NRS core, i.e. ones that are
- * not part of \e nrs_pols_builtin[]. The function will only succeed if policy
- * registration with all compatible service partitions is successful.
+ * The function will only succeed if policy registration with all compatible
+ * service partitions (if any) is successful.
+ *
+ * N.B. This function should be called either at ptlrpc module initialization
+ *     time when registering a policy that ships with NRS core, or in a
+ *     module's init() function for policies registering from other modules.
  *
  *
- * \param[in] desc The policy descriptor to register
+ * \param[in] conf configuration information for the new policy to register
  *
  * \retval -ve error
  * \retval   0 success
  */
  *
  * \retval -ve error
  * \retval   0 success
  */
-int
-ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_desc *desc)
+int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf)
 {
 {
-       struct ptlrpc_nrs              *nrs;
         struct ptlrpc_service         *svc;
         struct ptlrpc_service         *svc;
-       struct ptlrpc_service_part     *svcpt;
-       int                             i;
-       int                             rc;
-       int                             rc2;
+       struct ptlrpc_nrs_pol_desc     *desc;
+       int                             rc = 0;
        ENTRY;
 
        ENTRY;
 
-       LASSERT(desc != NULL);
+       LASSERT(conf != NULL);
+       LASSERT(conf->nc_ops != NULL);
+       LASSERT(conf->nc_compat != NULL);
+       LASSERT(ergo(conf->nc_compat == nrs_policy_compat_one,
+               conf->nc_compat_svc_name != NULL));
+       LASSERT(ergo((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) != 0,
+                    conf->nc_owner != NULL));
 
 
-       desc->pd_name[NRS_POL_NAME_MAX - 1] = '\0';
+       conf->nc_name[NRS_POL_NAME_MAX - 1] = '\0';
 
 
-       if (desc->pd_flags & (PTLRPC_NRS_FL_FALLBACK |
-                             PTLRPC_NRS_FL_REG_START)) {
-               CERROR("Failing to register NRS policy %s; re-check policy "
-                      "flags, externally-registered policies cannot act as "
-                      "fallback policies or be started immediately without "
-                      "interaction with lprocfs.\n", desc->pd_name);
+       /**
+        * External policies are not allowed to start immediately upon
+        * registration, as there is a relatively higher chance that their
+        * registration might fail. In such a case, some policy instances may
+        * already have requests queued wen unregistration needs to happen as
+        * part o cleanup; since there is currently no way to drain requests
+        * from a policy unless the service is unregistering, we just disallow
+        * this.
+        */
+       if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) &&
+           (conf->nc_flags & (PTLRPC_NRS_FL_FALLBACK |
+                              PTLRPC_NRS_FL_REG_START))) {
+               CERROR("NRS: failing to register policy %s. Please check "
+                      "policy flags; external policies cannot act as fallback "
+                      "policies, or be started immediately upon registration "
+                      "without interaction with lprocfs\n", conf->nc_name);
                RETURN(-EINVAL);
        }
 
                RETURN(-EINVAL);
        }
 
-       desc->pd_flags |= PTLRPC_NRS_FL_REG_EXTERN;
-
        mutex_lock(&nrs_core.nrs_mutex);
 
        mutex_lock(&nrs_core.nrs_mutex);
 
-       rc = nrs_policy_exists_locked(desc);
-       if (rc) {
-               CERROR("Failing to register NRS policy %s which has "
-                      "already been registered with NRS core!\n",
-                      desc->pd_name);
+       if (nrs_policy_find_desc_locked(conf->nc_name) != NULL) {
+               CERROR("NRS: failing to register policy %s which has already "
+                      "been registered with NRS core!\n",
+                      conf->nc_name);
                GOTO(fail, rc = -EEXIST);
        }
 
                GOTO(fail, rc = -EEXIST);
        }
 
+       OBD_ALLOC_PTR(desc);
+       if (desc == NULL)
+               GOTO(fail, rc = -ENOMEM);
+
+       strncpy(desc->pd_name, conf->nc_name, NRS_POL_NAME_MAX);
+       desc->pd_ops             = conf->nc_ops;
+       desc->pd_compat          = conf->nc_compat;
+       desc->pd_compat_svc_name = conf->nc_compat_svc_name;
+       if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) != 0)
+               desc->pd_owner   = conf->nc_owner;
+       desc->pd_flags           = conf->nc_flags;
+       cfs_atomic_set(&desc->pd_refs, 0);
+
+       /**
+        * For policies that are held in the same module as NRS (currently
+        * ptlrpc), do not register the policy with all compatible services,
+        * as the services will not have started at this point, since we are
+        * calling from ptlrpc module initialization code. In such cases each
+        * service will register all compatible policies later, via
+        * ptlrpc_service_nrs_setup().
+        */
+       if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) == 0)
+               goto internal;
+
        /**
         * Register the new policy on all compatible services
         */
        mutex_lock(&ptlrpc_all_services_mutex);
 
        cfs_list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
        /**
         * Register the new policy on all compatible services
         */
        mutex_lock(&ptlrpc_all_services_mutex);
 
        cfs_list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
+               struct ptlrpc_service_part     *svcpt;
+               int                             i;
+               int                             rc2;
 
 
-               if (unlikely(svc->srv_is_stopping)) {
-                       mutex_unlock(&ptlrpc_all_services_mutex);
-                       GOTO(fail, rc = -ESRCH);
-               }
-
-               if (!nrs_policy_compatible(svc, desc)) {
-                       /**
-                        * Attempt to register the policy if it is
-                        * compatible, otherwise try the next service.
-                        */
+               if (!nrs_policy_compatible(svc, desc) ||
+                   unlikely(svc->srv_is_stopping))
                        continue;
                        continue;
-               }
-               ptlrpc_service_for_each_part(svcpt, i, svc) {
-                       bool hp = false;
 
 
+               ptlrpc_service_for_each_part(svcpt, i, svc) {
+                       struct ptlrpc_nrs      *nrs;
+                       bool                    hp = false;
 again:
                        nrs = nrs_svcpt2nrs(svcpt, hp);
                        rc = nrs_policy_register(nrs, desc);
                        if (rc != 0) {
                                CERROR("Failed to register NRS policy %s for "
                                       "partition %d of service %s: %d\n",
 again:
                        nrs = nrs_svcpt2nrs(svcpt, hp);
                        rc = nrs_policy_register(nrs, desc);
                        if (rc != 0) {
                                CERROR("Failed to register NRS policy %s for "
                                       "partition %d of service %s: %d\n",
-                                      desc->pd_name, nrs->nrs_svcpt->scp_cpt,
-                                      nrs->nrs_svcpt->scp_service->srv_name,
-                                      rc);
+                                      desc->pd_name, svcpt->scp_cpt,
+                                      svcpt->scp_service->srv_name, rc);
 
                                rc2 = nrs_policy_unregister_locked(desc);
                                /**
 
                                rc2 = nrs_policy_unregister_locked(desc);
                                /**
@@ -1309,6 +1246,7 @@ again:
                                 */
                                LASSERT(rc2 == 0);
                                mutex_unlock(&ptlrpc_all_services_mutex);
                                 */
                                LASSERT(rc2 == 0);
                                mutex_unlock(&ptlrpc_all_services_mutex);
+                               OBD_FREE_PTR(desc);
                                GOTO(fail, rc);
                        }
 
                                GOTO(fail, rc);
                        }
 
@@ -1317,6 +1255,11 @@ again:
                                goto again;
                        }
                }
                                goto again;
                        }
                }
+
+               /**
+                * No need to take a reference to other modules here, as we
+                * will be calling from the module's init() function.
+                */
                if (desc->pd_ops->op_lprocfs_init != NULL) {
                        rc = desc->pd_ops->op_lprocfs_init(svc);
                        if (rc != 0) {
                if (desc->pd_ops->op_lprocfs_init != NULL) {
                        rc = desc->pd_ops->op_lprocfs_init(svc);
                        if (rc != 0) {
@@ -1326,19 +1269,14 @@ again:
                                 */
                                LASSERT(rc2 == 0);
                                mutex_unlock(&ptlrpc_all_services_mutex);
                                 */
                                LASSERT(rc2 == 0);
                                mutex_unlock(&ptlrpc_all_services_mutex);
+                               OBD_FREE_PTR(desc);
                                GOTO(fail, rc);
                        }
                }
        }
 
        mutex_unlock(&ptlrpc_all_services_mutex);
                                GOTO(fail, rc);
                        }
                }
        }
 
        mutex_unlock(&ptlrpc_all_services_mutex);
-
-       /**
-        * The policy has successfully registered with all service partitions,
-        * so mark the policy instances at the NRS heads as available.
-        */
-       nrs_pol_make_available_locked(desc);
-
+internal:
        cfs_list_add_tail(&desc->pd_list, &nrs_core.nrs_policies);
 fail:
        mutex_unlock(&nrs_core.nrs_mutex);
        cfs_list_add_tail(&desc->pd_list, &nrs_core.nrs_policies);
 fail:
        mutex_unlock(&nrs_core.nrs_mutex);
@@ -1351,67 +1289,63 @@ EXPORT_SYMBOL(ptlrpc_nrs_policy_register);
  * Unregisters a previously registered policy with NRS core. All instances of
  * the policy on all NRS heads of all supported services are removed.
  *
  * Unregisters a previously registered policy with NRS core. All instances of
  * the policy on all NRS heads of all supported services are removed.
  *
- * \param[in] desc The descriptor of the policy to unregister
+ * N.B. This function should only be called from a module's exit() function.
+ *     Although it can be used for policies that ship alongside NRS core, the
+ *     function is primarily intended for policies that register externally,
+ *     from other modules.
+ *
+ * \param[in] conf configuration information for the policy to unregister
  *
  * \retval -ve error
  * \retval   0 success
  */
  *
  * \retval -ve error
  * \retval   0 success
  */
-int
-ptlrpc_nrs_policy_unregister(struct ptlrpc_nrs_pol_desc *desc)
+int ptlrpc_nrs_policy_unregister(struct ptlrpc_nrs_pol_conf *conf)
 {
 {
-       int                    rc;
-       struct ptlrpc_service *svc;
+       struct ptlrpc_nrs_pol_desc      *desc;
+       int                              rc;
        ENTRY;
 
        ENTRY;
 
-       LASSERT(desc != NULL);
+       LASSERT(conf != NULL);
 
 
-       if (desc->pd_flags & PTLRPC_NRS_FL_FALLBACK) {
+       if (conf->nc_flags & PTLRPC_NRS_FL_FALLBACK) {
                CERROR("Unable to unregister a fallback policy, unless the "
                       "PTLRPC service is stopping.\n");
                RETURN(-EPERM);
        }
 
                CERROR("Unable to unregister a fallback policy, unless the "
                       "PTLRPC service is stopping.\n");
                RETURN(-EPERM);
        }
 
-       desc->pd_name[NRS_POL_NAME_MAX - 1] = '\0';
+       conf->nc_name[NRS_POL_NAME_MAX - 1] = '\0';
 
        mutex_lock(&nrs_core.nrs_mutex);
 
 
        mutex_lock(&nrs_core.nrs_mutex);
 
-       rc = nrs_policy_exists_locked(desc);
-       if (!rc) {
+       desc = nrs_policy_find_desc_locked(conf->nc_name);
+       if (desc == NULL) {
                CERROR("Failing to unregister NRS policy %s which has "
                       "not been registered with NRS core!\n",
                CERROR("Failing to unregister NRS policy %s which has "
                       "not been registered with NRS core!\n",
-                      desc->pd_name);
-               GOTO(fail, rc = -ENOENT);
+                      conf->nc_name);
+               GOTO(not_exist, rc = -ENOENT);
        }
 
        }
 
+       mutex_lock(&ptlrpc_all_services_mutex);
+
        rc = nrs_policy_unregister_locked(desc);
        rc = nrs_policy_unregister_locked(desc);
-       if (rc == -EBUSY) {
-               CERROR("Please first stop policy %s on all service partitions "
-                      "and then retry to unregister the policy.\n",
-                      desc->pd_name);
+       if (rc < 0) {
+               if (rc == -EBUSY)
+                       CERROR("Please first stop policy %s on all service "
+                              "partitions and then retry to unregister the "
+                              "policy.\n", conf->nc_name);
                GOTO(fail, rc);
        }
                GOTO(fail, rc);
        }
+
        CDEBUG(D_INFO, "Unregistering policy %s from NRS core.\n",
        CDEBUG(D_INFO, "Unregistering policy %s from NRS core.\n",
-              desc->pd_name);
+              conf->nc_name);
 
        cfs_list_del(&desc->pd_list);
 
        cfs_list_del(&desc->pd_list);
+       OBD_FREE_PTR(desc);
 
 
-       /**
-        * Unregister the policy's lprocfs interface from all compatible
-        * services.
-        */
-       mutex_lock(&ptlrpc_all_services_mutex);
-
-       cfs_list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
-               if (!nrs_policy_compatible(svc, desc))
-                       continue;
-
-               if (desc->pd_ops->op_lprocfs_fini != NULL)
-                       desc->pd_ops->op_lprocfs_fini(svc);
-       }
-
+fail:
        mutex_unlock(&ptlrpc_all_services_mutex);
 
        mutex_unlock(&ptlrpc_all_services_mutex);
 
-fail:
+not_exist:
        mutex_unlock(&nrs_core.nrs_mutex);
 
        RETURN(rc);
        mutex_unlock(&nrs_core.nrs_mutex);
 
        RETURN(rc);
@@ -1422,7 +1356,8 @@ EXPORT_SYMBOL(ptlrpc_nrs_policy_unregister);
  * Setup NRS heads on all service partitions of service \a svc, and register
  * all compatible policies on those NRS heads.
  *
  * Setup NRS heads on all service partitions of service \a svc, and register
  * all compatible policies on those NRS heads.
  *
- * \param[in] svc  The service to setup
+ * To be called from withing ptl
+ * \param[in] svc the service to setup
  *
  * \retval -ve error, the calling logic should eventually call
  *                   ptlrpc_service_nrs_cleanup() to undo any work performed
  *
  * \retval -ve error, the calling logic should eventually call
  *                   ptlrpc_service_nrs_cleanup() to undo any work performed
@@ -1431,8 +1366,7 @@ EXPORT_SYMBOL(ptlrpc_nrs_policy_unregister);
  * \see ptlrpc_register_service()
  * \see ptlrpc_service_nrs_cleanup()
  */
  * \see ptlrpc_register_service()
  * \see ptlrpc_service_nrs_cleanup()
  */
-int
-ptlrpc_service_nrs_setup(struct ptlrpc_service *svc)
+int ptlrpc_service_nrs_setup(struct ptlrpc_service *svc)
 {
        struct ptlrpc_service_part             *svcpt;
        const struct ptlrpc_nrs_pol_desc       *desc;
 {
        struct ptlrpc_service_part             *svcpt;
        const struct ptlrpc_nrs_pol_desc       *desc;
@@ -1450,7 +1384,7 @@ ptlrpc_service_nrs_setup(struct ptlrpc_service *svc)
                        GOTO(failed, rc);
        }
 
                        GOTO(failed, rc);
        }
 
-       /*
+       /**
         * Set up lprocfs interfaces for all supported policies for the
         * service.
         */
         * Set up lprocfs interfaces for all supported policies for the
         * service.
         */
@@ -1475,10 +1409,9 @@ failed:
 /**
  * Unregisters all policies on all service partitions of service \a svc.
  *
 /**
  * Unregisters all policies on all service partitions of service \a svc.
  *
- * \param[in] svc The PTLRPC service to unregister
+ * \param[in] svc the PTLRPC service to unregister
  */
  */
-void
-ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc)
+void ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc)
 {
        struct ptlrpc_service_part           *svcpt;
        const struct ptlrpc_nrs_pol_desc     *desc;
 {
        struct ptlrpc_service_part           *svcpt;
        const struct ptlrpc_nrs_pol_desc     *desc;
@@ -1514,13 +1447,12 @@ ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc)
  * taken on the regular head can later be swapped for HP head resources by
  * ldlm_lock_reorder_req().
  *
  * taken on the regular head can later be swapped for HP head resources by
  * ldlm_lock_reorder_req().
  *
- * \param[in] svcpt The service partition
- * \param[in] req   The request
- * \param[in] hp    Which NRS head of \a svcpt to use
+ * \param[in] svcpt the service partition
+ * \param[in] req   the request
+ * \param[in] hp    which NRS head of \a svcpt to use
  */
  */
-void
-ptlrpc_nrs_req_initialize(struct ptlrpc_service_part *svcpt,
-                         struct ptlrpc_request *req, bool hp)
+void ptlrpc_nrs_req_initialize(struct ptlrpc_service_part *svcpt,
+                              struct ptlrpc_request *req, bool hp)
 {
        struct ptlrpc_nrs       *nrs = nrs_svcpt2nrs(svcpt, hp);
 
 {
        struct ptlrpc_nrs       *nrs = nrs_svcpt2nrs(svcpt, hp);
 
@@ -1539,12 +1471,11 @@ ptlrpc_nrs_req_initialize(struct ptlrpc_service_part *svcpt,
  * Releases resources for a request; is called after the request has been
  * handled.
  *
  * Releases resources for a request; is called after the request has been
  * handled.
  *
- * \param[in] req The request
+ * \param[in] req the request
  *
  * \see ptlrpc_server_finish_request()
  */
  *
  * \see ptlrpc_server_finish_request()
  */
-void
-ptlrpc_nrs_req_finalize(struct ptlrpc_request *req)
+void ptlrpc_nrs_req_finalize(struct ptlrpc_request *req)
 {
        if (req->rq_nrq.nr_initialized) {
                nrs_resource_put_safe(req->rq_nrq.nr_res_ptrs);
 {
        if (req->rq_nrq.nr_initialized) {
                nrs_resource_put_safe(req->rq_nrq.nr_res_ptrs);
@@ -1554,15 +1485,7 @@ ptlrpc_nrs_req_finalize(struct ptlrpc_request *req)
        }
 }
 
        }
 }
 
-void
-ptlrpc_nrs_req_start_nolock(struct ptlrpc_request *req)
-{
-       req->rq_nrq.nr_started = 1;
-       nrs_request_start(&req->rq_nrq);
-}
-
-void
-ptlrpc_nrs_req_stop_nolock(struct ptlrpc_request *req)
+void ptlrpc_nrs_req_stop_nolock(struct ptlrpc_request *req)
 {
        if (req->rq_nrq.nr_started)
                nrs_request_stop(&req->rq_nrq);
 {
        if (req->rq_nrq.nr_started)
                nrs_request_stop(&req->rq_nrq);
@@ -1572,14 +1495,13 @@ ptlrpc_nrs_req_stop_nolock(struct ptlrpc_request *req)
  * Enqueues request \a req on either the regular or high-priority NRS head
  * of service partition \a svcpt.
  *
  * Enqueues request \a req on either the regular or high-priority NRS head
  * of service partition \a svcpt.
  *
- * \param[in] svcpt The service partition
- * \param[in] req   The request to be enqueued
- * \param[in] hp    Whether to enqueue the request on the regular or
+ * \param[in] svcpt the service partition
+ * \param[in] req   the request to be enqueued
+ * \param[in] hp    whether to enqueue the request on the regular or
  *                 high-priority NRS head.
  */
  *                 high-priority NRS head.
  */
-void
-ptlrpc_nrs_req_add(struct ptlrpc_service_part *svcpt,
-                  struct ptlrpc_request *req, bool hp)
+void ptlrpc_nrs_req_add(struct ptlrpc_service_part *svcpt,
+                       struct ptlrpc_request *req, bool hp)
 {
        spin_lock(&svcpt->scp_req_lock);
 
 {
        spin_lock(&svcpt->scp_req_lock);
 
@@ -1591,78 +1513,97 @@ ptlrpc_nrs_req_add(struct ptlrpc_service_part *svcpt,
        spin_unlock(&svcpt->scp_req_lock);
 }
 
        spin_unlock(&svcpt->scp_req_lock);
 }
 
+static void nrs_request_removed(struct ptlrpc_nrs_policy *policy)
+{
+       LASSERT(policy->pol_nrs->nrs_req_queued > 0);
+       LASSERT(policy->pol_req_queued > 0);
+
+       policy->pol_nrs->nrs_req_queued--;
+       policy->pol_req_queued--;
+
+       /**
+        * If the policy has no more requests queued, remove it from
+        * ptlrpc_nrs::nrs_policy_queued.
+        */
+       if (unlikely(policy->pol_req_queued == 0)) {
+               cfs_list_del_init(&policy->pol_list_queued);
+
+               /**
+                * If there are other policies with queued requests, move the
+                * current policy to the end so that we can round robin over
+                * all policies and drain the requests.
+                */
+       } else if (policy->pol_req_queued != policy->pol_nrs->nrs_req_queued) {
+               LASSERT(policy->pol_req_queued <
+                       policy->pol_nrs->nrs_req_queued);
+
+               cfs_list_move_tail(&policy->pol_list_queued,
+                                  &policy->pol_nrs->nrs_policy_queued);
+       }
+}
+
 /**
  * Obtains a request for handling from an NRS head of service partition
  * \a svcpt.
  *
 /**
  * Obtains a request for handling from an NRS head of service partition
  * \a svcpt.
  *
- * \param[in] svcpt The service partition
- * \param[in] hp    Whether to obtain a request from the regular or
+ * \param[in] svcpt the service partition
+ * \param[in] hp    whether to obtain a request from the regular or
  *                 high-priority NRS head.
  *                 high-priority NRS head.
- *
- * \retval the request to be handled
- * \retval NULL on failure
+ * \param[in] peek  when set, signifies that we just want to examine the
+ *                 request, and not handle it, so the request is not removed
+ *                 from the policy.
+ * \param[in] force when set, it will force a policy to return a request if it
+ *                 has one pending
+ *
+ * \retval the request to be handled
+ * \retval NULL the head has no requests to serve
  */
 struct ptlrpc_request *
  */
 struct ptlrpc_request *
-ptlrpc_nrs_req_poll_nolock(struct ptlrpc_service_part *svcpt, bool hp)
+ptlrpc_nrs_req_get_nolock0(struct ptlrpc_service_part *svcpt, bool hp,
+                          bool peek, bool force)
 {
        struct ptlrpc_nrs         *nrs = nrs_svcpt2nrs(svcpt, hp);
        struct ptlrpc_nrs_policy  *policy;
        struct ptlrpc_nrs_request *nrq;
 
 {
        struct ptlrpc_nrs         *nrs = nrs_svcpt2nrs(svcpt, hp);
        struct ptlrpc_nrs_policy  *policy;
        struct ptlrpc_nrs_request *nrq;
 
-       if (unlikely(nrs->nrs_req_queued == 0))
-               return NULL;
-
        /**
         * Always try to drain requests from all NRS polices even if they are
         * inactive, because the user can change policy status at runtime.
         */
        /**
         * Always try to drain requests from all NRS polices even if they are
         * inactive, because the user can change policy status at runtime.
         */
-       cfs_list_for_each_entry(policy, &(nrs)->nrs_policy_queued,
+       cfs_list_for_each_entry(policy, &nrs->nrs_policy_queued,
                                pol_list_queued) {
                                pol_list_queued) {
-               nrq = nrs_request_poll(policy);
-               if (likely(nrq != NULL))
+               nrq = nrs_request_get(policy, peek, force);
+               if (nrq != NULL) {
+                       if (likely(!peek)) {
+                               nrq->nr_started = 1;
+
+                               policy->pol_req_started++;
+                               policy->pol_nrs->nrs_req_started++;
+
+                               nrs_request_removed(policy);
+                       }
+
                        return container_of(nrq, struct ptlrpc_request, rq_nrq);
                        return container_of(nrq, struct ptlrpc_request, rq_nrq);
+               }
        }
 
        return NULL;
 }
 
 /**
        }
 
        return NULL;
 }
 
 /**
- * Dequeues a request that was previously obtained via ptlrpc_nrs_req_poll() and
- * is about to be handled.
+ * Dequeues request \a req from the policy it has been enqueued on.
  *
  *
- * \param[in] req The request
+ * \param[in] req the request
  */
  */
-void
-ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req)
+void ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req)
 {
 {
-       struct ptlrpc_nrs_policy  *policy;
+       struct ptlrpc_nrs_policy *policy = nrs_request_policy(&req->rq_nrq);
 
 
-       LASSERT(req->rq_nrq.nr_enqueued);
-       LASSERT(!req->rq_nrq.nr_dequeued);
+       policy->pol_desc->pd_ops->op_req_dequeue(policy, &req->rq_nrq);
 
 
-       policy = nrs_request_policy(&req->rq_nrq);
-       nrs_request_dequeue(&req->rq_nrq);
-       req->rq_nrq.nr_dequeued = 1;
+       req->rq_nrq.nr_enqueued = 0;
 
 
-       /**
-        * If the policy has no more requests queued, remove it from
-        * ptlrpc_nrs::nrs_policy_queued.
-        */
-       if (policy->pol_req_queued == 0) {
-               cfs_list_del_init(&policy->pol_list_queued);
-
-               /**
-                * If there are other policies with queued requests, move the
-                * current policy to the end so that we can round robin over
-                * all policies and drain the requests.
-                */
-       } else if (policy->pol_req_queued != policy->pol_nrs->nrs_req_queued) {
-               LASSERT(policy->pol_req_queued <
-                       policy->pol_nrs->nrs_req_queued);
-
-               cfs_list_move_tail(&policy->pol_list_queued,
-                                  &policy->pol_nrs->nrs_policy_queued);
-       }
+       nrs_request_removed(policy);
 }
 
 /**
 }
 
 /**
@@ -1671,15 +1612,14 @@ ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req)
  * be called while holding ptlrpc_service_part::scp_req_lock to get a reliable
  * result.
  *
  * be called while holding ptlrpc_service_part::scp_req_lock to get a reliable
  * result.
  *
- * \param[in] svcpt The service partition to enquire.
- * \param[in] hp    Whether the regular or high-priority NRS head is to be
+ * \param[in] svcpt the service partition to enquire.
+ * \param[in] hp    whether the regular or high-priority NRS head is to be
  *                 enquired.
  *
  *                 enquired.
  *
- * \retval false The indicated NRS head has no enqueued requests.
- * \retval true         The indicated NRS head has some enqueued requests.
+ * \retval false the indicated NRS head has no enqueued requests.
+ * \retval true         the indicated NRS head has some enqueued requests.
  */
  */
-bool
-ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp)
+bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp)
 {
        struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
 
 {
        struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
 
@@ -1689,13 +1629,11 @@ ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp)
 /**
  * Moves request \a req from the regular to the high-priority NRS head.
  *
 /**
  * Moves request \a req from the regular to the high-priority NRS head.
  *
- * \param[in] req The request to move
+ * \param[in] req the request to move
  */
  */
-void
-ptlrpc_nrs_req_hp_move(struct ptlrpc_request *req)
+void ptlrpc_nrs_req_hp_move(struct ptlrpc_request *req)
 {
        struct ptlrpc_service_part      *svcpt = req->rq_rqbd->rqbd_svcpt;
 {
        struct ptlrpc_service_part      *svcpt = req->rq_rqbd->rqbd_svcpt;
-       struct ptlrpc_nrs               *nrs = nrs_svcpt2nrs(svcpt, true);
        struct ptlrpc_nrs_request       *nrq = &req->rq_nrq;
        struct ptlrpc_nrs_resource      *res1[NRS_RES_MAX];
        struct ptlrpc_nrs_resource      *res2[NRS_RES_MAX];
        struct ptlrpc_nrs_request       *nrq = &req->rq_nrq;
        struct ptlrpc_nrs_resource      *res1[NRS_RES_MAX];
        struct ptlrpc_nrs_resource      *res2[NRS_RES_MAX];
@@ -1703,10 +1641,8 @@ ptlrpc_nrs_req_hp_move(struct ptlrpc_request *req)
 
        /**
         * Obtain the high-priority NRS head resources.
 
        /**
         * Obtain the high-priority NRS head resources.
-        * XXX: Maybe want to remove nrs_resource[get|put]_safe() dance
-        * when request cannot actually move; move this further down?
         */
         */
-       nrs_resource_get_safe(nrs, nrq, res1, true);
+       nrs_resource_get_safe(nrs_svcpt2nrs(svcpt, true), nrq, res1, true);
 
        spin_lock(&svcpt->scp_req_lock);
 
 
        spin_lock(&svcpt->scp_req_lock);
 
@@ -1714,7 +1650,6 @@ ptlrpc_nrs_req_hp_move(struct ptlrpc_request *req)
                goto out;
 
        ptlrpc_nrs_req_del_nolock(req);
                goto out;
 
        ptlrpc_nrs_req_del_nolock(req);
-       nrq->nr_enqueued = nrq->nr_dequeued = 0;
 
        memcpy(res2, nrq->nr_res_ptrs, NRS_RES_MAX * sizeof(res2[0]));
        memcpy(nrq->nr_res_ptrs, res1, NRS_RES_MAX * sizeof(res1[0]));
 
        memcpy(res2, nrq->nr_res_ptrs, NRS_RES_MAX * sizeof(res2[0]));
        memcpy(nrq->nr_res_ptrs, res1, NRS_RES_MAX * sizeof(res1[0]));
@@ -1740,13 +1675,13 @@ out:
  * human-readable \a name, on either all partitions, or only on the first
  * partition of service \a svc.
  *
  * human-readable \a name, on either all partitions, or only on the first
  * partition of service \a svc.
  *
- * \param[in]    svc    The service the policy belongs to.
- * \param[in]    queue  Whether to carry out the command on the policy which
+ * \param[in]    svc    the service the policy belongs to.
+ * \param[in]    queue  whether to carry out the command on the policy which
  *                      belongs to the regular, high-priority, or both NRS
  *                      heads of service partitions of \a svc.
  *                      belongs to the regular, high-priority, or both NRS
  *                      heads of service partitions of \a svc.
- * \param[in]    name   The policy to act upon, by human-readable name
- * \param[in]    opc    The opcode of the operation to carry out
- * \param[in]    single When set, the operation will only be carried out on the
+ * \param[in]    name   the policy to act upon, by human-readable name
+ * \param[in]    opc    the opcode of the operation to carry out
+ * \param[in]    single when set, the operation will only be carried out on the
  *                      NRS heads of the first service partition of \a svc.
  *                      This is useful for some policies which e.g. share
  *                      identical values on the same parameters of different
  *                      NRS heads of the first service partition of \a svc.
  *                      This is useful for some policies which e.g. share
  *                      identical values on the same parameters of different
@@ -1755,41 +1690,36 @@ out:
  *                      print out the values from the first service partition.
  *                      Storing these values centrally elsewhere then could be
  *                      another solution for this.
  *                      print out the values from the first service partition.
  *                      Storing these values centrally elsewhere then could be
  *                      another solution for this.
- * \param[in,out] arg   Can be used as a generic in/out buffer between control
+ * \param[in,out] arg   can be used as a generic in/out buffer between control
  *                      operations and the user environment.
  *
  *\retval -ve error condition
  *\retval   0 operation was carried out successfully
  */
  *                      operations and the user environment.
  *
  *\retval -ve error condition
  *\retval   0 operation was carried out successfully
  */
-int
-ptlrpc_nrs_policy_control(struct ptlrpc_service *svc,
-                         enum ptlrpc_nrs_queue_type queue, char *name,
-                         enum ptlrpc_nrs_ctl opc, bool single, void *arg)
+int ptlrpc_nrs_policy_control(const struct ptlrpc_service *svc,
+                             enum ptlrpc_nrs_queue_type queue, char *name,
+                             enum ptlrpc_nrs_ctl opc, bool single, void *arg)
 {
        struct ptlrpc_service_part     *svcpt;
        int                             i;
        int                             rc = 0;
        ENTRY;
 
 {
        struct ptlrpc_service_part     *svcpt;
        int                             i;
        int                             rc = 0;
        ENTRY;
 
-       ptlrpc_service_for_each_part(svcpt, i, svc) {
-               switch (queue) {
-               default:
-                       return -EINVAL;
+       LASSERT(opc != PTLRPC_NRS_CTL_INVALID);
+
+       if ((queue & PTLRPC_NRS_QUEUE_BOTH) == 0)
+               return -EINVAL;
 
 
-               case PTLRPC_NRS_QUEUE_BOTH:
-               case PTLRPC_NRS_QUEUE_REG:
+       ptlrpc_service_for_each_part(svcpt, i, svc) {
+               if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
                        rc = nrs_policy_ctl(nrs_svcpt2nrs(svcpt, false), name,
                                            opc, arg);
                        if (rc != 0 || (queue == PTLRPC_NRS_QUEUE_REG &&
                                        single))
                                GOTO(out, rc);
                        rc = nrs_policy_ctl(nrs_svcpt2nrs(svcpt, false), name,
                                            opc, arg);
                        if (rc != 0 || (queue == PTLRPC_NRS_QUEUE_REG &&
                                        single))
                                GOTO(out, rc);
+               }
 
 
-                       if (queue == PTLRPC_NRS_QUEUE_REG)
-                               break;
-
-                       /* fallthrough */
-
-               case PTLRPC_NRS_QUEUE_HP:
+               if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
                        /**
                         * XXX: We could optionally check for
                         * nrs_svc_has_hp(svc) here, and return an error if it
                        /**
                         * XXX: We could optionally check for
                         * nrs_svc_has_hp(svc) here, and return an error if it
@@ -1802,58 +1732,65 @@ ptlrpc_nrs_policy_control(struct ptlrpc_service *svc,
                                            opc, arg);
                        if (rc != 0 || single)
                                GOTO(out, rc);
                                            opc, arg);
                        if (rc != 0 || single)
                                GOTO(out, rc);
-
-                       break;
                }
        }
 out:
        RETURN(rc);
 }
 
                }
        }
 out:
        RETURN(rc);
 }
 
+
+/* ptlrpc/nrs_fifo.c */
+extern struct ptlrpc_nrs_pol_conf nrs_conf_fifo;
+
 /**
 /**
- * Adds all policies that ship with NRS, i.e. those in the \e nrs_pols_builtin
- * array, to NRS core's list of policies \e nrs_core.nrs_policies.
+ * Adds all policies that ship with the ptlrpc module, to NRS core's list of
+ * policies \e nrs_core.nrs_policies.
  *
  *
- * \retval 0 All policy descriptors in \e nrs_pols_builtin have been added
- *          successfully to \e nrs_core.nrs_policies
+ * \retval 0 all policies have been registered successfully
+ * \retval -ve error
  */
  */
-int
-ptlrpc_nrs_init(void)
+int ptlrpc_nrs_init(void)
 {
 {
-       int     rc = -EINVAL;
-       int     i;
+       int     rc;
        ENTRY;
 
        ENTRY;
 
-       /**
-        * Initialize the NRS core object.
-        */
        mutex_init(&nrs_core.nrs_mutex);
        mutex_init(&nrs_core.nrs_mutex);
-       CFS_INIT_LIST_HEAD(&nrs_core.nrs_heads);
        CFS_INIT_LIST_HEAD(&nrs_core.nrs_policies);
 
        CFS_INIT_LIST_HEAD(&nrs_core.nrs_policies);
 
-       for (i = 0; i < ARRAY_SIZE(nrs_pols_builtin); i++) {
-               /**
-                * No need to take nrs_core.nrs_mutex as there is no contention at
-                * this early stage.
-                */
-               rc = nrs_policy_exists_locked(nrs_pols_builtin[i]);
-               /**
-                * This should not fail for in-tree policies.
-                */
-               LASSERT(rc == false);
-               cfs_list_add_tail(&nrs_pols_builtin[i]->pd_list,
-                                 &nrs_core.nrs_policies);
-       }
+       rc = ptlrpc_nrs_policy_register(&nrs_conf_fifo);
+       if (rc != 0)
+               GOTO(fail, rc);
+
+       RETURN(rc);
+fail:
+       /**
+        * Since no PTLRPC services have been started at this point, all we need
+        * to do for cleanup is to free the descriptors.
+        */
+       ptlrpc_nrs_fini();
 
        RETURN(rc);
 }
 
 /**
 
        RETURN(rc);
 }
 
 /**
- * Stub finalization function
+ * Removes all policy desciptors from nrs_core::nrs_policies, and frees the
+ * policy descriptors.
+ *
+ * Since all PTLRPC services are stopped at this point, there are no more
+ * instances of any policies, because each service will have stopped its policy
+ * instances in ptlrpc_service_nrs_cleanup(), so we just need to free the
+ * descriptors here.
  */
  */
-void
-ptlrpc_nrs_fini(void)
+void ptlrpc_nrs_fini(void)
 {
 {
+       struct ptlrpc_nrs_pol_desc *desc;
+       struct ptlrpc_nrs_pol_desc *tmp;
+
+       cfs_list_for_each_entry_safe(desc, tmp, &nrs_core.nrs_policies,
+                                    pd_list) {
+               cfs_list_del_init(&desc->pd_list);
+               OBD_FREE_PTR(desc);
+       }
 }
 
 /** @} nrs */
 }
 
 /** @} nrs */
index 1f4de96..a77d533 100644 (file)
@@ -64,6 +64,8 @@
  * @{
  */
 
  * @{
  */
 
+#define NRS_POL_NAME_FIFO      "fifo"
+
 /**
  * Is called before the policy transitions into
  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
 /**
  * Is called before the policy transitions into
  * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
@@ -77,8 +79,7 @@
  * \see nrs_policy_register()
  * \see nrs_policy_ctl()
  */
  * \see nrs_policy_register()
  * \see nrs_policy_ctl()
  */
-static int
-nrs_fifo_start(struct ptlrpc_nrs_policy *policy)
+static int nrs_fifo_start(struct ptlrpc_nrs_policy *policy)
 {
        struct nrs_fifo_head *head;
 
 {
        struct nrs_fifo_head *head;
 
@@ -100,8 +101,7 @@ nrs_fifo_start(struct ptlrpc_nrs_policy *policy)
  *
  * \see nrs_policy_stop0()
  */
  *
  * \see nrs_policy_stop0()
  */
-static void
-nrs_fifo_stop(struct ptlrpc_nrs_policy *policy)
+static void nrs_fifo_stop(struct ptlrpc_nrs_policy *policy)
 {
        struct nrs_fifo_head *head = policy->pol_private;
 
 {
        struct nrs_fifo_head *head = policy->pol_private;
 
@@ -129,11 +129,10 @@ nrs_fifo_stop(struct ptlrpc_nrs_policy *policy)
  *
  * \see nrs_resource_get_safe()
  */
  *
  * \see nrs_resource_get_safe()
  */
-static int
-nrs_fifo_res_get(struct ptlrpc_nrs_policy *policy,
-                struct ptlrpc_nrs_request *nrq,
-                struct ptlrpc_nrs_resource *parent,
-                struct ptlrpc_nrs_resource **resp, bool moving_req)
+static int nrs_fifo_res_get(struct ptlrpc_nrs_policy *policy,
+                           struct ptlrpc_nrs_request *nrq,
+                           const struct ptlrpc_nrs_resource *parent,
+                           struct ptlrpc_nrs_resource **resp, bool moving_req)
 {
        /**
         * Just return the resource embedded inside nrs_fifo_head, and end this
 {
        /**
         * Just return the resource embedded inside nrs_fifo_head, and end this
@@ -144,24 +143,46 @@ nrs_fifo_res_get(struct ptlrpc_nrs_policy *policy,
 }
 
 /**
 }
 
 /**
- * Called when polling the fifo policy for a request.
+ * Called when getting a request from the FIFO policy for handling, or just
+ * peeking; removes the request from the policy when it is to be handled.
  *
  *
- * \param[in] policy The policy being polled
+ * \param[in] policy The policy
+ * \param[in] peek   When set, signifies that we just want to examine the
+ *                  request, and not handle it, so the request is not removed
+ *                  from the policy.
+ * \param[in] force  Force the policy to return a request; unused in this
+ *                  policy
  *
  * \retval The request to be handled; this is the next request in the FIFO
  *        queue
  *
  * \retval The request to be handled; this is the next request in the FIFO
  *        queue
- * \see ptlrpc_nrs_req_poll_nolock()
+ *
+ * \see ptlrpc_nrs_req_get_nolock()
+ * \see nrs_request_get()
  */
  */
-static struct ptlrpc_nrs_request *
-nrs_fifo_req_poll(struct ptlrpc_nrs_policy *policy)
+static
+struct ptlrpc_nrs_request * nrs_fifo_req_get(struct ptlrpc_nrs_policy *policy,
+                                            bool peek, bool force)
 {
 {
-       struct nrs_fifo_head *head = policy->pol_private;
+       struct nrs_fifo_head      *head = policy->pol_private;
+       struct ptlrpc_nrs_request *nrq;
 
 
-       LASSERT(head != NULL);
+       nrq = unlikely(cfs_list_empty(&head->fh_list)) ? NULL :
+             cfs_list_entry(head->fh_list.next, struct ptlrpc_nrs_request,
+                            nr_u.fifo.fr_list);
+
+       if (likely(!peek && nrq != NULL)) {
+               struct ptlrpc_request *req = container_of(nrq,
+                                                         struct ptlrpc_request,
+                                                         rq_nrq);
 
 
-       return cfs_list_empty(&head->fh_list) ? NULL :
-              cfs_list_entry(head->fh_list.next, struct ptlrpc_nrs_request,
-                             nr_u.fifo.fr_list);
+               cfs_list_del_init(&nrq->nr_u.fifo.fr_list);
+
+               CDEBUG(D_RPCTRACE, "NRS start %s request from %s, seq: "LPU64
+                      "\n", policy->pol_desc->pd_name,
+                      libcfs_id2str(req->rq_peer), nrq->nr_u.fifo.fr_sequence);
+       }
+
+       return nrq;
 }
 
 /**
 }
 
 /**
@@ -173,9 +194,8 @@ nrs_fifo_req_poll(struct ptlrpc_nrs_policy *policy)
  * \retval 0 success; nrs_request_enqueue() assumes this function will always
  *                   succeed
  */
  * \retval 0 success; nrs_request_enqueue() assumes this function will always
  *                   succeed
  */
-static int
-nrs_fifo_req_add(struct ptlrpc_nrs_policy *policy,
-                struct ptlrpc_nrs_request *nrq)
+static int nrs_fifo_req_add(struct ptlrpc_nrs_policy *policy,
+                           struct ptlrpc_nrs_request *nrq)
 {
        struct nrs_fifo_head *head;
 
 {
        struct nrs_fifo_head *head;
 
@@ -196,34 +216,14 @@ nrs_fifo_req_add(struct ptlrpc_nrs_policy *policy,
  * \param[in] policy The policy
  * \param[in] nrq    The request to remove
  */
  * \param[in] policy The policy
  * \param[in] nrq    The request to remove
  */
-static void
-nrs_fifo_req_del(struct ptlrpc_nrs_policy *policy,
-                struct ptlrpc_nrs_request *nrq)
+static void nrs_fifo_req_del(struct ptlrpc_nrs_policy *policy,
+                            struct ptlrpc_nrs_request *nrq)
 {
        LASSERT(!cfs_list_empty(&nrq->nr_u.fifo.fr_list));
        cfs_list_del_init(&nrq->nr_u.fifo.fr_list);
 }
 
 /**
 {
        LASSERT(!cfs_list_empty(&nrq->nr_u.fifo.fr_list));
        cfs_list_del_init(&nrq->nr_u.fifo.fr_list);
 }
 
 /**
- * Prints a debug statement right before the request \a nrq starts being
- * handled.
- *
- * \param[in] policy The policy handling the request
- * \param[in] nrq    The request being handled
- */
-static void
-nrs_fifo_req_start(struct ptlrpc_nrs_policy *policy,
-                  struct ptlrpc_nrs_request *nrq)
-{
-       struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
-                                                 rq_nrq);
-
-       CDEBUG(D_RPCTRACE, "NRS start %s request from %s, seq: "LPU64"\n",
-              nrs_request_policy(nrq)->pol_name, libcfs_id2str(req->rq_peer),
-              nrq->nr_u.fifo.fr_sequence);
-}
-
-/**
  * Prints a debug statement right before the request \a nrq stops being
  * handled.
  *
  * Prints a debug statement right before the request \a nrq stops being
  * handled.
  *
@@ -233,40 +233,38 @@ nrs_fifo_req_start(struct ptlrpc_nrs_policy *policy,
  * \see ptlrpc_server_finish_request()
  * \see ptlrpc_nrs_req_stop_nolock()
  */
  * \see ptlrpc_server_finish_request()
  * \see ptlrpc_nrs_req_stop_nolock()
  */
-static void
-nrs_fifo_req_stop(struct ptlrpc_nrs_policy *policy,
-                 struct ptlrpc_nrs_request *nrq)
+static void nrs_fifo_req_stop(struct ptlrpc_nrs_policy *policy,
+                             struct ptlrpc_nrs_request *nrq)
 {
        struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
                                                  rq_nrq);
 
        CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: "LPU64"\n",
 {
        struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
                                                  rq_nrq);
 
        CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: "LPU64"\n",
-              nrs_request_policy(nrq)->pol_name, libcfs_id2str(req->rq_peer),
+              policy->pol_desc->pd_name, libcfs_id2str(req->rq_peer),
               nrq->nr_u.fifo.fr_sequence);
 }
 
 /**
  * FIFO policy operations
  */
               nrq->nr_u.fifo.fr_sequence);
 }
 
 /**
  * FIFO policy operations
  */
-static struct ptlrpc_nrs_pol_ops nrs_fifo_ops = {
+static const struct ptlrpc_nrs_pol_ops nrs_fifo_ops = {
        .op_policy_start        = nrs_fifo_start,
        .op_policy_stop         = nrs_fifo_stop,
        .op_res_get             = nrs_fifo_res_get,
        .op_policy_start        = nrs_fifo_start,
        .op_policy_stop         = nrs_fifo_stop,
        .op_res_get             = nrs_fifo_res_get,
-       .op_req_poll            = nrs_fifo_req_poll,
+       .op_req_get             = nrs_fifo_req_get,
        .op_req_enqueue         = nrs_fifo_req_add,
        .op_req_dequeue         = nrs_fifo_req_del,
        .op_req_enqueue         = nrs_fifo_req_add,
        .op_req_dequeue         = nrs_fifo_req_del,
-       .op_req_start           = nrs_fifo_req_start,
        .op_req_stop            = nrs_fifo_req_stop,
 };
 
 /**
        .op_req_stop            = nrs_fifo_req_stop,
 };
 
 /**
- * FIFO policy descriptor
+ * FIFO policy configuration
  */
  */
-struct ptlrpc_nrs_pol_desc ptlrpc_nrs_fifo_desc = {
-       .pd_name                = "fifo",
-       .pd_ops                 = &nrs_fifo_ops,
-       .pd_compat              = nrs_policy_compat_all,
-       .pd_flags               = PTLRPC_NRS_FL_FALLBACK |
+struct ptlrpc_nrs_pol_conf nrs_conf_fifo = {
+       .nc_name                = NRS_POL_NAME_FIFO,
+       .nc_ops                 = &nrs_fifo_ops,
+       .nc_compat              = nrs_policy_compat_all,
+       .nc_flags               = PTLRPC_NRS_FL_FALLBACK |
                                  PTLRPC_NRS_FL_REG_START
 };
 
                                  PTLRPC_NRS_FL_REG_START
 };
 
index 5c24498..ca0a582 100644 (file)
@@ -95,26 +95,19 @@ void ptlrpc_lprocfs_do_request_stat (struct ptlrpc_request *req,
  */
 struct nrs_core {
        /**
  */
 struct nrs_core {
        /**
-        * Protects nrs_core::nrs_heads, nrs_core::nrs_policies, serializes
-        * external policy registration/unregistration, and NRS core lprocfs
-        * operations.
+        * Protects nrs_core::nrs_policies, serializes external policy
+        * registration/unregistration, and NRS core lprocfs operations.
         */
        struct mutex nrs_mutex;
        /* XXX: This is just for liblustre. Remove the #if defined directive
         * when the * "cfs_" prefix is dropped from cfs_list_head. */
 #if defined (__linux__) && defined(__KERNEL__)
        /**
         */
        struct mutex nrs_mutex;
        /* XXX: This is just for liblustre. Remove the #if defined directive
         * when the * "cfs_" prefix is dropped from cfs_list_head. */
 #if defined (__linux__) && defined(__KERNEL__)
        /**
-        * List of all NRS heads on all service partitions of all services;
-        * protected by nrs_core::nrs_mutex.
-        */
-       struct list_head nrs_heads;
-       /**
         * List of all policy descriptors registered with NRS core; protected
         * by nrs_core::nrs_mutex.
         */
        struct list_head nrs_policies;
 #else
         * List of all policy descriptors registered with NRS core; protected
         * by nrs_core::nrs_mutex.
         */
        struct list_head nrs_policies;
 #else
-       struct cfs_list_head nrs_heads;
        struct cfs_list_head nrs_policies;
 #endif
 
        struct cfs_list_head nrs_policies;
 #endif
 
@@ -126,31 +119,43 @@ void ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc);
 void ptlrpc_nrs_req_initialize(struct ptlrpc_service_part *svcpt,
                               struct ptlrpc_request *req, bool hp);
 void ptlrpc_nrs_req_finalize(struct ptlrpc_request *req);
 void ptlrpc_nrs_req_initialize(struct ptlrpc_service_part *svcpt,
                               struct ptlrpc_request *req, bool hp);
 void ptlrpc_nrs_req_finalize(struct ptlrpc_request *req);
-void ptlrpc_nrs_req_start_nolock(struct ptlrpc_request *req);
 void ptlrpc_nrs_req_stop_nolock(struct ptlrpc_request *req);
 void ptlrpc_nrs_req_add(struct ptlrpc_service_part *svcpt,
                        struct ptlrpc_request *req, bool hp);
 void ptlrpc_nrs_req_stop_nolock(struct ptlrpc_request *req);
 void ptlrpc_nrs_req_add(struct ptlrpc_service_part *svcpt,
                        struct ptlrpc_request *req, bool hp);
+
 struct ptlrpc_request *
 struct ptlrpc_request *
-ptlrpc_nrs_req_poll_nolock(struct ptlrpc_service_part *svcpt,
-                                                 bool hp);
+ptlrpc_nrs_req_get_nolock0(struct ptlrpc_service_part *svcpt, bool hp,
+                          bool peek, bool force);
+
+static inline struct ptlrpc_request *
+ptlrpc_nrs_req_get_nolock(struct ptlrpc_service_part *svcpt, bool hp,
+                         bool force)
+{
+       return ptlrpc_nrs_req_get_nolock0(svcpt, hp, false, force);
+}
+
+static inline struct ptlrpc_request *
+ptlrpc_nrs_req_peek_nolock(struct ptlrpc_service_part *svcpt, bool hp)
+{
+       return ptlrpc_nrs_req_get_nolock0(svcpt, hp, true, false);
+}
+
 void ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req);
 bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp);
 
 void ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req);
 bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp);
 
-int ptlrpc_nrs_policy_control(struct ptlrpc_service *svc,
+int ptlrpc_nrs_policy_control(const struct ptlrpc_service *svc,
                              enum ptlrpc_nrs_queue_type queue, char *name,
                              enum ptlrpc_nrs_ctl opc, bool single, void *arg);
 
 int ptlrpc_nrs_init(void);
 void ptlrpc_nrs_fini(void);
 
                              enum ptlrpc_nrs_queue_type queue, char *name,
                              enum ptlrpc_nrs_ctl opc, bool single, void *arg);
 
 int ptlrpc_nrs_init(void);
 void ptlrpc_nrs_fini(void);
 
-static inline int
-nrs_svcpt_has_hp(struct ptlrpc_service_part *svcpt)
+static inline bool nrs_svcpt_has_hp(const struct ptlrpc_service_part *svcpt)
 {
        return svcpt->scp_nrs_hp != NULL;
 }
 
 {
        return svcpt->scp_nrs_hp != NULL;
 }
 
-static inline int
-nrs_svc_has_hp(struct ptlrpc_service *svc)
+static inline bool nrs_svc_has_hp(const struct ptlrpc_service *svc)
 {
        /**
         * If the first service partition has an HP NRS head, all service
 {
        /**
         * If the first service partition has an HP NRS head, all service
@@ -159,33 +164,32 @@ nrs_svc_has_hp(struct ptlrpc_service *svc)
        return nrs_svcpt_has_hp(svc->srv_parts[0]);
 }
 
        return nrs_svcpt_has_hp(svc->srv_parts[0]);
 }
 
-static inline struct ptlrpc_nrs *
-nrs_svcpt2nrs(struct ptlrpc_service_part *svcpt, bool hp)
+static inline
+struct ptlrpc_nrs *nrs_svcpt2nrs(struct ptlrpc_service_part *svcpt, bool hp)
 {
        LASSERT(ergo(hp, nrs_svcpt_has_hp(svcpt)));
        return hp ? svcpt->scp_nrs_hp : &svcpt->scp_nrs_reg;
 }
 
 {
        LASSERT(ergo(hp, nrs_svcpt_has_hp(svcpt)));
        return hp ? svcpt->scp_nrs_hp : &svcpt->scp_nrs_reg;
 }
 
-static inline int
-nrs_pol2cptid(struct ptlrpc_nrs_policy *policy)
+static inline int nrs_pol2cptid(const struct ptlrpc_nrs_policy *policy)
 {
        return policy->pol_nrs->nrs_svcpt->scp_cpt;
 }
 
 {
        return policy->pol_nrs->nrs_svcpt->scp_cpt;
 }
 
-static inline struct ptlrpc_service *
-nrs_pol2svc(struct ptlrpc_nrs_policy *policy)
+static inline
+struct ptlrpc_service *nrs_pol2svc(struct ptlrpc_nrs_policy *policy)
 {
        return policy->pol_nrs->nrs_svcpt->scp_service;
 }
 
 {
        return policy->pol_nrs->nrs_svcpt->scp_service;
 }
 
-static inline struct ptlrpc_service_part *
-nrs_pol2svcpt(struct ptlrpc_nrs_policy *policy)
+static inline
+struct ptlrpc_service_part *nrs_pol2svcpt(struct ptlrpc_nrs_policy *policy)
 {
        return policy->pol_nrs->nrs_svcpt;
 }
 
 {
        return policy->pol_nrs->nrs_svcpt;
 }
 
-static inline struct cfs_cpt_table *
-nrs_pol2cptab(struct ptlrpc_nrs_policy *policy)
+static inline
+struct cfs_cpt_table *nrs_pol2cptab(struct ptlrpc_nrs_policy *policy)
 {
        return nrs_pol2svc(policy)->srv_cptable;
 }
 {
        return nrs_pol2svc(policy)->srv_cptable;
 }
@@ -199,8 +203,8 @@ nrs_request_resource(struct ptlrpc_nrs_request *nrq)
        return nrq->nr_res_ptrs[nrq->nr_res_idx];
 }
 
        return nrq->nr_res_ptrs[nrq->nr_res_idx];
 }
 
-static inline struct ptlrpc_nrs_policy *
-nrs_request_policy(struct ptlrpc_nrs_request *nrq)
+static inline
+struct ptlrpc_nrs_policy *nrs_request_policy(struct ptlrpc_nrs_request *nrq)
 {
        return nrs_request_resource(nrq)->res_policy;
 }
 {
        return nrs_request_resource(nrq)->res_policy;
 }
index 5b2a77f..63fee6d 100644 (file)
@@ -1583,16 +1583,16 @@ static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
  * User can call it w/o any lock but need to hold
  * ptlrpc_service_part::scp_req_lock to get reliable result
  */
  * User can call it w/o any lock but need to hold
  * ptlrpc_service_part::scp_req_lock to get reliable result
  */
-static int ptlrpc_server_allow_high(struct ptlrpc_service_part *svcpt,
-                                   int force)
+static bool ptlrpc_server_allow_high(struct ptlrpc_service_part *svcpt,
+                                    bool force)
 {
        int running = svcpt->scp_nthrs_running;
 
        if (!nrs_svcpt_has_hp(svcpt))
 {
        int running = svcpt->scp_nthrs_running;
 
        if (!nrs_svcpt_has_hp(svcpt))
-               return 0;
+               return false;
 
        if (force)
 
        if (force)
-               return 1;
+               return true;
 
        if (unlikely(svcpt->scp_service->srv_req_portal == MDS_REQUEST_PORTAL &&
                     CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))) {
 
        if (unlikely(svcpt->scp_service->srv_req_portal == MDS_REQUEST_PORTAL &&
                     CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))) {
@@ -1603,17 +1603,17 @@ static int ptlrpc_server_allow_high(struct ptlrpc_service_part *svcpt,
        }
 
        if (svcpt->scp_nreqs_active >= running - 1)
        }
 
        if (svcpt->scp_nreqs_active >= running - 1)
-               return 0;
+               return false;
 
        if (svcpt->scp_nhreqs_active == 0)
 
        if (svcpt->scp_nhreqs_active == 0)
-               return 1;
+               return true;
 
        return !ptlrpc_nrs_req_pending_nolock(svcpt, false) ||
               svcpt->scp_hreq_count < svcpt->scp_service->srv_hpreq_ratio;
 }
 
 
        return !ptlrpc_nrs_req_pending_nolock(svcpt, false) ||
               svcpt->scp_hreq_count < svcpt->scp_service->srv_hpreq_ratio;
 }
 
-static int ptlrpc_server_high_pending(struct ptlrpc_service_part *svcpt,
-                                     int force)
+static bool ptlrpc_server_high_pending(struct ptlrpc_service_part *svcpt,
+                                      bool force)
 {
        return ptlrpc_server_allow_high(svcpt, force) &&
               ptlrpc_nrs_req_pending_nolock(svcpt, true);
 {
        return ptlrpc_server_allow_high(svcpt, force) &&
               ptlrpc_nrs_req_pending_nolock(svcpt, true);
@@ -1628,13 +1628,13 @@ static int ptlrpc_server_high_pending(struct ptlrpc_service_part *svcpt,
  * User can call it w/o any lock but need to hold
  * ptlrpc_service_part::scp_req_lock to get reliable result
  */
  * User can call it w/o any lock but need to hold
  * ptlrpc_service_part::scp_req_lock to get reliable result
  */
-static int ptlrpc_server_allow_normal(struct ptlrpc_service_part *svcpt,
-                                     int force)
+static bool ptlrpc_server_allow_normal(struct ptlrpc_service_part *svcpt,
+                                      bool force)
 {
        int running = svcpt->scp_nthrs_running;
 #ifndef __KERNEL__
        if (1) /* always allow to handle normal request for liblustre */
 {
        int running = svcpt->scp_nthrs_running;
 #ifndef __KERNEL__
        if (1) /* always allow to handle normal request for liblustre */
-               return 1;
+               return true;
 #endif
        if (unlikely(svcpt->scp_service->srv_req_portal == MDS_REQUEST_PORTAL &&
                     CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))) {
 #endif
        if (unlikely(svcpt->scp_service->srv_req_portal == MDS_REQUEST_PORTAL &&
                     CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))) {
@@ -1646,17 +1646,16 @@ static int ptlrpc_server_allow_normal(struct ptlrpc_service_part *svcpt,
 
        if (force ||
            svcpt->scp_nreqs_active < running - 2)
 
        if (force ||
            svcpt->scp_nreqs_active < running - 2)
-               return 1;
+               return true;
 
        if (svcpt->scp_nreqs_active >= running - 1)
 
        if (svcpt->scp_nreqs_active >= running - 1)
-               return 0;
+               return false;
 
 
-       return svcpt->scp_nhreqs_active > 0 ||
-              !nrs_svcpt_has_hp(svcpt);
+       return svcpt->scp_nhreqs_active > 0 || !nrs_svcpt_has_hp(svcpt);
 }
 
 }
 
-static int ptlrpc_server_normal_pending(struct ptlrpc_service_part *svcpt,
-                                       int force)
+static bool ptlrpc_server_normal_pending(struct ptlrpc_service_part *svcpt,
+                                        bool force)
 {
        return ptlrpc_server_allow_normal(svcpt, force) &&
               ptlrpc_nrs_req_pending_nolock(svcpt, false);
 {
        return ptlrpc_server_allow_normal(svcpt, force) &&
               ptlrpc_nrs_req_pending_nolock(svcpt, false);
@@ -1670,8 +1669,8 @@ static int ptlrpc_server_normal_pending(struct ptlrpc_service_part *svcpt,
  * \see ptlrpc_server_allow_normal
  * \see ptlrpc_server_allow high
  */
  * \see ptlrpc_server_allow_normal
  * \see ptlrpc_server_allow high
  */
-static inline int
-ptlrpc_server_request_pending(struct ptlrpc_service_part *svcpt, int force)
+static inline bool
+ptlrpc_server_request_pending(struct ptlrpc_service_part *svcpt, bool force)
 {
        return ptlrpc_server_high_pending(svcpt, force) ||
               ptlrpc_server_normal_pending(svcpt, force);
 {
        return ptlrpc_server_high_pending(svcpt, force) ||
               ptlrpc_server_normal_pending(svcpt, force);
@@ -1683,21 +1682,25 @@ ptlrpc_server_request_pending(struct ptlrpc_service_part *svcpt, int force)
  * Returns a pointer to fetched request.
  */
 static struct ptlrpc_request *
  * Returns a pointer to fetched request.
  */
 static struct ptlrpc_request *
-ptlrpc_server_request_get(struct ptlrpc_service_part *svcpt, int force)
+ptlrpc_server_request_get(struct ptlrpc_service_part *svcpt, bool force)
 {
        struct ptlrpc_request *req;
        ENTRY;
 
        if (ptlrpc_server_high_pending(svcpt, force)) {
 {
        struct ptlrpc_request *req;
        ENTRY;
 
        if (ptlrpc_server_high_pending(svcpt, force)) {
-               req = ptlrpc_nrs_req_poll_nolock(svcpt, true);
-               svcpt->scp_hreq_count++;
-               RETURN(req);
+               req = ptlrpc_nrs_req_get_nolock(svcpt, true, force);
+               if (req != NULL) {
+                       svcpt->scp_hreq_count++;
+                       RETURN(req);
+               }
        }
 
        if (ptlrpc_server_normal_pending(svcpt, force)) {
        }
 
        if (ptlrpc_server_normal_pending(svcpt, force)) {
-               req = ptlrpc_nrs_req_poll_nolock(svcpt, false);
-               svcpt->scp_hreq_count = 0;
-               RETURN(req);
+               req = ptlrpc_nrs_req_get_nolock(svcpt, false, force);
+               if (req != NULL) {
+                       svcpt->scp_hreq_count = 0;
+                       RETURN(req);
+               }
        }
        RETURN(NULL);
 }
        }
        RETURN(NULL);
 }
@@ -1878,8 +1881,8 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
                RETURN(0);
        }
 #endif
                RETURN(0);
        }
 #endif
-       request = ptlrpc_server_request_get(svcpt, 0);
-       if  (request == NULL) {
+       request = ptlrpc_server_request_get(svcpt, false);
+       if (request == NULL) {
                spin_unlock(&svcpt->scp_req_lock);
                 RETURN(0);
         }
                spin_unlock(&svcpt->scp_req_lock);
                 RETURN(0);
         }
@@ -1896,19 +1899,12 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
                        OBD_FAIL_TIMEOUT(fail_opc, 4);
 
                        spin_lock(&svcpt->scp_req_lock);
                        OBD_FAIL_TIMEOUT(fail_opc, 4);
 
                        spin_lock(&svcpt->scp_req_lock);
-                       request = ptlrpc_server_request_get(svcpt, 0);
-                       if  (request == NULL) {
-                               spin_unlock(&svcpt->scp_req_lock);
-                               RETURN(0);
-                       }
                }
        }
                }
        }
-       ptlrpc_nrs_req_del_nolock(request);
        svcpt->scp_nreqs_active++;
        if (request->rq_hp)
                svcpt->scp_nhreqs_active++;
 
        svcpt->scp_nreqs_active++;
        if (request->rq_hp)
                svcpt->scp_nhreqs_active++;
 
-       ptlrpc_nrs_req_start_nolock(request);
        spin_unlock(&svcpt->scp_req_lock);
 
         ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
        spin_unlock(&svcpt->scp_req_lock);
 
         ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
@@ -2334,7 +2330,7 @@ ptlrpc_wait_event(struct ptlrpc_service_part *svcpt,
        l_wait_event_exclusive_head(svcpt->scp_waitq,
                                ptlrpc_thread_stopping(thread) ||
                                ptlrpc_server_request_incoming(svcpt) ||
        l_wait_event_exclusive_head(svcpt->scp_waitq,
                                ptlrpc_thread_stopping(thread) ||
                                ptlrpc_server_request_incoming(svcpt) ||
-                               ptlrpc_server_request_pending(svcpt, 0) ||
+                               ptlrpc_server_request_pending(svcpt, false) ||
                                ptlrpc_rqbd_pending(svcpt) ||
                                ptlrpc_at_check(svcpt), &lwi);
 
                                ptlrpc_rqbd_pending(svcpt) ||
                                ptlrpc_at_check(svcpt), &lwi);
 
@@ -2480,7 +2476,7 @@ static int ptlrpc_main(void *arg)
                if (ptlrpc_at_check(svcpt))
                        ptlrpc_at_check_timed(svcpt);
 
                if (ptlrpc_at_check(svcpt))
                        ptlrpc_at_check_timed(svcpt);
 
-               if (ptlrpc_server_request_pending(svcpt, 0)) {
+               if (ptlrpc_server_request_pending(svcpt, false)) {
                        lu_context_enter(&env->le_ctx);
                        ptlrpc_server_handle_request(svcpt, thread);
                        lu_context_exit(&env->le_ctx);
                        lu_context_enter(&env->le_ctx);
                        ptlrpc_server_handle_request(svcpt, thread);
                        lu_context_exit(&env->le_ctx);
@@ -3046,9 +3042,8 @@ ptlrpc_service_purge_all(struct ptlrpc_service *svc)
                        ptlrpc_server_finish_request(svcpt, req);
                }
 
                        ptlrpc_server_finish_request(svcpt, req);
                }
 
-               while (ptlrpc_server_request_pending(svcpt, 1)) {
-                       req = ptlrpc_server_request_get(svcpt, 1);
-                       ptlrpc_nrs_req_del_nolock(req);
+               while (ptlrpc_server_request_pending(svcpt, true)) {
+                       req = ptlrpc_server_request_get(svcpt, true);
                        svcpt->scp_nreqs_active++;
                        ptlrpc_server_hpreq_fini(req);
 
                        svcpt->scp_nreqs_active++;
                        ptlrpc_server_hpreq_fini(req);
 
@@ -3166,10 +3161,10 @@ int ptlrpc_svcpt_health_check(struct ptlrpc_service_part *svcpt)
 
        spin_lock(&svcpt->scp_req_lock);
         /* How long has the next entry been waiting? */
 
        spin_lock(&svcpt->scp_req_lock);
         /* How long has the next entry been waiting? */
-       if (ptlrpc_server_high_pending(svcpt, 1))
-               request = ptlrpc_nrs_req_poll_nolock(svcpt, true);
-       else if (ptlrpc_server_normal_pending(svcpt, 1))
-               request = ptlrpc_nrs_req_poll_nolock(svcpt, false);
+       if (ptlrpc_server_high_pending(svcpt, true))
+               request = ptlrpc_nrs_req_peek_nolock(svcpt, true);
+       else if (ptlrpc_server_normal_pending(svcpt, true))
+               request = ptlrpc_nrs_req_peek_nolock(svcpt, false);
 
        if (request == NULL) {
                spin_unlock(&svcpt->scp_req_lock);
 
        if (request == NULL) {
                spin_unlock(&svcpt->scp_req_lock);