Whamcloud - gitweb
LU-5092 nodemap: transfer nodemaps between MGS, MDTs, and OSTs 30/11830/48
author Kit Westneat <kit.westneat@gmail.com>
Sat, 20 Feb 2016 22:11:25 +0000 (17:11 -0500)
committer Oleg Drokin <oleg.drokin@intel.com>
Thu, 2 Jun 2016 04:38:13 +0000 (04:38 +0000)
This creates a new config lock on the MGS for transferring the
nodemaps.  The target MGCs can enqueue this lock, which is then
revoked by the MGS when the nodemaps are modified. The MGCs then
issue a read config RPC on the nodemap config lock, which causes
the MGS to transfer the nodemap index file to the MGCs. The MGCs
then process the index file, similarly to the way the MGS
processes it on start.

Signed-off-by: Kit Westneat <kit.westneat@gmail.com>
Change-Id: I99a34773298484f70a912761f5831b75196d41d8
Reviewed-on: http://review.whamcloud.com/11830
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre_nodemap.h
lustre/include/obd_class.h
lustre/mgc/mgc_internal.h
lustre/mgc/mgc_request.c
lustre/mgs/mgs_handler.c
lustre/obdclass/dt_object.c
lustre/ptlrpc/nodemap_handler.c
lustre/ptlrpc/nodemap_internal.h
lustre/ptlrpc/nodemap_storage.c
lustre/tests/sanity-sec.sh

index 53cb965..b49774a 100644 (file)
@@ -137,4 +137,49 @@ void nm_config_file_deregister(const struct lu_env *env,
                               struct nm_config_file *ncf);
 struct lu_nodemap *nodemap_get_from_exp(struct obd_export *exp);
 void nodemap_putref(struct lu_nodemap *nodemap);
+
+#ifdef HAVE_SERVER_SUPPORT
+struct nodemap_range_tree {
+       struct interval_node *nmrt_range_interval_root;
+       unsigned int nmrt_range_highest_id;
+};
+
+struct nodemap_config {
+       /* Highest numerical lu_nodemap.nm_id defined */
+       unsigned int nmc_nodemap_highest_id;
+
+       /* Simple flag to determine if nodemaps are active */
+       bool nmc_nodemap_is_active;
+
+       /* Pointer to default nodemap as it is needed more often */
+       struct lu_nodemap *nmc_default_nodemap;
+
+       /**
+        * Lock required to access the range tree.
+        */
+       struct rw_semaphore nmc_range_tree_lock;
+       struct nodemap_range_tree nmc_range_tree;
+
+       /**
+        * Hash keyed on nodemap name containing all
+        * nodemaps
+        */
+       struct cfs_hash *nmc_nodemap_hash;
+};
+
+struct nodemap_config *nodemap_config_alloc(void);
+void nodemap_config_dealloc(struct nodemap_config *config);
+void nodemap_config_set_active(struct nodemap_config *config);
+
+int nodemap_process_idx_pages(struct nodemap_config *config, union lu_page *lip,
+                             struct lu_nodemap **recent_nodemap);
+#else /* disable nodemap processing in MGC of non-servers */
+static inline int nodemap_process_idx_pages(void *config,
+                                           union lu_page *lip,
+                                           struct lu_nodemap **recent_nodemap)
+{ return 0; }
+#endif /* HAVE_SERVER_SUPPORT */
+
+int nodemap_get_config_req(struct obd_device *mgs_obd,
+                          struct ptlrpc_request *req);
 #endif /* _LUSTRE_NODEMAP_H */
index 0b72e84..39ecf6b 100644 (file)
@@ -184,11 +184,12 @@ enum {
        CONFIG_T_SPTLRPC = 1,
        CONFIG_T_RECOVER = 2,
        CONFIG_T_PARAMS  = 3,
-       CONFIG_T_MAX     = 4
+       CONFIG_T_NODEMAP = 4,
+       CONFIG_T_MAX     = 5
 };
 
-#define PARAMS_FILENAME        "params"
-#define LCTL_UPCALL    "lctl"
+#define PARAMS_FILENAME                "params"
+#define LCTL_UPCALL            "lctl"
 
 /* list of active configuration logs  */
 struct config_llog_data {
@@ -199,6 +200,7 @@ struct config_llog_data {
        struct config_llog_data    *cld_sptlrpc;/* depended sptlrpc log */
        struct config_llog_data    *cld_params; /* common parameters log */
        struct config_llog_data    *cld_recover;/* imperative recover log */
+       struct config_llog_data    *cld_nodemap;/* nodemap log */
         struct obd_export          *cld_mgcexp;
        struct mutex                cld_lock;
         int                         cld_type;
index d854636..efbe0b8 100644 (file)
@@ -61,4 +61,9 @@ static inline int cld_is_recover(struct config_llog_data *cld)
         return cld->cld_type == CONFIG_T_RECOVER;
 }
 
+static inline int cld_is_nodemap(struct config_llog_data *cld)
+{
+       return cld->cld_type == CONFIG_T_NODEMAP;
+}
+
 #endif  /* _MGC_INTERNAL_H */
index 6f65aa9..6415307 100644 (file)
@@ -49,6 +49,7 @@
 #include <lustre_dlm.h>
 #include <lustre_disk.h>
 #include <lustre_log.h>
+#include <lustre_nodemap.h>
 #include <lustre_swab.h>
 #include <obd_class.h>
 
@@ -80,8 +81,9 @@ static int mgc_name2resid(char *name, int len, struct ldlm_res_id *res_id,
                 break;
        case CONFIG_T_RECOVER:
        case CONFIG_T_PARAMS:
-                resname = type;
-                break;
+       case CONFIG_T_NODEMAP:
+               resname = type;
+               break;
         default:
                 LBUG();
         }
@@ -145,22 +147,24 @@ static void config_log_put(struct config_llog_data *cld)
                list_del(&cld->cld_list_chain);
                spin_unlock(&config_list_lock);
 
-                CDEBUG(D_MGC, "dropping config log %s\n", cld->cld_logname);
+               CDEBUG(D_MGC, "dropping config log %s\n", cld->cld_logname);
 
-                if (cld->cld_recover)
-                        config_log_put(cld->cld_recover);
-                if (cld->cld_sptlrpc)
-                        config_log_put(cld->cld_sptlrpc);
+               if (cld->cld_recover)
+                       config_log_put(cld->cld_recover);
+               if (cld->cld_sptlrpc)
+                       config_log_put(cld->cld_sptlrpc);
                if (cld->cld_params)
                        config_log_put(cld->cld_params);
-                if (cld_is_sptlrpc(cld))
-                        sptlrpc_conf_log_stop(cld->cld_logname);
+               if (cld->cld_nodemap)
+                       config_log_put(cld->cld_nodemap);
+               if (cld_is_sptlrpc(cld))
+                       sptlrpc_conf_log_stop(cld->cld_logname);
 
-                class_export_put(cld->cld_mgcexp);
-                OBD_FREE(cld, sizeof(*cld) + strlen(cld->cld_logname) + 1);
-        }
+               class_export_put(cld->cld_mgcexp);
+               OBD_FREE(cld, sizeof(*cld) + strlen(cld->cld_logname) + 1);
+       }
 
-        EXIT;
+       EXIT;
 }
 
 /* Find a config log by name */
