Whamcloud - gitweb
LU-14976 nrs: change nrs policies at run time 23/48523/14
authorEtienne AUJAMES <etienne.aujames@cea.fr>
Mon, 12 Sep 2022 11:13:13 +0000 (13:13 +0200)
committerOleg Drokin <green@whamcloud.com>
Sat, 22 Apr 2023 17:27:48 +0000 (17:27 +0000)
This patch take extra references on policy to avoid stop a NRS policy
with pending/queued request in it.

It uses a new refcount_t "pol_start_ref" for this purpose to keep
track of policy usage in started state. It enables to safely stop a
policy without "nrs_lock" and avoids to sleep in the spinlock.

It adds a wait queue field "pol_wq" in "struct ptlrpc_nrs_policy" to
wait all queued request in a stopping policy to be drained when
restarting policy with a different argument.

Add test sanityn 77r for this use case.

Test-Parameters: testlist=sanityn env=ONLY=77r,ONLY_REPEAT=20
Signed-off-by: Etienne AUJAMES <eaujames@ddn.com>
Change-Id: I1425f52324f755f1b76ea8210de52647c072a592
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/48523
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Feng Lei <flei@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_nrs.h
lustre/ptlrpc/nrs.c
lustre/tests/sanityn.sh

index fd80e40..626b09f 100644 (file)
@@ -618,6 +618,10 @@ struct ptlrpc_nrs_policy {
         */
        long                            pol_ref;
        /**
+        * Usage Reference count taken for a started policy
+        */
+       refcount_t                      pol_start_ref;
+       /**
         * Human-readable policy argument
         */
        char                            pol_arg[NRS_POL_ARG_MAX];
@@ -633,6 +637,10 @@ struct ptlrpc_nrs_policy {
         * Policy descriptor for this policy instance.
         */
        struct ptlrpc_nrs_pol_desc     *pol_desc;
+       /**
+        * Policy wait queue
+        */
+       wait_queue_head_t               pol_wq;
 };
 
 /**
index ccfc4e9..fd1bdea 100644 (file)
@@ -61,6 +61,7 @@ static int nrs_policy_init(struct ptlrpc_nrs_policy *policy)
 static void nrs_policy_fini(struct ptlrpc_nrs_policy *policy)
 {
        LASSERT(policy->pol_ref == 0);
+       LASSERT(refcount_read(&policy->pol_start_ref) == 0);
        LASSERT(policy->pol_req_queued == 0);
 
        if (policy->pol_desc->pd_ops->op_policy_fini != NULL)
@@ -96,8 +97,10 @@ static void nrs_policy_stop0(struct ptlrpc_nrs_policy *policy)
                policy->pol_req_started == 0);
 
        policy->pol_private = NULL;
+       policy->pol_arg[0] = '\0';
 
        policy->pol_state = NRS_POL_STATE_STOPPED;
+       wake_up(&policy->pol_wq);
 
        if (atomic_dec_and_test(&policy->pol_desc->pd_refs))
                module_put(policy->pol_desc->pd_owner);
@@ -105,6 +108,26 @@ static void nrs_policy_stop0(struct ptlrpc_nrs_policy *policy)
        EXIT;
 }
 
+/**
+ * Increases the policy's usage started reference count.
+ */
+static inline void nrs_policy_started_get(struct ptlrpc_nrs_policy *policy)
+{
+       refcount_inc(&policy->pol_start_ref);
+}
+
+/**
+ * Decreases the policy's usage started reference count, and stops the policy
+ * in case it was already stopping and have no more outstanding usage
+ * references (which indicates it has no more queued or started requests, and
+ * can be safely stopped).
+ */
+static void nrs_policy_started_put(struct ptlrpc_nrs_policy *policy)
+{
+       if (refcount_dec_and_test(&policy->pol_start_ref))
+               nrs_policy_stop0(policy);
+}
+
 static int nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
 {
        struct ptlrpc_nrs *nrs = policy->pol_nrs;
@@ -131,9 +154,18 @@ static int nrs_policy_stop_locked(struct ptlrpc_nrs_policy *policy)
                nrs->nrs_policy_fallback = NULL;
        }
 
-       /* I have the only refcount */
-       if (policy->pol_ref == 1)
-               nrs_policy_stop0(policy);
+       /* Drop started ref and wait for requests to be drained */
+       spin_unlock(&nrs->nrs_lock);
+       nrs_policy_started_put(policy);
+
+       wait_event_timeout(policy->pol_wq,
+                          policy->pol_state == NRS_POL_STATE_STOPPED,
+                          cfs_time_seconds(30));
+
+       spin_lock(&nrs->nrs_lock);
+
+       if (policy->pol_state != NRS_POL_STATE_STOPPED)
+               RETURN(-EBUSY);
 
        RETURN(0);
 }
