Whamcloud - gitweb
LU-17504 build: fix gcc-13 [-Werror=stringop-overread] error
[fs/lustre-release.git] / lustre / ptlrpc / nrs.c
index 5343ab3..3f5b59d 100644 (file)
@@ -20,7 +20,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2011 Intel Corporation
+ * Copyright (c) 2014, 2016, Intel Corporation.
  *
  * Copyright 2012 Xyratex Technology Limited
  */
@@ -40,9 +40,6 @@
  */
 
 #define DEBUG_SUBSYSTEM S_RPC
-#ifndef __KERNEL__
-#include <liblustre.h>
-#endif
 #include <obd_support.h>
 #include <obd_class.h>
 #include <lustre_net.h>
 #include <libcfs/libcfs.h>
 #include "ptlrpc_internal.h"
 
-/* XXX: This is just for liblustre. Remove the #if defined directive when the
- * "cfs_" prefix is dropped from cfs_list_head. */
-#if defined (__linux__) && defined(__KERNEL__)
-extern struct list_head ptlrpc_all_services;
-#else
-extern struct cfs_list_head ptlrpc_all_services;
-#endif
-
 /**
  * NRS core object.
  */
@@ -72,6 +61,7 @@ static int nrs_policy_init(struct ptlrpc_nrs_policy *policy)
 static void nrs_policy_fini(struct ptlrpc_nrs_policy *policy)
 {
        LASSERT(policy->pol_ref == 0);
+       LASSERT(refcount_read(&policy->pol_start_ref) == 0);
        LASSERT(policy->pol_req_queued == 0);
 
        if (policy->pol_desc->pd_ops->op_policy_fini != NULL)
@@ -97,31 +87,47 @@ static int nrs_policy_ctl_locked(struct ptlrpc_nrs_policy *policy,
 
 static void nrs_policy_stop0(struct ptlrpc_nrs_policy *policy)
 {
-       struct ptlrpc_nrs *nrs = policy->pol_nrs;
        ENTRY;
 
-       if (policy->pol_desc->pd_ops->op_policy_stop != NULL) {
-               spin_unlock(&nrs->nrs_lock);
-
+       if (policy->pol_desc->pd_ops->op_policy_stop != NULL)
                policy->pol_desc->pd_ops->op_policy_stop(policy);
 
-               spin_lock(&nrs->nrs_lock);
-       }
-
-       LASSERT(cfs_list_empty(&policy->pol_list_queued));
+       LASSERT(list_empty(&policy->pol_list_queued));
        LASSERT(policy->pol_req_queued == 0 &&
                policy->pol_req_started == 0);
 
        policy->pol_private = NULL;
+       policy->pol_arg[0] = '\0';
 
        policy->pol_state = NRS_POL_STATE_STOPPED;
+       wake_up(&policy->pol_wq);
 
-       if (cfs_atomic_dec_and_test(&policy->pol_desc->pd_refs))
-               cfs_module_put(policy->pol_desc->pd_owner);
+       if (atomic_dec_and_test(&policy->pol_desc->pd_refs))
+               module_put(policy->pol_desc->pd_owner);
 
        EXIT;
 }
 
+/**
+ * Increases the policy's usage started reference count.
+ */
+static inline void nrs_policy_started_get(struct ptlrpc_nrs_policy *policy)
+{
+       refcount_inc(&policy->pol_start_ref);
+}
+
+/**
+ * Decreases the policy's started reference count, and stops the policy in
+ * case it was already stopping and has no more outstanding started
+ * references (which indicates it has no more queued or started requests, and
+ * can be safely stopped).
+ */
+static void nrs_policy_started_put(struct ptlrpc_nrs_policy *policy)
+{
+       if (refcount_dec_and_test(&policy->pol_start_ref))
+               nrs_policy_stop0(policy);
+}
+
 static int nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
 {
        struct ptlrpc_nrs *nrs = policy->pol_nrs;
@@ -148,9 +154,18 @@ static int nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
                nrs->nrs_policy_fallback = NULL;
        }
 
-       /* I have the only refcount */
-       if (policy->pol_ref == 1)
-               nrs_policy_stop0(policy);
+       /* Drop started ref and wait for requests to be drained */
+       spin_unlock(&nrs->nrs_lock);
+       nrs_policy_started_put(policy);
+
+       wait_event_timeout(policy->pol_wq,
+                          policy->pol_state == NRS_POL_STATE_STOPPED,
+                          cfs_time_seconds(30));
+
+       spin_lock(&nrs->nrs_lock);
+
+       if (policy->pol_state != NRS_POL_STATE_STOPPED)
+               RETURN(-EBUSY);
 
        RETURN(0);
 }
@@ -182,8 +197,10 @@ static void nrs_policy_stop_primary(struct ptlrpc_nrs *nrs)
        LASSERT(tmp->pol_state == NRS_POL_STATE_STARTED);
        tmp->pol_state = NRS_POL_STATE_STOPPING;
 
-       if (tmp->pol_ref == 0)
-               nrs_policy_stop0(tmp);
+       /* Drop started ref to free the policy */
+       spin_unlock(&nrs->nrs_lock);
+       nrs_policy_started_put(tmp);
+       spin_lock(&nrs->nrs_lock);
        EXIT;
 }
 
