Whamcloud - gitweb
LU-8837 mgc: move server-only code out of mgc_request.c
[fs/lustre-release.git] / lustre / mgc / mgc_request.c
index 4a98226..5863144 100644 (file)
 #define D_MGC D_CONFIG /*|D_WARNING*/
 
 #include <linux/module.h>
-#include <linux/kthread.h>
 #include <linux/random.h>
 
-#include <dt_object.h>
 #include <lprocfs_status.h>
 #include <lustre_dlm.h>
 #include <lustre_disk.h>
 #include <lustre_log.h>
-#include <lustre_nodemap.h>
 #include <lustre_swab.h>
 #include <obd_class.h>
-#include <lustre_barrier.h>
 
 #include "mgc_internal.h"
 
@@ -78,8 +74,10 @@ static int mgc_name2resid(char *name, int len, struct ldlm_res_id *res_id,
                 break;
        case MGS_CFG_T_RECOVER:
        case MGS_CFG_T_PARAMS:
+#ifdef HAVE_SERVER_SUPPORT
        case MGS_CFG_T_NODEMAP:
        case MGS_CFG_T_BARRIER:
+#endif
                resname = type;
                break;
         default:
@@ -151,11 +149,12 @@ static void config_log_put(struct config_llog_data *cld)
                spin_unlock(&config_list_lock);
 
                CDEBUG(D_MGC, "dropping config log %s\n", cld->cld_logname);
-
+#ifdef HAVE_SERVER_SUPPORT
                config_log_put(cld->cld_barrier);
+               config_log_put(cld->cld_nodemap);
+#endif
                config_log_put(cld->cld_recover);
                config_log_put(cld->cld_params);
-               config_log_put(cld->cld_nodemap);
                config_log_put(cld->cld_sptlrpc);
                if (cld_is_sptlrpc(cld)) {
                        cld->cld_stopping = 1;
@@ -256,22 +255,23 @@ struct config_llog_data *do_config_log_add(struct obd_device *obd,
        RETURN(cld);
 }
 
-static struct config_llog_data *config_recover_log_add(struct obd_device *obd,
-                                       char *fsname,
-                                       struct config_llog_instance *cfg,
-                                       struct super_block *sb)
+static struct config_llog_data *
+config_recover_log_add(struct obd_device *obd, char *fsname,
+                      struct config_llog_instance *cfg,
+                      struct super_block *sb)
 {
        struct config_llog_instance lcfg = *cfg;
        struct config_llog_data *cld;
        char logname[32];
 
+#ifdef HAVE_SERVER_SUPPORT
        if (IS_OST(s2lsi(sb)))
                return NULL;
 
        /* for osp-on-ost, see lustre_start_osp() */
        if (IS_MDT(s2lsi(sb)) && lcfg.cfg_instance)
                return NULL;
-
+#endif
        /* We have to use different llog for clients and MDTs for DNE,
         * where only clients are notified if one of DNE server restarts.
         */
@@ -321,8 +321,10 @@ config_log_add(struct obd_device *obd, char *logname,
        struct config_llog_data *cld = NULL;
        struct config_llog_data *sptlrpc_cld = NULL;
        struct config_llog_data *params_cld = NULL;
+#ifdef HAVE_SERVER_SUPPORT
        struct config_llog_data *nodemap_cld = NULL;
        struct config_llog_data *barrier_cld = NULL;
+#endif
        char seclogname[32];
        char *ptr;
        int rc;
@@ -349,12 +351,25 @@ config_log_add(struct obd_device *obd, char *logname,
                sptlrpc_cld = config_log_find_or_add(obd, seclogname, NULL,
                                                     MGS_CFG_T_SPTLRPC, cfg);
                if (IS_ERR(sptlrpc_cld)) {
-                       CERROR("%s: can't create sptlrpc log %s: rc = %ld\n",
-                              obd->obd_name, seclogname, PTR_ERR(sptlrpc_cld));
-                       RETURN(sptlrpc_cld);
+                       rc = PTR_ERR(sptlrpc_cld);
+                       CERROR("%s: can't create sptlrpc log %s: rc = %d\n",
+                              obd->obd_name, seclogname, rc);
+                       GOTO(out_err, rc);
                }
        }
 
+       if (cfg->cfg_sub_clds & CONFIG_SUB_PARAMS) {
+               params_cld = config_log_find_or_add(obd, PARAMS_FILENAME, sb,
+                                                   MGS_CFG_T_PARAMS, cfg);
+               if (IS_ERR(params_cld)) {
+                       rc = PTR_ERR(params_cld);
+                       CERROR("%s: can't create params log: rc = %d\n",
+                              obd->obd_name, rc);
+                       GOTO(out_sptlrpc, rc);
+               }
+       }
+
+#ifdef HAVE_SERVER_SUPPORT
        if (!IS_MGS(lsi) && cfg->cfg_sub_clds & CONFIG_SUB_NODEMAP) {
                nodemap_cld = config_log_find_or_add(obd, LUSTRE_NODEMAP_NAME,
                                                     NULL, MGS_CFG_T_NODEMAP,
@@ -363,18 +378,7 @@ config_log_add(struct obd_device *obd, char *logname,
                        rc = PTR_ERR(nodemap_cld);
                        CERROR("%s: cannot create nodemap log: rc = %d\n",
                               obd->obd_name, rc);
-                       GOTO(out_sptlrpc, rc);
-               }
-       }
-
-       if (cfg->cfg_sub_clds & CONFIG_SUB_PARAMS) {
-               params_cld = config_log_find_or_add(obd, PARAMS_FILENAME, sb,
-                                                   MGS_CFG_T_PARAMS, cfg);
-               if (IS_ERR(params_cld)) {
-                       rc = PTR_ERR(params_cld);
-                       CERROR("%s: can't create params log: rc = %d\n",
-                              obd->obd_name, rc);
-                       GOTO(out_nodemap, rc);
+                       GOTO(out_params, rc);
                }
        }
 
@@ -387,16 +391,20 @@ config_log_add(struct obd_device *obd, char *logname,
                        rc = PTR_ERR(barrier_cld);
                        CERROR("%s: can't create barrier log: rc = %d\n",
                               obd->obd_name, rc);
-                       GOTO(out_params, rc);
+                       GOTO(out_nodemap, rc);
                }
        }
-
+#endif
        cld = do_config_log_add(obd, logname, MGS_CFG_T_CONFIG, cfg, sb);
        if (IS_ERR(cld)) {
                rc = PTR_ERR(cld);
                CERROR("%s: can't create log: rc = %d\n",
                       obd->obd_name, rc);
-               GOTO(out_barrier, rc = PTR_ERR(cld));
+#ifdef HAVE_SERVER_SUPPORT
+               GOTO(out_barrier, rc);
+#else
+               GOTO(out_params, rc);
+#endif
        }
 
        LASSERT(lsi->lsi_lmd);
@@ -428,9 +436,11 @@ config_log_add(struct obd_device *obd, char *logname,
 
        if (!locked)
                mutex_lock(&cld->cld_lock);
-       cld->cld_params = params_cld;
+#ifdef HAVE_SERVER_SUPPORT
        cld->cld_barrier = barrier_cld;
        cld->cld_nodemap = nodemap_cld;
+#endif
+       cld->cld_params = params_cld;
        cld->cld_sptlrpc = sptlrpc_cld;
        mutex_unlock(&cld->cld_lock);
 
@@ -438,15 +448,17 @@ config_log_add(struct obd_device *obd, char *logname,
 
 out_cld:
        config_log_put(cld);
+#ifdef HAVE_SERVER_SUPPORT
 out_barrier:
        config_log_put(barrier_cld);
-out_params:
-       config_log_put(params_cld);
 out_nodemap:
        config_log_put(nodemap_cld);
+#endif
+out_params:
+       config_log_put(params_cld);
 out_sptlrpc:
        config_log_put(sptlrpc_cld);
-
+out_err:
        return ERR_PTR(rc);
 }
 
@@ -737,134 +749,6 @@ static void mgc_requeue_add(struct config_llog_data *cld)
 }
 
 /********************** class fns **********************/
-#ifdef HAVE_SERVER_SUPPORT
-static int mgc_local_llog_init(const struct lu_env *env,
-                              struct obd_device *obd,
-                              struct obd_device *disk)
-{
-       struct llog_ctxt        *ctxt;
-       int                      rc;
-
-       ENTRY;
-
-       rc = llog_setup(env, obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, disk,
-                       &llog_osd_ops);
-       if (rc)
-               RETURN(rc);
-
-       ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
-       LASSERT(ctxt);
-       ctxt->loc_dir = obd->u.cli.cl_mgc_configs_dir;
-       llog_ctxt_put(ctxt);
-
-       RETURN(0);
-}
-
-static int mgc_local_llog_fini(const struct lu_env *env,
-                              struct obd_device *obd)
-{
-       struct llog_ctxt *ctxt;
-
-       ENTRY;
-
-       ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
-       llog_cleanup(env, ctxt);
-
-       RETURN(0);
-}
-
-static int mgc_fs_setup(const struct lu_env *env, struct obd_device *obd,
-                       struct super_block *sb)
-{
-       struct lustre_sb_info   *lsi = s2lsi(sb);
-       struct client_obd       *cli = &obd->u.cli;
-       struct lu_fid            rfid, fid;
-       struct dt_object        *root, *dto;
-       int                      rc = 0;
-
-       ENTRY;
-
-       LASSERT(lsi);
-       LASSERT(lsi->lsi_dt_dev);
-
-       /* The mgc fs exclusion mutex. Only one fs can be setup at a time. */
-       mutex_lock(&cli->cl_mgc_mutex);
-
-       /* Setup the configs dir */
-       fid.f_seq = FID_SEQ_LOCAL_NAME;
-       fid.f_oid = 1;
-       fid.f_ver = 0;
-       rc = local_oid_storage_init(env, lsi->lsi_dt_dev, &fid,
-                                   &cli->cl_mgc_los);
-       if (rc)
-               GOTO(out_mutex, rc);
-
-       rc = dt_root_get(env, lsi->lsi_dt_dev, &rfid);
-       if (rc)
-               GOTO(out_los, rc);
-
-       root = dt_locate_at(env, lsi->lsi_dt_dev, &rfid,
-                           &cli->cl_mgc_los->los_dev->dd_lu_dev, NULL);
-       if (unlikely(IS_ERR(root)))
-               GOTO(out_los, rc = PTR_ERR(root));
-
-       dto = local_file_find_or_create(env, cli->cl_mgc_los, root,
-                                       MOUNT_CONFIGS_DIR,
-                                       S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO);
-       dt_object_put_nocache(env, root);
-       if (IS_ERR(dto))
-               GOTO(out_los, rc = PTR_ERR(dto));
-
-       cli->cl_mgc_configs_dir = dto;
-
-       LASSERT(lsi->lsi_osd_exp->exp_obd->obd_lvfs_ctxt.dt);
-       rc = mgc_local_llog_init(env, obd, lsi->lsi_osd_exp->exp_obd);
-       if (rc)
-               GOTO(out_llog, rc);
-
-       /* We take an obd ref to insure that we can't get to mgc_cleanup
-        * without calling mgc_fs_cleanup first. */
-       class_incref(obd, "mgc_fs", obd);
-
-       /* We keep the cl_mgc_sem until mgc_fs_cleanup */
-       EXIT;
-out_llog:
-       if (rc) {
-               dt_object_put(env, cli->cl_mgc_configs_dir);
-               cli->cl_mgc_configs_dir = NULL;
-       }
-out_los:
-       if (rc < 0) {
-               local_oid_storage_fini(env, cli->cl_mgc_los);
-out_mutex:
-               cli->cl_mgc_los = NULL;
-               mutex_unlock(&cli->cl_mgc_mutex);
-       }
-       return rc;
-}
-
-static int mgc_fs_cleanup(const struct lu_env *env, struct obd_device *obd)
-{
-       struct client_obd       *cli = &obd->u.cli;
-       ENTRY;
-
-       LASSERT(cli->cl_mgc_los != NULL);
-
-       mgc_local_llog_fini(env, obd);
-
-       dt_object_put_nocache(env, cli->cl_mgc_configs_dir);
-       cli->cl_mgc_configs_dir = NULL;
-
-       local_oid_storage_fini(env, cli->cl_mgc_los);
-       cli->cl_mgc_los = NULL;
-
-       class_decref(obd, "mgc_fs", obd);
-       mutex_unlock(&cli->cl_mgc_mutex);
-
-       RETURN(0);
-}
-#endif /* HAVE_SERVER_SUPPORT */
-
 static int mgc_llog_init(const struct lu_env *env, struct obd_device *obd)
 {
        struct llog_ctxt        *ctxt;
@@ -1067,15 +951,6 @@ static int mgc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
         RETURN(rc);
 }
 
-/* Not sure where this should go... */
-/* This is the timeout value for MGS_CONNECT request plus a ping interval, such
- * that we can have a chance to try the secondary MGS if any. */
-#define  MGC_ENQUEUE_LIMIT (INITIAL_CONNECT_TIMEOUT + (AT_OFF ? 0 : at_min) \
-                               + PING_INTERVAL)
-#define  MGC_TARGET_REG_LIMIT 10
-#define  MGC_TARGET_REG_LIMIT_MAX RECONNECT_DELAY_MAX
-#define  MGC_SEND_PARAM_LIMIT 10
-
 /* Take a config lock so we can get cancel notifications */
 static int mgc_enqueue(struct obd_export *exp, enum ldlm_type type,
                       union ldlm_policy_data *policy, enum ldlm_mode mode,
@@ -1149,58 +1024,6 @@ static void mgc_notify_active(struct obd_device *unused)
        /* TODO: Help the MGS rebuild nidtbl. -jay */
 }
 
-#ifdef HAVE_SERVER_SUPPORT
-/* Send target_reg message to MGS */
-static int mgc_target_register(struct obd_export *exp,
-                              struct mgs_target_info *mti)
-{
-       struct ptlrpc_request *req;
-       struct mgs_target_info *req_mti, *rep_mti;
-       int rc;
-       ENTRY;
-
-       req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
-                                       &RQF_MGS_TARGET_REG, LUSTRE_MGS_VERSION,
-                                       MGS_TARGET_REG);
-       if (req == NULL)
-               RETURN(-ENOMEM);
-
-       req_mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
-       if (!req_mti) {
-               ptlrpc_req_finished(req);
-               RETURN(-ENOMEM);
-       }
-
-       memcpy(req_mti, mti, sizeof(*req_mti));
-       ptlrpc_request_set_replen(req);
-       CDEBUG(D_MGC, "register %s\n", mti->mti_svname);
-       /* Limit how long we will wait for the enqueue to complete */
-       req->rq_delay_limit = MGC_TARGET_REG_LIMIT;
-
-       /* if the target needs to regenerate the config log in MGS, it's better
-        * to use some longer limit to let MGC have time to change connection to
-        * another MGS (or try again with the same MGS) for the target (server)
-        * will fail and exit if the request expired due to delay limit. */
-       if (mti->mti_flags & (LDD_F_UPDATE | LDD_F_NEED_INDEX))
-               req->rq_delay_limit = MGC_TARGET_REG_LIMIT_MAX;
-
-        rc = ptlrpc_queue_wait(req);
-       if (ptlrpc_client_replied(req)) {
-               rep_mti = req_capsule_server_get(&req->rq_pill,
-                                                &RMF_MGS_TARGET_INFO);
-               if (rep_mti)
-                       memcpy(mti, rep_mti, sizeof(*rep_mti));
-       }
-       if (!rc) {
-               CDEBUG(D_MGC, "register %s got index = %d\n",
-                      mti->mti_svname, mti->mti_stripe_index);
-        }
-        ptlrpc_req_finished(req);
-
-        RETURN(rc);
-}
-#endif /* HAVE_SERVER_SUPPORT */
-
 static int mgc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
@@ -1231,35 +1054,6 @@ static int mgc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                RETURN(0);
        }
 
-#ifdef HAVE_SERVER_SUPPORT
-       /* FIXME move this to mgc_process_config */
-       if (KEY_IS(KEY_REGISTER_TARGET)) {
-               struct mgs_target_info *mti;
-
-               if (vallen != sizeof(struct mgs_target_info))
-                       RETURN(-EINVAL);
-               mti = (struct mgs_target_info *)val;
-               CDEBUG(D_MGC, "register_target %s %#x\n",
-                      mti->mti_svname, mti->mti_flags);
-               rc =  mgc_target_register(exp, mti);
-               RETURN(rc);
-       }
-       if (KEY_IS(KEY_SET_FS)) {
-               struct super_block *sb = (struct super_block *)val;
-
-               if (vallen != sizeof(struct super_block))
-                       RETURN(-EINVAL);
-
-               rc = mgc_fs_setup(env, exp->exp_obd, sb);
-               RETURN(rc);
-       }
-       if (KEY_IS(KEY_CLEAR_FS)) {
-               if (vallen != 0)
-                       RETURN(-EINVAL);
-               rc = mgc_fs_cleanup(env, exp->exp_obd);
-               RETURN(rc);
-       }
-#endif
         if (KEY_IS(KEY_MGSSEC)) {
                 struct client_obd     *cli = &exp->exp_obd->u.cli;
                 struct sptlrpc_flavor  flvr;
@@ -1304,7 +1098,10 @@ static int mgc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                 RETURN(rc);
         }
 
-        RETURN(rc);
+#ifdef HAVE_SERVER_SUPPORT
+       rc = mgc_set_info_async_server(env, exp, keylen, key, vallen, val, set);
+#endif
+       RETURN(rc);
 }
 
 static int mgc_get_info(const struct lu_env *env, struct obd_export *exp,
@@ -1367,11 +1164,6 @@ static int mgc_import_event(struct obd_device *obd,
         RETURN(rc);
 }
 
-enum {
-       CONFIG_READ_NRPAGES_INIT = 1 << (20 - PAGE_SHIFT),
-        CONFIG_READ_NRPAGES      = 4
-};
-
 static int mgc_apply_recover_logs(struct obd_device *mgc,
                                  struct config_llog_data *cld,
                                  __u64 max_version,
@@ -1614,34 +1406,23 @@ fail:;
  * by the MGS. A CONFIG_READ RPC is going to send to fetch recovery or
  * nodemap logs.
  */
-static int mgc_process_recover_nodemap_log(struct obd_device *obd,
+static int mgc_process_recover_log(struct obd_device *obd,
                                           struct config_llog_data *cld)
 {
-       struct ptlrpc_connection *mgc_conn;
        struct ptlrpc_request *req = NULL;
        struct config_llog_instance *cfg = &cld->cld_cfg;
        struct mgs_config_body *body;
        struct mgs_config_res *res;
-       struct nodemap_config *new_config = NULL;
-       struct lu_nodemap *recent_nodemap = NULL;
        struct ptlrpc_bulk_desc *desc;
        struct page **pages = NULL;
-       __u64 config_read_offset = 0;
-       __u8 nodemap_cur_pass = 0;
        int nrpages = 0;
        bool eof = true;
        bool mne_swab = false;
        int i;
        int ealen;
        int rc;
-       ENTRY;
 
-       mgc_conn = class_exp2cliimp(cld->cld_mgcexp)->imp_connection;
-
-       /* don't need to get local config */
-       if (cld_is_nodemap(cld) &&
-           LNetIsPeerLocal(&mgc_conn->c_peer.nid))
-               GOTO(out, rc = 0);
+       ENTRY;
 
        /* allocate buffer for bulk transfer.
         * if this is the first time for this mgs to read logs,
@@ -1650,7 +1431,7 @@ static int mgc_process_recover_nodemap_log(struct obd_device *obd,
         * small and CONFIG_READ_NRPAGES will be used.
         */
        nrpages = CONFIG_READ_NRPAGES;
-       if (cfg->cfg_last_idx == 0 || cld_is_nodemap(cld))
+       if (cfg->cfg_last_idx == 0)
                nrpages = CONFIG_READ_NRPAGES_INIT;
 
        OBD_ALLOC_PTR_ARRAY_LARGE(pages, nrpages);
@@ -1664,17 +1445,7 @@ static int mgc_process_recover_nodemap_log(struct obd_device *obd,
        }
 
 again:
-#ifdef HAVE_SERVER_SUPPORT
-       if (cld_is_nodemap(cld) && config_read_offset == 0) {
-               new_config = nodemap_config_alloc();
-               if (IS_ERR(new_config)) {
-                       rc = PTR_ERR(new_config);
-                       new_config = NULL;
-                       GOTO(out, rc);
-               }
-       }
-#endif
-       LASSERT(cld_is_recover(cld) || cld_is_nodemap(cld));
+       LASSERT(cld_is_recover(cld));
        LASSERT(mutex_is_locked(&cld->cld_lock));
        req = ptlrpc_request_alloc(class_exp2cliimp(cld->cld_mgcexp),
                                   &RQF_MGS_CONFIG_READ);
@@ -1692,14 +1463,10 @@ again:
        if (strlcpy(body->mcb_name, cld->cld_logname, sizeof(body->mcb_name))
            >= sizeof(body->mcb_name))
                GOTO(out, rc = -E2BIG);
-       if (cld_is_nodemap(cld))
-               body->mcb_offset = config_read_offset;
-       else
-               body->mcb_offset = cfg->cfg_last_idx + 1;
+       body->mcb_offset = cfg->cfg_last_idx + 1;
        body->mcb_type   = cld->cld_type;
        body->mcb_bits   = PAGE_SHIFT;
        body->mcb_units  = nrpages;
-       body->mcb_nm_cur_pass = nodemap_cur_pass;
 
        /* allocate bulk transfer descriptor */
        desc = ptlrpc_prep_bulk_imp(req, nrpages, 1,
@@ -1722,23 +1489,17 @@ again:
        if (!res)
                GOTO(out, rc = -EPROTO);
 
-       if (cld_is_nodemap(cld)) {
-               config_read_offset = res->mcr_offset;
-               eof = config_read_offset == II_END_OFF;
-               nodemap_cur_pass = res->mcr_nm_cur_pass;
-       } else {
-               if (res->mcr_size < res->mcr_offset)
-                       GOTO(out, rc = -EINVAL);
+       if (res->mcr_size < res->mcr_offset)
+               GOTO(out, rc = -EINVAL);
 
-               /* always update the index even though it might have errors with
-                * handling the recover logs
-                */
-               cfg->cfg_last_idx = res->mcr_offset;
-               eof = res->mcr_offset == res->mcr_size;
+       /* always update the index even though it might have errors with
+        * handling the recover logs
+        */
+       cfg->cfg_last_idx = res->mcr_offset;
+       eof = res->mcr_offset == res->mcr_size;
 
-               CDEBUG(D_INFO, "Latest version %lld, more %d.\n",
-                      res->mcr_offset, eof == false);
-       }
+       CDEBUG(D_INFO, "Latest version %lld, more %d.\n",
+              res->mcr_offset, eof == false);
 
        ealen = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, 0);
        if (ealen < 0)
@@ -1748,13 +1509,6 @@ again:
                GOTO(out, rc = -EINVAL);
 
        if (ealen == 0) { /* no logs transferred */
-#ifdef HAVE_SERVER_SUPPORT
-               /* config changed since first read RPC */
-               if (cld_is_nodemap(cld) && config_read_offset == 0) {
-                       CDEBUG(D_INFO, "nodemap config changed in transit, retrying\n");
-                       GOTO(out, rc = -EAGAIN);
-               }
-#endif
                if (!eof)
                        rc = -EINVAL;
                GOTO(out, rc);
@@ -1762,34 +1516,18 @@ again:
 
        mne_swab = req_capsule_rep_need_swab(&req->rq_pill);
 
-       /* When a nodemap config is received, we build a new nodemap config,
-        * with new nodemap structs. We keep track of the most recently added
-        * nodemap since the config is read ordered by nodemap_id, and so it
-        * is likely that the next record will be related. Because access to
-        * the nodemaps is single threaded until the nodemap_config is active,
-        * we don't need to reference count with recent_nodemap, though
-        * recent_nodemap should be set to NULL when the nodemap_config
-        * is either destroyed or set active.
-        */
        for (i = 0; i < nrpages && ealen > 0; i++) {
                int rc2;
-               union lu_page   *ptr;
+               union lu_page *ptr;
 
                ptr = kmap(pages[i]);
-               if (cld_is_nodemap(cld))
-                       rc2 = nodemap_process_idx_pages(new_config, ptr,
-                                                      &recent_nodemap);
-               else
-                       rc2 = mgc_apply_recover_logs(obd, cld, res->mcr_offset,
-                                                    ptr,
-                                                    min_t(int, ealen,
-                                                          PAGE_SIZE),
-                                                    mne_swab);
+               rc2 = mgc_apply_recover_logs(obd, cld, res->mcr_offset, ptr,
+                                            min_t(int, ealen, PAGE_SIZE),
+                                            mne_swab);
                kunmap(pages[i]);
                if (rc2 < 0) {
-                       CWARN("%s: error processing %s log %s: rc = %d\n",
+                       CWARN("%s: error processing %s log recovery: rc = %d\n",
                              obd->obd_name,
-                             cld_is_nodemap(cld) ? "nodemap" : "recovery",
                              cld->cld_logname,
                              rc2);
                        GOTO(out, rc = rc2);
@@ -1807,16 +1545,6 @@ out:
        if (rc == 0 && !eof)
                goto again;
 
-#ifdef HAVE_SERVER_SUPPORT
-       if (new_config != NULL) {
-               /* recent_nodemap cannot be used after set_active/dealloc */
-               if (rc == 0)
-                       nodemap_config_set_active_mgc(new_config);
-               else
-                       nodemap_config_dealloc(new_config);
-       }
-#endif
-
        if (pages) {
                for (i = 0; i < nrpages; i++) {
                        if (pages[i] == NULL)
@@ -1828,141 +1556,6 @@ out:
        return rc;
 }
 
-#ifdef HAVE_SERVER_SUPPORT
-static int mgc_barrier_glimpse_ast(struct ldlm_lock *lock, void *data)
-{
-       struct config_llog_data *cld = lock->l_ast_data;
-       int rc;
-       ENTRY;
-
-       if (cld->cld_stopping)
-               RETURN(-ENODEV);
-
-       rc = barrier_handler(s2lsi(cld->cld_cfg.cfg_sb)->lsi_dt_dev,
-                            (struct ptlrpc_request *)data);
-
-       RETURN(rc);
-}
-
-/* Copy a remote log locally */
-static int mgc_llog_local_copy(const struct lu_env *env,
-                              struct obd_device *obd,
-                              struct llog_ctxt *rctxt,
-                              struct llog_ctxt *lctxt, char *logname)
-{
-       char    *temp_log;
-       int      rc;
-
-       ENTRY;
-
-       /*
-        * - copy it to backup using llog_backup()
-        * - copy remote llog to logname using llog_backup()
-        * - if failed then move bakup to logname again
-        */
-
-       OBD_ALLOC(temp_log, strlen(logname) + 2);
-       if (!temp_log)
-               RETURN(-ENOMEM);
-       sprintf(temp_log, "%sT", logname);
-
-       /* make a copy of local llog at first */
-       rc = llog_backup(env, obd, lctxt, lctxt, logname, temp_log);
-       if (rc < 0 && rc != -ENOENT)
-               GOTO(out, rc);
-       /* copy remote llog to the local copy */
-       rc = llog_backup(env, obd, rctxt, lctxt, logname, logname);
-       if (rc == -ENOENT) {
-               /* no remote llog, delete local one too */
-               llog_erase(env, lctxt, NULL, logname);
-       } else if (rc < 0) {
-               /* error during backup, get local one back from the copy */
-               llog_backup(env, obd, lctxt, lctxt, temp_log, logname);
-out:
-               CERROR("%s: failed to copy remote log %s: rc = %d\n",
-                      obd->obd_name, logname, rc);
-       }
-       llog_erase(env, lctxt, NULL, temp_log);
-       OBD_FREE(temp_log, strlen(logname) + 2);
-       return rc;
-}
-
-static int
-mgc_process_server_cfg_log(struct lu_env *env, struct llog_ctxt **ctxt,
-                          struct lustre_sb_info *lsi, struct obd_device *mgc,
-                          struct config_llog_data *cld, int local_only)
-{
-       struct llog_ctxt *lctxt = llog_get_context(mgc, LLOG_CONFIG_ORIG_CTXT);
-       struct client_obd *cli = &mgc->u.cli;
-       int rc = 0;
-
-       /* Copy the setup log locally if we can. Don't mess around if we're
-        * running an MGS though (logs are already local). */
-       if (lctxt && lsi && IS_SERVER(lsi) && !IS_MGS(lsi) &&
-           cli->cl_mgc_configs_dir != NULL &&
-           lu2dt_dev(cli->cl_mgc_configs_dir->do_lu.lo_dev) ==
-           lsi->lsi_dt_dev) {
-               if (!local_only && !lsi->lsi_dt_dev->dd_rdonly) {
-                       /* Only try to copy log if we have the lock. */
-                       CDEBUG(D_INFO, "%s: copy local log %s\n",
-                              mgc->obd_name, cld->cld_logname);
-
-                       rc = mgc_llog_local_copy(env, mgc, *ctxt, lctxt,
-                                                cld->cld_logname);
-                       if (!rc)
-                               lsi->lsi_flags &= ~LDD_F_NO_LOCAL_LOGS;
-               }
-               if (local_only || rc) {
-                       if (unlikely(lsi->lsi_flags & LDD_F_NO_LOCAL_LOGS) ||
-                           rc) {
-                               CWARN("%s: local log %s are not valid and/or remote logs are not accessbile rc = %d\n",
-                                     mgc->obd_name, cld->cld_logname, rc);
-                               GOTO(out_pop, rc = -EIO);
-                       }
-
-                       if (strcmp(cld->cld_logname, PARAMS_FILENAME) != 0 &&
-                           llog_is_empty(env, lctxt, cld->cld_logname)) {
-                               LCONSOLE_ERROR_MSG(0x13a, "Failed to get MGS log %s and no local copy.\n",
-                                                  cld->cld_logname);
-                               GOTO(out_pop, rc = -ENOENT);
-                       }
-                       CDEBUG(D_MGC, "%s: Failed to get MGS log %s, using local copy for now, will try to update later.\n",
-                              mgc->obd_name, cld->cld_logname);
-                       rc = 0;
-               }
-               /* Now, whether we copied or not, start using the local llog.
-                * If we failed to copy, we'll start using whatever the old
-                * log has. */
-               llog_ctxt_put(*ctxt);
-               *ctxt = lctxt;
-               lctxt = NULL;
-       } else if (local_only) { /* no local log at client side */
-               GOTO(out_pop, rc = -EIO);
-       }
-
-       rc = -EAGAIN;
-       if (lsi && IS_SERVER(lsi) && !IS_MGS(lsi) &&
-           lsi->lsi_dt_dev->dd_rdonly) {
-               struct llog_ctxt *rctxt;
-
-               /* Under readonly mode, we may have no local copy or local
-                * copy is incomplete, so try to use remote llog firstly. */
-               rctxt = llog_get_context(mgc, LLOG_CONFIG_REPL_CTXT);
-               LASSERT(rctxt);
-
-               rc = class_config_parse_llog(env, rctxt, cld->cld_logname,
-                                            &cld->cld_cfg);
-               llog_ctxt_put(rctxt);
-       }
-out_pop:
-       if (lctxt)
-               __llog_ctxt_put(env, lctxt);
-       return rc;
-}
-#else /* !HAVE_SERVER_SUPPORT */
-#define mgc_barrier_glimpse_ast        NULL
-#endif /* HAVE_SERVER_SUPPORT */
-
 /* local_only means it cannot get remote llogs */
 static int mgc_process_cfg_log(struct obd_device *mgc,
                               struct config_llog_data *cld, int local_only)
@@ -2146,15 +1739,18 @@ restart:
                }
        }
 
-       if (cld_is_recover(cld) || cld_is_nodemap(cld)) {
-               if (!rcl)
-                       rc = mgc_process_recover_nodemap_log(mgc, cld);
-               else if (cld_is_nodemap(cld))
+       if (cld_is_recover(cld) && !rcl)
+               rc = mgc_process_recover_log(mgc, cld);
+#ifdef HAVE_SERVER_SUPPORT
+       else if (cld_is_nodemap(cld)) {
+               if (rcl)
                        rc = rcl;
-
-       } else if (!cld_is_barrier(cld)) {
-               rc = mgc_process_cfg_log(mgc, cld, rcl != 0);
+               else
+                       rc = mgc_process_nodemap_log(mgc, cld);
        }
+#endif
+       else if (!cld_is_barrier(cld))
+               rc = mgc_process_cfg_log(mgc, cld, rcl != 0);
 
        CDEBUG(D_MGC, "%s: configuration from log '%s' %sed (%d).\n",
               mgc->obd_name, cld->cld_logname, rc ? "fail" : "succeed", rc);
@@ -2181,56 +1777,41 @@ restart:
 }
 
 
-/** Called from lustre_process_log.
+/* Called from lustre_process_log.
  * LCFG_LOG_START gets the config log from the MGS, processes it to start
  * any services, and adds it to the list logs to watch (follow).
  */
 static int mgc_process_config(struct obd_device *obd, size_t len, void *buf)
 {
-        struct lustre_cfg *lcfg = buf;
-        struct config_llog_instance *cfg = NULL;
-        char *logname;
-        int rc = 0;
-        ENTRY;
+       struct lustre_cfg *lcfg = buf;
+       struct config_llog_instance *cfg = NULL;
+       char *logname;
+       int rc;
 
-        switch(lcfg->lcfg_command) {
+       ENTRY;
 #ifdef HAVE_SERVER_SUPPORT
-       case LCFG_LOV_ADD_OBD: {
-               /* Overloading this cfg command: register a new target */
-               struct mgs_target_info *mti;
-
-               if (LUSTRE_CFG_BUFLEN(lcfg, 1) !=
-                   sizeof(struct mgs_target_info))
-                       GOTO(out, rc = -EINVAL);
-
-               mti = lustre_cfg_buf(lcfg, 1);
-               CDEBUG(D_MGC, "add_target %s %#x\n",
-                      mti->mti_svname, mti->mti_flags);
-               rc = mgc_target_register(obd->u.cli.cl_mgc_mgsexp, mti);
+       rc = mgc_process_config_server(obd, len, buf);
+       if (rc != -ENOENT)
+               RETURN(rc);
+#endif
+
+       switch (lcfg->lcfg_command) {
+       case LCFG_SPTLRPC_CONF: {
+               rc = sptlrpc_process_config(lcfg);
                break;
        }
-       case LCFG_LOV_DEL_OBD:
-               /* Unregister has no meaning at the moment. */
-               CERROR("lov_del_obd unimplemented\n");
-               rc = -EINVAL;
-               break;
-#endif
-        case LCFG_SPTLRPC_CONF: {
-                rc = sptlrpc_process_config(lcfg);
-                break;
-        }
-        case LCFG_LOG_START: {
-                struct config_llog_data *cld;
-                struct super_block *sb;
+       case LCFG_LOG_START: {
+               struct config_llog_data *cld;
+               struct super_block *sb;
 
-                logname = lustre_cfg_string(lcfg, 1);
-                cfg = (struct config_llog_instance *)lustre_cfg_buf(lcfg, 2);
-                sb = *(struct super_block **)lustre_cfg_buf(lcfg, 3);
+               logname = lustre_cfg_string(lcfg, 1);
+               cfg = (struct config_llog_instance *)lustre_cfg_buf(lcfg, 2);
+               sb = *(struct super_block **)lustre_cfg_buf(lcfg, 3);
 
-                CDEBUG(D_MGC, "parse_log %s from %d\n", logname,
-                       cfg->cfg_last_idx);
+               CDEBUG(D_MGC, "parse_log %s from %d\n", logname,
+                      cfg->cfg_last_idx);
 
-                /* We're only called through here on the initial mount */
+               /* We're only called through here on the initial mount */
                cld = config_log_add(obd, logname, cfg, sb);
                if (IS_ERR(cld)) {
                        rc = PTR_ERR(cld);
@@ -2259,8 +1840,8 @@ static int mgc_process_config(struct obd_device *obd, size_t len, void *buf)
                if (rc == 0 && cld->cld_params != NULL) {
                        rc = mgc_process_log(obd, cld->cld_params);
                        if (rc == -ENOENT) {
-                               CDEBUG(D_MGC, "There is no params "
-                                             "config file yet\n");
+                               CDEBUG(D_MGC,
+                                      "There is no params config file yet\n");
                                rc = 0;
                        }
                        /* params log is optional */
@@ -2269,25 +1850,23 @@ static int mgc_process_config(struct obd_device *obd, size_t len, void *buf)
                                       obd->obd_name, rc);
                }
 
-                break;
-        }
-        case LCFG_LOG_END: {
-                logname = lustre_cfg_string(lcfg, 1);
+               break;
+       }
+       case LCFG_LOG_END: {
+               logname = lustre_cfg_string(lcfg, 1);
 
-                if (lcfg->lcfg_bufcount >= 2)
-                        cfg = (struct config_llog_instance *)lustre_cfg_buf(
-                                lcfg, 2);
-                rc = config_log_end(logname, cfg);
-                break;
-        }
-        default: {
-                CERROR("Unknown command: %d\n", lcfg->lcfg_command);
-                GOTO(out, rc = -EINVAL);
+               if (lcfg->lcfg_bufcount >= 2)
+                       cfg = (struct config_llog_instance *)lustre_cfg_buf(lcfg, 2);
+               rc = config_log_end(logname, cfg);
+               break;
+       }
+       default:
+               CERROR("Unknown command: %d\n", lcfg->lcfg_command);
+               rc = -EINVAL;
+               break;
+       }
 
-        }
-        }
-out:
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static const struct obd_ops mgc_obd_ops = {