Whamcloud - gitweb
LU-9680 net: Netlink improvements 58/44358/14
authorJames Simmons <jsimmons@infradead.org>
Wed, 27 Oct 2021 16:03:10 +0000 (12:03 -0400)
committerOleg Drokin <green@whamcloud.com>
Sun, 21 Nov 2021 20:49:58 +0000 (20:49 +0000)
With the expansion of the use of Netlink several issues have been
encountered. This patch fixes many of the issues. The issues are:

1) Fix idx handling in lnet_genl_parse_list() function. It needs
   to always been incremented. Some renaming suggestion for
   enum lnet_nl_scalar_attrs from Neil. New LN_SCALAR_ATTR_INT_VALUE
   to allow pushing integers as well as strings from userspace.

2) Create struct genl_filter_list which will be used to create
   a list of items to pass back to userland. This will be a common
   setup.

3) A normal user can't read /sys/debug/kernel/lustre which breaks
   lctl ***_params XXX since the first function called is
   llapi_param_get_paths(). Without the ability to read the
   debugfs tree glob() will fail. The solution is to use the
   kernel's glob function and just pass the requested string to
   the kernel.

4) For the external coordinator work you create a YAML parser
   that listens for kernel generated Netlink packets. This is
   a continuous stream vs an one time reply which we don't
   handle correctly. We move the handling of the completion
   of a Netlink packet series icompletely into the function
   yaml_netlink_msg_complete. In yaml_netlink_msg_parse() for
   the async case add "---" and in yaml_netlink_msg_complete()
   add "..." to define the beginning and end of a YAML document.

5) We have 3 types of setups. For kernel generated events it is
   possible to use just a YAML parser to listen for events. For
   the normal request -> reply setup we need both a YAML emitter
   and YAML parser. The last case is just sending commands to
   the kernel which only needs a YAML emitter. It is possible for
   that action to fail so we need to add handling for errors to
   the YAML emitter. We keep error handling for the YAML parser
   as well to handle the case of a stand along YAML parser listener.

6) Reworked the code that translates YAML to Netlink packets to
   send to the kernel so the both key and value pairs are sent
   seperately for the mapping case. This avoids dealing with
   complex string parsing in the kernel.

7) Error message handling was incorrect. struct nlmsgerr msg field
   is also the start of the nlattrs for the ext ack handling.

Test-Parameters: trivial
Change-Id: Ic8eee8fd0020b7a63565de6ef69f2c74bf4bdcd8
Signed-off-by: James Simmons <jsimmons@infradead.org>
Reviewed-on: https://review.whamcloud.com/44358
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Ben Evans <beevans@whamcloud.com>
Reviewed-by: Petros Koutoupis <petros.koutoupis@hpe.com>
Tested-by: jenkins <devops@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
libcfs/autoconf/lustre-libcfs.m4
libcfs/include/libcfs/linux/Makefile.am
libcfs/include/libcfs/linux/glob.h [new file with mode: 0644]
libcfs/include/libcfs/linux/linux-net.h
libcfs/libcfs/Makefile.in
libcfs/libcfs/linux/Makefile.am
libcfs/libcfs/linux/glob.c [new file with mode: 0644]
lnet/include/lnet/lib-types.h
lnet/include/uapi/linux/lnet/lnet-nl.h
lnet/lnet/api-ni.c
lnet/utils/lnetconfig/liblnetconfig_netlink.c

index b223340..4d72083 100644 (file)
@@ -262,7 +262,7 @@ EXTRA_KCFLAGS="$tmp_flags"
 #
 # LIBCFS_HAVE_NS_TO_TIMESPEC64
 #
-# Kernel version 4.16-rc3 commit a84d1169164b274f13b97a23ff235c000efe3b49
+# Kernel version 3.16-rc3 commit a84d1169164b274f13b97a23ff235c000efe3b49
 # introduced struct __kernel_old_timeval
 #
 AC_DEFUN([LIBCFS_HAVE_NS_TO_TIMESPEC64],[
@@ -280,6 +280,27 @@ kernel_old_timeval, [
 ]) # LIBCFS_HAVE_NS_TO_TIMESPEC64
 
 #
+# LIBCFS_HAVE_GLOB
+#
+# Kernel version 3.16 commit b01250856b25f4417c51aa33afc451fbf7da1484
+# added glob support to the Linux kernel
+#
+AC_DEFUN([LIBCFS_HAVE_GLOB],[
+tmp_flags="$EXTRA_KCFLAGS"
+EXTRA_KCFLAGS="-Werror"
+LB_CHECK_COMPILE([does 'glob_match()' exist],
+glob, [
+       #include <linux/glob.h>
+],[
+       return glob_match(NULL, NULL);
+],[
+       AC_DEFINE(HAVE_GLOB, 1,
+               [glob_match() is available])
+])
+EXTRA_KCFLAGS="$tmp_flags"
+]) # LIBCFS_HAVE_GLOB
+
+#
 # Kernel version 3.17 changed hlist_add_after to
 # hlist_add_behind
 #
@@ -1547,6 +1568,29 @@ EXTRA_KCFLAGS="$tmp_flags"
 ]) # LIBCFS_CACHE_DETAIL_WRITERS
 
 #