@@ -206,10 +223,12 @@ static void nrs_policy_stop_primary(struct ptlrpc_nrs *nrs)
  * references on the policy to ptlrpc_nrs_pol_stae::NRS_POL_STATE_STOPPED. In
  * this case, the fallback policy is only left active in the NRS head.
  */
-static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy)
+static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy, char *arg)
 {
-       struct ptlrpc_nrs      *nrs = policy->pol_nrs;
-       int                     rc = 0;
+       struct ptlrpc_nrs *nrs = policy->pol_nrs;
+       struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
+       char *srv_name = svcpt->scp_service->srv_name;
+       int rc = 0;
        ENTRY;
 
        /**
@@ -224,6 +243,13 @@ static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy)
        if (policy->pol_state == NRS_POL_STATE_STOPPING)
                RETURN(-EAGAIN);
 
+       if (arg && strlen(arg) >= sizeof(policy->pol_arg)) {
+               rc = -EINVAL;
+               CWARN("%s.%d NRS: arg '%s' is too long: rc = %d\n",
+                     srv_name, svcpt->scp_cpt, arg, rc);
+               return rc;
+       }
+
        if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
                /**
                 * This is for cases in which the user sets the policy to the
@@ -250,20 +276,33 @@ static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy)
                if (nrs->nrs_policy_fallback == NULL)
                        RETURN(-EPERM);
 
-               if (policy->pol_state == NRS_POL_STATE_STARTED)
-                       RETURN(0);
+               if (policy->pol_state == NRS_POL_STATE_STARTED) {
+                       /**
+                        * If the policy argument differs from the one used the
+                        * last time, stop the policy first and start it again
+                        * with the new argument.
+                        */
+                       if ((arg == NULL && strlen(policy->pol_arg) == 0) ||
+                           (arg != NULL && strcmp(policy->pol_arg, arg) == 0))
+                               RETURN(0);
+
+                       rc = nrs_policy_stop_locked(policy);
+                       if (rc)
+                               RETURN(rc);
+               }
        }
 
        /**
         * Increase the module usage count for policies registering from other
         * modules.
         */
-       if (cfs_atomic_inc_return(&policy->pol_desc->pd_refs) == 1 &&
-           !cfs_try_module_get(policy->pol_desc->pd_owner)) {
-               cfs_atomic_dec(&policy->pol_desc->pd_refs);
-               CERROR("NRS: cannot get module for policy %s; is it alive?\n",
-                      policy->pol_desc->pd_name);
-               RETURN(-ENODEV);
+       if (atomic_inc_return(&policy->pol_desc->pd_refs) == 1 &&
+           !try_module_get(policy->pol_desc->pd_owner)) {
+               atomic_dec(&policy->pol_desc->pd_refs);
+               rc = -ENODEV;
+               CERROR("%s.%d NRS: cannot get module for policy %s (is it alive?): rc = %d\n",
+                      srv_name, svcpt->scp_cpt, policy->pol_desc->pd_name, rc);
+               RETURN(rc);
        }
 
        /**
@@ -276,18 +315,23 @@ static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy)
        if (policy->pol_desc->pd_ops->op_policy_start) {
                spin_unlock(&nrs->nrs_lock);
 
-               rc = policy->pol_desc->pd_ops->op_policy_start(policy);
+               rc = policy->pol_desc->pd_ops->op_policy_start(policy, arg);
 
                spin_lock(&nrs->nrs_lock);
                if (rc != 0) {
-                       if (cfs_atomic_dec_and_test(&policy->pol_desc->pd_refs))
-                               cfs_module_put(policy->pol_desc->pd_owner);
+                       if (atomic_dec_and_test(&policy->pol_desc->pd_refs))
+                               module_put(policy->pol_desc->pd_owner);
 
                        policy->pol_state = NRS_POL_STATE_STOPPED;
                        GOTO(out, rc);
                }
        }
 
+       if (arg)
+               strscpy(policy->pol_arg, arg, sizeof(policy->pol_arg));
+
+       /* take the started reference */
+       refcount_set(&policy->pol_start_ref, 1);
        policy->pol_state = NRS_POL_STATE_STARTED;
 
        if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
@@ -314,34 +358,23 @@ out:
 }
 
 /**
- * Increases the policy's usage reference count.
+ * Increases the policy's usage reference count (caller count).
  */
 static inline void nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy)