@@ -198,21 +202,22 @@ struct config_llog_data *config_log_find(char *logname,
 
 static
 struct config_llog_data *do_config_log_add(struct obd_device *obd,
-                                           char *logname,
-                                           int type,
-                                           struct config_llog_instance *cfg,
-                                           struct super_block *sb)
+                                          char *logname,
+                                          int type,
+                                          struct config_llog_instance *cfg,
+                                          struct super_block *sb)
 {
-        struct config_llog_data *cld;
-        int                      rc;
-        ENTRY;
+       struct config_llog_data *cld;
+       int rc;
 
-        CDEBUG(D_MGC, "do adding config log %s:%p\n", logname,
+       ENTRY;
+
+       CDEBUG(D_MGC, "do adding config log %s:%p\n", logname,
               cfg ? cfg->cfg_instance : NULL);
 
-        OBD_ALLOC(cld, sizeof(*cld) + strlen(logname) + 1);
-        if (!cld)
-                RETURN(ERR_PTR(-ENOMEM));
+       OBD_ALLOC(cld, sizeof(*cld) + strlen(logname) + 1);
+       if (!cld)
+               RETURN(ERR_PTR(-ENOMEM));
 
        strcpy(cld->cld_logname, logname);
        if (cfg)
@@ -226,32 +231,33 @@ struct config_llog_data *do_config_log_add(struct obd_device *obd,
        cld->cld_type = type;
        atomic_set(&cld->cld_refcount, 1);
 
-        /* Keep the mgc around until we are done */
-        cld->cld_mgcexp = class_export_get(obd->obd_self_export);
+       /* Keep the mgc around until we are done */
+       cld->cld_mgcexp = class_export_get(obd->obd_self_export);
 
-        if (cld_is_sptlrpc(cld)) {
-                sptlrpc_conf_log_start(logname);
-                cld->cld_cfg.cfg_obdname = obd->obd_name;
-        }
+       if (cld_is_sptlrpc(cld)) {
+               sptlrpc_conf_log_start(logname);
+               cld->cld_cfg.cfg_obdname = obd->obd_name;
+       }
 
-        rc = mgc_logname2resid(logname, &cld->cld_resid, type);
+       rc = mgc_logname2resid(logname, &cld->cld_resid, type);
 
        spin_lock(&config_list_lock);
        list_add(&cld->cld_list_chain, &config_llog_list);
        spin_unlock(&config_list_lock);
 
-        if (rc) {
-                config_log_put(cld);
-                RETURN(ERR_PTR(rc));
-        }
+       if (rc) {
+               config_log_put(cld);
+               RETURN(ERR_PTR(rc));
+       }
 
-        if (cld_is_sptlrpc(cld)) {
-                rc = mgc_process_log(obd, cld);
+       if (cld_is_sptlrpc(cld) || cld_is_nodemap(cld)) {
+               rc = mgc_process_log(obd, cld);
                if (rc && rc != -ENOENT)
-                        CERROR("failed processing sptlrpc log: %d\n", rc);
-        }
+                       CERROR("%s: failed processing log, type %d: rc = %d\n",
+                              obd->obd_name, type, rc);
+       }
 
-        RETURN(cld);
+       RETURN(cld);
 }
 
 static struct config_llog_data *config_recover_log_add(struct obd_device *obd,
@@ -308,62 +314,73 @@ static struct config_llog_data *config_params_log_add(struct obd_device *obd,
  * Each instance may be at a different point in the log.
  */
 static int config_log_add(struct obd_device *obd, char *logname,
-                          struct config_llog_instance *cfg,
-                          struct super_block *sb)
+                         struct config_llog_instance *cfg,
+                         struct super_block *sb)
 {
        struct lustre_sb_info   *lsi = s2lsi(sb);
        struct config_llog_data *cld;
        struct config_llog_data *sptlrpc_cld;
        struct config_llog_data *params_cld;
+       struct config_llog_data *nodemap_cld;
        char                    seclogname[32];
        char                    *ptr;
        int                     rc;
        ENTRY;
 
-        CDEBUG(D_MGC, "adding config log %s:%p\n", logname, cfg->cfg_instance);
+       CDEBUG(D_MGC, "adding config log %s:%p\n", logname, cfg->cfg_instance);
 
-        /*
-         * for each regular log, the depended sptlrpc log name is
-         * <fsname>-sptlrpc. multiple regular logs may share one sptlrpc log.
-         */
-        ptr = strrchr(logname, '-');
-        if (ptr == NULL || ptr - logname > 8) {
-                CERROR("logname %s is too long\n", logname);
-                RETURN(-EINVAL);
-        }
+       /*
+        * for each regular log, the depended sptlrpc log name is
+        * <fsname>-sptlrpc. multiple regular logs may share one sptlrpc log.
+        */
+       ptr = strrchr(logname, '-');
+       if (ptr == NULL || ptr - logname > 8) {
+               CERROR("logname %s is too long\n", logname);
+               RETURN(-EINVAL);
+       }
 
-        memcpy(seclogname, logname, ptr - logname);
-        strcpy(seclogname + (ptr - logname), "-sptlrpc");
+       memcpy(seclogname, logname, ptr - logname);
+       strcpy(seclogname + (ptr - logname), "-sptlrpc");
+
+       sptlrpc_cld = config_log_find(seclogname, NULL);
+       if (sptlrpc_cld == NULL) {
+               sptlrpc_cld = do_config_log_add(obd, seclogname,
+                                               CONFIG_T_SPTLRPC, NULL, NULL);
+               if (IS_ERR(sptlrpc_cld)) {
+                       CERROR("can't create sptlrpc log: %s\n", seclogname);
+                       GOTO(out, rc = PTR_ERR(sptlrpc_cld));
+               }
+       }
+
+       nodemap_cld = config_log_find(LUSTRE_NODEMAP_NAME, NULL);
+       if (!nodemap_cld && IS_SERVER(lsi) && !IS_MGS(lsi)) {
+               nodemap_cld = do_config_log_add(obd, LUSTRE_NODEMAP_NAME,
+                                               CONFIG_T_NODEMAP, NULL, NULL);
+               if (IS_ERR(nodemap_cld)) {
+                       rc = PTR_ERR(nodemap_cld);
+                       CERROR("%s: cannot create nodemap log: rc = %d\n",
+                              obd->obd_name, rc);
+                       GOTO(out_sptlrpc, rc);
+               }
+       }
 
-        sptlrpc_cld = config_log_find(seclogname, NULL);
-        if (sptlrpc_cld == NULL) {
-                sptlrpc_cld = do_config_log_add(obd, seclogname,
-                                                CONFIG_T_SPTLRPC, NULL, NULL);
-                if (IS_ERR(sptlrpc_cld)) {
-                        CERROR("can't create sptlrpc log: %s\n", seclogname);
-                       GOTO(out_err, rc = PTR_ERR(sptlrpc_cld));
-                }
-        }
        params_cld = config_params_log_add(obd, cfg, sb);
        if (IS_ERR(params_cld)) {
                rc = PTR_ERR(params_cld);
                CERROR("%s: can't create params log: rc = %d\n",
                       obd->obd_name, rc);
-               GOTO(out_err1, rc);
+               GOTO(out_nodemap, rc);
        }
 
        cld = do_config_log_add(obd, logname, CONFIG_T_CONFIG, cfg, sb);
        if (IS_ERR(cld)) {
                CERROR("can't create log: %s\n", logname);
-               GOTO(out_err2, rc = PTR_ERR(cld));
+               GOTO(out_params, rc = PTR_ERR(cld));
        }
 
-       cld->cld_sptlrpc = sptlrpc_cld;
-       cld->cld_params = params_cld;
-
-        LASSERT(lsi->lsi_lmd);
-        if (!(lsi->lsi_lmd->lmd_flags & LMD_FLG_NOIR)) {
-                struct config_llog_data *recover_cld;
+       LASSERT(lsi->lsi_lmd);
+       if (!(lsi->lsi_lmd->lmd_flags & LMD_FLG_NOIR)) {
+               struct config_llog_data *recover_cld;
                ptr = strrchr(seclogname, '-');
                if (ptr != NULL) {
                        *ptr = 0;
@@ -374,25 +391,32 @@ static int config_log_add(struct obd_device *obd, char *logname,
                        config_log_put(cld);
                        RETURN(-EINVAL);
                }
-                recover_cld = config_recover_log_add(obd, seclogname, cfg, sb);
+               recover_cld = config_recover_log_add(obd, seclogname, cfg, sb);
                if (IS_ERR(recover_cld))
-                       GOTO(out_err3, rc = PTR_ERR(recover_cld));
+                       GOTO(out_cld, rc = PTR_ERR(recover_cld));
                cld->cld_recover = recover_cld;
        }
 
+       cld->cld_sptlrpc = sptlrpc_cld;
+       cld->cld_params = params_cld;
+       cld->cld_nodemap = nodemap_cld;
+
        RETURN(0);
 
-out_err3:
+out_cld:
        config_log_put(cld);
 
-out_err2:
+out_params:
        config_log_put(params_cld);
 
-out_err1:
+out_nodemap:
+       config_log_put(nodemap_cld);
+
+out_sptlrpc:
        config_log_put(sptlrpc_cld);
 
-out_err:
-       RETURN(rc);
+out:
+       return rc;
 }
 
 DEFINE_MUTEX(llog_process_lock);
@@ -401,36 +425,38 @@ DEFINE_MUTEX(llog_process_lock);
  */
 static int config_log_end(char *logname, struct config_llog_instance *cfg)
 {
-        struct config_llog_data *cld;
-        struct config_llog_data *cld_sptlrpc = NULL;
+       struct config_llog_data *cld;
+       struct config_llog_data *cld_sptlrpc = NULL;
        struct config_llog_data *cld_params = NULL;
-        struct config_llog_data *cld_recover = NULL;
-        int rc = 0;
-        ENTRY;
+       struct config_llog_data *cld_recover = NULL;
+       struct config_llog_data *cld_nodemap = NULL;
+       int rc = 0;
+
+       ENTRY;
 
-        cld = config_log_find(logname, cfg);
-        if (cld == NULL)
-                RETURN(-ENOENT);
+       cld = config_log_find(logname, cfg);
+       if (cld == NULL)
+               RETURN(-ENOENT);
 
        mutex_lock(&cld->cld_lock);
-        /*
-         * if cld_stopping is set, it means we didn't start the log thus
-         * not owning the start ref. this can happen after previous umount:
-         * the cld still hanging there waiting for lock cancel, and we
-         * remount again but failed in the middle and call log_end without
-         * calling start_log.
-         */
-        if (unlikely(cld->cld_stopping)) {
+       /*
+        * if cld_stopping is set, it means we didn't start the log thus
+        * not owning the start ref. this can happen after previous umount:
+        * the cld still hanging there waiting for lock cancel, and we
+        * remount again but failed in the middle and call log_end without
+        * calling start_log.
+        */
+       if (unlikely(cld->cld_stopping)) {
                mutex_unlock(&cld->cld_lock);
-                /* drop the ref from the find */
-                config_log_put(cld);
-                RETURN(rc);
-        }
+               /* drop the ref from the find */
+               config_log_put(cld);
+               RETURN(rc);
+       }
 
-        cld->cld_stopping = 1;
+       cld->cld_stopping = 1;
 
-        cld_recover = cld->cld_recover;
-        cld->cld_recover = NULL;
+       cld_recover = cld->cld_recover;
+       cld->cld_recover = NULL;
        mutex_unlock(&cld->cld_lock);
 
        if (cld_recover) {
@@ -445,10 +471,12 @@ static int config_log_end(char *logname, struct config_llog_instance *cfg)
        cld->cld_sptlrpc = NULL;
        cld_params = cld->cld_params;
        cld->cld_params = NULL;
+       cld_nodemap = cld->cld_nodemap;
+       cld->cld_nodemap = NULL;
        spin_unlock(&config_list_lock);
 
-        if (cld_sptlrpc)
-                config_log_put(cld_sptlrpc);
+       if (cld_sptlrpc)
+               config_log_put(cld_sptlrpc);
 
        if (cld_params) {
                mutex_lock(&cld_params->cld_lock);
@@ -457,14 +485,21 @@ static int config_log_end(char *logname, struct config_llog_instance *cfg)
                config_log_put(cld_params);
        }
 
-        /* drop the ref from the find */
-        config_log_put(cld);
-        /* drop the start ref */
-        config_log_put(cld);
+       if (cld_nodemap) {
+               mutex_lock(&cld_nodemap->cld_lock);
+               cld_nodemap->cld_stopping = 1;
+               mutex_unlock(&cld_nodemap->cld_lock);
+               config_log_put(cld_nodemap);
+       }
 
-        CDEBUG(D_MGC, "end config log %s (%d)\n", logname ? logname : "client",
-               rc);
-        RETURN(rc);
+       /* drop the ref from the find */
+       config_log_put(cld);
+       /* drop the start ref */
+       config_log_put(cld);
+
+       CDEBUG(D_MGC, "end config log %s (%d)\n", logname ? logname : "client",
+              rc);
+       RETURN(rc);
 }
 
 #ifdef CONFIG_PROC_FS
@@ -1532,24 +1567,40 @@ static int mgc_apply_recover_logs(struct obd_device *mgc,
 
 /**
  * This function is called if this client was notified for target restarting
- * by the MGS. A CONFIG_READ RPC is going to send to fetch recovery logs.
+ * by the MGS. A CONFIG_READ RPC is going to send to fetch recovery or
+ * nodemap logs.
  */
-static int mgc_process_recover_log(struct obd_device *obd,
-                                   struct config_llog_data *cld)
+static int mgc_process_recover_nodemap_log(struct obd_device *obd,
+                                          struct config_llog_data *cld)
 {
-        struct ptlrpc_request *req = NULL;
-        struct config_llog_instance *cfg = &cld->cld_cfg;
-        struct mgs_config_body *body;
-        struct mgs_config_res  *res;
-        struct ptlrpc_bulk_desc *desc;
+       struct ptlrpc_request *req = NULL;
+       struct config_llog_instance *cfg = &cld->cld_cfg;
+       struct mgs_config_body *body;
+       struct mgs_config_res *res;
+
+       /* When a nodemap config is received, we build a new nodemap config,
+        * with new nodemap structs. We keep track of the most recently added
+        * nodemap since the config is read ordered by nodemap_id, and so it
+        * is likely that the next record will be related. Because access to
+        * the nodemaps is single threaded until the nodemap_config is active,
+        * we don't need to reference count with recent_nodemap, though
+        * recent_nodemap should be set to NULL when the nodemap_config
+        * is either destroyed or set active.
+        */
+       struct nodemap_config *new_config = NULL;
+       struct lu_nodemap *recent_nodemap = NULL;
+
+       struct ptlrpc_bulk_desc *desc;
        struct page **pages;
-        int nrpages;
-        bool eof = true;
+       __u64 config_read_offset = 0;
+       int nrpages;
+       bool eof = true;
        bool mne_swab = false;
-        int i;
-        int ealen;
-        int rc;
-        ENTRY;
+       int i;
+       int ealen;
+       int rc;
+
+       ENTRY;
 
         /* allocate buffer for bulk transfer.
          * if this is the first time for this mgs to read logs,
@@ -1558,7 +1609,7 @@ static int mgc_process_recover_log(struct obd_device *obd,
          * small and CONFIG_READ_NRPAGES will be used.
          */
         nrpages = CONFIG_READ_NRPAGES;
-        if (cfg->cfg_last_idx == 0) /* the first time */
+       if (cfg->cfg_last_idx == 0 || cld_is_nodemap(cld))
                 nrpages = CONFIG_READ_NRPAGES_INIT;
 
         OBD_ALLOC(pages, sizeof(*pages) * nrpages);
@@ -1571,29 +1622,42 @@ static int mgc_process_recover_log(struct obd_device *obd,
                         GOTO(out, rc = -ENOMEM);
         }
 
+#ifdef HAVE_SERVER_SUPPORT
+       if (cld_is_nodemap(cld)) {
+               new_config = nodemap_config_alloc();
+               if (IS_ERR(new_config)) {
+                       rc = PTR_ERR(new_config);
+                       new_config = NULL;
+                       GOTO(out, rc);
+               }
+       }
+#endif
 again:
-        LASSERT(cld_is_recover(cld));
+       LASSERT(cld_is_recover(cld) || cld_is_nodemap(cld));
        LASSERT(mutex_is_locked(&cld->cld_lock));
-        req = ptlrpc_request_alloc(class_exp2cliimp(cld->cld_mgcexp),
-                                   &RQF_MGS_CONFIG_READ);
-        if (req == NULL)
-                GOTO(out, rc = -ENOMEM);
+       req = ptlrpc_request_alloc(class_exp2cliimp(cld->cld_mgcexp),
+                                  &RQF_MGS_CONFIG_READ);
+       if (req == NULL)
+               GOTO(out, rc = -ENOMEM);
 
-        rc = ptlrpc_request_pack(req, LUSTRE_MGS_VERSION, MGS_CONFIG_READ);
-        if (rc)
-                GOTO(out, rc);
+       rc = ptlrpc_request_pack(req, LUSTRE_MGS_VERSION, MGS_CONFIG_READ);
+       if (rc)
+               GOTO(out, rc);
 
-        /* pack request */
-        body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
-        LASSERT(body != NULL);
-        LASSERT(sizeof(body->mcb_name) > strlen(cld->cld_logname));
+       /* pack request */
+       body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
+       LASSERT(body != NULL);
+       LASSERT(sizeof(body->mcb_name) > strlen(cld->cld_logname));
        if (strlcpy(body->mcb_name, cld->cld_logname, sizeof(body->mcb_name))
            >= sizeof(body->mcb_name))
                GOTO(out, rc = -E2BIG);
-        body->mcb_offset = cfg->cfg_last_idx + 1;
-        body->mcb_type   = cld->cld_type;
+       if (cld_is_nodemap(cld))
+               body->mcb_offset = config_read_offset;
+       else
+               body->mcb_offset = cfg->cfg_last_idx + 1;
+       body->mcb_type   = cld->cld_type;
        body->mcb_bits   = PAGE_CACHE_SHIFT;
-        body->mcb_units  = nrpages;
+       body->mcb_units  = nrpages;
 
        /* allocate bulk transfer descriptor */
        desc = ptlrpc_prep_bulk_imp(req, nrpages, 1,
@@ -1607,26 +1671,35 @@ again:
                desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
                                                 PAGE_CACHE_SIZE);
 
-        ptlrpc_request_set_replen(req);
-        rc = ptlrpc_queue_wait(req);
-        if (rc)
-                GOTO(out, rc);
+       ptlrpc_request_set_replen(req);
+       rc = ptlrpc_queue_wait(req);
+       if (rc)
+               GOTO(out, rc);
 
-        res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
-        if (res->mcr_size < res->mcr_offset)
-                GOTO(out, rc = -EINVAL);
+       res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
+       if (!res)
+               GOTO(out, rc = -EPROTO);
 
-        /* always update the index even though it might have errors with
-         * handling the recover logs */
-        cfg->cfg_last_idx = res->mcr_offset;
-        eof = res->mcr_offset == res->mcr_size;
+       if (cld_is_nodemap(cld)) {
+               config_read_offset = res->mcr_offset;
+               eof = config_read_offset == II_END_OFF;
+       } else {
+               if (res->mcr_size < res->mcr_offset)
+                       GOTO(out, rc = -EINVAL);
 
-        CDEBUG(D_INFO, "Latest version "LPD64", more %d.\n",
-               res->mcr_offset, eof == false);
+               /* always update the index even though it might have errors with
+                * handling the recover logs
+                */
+               cfg->cfg_last_idx = res->mcr_offset;
+               eof = res->mcr_offset == res->mcr_size;
 
-        ealen = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, 0);
-        if (ealen < 0)
-                GOTO(out, rc = ealen);
+               CDEBUG(D_INFO, "Latest version "LPD64", more %d.\n",
+                      res->mcr_offset, eof == false);
+       }
+
+       ealen = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, 0);
+       if (ealen < 0)
+               GOTO(out, rc = ealen);
 
        if (ealen > nrpages << PAGE_CACHE_SHIFT)
                GOTO(out, rc = -EINVAL);
@@ -1647,28 +1720,47 @@ again:
 
        for (i = 0; i < nrpages && ealen > 0; i++) {
                int rc2;
-               void *ptr;
+               union lu_page   *ptr;
 
                ptr = kmap(pages[i]);
-               rc2 = mgc_apply_recover_logs(obd, cld, res->mcr_offset, ptr,
-                                            min_t(int, ealen, PAGE_CACHE_SIZE),
-                                            mne_swab);
+               if (cld_is_nodemap(cld))
+                       rc2 = nodemap_process_idx_pages(new_config, ptr,
+                                                      &recent_nodemap);
+               else
+                       rc2 = mgc_apply_recover_logs(obd, cld, res->mcr_offset,
+                                                    ptr,
+                                                    min_t(int, ealen,
+                                                          PAGE_CACHE_SIZE),
+                                                    mne_swab);
                kunmap(pages[i]);
                if (rc2 < 0) {
-                       CWARN("Process recover log %s error %d\n",
-                             cld->cld_logname, rc2);
+                       CWARN("%s: error processing %s log %s: rc = %d\n",
+                             obd->obd_name,
+                             cld_is_nodemap(cld) ? "nodemap" : "recovery",
+                             cld->cld_logname,
+                             rc2);
                        break;
-                }
+               }
 
                ealen -= PAGE_CACHE_SIZE;
-        }
+       }
 
 out:
-        if (req)
-                ptlrpc_req_finished(req);
-
-        if (rc == 0 && !eof)
-                goto again;
+       if (req)
+               ptlrpc_req_finished(req);
+
+       if (rc == 0 && !eof)
+               goto again;
+
+#ifdef HAVE_SERVER_SUPPORT
+       if (new_config != NULL) {
+               recent_nodemap = NULL;
+               if (rc == 0)
+                       nodemap_config_set_active(new_config);
+               else
+                       nodemap_config_dealloc(new_config);
+       }
+#endif
 
        if (pages) {
                for (i = 0; i < nrpages; i++) {
@@ -1951,24 +2043,26 @@ restart:
        }
 
 
-        if (cld_is_recover(cld)) {
-                rc = 0; /* this is not a fatal error for recover log */
-                if (rcl == 0) {
-                        rc = mgc_process_recover_log(mgc, cld);
-                       if (rc != 0) {
-                               CERROR("%s: recover log %s failed: rc = %d"
-                                      "not fatal.\n", mgc->obd_name,
-                                      cld->cld_logname, rc);
-                               rc = 0;
+       if (cld_is_recover(cld) || cld_is_nodemap(cld)) {
+               if (!rcl)
+                       rc = mgc_process_recover_nodemap_log(mgc, cld);
+               else if (cld_is_nodemap(cld))
+                       rc = rcl;
+
+               if (cld_is_recover(cld) && rc) {
+                       if (!rcl) {
+                               CERROR("%s: recover log %s failed, not fatal: rc = %d\n",
+                                      mgc->obd_name, cld->cld_logname, rc);
                                cld->cld_lostlock = 1;
                        }
+                       rc = 0; /* this is not a fatal error for recover log */
                }
-        } else {
-                rc = mgc_process_cfg_log(mgc, cld, rcl != 0);
-        }
+       } else {
+               rc = mgc_process_cfg_log(mgc, cld, rcl != 0);
+       }
 
-        CDEBUG(D_MGC, "%s: configuration from log '%s' %sed (%d).\n",
-               mgc->obd_name, cld->cld_logname, rc ? "fail" : "succeed", rc);
+       CDEBUG(D_MGC, "%s: configuration from log '%s' %sed (%d).\n",
+              mgc->obd_name, cld->cld_logname, rc ? "fail" : "succeed", rc);
 
        mutex_unlock(&cld->cld_lock);
 
index f1a643e..b60aef9 100644 (file)
@@ -238,6 +238,7 @@ void mgs_revoke_lock(struct mgs_device *mgs, struct fs_db *fsdb, int type)
        LASSERT(rc == 0);
        switch (type) {
        case CONFIG_T_CONFIG:
+       case CONFIG_T_NODEMAP:
                cp = mgs_completion_ast_config;
                if (test_and_set_bit(FSDB_REVOKING_LOCK, &fsdb->fsdb_flags))
                        rc = -EALREADY;
@@ -514,6 +515,9 @@ static int mgs_config_read(struct tgt_session_info *tsi)
        case CONFIG_T_RECOVER:
                rc = mgs_get_ir_logs(req);
                break;
+       case CONFIG_T_NODEMAP:
+               rc = nodemap_get_config_req(req->rq_export->exp_obd, req);
+               break;
        case CONFIG_T_CONFIG:
                rc = -EOPNOTSUPP;
                break;
@@ -652,6 +656,7 @@ static int mgs_iocontrol_nodemap(const struct lu_env *env,
                                 struct obd_ioctl_data *data)
 {
        struct lustre_cfg       *lcfg = NULL;
+       struct fs_db            *fsdb;
        lnet_nid_t              nid;
        const char              *nodemap_name = NULL;
        const char              *nidstr = NULL;
@@ -774,6 +779,14 @@ static int mgs_iocontrol_nodemap(const struct lu_env *env,
                GOTO(out_lcfg, rc);
        }
 
+       /* revoke nodemap lock */
+       rc = mgs_find_or_make_fsdb(env, mgs, LUSTRE_NODEMAP_NAME, &fsdb);
+       if (rc < 0)
+               CWARN("%s: cannot make nodemap fsdb: rc = %d\n",
+                     mgs->mgs_obd->obd_name, rc);
+       else
+               mgs_revoke_lock(mgs, fsdb, CONFIG_T_NODEMAP);
+
 out_lcfg:
        OBD_FREE(lcfg, data->ioc_plen1);
 out:
index 2c13a3c..2a9016f 100644 (file)
@@ -803,7 +803,8 @@ out:
  * \param obj - is the index object to parse
  * \param rdpg - is the lu_rdpg descriptor associated with the transfer
  * \param filler - is the callback function responsible for filling a lu_page
- *                 with key/record pairs in the format wanted by the caller
+ *                 with key/record pairs in the format wanted by the caller.
+ *                 If NULL, uses dt_index_page_build
  * \param arg    - is an opaq argument passed to the filler function
  *
  * \retval sum (in bytes) of all filled lu_pages
@@ -822,6 +823,9 @@ int dt_index_walk(const struct lu_env *env, struct dt_object *obj,
        LASSERT(rdpg->rp_pages != NULL);
        LASSERT(obj->do_index_ops != NULL);
 
+       if (filler == NULL)
+               filler = dt_index_page_build;
+
        nob = rdpg->rp_count;
        if (nob == 0)
                RETURN(-EFAULT);
index 28f34c9..5f82521 100644 (file)
@@ -92,6 +92,8 @@ static void nodemap_destroy(struct lu_nodemap *nodemap)
 void nodemap_getref(struct lu_nodemap *nodemap)
 {
        atomic_inc(&nodemap->nm_refcount);
+       CDEBUG(D_INFO, "GETting nodemap %s(p=%p) : new refcount %d\n",
+              nodemap->nm_name, nodemap, atomic_read(&nodemap->nm_refcount));
 }
 
 /**
@@ -105,6 +107,10 @@ void nodemap_putref(struct lu_nodemap *nodemap)
 
        LASSERT(atomic_read(&nodemap->nm_refcount) > 0);
 
+       CDEBUG(D_INFO, "PUTting nodemap %s(p=%p) : new refcount %d\n",
+              nodemap->nm_name, nodemap,
+              atomic_read(&nodemap->nm_refcount) - 1);
+
        if (atomic_dec_and_test(&nodemap->nm_refcount))
                nodemap_destroy(nodemap);
 }
@@ -1202,10 +1208,31 @@ static int nodemap_cleanup_iter_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
        return 0;
 }
 
+/**
+ * Allocate and initialize an empty nodemap configuration.
+ *
+ * The structure is allocated with OBD_ALLOC_PTR(), its nodemap hash is set
+ * up by nodemap_init_hash() and its range tree lock is initialized.
+ *
+ * \retval     config                  newly allocated configuration
+ * \retval     ERR_PTR(-ENOMEM)        the structure could not be allocated
+ * \retval     ERR_PTR(rc)             non-zero rc from nodemap_init_hash()
+ */
+struct nodemap_config *nodemap_config_alloc(void)
+{
+       struct nodemap_config *config;
+       int rc = 0;
+
+       OBD_ALLOC_PTR(config);
+       if (config == NULL)
+               return ERR_PTR(-ENOMEM);
+
+       rc = nodemap_init_hash(config);
+       if (rc != 0) {
+               /* hash setup failed: release the half-built config */
+               OBD_FREE_PTR(config);
+               return ERR_PTR(rc);
+       }
+
+       init_rwsem(&config->nmc_range_tree_lock);
+
+       return config;
+}
+EXPORT_SYMBOL(nodemap_config_alloc);
+
 /**
  * Walk the nodemap_hash and remove all nodemaps.
  */
-void nodemap_config_cleanup(struct nodemap_config *config)
+void nodemap_config_dealloc(struct nodemap_config *config)
 {
        struct lu_nodemap       *nodemap = NULL;
        struct lu_nodemap       *nodemap_temp;
@@ -1223,6 +1250,9 @@ void nodemap_config_cleanup(struct nodemap_config *config)
        list_for_each_entry_safe(nodemap, nodemap_temp, &nodemap_list_head,
                                 nm_list) {
                down_write(&config->nmc_range_tree_lock);
+
+               /* move members to new config */
+               nm_member_reclassify_nodemap(nodemap);
                list_for_each_entry_safe(range, range_temp, &nodemap->nm_ranges,
                                         rn_list)
                        range_delete(&config->nmc_range_tree, range);
@@ -1230,33 +1260,9 @@ void nodemap_config_cleanup(struct nodemap_config *config)
 
                nodemap_putref(nodemap);
        }
-}
-
-struct nodemap_config *nodemap_config_alloc(void)
-{
-       struct nodemap_config *config;
-       int rc = 0;
-
-       OBD_ALLOC_PTR(config);
-       if (config == NULL)
-               return ERR_PTR(-ENOMEM);
-
-       rc = nodemap_init_hash(config);
-       if (rc != 0) {
-               OBD_FREE_PTR(config);
-               return ERR_PTR(rc);
-       }
-
-       init_rwsem(&config->nmc_range_tree_lock);
-
-       return config;
-}
-
-void nodemap_config_dealloc(struct nodemap_config *config)
-{
-       nodemap_config_cleanup(config);
        OBD_FREE_PTR(config);
 }
+EXPORT_SYMBOL(nodemap_config_dealloc);
 
 static int nm_hash_list_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
                           struct hlist_node *hnode,
@@ -1319,6 +1325,7 @@ void nodemap_config_set_active(struct nodemap_config *config)
 
        EXIT;
 }
+EXPORT_SYMBOL(nodemap_config_set_active);
 
 /**
  * Cleanup nodemap module on exit
index d683d0d..0676aa7 100644 (file)
@@ -78,34 +78,6 @@ struct lu_idmap {
        struct rb_node  id_fs_to_client;
 };
 
-struct nodemap_range_tree {
-       struct interval_node *nmrt_range_interval_root;
-       unsigned int nmrt_range_highest_id;
-};
-
-struct nodemap_config {
-       /* Highest numerical lu_nodemap.nm_id defined */
-       unsigned int nmc_nodemap_highest_id;
-
-       /* Simple flag to determine if nodemaps are active */
-       bool nmc_nodemap_is_active;
-
-       /* Pointer to default nodemap as it is needed more often */
-       struct lu_nodemap *nmc_default_nodemap;
-
-       /**
-        * Lock required to access the range tree.
-        */
-       struct rw_semaphore nmc_range_tree_lock;
-       struct nodemap_range_tree nmc_range_tree;
-
-       /**
-        * Hash keyed on nodemap name containing all
-        * nodemaps
-        */
-       struct cfs_hash *nmc_nodemap_hash;
-};
-
 /* first 4 bits of the nodemap_id is the index type */
 struct nodemap_key {
        __u32 nk_nodemap_id;
@@ -138,9 +110,6 @@ static inline __u32 nm_idx_set_type(unsigned int id, enum nodemap_idx_type t)
        return (id & NM_TYPE_MASK) | (t << NM_TYPE_SHIFT);
 }
 
-struct nodemap_config *nodemap_config_alloc(void);
-void nodemap_config_dealloc(struct nodemap_config *config);
-void nodemap_config_set_active(struct nodemap_config *config);
 struct lu_nodemap *nodemap_create(const char *name,
                                  struct nodemap_config *config,
                                  bool is_default);
index cd396a9..b61e97d 100644 (file)
@@ -504,6 +504,9 @@ static int nodemap_process_keyrec(struct nodemap_config *config,
        type = nm_idx_get_type(nodemap_id);
        nodemap_id = nm_idx_set_type(nodemap_id, 0);
 
+       CDEBUG(D_INFO, "found config entry, nm_id %d type %d\n",
+              nodemap_id, type);
+
        /* find the correct nodemap in the load list */
        if (type == NODEMAP_RANGE_IDX || type == NODEMAP_UIDMAP_IDX ||
            type == NODEMAP_GIDMAP_IDX) {
@@ -706,7 +709,8 @@ out:
                        rc = PTR_ERR(new_config->nmc_default_nodemap);
                } else {
                        rc = nodemap_idx_nodemap_add_update(
-                                       new_config->nmc_default_nodemap, 0);
+                                       new_config->nmc_default_nodemap,
+                                       NM_ADD);
                        nodemap_putref(new_config->nmc_default_nodemap);
                }
        }
@@ -793,3 +797,156 @@ void nm_config_file_deregister(const struct lu_env *env,
        EXIT;
 }
 EXPORT_SYMBOL(nm_config_file_deregister);
+
+/**
+ * Process the key/record pairs stored in a run of index containers.
+ *
+ * Walks LU_PAGE_COUNT consecutive lu_page containers starting at \a lip and
+ * hands every entry to nodemap_process_keyrec(). An entry is laid out as a
+ * key of dt_nodemap_features.dif_keysize_max bytes followed by a record of
+ * dt_nodemap_features.dif_recsize_max bytes.
+ *
+ * \param      config          nodemap config passed to
+ *                             nodemap_process_keyrec()
+ * \param      lip             first lu_page container to process
+ * \param      recent_nodemap  passed through to nodemap_process_keyrec()
+ *
+ * \retval     0               all containers were processed
+ * \retval     -EINVAL         a container does not carry LIP_MAGIC
+ * \retval     -ve             error returned by nodemap_process_keyrec()
+ */
+int nodemap_process_idx_pages(struct nodemap_config *config, union lu_page *lip,
+                             struct lu_nodemap **recent_nodemap)
+{
+       struct nodemap_key *key;
+       union nodemap_rec *rec;
+       char *entry;
+       int j;
+       int k;
+       int rc = 0;
+       /* stride between two consecutive entries inside a container */
+       int size = dt_nodemap_features.dif_keysize_max +
+                  dt_nodemap_features.dif_recsize_max;
+
+       for (j = 0; j < LU_PAGE_COUNT; j++) {
+               if (lip->lp_idx.lip_magic != LIP_MAGIC)
+                       return -EINVAL;
+
+               /* get and process keys and records from page */
+               /* NOTE(review): lip_nr is used as read from the container,
+                * without a check against the container size - confirm the
+                * sender is trusted or validate it here */
+               for (k = 0; k < lip->lp_idx.lip_nr; k++) {
+                       entry = lip->lp_idx.lip_entries + k * size;
+                       key = (struct nodemap_key *)entry;
+
+                       /* the record starts right after the key */
+                       entry += dt_nodemap_features.dif_keysize_max;
+                       rec = (union nodemap_rec *)entry;
+
+                       rc = nodemap_process_keyrec(config, key, rec,
+                                                   recent_nodemap);
+                       if (rc < 0)
+                               return rc;
+               }
+               lip++;
+       }
+       return 0;
+}
+EXPORT_SYMBOL(nodemap_process_idx_pages);
+
+/**
+ * Walk the nodemap config index and fill \a rdpg with its contents.
+ *
+ * The key and record sizes of \a ii are set from dt_nodemap_features, then
+ * dt_index_walk() is called with a NULL filler while the read lock on the
+ * index object is held.
+ *
+ * \param      env     execution environment
+ * \param      ncf     nodemap config file, its ncf_obj is the index walked
+ * \param      ii      index info; ii_keysize and ii_recsize are set here
+ *                     and it is handed to dt_index_walk() as filler argument
+ * \param      rdpg    descriptor of the pages to fill
+ *
+ * \retval     the value returned by dt_index_walk()
+ */
+int nodemap_index_read(struct lu_env *env,
+                      struct nm_config_file *ncf,
+                      struct idx_info *ii,
+                      const struct lu_rdpg *rdpg)
+{
+       struct dt_object        *nodemap_idx = ncf->ncf_obj;
+       int                      rc = 0;
+
+       ii->ii_keysize = dt_nodemap_features.dif_keysize_max;
+       ii->ii_recsize = dt_nodemap_features.dif_recsize_max;
+
+       dt_read_lock(env, nodemap_idx, 0);
+       /* NULL filler: dt_index_walk() picks its default page builder */
+       rc = dt_index_walk(env, nodemap_idx, rdpg, NULL, ii);
+       CDEBUG(D_INFO, "walked index, hashend %llx\n", ii->ii_hash_end);
+
+       dt_read_unlock(env, nodemap_idx);
+       return rc;
+}
+EXPORT_SYMBOL(nodemap_index_read);
+
+/**
+ * Returns the current nodemap configuration to MGC by walking the nodemap
+ * config index and storing it in the response buffer.
+ *
+ * The requested size (mcb_units << mcb_bits) determines how many pages are
+ * allocated; the index is read into them starting at hash mcb_offset and
+ * the filled pages are sent back with a bulk PUT on MGS_BULK_PORTAL. The
+ * reply carries the hash to continue from (mcr_offset) and the number of
+ * bytes read (mcr_size).
+ *
+ * \param      mgs_obd         obd device whose obt_nodemap_config_file is read
+ * \param      req             incoming MGS_CONFIG_READ request
+ * \retval     0               success
+ * \retval     -EINVAL         malformed request
+ * \retval     -ENOTCONN       client evicted/reconnected already
+ * \retval     -ETIMEDOUT      client timeout or network error
+ * \retval     -ENOMEM         page or bulk descriptor allocation failed
+ */
+int nodemap_get_config_req(struct obd_device *mgs_obd,
+                          struct ptlrpc_request *req)
+{
+       struct mgs_config_body *body;
+       struct mgs_config_res *res;
+       struct lu_rdpg rdpg;
+       struct idx_info nodemap_ii;
+       struct ptlrpc_bulk_desc *desc;
+       /* NOTE(review): lwi is passed to target_bulk_io() uninitialized,
+        * presumably target_bulk_io() fills it in - confirm */
+       struct l_wait_info lwi;
+       int i;
+       int page_count;
+       int bytes = 0;
+       int rc = 0;
+
+       body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
+       if (!body)
+               RETURN(-EINVAL);
+
+       if (body->mcb_type != CONFIG_T_NODEMAP)
+               RETURN(-EINVAL);
+
+       /* size of the transfer in bytes, rounded up to whole pages below */
+       rdpg.rp_count = (body->mcb_units << body->mcb_bits);
+       rdpg.rp_npages = (rdpg.rp_count + PAGE_CACHE_SIZE - 1) >>
+               PAGE_CACHE_SHIFT;
+       if (rdpg.rp_npages > PTLRPC_MAX_BRW_PAGES)
+               RETURN(-EINVAL);
+
+       CDEBUG(D_INFO, "reading nodemap log, name '%s', size = %u\n",
+              body->mcb_name, rdpg.rp_count);
+
+       /* allocate pages to store the containers */
+       OBD_ALLOC(rdpg.rp_pages, sizeof(*rdpg.rp_pages) * rdpg.rp_npages);
+       if (rdpg.rp_pages == NULL)
+               RETURN(-ENOMEM);
+       for (i = 0; i < rdpg.rp_npages; i++) {
+               rdpg.rp_pages[i] = alloc_page(GFP_IOFS);
+               if (rdpg.rp_pages[i] == NULL)
+                       GOTO(out, rc = -ENOMEM);
+       }
+
+       rdpg.rp_hash = body->mcb_offset;
+       nodemap_ii.ii_magic = IDX_INFO_MAGIC;
+       nodemap_ii.ii_flags = II_FL_NOHASH;
+
+       bytes = nodemap_index_read(req->rq_svc_thread->t_env,
+                                  mgs_obd->u.obt.obt_nodemap_config_file,
+                                  &nodemap_ii, &rdpg);
+       if (bytes < 0)
+               GOTO(out, rc = bytes);
+
+       res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
+       if (res == NULL)
+               GOTO(out, rc = -EINVAL);
+       res->mcr_offset = nodemap_ii.ii_hash_end;
+       res->mcr_size = bytes;
+
+       page_count = (bytes + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
+       /* the loop below indexes rp_pages[0..page_count-1], so the bound
+        * that matters is the number of pages allocated, not the byte
+        * count in rp_count */
+       LASSERT(page_count <= rdpg.rp_npages);
+       desc = ptlrpc_prep_bulk_exp(req, page_count, 1,
+                                   PTLRPC_BULK_PUT_SOURCE |
+                                       PTLRPC_BULK_BUF_KIOV,
+                                   MGS_BULK_PORTAL,
+                                   &ptlrpc_bulk_kiov_pin_ops);
+       if (desc == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       /* attach only the pages that hold data; the last may be partial */
+       for (i = 0; i < page_count && bytes > 0; i++) {
+               ptlrpc_prep_bulk_page_pin(desc, rdpg.rp_pages[i], 0,
+                                         min_t(int, bytes, PAGE_CACHE_SIZE));
+               bytes -= PAGE_CACHE_SIZE;
+       }
+
+       rc = target_bulk_io(req->rq_export, desc, &lwi);
+       ptlrpc_free_bulk(desc);
+
+out:
+       if (rdpg.rp_pages != NULL) {
+               /* slots past a failed alloc_page() are skipped as NULL */
+               for (i = 0; i < rdpg.rp_npages; i++)
+                       if (rdpg.rp_pages[i] != NULL)
+                               __free_page(rdpg.rp_pages[i]);
+               OBD_FREE(rdpg.rp_pages,
+                        rdpg.rp_npages * sizeof(rdpg.rp_pages[0]));
+       }
+       return rc;
+}
+EXPORT_SYMBOL(nodemap_get_config_req);
index d9e7af8..c954d7e 100755 (executable)
@@ -21,6 +21,13 @@ init_test_env $@
 . ${CONFIG:=$LUSTRE/tests/cfg/$NAME.sh}
 init_logging
 
+NODEMAP_TESTS=$(seq 7 26)
+
+if ! check_versions; then
+       echo "It is NOT necessary to test nodemap under interoperation mode"
+       EXCEPT="$EXCEPT $NODEMAP_TESTS"
+fi
+
 [ "$SLOW" = "no" ] && EXCEPT_SLOW="26"
 
 [ "$ALWAYS_EXCEPT$EXCEPT$EXCEPT_SLOW" ] &&
@@ -949,16 +956,45 @@ test_15() {
 }
 run_test 15 "test id mapping"
 
-# Until nodemaps are distributed by MGS, they need to be distributed manually
-# This function and all calls to it should be removed once the MGS distributes
-# nodemaps to the MDS and OSS nodes directly.
-do_servers_not_mgs() {
+# Wait until the other servers report the same nodemap parameter as the MGS.
+#
+# usage: wait_nm_sync <nodemap_name> <key>
+#        wait_nm_sync active
+#
+# Compares "lctl get_param nodemap.<nodemap_name>.<key>" (or nodemap.active)
+# read on the MGS with the value read on every other server node, retrying
+# once a second for up to 10 attempts, and calls error() if a node still
+# differs.  Returns immediately when nodemaps are inactive on the MGS and
+# the parameter checked is not "active".
+wait_nm_sync() {
+       local nodemap_name=$1
+       local key=$2
+       local proc_param="${nodemap_name}.${key}"
+       [ "$nodemap_name" == "active" ] && proc_param="active"
+
+       local is_active=$(do_facet mgs $LCTL get_param -n nodemap.active)
+       (( is_active == 0 )) && [ "$proc_param" != "active" ] && return
+
+       local is_sync
+       local out1=$(do_facet mgs $LCTL get_param nodemap.${proc_param})
+       local out2
        local mgs_ip=$(host_nids_address $mgs_HOST $NETTYPE)
-       for node in $(all_server_nodes); do
-               local node_ip=$(host_nids_address $node $NETTYPE)
-               [ $node_ip == $mgs_ip ] && continue
-               do_node $node_ip $*
+       local i
+
+       # wait up to 10 seconds for other servers to sync with mgs
+       for i in $(seq 1 10); do
+               for node in $(all_server_nodes); do
+                       local node_ip=$(host_nids_address $node $NETTYPE)
+
+                       is_sync=true
+                       [ $node_ip == $mgs_ip ] && continue
+
+                       out2=$(do_node $node_ip $LCTL get_param \
+                               nodemap.$proc_param 2>/dev/null)
+                       [ "$out1" != "$out2" ] && is_sync=false && break
+               done
+               $is_sync && break
+               sleep 1
        done
+       if ! $is_sync; then
+               echo MGS
+               echo $out1
+               echo OTHER
+               echo $out2
+               error "mgs and $nodemap_name ${key} mismatch, $i attempts"
+       fi
+       echo "waited $((i - 1)) seconds for sync"
 }
 
 create_fops_nodemaps() {
@@ -970,26 +1006,15 @@ create_fops_nodemaps() {
                do_facet mgs $LCTL nodemap_add c${i} || return 1
                do_facet mgs $LCTL nodemap_add_range    \
                        --name c${i} --range $client_nid || return 1
-               do_servers_not_mgs $LCTL set_param nodemap.add_nodemap=c${i} ||
-                       return 1
-               do_servers_not_mgs "$LCTL set_param " \
-                       "nodemap.add_nodemap_range='c${i} $client_nid'" ||
-                       return 1
                for map in ${FOPS_IDMAPS[i]}; do
                        do_facet mgs $LCTL nodemap_add_idmap --name c${i} \
                                --idtype uid --idmap ${map} || return 1
-                       do_servers_not_mgs "$LCTL set_param " \
-                               "nodemap.add_nodemap_idmap='c$i uid ${map}'" ||
-                               return 1
                        do_facet mgs $LCTL nodemap_add_idmap --name c${i} \
                                --idtype gid --idmap ${map} || return 1
-                       do_servers_not_mgs "$LCTL set_param " \
-                               " nodemap.add_nodemap_idmap='c$i gid ${map}'" ||
-                               return 1
                done
-               out1=$(do_facet mgs $LCTL get_param nodemap.c${i}.idmap)
-               out2=$(do_facet ost0 $LCTL get_param nodemap.c${i}.idmap)
-               [ "$out1" != "$out2" ] && error "mgs and oss maps mismatch"
+
+               wait_nm_sync c$i idmap
+
                i=$((i + 1))
        done
        return 0
@@ -1000,8 +1025,6 @@ delete_fops_nodemaps() {
        local client
        for client in $clients; do
                do_facet mgs $LCTL nodemap_del c${i} || return 1
-               do_servers_not_mgs $LCTL set_param nodemap.remove_nodemap=c$i ||
-                       return 1
                i=$((i + 1))
        done
        return 0
@@ -1028,8 +1051,9 @@ fops_test_setup() {
 
        do_facet mgs $LCTL nodemap_modify --name c0 --property admin --value 1
        do_facet mgs $LCTL nodemap_modify --name c0 --property trusted --value 1
-       do_servers_not_mgs $LCTL set_param nodemap.c0.admin_nodemap=1
-       do_servers_not_mgs $LCTL set_param nodemap.c0.trusted_nodemap=1
+
+       wait_nm_sync c0 admin_nodemap
+       wait_nm_sync c0 trusted_nodemap
 
        do_node ${clients_arr[0]} rm -rf $DIR/$tdir
        nm_test_mkdir
@@ -1039,12 +1063,13 @@ fops_test_setup() {
                --property admin --value $admin
        do_facet mgs $LCTL nodemap_modify --name c0 \
                --property trusted --value $trust
-       do_servers_not_mgs $LCTL set_param nodemap.c0.admin_nodemap=$admin
-       do_servers_not_mgs $LCTL set_param nodemap.c0.trusted_nodemap=$trust
 
        # flush MDT locks to make sure they are reacquired before test
        do_node ${clients_arr[0]} $LCTL set_param \
                ldlm.namespaces.$FSNAME-MDT*.lru_size=clear
+
+       wait_nm_sync c0 admin_nodemap
+       wait_nm_sync c0 trusted_nodemap
 }
 
 do_create_delete() {
@@ -1079,7 +1104,8 @@ do_fops_quota_test() {
        local qused_high=$((qused_orig + quota_fuzz))
        local qused_low=$((qused_orig - quota_fuzz))
        local testfile=$DIR/$tdir/$tfile
-       $run_u dd if=/dev/zero of=$testfile bs=1M count=1 >& /dev/null
+       $run_u dd if=/dev/zero of=$testfile bs=1M count=1 >& /dev/null ||
+               error "unable to write quota test file"
        sync; sync_all_data || true
 
        local qused_new=$(nodemap_check_quota "$run_u")
@@ -1087,8 +1113,8 @@ do_fops_quota_test() {
          $((qused_new)) -gt $((qused_high + 1024)) ] &&
                error "$qused_new != $qused_orig + 1M after write, " \
                      "fuzz is $quota_fuzz"
-       $run_u rm $testfile && d=1
-       $NODEMAP_TEST_QUOTA && wait_delete_completed_mds
+       $run_u rm $testfile || error "unable to remove quota test file"
+       wait_delete_completed_mds
 
        qused_new=$(nodemap_check_quota "$run_u")
        [ $((qused_new)) -lt $((qused_low)) \
@@ -1168,6 +1194,67 @@ get_cr_del_expected() {
        echo $FAILURE
 }
 
+test_fops_admin_cli_i=""
+# chmod a directory from a client whose nodemap has the admin property set.
+#
+# usage: test_fops_chmod_dir <current_cli_i> <perm_bits> <dir_to_chmod>
+#
+# The chmod is issued from a client other than the one under test: client 1
+# when current_cli_i is 0, client 0 otherwise.  The chosen admin client is
+# remembered in test_fops_admin_cli_i, test_fops_admin_client and
+# test_fops_admin_val across calls, so its admin property is only switched
+# when the admin client changes.  With a single client, c0 is made admin for
+# the chmod and set back to non-admin afterwards.
+#
+# Returns 1 if the chmod fails, 0 otherwise.
+test_fops_chmod_dir() {
+       local current_cli_i=$1
+       local perm_bits=$2
+       local dir_to_chmod=$3
+       local new_admin_cli_i=""
+
+       # do we need to set up a new admin client?
+       [ "$current_cli_i" == "0" ] && [ "$test_fops_admin_cli_i" != "1" ] &&
+               new_admin_cli_i=1
+       [ "$current_cli_i" != "0" ] && [ "$test_fops_admin_cli_i" != "0" ] &&
+               new_admin_cli_i=0
+
+       # if only one client, and non-admin, need to flip admin every time
+       if [ "$num_clients" == "1" ]; then
+               # the only client is also the one that runs the chmod
+               test_fops_admin_client=${clients_arr[0]}
+               test_fops_admin_val=$(do_facet mgs $LCTL get_param -n \
+                       nodemap.c0.admin_nodemap)
+               if [ "$test_fops_admin_val" != "1" ]; then
+                       do_facet mgs $LCTL nodemap_modify \
+                               --name c0 \
+                               --property admin \
+                               --value 1
+                       wait_nm_sync c0 admin_nodemap
+               fi
+       elif [ "$new_admin_cli_i" != "" ]; then
+               # restore admin val to old admin client
+               if [ "$test_fops_admin_cli_i" != "" ] &&
+                               [ "$test_fops_admin_val" != "1" ]; then
+                       do_facet mgs $LCTL nodemap_modify \
+                               --name c${test_fops_admin_cli_i} \
+                               --property admin \
+                               --value $test_fops_admin_val
+                       wait_nm_sync c${test_fops_admin_cli_i} admin_nodemap
+               fi
+
+               test_fops_admin_cli_i=$new_admin_cli_i
+               test_fops_admin_client=${clients_arr[$new_admin_cli_i]}
+               test_fops_admin_val=$(do_facet mgs $LCTL get_param -n \
+                       nodemap.c${new_admin_cli_i}.admin_nodemap)
+
+               if [ "$test_fops_admin_val" != "1" ]; then
+                       do_facet mgs $LCTL nodemap_modify \
+                               --name c${new_admin_cli_i} \
+                               --property admin \
+                               --value 1
+                       wait_nm_sync c${new_admin_cli_i} admin_nodemap
+               fi
+       fi
+
+       do_node $test_fops_admin_client chmod $perm_bits $dir_to_chmod ||
+               return 1
+
+       # remove admin for single client if originally non-admin
+       if [ "$num_clients" == "1" ] && [ "$test_fops_admin_val" != "1" ]; then
+               do_facet mgs $LCTL nodemap_modify --name c0 --property admin \
+                       --value 0
+               wait_nm_sync c0 admin_nodemap
+       fi
+
+       return 0
+}
+
 test_fops() {
        local mapmode="$1"
        local single_client="$2"
@@ -1194,8 +1281,6 @@ test_fops() {
                local cli_i=0
                for client in $clients; do
                        local u
-                       local admin=$(do_facet mgs $LCTL get_param -n \
-                                     nodemap.c$cli_i.admin_nodemap)
                        for u in ${client_user_list[$cli_i]}; do
                                local run_u="do_node $client \
                                             $RUNAS_CMD -u$u -g$u -G$u"
@@ -1203,41 +1288,15 @@ test_fops() {
                                        local mode=$(printf %03o $perm_bits)
                                        local key
                                        key="$mapmode:$user:c$cli_i:$u:$mode"
-                                       do_facet mgs $LCTL nodemap_modify \
-                                               --name c$cli_i            \
-                                               --property admin          \
-                                               --value 1
-                                       do_servers_not_mgs $LCTL set_param \
-                                               nodemap.c$cli_i.admin_nodemap=1
-                                       do_node $client chmod $mode $DIR/$tdir \
-                                               || error unable to chmod $key
-                                       do_facet mgs $LCTL nodemap_modify \
-                                               --name c$cli_i            \
-                                               --property admin          \
-                                               --value $admin
-                                       do_servers_not_mgs $LCTL set_param \
-                                           nodemap.c$cli_i.admin_nodemap=$admin
-
+                                       test_fops_chmod_dir $cli_i $mode \
+                                               $DIR/$tdir ||
+                                                       error cannot chmod $key
                                        do_create_delete "$run_u" "$key"
                                done
 
-                               # set test dir to 777 for quota test
-                               do_facet mgs $LCTL nodemap_modify \
-                                       --name c$cli_i            \
-                                       --property admin          \
-                                       --value 1
-                               do_servers_not_mgs $LCTL set_param \
-                                       nodemap.c$cli_i.admin_nodemap=1
-                               do_node $client chmod 777 $DIR/$tdir ||
-                                       error unable to chmod 777 $DIR/$tdir
-                               do_facet mgs $LCTL nodemap_modify \
-                                       --name c$cli_i            \
-                                       --property admin          \
-                                       --value $admin
-                               do_servers_not_mgs $LCTL set_param \
-                                   nodemap.c$cli_i.admin_nodemap=$admin
-
                                # check quota
+                               test_fops_chmod_dir $cli_i 777 $DIR/$tdir ||
+                                       error cannot chmod $key
                                do_fops_quota_test "$run_u"
                        done
 
@@ -1259,7 +1318,9 @@ nodemap_version_check () {
 
 nodemap_test_setup() {
        local rc
-       local active_nodemap=$1
+       local active_nodemap=1
+
+       [ "$1" == "0" ] && active_nodemap=0
 
        do_nodes $(comma_list $(all_mdts_nodes)) \
                $LCTL set_param mdt.*.identity_upcall=NONE
@@ -1269,20 +1330,14 @@ nodemap_test_setup() {
        rc=$?
        [[ $rc != 0 ]] && error "adding fops nodemaps failed $rc"
 
-       if [ "$active_nodemap" == "0" ]; then
-               do_facet mgs $LCTL set_param nodemap.active=0
-               do_servers_not_mgs $LCTL set_param nodemap.active=0
-               return
-       fi
+       do_facet mgs $LCTL nodemap_activate $active_nodemap
+       wait_nm_sync active
 
-       do_facet mgs $LCTL nodemap_activate 1
-       do_servers_not_mgs $LCTL set_param nodemap.active=1
        do_facet mgs $LCTL nodemap_modify --name default \
                --property admin --value 1
        do_facet mgs $LCTL nodemap_modify --name default \
                --property trusted --value 1
-       do_servers_not_mgs $LCTL set_param nodemap.default.admin_nodemap=1
-       do_servers_not_mgs $LCTL set_param nodemap.default.trusted_nodemap=1
+       wait_nm_sync default trusted_nodemap
 }
 
 nodemap_test_cleanup() {
@@ -1301,14 +1356,12 @@ nodemap_clients_admin_trusted() {
        for client in $clients; do
                do_facet mgs $LCTL nodemap_modify --name c0 \
                        --property admin --value $admin
-               do_servers_not_mgs $LCTL set_param \
-                       nodemap.c${i}.admin_nodemap=$admin
                do_facet mgs $LCTL nodemap_modify --name c0 \
                        --property trusted --value $tr
-               do_servers_not_mgs $LCTL set_param \
-                       nodemap.c${i}.trusted_nodemap=$tr
                i=$((i + 1))
        done
+       wait_nm_sync c$((i - 1)) admin_nodemap
+       wait_nm_sync c$((i - 1)) trusted_nodemap
 }
 
 test_16() {
@@ -1377,13 +1430,11 @@ test_21() {
                        --property admin --value 0
                do_facet mgs $LCTL nodemap_modify --name c${i} \
                        --property trusted --value $x
-               do_servers_not_mgs $LCTL set_param \
-                       nodemap.c${i}.admin_nodemap=0
-               do_servers_not_mgs $LCTL set_param \
-                       nodemap.c${i}.trusted_nodemap=$x
                x=0
                i=$((i + 1))
        done
+       wait_nm_sync c$((i - 1)) trusted_nodemap
+
        test_fops mapped_trusted_noadmin
        nodemap_test_cleanup
 }
@@ -1401,13 +1452,11 @@ test_22() {
                        --property admin --value 1
                do_facet mgs $LCTL nodemap_modify --name c${i} \
                        --property trusted --value $x
-               do_servers_not_mgs $LCTL set_param \
-                       nodemap.c${i}.admin_nodemap=1
-               do_servers_not_mgs $LCTL set_param \
-                       nodemap.c${i}.trusted_nodemap=$x
                x=0
                i=$((i + 1))
        done
+       wait_nm_sync c$((i - 1)) trusted_nodemap
+
        test_fops mapped_trusted_admin
        nodemap_test_cleanup
 }
@@ -1421,8 +1470,9 @@ nodemap_acl_test_setup() {
 
        do_facet mgs $LCTL nodemap_modify --name c0 --property admin --value 1
        do_facet mgs $LCTL nodemap_modify --name c0 --property trusted --value 1
-       do_servers_not_mgs $LCTL set_param nodemap.c0.admin_nodemap=1
-       do_servers_not_mgs $LCTL set_param nodemap.c0.trusted_nodemap=1
+
+       wait_nm_sync c0 admin_nodemap
+       wait_nm_sync c0 trusted_nodemap
 
        do_node ${clients_arr[0]} rm -rf $DIR/$tdir
        nm_test_mkdir
@@ -1433,9 +1483,8 @@ nodemap_acl_test_setup() {
                --property admin --value $admin
        do_facet mgs $LCTL nodemap_modify --name c0 \
                --property trusted --value $trust
-       do_servers_not_mgs $LCTL set_param nodemap.c0.admin_nodemap=$admin
-       do_servers_not_mgs $LCTL set_param nodemap.c0.trusted_nodemap=$trust
 
+       wait_nm_sync c0 trusted_nodemap
 }
 
 # returns 0 if the number of ACLs does not change on the second (mapped) client
@@ -1489,13 +1538,11 @@ test_23() {
 
        do_facet mgs $LCTL nodemap_modify --name c0 --property admin --value 1
        do_facet mgs $LCTL nodemap_modify --name c0 --property trusted --value 1
-       do_servers_not_mgs $LCTL set_param nodemap.c0.admin_nodemap=1
-       do_servers_not_mgs $LCTL set_param nodemap.c0.trusted_nodemap=1
 
        do_facet mgs $LCTL nodemap_modify --name c1 --property admin --value 0
        do_facet mgs $LCTL nodemap_modify --name c1 --property trusted --value 0
-       do_servers_not_mgs $LCTL set_param nodemap.c1.admin_nodemap=0
-       do_servers_not_mgs $LCTL set_param nodemap.c1.trusted_nodemap=0
+
+       wait_nm_sync c1 trusted_nodemap
 
        # setfacl on trusted cluster to unmapped user, verify it's not seen
        nodemap_acl_test $unmapped_fs ${clients_arr[0]} ${clients_arr[1]} ||
@@ -1516,8 +1563,8 @@ test_23() {
        # 2 mapped clusters
        do_facet mgs $LCTL nodemap_modify --name c0 --property admin --value 0
        do_facet mgs $LCTL nodemap_modify --name c0 --property trusted --value 0
-       do_servers_not_mgs $LCTL set_param nodemap.c0.admin_nodemap=0
-       do_servers_not_mgs $LCTL set_param nodemap.c0.trusted_nodemap=0
+
+       wait_nm_sync c0 trusted_nodemap
 
        # setfacl to mapped user on c1, also mapped to c0, verify it's seen
        nodemap_acl_test $mapped_c1 ${clients_arr[1]} ${clients_arr[0]} &&