+# LIBCFS_GENL_DUMPIT_INFO
+#
+# kernel v5.4-rc1 commit bf813b0afeae2f012f0e527a526c1b78ca21ad82
+# expanded struct genl_dumpit_info to include struct genl_family.
+#
+AC_DEFUN([LIBCFS_GENL_DUMPIT_INFO], [
+tmp_flags="$EXTRA_KCFLAGS"
+EXTRA_KCFLAGS="-Werror"
+LB_CHECK_COMPILE([if struct genl_dumpit_info has family field],
+genl_dumpit_info, [
+       #include <net/genetlink.h>
+],[
+       static struct genl_dumpit_info info;
+
+       info.family = NULL;
+],[
+       AC_DEFINE(HAVE_GENL_DUMPIT_INFO, 1,
+               [struct genl_dumpit_info has family field])
+])
+EXTRA_KCFLAGS="$tmp_flags"
+]) # LIBCFS_GENL_DUMPIT_INFO
+
+#
 # LIBCFS_KALLSYMS_LOOKUP
 #
 # kernel v5.6-11591-g0bd476e6c671
@@ -1784,6 +1828,7 @@ LIBCFS_MATCH_WILDCARD
 LIBCFS_HAVE_MAPPING_AS_EXITING_FLAG
 LIBCFS_IOV_ITER_HAS_TYPE
 # 3.16
+LIBCFS_HAVE_GLOB
 LIBCFS_HAVE_NS_TO_TIMESPEC64
 # 3.17
 LIBCFS_HLIST_ADD_AFTER
@@ -1874,7 +1919,8 @@ LIBCFS_GET_REQUEST_KEY_AUTH
 LIBCFS_LOOKUP_USER_KEY
 LIBCFS_FORCE_SIG_WITH_TASK
 LIBCFS_CACHE_DETAIL_WRITERS
-LIBCFS_HAVE_NR_UNSTABLE_NFS
+# 5.4
+LIBCFS_GENL_DUMPIT_INFO
 # 5.7
 LIBCFS_KALLSYMS_LOOKUP
 LIBCFS_TCP_SOCK_SET_QUICKACK
@@ -1884,6 +1930,7 @@ LIBCFS_TCP_SOCK_SET_KEEPCNT
 LIBCFS_HAVE_MMAP_LOCK
 LIBCFS_KERNEL_SETSOCKOPT
 LIBCFS_VMALLOC_2ARGS
+LIBCFS_HAVE_NR_UNSTABLE_NFS
 LIBCFS_SEC_RELEASE_SECCTX
 # 5.10
 LIBCFS_HAVE_KFREE_SENSITIVE
index 7e9b29b..d3fedf6 100644 (file)
@@ -1,3 +1,3 @@
 EXTRA_DIST = linux-misc.h linux-fs.h linux-mem.h linux-time.h linux-cpu.h \
             linux-list.h linux-hash.h linux-uuid.h linux-wait.h linux-net.h \
-            refcount.h processor.h xarray.h
+            glob.h refcount.h processor.h xarray.h
diff --git a/libcfs/include/libcfs/linux/glob.h b/libcfs/include/libcfs/linux/glob.h
new file mode 100644 (file)
index 0000000..fca03b5
--- /dev/null
@@ -0,0 +1,13 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _LINUX_GLOB_H
+#define _LINUX_GLOB_H
+
+#ifndef HAVE_GLOB
+
+#include <linux/types.h>       /* For bool */
+#include <linux/compiler.h>    /* For __pure */
+
+bool __pure glob_match(char const *pat, char const *str);
+#endif /* !HAVE_GLOB */
+
+#endif /* _LINUX_GLOB_H */
index 08d9cf9..a9502a3 100644 (file)
@@ -24,6 +24,7 @@
 #define __LIBCFS_LINUX_NET_H__
 
 #include <net/netlink.h>
+#include <net/genetlink.h>
 
 #ifndef HAVE_NLA_STRDUP
 char *nla_strdup(const struct nlattr *nla, gfp_t flags);