+__must_hold(&policy->pol_nrs->nrs_lock)
 {
        policy->pol_ref++;
 }
 
 /**
- * Decreases the policy's usage reference count, and stops the policy in case it
- * was already stopping and have no more outstanding usage references (which
- * indicates it has no more queued or started requests, and can be safely
- * stopped).
+ * Decreases the policy's usage reference count.
  */
 static void nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy)
+__must_hold(&policy->pol_nrs->nrs_lock)
 {
        LASSERT(policy->pol_ref > 0);
 
        policy->pol_ref--;
-       if (unlikely(policy->pol_ref == 0 &&
-           policy->pol_state == NRS_POL_STATE_STOPPING))
-               nrs_policy_stop0(policy);
-}
-
-static void nrs_policy_put(struct ptlrpc_nrs_policy *policy)
-{
-       spin_lock(&policy->pol_nrs->nrs_lock);
-       nrs_policy_put_locked(policy);
-       spin_unlock(&policy->pol_nrs->nrs_lock);
 }
 
 /**
@@ -352,7 +385,7 @@ static struct ptlrpc_nrs_policy * nrs_policy_find_locked(struct ptlrpc_nrs *nrs,
 {
        struct ptlrpc_nrs_policy *tmp;
 
-       cfs_list_for_each_entry(tmp, &nrs->nrs_policy_list, pol_list) {
+       list_for_each_entry(tmp, &nrs->nrs_policy_list, pol_list) {
                if (strncmp(tmp->pol_desc->pd_name, name,
                            NRS_POL_NAME_MAX) == 0) {
                        nrs_policy_get_locked(tmp);
@@ -464,11 +497,11 @@ static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs,
        spin_lock(&nrs->nrs_lock);
 
        fallback = nrs->nrs_policy_fallback;
-       nrs_policy_get_locked(fallback);
+       nrs_policy_started_get(fallback);
 
        primary = nrs->nrs_policy_primary;
        if (primary != NULL)
-               nrs_policy_get_locked(primary);
+               nrs_policy_started_get(primary);
 
        spin_unlock(&nrs->nrs_lock);
 
@@ -488,7 +521,7 @@ static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs,
                 * request.
                 */
                if (resp[NRS_RES_PRIMARY] == NULL)
-                       nrs_policy_put(primary);
+                       nrs_policy_started_put(primary);
        }
 }
 
@@ -505,8 +538,7 @@ static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs,
 static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
 {
        struct ptlrpc_nrs_policy *pols[NRS_RES_MAX];
-       struct ptlrpc_nrs        *nrs = NULL;
-       int                       i;
+       int i;
 
        for (i = 0; i < NRS_RES_MAX; i++) {
                if (resp[i] != NULL) {
@@ -522,15 +554,8 @@ static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
                if (pols[i] == NULL)
                        continue;
 
-               if (nrs == NULL) {
-                       nrs = pols[i]->pol_nrs;
-                       spin_lock(&nrs->nrs_lock);
-               }
-               nrs_policy_put_locked(pols[i]);
+               nrs_policy_started_put(pols[i]);
        }
-
-       if (nrs != NULL)
-               spin_unlock(&nrs->nrs_lock);
 }
 
 /**
@@ -557,6 +582,10 @@ struct ptlrpc_nrs_request * nrs_request_get(struct ptlrpc_nrs_policy *policy,
 
        LASSERT(policy->pol_req_queued > 0);
 
+       /* for a non-started policy, use force mode to drain requests */
+       if (unlikely(policy->pol_state != NRS_POL_STATE_STARTED))
+               force = true;
+
        nrq = policy->pol_desc->pd_ops->op_req_get(policy, peek, force);
 
        LASSERT(ergo(nrq != NULL, nrs_request_policy(nrq) == policy));
@@ -595,6 +624,11 @@ static inline void nrs_request_enqueue(struct ptlrpc_nrs_request *nrq)
                if (rc == 0) {
                        policy->pol_nrs->nrs_req_queued++;
                        policy->pol_req_queued++;
+                       /**
+                        * Take an extra ref to avoid stopping policy with
+                        * pending request in it
+                        */
+                       nrs_policy_started_get(policy);
                        return;
                }
        }
