Whamcloud - gitweb
LU-5134 utils: Add parallel option to lctl set_param 55/10555/33
authorRyan Haasken <haasken@cray.com>
Tue, 3 May 2016 19:49:57 +0000 (15:49 -0400)
committerOleg Drokin <green@whamcloud.com>
Wed, 25 Oct 2023 18:03:58 +0000 (18:03 +0000)
Add a "-t" option to lctl set_param to enable setting multiple matched
parameters in parallel. When called with "-t", lctl will set up a work
queue of matched file names and spawn a fixed number of threads per
CPU. Each thread will pull items off the work queue, write to the file
associated with each work item, and return when there are no more
items on the work queue.

A field called po_parallel_threads is added to struct param_opts to
indicate the number of threads set_param should run in parallel. If in
parallel, jt_lcfg_setparam initializes a work queue and passes it to
do_param_op, which adds each matched item to the work queue. Once
jt_lcfg_setparam has called do_param_op for each param-value pair, it
passes the work queue to sp_run_threads, which creates threads, each
of which call write_param to set the parameter. If not in parallel,
jt_lcfg_setparam does not pass a work queue to do_param_op, and
do_param_op directly calls write_param on each matched param.

param_display was renamed to do_param_op to more accurately reflect
what it does.

If lctl is compiled without pthread support, "lctl set_param" will
still accept the "-t" option, but it will print a warning message, and
it will set the parameters in series.

The new "-t" option to set_param was documented in the lctl usage and
in the man page.

HPE-bug-id: LUS-2592
Signed-off-by: Ryan Haasken <haasken@cray.com>
Signed-off-by: Chris Horn <chris.horn@hpe.com>
Change-Id: I3f96a6f06c50d4ba2ce97050c35f46b976dfc005
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/10555
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Emoly Liu <emoly@whamcloud.com>
Reviewed-by: James Simmons <jsimmons@infradead.org>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/doc/lctl-set_param.8
lustre/tests/sanity.sh
lustre/tests/test-framework.sh
lustre/utils/Makefile.am
lustre/utils/lctl.c
lustre/utils/lctl_thread.c [new file with mode: 0644]
lustre/utils/lctl_thread.h [new file with mode: 0644]
lustre/utils/lustre_cfg.c

index 28477a3..fbf7706 100644 (file)
@@ -2,7 +2,13 @@
 .SH NAME
 lctl-set_param \- Lustre filesystem set parameter utility
 .SH SYNOPSIS