@@ -76,6 +77,28 @@ static inline int cfs_nla_parse_nested(struct nlattr *tb[], int maxtype,
 
 #endif
 
+#ifndef HAVE_GENL_DUMPIT_INFO
+struct cfs_genl_dumpit_info {
+       const struct genl_family *family;
+       const struct genl_ops *ops;
+       struct nlattr **attrs;
+};
+
+static inline const struct cfs_genl_dumpit_info *
+lnet_genl_dumpit_info(struct netlink_callback *cb)
+{
+       return (const struct cfs_genl_dumpit_info *)cb->args[1];
+}
+#else
+#define cfs_genl_dumpit_info   genl_dumpit_info
+
+static inline const struct cfs_genl_dumpit_info *
+lnet_genl_dumpit_info(struct netlink_callback *cb)
+{
+       return (const struct cfs_genl_dumpit_info *)genl_dumpit_info(cb);
+}
+#endif /* HAVE_GENL_DUMPIT_INFO */
+
 #ifdef HAVE_KERNEL_SETSOCKOPT
 
 #include <net/tcp.h>
index 7a1c6cf..20b5a27 100644 (file)
@@ -3,6 +3,7 @@ MODULES = libcfs
 libcfs-linux-objs := linux-prim.o
 libcfs-linux-objs += linux-hash.o
 libcfs-linux-objs += linux-wait.o
+libcfs-linux-objs += glob.o
 libcfs-linux-objs += xarray.o
 
 libcfs-crypto-objs := crypto.o fname.o hkdf.o hooks.o keyring.o
index ef34750..959e1ff 100644 (file)
@@ -1,4 +1,5 @@
 EXTRA_DIST = linux-prim.c \
             linux-hash.c \
             linux-wait.c \
+            glob.c \
             xarray.c
diff --git a/libcfs/libcfs/linux/glob.c b/libcfs/libcfs/linux/glob.c
new file mode 100644 (file)
index 0000000..9019246
--- /dev/null
@@ -0,0 +1,117 @@
+#ifndef HAVE_GLOB
+#include <linux/module.h>
+#include "libcfs/linux/glob.h"
+
+/**
+ * glob_match - Shell-style pattern matching, like !fnmatch(pat, str, 0)
+ * @pat: Shell-style pattern to match, e.g. "*.[ch]".
+ * @str: String to match.  The pattern must match the entire string.
+ *
+ * Perform shell-style glob matching, returning true (1) if the match
+ * succeeds, or false (0) if it fails.  Equivalent to !fnmatch(@pat, @str, 0).
+ *
+ * Pattern metacharacters are ?, *, [ and \.
+ * (And, inside character classes, !, - and ].)
+ *
+ * This is small and simple implementation intended for device blacklists
+ * where a string is matched against a number of patterns.  Thus, it
+ * does not preprocess the patterns.  It is non-recursive, and run-time
+ * is at most quadratic: strlen(@str)*strlen(@pat).
+ *
+ * An example of the worst case is glob_match("*aaaaa", "aaaaaaaaaa");
+ * it takes 6 passes over the pattern before matching the string.
+ *
+ * Like !fnmatch(@pat, @str, 0) and unlike the shell, this does NOT
+ * treat / or leading . specially; it isn't actually used for pathnames.
+ *
+ * Note that according to glob(7) (and unlike bash), character classes
+ * are complemented by a leading !; this does not support the regex-style
+ * [^a-z] syntax.
+ *
+ * An opening bracket without a matching close is matched literally.
+ */
+bool __pure glob_match(char const *pat, char const *str)
+{
+       /*
+        * Backtrack to previous * on mismatch and retry starting one
+        * character later in the string.  Because * matches all characters
+        * (no exception for /), it can be easily proved that there's
+        * never a need to backtrack multiple levels.
+        */
+       char const *back_pat = NULL, *back_str = back_str;
+
+       /*
+        * Loop over each token (character or class) in pat, matching
+        * it against the remaining unmatched tail of str.  Return false
+        * on mismatch, or true after matching the trailing nul bytes.
+        */
+       for (;;) {
+               unsigned char c = *str++;
+               unsigned char d = *pat++;
+
+               switch (d) {
+               case '?':       /* Wildcard: anything but nul */
+                       if (c == '\0')
+                               return false;
+                       break;
+               case '*':       /* Any-length wildcard */
+                       if (*pat == '\0')       /* Optimize trailing * case */
+                               return true;
+                       back_pat = pat;
+                       back_str = --str;       /* Allow zero-length match */
+                       break;
+               case '[': {     /* Character class */
+                       bool match = false, inverted = (*pat == '!');
+                       char const *class = pat + inverted;
+                       unsigned char a = *class++;
+
+                       /*
+                        * Iterate over each span in the character class.
+                        * A span is either a single character a, or a
+                        * range a-b.  The first span may begin with ']'.
+                        */
+                       do {
+                               unsigned char b = a;
+
+                               if (a == '\0')  /* Malformed */
+                                       goto literal;
+
+                               if (class[0] == '-' && class[1] != ']') {
+                                       b = class[1];
+
+                                       if (b == '\0')
+                                               goto literal;
+
+                                       class += 2;
+                                       /* Any special action if a > b? */
+                               }
+                               match |= (a <= c && c <= b);
+                       } while ((a = *class++) != ']');
+
+                       if (match == inverted)
+                               goto backtrack;
+                       pat = class;
+                       }
+                       break;
+               case '\\':
+                       d = *pat++;
+                       /*FALLTHROUGH*/
+               default:        /* Literal character */
+literal:
+                       if (c == d) {
+                               if (d == '\0')
+                                       return true;
+                               break;
+                       }
+backtrack:
+                       if (c == '\0' || !back_pat)
+                               return false;   /* No point continuing */
+                       /* Try again from last *, one character later in str. */
+                       pat = back_pat;
+                       str = ++back_str;
+                       break;
+               }
+       }
+}
+EXPORT_SYMBOL(glob_match);
+#endif /* ! HAVE_GLOB */
index 45a420f..f4a64af 100644 (file)
@@ -1273,7 +1273,13 @@ struct lnet {
        struct list_head                ln_udsp_list;
 };
 
-static const struct nla_policy scalar_attr_policy[LN_SCALAR_CNT + 1] = {
+struct genl_filter_list {
+       struct list_head         lp_list;
+       void                    *lp_cursor;
+       bool                     lp_first;
+};
+
+static const struct nla_policy scalar_attr_policy[LN_SCALAR_MAX + 1] = {
        [LN_SCALAR_ATTR_LIST]           = { .type = NLA_NESTED },
        [LN_SCALAR_ATTR_LIST_SIZE]      = { .type = NLA_U16 },
        [LN_SCALAR_ATTR_INDEX]          = { .type = NLA_U16 },
index de354fb..8bc0317 100644 (file)
@@ -37,23 +37,44 @@ enum lnet_nl_key_format {
        LNKF_SEQUENCE           = 4,
 };
 
+/**
+ * enum lnet_nl_scalar_attrs           - scalar LNet netlink attributes used
+ *                                       to compose messages for sending or
+ *                                       receiving.
+ *
+ * @LN_SCALAR_ATTR_UNSPEC:             unspecified attribute to catch errors
+ * @LN_SCALAR_ATTR_PAD:                        padding for 64-bit attributes, ignore
+ *
+ * @LN_SCALAR_ATTR_LIST:               List of scalar attributes (NLA_NESTED)
+ * @LN_SCALAR_ATTR_LIST_SIZE:          Number of items in scalar list (NLA_U16)
+ * @LN_SCALAR_ATTR_INDEX:              True Netlink attr value (NLA_U16)
+ * @LN_SCALAR_ATTR_NLA_TYPE:           Data format for value part of the pair
+ *                                     (NLA_U16)
+ * @LN_SCALAR_ATTR_VALUE:              String value of key part of the pair.
+ *                                     (NLA_NUL_STRING)
+ * @LN_SCALAR_ATTR_INT_VALUE:          Numeric value of key part of the pair.
+ *                                     (NLA_S64)
+ * @LN_SCALAR_ATTR_KEY_FORMAT:         LNKF_* format of the key value pair.
+ */
 enum lnet_nl_scalar_attrs {
        LN_SCALAR_ATTR_UNSPEC = 0,
-       LN_SCALAR_ATTR_LIST,
+       LN_SCALAR_ATTR_PAD = LN_SCALAR_ATTR_UNSPEC,
 
+       LN_SCALAR_ATTR_LIST,
        LN_SCALAR_ATTR_LIST_SIZE,
        LN_SCALAR_ATTR_INDEX,
        LN_SCALAR_ATTR_NLA_TYPE,
        LN_SCALAR_ATTR_VALUE,
+       LN_SCALAR_ATTR_INT_VALUE,
        LN_SCALAR_ATTR_KEY_FORMAT,
 
-       __LN_SCALAR_ATTR_LAST,
+       __LN_SCALAR_ATTR_MAX_PLUS_ONE,
 };
 
-#define LN_SCALAR_CNT (__LN_SCALAR_ATTR_LAST - 1)
+#define LN_SCALAR_MAX (__LN_SCALAR_ATTR_MAX_PLUS_ONE - 1)
 
 struct ln_key_props {
-       char                    *lkp_values;
+       char                    *lkp_value;
        __u16                   lkp_key_format;
        __u16                   lkp_data_type;
 };
index c4d6e6c..f7be169 100644 (file)
@@ -2753,9 +2753,9 @@ static int lnet_genl_parse_list(struct sk_buff *msg,
                                    list->lkl_maxattr);
 
                nla_put_u16(msg, LN_SCALAR_ATTR_INDEX, count);
-               if (props[count].lkp_values)
+               if (props[count].lkp_value)
                        nla_put_string(msg, LN_SCALAR_ATTR_VALUE,
-                                      props[count].lkp_values);
+                                      props[count].lkp_value);
                if (props[count].lkp_key_format)
                        nla_put_u16(msg, LN_SCALAR_ATTR_KEY_FORMAT,
                                    props[count].lkp_key_format);
@@ -2767,13 +2767,14 @@ static int lnet_genl_parse_list(struct sk_buff *msg,
                        rc = lnet_genl_parse_list(msg, data, ++idx);
                        if (rc < 0)
                                return rc;
+                       idx = rc;
                }
 
                nla_nest_end(msg, key);
        }
 
        nla_nest_end(msg, node);
-       return 0;
+       return idx;
 }
 
 int lnet_genl_send_scalar_list(struct sk_buff *msg, u32 portid, u32 seq,
@@ -2798,7 +2799,7 @@ int lnet_genl_send_scalar_list(struct sk_buff *msg, u32 portid, u32 seq,
 canceled:
        if (rc < 0)
                genlmsg_cancel(msg, hdr);
-       return rc;
+       return rc > 0 ? 0 : rc;
 }
 EXPORT_SYMBOL(lnet_genl_send_scalar_list);
 
index 31864db..98e6ac4 100644 (file)
@@ -89,6 +89,10 @@ int64_t nla_get_s64(const struct nlattr *nla)
 
        return tmp;
 }