@@ -659,6 +693,10 @@ static int nrs_policy_ctl(struct ptlrpc_nrs *nrs, char *name,
        if (policy == NULL)
                GOTO(out, rc = -ENOENT);
 
+       if (policy->pol_state != NRS_POL_STATE_STARTED &&
+           policy->pol_state != NRS_POL_STATE_STOPPED)
+               GOTO(out, rc = -EAGAIN);
+
        switch (opc) {
                /**
                 * Unknown opcode, pass it down to the policy-specific control
@@ -672,7 +710,7 @@ static int nrs_policy_ctl(struct ptlrpc_nrs *nrs, char *name,
                 * Start \e policy
                 */
        case PTLRPC_NRS_CTL_START:
-               rc = nrs_policy_start_locked(policy);
+               rc = nrs_policy_start_locked(policy, arg);
                break;
        }
 out:
@@ -697,48 +735,57 @@ out:
 static int nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name)
 {
        struct ptlrpc_nrs_policy *policy = NULL;
+       struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
+       char *srv_name = svcpt->scp_service->srv_name;
+       int rc = 0;
        ENTRY;
 
        spin_lock(&nrs->nrs_lock);
 
        policy = nrs_policy_find_locked(nrs, name);
        if (policy == NULL) {
-               spin_unlock(&nrs->nrs_lock);
-
-               CERROR("Can't find NRS policy %s\n", name);
-               RETURN(-ENOENT);
+               rc = -ENOENT;
+               CERROR("%s.%d NRS: cannot find policy '%s': rc = %d\n",
+                      srv_name, svcpt->scp_cpt, name, rc);
+               GOTO(out_unlock, rc);
        }
 
        if (policy->pol_ref > 1) {
-               CERROR("Policy %s is busy with %d references\n", name,
-                      (int)policy->pol_ref);
-               nrs_policy_put_locked(policy);
-
-               spin_unlock(&nrs->nrs_lock);
-               RETURN(-EBUSY);
+               rc = -EBUSY;
+               CERROR("%s.%d NRS: policy '%s' is busy with %ld references: rc = %d\n",
+                       srv_name, svcpt->scp_cpt, name, policy->pol_ref, rc);
+               GOTO(out_put, rc);
        }
 
        LASSERT(policy->pol_req_queued == 0);
        LASSERT(policy->pol_req_started == 0);
 
        if (policy->pol_state != NRS_POL_STATE_STOPPED) {
-               nrs_policy_stop_locked(policy);
-               LASSERT(policy->pol_state == NRS_POL_STATE_STOPPED);
+               rc = nrs_policy_stop_locked(policy);
+               if (rc) {
+                       CERROR("%s.%d NRS: failed to stop policy '%s' with refcount %d: rc = %d\n",
+                              srv_name, svcpt->scp_cpt, name,
+                              refcount_read(&policy->pol_start_ref), rc);
+                       GOTO(out_put, rc);
+               }
        }
 
-       cfs_list_del(&policy->pol_list);
+       LASSERT(policy->pol_private == NULL);
+       list_del(&policy->pol_list);
        nrs->nrs_num_pols--;
 
+       EXIT;
+out_put:
        nrs_policy_put_locked(policy);
-
+out_unlock:
        spin_unlock(&nrs->nrs_lock);
 
-       nrs_policy_fini(policy);
-
-       LASSERT(policy->pol_private == NULL);
-       OBD_FREE_PTR(policy);
+       if (rc == 0) {
+               nrs_policy_fini(policy);
+               OBD_FREE_PTR(policy);
+       }
 
-       RETURN(0);
+       return rc;
 }
 
 /**
@@ -754,10 +801,11 @@ static int nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name)
 static int nrs_policy_register(struct ptlrpc_nrs *nrs,
                               struct ptlrpc_nrs_pol_desc *desc)
 {
-       struct ptlrpc_nrs_policy       *policy;
-       struct ptlrpc_nrs_policy       *tmp;
-       struct ptlrpc_service_part     *svcpt = nrs->nrs_svcpt;
-       int                             rc;
+       struct ptlrpc_nrs_policy *policy;
+       struct ptlrpc_nrs_policy *tmp;
+       struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
+       char *srv_name = svcpt->scp_service->srv_name;
+       int rc;
        ENTRY;
 
        LASSERT(svcpt != NULL);
@@ -769,7 +817,7 @@ static int nrs_policy_register(struct ptlrpc_nrs *nrs,
        LASSERT(desc->pd_compat != NULL);
 
        OBD_CPT_ALLOC_GFP(policy, svcpt->scp_service->srv_cptable,
-                         svcpt->scp_cpt, sizeof(*policy), CFS_ALLOC_IO);
+                         svcpt->scp_cpt, sizeof(*policy), GFP_NOFS);
        if (policy == NULL)
                RETURN(-ENOMEM);
 
@@ -778,8 +826,10 @@ static int nrs_policy_register(struct ptlrpc_nrs *nrs,
        policy->pol_state   = NRS_POL_STATE_STOPPED;
        policy->pol_flags   = desc->pd_flags;
 
-       CFS_INIT_LIST_HEAD(&policy->pol_list);
-       CFS_INIT_LIST_HEAD(&policy->pol_list_queued);
+       INIT_LIST_HEAD(&policy->pol_list);
+       INIT_LIST_HEAD(&policy->pol_list_queued);
+
+       init_waitqueue_head(&policy->pol_wq);
 
        rc = nrs_policy_init(policy);
        if (rc != 0) {
@@ -791,23 +841,24 @@ static int nrs_policy_register(struct ptlrpc_nrs *nrs,
 
        tmp = nrs_policy_find_locked(nrs, policy->pol_desc->pd_name);
        if (tmp != NULL) {
-               CERROR("NRS policy %s has been registered, can't register it "
-                      "for %s\n", policy->pol_desc->pd_name,
-                      svcpt->scp_service->srv_name);
+               rc = -EEXIST;
+               CERROR("%s.%d NRS: policy %s has been registered, can't register it: rc = %d\n",
+                      srv_name, svcpt->scp_cpt, policy->pol_desc->pd_name,
+                      rc);
                nrs_policy_put_locked(tmp);
 
                spin_unlock(&nrs->nrs_lock);
                nrs_policy_fini(policy);
                OBD_FREE_PTR(policy);
 
-               RETURN(-EEXIST);
+               RETURN(rc);
        }
 
-       cfs_list_add_tail(&policy->pol_list, &nrs->nrs_policy_list);
+       list_add_tail(&policy->pol_list, &nrs->nrs_policy_list);
        nrs->nrs_num_pols++;
 
        if (policy->pol_flags & PTLRPC_NRS_FL_REG_START)
-               rc = nrs_policy_start_locked(policy);
+               rc = nrs_policy_start_locked(policy, NULL);
 
        spin_unlock(&nrs->nrs_lock);
 
@@ -838,8 +889,8 @@ static void ptlrpc_nrs_req_add_nolock(struct ptlrpc_request *req)
         * Add the policy to the NRS head's list of policies with enqueued
         * requests, if it has not been added there.
         */
