X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fptlrpc%2Fnrs.c;h=52d3225deba6bb7ed1c7193cd87539cc3931b846;hp=5343ab3a6ae22e8eee71f95902a0f412d8af40c8;hb=HEAD;hpb=c85f006d40cc5c9504ea873fc815ce2eaa1ee062 diff --git a/lustre/ptlrpc/nrs.c b/lustre/ptlrpc/nrs.c index 5343ab3..3f5b59d 100644 --- a/lustre/ptlrpc/nrs.c +++ b/lustre/ptlrpc/nrs.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2011 Intel Corporation + * Copyright (c) 2014, 2016, Intel Corporation. * * Copyright 2012 Xyratex Technology Limited */ @@ -40,9 +40,6 @@ */ #define DEBUG_SUBSYSTEM S_RPC -#ifndef __KERNEL__ -#include -#endif #include #include #include @@ -50,14 +47,6 @@ #include #include "ptlrpc_internal.h" -/* XXX: This is just for liblustre. Remove the #if defined directive when the - * "cfs_" prefix is dropped from cfs_list_head. */ -#if defined (__linux__) && defined(__KERNEL__) -extern struct list_head ptlrpc_all_services; -#else -extern struct cfs_list_head ptlrpc_all_services; -#endif - /** * NRS core object. */ @@ -72,6 +61,7 @@ static int nrs_policy_init(struct ptlrpc_nrs_policy *policy) static void nrs_policy_fini(struct ptlrpc_nrs_policy *policy) { LASSERT(policy->pol_ref == 0); + LASSERT(refcount_read(&policy->pol_start_ref) == 0); LASSERT(policy->pol_req_queued == 0); if (policy->pol_desc->pd_ops->op_policy_fini != NULL) @@ -97,31 +87,47 @@ static int nrs_policy_ctl_locked(struct ptlrpc_nrs_policy *policy, static void nrs_policy_stop0(struct ptlrpc_nrs_policy *policy) { - struct ptlrpc_nrs *nrs = policy->pol_nrs; ENTRY; - if (policy->pol_desc->pd_ops->op_policy_stop != NULL) { - spin_unlock(&nrs->nrs_lock); - + if (policy->pol_desc->pd_ops->op_policy_stop != NULL) policy->pol_desc->pd_ops->op_policy_stop(policy); - spin_lock(&nrs->nrs_lock); - } - - LASSERT(cfs_list_empty(&policy->pol_list_queued)); + LASSERT(list_empty(&policy->pol_list_queued)); LASSERT(policy->pol_req_queued == 0 && policy->pol_req_started == 0); policy->pol_private = NULL; + policy->pol_arg[0] = '\0'; policy->pol_state = NRS_POL_STATE_STOPPED; + wake_up(&policy->pol_wq); - if (cfs_atomic_dec_and_test(&policy->pol_desc->pd_refs)) - cfs_module_put(policy->pol_desc->pd_owner); + if (atomic_dec_and_test(&policy->pol_desc->pd_refs)) + module_put(policy->pol_desc->pd_owner); EXIT; } +/** + * Increases the policy's usage started reference count. + */ +static inline void nrs_policy_started_get(struct ptlrpc_nrs_policy *policy) +{ + refcount_inc(&policy->pol_start_ref); +} + +/** + * Decreases the policy's usage started reference count, and stops the policy + * in case it was already stopping and have no more outstanding usage + * references (which indicates it has no more queued or started requests, and + * can be safely stopped). + */ +static void nrs_policy_started_put(struct ptlrpc_nrs_policy *policy) +{ + if (refcount_dec_and_test(&policy->pol_start_ref)) + nrs_policy_stop0(policy); +} + static int nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy) { struct ptlrpc_nrs *nrs = policy->pol_nrs; @@ -148,9 +154,18 @@ static int nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy) nrs->nrs_policy_fallback = NULL; } - /* I have the only refcount */ - if (policy->pol_ref == 1) - nrs_policy_stop0(policy); + /* Drop started ref and wait for requests to be drained */ + spin_unlock(&nrs->nrs_lock); + nrs_policy_started_put(policy); + + wait_event_timeout(policy->pol_wq, + policy->pol_state == NRS_POL_STATE_STOPPED, + cfs_time_seconds(30)); + + spin_lock(&nrs->nrs_lock); + + if (policy->pol_state != NRS_POL_STATE_STOPPED) + RETURN(-EBUSY); RETURN(0); } @@ -182,8 +197,10 @@ static void nrs_policy_stop_primary(struct ptlrpc_nrs *nrs) LASSERT(tmp->pol_state == NRS_POL_STATE_STARTED); tmp->pol_state = NRS_POL_STATE_STOPPING; - if (tmp->pol_ref == 0) - nrs_policy_stop0(tmp); + /* Drop started ref to free the policy */ + spin_unlock(&nrs->nrs_lock); + nrs_policy_started_put(tmp); + spin_lock(&nrs->nrs_lock); EXIT; } @@ -206,10 +223,12 @@ static void nrs_policy_stop_primary(struct ptlrpc_nrs *nrs) * references on the policy to ptlrpc_nrs_pol_stae::NRS_POL_STATE_STOPPED. In * this case, the fallback policy is only left active in the NRS head. */ -static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy) +static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy, char *arg) { - struct ptlrpc_nrs *nrs = policy->pol_nrs; - int rc = 0; + struct ptlrpc_nrs *nrs = policy->pol_nrs; + struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt; + char *srv_name = svcpt->scp_service->srv_name; + int rc = 0; ENTRY; /** @@ -224,6 +243,13 @@ static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy) if (policy->pol_state == NRS_POL_STATE_STOPPING) RETURN(-EAGAIN); + if (arg && strlen(arg) >= sizeof(policy->pol_arg)) { + rc = -EINVAL; + CWARN("%s.%d NRS: arg '%s' is too long: rc = %d\n", + srv_name, svcpt->scp_cpt, arg, rc); + return rc; + } + if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) { /** * This is for cases in which the user sets the policy to the @@ -250,20 +276,33 @@ static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy) if (nrs->nrs_policy_fallback == NULL) RETURN(-EPERM); - if (policy->pol_state == NRS_POL_STATE_STARTED) - RETURN(0); + if (policy->pol_state == NRS_POL_STATE_STARTED) { + /** + * If the policy argument now is different from the last time, + * stop the policy first and start it again with the new + * argument. + */ + if ((arg == NULL && strlen(policy->pol_arg) == 0) || + (arg != NULL && strcmp(policy->pol_arg, arg) == 0)) + RETURN(0); + + rc = nrs_policy_stop_locked(policy); + if (rc) + RETURN(rc); + } } /** * Increase the module usage count for policies registering from other * modules. */ - if (cfs_atomic_inc_return(&policy->pol_desc->pd_refs) == 1 && - !cfs_try_module_get(policy->pol_desc->pd_owner)) { - cfs_atomic_dec(&policy->pol_desc->pd_refs); - CERROR("NRS: cannot get module for policy %s; is it alive?\n", - policy->pol_desc->pd_name); - RETURN(-ENODEV); + if (atomic_inc_return(&policy->pol_desc->pd_refs) == 1 && + !try_module_get(policy->pol_desc->pd_owner)) { + atomic_dec(&policy->pol_desc->pd_refs); + rc = -ENODEV; + CERROR("%s.%d NRS: cannot get module for policy %s (is it alive?): rc = %d\n", + srv_name, svcpt->scp_cpt, policy->pol_desc->pd_name, rc); + RETURN(rc); } /** @@ -276,18 +315,23 @@ static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy) if (policy->pol_desc->pd_ops->op_policy_start) { spin_unlock(&nrs->nrs_lock); - rc = policy->pol_desc->pd_ops->op_policy_start(policy); + rc = policy->pol_desc->pd_ops->op_policy_start(policy, arg); spin_lock(&nrs->nrs_lock); if (rc != 0) { - if (cfs_atomic_dec_and_test(&policy->pol_desc->pd_refs)) - cfs_module_put(policy->pol_desc->pd_owner); + if (atomic_dec_and_test(&policy->pol_desc->pd_refs)) + module_put(policy->pol_desc->pd_owner); policy->pol_state = NRS_POL_STATE_STOPPED; GOTO(out, rc); } } + if (arg) + strscpy(policy->pol_arg, arg, sizeof(policy->pol_arg)); + + /* take the started reference */ + refcount_set(&policy->pol_start_ref, 1); policy->pol_state = NRS_POL_STATE_STARTED; if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) { @@ -314,34 +358,23 @@ out: } /** - * Increases the policy's usage reference count. + * Increases the policy's usage reference count (caller count). */ static inline void nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy) +__must_hold(&policy->pol_nrs->nrs_lock) { policy->pol_ref++; } /** - * Decreases the policy's usage reference count, and stops the policy in case it - * was already stopping and have no more outstanding usage references (which - * indicates it has no more queued or started requests, and can be safely - * stopped). + * Decreases the policy's usage reference count. */ static void nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy) +__must_hold(&policy->pol_nrs->nrs_lock) { LASSERT(policy->pol_ref > 0); policy->pol_ref--; - if (unlikely(policy->pol_ref == 0 && - policy->pol_state == NRS_POL_STATE_STOPPING)) - nrs_policy_stop0(policy); -} - -static void nrs_policy_put(struct ptlrpc_nrs_policy *policy) -{ - spin_lock(&policy->pol_nrs->nrs_lock); - nrs_policy_put_locked(policy); - spin_unlock(&policy->pol_nrs->nrs_lock); } /** @@ -352,7 +385,7 @@ static struct ptlrpc_nrs_policy * nrs_policy_find_locked(struct ptlrpc_nrs *nrs, { struct ptlrpc_nrs_policy *tmp; - cfs_list_for_each_entry(tmp, &nrs->nrs_policy_list, pol_list) { + list_for_each_entry(tmp, &nrs->nrs_policy_list, pol_list) { if (strncmp(tmp->pol_desc->pd_name, name, NRS_POL_NAME_MAX) == 0) { nrs_policy_get_locked(tmp); @@ -464,11 +497,11 @@ static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs, spin_lock(&nrs->nrs_lock); fallback = nrs->nrs_policy_fallback; - nrs_policy_get_locked(fallback); + nrs_policy_started_get(fallback); primary = nrs->nrs_policy_primary; if (primary != NULL) - nrs_policy_get_locked(primary); + nrs_policy_started_get(primary); spin_unlock(&nrs->nrs_lock); @@ -488,7 +521,7 @@ static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs, * request. */ if (resp[NRS_RES_PRIMARY] == NULL) - nrs_policy_put(primary); + nrs_policy_started_put(primary); } } @@ -505,8 +538,7 @@ static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs, static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp) { struct ptlrpc_nrs_policy *pols[NRS_RES_MAX]; - struct ptlrpc_nrs *nrs = NULL; - int i; + int i; for (i = 0; i < NRS_RES_MAX; i++) { if (resp[i] != NULL) { @@ -522,15 +554,8 @@ static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp) if (pols[i] == NULL) continue; - if (nrs == NULL) { - nrs = pols[i]->pol_nrs; - spin_lock(&nrs->nrs_lock); - } - nrs_policy_put_locked(pols[i]); + nrs_policy_started_put(pols[i]); } - - if (nrs != NULL) - spin_unlock(&nrs->nrs_lock); } /** @@ -557,6 +582,10 @@ struct ptlrpc_nrs_request * nrs_request_get(struct ptlrpc_nrs_policy *policy, LASSERT(policy->pol_req_queued > 0); + /* for a non-started policy, use force mode to drain requests */ + if (unlikely(policy->pol_state != NRS_POL_STATE_STARTED)) + force = true; + nrq = policy->pol_desc->pd_ops->op_req_get(policy, peek, force); LASSERT(ergo(nrq != NULL, nrs_request_policy(nrq) == policy)); @@ -595,6 +624,11 @@ static inline void nrs_request_enqueue(struct ptlrpc_nrs_request *nrq) if (rc == 0) { policy->pol_nrs->nrs_req_queued++; policy->pol_req_queued++; + /** + * Take an extra ref to avoid stopping policy with + * pending request in it + */ + nrs_policy_started_get(policy); return; } } @@ -659,6 +693,10 @@ static int nrs_policy_ctl(struct ptlrpc_nrs *nrs, char *name, if (policy == NULL) GOTO(out, rc = -ENOENT); + if (policy->pol_state != NRS_POL_STATE_STARTED && + policy->pol_state != NRS_POL_STATE_STOPPED) + GOTO(out, rc = -EAGAIN); + switch (opc) { /** * Unknown opcode, pass it down to the policy-specific control @@ -672,7 +710,7 @@ static int nrs_policy_ctl(struct ptlrpc_nrs *nrs, char *name, * Start \e policy */ case PTLRPC_NRS_CTL_START: - rc = nrs_policy_start_locked(policy); + rc = nrs_policy_start_locked(policy, arg); break; } out: @@ -697,48 +735,57 @@ out: static int nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name) { struct ptlrpc_nrs_policy *policy = NULL; + struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt; + char *srv_name = svcpt->scp_service->srv_name; + int rc = 0; ENTRY; spin_lock(&nrs->nrs_lock); policy = nrs_policy_find_locked(nrs, name); if (policy == NULL) { - spin_unlock(&nrs->nrs_lock); - - CERROR("Can't find NRS policy %s\n", name); - RETURN(-ENOENT); + rc = -ENOENT; + CERROR("%s.%d NRS: cannot find policy '%s': rc = %d\n", + srv_name, svcpt->scp_cpt, name, rc); + GOTO(out_unlock, rc); } if (policy->pol_ref > 1) { - CERROR("Policy %s is busy with %d references\n", name, - (int)policy->pol_ref); - nrs_policy_put_locked(policy); - - spin_unlock(&nrs->nrs_lock); - RETURN(-EBUSY); + rc = -EBUSY; + CERROR("%s.%d NRS: policy '%s' is busy with %ld references: rc = %d\n", + srv_name, svcpt->scp_cpt, name, policy->pol_ref, rc); + GOTO(out_put, rc); } LASSERT(policy->pol_req_queued == 0); LASSERT(policy->pol_req_started == 0); if (policy->pol_state != NRS_POL_STATE_STOPPED) { - nrs_policy_stop_locked(policy); - LASSERT(policy->pol_state == NRS_POL_STATE_STOPPED); + rc = nrs_policy_stop_locked(policy); + if (rc) { + CERROR("%s.%d NRS: failed to stop policy '%s' with refcount %d: rc = %d\n", + srv_name, svcpt->scp_cpt, name, + refcount_read(&policy->pol_start_ref), rc); + GOTO(out_put, rc); + } } - cfs_list_del(&policy->pol_list); + LASSERT(policy->pol_private == NULL); + list_del(&policy->pol_list); nrs->nrs_num_pols--; + EXIT; +out_put: nrs_policy_put_locked(policy); - +out_unlock: spin_unlock(&nrs->nrs_lock); - nrs_policy_fini(policy); - - LASSERT(policy->pol_private == NULL); - OBD_FREE_PTR(policy); + if (rc == 0) { + nrs_policy_fini(policy); + OBD_FREE_PTR(policy); + } - RETURN(0); + return rc; } /** @@ -754,10 +801,11 @@ static int nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name) static int nrs_policy_register(struct ptlrpc_nrs *nrs, struct ptlrpc_nrs_pol_desc *desc) { - struct ptlrpc_nrs_policy *policy; - struct ptlrpc_nrs_policy *tmp; - struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt; - int rc; + struct ptlrpc_nrs_policy *policy; + struct ptlrpc_nrs_policy *tmp; + struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt; + char *srv_name = svcpt->scp_service->srv_name; + int rc; ENTRY; LASSERT(svcpt != NULL); @@ -769,7 +817,7 @@ static int nrs_policy_register(struct ptlrpc_nrs *nrs, LASSERT(desc->pd_compat != NULL); OBD_CPT_ALLOC_GFP(policy, svcpt->scp_service->srv_cptable, - svcpt->scp_cpt, sizeof(*policy), CFS_ALLOC_IO); + svcpt->scp_cpt, sizeof(*policy), GFP_NOFS); if (policy == NULL) RETURN(-ENOMEM); @@ -778,8 +826,10 @@ static int nrs_policy_register(struct ptlrpc_nrs *nrs, policy->pol_state = NRS_POL_STATE_STOPPED; policy->pol_flags = desc->pd_flags; - CFS_INIT_LIST_HEAD(&policy->pol_list); - CFS_INIT_LIST_HEAD(&policy->pol_list_queued); + INIT_LIST_HEAD(&policy->pol_list); + INIT_LIST_HEAD(&policy->pol_list_queued); + + init_waitqueue_head(&policy->pol_wq); rc = nrs_policy_init(policy); if (rc != 0) { @@ -791,23 +841,24 @@ static int nrs_policy_register(struct ptlrpc_nrs *nrs, tmp = nrs_policy_find_locked(nrs, policy->pol_desc->pd_name); if (tmp != NULL) { - CERROR("NRS policy %s has been registered, can't register it " - "for %s\n", policy->pol_desc->pd_name, - svcpt->scp_service->srv_name); + rc = -EEXIST; + CERROR("%s.%d NRS: policy %s has been registered, can't register it: rc = %d\n", + srv_name, svcpt->scp_cpt, policy->pol_desc->pd_name, + rc); nrs_policy_put_locked(tmp); spin_unlock(&nrs->nrs_lock); nrs_policy_fini(policy); OBD_FREE_PTR(policy); - RETURN(-EEXIST); + RETURN(rc); } - cfs_list_add_tail(&policy->pol_list, &nrs->nrs_policy_list); + list_add_tail(&policy->pol_list, &nrs->nrs_policy_list); nrs->nrs_num_pols++; if (policy->pol_flags & PTLRPC_NRS_FL_REG_START) - rc = nrs_policy_start_locked(policy); + rc = nrs_policy_start_locked(policy, NULL); spin_unlock(&nrs->nrs_lock); @@ -838,8 +889,8 @@ static void ptlrpc_nrs_req_add_nolock(struct ptlrpc_request *req) * Add the policy to the NRS head's list of policies with enqueued * requests, if it has not been added there. */ - if (unlikely(cfs_list_empty(&policy->pol_list_queued))) - cfs_list_add_tail(&policy->pol_list_queued, + if (unlikely(list_empty(&policy->pol_list_queued))) + list_add_tail(&policy->pol_list_queued, &policy->pol_nrs->nrs_policy_queued); } @@ -902,14 +953,13 @@ static int nrs_register_policies_locked(struct ptlrpc_nrs *nrs) LASSERT(mutex_is_locked(&nrs_core.nrs_mutex)); - cfs_list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) { + list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) { if (nrs_policy_compatible(svc, desc)) { rc = nrs_policy_register(nrs, desc); if (rc != 0) { - CERROR("Failed to register NRS policy %s for " - "partition %d of service %s: %d\n", - desc->pd_name, svcpt->scp_cpt, - svc->srv_name, rc); + CERROR("%s.%d NRS: Failed to register policy %s: rc = %d\n", + svc->srv_name, svcpt->scp_cpt, + desc->pd_name, rc); /** * Fail registration if any of the policies' * registration fails. @@ -952,8 +1002,9 @@ static int nrs_svcpt_setup_locked0(struct ptlrpc_nrs *nrs, nrs->nrs_svcpt = svcpt; nrs->nrs_queue_type = queue; spin_lock_init(&nrs->nrs_lock); - CFS_INIT_LIST_HEAD(&nrs->nrs_policy_list); - CFS_INIT_LIST_HEAD(&nrs->nrs_policy_queued); + INIT_LIST_HEAD(&nrs->nrs_policy_list); + INIT_LIST_HEAD(&nrs->nrs_policy_queued); + nrs->nrs_throttling = 0; rc = nrs_register_policies_locked(nrs); @@ -1024,10 +1075,16 @@ static void nrs_svcpt_cleanup_locked(struct ptlrpc_service_part *svcpt) LASSERT(mutex_is_locked(&nrs_core.nrs_mutex)); again: - nrs = nrs_svcpt2nrs(svcpt, hp); + /* scp_nrs_hp could be NULL due to short of memory. */ + nrs = hp ? svcpt->scp_nrs_hp : &svcpt->scp_nrs_reg; + /* check the nrs_svcpt to see if nrs is initialized. */ + if (!nrs || !nrs->nrs_svcpt) { + EXIT; + return; + } nrs->nrs_stopping = 1; - cfs_list_for_each_entry_safe(policy, tmp, &nrs->nrs_policy_list, + list_for_each_entry_safe(policy, tmp, &nrs->nrs_policy_list, pol_list) { rc = nrs_policy_unregister(nrs, policy->pol_desc->pd_name); LASSERT(rc == 0); @@ -1060,7 +1117,7 @@ static struct ptlrpc_nrs_pol_desc *nrs_policy_find_desc_locked(const char *name) struct ptlrpc_nrs_pol_desc *tmp; ENTRY; - cfs_list_for_each_entry(tmp, &nrs_core.nrs_policies, pd_list) { + list_for_each_entry(tmp, &nrs_core.nrs_policies, pd_list) { if (strncmp(tmp->pd_name, name, NRS_POL_NAME_MAX) == 0) RETURN(tmp); } @@ -1091,13 +1148,14 @@ static int nrs_policy_unregister_locked(struct ptlrpc_nrs_pol_desc *desc) LASSERT(mutex_is_locked(&nrs_core.nrs_mutex)); LASSERT(mutex_is_locked(&ptlrpc_all_services_mutex)); - cfs_list_for_each_entry(svc, &ptlrpc_all_services, srv_list) { + list_for_each_entry(svc, &ptlrpc_all_services, srv_list) { if (!nrs_policy_compatible(svc, desc) || unlikely(svc->srv_is_stopping)) continue; ptlrpc_service_for_each_part(svcpt, i, svc) { + char *srv_name = svcpt->scp_service->srv_name; bool hp = false; again: @@ -1110,10 +1168,9 @@ again: if (rc == -ENOENT) { rc = 0; } else if (rc != 0) { - CERROR("Failed to unregister NRS policy %s for " - "partition %d of service %s: %d\n", - desc->pd_name, svcpt->scp_cpt, - svcpt->scp_service->srv_name, rc); + CERROR("%s.%d NRS: Failed to unregister policy %s: rc = %d\n", + srv_name, svcpt->scp_cpt, desc->pd_name, + rc); RETURN(rc); } @@ -1145,7 +1202,7 @@ again: * \retval -ve error * \retval 0 success */ -int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf) +static int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf) { struct ptlrpc_service *svc; struct ptlrpc_nrs_pol_desc *desc; @@ -1174,34 +1231,40 @@ int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf) if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) && (conf->nc_flags & (PTLRPC_NRS_FL_FALLBACK | PTLRPC_NRS_FL_REG_START))) { + rc = -EINVAL; CERROR("NRS: failing to register policy %s. Please check " "policy flags; external policies cannot act as fallback " "policies, or be started immediately upon registration " - "without interaction with lprocfs\n", conf->nc_name); - RETURN(-EINVAL); + "without interaction with lprocfs: rc = %d\n", + conf->nc_name, rc); + RETURN(rc); } mutex_lock(&nrs_core.nrs_mutex); if (nrs_policy_find_desc_locked(conf->nc_name) != NULL) { - CERROR("NRS: failing to register policy %s which has already " - "been registered with NRS core!\n", - conf->nc_name); - GOTO(fail, rc = -EEXIST); + rc = -EEXIST; + CERROR("NRS: failing to register policy %s which has already been registered with NRS core: rc = %d\n", + conf->nc_name, rc); + GOTO(fail, rc); } OBD_ALLOC_PTR(desc); if (desc == NULL) GOTO(fail, rc = -ENOMEM); - strncpy(desc->pd_name, conf->nc_name, NRS_POL_NAME_MAX); + if (strscpy(desc->pd_name, conf->nc_name, sizeof(desc->pd_name)) >= + sizeof(desc->pd_name)) { + OBD_FREE_PTR(desc); + GOTO(fail, rc = -E2BIG); + } desc->pd_ops = conf->nc_ops; desc->pd_compat = conf->nc_compat; desc->pd_compat_svc_name = conf->nc_compat_svc_name; if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) != 0) desc->pd_owner = conf->nc_owner; desc->pd_flags = conf->nc_flags; - cfs_atomic_set(&desc->pd_refs, 0); + atomic_set(&desc->pd_refs, 0); /** * For policies that are held in the same module as NRS (currently @@ -1219,7 +1282,7 @@ int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf) */ mutex_lock(&ptlrpc_all_services_mutex); - cfs_list_for_each_entry(svc, &ptlrpc_all_services, srv_list) { + list_for_each_entry(svc, &ptlrpc_all_services, srv_list) { struct ptlrpc_service_part *svcpt; int i; int rc2; @@ -1229,16 +1292,16 @@ int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf) continue; ptlrpc_service_for_each_part(svcpt, i, svc) { - struct ptlrpc_nrs *nrs; - bool hp = false; + struct ptlrpc_nrs *nrs; + char *srv_name = svcpt->scp_service->srv_name; + bool hp = false; again: nrs = nrs_svcpt2nrs(svcpt, hp); rc = nrs_policy_register(nrs, desc); if (rc != 0) { - CERROR("Failed to register NRS policy %s for " - "partition %d of service %s: %d\n", - desc->pd_name, svcpt->scp_cpt, - svcpt->scp_service->srv_name, rc); + CERROR("%s.%d NRS: Failed to register policy %s: rc = %d\n", + srv_name, svcpt->scp_cpt, + desc->pd_name, rc); rc2 = nrs_policy_unregister_locked(desc); /** @@ -1277,80 +1340,12 @@ again: mutex_unlock(&ptlrpc_all_services_mutex); internal: - cfs_list_add_tail(&desc->pd_list, &nrs_core.nrs_policies); + list_add_tail(&desc->pd_list, &nrs_core.nrs_policies); fail: mutex_unlock(&nrs_core.nrs_mutex); RETURN(rc); } -EXPORT_SYMBOL(ptlrpc_nrs_policy_register); - -/** - * Unregisters a previously registered policy with NRS core. All instances of - * the policy on all NRS heads of all supported services are removed. - * - * N.B. This function should only be called from a module's exit() function. - * Although it can be used for policies that ship alongside NRS core, the - * function is primarily intended for policies that register externally, - * from other modules. - * - * \param[in] conf configuration information for the policy to unregister - * - * \retval -ve error - * \retval 0 success - */ -int ptlrpc_nrs_policy_unregister(struct ptlrpc_nrs_pol_conf *conf) -{ - struct ptlrpc_nrs_pol_desc *desc; - int rc; - ENTRY; - - LASSERT(conf != NULL); - - if (conf->nc_flags & PTLRPC_NRS_FL_FALLBACK) { - CERROR("Unable to unregister a fallback policy, unless the " - "PTLRPC service is stopping.\n"); - RETURN(-EPERM); - } - - conf->nc_name[NRS_POL_NAME_MAX - 1] = '\0'; - - mutex_lock(&nrs_core.nrs_mutex); - - desc = nrs_policy_find_desc_locked(conf->nc_name); - if (desc == NULL) { - CERROR("Failing to unregister NRS policy %s which has " - "not been registered with NRS core!\n", - conf->nc_name); - GOTO(not_exist, rc = -ENOENT); - } - - mutex_lock(&ptlrpc_all_services_mutex); - - rc = nrs_policy_unregister_locked(desc); - if (rc < 0) { - if (rc == -EBUSY) - CERROR("Please first stop policy %s on all service " - "partitions and then retry to unregister the " - "policy.\n", conf->nc_name); - GOTO(fail, rc); - } - - CDEBUG(D_INFO, "Unregistering policy %s from NRS core.\n", - conf->nc_name); - - cfs_list_del(&desc->pd_list); - OBD_FREE_PTR(desc); - -fail: - mutex_unlock(&ptlrpc_all_services_mutex); - -not_exist: - mutex_unlock(&nrs_core.nrs_mutex); - - RETURN(rc); -} -EXPORT_SYMBOL(ptlrpc_nrs_policy_unregister); /** * Setup NRS heads on all service partitions of service \a svc, and register @@ -1388,7 +1383,7 @@ int ptlrpc_service_nrs_setup(struct ptlrpc_service *svc) * Set up lprocfs interfaces for all supported policies for the * service. */ - cfs_list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) { + list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) { if (!nrs_policy_compatible(svc, desc)) continue; @@ -1429,7 +1424,7 @@ void ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc) * Clean up lprocfs interfaces for all supported policies for the * service. */ - cfs_list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) { + list_for_each_entry(desc, &nrs_core.nrs_policies, pd_list) { if (!nrs_policy_compatible(svc, desc)) continue; @@ -1526,7 +1521,7 @@ static void nrs_request_removed(struct ptlrpc_nrs_policy *policy) * ptlrpc_nrs::nrs_policy_queued. */ if (unlikely(policy->pol_req_queued == 0)) { - cfs_list_del_init(&policy->pol_list_queued); + list_del_init(&policy->pol_list_queued); /** * If there are other policies with queued requests, move the @@ -1537,9 +1532,12 @@ static void nrs_request_removed(struct ptlrpc_nrs_policy *policy) LASSERT(policy->pol_req_queued < policy->pol_nrs->nrs_req_queued); - cfs_list_move_tail(&policy->pol_list_queued, + list_move_tail(&policy->pol_list_queued, &policy->pol_nrs->nrs_policy_queued); } + + /* remove the extra ref for policy pending requests */ + nrs_policy_started_put(policy); } /** @@ -1570,7 +1568,7 @@ ptlrpc_nrs_req_get_nolock0(struct ptlrpc_service_part *svcpt, bool hp, * Always try to drain requests from all NRS polices even if they are * inactive, because the user can change policy status at runtime. */ - cfs_list_for_each_entry(policy, &nrs->nrs_policy_queued, + list_for_each_entry(policy, &nrs->nrs_policy_queued, pol_list_queued) { nrq = nrs_request_get(policy, peek, force); if (nrq != NULL) { @@ -1627,6 +1625,24 @@ bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp) }; /** + * Returns whether NRS policy is throttling reqeust + * + * \param[in] svcpt the service partition to enquire. + * \param[in] hp whether the regular or high-priority NRS head is to be + * enquired. + * + * \retval false the indicated NRS head has no enqueued requests. + * \retval true the indicated NRS head has some enqueued requests. + */ +bool ptlrpc_nrs_req_throttling_nolock(struct ptlrpc_service_part *svcpt, + bool hp) +{ + struct ptlrpc_nrs *nrs = nrs_svcpt2nrs(svcpt, hp); + + return !!nrs->nrs_throttling; +}; + +/** * Moves request \a req from the regular to the high-priority NRS head. * * \param[in] req the request to move @@ -1738,17 +1754,6 @@ out: RETURN(rc); } - -/* ptlrpc/nrs_fifo.c */ -extern struct ptlrpc_nrs_pol_conf nrs_conf_fifo; -#if defined HAVE_SERVER_SUPPORT && defined(__KERNEL__) -/* ptlrpc/nrs_crr.c */ -extern struct ptlrpc_nrs_pol_conf nrs_conf_crrn; -/* ptlrpc/nrs_orr.c */ -extern struct ptlrpc_nrs_pol_conf nrs_conf_orr; -extern struct ptlrpc_nrs_pol_conf nrs_conf_trr; -#endif - /** * Adds all policies that ship with the ptlrpc module, to NRS core's list of * policies \e nrs_core.nrs_policies. @@ -1762,13 +1767,13 @@ int ptlrpc_nrs_init(void) ENTRY; mutex_init(&nrs_core.nrs_mutex); - CFS_INIT_LIST_HEAD(&nrs_core.nrs_policies); + INIT_LIST_HEAD(&nrs_core.nrs_policies); rc = ptlrpc_nrs_policy_register(&nrs_conf_fifo); if (rc != 0) GOTO(fail, rc); -#if defined HAVE_SERVER_SUPPORT && defined(__KERNEL__) +#ifdef HAVE_SERVER_SUPPORT rc = ptlrpc_nrs_policy_register(&nrs_conf_crrn); if (rc != 0) GOTO(fail, rc); @@ -1780,7 +1785,14 @@ int ptlrpc_nrs_init(void) rc = ptlrpc_nrs_policy_register(&nrs_conf_trr); if (rc != 0) GOTO(fail, rc); -#endif + rc = ptlrpc_nrs_policy_register(&nrs_conf_tbf); + if (rc != 0) + GOTO(fail, rc); +#endif /* HAVE_SERVER_SUPPORT */ + + rc = ptlrpc_nrs_policy_register(&nrs_conf_delay); + if (rc != 0) + GOTO(fail, rc); RETURN(rc); fail: @@ -1794,7 +1806,7 @@ fail: } /** - * Removes all policy desciptors from nrs_core::nrs_policies, and frees the + * Removes all policy descriptors from nrs_core::nrs_policies, and frees the * policy descriptors. * * Since all PTLRPC services are stopped at this point, there are no more @@ -1807,11 +1819,9 @@ void ptlrpc_nrs_fini(void) struct ptlrpc_nrs_pol_desc *desc; struct ptlrpc_nrs_pol_desc *tmp; - cfs_list_for_each_entry_safe(desc, tmp, &nrs_core.nrs_policies, + list_for_each_entry_safe(desc, tmp, &nrs_core.nrs_policies, pd_list) { - cfs_list_del_init(&desc->pd_list); + list_del_init(&desc->pd_list); OBD_FREE_PTR(desc); } } - -/** @} nrs */