+
+#define NLA_PUT_S64(msg, attrtype, value) \
+       NLA_PUT_TYPE(msg, int64_t, attrtype, value)
+
 #endif /* ! HAVE_NLA_GET_S64 */
 
 /**
@@ -307,6 +311,7 @@ struct yaml_netlink_input {
        const char              *errmsg;
        struct nl_sock          *nl;
        bool                    complete;
+       bool                    async;
        unsigned int            indent;
        struct yaml_nl_node     *cur;
        struct yaml_nl_node     *root;
@@ -347,22 +352,28 @@ yaml_parser_set_reader_error(yaml_parser_t *parser, const char *problem,
  *     value will be.
  *
  * LN_SCALAR_ATTR_VALUE:
- *     The key's actually scalar value.
+ *     The string represnting key's actually scalar value.
+ *
+ * LN_SCALAR_ATTR_INT_VALUE:
+ *     For this case the key is an integer value. This shouldn't be
+ *     sent for the receive case since we are going to just turn it
+ *     into a string for YAML. Sending packets will make use of this.
  *
  * LN_SCALAR_ATTR_KEY_TYPE:
  *     What YAML format is it? block or flow. Only useful for
  *     LN_SCALAR_ATTR_NLA_TYPE of type NLA_NESTED or NLA_NUL_STRING
  *
- * LN_SCALAR_ATTR_LIST + CFS_SCALAR_LIST_SIZE:
+ * LN_SCALAR_ATTR_LIST + LN_SCALAR_LIST_SIZE:
  *     Defined the next collection which is a collection of nested
  *     attributes of the above.
  */