-       if (unlikely(cfs_list_empty(&policy->pol_list_queued)))
-               cfs_list_add_tail(&policy->pol_list_queued,
+       if (unlikely(list_empty(&policy->pol_list_queued)))
+               list_add_tail(&policy->pol_list_queued,
                                  &policy->pol_nrs->nrs_policy_queued);
 }
 
@@ -902,14 +953,13 @@ static int nrs_register_policies_locked(struct ptlrpc_nrs *nrs)
 
        LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
 
-       cfs_list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
+       list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
                if (nrs_policy_compatible(svc, desc)) {
                        rc = nrs_policy_register(nrs, desc);
                        if (rc != 0) {
-                               CERROR("Failed to register NRS policy %s for "
-                                      "partition %d of service %s: %d\n",
-                                      desc->pd_name, svcpt->scp_cpt,
-                                      svc->srv_name, rc);
+                               CERROR("%s.%d NRS: Failed to register policy %s: rc = %d\n",
+                                      svc->srv_name, svcpt->scp_cpt,
+                                      desc->pd_name, rc);
                                /**
                                 * Fail registration if any of the policies'
                                 * registration fails.
@@ -952,8 +1002,9 @@ static int nrs_svcpt_setup_locked0(struct ptlrpc_nrs *nrs,
        nrs->nrs_svcpt = svcpt;
        nrs->nrs_queue_type = queue;
        spin_lock_init(&nrs->nrs_lock);
-       CFS_INIT_LIST_HEAD(&nrs->nrs_policy_list);
-       CFS_INIT_LIST_HEAD(&nrs->nrs_policy_queued);
+       INIT_LIST_HEAD(&nrs->nrs_policy_list);
+       INIT_LIST_HEAD(&nrs->nrs_policy_queued);
+       nrs->nrs_throttling = 0;
 
        rc = nrs_register_policies_locked(nrs);
 
@@ -1024,10 +1075,16 @@ static void nrs_svcpt_cleanup_locked(struct ptlrpc_service_part *svcpt)
        LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
 
 again:
-       nrs = nrs_svcpt2nrs(svcpt, hp);
+       /* scp_nrs_hp could be NULL due to a shortage of memory. */
+       nrs = hp ? svcpt->scp_nrs_hp : &svcpt->scp_nrs_reg;
+       /* check the nrs_svcpt to see if nrs is initialized. */
+       if (!nrs || !nrs->nrs_svcpt) {
+               EXIT;
+               return;
+       }
        nrs->nrs_stopping = 1;
 
-       cfs_list_for_each_entry_safe(policy, tmp, &nrs->nrs_policy_list,
+       list_for_each_entry_safe(policy, tmp, &nrs->nrs_policy_list,
                                     pol_list) {
                rc = nrs_policy_unregister(nrs, policy->pol_desc->pd_name);
                LASSERT(rc == 0);
@@ -1060,7 +1117,7 @@ static struct ptlrpc_nrs_pol_desc *nrs_policy_find_desc_locked(const char *name)
        struct ptlrpc_nrs_pol_desc     *tmp;
        ENTRY;
 
-       cfs_list_for_each_entry(tmp, &nrs_core.nrs_policies, pd_list) {
+       list_for_each_entry(tmp, &nrs_core.nrs_policies, pd_list) {
                if (strncmp(tmp->pd_name, name, NRS_POL_NAME_MAX) == 0)
                        RETURN(tmp);
        }
@@ -1091,13 +1148,14 @@ static int nrs_policy_unregister_locked(struct ptlrpc_nrs_pol_desc *desc)
        LASSERT(mutex_is_locked(&nrs_core.nrs_mutex));
        LASSERT(mutex_is_locked(&ptlrpc_all_services_mutex));
 
-       cfs_list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
+       list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
 
                if (!nrs_policy_compatible(svc, desc) ||
                    unlikely(svc->srv_is_stopping))
                        continue;
 
                ptlrpc_service_for_each_part(svcpt, i, svc) {
+                       char *srv_name = svcpt->scp_service->srv_name;
                        bool hp = false;
 
 again:
@@ -1110,10 +1168,9 @@ again:
                        if (rc == -ENOENT) {
                                rc = 0;
                        } else if (rc != 0) {
-                               CERROR("Failed to unregister NRS policy %s for "
-                                      "partition %d of service %s: %d\n",
-                                      desc->pd_name, svcpt->scp_cpt,
-                                      svcpt->scp_service->srv_name, rc);
+                               CERROR("%s.%d NRS: Failed to unregister policy %s: rc = %d\n",
+                                      srv_name, svcpt->scp_cpt, desc->pd_name,
+                                      rc);
                                RETURN(rc);
                        }
 
@@ -1145,7 +1202,7 @@ again:
  * \retval -ve error
  * \retval   0 success
  */
-int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf)
+static int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf)
 {
         struct ptlrpc_service         *svc;
        struct ptlrpc_nrs_pol_desc     *desc;
@@ -1174,34 +1231,40 @@ int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf)
        if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) &&
            (conf->nc_flags & (PTLRPC_NRS_FL_FALLBACK |
                               PTLRPC_NRS_FL_REG_START))) {
+               rc = -EINVAL;
                CERROR("NRS: failing to register policy %s. Please check "
                       "policy flags; external policies cannot act as fallback "
                       "policies, or be started immediately upon registration "
-                      "without interaction with lprocfs\n", conf->nc_name);
-               RETURN(-EINVAL);
+                      "without interaction with lprocfs: rc = %d\n",
+                      conf->nc_name, rc);
+               RETURN(rc);
        }
 
        mutex_lock(&nrs_core.nrs_mutex);
 
        if (nrs_policy_find_desc_locked(conf->nc_name) != NULL) {
-               CERROR("NRS: failing to register policy %s which has already "
-                      "been registered with NRS core!\n",
-                      conf->nc_name);
-               GOTO(fail, rc = -EEXIST);
+               rc = -EEXIST;
+               CERROR("NRS: failing to register policy %s which has already been registered with NRS core: rc = %d\n",
+                      conf->nc_name, rc);
+               GOTO(fail, rc);
        }
 
        OBD_ALLOC_PTR(desc);
        if (desc == NULL)
                GOTO(fail, rc = -ENOMEM);
 
-       strncpy(desc->pd_name, conf->nc_name, NRS_POL_NAME_MAX);
+       if (strscpy(desc->pd_name, conf->nc_name, sizeof(desc->pd_name)) >=
+           sizeof(desc->pd_name)) {
+               OBD_FREE_PTR(desc);
+               GOTO(fail, rc = -E2BIG);
+       }
        desc->pd_ops             = conf->nc_ops;
        desc->pd_compat          = conf->nc_compat;
        desc->pd_compat_svc_name = conf->nc_compat_svc_name;
        if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) != 0)
                desc->pd_owner   = conf->nc_owner;
        desc->pd_flags           = conf->nc_flags;