-.IR "\fBlctl set_param " [ -n "] [" -P "] [" -d "] <" parameter \B= value ...>
+.B "\fBlctl set_param "
+.RB [ \-d ]
+.RB [ \-h ]
+.RB [ \-n ]
+.RB [ \-P ]
+.RB [ \-t [ \fITHREAD_COUNT ]]
+.RI < parameter \= value ...>
 .br
 .IR "\fBlctl set_param -F " < filename >
 .SH DESCRIPTION
@@ -63,6 +69,20 @@ and
 such as the filesystem and/or target name.  This option is only available
 in Lustre 2.5.0 and later clients, older clients cannot set persistent
 parameters, nor will they see them.
+.TP
+.B -t
+Spawn threads to set multiple parameters in parallel, optionally specifying
+the maximum number of threads to run (with no space between
+.B -t
+and
+.IR THREAD_COUNT ).
+Without this option parameter(s) are set serially.
+If
+.B lctl
+was compiled without pthread support, a warning message will be
+printed and parameters will be set serially.  If
+.B -t
+is specified without any argument, it runs up to 8 threads by default.
 .SH EXAMPLES
 .B # lctl set_param fail_loc=0 timeout=20
 .br
@@ -76,6 +96,13 @@ timeout=20
 .br
 20
 .br
+.B
+# lctl set_param -t2 "ldlm.namespaces.*osc*.lru_size=clear"
+.br
+ldlm.namespaces.fsname-OST0001-osc-MDT0000.lru_size=clear
+.br
+ldlm.namespaces.fsname-OST0000-osc-MDT0000.lru_size=clear
+.br
 .B # lctl set_param -P osc.testfs-OST*.max_dirty_mb=512
 .br
 osc.testfs-OST0000-osc-ffff8803c9c0f000.max_dirty_mb=512
index 56740d7..3f96ca7 100755 (executable)
@@ -3572,43 +3572,25 @@ test_29() {
        log 'first d29'
        ls -l $DIR/d29
 
-       declare -i LOCKCOUNTORIG=0
-       for lock_count in $(lctl get_param -n ldlm.namespaces.*mdc*.lock_count); do
-               let LOCKCOUNTORIG=$LOCKCOUNTORIG+$lock_count
-       done
-       [ $LOCKCOUNTORIG -eq 0 ] && error "No mdc lock count" && return 1
+       local locks_orig=$(total_used_locks mdc)
+       (( $locks_orig != 0 )) || error "No mdc lock count"
 
-       declare -i LOCKUNUSEDCOUNTORIG=0
-       for unused_count in $(lctl get_param -n ldlm.namespaces.*mdc*.lock_unused_count); do
-               let LOCKUNUSEDCOUNTORIG=$LOCKUNUSEDCOUNTORIG+$unused_count
-       done
+       local locks_unused_orig=$(total_unused_locks mdc)
 
        log 'second d29'
        ls -l $DIR/d29
        log 'done'
 
-       declare -i LOCKCOUNTCURRENT=0
-       for lock_count in $(lctl get_param -n ldlm.namespaces.*mdc*.lock_count); do
-               let LOCKCOUNTCURRENT=$LOCKCOUNTCURRENT+$lock_count
-       done
+       local locks_current=$(total_used_locks mdc)
 
-       declare -i LOCKUNUSEDCOUNTCURRENT=0
-       for unused_count in $(lctl get_param -n ldlm.namespaces.*mdc*.lock_unused_count); do
-               let LOCKUNUSEDCOUNTCURRENT=$LOCKUNUSEDCOUNTCURRENT+$unused_count
-       done
+       local locks_unused_current=$(total_unused_locks mdc)
 
-       if [[ $LOCKCOUNTCURRENT -gt $LOCKCOUNTORIG ]]; then
+       if (( $locks_current > $locks_orig )); then
                $LCTL set_param -n ldlm.dump_namespaces ""
-               error "CURRENT: $LOCKCOUNTCURRENT > $LOCKCOUNTORIG"
-               $LCTL dk | sort -k4 -t: > $TMP/test_29.dk
-               log "dumped log to $TMP/test_29.dk (bug 5793)"
-               return 2
+               error "CURRENT: $locks_current > $locks_orig"
        fi
-       if [[ $LOCKUNUSEDCOUNTCURRENT -gt $LOCKUNUSEDCOUNTORIG ]]; then
-               error "UNUSED: $LOCKUNUSEDCOUNTCURRENT > $LOCKUNUSEDCOUNTORIG"
-               $LCTL dk | sort -k4 -t: > $TMP/test_29.dk
-               log "dumped log to $TMP/test_29.dk (bug 5793)"
-               return 3
+       if (( $locks_unused_current > $locks_unused_orig )); then
+               error "UNUSED: $locks_unused_current > $locks_unused_orig"
        fi
 }
 run_test 29 "IT_GETATTR regression  ============================"
@@ -11347,10 +11329,6 @@ cleanup_test101bc() {
        set_osd_param $list '' writethrough_cache_enable 1
 }
 
-calc_total() {
-       awk 'BEGIN{total=0}; {total+=$1}; END{print total}'
-}
-
 ra_check_101() {
        local read_size=$1
        local stripe_size=$2
@@ -11359,7 +11337,7 @@ ra_check_101() {
        local discard_limit=$(( ((stride_length - 1) * 3 / stride_width) *
                                (stride_width - stride_length) ))
        local discard=$($LCTL get_param -n llite.*.read_ahead_stats |
-                 get_named_value 'read.but.discarded' | calc_total)
+                 get_named_value 'read.but.discarded' | calc_sum)
 
        if [[ $discard -gt $discard_limit ]]; then
                $LCTL get_param llite.*.read_ahead_stats
@@ -11524,7 +11502,7 @@ test_101e() {
        $LCTL get_param llite.*.max_cached_mb
        $LCTL get_param llite.*.read_ahead_stats
        local miss=$($LCTL get_param -n llite.*.read_ahead_stats |
-                    get_named_value 'misses' | calc_total)
+                    get_named_value 'misses' | calc_sum)
 
        for ((i = 0; i < $count; i++)); do
                rm -rf $file.$i 2>/dev/null
@@ -11558,7 +11536,7 @@ test_101f() {
        echo checking missing pages
        $LCTL get_param llite.*.read_ahead_stats
        local miss=$($LCTL get_param -n llite.*.read_ahead_stats |
-                       get_named_value 'misses' | calc_total)
+                       get_named_value 'misses' | calc_sum)
 
        $LCTL set_param debug="$old_debug"
        [ $miss -lt 3 ] || error "misses too much pages ('$miss')!"
@@ -11663,7 +11641,7 @@ test_101h() {
        echo "Read 10M of data but cross 64M bundary"
        dd if=$DIR/$tfile of=/dev/null bs=10M skip=6 count=1
        local miss=$($LCTL get_param -n llite.*.read_ahead_stats |
-                    get_named_value 'misses' | calc_total)
+                    get_named_value 'misses' | calc_sum)
        [ $miss -eq 1 ] || error "expected miss 1 but got $miss"
        rm -f $p $DIR/$tfile
 }
@@ -11711,7 +11689,7 @@ test_101j() {
                local count=$(($file_size / $blk))
                dd if=$DIR/$tfile bs=$blk count=$count of=/dev/null
                local miss=$($LCTL get_param -n llite.*.read_ahead_stats |
-                            get_named_value 'failed.to.fast.read' | calc_total)
+                            get_named_value 'failed.to.fast.read' | calc_sum)
                $LCTL get_param -n llite.*.read_ahead_stats
                [ $miss -eq $count ] || error "expected $count got $miss"
        done
@@ -11739,7 +11717,7 @@ test_readahead_base() {
        $MULTIOP $file or${iosz}c || error "failed to read $file"
        $LCTL get_param -n llite.*.read_ahead_stats
        ranum=$($LCTL get_param -n llite.*.read_ahead_stats |
-               awk '/readahead.pages/ { print $7 }' | calc_total)
+               awk '/readahead.pages/ { print $7 }' | calc_sum)
        (( $ranum <= $ramax )) ||
                error "read-ahead pages is $ranum more than $ramax"
        rm -rf $file || error "failed to remove $file"
@@ -14709,7 +14687,7 @@ test_124d() {
 
        [ $remaining -eq 0 ] || error "$remaining locks are not canceled"
 }
-run_test 124d "cancel very aged locks if lru-resize diasbaled"
+run_test 124d "cancel very aged locks if lru-resize disabled"
 
 test_125() { # 13358
        $LCTL get_param -n llite.*.client_type | grep -q local ||
index 5097117..8d119bc 100755 (executable)
@@ -6492,10 +6492,28 @@ set_nodes_failloc () {
        do_nodes $(comma_list $1)  lctl set_param fail_val=$fv fail_loc=$2
 }
 
+# Print the total of the lock_unused_count across all namespaces containing the
+# given wildcard. If the namespace wildcard is omitted, all namespaces will be
+# matched.
+# Usage: total_unused_locks [namespace_wildcard]
+total_unused_locks() {
+       $LCTL get_param -n "ldlm.namespaces.*$1*.lock_unused_count" | calc_sum
+}
+
+# Print the total of the lock_count across all namespaces containing the given
+# wildcard. If the namespace wilcard is omitted, all namespaces will be matched.
+# Usage: total_used_locks [namespace_wildcard]
+total_used_locks() {
+       $LCTL get_param -n "ldlm.namespaces.*$1*.lock_count" | calc_sum
+}
+
+# Cancel lru locks across all namespaces containing the given wildcard. If the
+# wilcard is omitted, lru locks will be canceled across all namespaces.
+# Usage: cancel_lru_locks [namespace_wildcard]
 cancel_lru_locks() {
        #$LCTL mark "cancel_lru_locks $1 start"
-       $LCTL set_param -n ldlm.namespaces.*$1*.lru_size=clear
-       $LCTL get_param ldlm.namespaces.*$1*.lock_unused_count | grep -v '=0'
+       $LCTL set_param -t4 -n "ldlm.namespaces.*$1*.lru_size=clear"
+       $LCTL get_param "ldlm.namespaces.*$1*.lock_unused_count" | grep -v '=0'
        #$LCTL mark "cancel_lru_locks $1 stop"
 }
 
index 8e84fd1..56f67af 100644 (file)
@@ -68,6 +68,7 @@ pkglib_LTLIBRARIES =
 lib_LTLIBRARIES = liblustreapi.la
 
 lctl_SOURCES = portals.c debug.c obd.c lustre_cfg.c lctl.c obdctl.h
+lctl_SOURCES += lctl_thread.c lctl_thread.h
 if SERVER
 lctl_SOURCES += lustre_lfsck.c lsnapshot.c
 endif
index d4db99a..6502d28 100644 (file)
@@ -45,6 +45,7 @@
 #include "obdctl.h"
 #include <linux/lustre/lustre_ver.h>
 #include <lustre/lustreapi.h>
+#include "lctl_thread.h"
 
 static int jt_pcc(int argc, char **argv);
 
@@ -194,13 +195,21 @@ command_t cmdlist[] = {
         "      (Especially useful when using patterns.)\n"
         "  -R  Get parameters recursively from the specified entry.\n"},
        {"set_param", jt_lcfg_setparam, 0, "set the Lustre or LNET parameter\n"
-        "usage: set_param [-n] [-P] [-d] [-F]"
-        "<param_path1=value1 param_path2=value2 ...>\n"
+        "usage: set_param [-n] [-P] [-d] [-F] "
+#ifdef HAVE_LIBPTHREAD
+        "[-t[THREAD_COUNT]] "
+#endif
+        "PARAM1=VALUE1 [PARAM2=VALUE2 ...]\n"
         "Set the value of the Lustre or LNET parameter at the specified path.\n"
         "  -n  Disable printing of the key name when printing values.\n"
         "  -P  Set the parameter permanently, filesystem-wide.\n"
         "  -d  Remove the permanent setting (only with -P option).\n"
-        "  -F  Read permanent configuration from a YAML file.\n"},
+        "  -F  Read permanent configuration from a YAML file.\n"
+#ifdef HAVE_LIBPTHREAD
+        "  -t  Set parameters in parallel, max THREAD_COUNT threads\n"
+        "    (default " STRINGIFY(LCFG_THREADS_DEF) ").\n"
+#endif
+       },
        {"apply_yaml", jt_lcfg_applyyaml, 0, "set/config the Lustre or LNET "
         "parameters using configuration from a YAML file.\n"
         "usage: apply_yaml file\n"},
diff --git a/lustre/utils/lctl_thread.c b/lustre/utils/lctl_thread.c
new file mode 100644 (file)
index 0000000..f4bd5d7
--- /dev/null
@@ -0,0 +1,309 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ *
+ * lustre/utils/lctl_thread.c
+ *
+ * Author: Rajeev Mishra <rajeevm@hpe.com>
+ */
+ #include <errno.h>
+ #include <stdio.h>
+ #include <stdarg.h>
+ #include <ctype.h>
+ #include "lctl_thread.h"
+ #include <stdlib.h>
+ #include <libcfs/util/string.h>
+#if HAVE_LIBPTHREAD
+/**
+ * Initialize the given set_param work queue.
+ *
+ * \param[out] wq  the work queue to initialize
+ * \param[in] popt the options passed to set_param
+ *
+ * \retval 0 if successful
+ * \retval -errno if unsuccessful
+ */
+int spwq_init(struct sp_workq *wq, struct param_opts *popt)
+{
+       if (!wq)
+               return -EINVAL;
+
+       memset(wq, 0, sizeof(*wq));
+       wq->spwq_popt = popt;
+
+       /* pthread_mutex_init returns 0 for success, or errno for failure */
+       return -pthread_mutex_init(&wq->spwq_mutex, NULL);
+}
+
+/**
+ * Destroy and free space used by a set_param work queue.
+ *
+ * \param[in] wq the work queue to destroy
+ *
+ * \retval 0 if successful
+ * \retval -errno if unsuccessful
+ */
+int spwq_destroy(struct sp_workq *wq)
+{
+       int rc;
+
+       if (!wq)
+               return 0;
+
+       if (wq->spwq_items) {
+               int i;
+
+               for (i = 0; i < wq->spwq_len; i++) {
+                       free(wq->spwq_items[i].spwi_path);
+                       free(wq->spwq_items[i].spwi_param_name);
+                       /* wq->spwq_items[i].spwi_value was not malloc'd */
+               }
+               free(wq->spwq_items);
+       }
+
+       /* pthread_mutex_destroy returns 0 for success, or errno for failure */
+       rc = -pthread_mutex_destroy(&wq->spwq_mutex);
+
+       memset(wq, 0, sizeof(*wq));
+
+       return rc;
+}
+
+/**
+ * Expand the size of a work queue to fit the requested number of items.
+ *
+ * \param[in,out] wq    the work queue to expand
+ * \param[in] num_items the number of items to make room for in \a wq
+ *
+ * \retval 0 if successful
+ * \retval -errno if unsuccessful
+ */
+int spwq_expand(struct sp_workq *wq, size_t num_items)
+{
+       int space;
+       int new_size;
+       struct sp_work_item *tmp;
+
+       if (!wq)
+               return -EINVAL;
+
+       space = wq->spwq_size - wq->spwq_len;
+
+       /* First check if there's already enough room. */
+       if (space >= num_items)
+               return 0;
+
+       new_size = wq->spwq_len + num_items;
+
+       /* When spwq_items is NULL, realloc behaves like malloc */
+       tmp = realloc(wq->spwq_items, new_size * sizeof(struct sp_work_item));
+
+       if (!tmp)
+               return -ENOMEM;
+
+       wq->spwq_items = tmp;
+       wq->spwq_size = new_size;
+
+       return 0;
+}
+
+/**
+ * Add an item to a set_param work queue. Not thread-safe.
+ *
+ * \param[in,out] wq     the work queue to which the item should be added
+ * \param[in] path       the full path to the parameter file (will be copied)
+ * \param[in] param_name the name of the parameter (will be copied)
+ * \param[in] value      the value for the parameter (will not be copied)
+ *
+ * \retval 0 if successful
+ * \retval -errno if unsuccessful
+ */
+int spwq_add_item(struct sp_workq *wq, char *path,
+                        char *param_name, char *value)
+{
+       char *path_copy;
+       char *param_name_copy;
+       int rc;
+
+       if (!(wq && path && param_name && value))
+               return -EINVAL;
+
+       /* Hopefully the caller has expanded the work queue before calling this
+        * function, but make sure there's room just in case.
+        */
+       rc = spwq_expand(wq, 1);
+       if (rc < 0)
+               return rc;
+
+       path_copy = strdup(path);
+       if (!path_copy)
+               return -ENOMEM;
+
+       param_name_copy = strdup(param_name);
+       if (!param_name_copy) {
+               free(path_copy);
+               return -ENOMEM;
+       }
+
+       wq->spwq_items[wq->spwq_len].spwi_param_name = param_name_copy;
+       wq->spwq_items[wq->spwq_len].spwi_path = path_copy;
+       wq->spwq_items[wq->spwq_len].spwi_value = value;
+
+       wq->spwq_len++;
+
+       return 0;
+}
+
+/**
+ * Gets the next item from the set_param \a wq in a thread-safe manner.
+ *
+ * \param[in] wq  the workq from which to obtain the next item
+ * \param[out] wi the next work item in \a wa, will be set to NULL if \wq empty
+ *
+ * \retval 0 if successful (empty work queue is considered successful)
+ * \retval -errno if unsuccessful
+ */
+static int spwq_next_item(struct sp_workq *wq, struct sp_work_item **wi)
+{
+       int rc_lock;
+       int rc_unlock;
+
+       if (!(wq && wi))
+               return -EINVAL;
+
+       *wi = NULL;
+
+       rc_lock = pthread_mutex_lock(&wq->spwq_mutex);
+       if (rc_lock == 0) {
+               if (wq->spwq_cur_index < wq->spwq_len)
+                       *wi = &wq->spwq_items[wq->spwq_cur_index++];
+               rc_unlock = pthread_mutex_unlock(&wq->spwq_mutex);
+       }
+
+       return rc_lock != 0 ? -rc_lock : -rc_unlock;
+}
+
+/**
+ * A set_param worker thread which sets params from the workq.
+ *
+ * \param[in] arg a pointer to a struct sp_workq
+ *
+ * \retval 0 if successful
+ * \retval -errno if unsuccessful
+ */
+static void *sp_thread(void *arg)
+{
+       struct sp_workq *wq = (struct sp_workq *)arg;
+       struct param_opts *popt = wq->spwq_popt;
+       struct sp_work_item *work_item;
+       long int rc = 0;
+
+       rc = spwq_next_item(wq, &work_item);
+       if (rc < 0)
+               return (void *)rc;
+
+       while (work_item) {
+               char *path = work_item->spwi_path;
+               char *param_name = work_item->spwi_param_name;
+               char *value = work_item->spwi_value;
+               int rc2;
+
+               rc2 = write_param(path, param_name, popt, value);
+               if (rc2 < 0)
+                       rc = rc2;
+               rc2 = spwq_next_item(wq, &work_item);
+               if (rc2 < 0)
+                       rc = rc2;
+       }
+
+       return (void *)rc;
+}
+
+/**
+ * Spawn threads and set parameters in a work queue in parallel.
+ *
+ * \param[in] wq the work queue containing parameters to set
+ *
+ * \retval 0 if successful
+ * \retval -errno if unsuccessful
+ */
+int sp_run_threads(struct sp_workq *wq)
+{
+       int rc = 0;
+       int i;
+       int j;
+       int num_threads;
+       pthread_t *sp_threads;
+
+       if (!wq)
+               return -EINVAL;
+
+       if (wq->spwq_len == 0)
+               return 0;
+
+       num_threads = wq->spwq_popt->po_parallel_threads;
+       if (num_threads > wq->spwq_len)
+               num_threads = wq->spwq_len;
+
+       sp_threads = malloc(sizeof(pthread_t) * num_threads);
+       if (!sp_threads)
+               return -ENOMEM;
+
+       for (i = 0; i < num_threads; i++) {
+               rc = -pthread_create(&sp_threads[i], NULL, &sp_thread, wq);
+               if (rc != 0)
+                       break;
+       }
+
+       /* check if we failed to create any threads at all */
+       if (i == 0)
+               goto out_free;
+
+       /* ignore thread creation errors if at least one was created */
+       rc = 0;
+
+       for (j = 0; j < i; j++) {
+               int join_rc;
+               void *res = NULL;
+
+               join_rc = -pthread_join(sp_threads[j], &res);
+               if (join_rc && rc == 0)
+                       rc = join_rc;
+               if (res)
+                       /* this error takes priority over join errors */
+                       rc = (long int)res;
+       }
+
+out_free:
+       free(sp_threads);
+       return rc;
+}
+#else
+#define popt_is_parallel(popt) 0
+#define spwq_init(wq, popt) 0
+#define spwq_expand(wq, num_items) 0
+#define spwq_add_item(wq, path, param_name, value) 0
+#define sp_run_threads(wq) 0
+#define spwq_destroy(wq) 0
+struct sp_workq { int unused; }
+#endif /* HAVE_LIBPTHREAD */
diff --git a/lustre/utils/lctl_thread.h b/lustre/utils/lctl_thread.h
new file mode 100644 (file)
index 0000000..12f70ce
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * LGPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * (C) Copyright 2012 Commissariat a l'energie atomique et aux energies
+ *     alternatives
+ *
+ * Copyright (c) 2016, 2017, Intel Corporation.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the GNU Lesser General Public License
+ * (LGPL) version 2.1 or (at your discretion) any later version.
+ * (LGPL) version 2.1 accompanies this distribution, and is available at
+ * http://www.gnu.org/licenses/lgpl-2.1.html
+ *
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * LGPL HEADER END
+ */
+/*
+ *
+ * lustre/utils/lctl_thread.h
+ *
+ * Author: Rajeev Mishra <rajeevm@hpe.com>
+ */
+#if HAVE_LIBPTHREAD
+#include <pthread.h>
+#endif
+#ifndef STRINGIFY
+#define STRINGIFY(a) #a
+#endif
+
+struct param_opts {
+       unsigned int po_only_path:1;
+       unsigned int po_show_path:1;
+       unsigned int po_show_type:1;
+       unsigned int po_recursive:1;
+       unsigned int po_perm:1;
+       unsigned int po_delete:1;
+       unsigned int po_only_dir:1;
+       unsigned int po_file:1;
+       unsigned int po_yaml:1;
+       unsigned int po_detail:1;
+       unsigned int po_parallel_threads;
+};
+#define popt_is_parallel(popt) ((popt).po_parallel_threads > 0)
+
+#ifdef HAVE_LIBPTHREAD
+int write_param(const char *path, const char *param_name,
+               struct param_opts *popt, const char *value);
+
+#define LCFG_THREADS_DEF 8
+
+/* A work item for parallel set_param */
+struct sp_work_item {
+       /* The full path to the parameter file */
+       char *spwi_path;
+
+       /* The parameter name as returned by display_name */
+       char *spwi_param_name;
+
+       /* The value to which the parameter is to be set */
+       char *spwi_value;
+};
+
+/* A work queue struct for parallel set_param */
+struct sp_workq {
+       /* The parameter options passed to set_param */
+       struct param_opts *spwq_popt;
+
+       /* The number of valid items in spwq_items */
+       int spwq_len;
+
+       /* The size of the spwq_items list */
+       int spwq_size;
+
+       /* The current index into the spwq_items list */
+       int spwq_cur_index;
+
+       /* Array of work items. */
+       struct sp_work_item *spwq_items;
+
+       /* A mutex to control access to the work queue */
+       pthread_mutex_t spwq_mutex;
+};
+
+int spwq_init(struct sp_workq *wq, struct param_opts *popt);
+int spwq_destroy(struct sp_workq *wq);
+int spwq_expand(struct sp_workq *wq, size_t num_items);
+int spwq_add_item(struct sp_workq *wq, char *path, char *param_name,
+                 char *value);
+int sp_run_threads(struct sp_workq *wq);
+#endif
index ad6c78b..df3fd3d 100644 (file)
@@ -49,7 +49,6 @@
 #include <stdio.h>
 #include <stdarg.h>
 #include <ctype.h>
-
 #include <libcfs/util/ioctl.h>
 #include <libcfs/util/string.h>
 #include <libcfs/util/param.h>
 #include <linux/lnet/lnetctl.h>
 #include <linux/lustre/lustre_cfg.h>
 #include <linux/lustre/lustre_ioctl.h>
-#include <linux/lustre/lustre_ver.h>
-
 #include <linux/lustre/lustre_kernelcomm.h>
+#include <linux/lustre/lustre_ver.h>
 #include <lnetconfig/liblnetconfig.h>
+
+#include "lctl_thread.h"
 #include "lustreapi_internal.h"
 
 #include <sys/un.h>
@@ -445,19 +445,6 @@ int jt_lcfg_param(int argc, char **argv)
        return jt_lcfg_ioctl(&bufs, argv[0], LCFG_PARAM);
 }
 