@@ -165,8 +197,10 @@ static void nrs_policy_stop_primary(struct ptlrpc_nrs *nrs)
        LASSERT(tmp->pol_state == NRS_POL_STATE_STARTED);
        tmp->pol_state = NRS_POL_STATE_STOPPING;
 
-       if (tmp->pol_ref == 0)
-               nrs_policy_stop0(tmp);
+       /* Drop started ref to free the policy */
+       spin_unlock(&nrs->nrs_lock);
+       nrs_policy_started_put(tmp);
+       spin_lock(&nrs->nrs_lock);
        EXIT;
 }
 
@@ -207,6 +241,11 @@ static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy, char *arg)
        if (policy->pol_state == NRS_POL_STATE_STOPPING)
                RETURN(-EAGAIN);
 
+       if (arg && strlen(arg) >= sizeof(policy->pol_arg)) {
+               CWARN("NRS: arg '%s' is too long\n", arg);
+               return -EINVAL;
+       }
+
        if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
                /**
                 * This is for cases in which the user sets the policy to the
@@ -239,16 +278,13 @@ static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy, char *arg)
                         * stop the policy first and start it again with the new
                         * argument.
                         */
-                       if ((arg != NULL) && (strlen(arg) >= NRS_POL_ARG_MAX))
-                               return -EINVAL;
-
                        if ((arg == NULL && strlen(policy->pol_arg) == 0) ||
                            (arg != NULL && strcmp(policy->pol_arg, arg) == 0))
                                RETURN(0);
 
                        rc = nrs_policy_stop_locked(policy);
                        if (rc)
-                               RETURN(-EAGAIN);
+                               RETURN(rc);
                }
        }
 
@@ -286,16 +322,11 @@ static int nrs_policy_start_locked(struct ptlrpc_nrs_policy *policy, char *arg)
                }
        }
 
-       if (arg != NULL) {
-               if (strlcpy(policy->pol_arg, arg, sizeof(policy->pol_arg)) >=
-                   sizeof(policy->pol_arg)) {
-                       CERROR("NRS: arg '%s' is too long\n", arg);
-                       GOTO(out, rc = -E2BIG);
-               }
-       } else {
-               policy->pol_arg[0] = '\0';
-       }
+       if (arg)
+               strlcpy(policy->pol_arg, arg, sizeof(policy->pol_arg));
 
+       /* take the started reference */
+       refcount_set(&policy->pol_start_ref, 1);
        policy->pol_state = NRS_POL_STATE_STARTED;
 
        if (policy->pol_flags & PTLRPC_NRS_FL_FALLBACK) {
@@ -322,34 +353,23 @@ out:
 }
 
 /**
- * Increases the policy's usage reference count.
+ * Increases the policy's usage reference count (caller count).
  */
 static inline void nrs_policy_get_locked(struct ptlrpc_nrs_policy *policy)
+__must_hold(&policy->pol_nrs->nrs_lock)
 {
        policy->pol_ref++;
 }
 
 /**
- * Decreases the policy's usage reference count, and stops the policy in case it
- * was already stopping and have no more outstanding usage references (which
- * indicates it has no more queued or started requests, and can be safely
- * stopped).
+ * Decreases the policy's usage reference count.
  */
 static void nrs_policy_put_locked(struct ptlrpc_nrs_policy *policy)
+__must_hold(&policy->pol_nrs->nrs_lock)
 {
        LASSERT(policy->pol_ref > 0);
 
        policy->pol_ref--;
-       if (unlikely(policy->pol_ref == 0 &&
-           policy->pol_state == NRS_POL_STATE_STOPPING))
-               nrs_policy_stop0(policy);
-}
-
-static void nrs_policy_put(struct ptlrpc_nrs_policy *policy)
-{
-       spin_lock(&policy->pol_nrs->nrs_lock);
-       nrs_policy_put_locked(policy);
-       spin_unlock(&policy->pol_nrs->nrs_lock);
 }
 
 /**
@@ -472,11 +492,11 @@ static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs,
        spin_lock(&nrs->nrs_lock);
 
        fallback = nrs->nrs_policy_fallback;
-       nrs_policy_get_locked(fallback);
+       nrs_policy_started_get(fallback);
 
        primary = nrs->nrs_policy_primary;
        if (primary != NULL)
-               nrs_policy_get_locked(primary);
+               nrs_policy_started_get(primary);
 
        spin_unlock(&nrs->nrs_lock);
 
@@ -496,7 +516,7 @@ static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs,
                 * request.
                 */
                if (resp[NRS_RES_PRIMARY] == NULL)
-                       nrs_policy_put(primary);
+                       nrs_policy_started_put(primary);
        }
 }
 
