static void nrs_policy_fini(struct ptlrpc_nrs_policy *policy)
{
LASSERT(policy->pol_ref == 0);
+ LASSERT(refcount_read(&policy->pol_start_ref) == 0);
LASSERT(policy->pol_req_queued == 0);
if (policy->pol_desc->pd_ops->op_policy_fini != NULL)
policy->pol_req_started == 0);
policy->pol_private = NULL;
+ policy->pol_arg[0] = '\0';
policy->pol_state = NRS_POL_STATE_STOPPED;
+ wake_up(&policy->pol_wq);
if (atomic_dec_and_test(&policy->pol_desc->pd_refs))
module_put(policy->pol_desc->pd_owner);
EXIT;
}
+/**
+ * Increases the policy's usage started reference count.
+ */
+static inline void nrs_policy_started_get(struct ptlrpc_nrs_policy *policy)
+{
+ refcount_inc(&policy->pol_start_ref);
+}
+
+/**
+ * Decreases the policy's usage started reference count, and stops the policy
+ * in case it was already stopping and have no more outstanding usage
+ * references (which indicates it has no more queued or started requests, and
+ * can be safely stopped).
+ */
+static void nrs_policy_started_put(struct ptlrpc_nrs_policy *policy)
+{
+ if (refcount_dec_and_test(&policy->pol_start_ref))
+ nrs_policy_stop0(policy);
+}
+
static int nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
{
struct ptlrpc_nrs *nrs = policy->pol_nrs;
nrs->nrs_policy_fallback = NULL;
}
- /* I have the only refcount */
- if (policy->pol_ref == 1)
- nrs_policy_stop0(policy);
+ /* Drop started ref and wait for requests to be drained */
+ spin_unlock(&nrs->nrs_lock);
+ nrs_policy_started_put(policy);
+
+ wait_event_timeout(policy->pol_wq,
+ policy->pol_state == NRS_POL_STATE_STOPPED,
+ cfs_time_seconds(30));
+
+ spin_lock(&nrs->nrs_lock);
+
+ if (policy->pol_state != NRS_POL_STATE_STOPPED)
+ RETURN(-EBUSY);
RETURN(0);
}
LASSERT(tmp->pol_state == NRS_POL_STATE_STARTED);
tmp->pol_state = NRS_POL_STATE_STOPPING;
- if (tmp->pol_ref == 0)
- nrs_policy_stop0(tmp);
+ /* Drop started ref to free the policy */
+ spin_unlock(&nrs->nrs_lock);
+ nrs_policy_started_put(tmp);
+ spin_lock(&nrs->nrs_lock);
EXIT;
}
if (policy->pol_state == NRS_POL_STATE_STOPPING)
RETURN(-EAGAIN);
+ if (arg && strlen(arg) >= sizeof(policy->pol_arg)) {
+ CWARN("NRS: arg '%s' is too long\n", arg);
+ return -EINVAL;
+ }
+
if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
/**
* This is for cases in which the user sets the policy to the
* stop the policy first and start it again with the new
* argument.
*/
- if ((arg != NULL) && (strlen(arg) >= NRS_POL_ARG_MAX))
- return -EINVAL;
-
if ((arg == NULL && strlen(policy->pol_arg) == 0) ||
(arg != NULL && strcmp(policy->pol_arg, arg) == 0))
RETURN(0);
rc = nrs_policy_stop_locked(policy);
if (rc)
- RETURN(-EAGAIN);
+ RETURN(rc);
}
}
}
}
- if (arg != NULL) {
- if (strlcpy(policy->pol_arg, arg, sizeof(policy->pol_arg)) >=
- sizeof(policy->pol_arg)) {
- CERROR("NRS: arg '%s' is too long\n", arg);
- GOTO(out, rc = -E2BIG);
- }
- } else {
- policy->pol_arg[0] = '\0';
- }
+ if (arg)
+ strlcpy(policy->pol_arg, arg, sizeof(policy->pol_arg));
+ /* take the started reference */
+ refcount_set(&policy->pol_start_ref, 1);
policy->pol_state = NRS_POL_STATE_STARTED;
if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
}
/**
- * Increases the policy's usage reference count.
+ * Increases the policy's usage reference count (caller count).
*/
static inline void nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy)
+__must_hold(&policy->pol_nrs->nrs_lock)
{
policy->pol_ref++;
}
/**
- * Decreases the policy's usage reference count, and stops the policy in case it
- * was already stopping and have no more outstanding usage references (which
- * indicates it has no more queued or started requests, and can be safely
- * stopped).
+ * Decreases the policy's usage reference count.
*/
static void nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy)
+__must_hold(&policy->pol_nrs->nrs_lock)
{
LASSERT(policy->pol_ref > 0);
policy->pol_ref--;
- if (unlikely(policy->pol_ref == 0 &&
- policy->pol_state == NRS_POL_STATE_STOPPING))
- nrs_policy_stop0(policy);
-}
-
-static void nrs_policy_put(struct ptlrpc_nrs_policy *policy)
-{
- spin_lock(&policy->pol_nrs->nrs_lock);
- nrs_policy_put_locked(policy);
- spin_unlock(&policy->pol_nrs->nrs_lock);
}
/**
spin_lock(&nrs->nrs_lock);
fallback = nrs->nrs_policy_fallback;
- nrs_policy_get_locked(fallback);
+ nrs_policy_started_get(fallback);
primary = nrs->nrs_policy_primary;
if (primary != NULL)
- nrs_policy_get_locked(primary);
+ nrs_policy_started_get(primary);
spin_unlock(&nrs->nrs_lock);
* request.
*/
if (resp[NRS_RES_PRIMARY] == NULL)
- nrs_policy_put(primary);
+ nrs_policy_started_put(primary);
}
}
static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
{
struct ptlrpc_nrs_policy *pols[NRS_RES_MAX];
- struct ptlrpc_nrs *nrs = NULL;
- int i;
+ int i;
for (i = 0; i < NRS_RES_MAX; i++) {
if (resp[i] != NULL) {
if (pols[i] == NULL)
continue;
- if (nrs == NULL) {
- nrs = pols[i]->pol_nrs;
- spin_lock(&nrs->nrs_lock);
- }
- nrs_policy_put_locked(pols[i]);
+ nrs_policy_started_put(pols[i]);
}
-
- if (nrs != NULL)
- spin_unlock(&nrs->nrs_lock);
}
/**
LASSERT(policy->pol_req_queued > 0);
+ /* for a non-started policy, use force mode to drain requests */
+ if (unlikely(policy->pol_state != NRS_POL_STATE_STARTED))
+ force = true;
+
nrq = policy->pol_desc->pd_ops->op_req_get(policy, peek, force);
LASSERT(ergo(nrq != NULL, nrs_request_policy(nrq) == policy));
if (rc == 0) {
policy->pol_nrs->nrs_req_queued++;
policy->pol_req_queued++;
+ /**
+ * Take an extra ref to avoid stopping policy with
+ * pending request in it
+ */
+ nrs_policy_started_get(policy);
return;
}
}
static int nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name)
{
struct ptlrpc_nrs_policy *policy = NULL;
+ int rc = 0;
ENTRY;
spin_lock(&nrs->nrs_lock);
policy = nrs_policy_find_locked(nrs, name);
if (policy == NULL) {
- spin_unlock(&nrs->nrs_lock);
-
- CERROR("Can't find NRS policy %s\n", name);
- RETURN(-ENOENT);
+ rc = -ENOENT;
+ CERROR("NRS: cannot find policy '%s': rc = %d\n", name, rc);
+ GOTO(out_unlock, rc);
}
if (policy->pol_ref > 1) {
- CERROR("Policy %s is busy with %d references\n", name,
- (int)policy->pol_ref);
- nrs_policy_put_locked(policy);
-
- spin_unlock(&nrs->nrs_lock);
- RETURN(-EBUSY);
+ rc = -EBUSY;
+ CERROR("NRS: policy '%s' is busy with %ld references: rc = %d",
+ name, policy->pol_ref, rc);
+ GOTO(out_put, rc);
}
LASSERT(policy->pol_req_queued == 0);
LASSERT(policy->pol_req_started == 0);
if (policy->pol_state != NRS_POL_STATE_STOPPED) {
- nrs_policy_stop_locked(policy);
- LASSERT(policy->pol_state == NRS_POL_STATE_STOPPED);
+ rc = nrs_policy_stop_locked(policy);
+ if (rc) {
+ CERROR("NRS: failed to stop policy '%s' with refcount %d: rc = %d\n",
+ name, refcount_read(&policy->pol_start_ref), rc);
+ GOTO(out_put, rc);
+ }
}
+ LASSERT(policy->pol_private == NULL);
list_del(&policy->pol_list);
nrs->nrs_num_pols--;
+ EXIT;
+out_put:
nrs_policy_put_locked(policy);
-
+out_unlock:
spin_unlock(&nrs->nrs_lock);
- nrs_policy_fini(policy);
-
- LASSERT(policy->pol_private == NULL);
- OBD_FREE_PTR(policy);
+ if (rc == 0) {
+ nrs_policy_fini(policy);
+ OBD_FREE_PTR(policy);
+ }
- RETURN(0);
+ return rc;
}
/**
INIT_LIST_HEAD(&policy->pol_list);
INIT_LIST_HEAD(&policy->pol_list_queued);
+ init_waitqueue_head(&policy->pol_wq);
+
rc = nrs_policy_init(policy);
if (rc != 0) {
OBD_FREE_PTR(policy);
list_move_tail(&policy->pol_list_queued,
&policy->pol_nrs->nrs_policy_queued);
}
+
+ /* remove the extra ref for policy pending requests */
+ nrs_policy_started_put(policy);
}
/**
OBD_FREE_PTR(desc);
}
}
-
-/** @} nrs */
}
run_test 77p "Check validity of rule names for TBF policies"
+cleanup_77r() {
+ local pid=$1
+ local saved_jobid=$2
+ local current_jobid_var
+
+ echo "cleanup 77r $pid"
+
+ do_facet mds1 $LCTL set_param -n mds.MDS.mdt.nrs_policies=fifo
+ kill $pid || echo "fail to kill md thread"
+
+ current_jobid_var=$($LCTL get_param -n jobid_var)
+ if [ $saved_jobid != $current_jobid_var ]; then
+ set_persistent_param_and_check client \
+ "jobid_var" "$FSNAME.sys.jobid_var" $saved_jobid
+ fi
+
+ sleep 2
+ rm -rf $DIR1/$tdir
+}
+
+md_thread_run="true"
+md_thread_77r() {
+ local pid
+
+ while $md_thread_run; do
+ printf '%s\n' {$DIR1,$DIR2}/$tdir/${tfile}-{01..20} |
+ xargs -P20 -I{} $RUNAS bash -c 'touch {}; rm -f {}' \
+ &> /dev/null & pid=$!
+ trap "echo kill md_thread xargs; md_thread_run=false; kill $pid" INT TERM
+ wait $pid
+ done
+}
+
+wait_policy_state() {
+ local state="$1"
+ local policy="$2"
+ local change_pid="$3"
+ local time
+
+ for time in {1..60}; do
+ local nbr_started
+
+ nbr_started=$(do_facet mds1 $LCTL get_param mds.MDS.mdt.nrs_policies |
+ egrep -A2 "name: ${policy}$" | grep -c "state: $state")
+
+ [[ "$nbr_started" != 2 ]] || return 0
+ sleep 1
+ done
+
+ [[ -z "$change_pid" ]] || kill $change_pid || true
+ return 1
+}
+
+test_77r() { #LU-14976
+ local pid
+ local -A rules
+ local -a policies
+ local saved_jobid_var
+
+ rules["tbf uid"]="start md_rule uid={$RUNAS_ID} rate=1"
+ rules["tbf gid"]="start md_rule gid={$RUNAS_GID} rate=1"
+ rules["tbf jobid"]="start md_rule jobid={*.$RUNAS_ID} rate=1"
+ rules["tbf"]="start md_rule uid={$RUNAS_ID} rate=1"
+ policies=(
+ "tbf uid"
+ "tbf gid"
+ "tbf jobid"
+ "tbf"
+ "fifo"
+ )
+
+ test_mkdir -i 0 -c 1 $DIR1/$tdir
+ chmod 777 $DIR1/$tdir
+
+ # Configure jobid_var
+ saved_jobid_var=$($LCTL get_param -n jobid_var)
+ if [ $saved_jobid_var != procname_uid ]; then
+ set_persistent_param_and_check client \
+ "jobid_var" "$FSNAME.sys.jobid_var" procname_uid
+ fi
+
+ # start md thread
+ md_thread_77r & pid=$!
+ stack_trap "cleanup_77r $pid '$saved_jobid_var'"
+
+ local policy
+ for policy in "${policies[@]}"; do
+ local change_pid
+
+ # wait to queue requests
+ sleep 5
+
+ do_facet mds1 "$LCTL set_param mds.MDS.mdt.nrs_policies='$policy'" &
+ change_pid=$!
+
+ wait_policy_state "started" "$policy" "$change_pid" ||
+ error "timeout to start '$policy' policy"
+
+ [[ -n "${rules[$policy]}" ]] || continue
+
+ do_facet mds1 "$LCTL set_param mds.MDS.mdt.nrs_tbf_rule='${rules[$policy]}'" ||
+ error "fail to set rule '${rules[$policy]}' to '$policy'"
+ done
+
+ wait_policy_state "stopped" "tbf" ||
+ error "fail to stop tbf policy"
+
+ echo "check the number of requests in queue:"
+ local awkcmd='/name: / {last = $3} '
+ awkcmd+='/queued: / {printf " %s: %d\n", last, $2;'
+ awkcmd+=' if (last == "tbf" && $2 > 0) exit 1;}'
+ do_facet mds1 $LCTL get_param mds.MDS.mdt.nrs_policies | awk "$awkcmd" ||
+ error "request leak in tbf policies"
+}
+run_test 77r "Change type of tbf policy at run time"
+
test_78() { #LU-6673
local rc