-       cfs_atomic_set(&desc->pd_refs, 0);
+       atomic_set(&desc->pd_refs, 0);
 
        /**
         * For policies that are held in the same module as NRS (currently
@@ -1219,7 +1282,7 @@ int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf)
         */
        mutex_lock(&ptlrpc_all_services_mutex);
 
-       cfs_list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
+       list_for_each_entry(svc, &ptlrpc_all_services, srv_list) {
                struct ptlrpc_service_part     *svcpt;
                int                             i;
                int                             rc2;
@@ -1229,16 +1292,16 @@ int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf)
                        continue;
 
                ptlrpc_service_for_each_part(svcpt, i, svc) {
-                       struct ptlrpc_nrs      *nrs;
-                       bool                    hp = false;
+                       struct ptlrpc_nrs *nrs;
+                       char *srv_name = svcpt->scp_service->srv_name;
+                       bool hp = false;
 again:
                        nrs = nrs_svcpt2nrs(svcpt, hp);
                        rc = nrs_policy_register(nrs, desc);
                        if (rc != 0) {
-                               CERROR("Failed to register NRS policy %s for "
-                                      "partition %d of service %s: %d\n",
-                                      desc->pd_name, svcpt->scp_cpt,
-                                      svcpt->scp_service->srv_name, rc);
+                               CERROR("%s.%d NRS: Failed to register policy %s: rc = %d\n",
+                                      srv_name, svcpt->scp_cpt,
+                                      desc->pd_name, rc);
 
                                rc2 = nrs_policy_unregister_locked(desc);
                                /**
@@ -1277,80 +1340,12 @@ again:
 
        mutex_unlock(&ptlrpc_all_services_mutex);
 internal:
-       cfs_list_add_tail(&desc->pd_list, &nrs_core.nrs_policies);
+       list_add_tail(&desc->pd_list, &nrs_core.nrs_policies);
 fail:
        mutex_unlock(&nrs_core.nrs_mutex);
 
        RETURN(rc);
 }
-EXPORT_SYMBOL(ptlrpc_nrs_policy_register);
-
-/**
- * Unregisters a previously registered policy with NRS core. All instances of
- * the policy on all NRS heads of all supported services are removed.
- *
- * N.B. This function should only be called from a module's exit() function.
- *     Although it can be used for policies that ship alongside NRS core, the
- *     function is primarily intended for policies that register externally,
- *     from other modules.
- *
- * \param[in] conf configuration information for the policy to unregister
- *
- * \retval -ve error
- * \retval   0 success
- */
-int ptlrpc_nrs_policy_unregister(struct ptlrpc_nrs_pol_conf *conf)
-{
-       struct ptlrpc_nrs_pol_desc      *desc;
-       int                              rc;
-       ENTRY;
-
-       LASSERT(conf != NULL);
-
-       if (conf->nc_flags & PTLRPC_NRS_FL_FALLBACK) {
-               CERROR("Unable to unregister a fallback policy, unless the "
-                      "PTLRPC service is stopping.\n");
-               RETURN(-EPERM);
-       }
-
-       conf->nc_name[NRS_POL_NAME_MAX - 1] = '\0';
-
-       mutex_lock(&nrs_core.nrs_mutex);
-
-       desc = nrs_policy_find_desc_locked(conf->nc_name);
-       if (desc == NULL) {
-               CERROR("Failing to unregister NRS policy %s which has "
-                      "not been registered with NRS core!\n",
-                      conf->nc_name);
-               GOTO(not_exist, rc = -ENOENT);
-       }
-
-       mutex_lock(&ptlrpc_all_services_mutex);
-
-       rc = nrs_policy_unregister_locked(desc);
-       if (rc < 0) {
-               if (rc == -EBUSY)
-                       CERROR("Please first stop policy %s on all service "
-                              "partitions and then retry to unregister the "
-                              "policy.\n", conf->nc_name);
-               GOTO(fail, rc);
-       }
-
-       CDEBUG(D_INFO, "Unregistering policy %s from NRS core.\n",
-              conf->nc_name);
-
-       cfs_list_del(&desc->pd_list);
-       OBD_FREE_PTR(desc);
-
-fail:
-       mutex_unlock(&ptlrpc_all_services_mutex);
-
-not_exist:
-       mutex_unlock(&nrs_core.nrs_mutex);
-
-       RETURN(rc);
-}
-EXPORT_SYMBOL(ptlrpc_nrs_policy_unregister);
 
 /**
  * Setup NRS heads on all service partitions of service \a svc, and register
@@ -1388,7 +1383,7 @@ int ptlrpc_service_nrs_setup(struct ptlrpc_service *svc)
         * Set up lprocfs interfaces for all supported policies for the
         * service.
         */
-       cfs_list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
+       list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
                if (!nrs_policy_compatible(svc, desc))
                        continue;
 
@@ -1429,7 +1424,7 @@ void ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc)
         * Clean up lprocfs interfaces for all supported policies for the
         * service.
         */
-       cfs_list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
+       list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) {
                if (!nrs_policy_compatible(svc, desc))
                        continue;
 
@@ -1526,7 +1521,7 @@ static void nrs_request_removed(struct ptlrpc_nrs_policy *policy)
         * ptlrpc_nrs::nrs_policy_queued.
         */
        if (unlikely(policy->pol_req_queued == 0)) {
-               cfs_list_del_init(&policy->pol_list_queued);
+               list_del_init(&policy->pol_list_queued);
 
                /**
                 * If there are other policies with queued requests, move the
@@ -1537,9 +1532,12 @@ static void nrs_request_removed(struct ptlrpc_nrs_policy *policy)
                LASSERT(policy->pol_req_queued <
                        policy->pol_nrs->nrs_req_queued);
 
-               cfs_list_move_tail(&policy->pol_list_queued,
+               list_move_tail(&policy->pol_list_queued,
                                   &policy->pol_nrs->nrs_policy_queued);
        }
+
+       /* remove the extra ref for policy pending requests */
+       nrs_policy_started_put(policy);
 }
 
 /**
@@ -1570,7 +1568,7 @@ ptlrpc_nrs_req_get_nolock0(struct ptlrpc_service_part *svcpt, bool hp,
         * Always try to drain requests from all NRS policies even if they are
         * inactive, because the user can change policy status at runtime.
         */
-       cfs_list_for_each_entry(policy, &nrs->nrs_policy_queued,
+       list_for_each_entry(policy, &nrs->nrs_policy_queued,
                                pol_list_queued) {
                nrq = nrs_request_get(policy, peek, force);
                if (nrq != NULL) {
@@ -1627,6 +1625,24 @@ bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp)
 };
 
 /**
+ * Returns whether the indicated NRS head is currently throttling requests,
+ * as recorded in its ptlrpc_nrs::nrs_throttling field.
+ *
+ * The NRS head is looked up with nrs_svcpt2nrs() from \a svcpt and \a hp, and
+ * the field value is collapsed to a boolean with a double negation.
+ *
+ * \param[in] svcpt the service partition to enquire.
+ * \param[in] hp    whether the regular or high-priority NRS head is to be
+ *                 enquired.
+ *
+ * \retval false ptlrpc_nrs::nrs_throttling of the indicated NRS head is zero.
+ * \retval true         ptlrpc_nrs::nrs_throttling of the indicated NRS head is
+ *              non-zero.
+ *
+ * NOTE(review): the _nolock suffix suggests the caller is expected to hold the
+ * lock protecting the NRS head; this function takes no lock itself - confirm
+ * the locking requirement against the callers.
+ */
+bool ptlrpc_nrs_req_throttling_nolock(struct ptlrpc_service_part *svcpt,
+                                     bool hp)
+{
+       struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp);
+
+       return !!nrs->nrs_throttling;
+};
+
+/**
  * Moves request \a req from the regular to the high-priority NRS head.
  *
  * \param[in] req the request to move
@@ -1738,17 +1754,6 @@ out:
        RETURN(rc);
 }
 
-
-/* ptlrpc/nrs_fifo.c */
-extern struct ptlrpc_nrs_pol_conf nrs_conf_fifo;
-#if defined HAVE_SERVER_SUPPORT && defined(__KERNEL__)
-/* ptlrpc/nrs_crr.c */
-extern struct ptlrpc_nrs_pol_conf nrs_conf_crrn;
-/* ptlrpc/nrs_orr.c */
-extern struct ptlrpc_nrs_pol_conf nrs_conf_orr;
-extern struct ptlrpc_nrs_pol_conf nrs_conf_trr;
-#endif
-
 /**
  * Adds all policies that ship with the ptlrpc module, to NRS core's list of
  * policies \e nrs_core.nrs_policies.
@@ -1762,13 +1767,13 @@ int ptlrpc_nrs_init(void)
        ENTRY;
 
        mutex_init(&nrs_core.nrs_mutex);
-       CFS_INIT_LIST_HEAD(&nrs_core.nrs_policies);
+       INIT_LIST_HEAD(&nrs_core.nrs_policies);
 
        rc = ptlrpc_nrs_policy_register(&nrs_conf_fifo);
        if (rc != 0)
                GOTO(fail, rc);
 
-#if defined HAVE_SERVER_SUPPORT && defined(__KERNEL__)
+#ifdef HAVE_SERVER_SUPPORT
        rc = ptlrpc_nrs_policy_register(&nrs_conf_crrn);
        if (rc != 0)
                GOTO(fail, rc);
@@ -1780,7 +1785,14 @@ int ptlrpc_nrs_init(void)
        rc = ptlrpc_nrs_policy_register(&nrs_conf_trr);
        if (rc != 0)
                GOTO(fail, rc);
-#endif
+       rc = ptlrpc_nrs_policy_register(&nrs_conf_tbf);
+       if (rc != 0)
+               GOTO(fail, rc);
+#endif /* HAVE_SERVER_SUPPORT */
+
+       rc = ptlrpc_nrs_policy_register(&nrs_conf_delay);
+       if (rc != 0)
+               GOTO(fail, rc);
 
        RETURN(rc);
 fail:
@@ -1794,7 +1806,7 @@ fail:
 }
 
 /**
- * Removes all policy desciptors from nrs_core::nrs_policies, and frees the
+ * Removes all policy descriptors from nrs_core::nrs_policies, and frees the
  * policy descriptors.
  *
  * Since all PTLRPC services are stopped at this point, there are no more
@@ -1807,11 +1819,9 @@ void ptlrpc_nrs_fini(void)
        struct ptlrpc_nrs_pol_desc *desc;
        struct ptlrpc_nrs_pol_desc *tmp;
 
-       cfs_list_for_each_entry_safe(desc, tmp, &nrs_core.nrs_policies,
+       list_for_each_entry_safe(desc, tmp, &nrs_core.nrs_policies,
                                     pd_list) {
-               cfs_list_del_init(&desc->pd_list);
+               list_del_init(&desc->pd_list);
                OBD_FREE_PTR(desc);
        }
 }
-
-/** @} nrs */