-struct param_opts {
-       unsigned int po_only_path:1;
-       unsigned int po_show_path:1;
-       unsigned int po_show_type:1;
-       unsigned int po_recursive:1;
-       unsigned int po_perm:1;
-       unsigned int po_delete:1;
-       unsigned int po_only_dir:1;
-       unsigned int po_file:1;
-       unsigned int po_yaml:1;
-       unsigned int po_detail:1;
-};
-
 int lcfg_setparam_perm(char *func, char *buf)
 {
        int rc = 0;
@@ -857,7 +844,7 @@ free_buf:
  * \retval number of bytes written on success.
  * \retval -errno on error.
  */
-static int
+int
 write_param(const char *path, const char *param_name, struct param_opts *popt,
            const char *value)
 {
@@ -1094,29 +1081,35 @@ error:
        return rc;
 }
 
+
 /**
  * Perform a read, write or just a listing of a parameter
  *
- * \param[in] popt             list,set,get parameter options
- * \param[in] pattern          search filter for the path of the parameter
- * \param[in] value            value to set the parameter if write operation
- * \param[in] mode             what operation to perform with the parameter
+ * \param[in] popt     list,set,get parameter options
+ * \param[in] pattern  search filter for the path of the parameter
+ * \param[in] value    value to set the parameter if write operation
+ * \param[in] oper     what operation to perform with the parameter
+ * \param[out] wq      the work queue to which work items will be added or NULL
+ *                     if not in parallel
  *
  * \retval number of bytes written on success.
  * \retval -errno on error and prints error message.
  */
 static int
-param_display(struct param_opts *popt, char *pattern, char *value,
-             enum parameter_operation mode)
+do_param_op(struct param_opts *popt, char *pattern, char *value,
+           enum parameter_operation oper, struct sp_workq *wq)
 {
        int dup_count = 0;
        char **dup_cache;
        glob_t paths;
-       char *opname = parameter_opname[mode];
+       char *opname = parameter_opname[oper];
        int rc, i;
 
+       if (!wq && popt_is_parallel(*popt))
+               return -EINVAL;
+
        rc = llapi_param_get_paths(pattern, &paths);
-       if (rc != 0) {
+       if (rc) {
                rc = -errno;
                if (!popt->po_recursive && !(rc == -ENOENT && getuid() != 0)) {
                        fprintf(stderr, "error: %s: param_path '%s': %s\n",
@@ -1125,6 +1118,13 @@ param_display(struct param_opts *popt, char *pattern, char *value,
                return rc;
        }
 
+       if (popt_is_parallel(*popt) && paths.gl_pathc > 1) {
+               /* Allocate space for the glob paths in advance. */
+               rc = spwq_expand(wq, paths.gl_pathc);
+               if (rc < 0)
+                       goto out_param;
+       }
+
        dup_cache = calloc(paths.gl_pathc, sizeof(char *));
        if (!dup_cache) {
                rc = -ENOMEM;
@@ -1143,7 +1143,7 @@ param_display(struct param_opts *popt, char *pattern, char *value,
                if (stat(paths.gl_pathv[i], &st) == -1) {
                        fprintf(stderr, "error: %s: stat '%s': %s\n",
                                opname, paths.gl_pathv[i], strerror(errno));
-                       if (rc == 0)
+                       if (!rc)
                                rc = -errno;
                        continue;
                }
@@ -1156,26 +1156,33 @@ param_display(struct param_opts *popt, char *pattern, char *value,
                        fprintf(stderr,
                                "error: %s: generating name for '%s': %s\n",
                                opname, paths.gl_pathv[i], strerror(ENOMEM));
-                       if (rc == 0)
+                       if (!rc)
                                rc = -ENOMEM;
                        continue;
                }
 
-               switch (mode) {
+               switch (oper) {
                case GET_PARAM:
                        /* Read the contents of file to stdout */
                        if (S_ISREG(st.st_mode)) {
                                rc2 = read_param(paths.gl_pathv[i], param_name,
                                                 popt);
-                               if (rc2 < 0 && rc == 0)
+                               if (rc2 < 0 && !rc)
                                        rc = rc2;
                        }
                        break;
                case SET_PARAM:
                        if (S_ISREG(st.st_mode)) {
-                               rc2 = write_param(paths.gl_pathv[i],
-                                                 param_name, popt, value);
-                               if (rc2 < 0 && rc == 0)
+                               if (popt_is_parallel(*popt))
+                                       rc2 = spwq_add_item(wq,
+                                                           paths.gl_pathv[i],
+                                                           param_name, value);
+                               else
+                                       rc2 = write_param(paths.gl_pathv[i],
+                                                         param_name, popt,
+                                                         value);
+
+                               if (rc2 < 0 && !rc)
                                        rc = rc2;
                        }
                        break;
@@ -1228,7 +1235,7 @@ param_display(struct param_opts *popt, char *pattern, char *value,
                                opname, param_name, strerror(-rc2));
                        free(param_name);
                        param_name = NULL;
-                       if (rc == 0)
+                       if (!rc)
                                rc = rc2;
                        continue;
                }
@@ -1244,7 +1251,7 @@ param_display(struct param_opts *popt, char *pattern, char *value,
 
                /* Shouldn't happen but just in case */
                if (!tmp) {
-                       if (rc == 0)
+                       if (!rc)
                                rc = -EINVAL;
                        continue;
                }
@@ -1261,15 +1268,15 @@ param_display(struct param_opts *popt, char *pattern, char *value,
                if (rc2 >= sizeof(pathname)) {
                        fprintf(stderr, "error: %s: overflow processing '%s'\n",
                                opname, pathname);
-                       if (rc == 0)
+                       if (!rc)
                                rc = -EINVAL;
                        continue;
                }
 
-               rc2 = param_display(popt, pathname, value, mode);
-               if (rc2 != 0 && rc2 != -ENOENT) {
-                       /* errors will be printed by param_display() */
-                       if (rc == 0)
+               rc2 = do_param_op(popt, pathname, value, oper, wq);
+               if (!rc2 && rc2 != -ENOENT) {
+                       /* errors will be printed by do_param_op() */
+                       if (!rc)
                                rc = rc2;
                        continue;
                }
@@ -1334,7 +1341,7 @@ int jt_lcfg_listparam(int argc, char **argv)
                        continue;
                }
 
-               rc2 = param_display(&popt, path, NULL, LIST_PARAM);
+               rc2 = do_param_op(&popt, path, NULL, LIST_PARAM, NULL);
                if (rc2 < 0) {
                        if (rc == 0)
                                rc = rc2;
@@ -1422,7 +1429,9 @@ int jt_lcfg_getparam(int argc, char **argv)
                        continue;
                }
 
-               rc2 = param_display(&popt, path, NULL, mode);
+               rc2 = do_param_op(&popt, path, NULL,
+                                 popt.po_only_path ? LIST_PARAM : GET_PARAM,
+                                 NULL);
                if (rc2 < 0) {
                        if (rc == 0)
                                rc = rc2;
@@ -1600,14 +1609,14 @@ int jt_nodemap_info(int argc, char **argv)
 
        if (argc == 1 || strcmp("list", argv[1]) == 0) {
                popt.po_only_dir = 1;
-               rc = param_display(&popt, "nodemap/*", NULL, LIST_PARAM);
+               rc = do_param_op(&popt, "nodemap/*", NULL, LIST_PARAM, NULL);
        } else if (strcmp("all", argv[1]) == 0) {
-               rc = param_display(&popt, "nodemap/*/*", NULL, GET_PARAM);
+               rc = do_param_op(&popt, "nodemap/*/*", NULL, GET_PARAM, NULL);
        } else {
                char    pattern[PATH_MAX];
 
                snprintf(pattern, sizeof(pattern), "nodemap/%s/*", argv[1]);
-               rc = param_display(&popt, pattern, NULL, GET_PARAM);
+               rc = do_param_op(&popt, pattern, NULL, GET_PARAM, NULL);
                if (rc == -ESRCH)
                        fprintf(stderr,
                                "error: nodemap_info: cannot find nodemap %s\n",
@@ -1617,6 +1626,15 @@ int jt_nodemap_info(int argc, char **argv)
 }
 #endif
 
+/**
+ * Parses the command-line options to set_param.
+ *
+ * \param[in] argc     count of arguments given to set_param
+ * \param[in] argv     array of arguments given to set_param
+ * \param[out] popt    where set_param options will be saved
+ *
+ * \retval index in argv of the first non-option argv element (optind value)
+ */
 static int setparam_cmdline(int argc, char **argv, struct param_opts *popt)
 {
        int ch;
@@ -1628,12 +1646,35 @@ static int setparam_cmdline(int argc, char **argv, struct param_opts *popt)
        popt->po_perm = 0;
        popt->po_delete = 0;
        popt->po_file = 0;
+       popt->po_parallel_threads = 0;
+       opterr = 0;
 
-       while ((ch = getopt(argc, argv, "nPdF")) != -1) {
+       while ((ch = getopt(argc, argv, "dFnPt::")) != -1) {
                switch (ch) {
                case 'n':
                        popt->po_show_path = 0;
                        break;
+               case 't':
+#if HAVE_LIBPTHREAD
+                       if (optarg)
+                               popt->po_parallel_threads = atoi(optarg);
+                       else
+                               popt->po_parallel_threads = LCFG_THREADS_DEF;
+                       if (popt->po_parallel_threads < 2)
+                               return -EINVAL;
+                       break;
+#else
+                       {
+                       static bool printed;
+
+                       if (!printed) {
+                               printed = true;
+                               fprintf(stderr,
+                                       "warning: set_param: no pthread support, proceeding serially.\n");
+                       }
+                       }
+#endif
+                       break;
                case 'P':
                        popt->po_perm = 1;
                        break;
@@ -1654,6 +1695,48 @@ static int setparam_cmdline(int argc, char **argv, struct param_opts *popt)
        return optind;
 }
 
+/**
+ * Parse the arguments to set_param and return the first parameter and value
+ * pair and the number of arguments consumed.
+ *
+ * \param[in] argc   number of arguments remaining in argv
+ * \param[in] argv   list of param-value arguments to set_param (this function
+ *                   will modify the strings by overwriting '=' with '\0')
+ * \param[out] param the parameter name
+ * \param[out] value the parameter value
+ *
+ * \retval the number of args consumed from argv (1 for "param=value" format, 2
+ *         for "param value" format)
+ * \retval -errno if unsuccessful
+ */
+static int sp_parse_param_value(int argc, char **argv, char **param,
+                               char **value)
+{
+       char *tmp;
+
+       if (argc < 1 || !(argv && param && value))
+               return -EINVAL;
+
+       *param = argv[0];
+       tmp = strchr(*param, '=');
+       if (tmp) {
+               /* format: set_param a=b */
+               *tmp = '\0';
+               tmp++;
+               if (*tmp == '\0')
+                       return -EINVAL;
+               *value = tmp;
+               return 1;
+       }
+
+       /* format: set_param a b */
+       if (argc < 2)
+               return -EINVAL;
+       *value = argv[1];
+
+       return 2;
+}
+
 enum paramtype {
        PT_NONE = 0,
        PT_SETPARAM,
@@ -1887,11 +1970,22 @@ int jt_lcfg_applyyaml(int argc, char **argv)
        return lcfg_apply_param_yaml(argv[0], argv[index]);
 }
 
+/**
+ * Main set_param function.
+ *
+ * \param[in] argc     count of arguments given to set_param
+ * \param[in] argv     array of arguments given to set_param
+ *
+ * \retval 0 if successful
+ * \retval -errno if unsuccessful
+ */
 int jt_lcfg_setparam(int argc, char **argv)
 {
-       int rc = 0, index, i;
+       int rc = 0;
+       int index = 0;
        struct param_opts popt;
-       char *path = NULL, *value = NULL;
+       struct sp_workq wq;
+       struct sp_workq *wq_ptr = NULL;
 
        memset(&popt, 0, sizeof(popt));
        index = setparam_cmdline(argc, argv, &popt);
@@ -1914,49 +2008,65 @@ int jt_lcfg_setparam(int argc, char **argv)
                return lcfg_apply_param_yaml(argv[0], argv[index]);
 #endif
        }
-       for (i = index; i < argc; i++) {
-               int rc2;
 
-               path = argv[i];
-               value = strchr(path, '=');
-               if (value) {
-                       /* format: set_param a=b */
-                       *value = '\0';
-                       value++;
-                       if (*value == '\0') {
-                               fprintf(stderr,
-                                       "error: %s: setting %s: no value\n",
-                                       jt_cmdname(argv[0]), path);
-                               if (rc == 0)
-                                       rc = -EINVAL;
-                               continue;
-                       }
+       if (popt_is_parallel(popt)) {
+               rc = spwq_init(&wq, &popt);
+               if (rc < 0) {
+                       fprintf(stderr,
+                               "warning: parallel %s: failed to init work queue: %s. Proceeding serially.\n",
+                               jt_cmdname(argv[0]), strerror(-rc));
+                       rc = 0;
+                       popt.po_parallel_threads = 0;
                } else {
-                       /* format: set_param a b */
-                       i++;
-                       if (i >= argc) {
-                               fprintf(stderr,
-                                       "error: %s: setting %s: no value\n",
-                                       jt_cmdname(argv[0]), path);
-                               if (rc == 0)
-                                       rc = -EINVAL;
-                               break;
-                       }
-                       value = argv[i];
+                       wq_ptr = &wq;
                }
+       }
 
-               rc2 = clean_path(&popt, path);
+       while (index < argc) {
+               char *path = NULL;
+               char *value = NULL;
+
+               rc = sp_parse_param_value(argc - index, argv + index,
+                                         &path, &value);
+               if (rc < 0) {
+                       fprintf(stderr, "error: %s: setting %s: %s\n",
+                               jt_cmdname(argv[0]), path, strerror(-rc));
+                       break;
+               }
+               /* Increment index by the number of arguments consumed. */
+               index += rc;
+
+               rc = clean_path(&popt, path);
+               if (rc < 0)
+                       break;
+
+               rc = do_param_op(&popt, path, value, SET_PARAM, wq_ptr);
+               if (rc < 0)
+                       fprintf(stderr, "error: %s: setting '%s'='%s': %s\n",
+                               jt_cmdname(argv[0]), path, value,
+                               strerror(-rc));
+       }
+
+       if (popt_is_parallel(popt)) {
+               int rc2;
+               /* Spawn threads to set the parameters which made it into the
+                * work queue to emulate serial set_param behavior when errors
+                * are encountered above.
+                */
+               rc2 = sp_run_threads(&wq);
                if (rc2 < 0) {
-                       fprintf(stderr, "error: %s: cleaning %s: %s\n",
-                               jt_cmdname(argv[0]), path, strerror(-rc2));
-                       if (rc == 0)
+                       fprintf(stderr,
+                               "error: parallel %s: failed to run threads: %s\n",
+                               jt_cmdname(argv[0]), strerror(-rc2));
+                       if (!rc)
                                rc = rc2;
-                       continue;
                }
-
-               rc2 = param_display(&popt, path, value, SET_PARAM);
-               if (rc == 0)
-                       rc = rc2;
+               rc2 = spwq_destroy(&wq);
+               if (rc2 < 0) {
+                       fprintf(stderr,
+                               "warning: parallel %s: failed to cleanup work queue: %s\n",
+                               jt_cmdname(argv[0]), strerror(-rc2));
+               }
        }
 
        return rc;