static void nrs_policy_fini(struct ptlrpc_nrs_policy *policy)
{
LASSERT(policy->pol_ref == 0);
+ LASSERT(refcount_read(&policy->pol_start_ref) == 0);
LASSERT(policy->pol_req_queued == 0);
if (policy->pol_desc->pd_ops->op_policy_fini != NULL)
policy->pol_req_started == 0);
policy->pol_private = NULL;
+ policy->pol_arg[0] = '\0';
policy->pol_state = NRS_POL_STATE_STOPPED;
+ wake_up(&policy->pol_wq);
if (atomic_dec_and_test(&policy->pol_desc->pd_refs))
module_put(policy->pol_desc->pd_owner);
EXIT;
}
+/**
+ * Increases the policy's usage started reference count.
+ */
+static inline void nrs_policy_started_get(struct ptlrpc_nrs_policy *policy)
+{
+ refcount_inc(&policy->pol_start_ref);
+}
+
+/**
+ * Decreases the policy's usage started reference count, and stops the policy
+ * in case it was already stopping and have no more outstanding usage
+ * references (which indicates it has no more queued or started requests, and
+ * can be safely stopped).
+ */
+static void nrs_policy_started_put(struct ptlrpc_nrs_policy *policy)
+{
+ if (refcount_dec_and_test(&policy->pol_start_ref))
+ nrs_policy_stop0(policy);
+}
+
static int nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
{
struct ptlrpc_nrs *nrs = policy->pol_nrs;
nrs->nrs_policy_fallback = NULL;
}
- /* I have the only refcount */
- if (policy->pol_ref == 1)
- nrs_policy_stop0(policy);
+ /* Drop started ref and wait for requests to be drained */
+ spin_unlock(&nrs->nrs_lock);
+ nrs_policy_started_put(policy);
+
+ wait_event_timeout(policy->pol_wq,
+ policy->pol_state == NRS_POL_STATE_STOPPED,
+ cfs_time_seconds(30));
+
+ spin_lock(&nrs->nrs_lock);
+
+ if (policy->pol_state != NRS_POL_STATE_STOPPED)
+ RETURN(-EBUSY);
RETURN(0);
}
LASSERT(tmp->pol_state == NRS_POL_STATE_STARTED);
tmp->pol_state = NRS_POL_STATE_STOPPING;
- if (tmp->pol_ref == 0)
- nrs_policy_stop0(tmp);
+ /* Drop started ref to free the policy */
+ spin_unlock(&nrs->nrs_lock);
+ nrs_policy_started_put(tmp);
+ spin_lock(&nrs->nrs_lock);
EXIT;
}
*/
static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy, char *arg)
{
- struct ptlrpc_nrs *nrs = policy->pol_nrs;
- int rc = 0;
+ struct ptlrpc_nrs *nrs = policy->pol_nrs;
+ struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
+ char *srv_name = svcpt->scp_service->srv_name;
+ int rc = 0;
ENTRY;
/**
if (policy->pol_state == NRS_POL_STATE_STOPPING)
RETURN(-EAGAIN);
+ if (arg && strlen(arg) >= sizeof(policy->pol_arg)) {
+ rc = -EINVAL;
+ CWARN("%s.%d NRS: arg '%s' is too long: rc = %d\n",
+ srv_name, svcpt->scp_cpt, arg, rc);
+ return rc;
+ }
+
if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
/**
* This is for cases in which the user sets the policy to the
* stop the policy first and start it again with the new
* argument.
*/
- if ((arg != NULL) && (strlen(arg) >= NRS_POL_ARG_MAX))
- return -EINVAL;
-
if ((arg == NULL && strlen(policy->pol_arg) == 0) ||
(arg != NULL && strcmp(policy->pol_arg, arg) == 0))
RETURN(0);
rc = nrs_policy_stop_locked(policy);
if (rc)
- RETURN(-EAGAIN);
+ RETURN(rc);
}
}
if (atomic_inc_return(&policy->pol_desc->pd_refs) == 1 &&
!try_module_get(policy->pol_desc->pd_owner)) {
atomic_dec(&policy->pol_desc->pd_refs);
- CERROR("NRS: cannot get module for policy %s; is it alive?\n",
- policy->pol_desc->pd_name);
- RETURN(-ENODEV);
+ rc = -ENODEV;
+ CERROR("%s.%d NRS: cannot get module for policy %s (is it alive?): rc = %d\n",
+ srv_name, svcpt->scp_cpt, policy->pol_desc->pd_name, rc);
+ RETURN(rc);
}
/**
}
}
- if (arg != NULL) {
- if (strlcpy(policy->pol_arg, arg, sizeof(policy->pol_arg)) >=
- sizeof(policy->pol_arg)) {
- CERROR("NRS: arg '%s' is too long\n", arg);
- GOTO(out, rc = -E2BIG);
- }
- } else {
- policy->pol_arg[0] = '\0';
- }
+ if (arg)
+ strlcpy(policy->pol_arg, arg, sizeof(policy->pol_arg));
+ /* take the started reference */
+ refcount_set(&policy->pol_start_ref, 1);
policy->pol_state = NRS_POL_STATE_STARTED;
if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
}
/**
- * Increases the policy's usage reference count.
+ * Increases the policy's usage reference count (caller count).
*/
static inline void nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy)
+__must_hold(&policy->pol_nrs->nrs_lock)
{
policy->pol_ref++;
}
/**
- * Decreases the policy's usage reference count, and stops the policy in case it
- * was already stopping and have no more outstanding usage references (which
- * indicates it has no more queued or started requests, and can be safely
- * stopped).
+ * Decreases the policy's usage reference count.
*/
static void nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy)
+__must_hold(&policy->pol_nrs->nrs_lock)
{
LASSERT(policy->pol_ref > 0);
policy->pol_ref--;
- if (unlikely(policy->pol_ref == 0 &&
- policy->pol_state == NRS_POL_STATE_STOPPING))
- nrs_policy_stop0(policy);
-}
-
-static void nrs_policy_put(struct ptlrpc_nrs_policy *policy)
-{
- spin_lock(&policy->pol_nrs->nrs_lock);
- nrs_policy_put_locked(policy);
- spin_unlock(&policy->pol_nrs->nrs_lock);
}
/**
spin_lock(&nrs->nrs_lock);
fallback = nrs->nrs_policy_fallback;
- nrs_policy_get_locked(fallback);
+ nrs_policy_started_get(fallback);
primary = nrs->nrs_policy_primary;
if (primary != NULL)
- nrs_policy_get_locked(primary);
+ nrs_policy_started_get(primary);
spin_unlock(&nrs->nrs_lock);
* request.
*/
if (resp[NRS_RES_PRIMARY] == NULL)
- nrs_policy_put(primary);
+ nrs_policy_started_put(primary);
}
}
static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
{
struct ptlrpc_nrs_policy *pols[NRS_RES_MAX];
- struct ptlrpc_nrs *nrs = NULL;
- int i;
+ int i;
for (i = 0; i < NRS_RES_MAX; i++) {
if (resp[i] != NULL) {
if (pols[i] == NULL)
continue;
- if (nrs == NULL) {
- nrs = pols[i]->pol_nrs;
- spin_lock(&nrs->nrs_lock);
- }
- nrs_policy_put_locked(pols[i]);
+ nrs_policy_started_put(pols[i]);
}
-
- if (nrs != NULL)
- spin_unlock(&nrs->nrs_lock);
}
/**
LASSERT(policy->pol_req_queued > 0);
+ /* for a non-started policy, use force mode to drain requests */
+ if (unlikely(policy->pol_state != NRS_POL_STATE_STARTED))
+ force = true;
+
nrq = policy->pol_desc->pd_ops->op_req_get(policy, peek, force);
LASSERT(ergo(nrq != NULL, nrs_request_policy(nrq) == policy));
if (rc == 0) {
policy->pol_nrs->nrs_req_queued++;
policy->pol_req_queued++;
+ /**
+ * Take an extra ref to avoid stopping policy with
+ * pending request in it
+ */
+ nrs_policy_started_get(policy);
return;
}
}
static int nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name)
{
struct ptlrpc_nrs_policy *policy = NULL;
+ struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
+ char *srv_name = svcpt->scp_service->srv_name;
+ int rc = 0;
ENTRY;
spin_lock(&nrs->nrs_lock);
policy = nrs_policy_find_locked(nrs, name);
if (policy == NULL) {
- spin_unlock(&nrs->nrs_lock);
-
- CERROR("Can't find NRS policy %s\n", name);
- RETURN(-ENOENT);
+ rc = -ENOENT;
+ CERROR("%s.%d NRS: cannot find policy '%s': rc = %d\n",
+ srv_name, svcpt->scp_cpt, name, rc);
+ GOTO(out_unlock, rc);
}
if (policy->pol_ref > 1) {
- CERROR("Policy %s is busy with %d references\n", name,
- (int)policy->pol_ref);
- nrs_policy_put_locked(policy);
-
- spin_unlock(&nrs->nrs_lock);
- RETURN(-EBUSY);
+ rc = -EBUSY;
+ CERROR("%s.%d NRS: policy '%s' is busy with %ld references: rc = %d\n",
+ srv_name, svcpt->scp_cpt, name, policy->pol_ref, rc);
+ GOTO(out_put, rc);
}
LASSERT(policy->pol_req_queued == 0);
LASSERT(policy->pol_req_started == 0);
if (policy->pol_state != NRS_POL_STATE_STOPPED) {
- nrs_policy_stop_locked(policy);
- LASSERT(policy->pol_state == NRS_POL_STATE_STOPPED);
+ rc = nrs_policy_stop_locked(policy);
+ if (rc) {
+ CERROR("%s.%d NRS: failed to stop policy '%s' with refcount %d: rc = %d\n",
+ srv_name, svcpt->scp_cpt, name,
+ refcount_read(&policy->pol_start_ref), rc);
+ GOTO(out_put, rc);
+ }
}
+ LASSERT(policy->pol_private == NULL);
list_del(&policy->pol_list);
nrs->nrs_num_pols--;
+ EXIT;
+out_put:
nrs_policy_put_locked(policy);
-
+out_unlock:
spin_unlock(&nrs->nrs_lock);
- nrs_policy_fini(policy);
-
- LASSERT(policy->pol_private == NULL);
- OBD_FREE_PTR(policy);
+ if (rc == 0) {
+ nrs_policy_fini(policy);
+ OBD_FREE_PTR(policy);
+ }
- RETURN(0);
+ return rc;
}
/**
static int nrs_policy_register(struct ptlrpc_nrs *nrs,
struct ptlrpc_nrs_pol_desc *desc)
{
- struct ptlrpc_nrs_policy *policy;
- struct ptlrpc_nrs_policy *tmp;
- struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
- int rc;
+ struct ptlrpc_nrs_policy *policy;
+ struct ptlrpc_nrs_policy *tmp;
+ struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
+ char *srv_name = svcpt->scp_service->srv_name;
+ int rc;
ENTRY;
LASSERT(svcpt != NULL);
INIT_LIST_HEAD(&policy->pol_list);
INIT_LIST_HEAD(&policy->pol_list_queued);
+ init_waitqueue_head(&policy->pol_wq);
+
rc = nrs_policy_init(policy);
if (rc != 0) {
OBD_FREE_PTR(policy);
tmp = nrs_policy_find_locked(nrs, policy->pol_desc->pd_name);
if (tmp != NULL) {
- CERROR("NRS policy %s has been registered, can't register it "
- "for %s\n", policy->pol_desc->pd_name,
- svcpt->scp_service->srv_name);
+ rc = -EEXIST;
+ CERROR("%s.%d NRS: policy %s has been registered, can't register it: rc = %d\n",
+ srv_name, svcpt->scp_cpt, policy->pol_desc->pd_name,
+ rc);
nrs_policy_put_locked(tmp);
spin_unlock(&nrs->nrs_lock);
nrs_policy_fini(policy);
OBD_FREE_PTR(policy);
- RETURN(-EEXIST);
+ RETURN(rc);
}
list_add_tail(&policy->pol_list, &nrs->nrs_policy_list);
if (nrs_policy_compatible(svc, desc)) {
rc = nrs_policy_register(nrs, desc);
if (rc != 0) {
- CERROR("Failed to register NRS policy %s for "
- "partition %d of service %s: %d\n",
- desc->pd_name, svcpt->scp_cpt,
- svc->srv_name, rc);
+ CERROR("%s.%d NRS: Failed to register policy %s: rc = %d\n",
+ svc->srv_name, svcpt->scp_cpt,
+ desc->pd_name, rc);
/**
* Fail registration if any of the policies'
* registration fails.
continue;
ptlrpc_service_for_each_part(svcpt, i, svc) {
+ char *srv_name = svcpt->scp_service->srv_name;
bool hp = false;
again:
if (rc == -ENOENT) {
rc = 0;
} else if (rc != 0) {
- CERROR("Failed to unregister NRS policy %s for "
- "partition %d of service %s: %d\n",
- desc->pd_name, svcpt->scp_cpt,
- svcpt->scp_service->srv_name, rc);
+ CERROR("%s.%d NRS: Failed to unregister policy %s: rc = %d\n",
+ srv_name, svcpt->scp_cpt, desc->pd_name,
+ rc);
RETURN(rc);
}
* \retval -ve error
* \retval 0 success
*/
-int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf)
+static int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf)
{
struct ptlrpc_service *svc;
struct ptlrpc_nrs_pol_desc *desc;
if ((conf->nc_flags & PTLRPC_NRS_FL_REG_EXTERN) &&
(conf->nc_flags & (PTLRPC_NRS_FL_FALLBACK |
PTLRPC_NRS_FL_REG_START))) {
+ rc = -EINVAL;
CERROR("NRS: failing to register policy %s. Please check "
"policy flags; external policies cannot act as fallback "
"policies, or be started immediately upon registration "
- "without interaction with lprocfs\n", conf->nc_name);
- RETURN(-EINVAL);
+ "without interaction with lprocfs: rc = %d\n",
+ conf->nc_name, rc);
+ RETURN(rc);
}
mutex_lock(&nrs_core.nrs_mutex);
if (nrs_policy_find_desc_locked(conf->nc_name) != NULL) {
- CERROR("NRS: failing to register policy %s which has already "
- "been registered with NRS core!\n",
- conf->nc_name);
- GOTO(fail, rc = -EEXIST);
+ rc = -EEXIST;
+ CERROR("NRS: failing to register policy %s which has already been registered with NRS core: rc = %d\n",
+ conf->nc_name, rc);
+ GOTO(fail, rc);
}
OBD_ALLOC_PTR(desc);
continue;
ptlrpc_service_for_each_part(svcpt, i, svc) {
- struct ptlrpc_nrs *nrs;
- bool hp = false;
+ struct ptlrpc_nrs *nrs;
+ char *srv_name = svcpt->scp_service->srv_name;
+ bool hp = false;
again:
nrs = nrs_svcpt2nrs(svcpt, hp);
rc = nrs_policy_register(nrs, desc);
if (rc != 0) {
- CERROR("Failed to register NRS policy %s for "
- "partition %d of service %s: %d\n",
- desc->pd_name, svcpt->scp_cpt,
- svcpt->scp_service->srv_name, rc);
+ CERROR("%s.%d NRS: Failed to register policy %s: rc = %d\n",
+ srv_name, svcpt->scp_cpt,
+ desc->pd_name, rc);
rc2 = nrs_policy_unregister_locked(desc);
/**
RETURN(rc);
}
-EXPORT_SYMBOL(ptlrpc_nrs_policy_register);
-
-/**
- * Unregisters a previously registered policy with NRS core. All instances of
- * the policy on all NRS heads of all supported services are removed.
- *
- * N.B. This function should only be called from a module's exit() function.
- * Although it can be used for policies that ship alongside NRS core, the
- * function is primarily intended for policies that register externally,
- * from other modules.
- *
- * \param[in] conf configuration information for the policy to unregister
- *
- * \retval -ve error
- * \retval 0 success
- */
-int ptlrpc_nrs_policy_unregister(struct ptlrpc_nrs_pol_conf *conf)
-{
- struct ptlrpc_nrs_pol_desc *desc;
- int rc;
- ENTRY;
-
- LASSERT(conf != NULL);
-
- if (conf->nc_flags & PTLRPC_NRS_FL_FALLBACK) {
- CERROR("Unable to unregister a fallback policy, unless the "
- "PTLRPC service is stopping.\n");
- RETURN(-EPERM);
- }
-
- conf->nc_name[NRS_POL_NAME_MAX - 1] = '\0';
-
- mutex_lock(&nrs_core.nrs_mutex);
-
- desc = nrs_policy_find_desc_locked(conf->nc_name);
- if (desc == NULL) {
- CERROR("Failing to unregister NRS policy %s which has "
- "not been registered with NRS core!\n",
- conf->nc_name);
- GOTO(not_exist, rc = -ENOENT);
- }
-
- mutex_lock(&ptlrpc_all_services_mutex);
-
- rc = nrs_policy_unregister_locked(desc);
- if (rc < 0) {
- if (rc == -EBUSY)
- CERROR("Please first stop policy %s on all service "
- "partitions and then retry to unregister the "
- "policy.\n", conf->nc_name);
- GOTO(fail, rc);
- }
-
- CDEBUG(D_INFO, "Unregistering policy %s from NRS core.\n",
- conf->nc_name);
-
- list_del(&desc->pd_list);
- OBD_FREE_PTR(desc);
-
-fail:
- mutex_unlock(&ptlrpc_all_services_mutex);
-
-not_exist:
- mutex_unlock(&nrs_core.nrs_mutex);
-
- RETURN(rc);
-}
-EXPORT_SYMBOL(ptlrpc_nrs_policy_unregister);
/**
* Setup NRS heads on all service partitions of service \a svc, and register
list_move_tail(&policy->pol_list_queued,
&policy->pol_nrs->nrs_policy_queued);
}
+
+ /* remove the extra ref for policy pending requests */
+ nrs_policy_started_put(policy);
}
/**
rc = ptlrpc_nrs_policy_register(&nrs_conf_tbf);
if (rc != 0)
GOTO(fail, rc);
+#endif /* HAVE_SERVER_SUPPORT */
rc = ptlrpc_nrs_policy_register(&nrs_conf_delay);
if (rc != 0)
GOTO(fail, rc);
-#endif /* HAVE_SERVER_SUPPORT */
RETURN(rc);
fail:
OBD_FREE_PTR(desc);
}
}
-
-/** @} nrs */