@@ -513,8 +533,7 @@ static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs,
 static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
 {
        struct ptlrpc_nrs_policy *pols[NRS_RES_MAX];
-       struct ptlrpc_nrs        *nrs = NULL;
-       int                       i;
+       int i;
 
        for (i = 0; i < NRS_RES_MAX; i++) {
                if (resp[i] != NULL) {
@@ -530,15 +549,8 @@ static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
                if (pols[i] == NULL)
                        continue;
 
-               if (nrs == NULL) {
-                       nrs = pols[i]->pol_nrs;
-                       spin_lock(&nrs->nrs_lock);
-               }
-               nrs_policy_put_locked(pols[i]);
+               nrs_policy_started_put(pols[i]);
        }
-
-       if (nrs != NULL)
-               spin_unlock(&nrs->nrs_lock);
 }
 
 /**
@@ -565,6 +577,10 @@ struct ptlrpc_nrs_request * nrs_request_get(struct ptlrpc_nrs_policy *policy,
 
        LASSERT(policy->pol_req_queued > 0);
 
+       /* for a non-started policy, use force mode to drain requests */
+       if (unlikely(policy->pol_state != NRS_POL_STATE_STARTED))
+               force = true;
+
        nrq = policy->pol_desc->pd_ops->op_req_get(policy, peek, force);
 
        LASSERT(ergo(nrq != NULL, nrs_request_policy(nrq) == policy));
@@ -603,6 +619,11 @@ static inline void nrs_request_enqueue(struct ptlrpc_nrs_request *nrq)
                if (rc == 0) {
                        policy->pol_nrs->nrs_req_queued++;
                        policy->pol_req_queued++;
+                       /**
+                        * Take an extra ref to avoid stopping policy with
+                        * pending request in it
+                        */
+                       nrs_policy_started_get(policy);
                        return;
                }
        }
@@ -709,48 +730,53 @@ out:
 static int nrs_policy_unregister(struct ptlrpc_nrs *nrs, char *name)
 {
        struct ptlrpc_nrs_policy *policy = NULL;
+       int rc = 0;
        ENTRY;
 
        spin_lock(&nrs->nrs_lock);
 
        policy = nrs_policy_find_locked(nrs, name);
        if (policy == NULL) {
-               spin_unlock(&nrs->nrs_lock);
-
-               CERROR("Can't find NRS policy %s\n", name);
-               RETURN(-ENOENT);
+               rc = -ENOENT;
+               CERROR("NRS: cannot find policy '%s': rc = %d\n", name, rc);
+               GOTO(out_unlock, rc);
        }
 
        if (policy->pol_ref > 1) {
-               CERROR("Policy %s is busy with %d references\n", name,
-                      (int)policy->pol_ref);
-               nrs_policy_put_locked(policy);
-
-               spin_unlock(&nrs->nrs_lock);
-               RETURN(-EBUSY);
+               rc = -EBUSY;
+               CERROR("NRS: policy '%s' is busy with %ld references: rc = %d",
+                      name, policy->pol_ref, rc);
+               GOTO(out_put, rc);
        }
 
        LASSERT(policy->pol_req_queued == 0);
        LASSERT(policy->pol_req_started == 0);
 
        if (policy->pol_state != NRS_POL_STATE_STOPPED) {
-               nrs_policy_stop_locked(policy);
-               LASSERT(policy->pol_state == NRS_POL_STATE_STOPPED);
+               rc = nrs_policy_stop_locked(policy);
+               if (rc) {
+                       CERROR("NRS: failed to stop policy '%s' with refcount %d: rc = %d\n",
+                              name, refcount_read(&policy->pol_start_ref), rc);
+                       GOTO(out_put, rc);
+               }
        }
 
+       LASSERT(policy->pol_private == NULL);
        list_del(&policy->pol_list);
        nrs->nrs_num_pols--;
 
+       EXIT;
+out_put:
        nrs_policy_put_locked(policy);
-
+out_unlock:
        spin_unlock(&nrs->nrs_lock);
 
-       nrs_policy_fini(policy);
-
-       LASSERT(policy->pol_private == NULL);
-       OBD_FREE_PTR(policy);
+       if (rc == 0) {
+               nrs_policy_fini(policy);
+               OBD_FREE_PTR(policy);
+       }
 
-       RETURN(0);
+       return rc;
 }
 
 /**
@@ -793,6 +819,8 @@ static int nrs_policy_register(struct ptlrpc_nrs *nrs,
        INIT_LIST_HEAD(&policy->pol_list);
        INIT_LIST_HEAD(&policy->pol_list_queued);
 
+       init_waitqueue_head(&policy->pol_wq);
+
        rc = nrs_policy_init(policy);
        if (rc != 0) {
                OBD_FREE_PTR(policy);
@@ -1495,6 +1523,9 @@ static void nrs_request_removed(struct ptlrpc_nrs_policy *policy)
                list_move_tail(&policy->pol_list_queued,
                                   &policy->pol_nrs->nrs_policy_queued);
        }
+
+       /* remove the extra ref for policy pending requests */
+       nrs_policy_started_put(policy);
 }
 
 /**
@@ -1782,5 +1813,3 @@ void ptlrpc_nrs_fini(void)
                OBD_FREE_PTR(desc);
        }
 }
-
-/** @} nrs */
index 86664a0..288020d 100755 (executable)
@@ -4674,6 +4674,122 @@ test_77p() {
 }
 run_test 77p "Check validity of rule names for TBF policies"
 
+cleanup_77r() {
+       local pid=$1
+       local saved_jobid=$2
+       local current_jobid_var
+
+       echo "cleanup 77r $pid"
+
+       do_facet mds1 $LCTL set_param -n mds.MDS.mdt.nrs_policies=fifo
+       kill $pid || echo "fail to kill md thread"
+
+       current_jobid_var=$($LCTL get_param -n jobid_var)
+       if [ $saved_jobid != $current_jobid_var ]; then
+               set_persistent_param_and_check client \
+                       "jobid_var" "$FSNAME.sys.jobid_var" $saved_jobid
+       fi
+
+       sleep 2
+       rm -rf $DIR1/$tdir
+}
+
+md_thread_run="true"
+md_thread_77r() {
+       local pid
+
+       while $md_thread_run; do
+               printf '%s\n' {$DIR1,$DIR2}/$tdir/${tfile}-{01..20} |
+                       xargs -P20 -I{} $RUNAS bash -c 'touch {}; rm -f {}' \
+                               &> /dev/null & pid=$!
+               trap "echo kill md_thread xargs; md_thread_run=false; kill $pid" INT TERM
+               wait $pid
+       done
+}
+
+wait_policy_state() {
+       local state="$1"
+       local policy="$2"
+       local change_pid="$3"
+       local time
+
+       for time in {1..60}; do
+               local nbr_started
+
+               nbr_started=$(do_facet mds1 $LCTL get_param mds.MDS.mdt.nrs_policies |
+                       egrep -A2 "name: ${policy}$" | grep -c "state: $state")
+
+               [[ "$nbr_started" != 2 ]] || return 0
+               sleep 1
+       done
+
+       [[ -z "$change_pid" ]] || kill $change_pid || true
+       return 1
+}
+
+test_77r() { #LU-14976
+       local pid
+       local -A rules
+       local -a policies
+       local saved_jobid_var
+
+       rules["tbf uid"]="start md_rule uid={$RUNAS_ID} rate=1"
+       rules["tbf gid"]="start md_rule gid={$RUNAS_GID} rate=1"
+       rules["tbf jobid"]="start md_rule jobid={*.$RUNAS_ID} rate=1"
+       rules["tbf"]="start md_rule uid={$RUNAS_ID} rate=1"
+       policies=(
+       "tbf uid"
+       "tbf gid"
+       "tbf jobid"
+       "tbf"
+       "fifo"
+       )
+
+       test_mkdir -i 0 -c 1 $DIR1/$tdir
+       chmod 777 $DIR1/$tdir
+
+       # Configure jobid_var
+       saved_jobid_var=$($LCTL get_param -n jobid_var)
+       if [ $saved_jobid_var != procname_uid ]; then
+               set_persistent_param_and_check client \
+                       "jobid_var" "$FSNAME.sys.jobid_var" procname_uid
+       fi
+
+       # start md thread
+       md_thread_77r & pid=$!
+       stack_trap "cleanup_77r $pid '$saved_jobid_var'"
+
+       local policy
+       for policy in "${policies[@]}"; do
+               local change_pid
+
+               # wait to queue requests
+               sleep 5
+
+               do_facet mds1 "$LCTL set_param mds.MDS.mdt.nrs_policies='$policy'" &
+               change_pid=$!
+
+               wait_policy_state "started" "$policy" "$change_pid" ||
+                       error "timeout to start '$policy' policy"
+
+               [[ -n "${rules[$policy]}" ]] || continue
+
+               do_facet mds1 "$LCTL set_param mds.MDS.mdt.nrs_tbf_rule='${rules[$policy]}'" ||
+                       error "fail to set rule '${rules[$policy]}' to '$policy'"
+       done
+
+       wait_policy_state "stopped" "tbf" ||
+               error "fail to stop tbf policy"
+
+       echo "check the number of requests in queue:"
+       local awkcmd='/name: / {last = $3} '
+       awkcmd+='/queued: / {printf "    %s: %d\n", last, $2;'
+       awkcmd+='       if (last == "tbf" && $2 > 0) exit 1;}'
+       do_facet mds1 $LCTL get_param mds.MDS.mdt.nrs_policies | awk "$awkcmd" ||
+               error "request leak in tbf policies"
+}
+run_test 77r "Change type of tbf policy at run time"
+
 test_78() { #LU-6673
        local rc