-static struct nla_policy scalar_attr_policy[LN_SCALAR_CNT + 1] = {
+static struct nla_policy scalar_attr_policy[LN_SCALAR_MAX + 1] = {
        [LN_SCALAR_ATTR_LIST]           = { .type = NLA_NESTED },
        [LN_SCALAR_ATTR_LIST_SIZE]      = { .type = NLA_U16 },
        [LN_SCALAR_ATTR_INDEX]          = { .type = NLA_U16 },
        [LN_SCALAR_ATTR_NLA_TYPE]       = { .type = NLA_U16 },
        [LN_SCALAR_ATTR_VALUE]          = { .type = NLA_STRING },
+       [LN_SCALAR_ATTR_INT_VALUE]      = { .type = NLA_S64 },
        [LN_SCALAR_ATTR_KEY_FORMAT]     = { .type = NLA_U16 },
 };
 
@@ -370,7 +381,7 @@ static int yaml_parse_key_list(struct yaml_netlink_input *data,
                               struct yaml_nl_node *parent,
                               struct nlattr *list)
 {
-       struct nlattr *tbl_info[LN_SCALAR_CNT + 1];
+       struct nlattr *tbl_info[LN_SCALAR_MAX + 1];
        struct yaml_nl_node *node = NULL;
        struct nlattr *attr;
        int rem;
@@ -378,7 +389,7 @@ static int yaml_parse_key_list(struct yaml_netlink_input *data,
        nla_for_each_nested(attr, list, rem) {
                uint16_t index = 0;
 
-               if (nla_parse_nested(tbl_info, LN_SCALAR_CNT, attr,
+               if (nla_parse_nested(tbl_info, LN_SCALAR_MAX, attr,
                                     scalar_attr_policy))
                        break;
 
@@ -434,12 +445,12 @@ static int yaml_parse_key_list(struct yaml_netlink_input *data,
                        name = nla_strdup(tbl_info[LN_SCALAR_ATTR_VALUE]);
                        if (!name)
                                return NL_STOP;
-                       node->keys.lkl_list[index].lkp_values = name;
+                       node->keys.lkl_list[index].lkp_value = name;
                }
 
                if (tbl_info[LN_SCALAR_ATTR_LIST]) {
-                       int rc =  yaml_parse_key_list(data, node,
-                                                     tbl_info[LN_SCALAR_ATTR_LIST]);
+                       int rc = yaml_parse_key_list(data, node,
+                                                    tbl_info[LN_SCALAR_ATTR_LIST]);
                        if (rc != NL_OK)
                                return rc;
                }
@@ -514,7 +525,7 @@ static void yaml_parse_value_list(struct yaml_netlink_input *data, int *size,
                struct nlattr *attr;
 
                attr = attr_array[i];
-               if (!attr && !keys[i].lkp_values)
+               if (!attr && !keys[i].lkp_value)
                        continue;
 
                if (keys[i].lkp_data_type != NLA_NUL_STRING &&
@@ -538,7 +549,7 @@ static void yaml_parse_value_list(struct yaml_netlink_input *data, int *size,
 
                        if (mapping & LNKF_MAPPING) {
                                len = snprintf(data->buffer, *size, "%s: ",
-                                              keys[i].lkp_values);
+                                              keys[i].lkp_value);
                                if (len < 0)
                                        goto unwind;
                                data->buffer += len;
@@ -583,7 +594,7 @@ static void yaml_parse_value_list(struct yaml_netlink_input *data, int *size,
                                        len = snprintf(data->buffer, *size,
                                                       "%*s%s: %c ",
                                                       data->indent, "",
-                                                      keys[i].lkp_values,
+                                                      keys[i].lkp_value,
                                                       brace);
                                } else {
                                        if (keys[i].lkp_key_format &
@@ -596,7 +607,7 @@ static void yaml_parse_value_list(struct yaml_netlink_input *data, int *size,
                                        len = snprintf(data->buffer, *size,
                                                       "%*s%s:\n",
                                                       data->indent, "",
-                                                      keys[i].lkp_values);
+                                                      keys[i].lkp_value);
                                }
                                if (len < 0)
                                        goto unwind;
@@ -634,10 +645,10 @@ static void yaml_parse_value_list(struct yaml_netlink_input *data, int *size,
                                /* The top level is special so only print
                                 * once
                                 */
-                               if (strlen(keys[i].lkp_values)) {
+                               if (strlen(keys[i].lkp_value)) {
                                        len = snprintf(data->buffer,
                                                       *size, "%s:\n",
-                                                      keys[i].lkp_values);
+                                                      keys[i].lkp_value);
                                        if (len < 0)
                                                goto unwind;
                                        data->buffer += len;
@@ -652,9 +663,9 @@ static void yaml_parse_value_list(struct yaml_netlink_input *data, int *size,
                                                data->indent += 2;
                                }
 not_first:
-                               if (attr && parent->lkp_values) {
-                                       free(parent->lkp_values);
-                                       parent->lkp_values = nla_strdup(attr);
+                               if (attr && parent->lkp_value) {
+                                       free(parent->lkp_value);
+                                       parent->lkp_value = nla_strdup(attr);
                                }
                        }
                        break;
@@ -727,9 +738,9 @@ static int yaml_netlink_msg_parse(struct nl_msg *msg, void *arg)
        struct nlmsghdr *nlh = nlmsg_hdr(msg);
 
        if (nlh->nlmsg_flags & NLM_F_CREATE) {
-               struct nlattr *attrs[LN_SCALAR_CNT + 1];
+               struct nlattr *attrs[LN_SCALAR_MAX + 1];
 
-               if (genlmsg_parse(nlh, 0, attrs, LN_SCALAR_CNT + 1,
+               if (genlmsg_parse(nlh, 0, attrs, LN_SCALAR_MAX + 1,
                                  scalar_attr_policy))
                        return NL_SKIP;
 
@@ -742,6 +753,18 @@ static int yaml_netlink_msg_parse(struct nl_msg *msg, void *arg)
                        /* reset to root node */
                        data->cur = data->root;
                }
+
+               /* For streaming insert '---' to define start of
+                * YAML document. This allows use to extract
+                * documents out of a multiplexed stream.
+                */
+               if (data->async) {
+                       char *start_doc = "---\n";
+                       size_t len = strlen(start_doc) + 1;
+
+                       strncpy(data->buffer, start_doc, len);
+                       data->buffer += len - 1;
+               }
        } else {
                uint16_t maxtype = data->cur->keys.lkl_maxattr;
                struct nla_policy policy[maxtype];
@@ -761,10 +784,8 @@ static int yaml_netlink_msg_parse(struct nl_msg *msg, void *arg)
                                      &data->cur->keys.lkl_list[1]);
        }
 
-       if (nlh->nlmsg_flags & NLM_F_MULTI && nlh->nlmsg_type != NLMSG_DONE)
-               return NL_OK;
-
-       return NL_STOP;
+       /* Let yaml_netlink_msg_complete end collecting data */
+       return NL_OK;
 }
 
 static bool cleanup_children(struct yaml_nl_node *parent)
@@ -776,8 +797,8 @@ static bool cleanup_children(struct yaml_nl_node *parent)
                int i;
 
                for (i = 1; i < parent->keys.lkl_maxattr; i++)
-                       if (keys[i].lkp_values)
-                               free(keys[i].lkp_values);
+                       if (keys[i].lkp_value)
+                               free(keys[i].lkp_value);
                nl_list_del(&parent->list);
                return true;
        }
@@ -810,28 +831,42 @@ static int yaml_netlink_msg_complete(struct nl_msg *msg, void *arg)
                /* Newer kernels support NLM_F_ACK_TLVS in nlmsg_flags
                 * which gives greater detail why we failed.
                 */
-               if (nlh->nlmsg_flags & NLM_F_ACK_TLVS) {
-                       struct nla_policy extack_policy[NLMSGERR_ATTR_MAX + 1] = {
-                               [NLMSGERR_ATTR_MSG]     = { .type = NLA_STRING },
-                               [NLMSGERR_ATTR_OFFS]    = { .type = NLA_U32 },
-                       };
+               if (nlh->nlmsg_flags & NLM_F_ACK_TLVS &&
+                   !(nlh->nlmsg_flags & NLM_F_CAPPED)) {
+                       struct nlattr *head = ((void *)&errmsg->msg);
                        struct nlattr *tb[NLMSGERR_ATTR_MAX + 1];
 
-                       if (nlmsg_parse(nlh, 0, tb, sizeof(extack_policy),
-                                       extack_policy) == 0) {
+                       if (nla_parse(tb, NLMSGERR_ATTR_MAX + 1, head,
+                                     nlmsg_attrlen(nlh, 0), NULL) == 0) {
                                if (tb[NLMSGERR_ATTR_MSG])
-                                       data->errmsg = nla_get_string(tb[NLMSGERR_ATTR_MSG]);
+                                       data->errmsg = nla_strdup(tb[NLMSGERR_ATTR_MSG]);
                        }
                }
 #endif /* HAVE_USRSPC_NLMSGERR */
                data->parser->error = YAML_READER_ERROR;
+               data->complete = true;
+
+               return NL_STOP;
        } else {
-               cleanup_children(data->root);
-               free(data->root);
+               if (data->root) {
+                       cleanup_children(data->root);
+                       free(data->root);
+                       data->root = NULL;
+               }
+               /* For streaming insert '...' to define end of
+                * YAML document
+                */
+               if (data->async) {
+                       char *end_doc = "...\n";
+                       size_t len = strlen(end_doc) + 1;
+
+                       strncpy(data->buffer, end_doc, len);
+                       data->buffer += len - 1;
+               } else
+                       data->complete = true;
        }
 
-       data->complete = true;
-       return NL_STOP;
+       return data->async ? NL_OK : NL_STOP;
 }
 
 /**
@@ -919,6 +954,7 @@ yaml_parser_set_input_netlink(yaml_parser_t *reply, struct nl_sock *nl,
        }
 
        buf->nl = nl;
+       buf->async = stream;
        buf->parser = reply;
        yaml_parser_set_input(buf->parser, yaml_netlink_read_handler, buf);
 
@@ -987,11 +1023,16 @@ static enum lnet_nl_key_format yaml_format_type(yaml_emitter_t *emitter,
                                                unsigned int *offset,
                                                enum lnet_nl_key_format prev)
 {
+       enum lnet_nl_key_format fmt = 0;
        unsigned int indent = *offset;
        unsigned int new_indent = 0;
 
-       if (strchr(line, '{') || strchr(line, '['))
-               return LNKF_FLOW;
+       if (strchr(line, '{') || strchr(line, '[')) {
+               fmt = LNKF_FLOW;
+               if (strchr(line, '{'))
+                       fmt |= LNKF_MAPPING;
+               return fmt;
+       }
 
        new_indent = indent_level(line);
        if (new_indent < indent) {
@@ -1001,7 +1042,11 @@ static enum lnet_nl_key_format yaml_format_type(yaml_emitter_t *emitter,
 
        if (strncmp(line + new_indent, "- ", 2) == 0) {
                *offset = new_indent + emitter->best_indent;
-               return LNKF_SEQUENCE;
+               fmt = LNKF_SEQUENCE;
+
+               if (strstr(line + new_indent, ": "))
+                       fmt |= LNKF_MAPPING;
+               return fmt;
        }
 
        if (indent != new_indent) {
@@ -1010,7 +1055,34 @@ static enum lnet_nl_key_format yaml_format_type(yaml_emitter_t *emitter,
                        return LNKF_MAPPING;
        }
 
-       return 0;
+       return fmt;
+}
+
+static int yaml_fill_scalar_data(struct nl_msg *msg,
+                                enum lnet_nl_key_format fmt,
+                                char *line)
+{
+       char *sep = strchr(line, ':');
+       int rc = 0;
+
+       if (sep)
+               *sep++ = '\0';
+
+       NLA_PUT_STRING(msg, LN_SCALAR_ATTR_VALUE, line);
+       if (fmt & LNKF_MAPPING && sep) {
+               while (isspace(*sep))
+                       ++sep;
+
+               if (strspn(sep, "0123456789") == strlen(sep)) {
+                       unsigned long num = strtoull(sep, NULL, 0);
+
+                       NLA_PUT_S64(msg, LN_SCALAR_ATTR_INT_VALUE, num);
+               } else {
+                       NLA_PUT_STRING(msg, LN_SCALAR_ATTR_VALUE, sep);
+               }
+       }
+nla_put_failure:
+       return rc;
 }
 
 static int yaml_create_nested_list(struct yaml_netlink_output *out,
@@ -1026,35 +1098,51 @@ static int yaml_create_nested_list(struct yaml_netlink_output *out,
        if (!list) {
                yaml_emitter_set_writer_error(out->emitter,
                                              "Emmitter netlink list creation failed");
-               nlmsg_free(msg);
                rc = -EINVAL;
                goto nla_put_failure;
        }
 
        if (fmt & LNKF_FLOW) {
-               while ((line = strsep(hdr, ",")) != NULL) {
-                       char *tmp = NULL;
+               char *tmp = NULL;
 
-                       if (strchr(line, '{') ||
-                           strchr(line, '[') ||
-                           strchr(line, ' '))
-                               line++;
+               tmp = strchr(*hdr, '{');
+               if (!tmp) {
+                       tmp = strchr(*hdr, '[');
+                       if (!tmp) {
+                               yaml_emitter_set_writer_error(out->emitter,
+                                                             "Emmitter flow format invalid");
+                               rc = -EINVAL;
+                               goto nla_put_failure;
+                       }
+               }
+               *tmp = ' ';
 
-                       tmp = strchr(line, '}');
-                       if (!tmp)
-                               tmp = strchr(line, ']');
-                       if (tmp)
-                               *tmp = '\0';
+               tmp = strchr(*hdr, '}');
+               if (!tmp) {
+                       tmp = strchr(*hdr, ']');
+                       if (!tmp) {
+                               yaml_emitter_set_writer_error(out->emitter,
+                                                             "Emmitter flow format invalid");
+                               rc = -EINVAL;
+                               goto nla_put_failure;
+                       }
+               }
+               *tmp = '\0';
 
-                       NLA_PUT_STRING(msg,
-                                      LN_SCALAR_ATTR_VALUE,
-                                      line);
+               while ((line = strsep(hdr, ",")) != NULL) {
+                       if (isspace(line[0]))
+                               line++;
+                       rc = yaml_fill_scalar_data(msg, fmt, line);
+                       if (rc < 0)
+                               goto nla_put_failure;
                }
                nla_nest_end(msg, list);
                return 0;
        }
 
-       NLA_PUT_STRING(msg, LN_SCALAR_ATTR_VALUE, *hdr + *indent);
+       rc = yaml_fill_scalar_data(msg, fmt, *hdr + *indent);
+       if (rc < 0)
+               goto nla_put_failure;
        do {
                line = strsep(entry, "\n");
 have_next_line:
@@ -1065,7 +1153,7 @@ have_next_line:
                if (fmt == LNKF_END)
                        break;
 
-               if (fmt) {
+               if (fmt & ~LNKF_MAPPING) { /* Filter out mappings */
                        rc = yaml_create_nested_list(out, msg, &line, entry,
                                                     indent, fmt);
                        if (rc)
@@ -1073,8 +1161,9 @@ have_next_line:
 
                        goto have_next_line;
                } else {
-                       NLA_PUT_STRING(msg, LN_SCALAR_ATTR_VALUE,
-                                      line + *indent);
+                       rc = yaml_fill_scalar_data(msg, fmt, line + *indent);
+                       if (rc)
+                               goto nla_put_failure;
                }
        } while (strcmp(line, ""));
 
@@ -1097,20 +1186,20 @@ static void yaml_quotation_handling(char *buf)
 {
        char *tmp = buf, *line;
 
+       line = strstr(tmp, "! \'");
+       if (line)
+               line[0] = ' ';
+
        while ((line = strchr(tmp, '\"')) != NULL) {
-               line[0] = '%';
-               line[1] = ' ';
-               tmp = strchr(line, '\"') - 1;
+               line[0] = ' ';
+               tmp = strchr(line, '\"');
                tmp[0] = ' ';
-               tmp[1] = '%';
        }
 
        while ((line = strchr(tmp, '\'')) != NULL) {
-               line[0] = '%';
-               line[1] = ' ';
-               tmp = strchr(line, '\'') - 1;
+               line[0] = ' ';
+               tmp = strchr(line, '\'');
                tmp[0] = ' ';
-               tmp[1] = '%';
        }
 }
 
@@ -1188,11 +1277,11 @@ complicated:
 
                        fmt = yaml_format_type(out->emitter, line, &indent,
                                               fmt);
-                       if (fmt) {
+                       if (fmt & ~LNKF_MAPPING) {
                                rc = yaml_create_nested_list(out, msg, &line,
                                                             &entry, &indent,
                                                             fmt);
-                               if (rc) {
+                               if (rc < 0) {
                                        yaml_emitter_set_writer_error(out->emitter,
                                                                      nl_geterror(rc));
                                        nlmsg_free(msg);
@@ -1204,8 +1293,14 @@ complicated:
                                if (line)
                                        goto already_have_line;
                        } else {
-                               NLA_PUT_STRING(msg, LN_SCALAR_ATTR_VALUE,
-                                              line + indent);
+                               rc = yaml_fill_scalar_data(msg, fmt,
+                                                          line + indent);
+                               if (rc < 0) {
+                                       yaml_emitter_set_writer_error(out->emitter,
+                                                                     nl_geterror(rc));
+                                       nlmsg_free(msg);
+                                       goto nla_put_failure;
+                               }
                        }
                }
        }
@@ -1232,6 +1327,40 @@ nla_put_failure:
        return out->emitter->error == YAML_NO_ERROR ? 1 : 0;
 }
 
+/* This is the libnl callback for when an error has happened
+ * kernel side. An error message is sent back to the user.
+ */
+static int yaml_netlink_write_error(struct sockaddr_nl *who,
+                                   struct nlmsgerr *errmsg, void *arg)
+{
+       struct yaml_netlink_output *data = arg;
+       struct nlmsghdr *nlh = &errmsg->msg;
+
+       if ((nlh->nlmsg_type == NLMSG_ERROR ||
+            nlh->nlmsg_flags & NLM_F_ACK_TLVS) && errmsg->error) {
+               const char *errstr = nl_geterror(nl_syserr2nlerr(errmsg->error));
+
+#ifdef HAVE_USRSPC_NLMSGERR
+               /* Newer kernels support NLM_F_ACK_TLVS in nlmsg_flags
+                * which gives greater detail why we failed.
+                */
+               if (nlh->nlmsg_flags & NLM_F_ACK_TLVS &&
+                   !(nlh->nlmsg_flags & NLM_F_CAPPED)) {
+                       struct nlattr *head = ((void *)&errmsg->msg);
+                       struct nlattr *tb[NLMSGERR_ATTR_MAX + 1];
+
+                       if (nla_parse(tb, NLMSGERR_ATTR_MAX + 1, head,
+                                     nlmsg_attrlen(nlh, 0), NULL) == 0) {
+                               if (tb[NLMSGERR_ATTR_MSG])
+                                       errstr = nla_strdup(tb[NLMSGERR_ATTR_MSG]);
+                       }
+               }
+#endif /* HAVE_USRSPC_NLMSGERR */
+               yaml_emitter_set_writer_error(data->emitter, errstr);
+       }
+       return NL_STOP;
+}
+
 /* This function is used by external utilities to use Netlink with
  * libyaml so we can turn YAML documentations into Netlink message
  * to send. This behavior mirrors yaml_emitter_set_output_file()
@@ -1242,6 +1371,7 @@ yaml_emitter_set_output_netlink(yaml_emitter_t *sender, struct nl_sock *nl,
                                char *family, int version, int cmd, int flags)
 {
        struct yaml_netlink_output *out;
+       int rc;
 
        out = calloc(1, sizeof(*out));
        if (!out) {
@@ -1257,6 +1387,32 @@ yaml_emitter_set_output_netlink(yaml_emitter_t *sender, struct nl_sock *nl,
                free(out);
                return false;
        }
+
+       rc = nl_socket_modify_err_cb(nl, NL_CB_CUSTOM,
+                                    yaml_netlink_write_error, out);
+       if (rc < 0) {
+               yaml_emitter_set_writer_error(sender,
+                                             "failed to register error handling");
+               free(out);
+               return false;
+       }
+
+       rc = nl_socket_enable_broadcast_error(nl);
+       if (rc < 0) {
+               yaml_emitter_set_writer_error(sender,
+                                             "failed to enable broadcast errors");
+               free(out);
+               return false;
+       }
+
+       rc = nl_socket_set_ext_ack(nl, true);
+       if (rc < 0) {
+               yaml_emitter_set_writer_error(sender,
+                                             "failed to enable ext ack");
+               free(out);
+               return false;
+       }
+
        out->emitter = sender;
        out->nl = nl;
        out->family = family;
@@ -1322,14 +1478,12 @@ void yaml_parser_log_error(yaml_parser_t *parser, FILE *log, const char *errmsg)
                        extra = parser->problem;
 
                if (parser->problem_value != -1) {
-                       fprintf(log,
-                               "Failed to %s: reader error '%s':#%X at %ld'\n",
-                               errmsg, extra, parser->problem_value,
+                       fprintf(log, "Reader error: '%s':#%X at %ld'\n",
+                               extra, parser->problem_value,
                                (long)parser->problem_offset);
                } else {
-                       fprintf(log,
-                               "Failed to %s: reader error '%s' at %ld\n",
-                               errmsg, extra, (long)parser->problem_offset);
+                       fprintf(log, "Reader error: '%s' at %ld\n",
+                               extra, (long)parser->problem_offset);
                }
        /* fallthrough */
        default: