Whamcloud - gitweb
LU-17250 mgs: generate a new MDT configuration by copy
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mgs/mgs_llog.c
32  *
33  * Lustre Management Server (mgs) config llog creation
34  *
35  * Author: Nathan Rutman <nathan@clusterfs.com>
36  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
37  * Author: Mikhail Pershin <tappro@whamcloud.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_MGS
41 #define D_MGS D_CONFIG
42
43 #include <obd.h>
44 #include <uapi/linux/lustre/lustre_ioctl.h>
45 #include <uapi/linux/lustre/lustre_param.h>
46 #include <lustre_sec.h>
47 #include <lustre_quota.h>
48 #include <lustre_sec.h>
49
50 #include "mgs_internal.h"
51
52 /********************** Class functions ********************/
53
54 /**
55  * Find all logs in CONFIG directory and link then into list.
56  *
57  * \param[in] env       pointer to the thread context
58  * \param[in] mgs       pointer to the mgs device
59  * \param[out] log_list the list to hold the found llog name entry
60  *
61  * \retval              0 for success
62  * \retval              negative error number on failure
63  **/
64 int class_dentry_readdir(const struct lu_env *env, struct mgs_device *mgs,
65                          struct list_head *log_list)
66 {
67         struct dt_object *dir = mgs->mgs_configs_dir;
68         const struct dt_it_ops *iops;
69         struct dt_it *it;
70         struct mgs_direntry *de;
71         char *key;
72         int rc, key_sz;
73
74         INIT_LIST_HEAD(log_list);
75
76         LASSERT(dir);
77         LASSERT(dir->do_index_ops);
78
79         iops = &dir->do_index_ops->dio_it;
80         it = iops->init(env, dir, LUDA_64BITHASH);
81         if (IS_ERR(it))
82                 RETURN(PTR_ERR(it));
83
84         rc = iops->load(env, it, 0);
85         if (rc <= 0)
86                 GOTO(fini, rc = 0);
87
88         /* main cycle */
89         do {
90                 key = (void *)iops->key(env, it);
91                 if (IS_ERR(key)) {
92                         CERROR("%s: key failed when listing %s: rc = %d\n",
93                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
94                                (int) PTR_ERR(key));
95                         goto next;
96                 }
97                 key_sz = iops->key_size(env, it);
98                 LASSERT(key_sz > 0);
99
100                 /* filter out "." and ".." entries */
101                 if (key[0] == '.') {
102                         if (key_sz == 1)
103                                 goto next;
104                         if (key_sz == 2 && key[1] == '.')
105                                 goto next;
106                 }
107
108                 /* filter out backup files */
109                 if (lu_name_is_backup_file(key, key_sz, NULL)) {
110                         CDEBUG(D_MGS, "Skipping backup file %.*s\n",
111                                key_sz, key);
112                         goto next;
113                 }
114
115                 de = mgs_direntry_alloc(key_sz + 1);
116                 if (de == NULL) {
117                         rc = -ENOMEM;
118                         break;
119                 }
120
121                 memcpy(de->mde_name, key, key_sz);
122                 de->mde_name[key_sz] = 0;
123
124                 list_add(&de->mde_list, log_list);
125
126 next:
127                 rc = iops->next(env, it);
128         } while (rc == 0);
129         if (rc > 0)
130                 rc = 0;
131
132         iops->put(env, it);
133
134 fini:
135         iops->fini(env, it);
136         if (rc) {
137                 struct mgs_direntry *n;
138
139                 CERROR("%s: key failed when listing %s: rc = %d\n",
140                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
141
142                 list_for_each_entry_safe(de, n, log_list, mde_list) {
143                         list_del_init(&de->mde_list);
144                         mgs_direntry_free(de);
145                 }
146         }
147
148         RETURN(rc);
149 }
150
151 /******************** DB functions *********************/
152
153 static inline int name_create(char **newname, char *prefix, char *suffix)
154 {
155         LASSERT(newname);
156         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
157         if (!*newname)
158                 return -ENOMEM;
159         sprintf(*newname, "%s%s", prefix, suffix);
160         return 0;
161 }
162
163 static inline void name_destroy(char **name)
164 {
165         if (*name)
166                 OBD_FREE(*name, strlen(*name) + 1);
167         *name = NULL;
168 }
169
170 static inline int name_create_osp(char **ospname, char **devtype, char *tgtname,
171                                   int index)
172 {
173         size_t size = strlen(tgtname) + sizeof("-osx-MDTXXXX");
174         char *out = NULL;
175         char *type = NULL;
176
177         if (strstr(tgtname, "-MDT"))
178                 type = "osp";
179         else if (strstr(tgtname, "-OST"))
180                 type = "osc";
181         else
182                 return -EINVAL;
183
184         OBD_ALLOC(out, size);
185         if (!out)
186                 return -ENOMEM;
187
188         if (snprintf(out, size, "%s-%s-MDT%04x", tgtname, type, index) >= size)
189                 return -EOVERFLOW;
190
191         *ospname = out;
192         if (devtype)
193                 *devtype = type;
194
195         return 0;
196 }
197
198 static inline int name_create_lov(char **lovname, char *mdtname)
199 {
200         return name_create(lovname, mdtname, "-mdtlov");
201 }
202
203
204 struct mgs_fsdb_handler_data
205 {
206         struct fs_db   *fsdb;
207         __u32           ver;
208 };
209
210 /* from the (client) config log, figure out:
211  * 1. which ost's/mdt's are configured (by index)
212  * 2. what the last config step is
213  * 3. COMPAT_18 osc name
214 */
215 /* It might be better to have a separate db file, instead of parsing the info
216    out of the client log.  This is slow and potentially error-prone. */
217 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
218                             struct llog_rec_hdr *rec, void *data)
219 {
220         struct mgs_fsdb_handler_data *d = data;
221         struct fs_db *fsdb = d->fsdb;
222         int cfg_len = rec->lrh_len;
223         char *cfg_buf = (char *)(rec + 1);
224         struct lustre_cfg *lcfg;
225         u32 index;
226         int rc = 0;
227
228         ENTRY;
229         if (rec->lrh_type != OBD_CFG_REC) {
230                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
231                 RETURN(-EINVAL);
232         }
233
234         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
235         if (rc) {
236                 CERROR("Insane cfg\n");
237                 RETURN(rc);
238         }
239
240         lcfg = (struct lustre_cfg *)cfg_buf;
241
242         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
243                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
244
245         /* Figure out ost indicies */
246         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
247         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
248             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
249                 rc = kstrtouint(lustre_cfg_string(lcfg, 2), 10, &index);
250                 if (rc)
251                         RETURN(rc);
252
253                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
254                        lustre_cfg_string(lcfg, 1), index,
255                        lustre_cfg_string(lcfg, 2));
256                 set_bit(index, fsdb->fsdb_ost_index_map);
257         }
258
259         /* Figure out mdt indicies */
260         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
261         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
262             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
263                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
264                                        &index, NULL);
265                 if (rc != LDD_F_SV_TYPE_MDT) {
266                         CWARN("Unparsable MDC name %s, assuming index 0\n",
267                               lustre_cfg_string(lcfg, 0));
268                         index = 0;
269                 }
270                 rc = 0;
271                 CDEBUG(D_MGS, "MDT index is %u\n", index);
272                 if (!test_bit(index, fsdb->fsdb_mdt_index_map)) {
273                         set_bit(index, fsdb->fsdb_mdt_index_map);
274                         fsdb->fsdb_mdt_count++;
275                 }
276         }
277
278         /**
279          * figure out the old config. fsdb_gen = 0 means old log
280          * It is obsoleted and not supported anymore
281          */
282         if (fsdb->fsdb_gen == 0) {
283                 CERROR("Old config format is not supported\n");
284                 RETURN(-EINVAL);
285         }
286
287         /*
288          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
289          */
290         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
291             lcfg->lcfg_command == LCFG_ATTACH &&
292             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
293                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
294                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
295                         CWARN("MDT using 1.8 OSC name scheme\n");
296                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
297                 }
298         }
299
300         if (lcfg->lcfg_command == LCFG_MARKER) {
301                 struct cfg_marker *marker;
302
303                 marker = lustre_cfg_buf(lcfg, 1);
304                 d->ver = marker->cm_vers;
305
306                 /* Keep track of the latest marker step */
307                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
308         }
309
310         RETURN(rc);
311 }
312
313 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
314 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
315                                   struct mgs_device *mgs,
316                                   struct fs_db *fsdb)
317 {
318         char *logname;
319         struct llog_handle *loghandle;
320         struct llog_ctxt *ctxt;
321         struct mgs_fsdb_handler_data d = {
322                 .fsdb = fsdb,
323         };
324         int rc;
325
326         ENTRY;
327
328         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
329         LASSERT(ctxt != NULL);
330         rc = name_create(&logname, fsdb->fsdb_name, "-client");
331         if (rc)
332                 GOTO(out_put, rc);
333         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
334         if (rc)
335                 GOTO(out_pop, rc);
336
337         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
338         if (rc)
339                 GOTO(out_close, rc);
340
341         if (llog_get_size(loghandle) <= 1)
342                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
343
344         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
345         CDEBUG(D_INFO, "get_db = %d\n", rc);
346 out_close:
347         llog_close(env, loghandle);
348 out_pop:
349         name_destroy(&logname);
350 out_put:
351         llog_ctxt_put(ctxt);
352
353         RETURN(rc);
354 }
355
356 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
357 {
358         struct mgs_tgt_srpc_conf *tgtconf;
359
360         /* free target-specific rules */
361         while (fsdb->fsdb_srpc_tgt) {
362                 tgtconf = fsdb->fsdb_srpc_tgt;
363                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
364
365                 LASSERT(tgtconf->mtsc_tgt);
366
367                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
368                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
369                 OBD_FREE_PTR(tgtconf);
370         }
371
372         /* free general rules */
373         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
374 }
375
376 static void mgs_unlink_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
377 {
378         mutex_lock(&mgs->mgs_mutex);
379         if (likely(!list_empty(&fsdb->fsdb_list))) {
380                 LASSERTF(atomic_read(&fsdb->fsdb_ref) >= 2,
381                          "Invalid ref %d on %s\n",
382                          atomic_read(&fsdb->fsdb_ref),
383                          fsdb->fsdb_name);
384
385                 list_del_init(&fsdb->fsdb_list);
386                 /* Drop the reference on the list.*/
387                 mgs_put_fsdb(mgs, fsdb);
388         }
389         mutex_unlock(&mgs->mgs_mutex);
390 }
391
392 /* The caller must hold mgs->mgs_mutex. */
393 static inline struct fs_db *
394 mgs_find_fsdb_noref(struct mgs_device *mgs, const char *fsname)
395 {
396         struct fs_db *fsdb;
397         struct list_head *tmp;
398
399         list_for_each(tmp, &mgs->mgs_fs_db_list) {
400                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
401                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
402                         return fsdb;
403         }
404
405         return NULL;
406 }
407
408 /* The caller must hold mgs->mgs_mutex. */
409 static void mgs_remove_fsdb_by_name(struct mgs_device *mgs, const char *name)
410 {
411         struct fs_db *fsdb;
412
413         fsdb = mgs_find_fsdb_noref(mgs, name);
414         if (fsdb) {
415                 list_del_init(&fsdb->fsdb_list);
416                 /* Drop the reference on the list.*/
417                 mgs_put_fsdb(mgs, fsdb);
418         }
419 }
420
421 /* The caller must hold mgs->mgs_mutex. */
422 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, const char *fsname)
423 {
424         struct fs_db *fsdb;
425
426         fsdb = mgs_find_fsdb_noref(mgs, fsname);
427         if (fsdb)
428                 atomic_inc(&fsdb->fsdb_ref);
429
430         return fsdb;
431 }
432
433 /* The caller must hold mgs->mgs_mutex. */
434 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
435                                   struct mgs_device *mgs, char *fsname)
436 {
437         struct fs_db *fsdb;
438         int rc;
439         ENTRY;
440
441         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
442                 CERROR("fsname %s is too long\n", fsname);
443
444                 RETURN(ERR_PTR(-EINVAL));
445         }
446
447         OBD_ALLOC_PTR(fsdb);
448         if (!fsdb)
449                 RETURN(ERR_PTR(-ENOMEM));
450
451         strncpy(fsdb->fsdb_name, fsname, sizeof(fsdb->fsdb_name));
452         mutex_init(&fsdb->fsdb_mutex);
453         INIT_LIST_HEAD(&fsdb->fsdb_list);
454         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
455         fsdb->fsdb_gen = 1;
456         INIT_LIST_HEAD(&fsdb->fsdb_clients);
457         atomic_set(&fsdb->fsdb_notify_phase, 0);
458         init_waitqueue_head(&fsdb->fsdb_notify_waitq);
459         init_completion(&fsdb->fsdb_notify_comp);
460
461         if (strcmp(fsname, MGSSELF_NAME) == 0) {
462                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
463                 fsdb->fsdb_mgs = mgs;
464                 if (logname_is_barrier(fsname))
465                         goto add;
466         } else {
467                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
468                 if (!fsdb->fsdb_mdt_index_map) {
469                         CERROR("No memory for MDT index maps\n");
470
471                         GOTO(err, rc = -ENOMEM);
472                 }
473
474                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
475                 if (!fsdb->fsdb_ost_index_map) {
476                         CERROR("No memory for OST index maps\n");
477
478                         GOTO(err, rc = -ENOMEM);
479                 }
480
481                 if (logname_is_barrier(fsname))
482                         goto add;
483
484                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
485                 if (rc)
486                         GOTO(err, rc);
487
488                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
489                 if (rc)
490                         GOTO(err, rc);
491
492                 /* initialise data for NID table */
493                 mgs_ir_init_fs(env, mgs, fsdb);
494                 lproc_mgs_add_live(mgs, fsdb);
495         }
496
497         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
498             strcmp(PARAMS_FILENAME, fsname) != 0) {
499                 /* populate the db from the client llog */
500                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
501                 if (rc) {
502                         CERROR("Can't get db from client log %d\n", rc);
503
504                         GOTO(err, rc);
505                 }
506         }
507
508         /* populate srpc rules from params llog */
509         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
510         if (rc) {
511                 CERROR("Can't get db from params log %d\n", rc);
512
513                 GOTO(err, rc);
514         }
515
516 add:
517         /* One ref is for the fsdb on the list.
518          * The other ref is for the caller. */
519         atomic_set(&fsdb->fsdb_ref, 2);
520         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
521
522         RETURN(fsdb);
523
524 err:
525         atomic_set(&fsdb->fsdb_ref, 1);
526         mgs_put_fsdb(mgs, fsdb);
527
528         RETURN(ERR_PTR(rc));
529 }
530
531 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
532 {
533         LASSERT(list_empty(&fsdb->fsdb_list));
534
535         lproc_mgs_del_live(mgs, fsdb);
536
537         /* deinitialize fsr */
538         if (fsdb->fsdb_mgs)
539                 mgs_ir_fini_fs(mgs, fsdb);
540
541         if (fsdb->fsdb_ost_index_map)
542                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
543         if (fsdb->fsdb_mdt_index_map)
544                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
545         name_destroy(&fsdb->fsdb_clilov);
546         name_destroy(&fsdb->fsdb_clilmv);
547         mgs_free_fsdb_srpc(fsdb);
548         OBD_FREE_PTR(fsdb);
549 }
550
551 void mgs_put_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
552 {
553         if (atomic_dec_and_test(&fsdb->fsdb_ref))
554                 mgs_free_fsdb(mgs, fsdb);
555 }
556
557 int mgs_init_fsdb_list(struct mgs_device *mgs)
558 {
559         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
560         return 0;
561 }
562
563 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
564 {
565         struct fs_db *fsdb;
566         struct list_head *tmp, *tmp2;
567
568         mutex_lock(&mgs->mgs_mutex);
569         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
570                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
571                 list_del_init(&fsdb->fsdb_list);
572                 mgs_put_fsdb(mgs, fsdb);
573         }
574         mutex_unlock(&mgs->mgs_mutex);
575         return 0;
576 }
577
578 /* The caller must hold mgs->mgs_mutex. */
579 int mgs_find_or_make_fsdb_nolock(const struct lu_env *env,
580                                 struct mgs_device *mgs,
581                                 char *name, struct fs_db **dbh)
582 {
583         struct fs_db *fsdb;
584         int rc = 0;
585         ENTRY;
586
587         fsdb = mgs_find_fsdb(mgs, name);
588         if (!fsdb) {
589                 fsdb = mgs_new_fsdb(env, mgs, name);
590                 if (IS_ERR(fsdb))
591                         rc = PTR_ERR(fsdb);
592
593                 CDEBUG(D_MGS, "Created new db: rc = %d\n", rc);
594         }
595
596         if (!rc)
597                 *dbh = fsdb;
598
599         RETURN(rc);
600 }
601
602 int mgs_find_or_make_fsdb(const struct lu_env *env, struct mgs_device *mgs,
603                           char *name, struct fs_db **dbh)
604 {
605         int rc;
606         ENTRY;
607
608         mutex_lock(&mgs->mgs_mutex);
609         rc = mgs_find_or_make_fsdb_nolock(env, mgs, name, dbh);
610         mutex_unlock(&mgs->mgs_mutex);
611
612         RETURN(rc);
613 }
614
615 /* 1 = index in use
616  * 0 = index unused
617  * -1= empty client log
618  */
619 int mgs_check_index(const struct lu_env *env,
620                     struct mgs_device *mgs,
621                     struct mgs_target_info *mti)
622 {
623         struct fs_db *fsdb;
624         void *imap;
625         int rc = 0;
626
627         ENTRY;
628         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
629
630         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
631         if (rc) {
632                 CERROR("Can't get db for %s\n", mti->mti_fsname);
633                 RETURN(rc);
634         }
635
636         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
637                 GOTO(out, rc = -1);
638
639         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
640                 imap = fsdb->fsdb_ost_index_map;
641         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
642                 imap = fsdb->fsdb_mdt_index_map;
643         else
644                 GOTO(out, rc = -EINVAL);
645
646         if (test_bit(mti->mti_stripe_index, imap))
647                 GOTO(out, rc = 1);
648
649         GOTO(out, rc = 0);
650
651 out:
652         mgs_put_fsdb(mgs, fsdb);
653         return rc;
654 }
655
656 static __inline__ int next_index(void *index_map, int map_len)
657 {
658         int i;
659
660         for (i = 0; i < map_len * 8; i++)
661                 if (!test_bit(i, index_map))
662                         return i;
663         CERROR("max index %d exceeded.\n", i);
664         return -1;
665 }
666
667 /* Make the mdt/ost server obd name based on the filesystem name */
668 static bool server_make_name(u32 flags, u16 index, const char *fs,
669                              char *name_buf, size_t name_buf_size)
670 {
671         bool invalid_flag = false;
672
673         if (flags & (LDD_F_SV_TYPE_MDT | LDD_F_SV_TYPE_OST)) {
674                 char reg_flag = '-';
675
676                 if (flags & LDD_F_WRITECONF)
677                         reg_flag = '=';
678                 else if (flags & LDD_F_VIRGIN)
679                         reg_flag = ':';
680
681                 if (!(flags & LDD_F_SV_ALL))
682                         snprintf(name_buf, name_buf_size, "%.8s%c%s%04x", fs,
683                                 reg_flag,
684                                 (flags & LDD_F_SV_TYPE_MDT) ? "MDT" : "OST",
685                                 index);
686         } else if (flags & LDD_F_SV_TYPE_MGS) {
687                 snprintf(name_buf, name_buf_size, "MGS");
688         } else {
689                 CERROR("unknown server type %#x\n", flags);
690                 invalid_flag = true;
691         }
692         return invalid_flag;
693 }
694
695 /* Return codes:
696  * 0  newly marked as in use
697  * <0 err
698  * +EALREADY for update of an old index
699  */
700 static int mgs_set_index(const struct lu_env *env,
701                          struct mgs_device *mgs,
702                          struct mgs_target_info *mti)
703 {
704         struct fs_db *fsdb;
705         void *imap;
706         int rc = 0;
707
708         ENTRY;
709
710         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
711         if (rc) {
712                 CERROR("Can't get db for %s\n", mti->mti_fsname);
713                 RETURN(rc);
714         }
715
716         mutex_lock(&fsdb->fsdb_mutex);
717         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
718                 imap = fsdb->fsdb_ost_index_map;
719         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
720                 imap = fsdb->fsdb_mdt_index_map;
721         else
722                 GOTO(out_up, rc = -EINVAL);
723
724         if (mti->mti_flags & LDD_F_NEED_INDEX) {
725                 rc = next_index(imap, INDEX_MAP_SIZE);
726                 if (rc == -1)
727                         GOTO(out_up, rc = -ERANGE);
728                 mti->mti_stripe_index = rc;
729         }
730
731         /* the last index(0xffff) is reserved for default value. */
732         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
733                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
734                                    "but index must be less than %u.\n",
735                                    mti->mti_svname, mti->mti_stripe_index,
736                                    INDEX_MAP_SIZE * 8 - 1);
737                 GOTO(out_up, rc = -ERANGE);
738         }
739
740         if (test_bit(mti->mti_stripe_index, imap)) {
741                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
742                     !(mti->mti_flags & LDD_F_WRITECONF)) {
743                         LCONSOLE_ERROR_MSG(
744                                 0x140,
745                                 "Server %s requested index %d, but that index is already in use. Use --writeconf to force\n",
746                                 mti->mti_svname,
747                                 mti->mti_stripe_index);
748                         GOTO(out_up, rc = -EADDRINUSE);
749                 } else {
750                         CDEBUG(D_MGS, "Server %s updating index %d\n",
751                                mti->mti_svname, mti->mti_stripe_index);
752                         GOTO(out_up, rc = EALREADY);
753                 }
754         } else {
755                 set_bit(mti->mti_stripe_index, imap);
756                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
757                         fsdb->fsdb_mdt_count++;
758         }
759
760         set_bit(mti->mti_stripe_index, imap);
761         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
762         if (server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
763                              mti->mti_stripe_index, mti->mti_fsname,
764                              mti->mti_svname, sizeof(mti->mti_svname))) {
765                 CERROR("unknown server type %#x\n", mti->mti_flags);
766                 GOTO(out_up, rc = -EINVAL);
767         }
768
769         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
770                mti->mti_stripe_index);
771
772         GOTO(out_up, rc = 0);
773
774 out_up:
775         mutex_unlock(&fsdb->fsdb_mutex);
776         mgs_put_fsdb(mgs, fsdb);
777         return rc;
778 }
779
780 struct mgs_modify_lookup {
781         struct cfg_marker mml_marker;
782         int             mml_modified;
783 };
784
785 enum mgs_search_pool_status {
786         POOL_STATUS_NONE = 0,
787         POOL_STATUS_EXIST,
788         POOL_STATUS_OST_EXIST,
789 };
790
791 struct mgs_search_pool_data {
792         char                            *msp_tgt;
793         char                            *msp_fs;
794         char                            *msp_pool;
795         char                            *msp_ost;
796         enum mgs_search_pool_status     msp_status;
797         bool                            msp_skip;
798 };
799
800 static int mgs_search_pool_cb(const struct lu_env *env,
801                               struct llog_handle *llh,
802                               struct llog_rec_hdr *rec, void *data)
803 {
804         struct mgs_search_pool_data *d = data;
805         struct lustre_cfg *lcfg = REC_DATA(rec);
806         int cfg_len = REC_DATA_LEN(rec);
807         char *fsname;
808         char *poolname;
809         char *ostname = NULL;
810         int rc;
811         ENTRY;
812
813         if (rec->lrh_type != OBD_CFG_REC) {
814                 CDEBUG(D_ERROR, "Unhandled lrh_type: %#x\n", rec->lrh_type);
815                 RETURN(-EINVAL);
816         }
817
818         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
819         if (rc) {
820                 CDEBUG(D_ERROR, "Insane cfg\n");
821                 RETURN(rc);
822         }
823
824         /* check if section is skipped */
825         if (lcfg->lcfg_command == LCFG_MARKER) {
826                 struct cfg_marker *marker = lustre_cfg_buf(lcfg, 1);
827
828                 if (!(marker->cm_flags & CM_END))
829                         RETURN(0);
830
831                 d->msp_skip = (marker->cm_flags & CM_SKIP) ||
832                         strcmp(d->msp_tgt, marker->cm_tgtname) != 0;
833
834                 RETURN(0);
835         }
836
837         if (d->msp_skip)
838                 RETURN(0);
839
840         switch (lcfg->lcfg_command) {
841         case LCFG_POOL_ADD:
842         case LCFG_POOL_REM:
843                 ostname = lustre_cfg_string(lcfg, 3);
844                 fallthrough;
845         case LCFG_POOL_NEW:
846         case LCFG_POOL_DEL:
847                 fsname = lustre_cfg_string(lcfg, 1);
848                 poolname = lustre_cfg_string(lcfg, 2);
849                 break;
850         default:
851                 RETURN(0);
852         }
853
854         if (strcmp(d->msp_fs, fsname) != 0)
855                 RETURN(0);
856         if (strcmp(d->msp_pool, poolname) != 0)
857                 RETURN(0);
858         if (ostname && d->msp_ost && (strcmp(d->msp_ost, ostname) != 0))
859                 RETURN(0);
860
861         /* Found a non-skipped marker match */
862         CDEBUG(D_MGS, "Matched pool rec %u cmd:0x%x %s.%s %s\n",
863                rec->lrh_index, lcfg->lcfg_command, fsname, poolname,
864                ostname ? ostname : "");
865
866         switch (lcfg->lcfg_command) {
867         case LCFG_POOL_ADD:
868                 d->msp_status = POOL_STATUS_OST_EXIST;
869                 RETURN(LLOG_PROC_BREAK);
870         case LCFG_POOL_REM:
871                 d->msp_status = POOL_STATUS_EXIST;
872                 RETURN(LLOG_PROC_BREAK);
873         case LCFG_POOL_NEW:
874                 d->msp_status = POOL_STATUS_EXIST;
875                 RETURN(LLOG_PROC_BREAK);
876         case LCFG_POOL_DEL:
877                 d->msp_status = POOL_STATUS_NONE;
878                 RETURN(LLOG_PROC_BREAK);
879         default:
880                 break;
881         }
882
883         RETURN(0);
884 }
885
886 /**
887  * Search a pool in a MGS configuration.
888  * Return code:
889  * positive - return the status of the pool,
890  * negative - error
891  */
892 static
893 int mgs_search_pool(const struct lu_env *env, struct mgs_device *mgs,
894                     struct fs_db *fsdb, struct mgs_target_info *mti,
895                     char *logname, char *devname, char *fsname, char *poolname,
896                     char *ostname)
897 {
898         struct llog_handle *loghandle;
899         struct llog_ctxt *ctxt;
900         struct mgs_search_pool_data d;
901         int status = POOL_STATUS_NONE;
902         int rc;
903
904         ENTRY;
905
906         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
907
908
909         d.msp_tgt = devname;
910         d.msp_fs = fsname;
911         d.msp_pool = poolname;
912         d.msp_ost = ostname;
913         d.msp_status = POOL_STATUS_NONE;
914         d.msp_skip = false;
915
916         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
917         LASSERT(ctxt != NULL);
918         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
919         if (rc < 0) {
920                 if (rc == -ENOENT)
921                         rc = 0;
922                 GOTO(out_pop, rc);
923         }
924
925         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
926         if (rc)
927                 GOTO(out_close, rc);
928
929         if (llog_get_size(loghandle) <= 1)
930                 GOTO(out_close, rc = 0);
931
932         rc = llog_reverse_process(env, loghandle, mgs_search_pool_cb, &d, NULL);
933         if (rc == LLOG_PROC_BREAK)
934                 status = d.msp_status;
935
936 out_close:
937         llog_close(env, loghandle);
938 out_pop:
939         llog_ctxt_put(ctxt);
940
941         RETURN(rc < 0 ? rc : status);
942 }
943
944 static int mgs_modify_handler(const struct lu_env *env,
945                               struct llog_handle *llh,
946                               struct llog_rec_hdr *rec, void *data)
947 {
948         struct mgs_modify_lookup *mml = data;
949         struct cfg_marker *marker;
950         struct lustre_cfg *lcfg = REC_DATA(rec);
951         int cfg_len = REC_DATA_LEN(rec);
952         int rc;
953
954         ENTRY;
955         if (rec->lrh_type != OBD_CFG_REC) {
956                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
957                 RETURN(-EINVAL);
958         }
959
960         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
961         if (rc) {
962                 CERROR("Insane cfg\n");
963                 RETURN(rc);
964         }
965
966         /* We only care about markers */
967         if (lcfg->lcfg_command != LCFG_MARKER)
968                 RETURN(0);
969
970         marker = lustre_cfg_buf(lcfg, 1);
971         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
972             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
973             !(marker->cm_flags & CM_SKIP)) {
974                 /* Found a non-skipped marker match */
975                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
976                        rec->lrh_index, marker->cm_step,
977                        marker->cm_flags, mml->mml_marker.cm_flags,
978                        marker->cm_tgtname, marker->cm_comment);
979                 /* Overwrite the old marker llog entry */
980                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
981                 marker->cm_flags |= mml->mml_marker.cm_flags;
982                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
983                 rc = llog_write(env, llh, rec, rec->lrh_index);
984                 if (!rc)
985                         mml->mml_modified++;
986         }
987
988         RETURN(rc);
989 }
990
991 /**
992  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
993  * Return code:
994  * 0 - modified successfully,
995  * 1 - no modification was done
996  * negative - error
997  */
998 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
999                       struct fs_db *fsdb, struct mgs_target_info *mti,
1000                       char *logname, char *devname, char *comment, int flags)
1001 {
1002         struct llog_handle *loghandle;
1003         struct llog_ctxt *ctxt;
1004         struct mgs_modify_lookup *mml;
1005         int rc;
1006
1007         ENTRY;
1008
1009         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
1010         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
1011                flags);
1012
1013         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1014         LASSERT(ctxt != NULL);
1015         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
1016         if (rc < 0) {
1017                 if (rc == -ENOENT)
1018                         rc = 0;
1019                 GOTO(out_pop, rc);
1020         }
1021
1022         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1023         if (rc)
1024                 GOTO(out_close, rc);
1025
1026         if (llog_get_size(loghandle) <= 1)
1027                 GOTO(out_close, rc = 0);
1028
1029         OBD_ALLOC_PTR(mml);
1030         if (!mml)
1031                 GOTO(out_close, rc = -ENOMEM);
1032         rc = strscpy(mml->mml_marker.cm_comment, comment,
1033                      sizeof(mml->mml_marker.cm_comment));
1034         if (rc < 0)
1035                 GOTO(out_free, rc);
1036         rc = strscpy(mml->mml_marker.cm_tgtname, devname,
1037                      sizeof(mml->mml_marker.cm_tgtname));
1038         if (rc < 0)
1039                 GOTO(out_free, rc);
1040         /* Modify mostly means cancel */
1041         mml->mml_marker.cm_flags = flags;
1042         mml->mml_marker.cm_canceltime = flags ? ktime_get_real_seconds() : 0;
1043         mml->mml_modified = 0;
1044         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
1045                           NULL);
1046         if (!rc && !mml->mml_modified)
1047                 rc = 1;
1048
1049 out_free:
1050         OBD_FREE_PTR(mml);
1051
1052 out_close:
1053         llog_close(env, loghandle);
1054 out_pop:
1055         if (rc < 0)
1056                 CERROR("%s: modify %s/%s failed: rc = %d\n",
1057                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
1058         llog_ctxt_put(ctxt);
1059         RETURN(rc);
1060 }
1061
1062 enum replace_state {
1063         REPLACE_COPY = 0,
1064         REPLACE_SKIP,
1065         REPLACE_DONE,
1066         REPLACE_UUID,
1067         REPLACE_SETUP
1068 };
1069
1070 /** This structure is passed to mgs_replace_handler */
1071 struct mgs_replace_data {
1072         /* Nids are replaced for this target device */
1073         struct mgs_target_info target;
1074         /* Temporary modified llog */
1075         struct llog_handle *temp_llh;
1076         enum replace_state state;
1077         char *failover;
1078         char *nodeuuid;
1079 };
1080
1081 /**
1082  * Check: a) if block should be skipped
1083  * b) is it target block
1084  *
1085  * \param[in] lcfg
1086  * \param[in] mrd
1087  *
1088  * \retval 0 should not to be skipped
1089  * \retval 1 should to be skipped
1090  */
1091 static int check_markers(struct lustre_cfg *lcfg,
1092                          struct mgs_replace_data *mrd)
1093 {
1094          struct cfg_marker *marker;
1095
1096         /* Track markers. Find given device */
1097         if (lcfg->lcfg_command == LCFG_MARKER) {
1098                 marker = lustre_cfg_buf(lcfg, 1);
1099                 /* Clean llog from records marked as CM_SKIP.
1100                    CM_EXCLUDE records are used for "active" command
1101                    and can be restored if needed */
1102                 if ((marker->cm_flags & (CM_SKIP | CM_START)) ==
1103                     (CM_SKIP | CM_START)) {
1104                         mrd->state = REPLACE_SKIP;
1105                         return 1;
1106                 }
1107
1108                 if ((marker->cm_flags & (CM_SKIP | CM_END)) ==
1109                     (CM_SKIP | CM_END)) {
1110                         mrd->state = REPLACE_COPY;
1111                         return 1;
1112                 }
1113
1114                 if (strcmp(mrd->target.mti_svname, marker->cm_tgtname) == 0) {
1115                         LASSERT(!(marker->cm_flags & CM_START) ||
1116                                 !(marker->cm_flags & CM_END));
1117                         if (marker->cm_flags & CM_START) {
1118                                 if (!strncmp(marker->cm_comment,
1119                                              "add failnid", 11)) {
1120                                         mrd->state = REPLACE_SKIP;
1121                                 } else {
1122                                         mrd->state = REPLACE_UUID;
1123                                         mrd->failover = NULL;
1124                                 }
1125                         } else if (marker->cm_flags & CM_END)
1126                                 mrd->state = REPLACE_COPY;
1127
1128                         if (!strncmp(marker->cm_comment,
1129                                 "add failnid", 11))
1130                                 return 1;
1131                 }
1132         }
1133
1134         return 0;
1135 }
1136
1137 static
1138 int record_base_raw(const struct lu_env *env, struct llog_handle *llh,
1139                     char *cfgname, __u32 num, __u32 flags, lnet_nid_t nid,
1140                     int cmd, char *s1, char *s2, char *s3, char *s4)
1141 {
1142         struct mgs_thread_info  *mgi = mgs_env_info(env);
1143         struct llog_cfg_rec     *lcr;
1144         int rc;
1145
1146         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
1147                cmd, s1, s2, s3, s4);
1148
1149         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
1150         if (s1)
1151                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
1152         if (s2)
1153                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
1154         if (s3)
1155                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
1156         if (s4)
1157                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
1158
1159         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
1160         if (lcr == NULL)
1161                 return -ENOMEM;
1162
1163         lcr->lcr_cfg.lcfg_num = num;
1164         lcr->lcr_cfg.lcfg_flags = flags;
1165         lcr->lcr_cfg.lcfg_nid = nid;
1166         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1167
1168         lustre_cfg_rec_free(lcr);
1169
1170         if (rc < 0)
1171                 CDEBUG(D_MGS,
1172                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
1173                        cfgname, cmd, s1, s2, s3, s4, rc);
1174         return rc;
1175 }
1176
1177 static inline
1178 int record_base(const struct lu_env *env, struct llog_handle *llh,
1179                 char *cfgname, lnet_nid_t nid, int cmd,
1180                 char *s1, char *s2, char *s3, char *s4)
1181 {
1182         return record_base_raw(env, llh, cfgname, 0, 0, nid, cmd,
1183                                s1, s2, s3, s4);
1184 }
1185
1186 static inline int record_add_uuid(const struct lu_env *env,
1187                                   struct llog_handle *llh,
1188                                   struct lnet_nid *nid, char *uuid)
1189 {
1190         lnet_nid_t nid4 = 0;
1191         char *cfg2 = NULL;
1192
1193         if (nid_is_nid4(nid))
1194                 nid4 = lnet_nid_to_nid4(nid);
1195         else
1196                 cfg2 = libcfs_nidstr(nid);
1197         return record_base(env, llh, NULL, nid4, LCFG_ADD_UUID, uuid,
1198                            cfg2, NULL, NULL);
1199 }
1200
1201 static inline int record_add_conn(const struct lu_env *env,
1202                                   struct llog_handle *llh,
1203                                   char *devname, char *uuid)
1204 {
1205         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
1206                            NULL, NULL, NULL);
1207 }
1208
1209 static inline int record_attach(const struct lu_env *env,
1210                                 struct llog_handle *llh, char *devname,
1211                                 char *type, char *uuid)
1212 {
1213         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
1214                            NULL, NULL);
1215 }
1216
1217 static inline int record_setup(const struct lu_env *env,
1218                                struct llog_handle *llh, char *devname,
1219                                char *s1, char *s2, char *s3, char *s4)
1220 {
1221         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
1222 }
1223
1224 /**
1225  * \retval <0 record processing error
1226  * \retval n record is processed. No need copy original one.
1227  * \retval 0 record is not processed.
1228  */
1229 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
1230                            struct mgs_replace_data *mrd)
1231 {
1232         int nids_added = 0;
1233         struct lnet_nid nid;
1234         char *ptr;
1235         int rc = 0;
1236
1237         if (mrd->state == REPLACE_UUID &&
1238             lcfg->lcfg_command == LCFG_ADD_UUID) {
1239                 /* LCFG_ADD_UUID command found. Let's skip original command
1240                    and add passed nids */
1241                 ptr = mrd->target.mti_params;
1242                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1243                         if (!mrd->nodeuuid) {
1244                                 rc = name_create(&mrd->nodeuuid,
1245                                                  libcfs_nidstr(&nid), "");
1246                                 if (rc) {
1247                                         CERROR("Can't create uuid for "
1248                                                 "nid  %s, device %s\n",
1249                                                 libcfs_nidstr(&nid),
1250                                                 mrd->target.mti_svname);
1251                                         return rc;
1252                                 }
1253                         }
1254                         CDEBUG(D_MGS, "add nid %s with uuid %s, device %s\n",
1255                                libcfs_nidstr(&nid),
1256                                mrd->target.mti_params,
1257                                mrd->nodeuuid);
1258                         rc = record_add_uuid(env,
1259                                              mrd->temp_llh, &nid,
1260                                              mrd->nodeuuid);
1261                         if (rc)
1262                                 CWARN("%s: Can't add nid %s for uuid %s :rc=%d\n",
1263                                         mrd->target.mti_svname,
1264                                         libcfs_nidstr(&nid),
1265                                         mrd->nodeuuid, rc);
1266                         else
1267                                 nids_added++;
1268
1269                         if (*ptr == ':') {
1270                                 mrd->failover = ptr;
1271                                 break;
1272                         }
1273                 }
1274
1275                 if (nids_added == 0) {
1276                         CERROR("No new nids were added, nid %s with uuid %s, device %s\n",
1277                                libcfs_nidstr(&nid),
1278                                mrd->nodeuuid ? mrd->nodeuuid : "NULL",
1279                                mrd->target.mti_svname);
1280                         name_destroy(&mrd->nodeuuid);
1281                         return -ENXIO;
1282                 } else {
1283                         mrd->state = REPLACE_SETUP;
1284                 }
1285
1286                 return nids_added;
1287         }
1288
1289         if (mrd->state == REPLACE_SETUP && lcfg->lcfg_command == LCFG_SETUP) {
1290                 /* LCFG_SETUP command found. UUID should be changed */
1291                 rc = record_setup(env,
1292                                   mrd->temp_llh,
1293                                   /* devname the same */
1294                                   lustre_cfg_string(lcfg, 0),
1295                                   /* s1 is not changed */
1296                                   lustre_cfg_string(lcfg, 1),
1297                                   mrd->nodeuuid,
1298                                   /* s3 is not changed */
1299                                   lustre_cfg_string(lcfg, 3),
1300                                   /* s4 is not changed */
1301                                   lustre_cfg_string(lcfg, 4));
1302
1303                 name_destroy(&mrd->nodeuuid);
1304                 if (rc)
1305                         return rc;
1306
1307                 if (mrd->failover) {
1308                         ptr = mrd->failover;
1309                         while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1310                                 if (mrd->nodeuuid == NULL) {
1311                                         rc =  name_create(&mrd->nodeuuid,
1312                                                           libcfs_nidstr(&nid),
1313                                                           "");
1314                                         if (rc)
1315                                                 return rc;
1316                                 }
1317
1318                                 CDEBUG(D_MGS, "add nid %s for failover %s\n",
1319                                        libcfs_nidstr(&nid), mrd->nodeuuid);
1320                                 rc = record_add_uuid(env, mrd->temp_llh, &nid,
1321                                                      mrd->nodeuuid);
1322                                 if (rc) {
1323                                         CWARN("%s: Can't add nid %s for failover %s :rc = %d\n",
1324                                                 mrd->target.mti_svname,
1325                                                 libcfs_nidstr(&nid),
1326                                                 mrd->nodeuuid, rc);
1327                                         name_destroy(&mrd->nodeuuid);
1328                                         return rc;
1329                                 }
1330                                 if (*ptr == ':') {
1331                                         rc = record_add_conn(env,
1332                                                 mrd->temp_llh,
1333                                                 lustre_cfg_string(lcfg, 0),
1334                                                 mrd->nodeuuid);
1335                                         name_destroy(&mrd->nodeuuid);
1336                                         if (rc)
1337                                                 return rc;
1338                                 }
1339                         }
1340                         if (mrd->nodeuuid) {
1341                                 rc = record_add_conn(env, mrd->temp_llh,
1342                                                      lustre_cfg_string(lcfg, 0),
1343                                                      mrd->nodeuuid);
1344                                 name_destroy(&mrd->nodeuuid);
1345                                 if (rc)
1346                                         return rc;
1347                         }
1348                 }
1349                 mrd->state = REPLACE_DONE;
1350                 return rc ? rc : 1;
1351         }
1352
1353         /* All new UUID are added. Skip. */
1354         if (mrd->state == REPLACE_SETUP &&
1355                 lcfg->lcfg_command == LCFG_ADD_UUID)
1356                 return 1;
1357
1358         /* Another commands in target device block */
1359         return 0;
1360 }
1361
1362 /**
1363  * Handler that called for every record in llog.
1364  * Records are processed in order they placed in llog.
1365  *
1366  * \param[in] llh       log to be processed
1367  * \param[in] rec       current record
1368  * \param[in] data      mgs_replace_data structure
1369  *
1370  * \retval 0    success
1371  */
1372 static int mgs_replace_nids_handler(const struct lu_env *env,
1373                                     struct llog_handle *llh,
1374                                     struct llog_rec_hdr *rec,
1375                                     void *data)
1376 {
1377         struct mgs_replace_data *mrd;
1378         struct lustre_cfg *lcfg = REC_DATA(rec);
1379         int cfg_len = REC_DATA_LEN(rec);
1380         int rc;
1381         ENTRY;
1382
1383         mrd = (struct mgs_replace_data *)data;
1384
1385         if (rec->lrh_type != OBD_CFG_REC) {
1386                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
1387                        rec->lrh_type, lcfg->lcfg_command,
1388                        lustre_cfg_string(lcfg, 0),
1389                        lustre_cfg_string(lcfg, 1));
1390                 RETURN(-EINVAL);
1391         }
1392
1393         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1394         if (rc) {
1395                 /* Do not copy any invalidated records */
1396                 GOTO(skip_out, rc = 0);
1397         }
1398
1399         rc = check_markers(lcfg, mrd);
1400         if (rc || mrd->state == REPLACE_SKIP)
1401                 GOTO(skip_out, rc = 0);
1402
1403         /* Write to new log all commands outside target device block */
1404         if (mrd->state == REPLACE_COPY)
1405                 GOTO(copy_out, rc = 0);
1406
1407         if (mrd->state == REPLACE_DONE &&
1408             (lcfg->lcfg_command == LCFG_ADD_UUID ||
1409              lcfg->lcfg_command == LCFG_ADD_CONN)) {
1410                 if (!mrd->failover)
1411                         CWARN("Previous failover is deleted, but new one is "
1412                               "not set. This means you configure system "
1413                               "without failover or passed wrong replace_nids "
1414                               "command parameters. Device %s, passed nids %s\n",
1415                               mrd->target.mti_svname, mrd->target.mti_params);
1416                 GOTO(skip_out, rc = 0);
1417         }
1418
1419         rc = process_command(env, lcfg, mrd);
1420         if (rc < 0)
1421                 RETURN(rc);
1422
1423         if (rc)
1424                 RETURN(0);
1425 copy_out:
1426         /* Record is placed in temporary llog as is */
1427         rc = llog_write(env, mrd->temp_llh, rec, LLOG_NEXT_IDX);
1428
1429         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1430                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1431                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1432         RETURN(rc);
1433
1434 skip_out:
1435         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1436                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1437                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1438         RETURN(rc);
1439 }
1440
1441 static int mgs_log_is_empty(const struct lu_env *env,
1442                             struct mgs_device *mgs, char *name)
1443 {
1444         struct llog_ctxt        *ctxt;
1445         int                      rc;
1446
1447         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1448         LASSERT(ctxt != NULL);
1449
1450         rc = llog_is_empty(env, ctxt, name);
1451         llog_ctxt_put(ctxt);
1452         return rc;
1453 }
1454
1455 static int mgs_replace_log(const struct lu_env *env,
1456                            struct obd_device *mgs,
1457                            char *logname, char *devname,
1458                            llog_cb_t replace_handler, void *data)
1459 {
1460         struct llog_handle *orig_llh, *backup_llh;
1461         struct llog_ctxt *ctxt;
1462         struct mgs_replace_data *mrd;
1463         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1464         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1465         char *backup;
1466         int rc, rc2, buf_size;
1467         time64_t now;
1468         ENTRY;
1469
1470         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1471         LASSERT(ctxt != NULL);
1472
1473         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1474                 /* Log is empty. Nothing to replace */
1475                 GOTO(out_put, rc = 0);
1476         }
1477
1478         now = ktime_get_real_seconds();
1479
1480         /* max time64_t in decimal fits into 20 bytes long string */
1481         buf_size = strlen(logname) + 1 + 20 + 1 + strlen(".bak") + 1;
1482         OBD_ALLOC(backup, buf_size);
1483         if (backup == NULL)
1484                 GOTO(out_put, rc = -ENOMEM);
1485
1486         snprintf(backup, buf_size, "%s.%llu.bak", logname, now);
1487
1488         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1489         if (rc == 0) {
1490                 /* Now erase original log file. Connections are not allowed.
1491                    Backup is already saved */
1492                 rc = llog_erase(env, ctxt, NULL, logname);
1493                 if (rc < 0)
1494                         GOTO(out_free, rc);
1495         } else if (rc != -ENOENT) {
1496                 CERROR("%s: can't make backup for %s: rc = %d\n",
1497                        mgs->obd_name, logname, rc);
1498                 GOTO(out_free,rc);
1499         }
1500
1501         /* open local log */
1502         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1503         if (rc)
1504                 GOTO(out_restore, rc);
1505
1506         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1507         if (rc)
1508                 GOTO(out_closel, rc);
1509
1510         /* open backup llog */
1511         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1512                        LLOG_OPEN_EXISTS);
1513         if (rc)
1514                 GOTO(out_closel, rc);
1515
1516         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1517         if (rc)
1518                 GOTO(out_close, rc);
1519
1520         if (llog_get_size(backup_llh) <= 1)
1521                 GOTO(out_close, rc = 0);
1522
1523         OBD_ALLOC_PTR(mrd);
1524         if (!mrd)
1525                 GOTO(out_close, rc = -ENOMEM);
1526         /* devname is only needed information to replace UUID records */
1527         if (devname)
1528                 strscpy(mrd->target.mti_svname, devname,
1529                         sizeof(mrd->target.mti_svname));
1530         /* data is parsed in llog callback */
1531         if (data)
1532                 strscpy(mrd->target.mti_params, data,
1533                         sizeof(mrd->target.mti_params));
1534         /* Copy records to this temporary llog */
1535         mrd->temp_llh = orig_llh;
1536
1537         rc = llog_process(env, backup_llh, replace_handler,
1538                           (void *)mrd, NULL);
1539         OBD_FREE_PTR(mrd);
1540 out_close:
1541         rc2 = llog_close(NULL, backup_llh);
1542         if (!rc)
1543                 rc = rc2;
1544 out_closel:
1545         rc2 = llog_close(NULL, orig_llh);
1546         if (!rc)
1547                 rc = rc2;
1548
1549 out_restore:
1550         if (rc) {
1551                 CERROR("%s: llog should be restored: rc = %d\n",
1552                        mgs->obd_name, rc);
1553                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1554                                   logname);
1555                 if (rc2 < 0)
1556                         CERROR("%s: can't restore backup %s: rc = %d\n",
1557                                mgs->obd_name, logname, rc2);
1558         }
1559
1560 out_free:
1561         OBD_FREE(backup, buf_size);
1562
1563 out_put:
1564         llog_ctxt_put(ctxt);
1565
1566         if (rc)
1567                 CERROR("%s: failed to replace log %s: rc = %d\n",
1568                        mgs->obd_name, logname, rc);
1569
1570         RETURN(rc);
1571 }
1572
1573 static int mgs_replace_nids_log(const struct lu_env *env,
1574                                 struct obd_device *obd,
1575                                 char *logname, char *devname, char *nids)
1576 {
1577         CDEBUG(D_MGS, "Replace NIDs for %s in %s\n", devname, logname);
1578         return mgs_replace_log(env, obd, logname, devname,
1579                                mgs_replace_nids_handler, nids);
1580 }
1581
1582 /**
1583  * Parse device name and get file system name and/or device index
1584  *
1585  * @devname     device name (ex. lustre-MDT0000)
1586  * @fsname      file system name extracted from @devname and returned
1587  *              to the caller (optional)
1588  * @index       device index extracted from @devname and returned to
1589  *              the caller (optional)
1590  *
1591  * RETURN       0                       success if we are only interested in
1592  *                                      extracting fsname from devname.
1593  *                                      i.e index is NULL
1594  *
1595  *              LDD_F_SV_TYPE_*         Besides extracting the fsname the
1596  *                                      user also wants the index. Report to
1597  *                                      the user the type of obd device the
1598  *                                      returned index belongs too.
1599  *
1600  *              -EINVAL                 The obd device name is improper so
1601  *                                      fsname could not be extracted.
1602  *
1603  *              -ENXIO                  Failed to extract the index out of
1604  *                                      the obd device name. Most likely an
1605  *                                      invalid obd device name
1606  */
1607 static int mgs_parse_devname(char *devname, char *fsname, u32 *index)
1608 {
1609         int rc = 0;
1610         ENTRY;
1611
1612         /* Extract fsname */
1613         if (fsname) {
1614                 rc = server_name2fsname(devname, fsname, NULL);
1615                 if (rc < 0) {
1616                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1617                                devname);
1618                         RETURN(-EINVAL);
1619                 }
1620         }
1621
1622         if (index) {
1623                 rc = server_name2index(devname, index, NULL);
1624                 if (rc < 0) {
1625                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1626                                devname);
1627                         RETURN(-ENXIO);
1628                 }
1629         }
1630
1631         /* server_name2index can return LDD_F_SV_TYPE_* so always return rc */
1632         RETURN(rc);
1633 }
1634
1635 /* This is only called during replace_nids */
1636 static int only_mgs_is_running(struct obd_device *mgs_obd)
1637 {
1638         int num_devices = class_obd_devs_count();
1639         int num_exports = 0;
1640         struct obd_export *exp;
1641
1642         spin_lock(&mgs_obd->obd_dev_lock);
1643         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1644                 /* skip self export */
1645                 if (exp == mgs_obd->obd_self_export)
1646                         continue;
1647
1648                 ++num_exports;
1649
1650                 if (num_exports > 1)
1651                         CERROR("%s: node %s still connected during replace_nids connect_flags:%llx\n",
1652                                mgs_obd->obd_name,
1653                                libcfs_nidstr(&exp->exp_nid_stats->nid),
1654                                exp_connect_flags(exp));
1655         }
1656         spin_unlock(&mgs_obd->obd_dev_lock);
1657
1658         /* osd, MGS and MGC + MGC export (nosvc starts MGC)
1659          *  (wc -l /proc/fs/lustre/devices <= 3) && (non self exports == 1)
1660          */
1661         return (num_devices <= 3) && (num_exports <= 1);
1662 }
1663
1664 static int name_create_mdt(char **logname, char *fsname, int mdt_idx)
1665 {
1666         char postfix[9];
1667
1668         if (mdt_idx > INDEX_MAP_MAX_VALUE)
1669                 return -E2BIG;
1670
1671         snprintf(postfix, sizeof(postfix), "-MDT%04x", mdt_idx);
1672         return name_create(logname, fsname, postfix);
1673 }
1674
1675 /**
1676  * Replace nids for \a device to \a nids values
1677  *
1678  * \param obd           MGS obd device
1679  * \param devname       nids need to be replaced for this device
1680  * (ex. lustre-OST0000)
1681  * \param nids          nids list (ex. nid1,nid2,nid3)
1682  *
1683  * \retval 0    success
1684  */
1685 int mgs_replace_nids(const struct lu_env *env,
1686                      struct mgs_device *mgs,
1687                      char *devname, char *nids)
1688 {
1689         /* Assume fsname is part of device name */
1690         char fsname[MTI_NAME_MAXLEN];
1691         int rc;
1692         __u32 index;
1693         char *logname;
1694         struct fs_db *fsdb = NULL;
1695         unsigned int i;
1696         int conn_state;
1697         struct obd_device *mgs_obd = mgs->mgs_obd;
1698         ENTRY;
1699
1700         /* We can only change NIDs if no other nodes are connected */
1701         spin_lock(&mgs_obd->obd_dev_lock);
1702         conn_state = mgs_obd->obd_no_conn;
1703         mgs_obd->obd_no_conn = 1;
1704         spin_unlock(&mgs_obd->obd_dev_lock);
1705
1706         /* We can not change nids if not only MGS is started */
1707         if (!only_mgs_is_running(mgs_obd)) {
1708                 CERROR("Only MGS is allowed to be started\n");
1709                 GOTO(out, rc = -EINPROGRESS);
1710         }
1711
1712         /* Get fsname and index */
1713         rc = mgs_parse_devname(devname, fsname, &index);
1714         if (rc < 0)
1715                 GOTO(out, rc);
1716
1717         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1718         if (rc) {
1719                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1720                 GOTO(out, rc);
1721         }
1722
1723         /* Process client llogs */
1724         rc = name_create(&logname, fsname, "-client");
1725         if (rc)
1726                 GOTO(out, rc);
1727         rc = mgs_replace_nids_log(env, mgs_obd, logname, devname, nids);
1728         name_destroy(&logname);
1729         if (rc) {
1730                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1731                        fsname, devname, rc);
1732                 GOTO(out, rc);
1733         }
1734
1735         /* Process MDT llogs */
1736         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1737                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1738                         continue;
1739                 rc = name_create_mdt(&logname, fsname, i);
1740                 if (rc)
1741                         GOTO(out, rc);
1742                 rc = mgs_replace_nids_log(env, mgs_obd, logname, devname, nids);
1743                 name_destroy(&logname);
1744                 if (rc)
1745                         GOTO(out, rc);
1746         }
1747
1748 out:
1749         spin_lock(&mgs_obd->obd_dev_lock);
1750         mgs_obd->obd_no_conn = conn_state;
1751         spin_unlock(&mgs_obd->obd_dev_lock);
1752
1753         if (fsdb)
1754                 mgs_put_fsdb(mgs, fsdb);
1755
1756         RETURN(rc);
1757 }
1758
1759 /**
1760  * This is called for every record in llog. Some of records are
1761  * skipped, others are copied to new log as is.
1762  * Records to be skipped are
1763  *  marker records marked SKIP
1764  *  records enclosed between SKIP markers
1765  *
1766  * \param[in] llh       log to be processed
1767  * \param[in] rec       current record
1768  * \param[in] data      mgs_replace_data structure
1769  *
1770  * \retval 0    success
1771  **/
1772 static int mgs_clear_config_handler(const struct lu_env *env,
1773                                     struct llog_handle *llh,
1774                                     struct llog_rec_hdr *rec, void *data)
1775 {
1776         struct mgs_replace_data *mrd;
1777         struct lustre_cfg *lcfg = REC_DATA(rec);
1778         int cfg_len = REC_DATA_LEN(rec);
1779         int rc;
1780
1781         ENTRY;
1782
1783         mrd = (struct mgs_replace_data *)data;
1784
1785         if (rec->lrh_type != OBD_CFG_REC) {
1786                 CDEBUG(D_MGS, "Config llog Name=%s, Record Index=%u, "
1787                        "Unhandled Record Type=%#x\n", llh->lgh_name,
1788                        rec->lrh_index, rec->lrh_type);
1789                 RETURN(-EINVAL);
1790         }
1791
1792         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1793         if (rc) {
1794                 CDEBUG(D_MGS, "Config llog Name=%s, Invalid config file.",
1795                        llh->lgh_name);
1796                 RETURN(-EINVAL);
1797         }
1798
1799         if (lcfg->lcfg_command == LCFG_MARKER) {
1800                 struct cfg_marker *marker;
1801
1802                 marker = lustre_cfg_buf(lcfg, 1);
1803                 if (marker->cm_flags & CM_SKIP) {
1804                         if (marker->cm_flags & CM_START)
1805                                 mrd->state = REPLACE_SKIP;
1806                         if (marker->cm_flags & CM_END)
1807                                 mrd->state = REPLACE_COPY;
1808                         /* SKIP section started or finished */
1809                         CDEBUG(D_MGS, "Skip idx=%d, rc=%d, len=%d, "
1810                                "cmd %x %s %s\n", rec->lrh_index, rc,
1811                                rec->lrh_len, lcfg->lcfg_command,
1812                                lustre_cfg_string(lcfg, 0),
1813                                lustre_cfg_string(lcfg, 1));
1814                         RETURN(0);
1815                 }
1816         } else {
1817                 if (mrd->state == REPLACE_SKIP) {
1818                         /* record enclosed between SKIP markers, skip it */
1819                         CDEBUG(D_MGS, "Skip idx=%d, rc=%d, len=%d, "
1820                                "cmd %x %s %s\n", rec->lrh_index, rc,
1821                                rec->lrh_len, lcfg->lcfg_command,
1822                                lustre_cfg_string(lcfg, 0),
1823                                lustre_cfg_string(lcfg, 1));
1824                         RETURN(0);
1825                 }
1826         }
1827
1828         /* Record is placed in temporary llog as is */
1829         rc = llog_write(env, mrd->temp_llh, rec, LLOG_NEXT_IDX);
1830
1831         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1832                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1833                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1834         RETURN(rc);
1835 }
1836
1837 /*
1838  * Directory CONFIGS/ may contain files which are not config logs to
1839  * be cleared. Skip any llogs with a non-alphanumeric character after
1840  * the last '-'. For example, fsname-MDT0000.sav, fsname-MDT0000.bak,
1841  * fsname-MDT0000.orig, fsname-MDT0000~, fsname-MDT0000.20150516, etc.
1842  */
1843 static bool config_to_clear(const char *logname)
1844 {
1845         int i;
1846         char *str;
1847
1848         str = strrchr(logname, '-');
1849         if (!str)
1850                 return 0;
1851
1852         i = 0;
1853         while (isalnum(str[++i]));
1854         return str[i] == '\0';
1855 }
1856
1857 /**
1858  * Clear config logs for \a name
1859  *
1860  * \param env
1861  * \param mgs           MGS device
1862  * \param name          name of device or of filesystem
1863  *                      (ex. lustre-OST0000 or lustre) in later case all logs
1864  *                      will be cleared
1865  *
1866  * \retval 0            success
1867  */
1868 int mgs_clear_configs(const struct lu_env *env,
1869                      struct mgs_device *mgs, const char *name)
1870 {
1871         struct list_head dentry_list;
1872         struct mgs_direntry *dirent, *n;
1873         char *namedash;
1874         int conn_state;
1875         struct obd_device *mgs_obd = mgs->mgs_obd;
1876         int rc;
1877
1878         ENTRY;
1879
1880         /* Prevent clients and servers from connecting to mgs */
1881         spin_lock(&mgs_obd->obd_dev_lock);
1882         conn_state = mgs_obd->obd_no_conn;
1883         mgs_obd->obd_no_conn = 1;
1884         spin_unlock(&mgs_obd->obd_dev_lock);
1885
1886         /*
1887          * config logs cannot be cleaned if anything other than
1888          * MGS is started
1889          */
1890         if (!only_mgs_is_running(mgs_obd)) {
1891                 CERROR("Only MGS is allowed to be started\n");
1892                 GOTO(out, rc = -EBUSY);
1893         }
1894
1895         /* Find all the logs in the CONFIGS directory */
1896         rc = class_dentry_readdir(env, mgs, &dentry_list);
1897         if (rc) {
1898                 CERROR("%s: cannot read config directory '%s': rc = %d\n",
1899                        mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
1900                 GOTO(out, rc);
1901         }
1902
1903         if (list_empty(&dentry_list)) {
1904                 CERROR("%s: list empty reading config dir '%s': rc = %d\n",
1905                         mgs_obd->obd_name, MOUNT_CONFIGS_DIR, -ENOENT);
1906                 GOTO(out, rc = -ENOENT);
1907         }
1908
1909         OBD_ALLOC(namedash, strlen(name) + 2);
1910         if (namedash == NULL)
1911                 GOTO(out, rc = -ENOMEM);
1912         snprintf(namedash, strlen(name) + 2, "%s-", name);
1913
1914         list_for_each_entry(dirent, &dentry_list, mde_list) {
1915                 if (strcmp(name, dirent->mde_name) &&
1916                     strncmp(namedash, dirent->mde_name, strlen(namedash)))
1917                         continue;
1918                 if (!config_to_clear(dirent->mde_name))
1919                         continue;
1920                 CDEBUG(D_MGS, "%s: Clear config log %s\n",
1921                        mgs_obd->obd_name, dirent->mde_name);
1922                 rc = mgs_replace_log(env, mgs_obd, dirent->mde_name, NULL,
1923                                      mgs_clear_config_handler, NULL);
1924                 if (rc)
1925                         break;
1926         }
1927
1928         list_for_each_entry_safe(dirent, n, &dentry_list, mde_list) {
1929                 list_del_init(&dirent->mde_list);
1930                 mgs_direntry_free(dirent);
1931         }
1932         OBD_FREE(namedash, strlen(name) + 2);
1933 out:
1934         spin_lock(&mgs_obd->obd_dev_lock);
1935         mgs_obd->obd_no_conn = conn_state;
1936         spin_unlock(&mgs_obd->obd_dev_lock);
1937
1938         RETURN(rc);
1939 }
1940
1941 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1942                             char *devname, struct lov_desc *desc)
1943 {
1944         struct mgs_thread_info  *mgi = mgs_env_info(env);
1945         struct llog_cfg_rec     *lcr;
1946         int rc;
1947
1948         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1949         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1950         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1951         if (lcr == NULL)
1952                 return -ENOMEM;
1953
1954         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1955         lustre_cfg_rec_free(lcr);
1956         return rc;
1957 }
1958
1959 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1960                             char *devname, struct lmv_desc *desc)
1961 {
1962         struct mgs_thread_info  *mgi = mgs_env_info(env);
1963         struct llog_cfg_rec     *lcr;
1964         int rc;
1965
1966         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1967         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1968         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1969         if (lcr == NULL)
1970                 return -ENOMEM;
1971
1972         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1973         lustre_cfg_rec_free(lcr);
1974         return rc;
1975 }
1976
1977 static inline int record_mdc_add(const struct lu_env *env,
1978                                  struct llog_handle *llh,
1979                                  char *logname, char *mdcuuid,
1980                                  char *mdtuuid, char *index,
1981                                  char *gen)
1982 {
1983         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1984                            mdtuuid, index, gen, mdcuuid);
1985 }
1986
1987 static inline int record_lov_add(const struct lu_env *env,
1988                                  struct llog_handle *llh,
1989                                  char *lov_name, char *ost_uuid,
1990                                  char *index, char *gen)
1991 {
1992         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1993                            ost_uuid, index, gen, NULL);
1994 }
1995
1996 static inline int record_mount_opt(const struct lu_env *env,
1997                                    struct llog_handle *llh,
1998                                    char *profile, char *lov_name,
1999                                    char *mdc_name)
2000 {
2001         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
2002                            profile, lov_name, mdc_name, NULL);
2003 }
2004
2005 static int record_marker(const struct lu_env *env,
2006                          struct llog_handle *llh,
2007                          struct fs_db *fsdb, __u32 flags,
2008                          char *tgtname, char *comment)
2009 {
2010         struct mgs_thread_info *mgi = mgs_env_info(env);
2011         struct llog_cfg_rec *lcr;
2012         int rc;
2013         int cplen = 0;
2014
2015         if (flags & CM_START)
2016                 fsdb->fsdb_gen++;
2017         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
2018         mgi->mgi_marker.cm_flags = flags;
2019         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
2020         cplen = strscpy(mgi->mgi_marker.cm_tgtname, tgtname,
2021                         sizeof(mgi->mgi_marker.cm_tgtname));
2022         if (cplen < 0)
2023                 return cplen;
2024         cplen = strscpy(mgi->mgi_marker.cm_comment, comment,
2025                         sizeof(mgi->mgi_marker.cm_comment));
2026         if (cplen < 0)
2027                 return cplen;
2028         mgi->mgi_marker.cm_createtime = ktime_get_real_seconds();
2029         mgi->mgi_marker.cm_canceltime = 0;
2030         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2031         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
2032                             sizeof(mgi->mgi_marker));
2033         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
2034         if (lcr == NULL)
2035                 return -ENOMEM;
2036
2037         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
2038         lustre_cfg_rec_free(lcr);
2039         return rc;
2040 }
2041
2042 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
2043                             struct llog_handle **llh, char *name)
2044 {
2045         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
2046         struct llog_ctxt        *ctxt;
2047         int                      rc = 0;
2048         ENTRY;
2049
2050         if (*llh)
2051                 GOTO(out, rc = -EBUSY);
2052
2053         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
2054         if (!ctxt)
2055                 GOTO(out, rc = -ENODEV);
2056         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
2057
2058         rc = llog_open_create(env, ctxt, llh, NULL, name);
2059         if (rc)
2060                 GOTO(out_ctxt, rc);
2061         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
2062         if (rc)
2063                 llog_close(env, *llh);
2064 out_ctxt:
2065         llog_ctxt_put(ctxt);
2066 out:
2067         if (rc) {
2068                 CERROR("%s: can't start log %s: rc = %d\n",
2069                        mgs->mgs_obd->obd_name, name, rc);
2070                 *llh = NULL;
2071         }
2072         RETURN(rc);
2073 }
2074
2075 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
2076 {
2077         int rc;
2078
2079         rc = llog_close(env, *llh);
2080         *llh = NULL;
2081
2082         return rc;
2083 }
2084
2085 /******************** config "macros" *********************/
2086
2087 /* write an lcfg directly into a log (with markers) */
2088 static int mgs_write_log_direct(const struct lu_env *env,
2089                                 struct mgs_device *mgs, struct fs_db *fsdb,
2090                                 char *logname, struct llog_cfg_rec *lcr,
2091                                 char *devname, char *comment)
2092 {
2093         struct llog_handle *llh = NULL;
2094         int rc;
2095
2096         ENTRY;
2097
2098         rc = record_start_log(env, mgs, &llh, logname);
2099         if (rc)
2100                 RETURN(rc);
2101
2102         /* FIXME These should be a single journal transaction */
2103         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
2104         if (rc)
2105                 GOTO(out_end, rc);
2106         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
2107         if (rc)
2108                 GOTO(out_end, rc);
2109         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
2110         if (rc)
2111                 GOTO(out_end, rc);
2112 out_end:
2113         record_end_log(env, &llh);
2114         RETURN(rc);
2115 }
2116
2117 /* write the lcfg in all logs for the given fs */
2118 static int mgs_write_log_direct_all(const struct lu_env *env,
2119                                     struct mgs_device *mgs,
2120                                     struct fs_db *fsdb,
2121                                     struct mgs_target_info *mti,
2122                                     struct llog_cfg_rec *lcr, char *devname,
2123                                     char *comment, int server_only)
2124 {
2125         struct list_head         log_list;
2126         struct mgs_direntry     *dirent, *n;
2127         char                    *fsname = mti->mti_fsname;
2128         int                      rc = 0, len = strlen(fsname);
2129
2130         ENTRY;
2131         /* Find all the logs in the CONFIGS directory */
2132         rc = class_dentry_readdir(env, mgs, &log_list);
2133         if (rc)
2134                 RETURN(rc);
2135
2136         /* Could use fsdb index maps instead of directory listing */
2137         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
2138                 list_del_init(&dirent->mde_list);
2139                 /* don't write to sptlrpc rule log */
2140                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
2141                         goto next;
2142
2143                 /* caller wants write server logs only */
2144                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
2145                         goto next;
2146
2147                 if (strlen(dirent->mde_name) <= len ||
2148                     strncmp(fsname, dirent->mde_name, len) != 0 ||
2149                     dirent->mde_name[len] != '-')
2150                         goto next;
2151
2152                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
2153                 /* Erase any old settings of this same parameter */
2154                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
2155                                 devname, comment, CM_SKIP);
2156                 if (rc < 0)
2157                         CERROR("%s: Can't modify llog %s: rc = %d\n",
2158                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
2159                 if (lcr == NULL)
2160                         goto next;
2161                 /* Write the new one */
2162                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
2163                                           lcr, devname, comment);
2164                 if (rc != 0)
2165                         CERROR("%s: writing log %s: rc = %d\n",
2166                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
2167 next:
2168                 mgs_direntry_free(dirent);
2169         }
2170
2171         RETURN(rc);
2172 }
2173
2174 /**
2175  * Replace a mdt name in MDT configuration name
2176  * mdtname should be formated like this: <fsname>-MDT<mdtindex>
2177  **/
2178 static
2179 int mgs_replace_mdtname(char *name, size_t size, char *mdtsrc, char *mdtnew)
2180 {
2181         char *ptr, *src, *new;
2182         size_t n;
2183
2184         if (!name || !size)
2185                 return 0;
2186
2187         src = strstr(mdtsrc, "-MDT");
2188         new = strstr(mdtnew, "-MDT");
2189         if (!src || !new)
2190                 return -EINVAL;
2191
2192         n = src - mdtsrc;
2193         if (strncmp(name, mdtsrc, n) != 0)
2194                 return -ENOENT;
2195         if (name[n] == '\0')
2196                 return 0;
2197         if (name[n] != '-')
2198                 return -ENOENT;
2199
2200         n = strlen(src);
2201         ptr = strstr(name, src);
2202         while (ptr) {
2203                 char *end = ptr + n;
2204
2205                 if (*end == '\0' || *end == '-' || *end == '_')
2206                         strncpy(ptr, new, n);
2207                 ptr = end;
2208                 ptr = strstr(ptr, src);
2209         }
2210
2211         return 0;
2212 }
2213
2214 static inline
2215 bool mgs_copy_skipped(struct cfg_marker *marker, struct mgs_target_info *mti,
2216                       char *mdtsrc)
2217 {
2218         size_t fsname_len = strlen(mti->mti_fsname);
2219         char *comment = marker->cm_comment;
2220         char *tgtname = marker->cm_tgtname;
2221
2222         if (marker->cm_flags & CM_SKIP)
2223                 return true;
2224
2225         /* fsname does not match? */
2226         if (strncmp(tgtname, mti->mti_fsname, fsname_len) != 0)
2227                 return true;
2228
2229         if (tgtname[fsname_len] != '\0' && tgtname[fsname_len] != '-')
2230                 return true;
2231
2232         /* filter out existing sections */
2233         if (strncmp(comment, "add mdt", 7) == 0 ||
2234             strncmp(comment, "lov setup", 9) == 0 ||
2235             strncmp(comment, "add osp", 7) == 0 ||
2236             strncmp(comment, "add osc", 7) == 0 ||
2237             strncmp(comment, "add failnid", 11) == 0)
2238                 return true;
2239
2240         /* exclude paramater for invalid osp devices (--writeconf case) */
2241         if (strncmp(tgtname, mti->mti_svname, strlen(mti->mti_svname)) == 0)
2242                 return true;
2243
2244         return false;
2245 }
2246
2247 /**
2248  * Update the marker->cm_tgtname with the new MDT target name:
2249  *      marker ... lustre-MDT0000  'mdt.identity_upcall'
2250  * -->  marker ... lustre-MDT0003  'mdt.identity_upcall'
2251  **/
2252 static inline
2253 void mgs_copy_update_marker(char *fsname, struct cfg_marker *marker,
2254                             char *mdtsrc, char *mdtnew)
2255 {
2256         mgs_replace_mdtname(marker->cm_tgtname, sizeof(marker->cm_tgtname),
2257                             mdtsrc, mdtnew);
2258 }
2259
2260 struct mgs_copy_data {
2261         struct llog_handle      *mcd_llh;
2262         struct mgs_target_info  *mcd_mti;
2263         struct fs_db            *mcd_fsdb;
2264         bool                    mcd_skip;
2265 };
2266
2267 /**
2268  * Walk through MDT config log records and convert the related records
2269  * into the target.
2270  *
2271  * The osp sections (add osp/add osc) are excluded. Those sections are
2272  * generated from client configuration by mgs_steal_osp_rec_from_client()
2273  **/
2274 static int mgs_copy_mdt_llog_handler(const struct lu_env *env,
2275                                       struct llog_handle *llh,
2276                                       struct llog_rec_hdr *rec, void *data)
2277 {
2278         struct mgs_copy_data *mcd = data;
2279         struct mgs_target_info *mti = mcd->mcd_mti;
2280         struct llog_handle *mdt_llh = mcd->mcd_llh;
2281         struct fs_db *fsdb = mcd->mcd_fsdb;
2282         int cfg_len = rec->lrh_len;
2283         char *cfg_buf = (char *)(rec + 1);
2284         struct lustre_cfg *lcfg;
2285         char *mdtsrc = llh->lgh_name;
2286         char *s[5] = { 0 };
2287         int i;
2288         int rc = 0;
2289
2290         ENTRY;
2291
2292         if (rec->lrh_type != OBD_CFG_REC) {
2293                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
2294                 RETURN(-EINVAL);
2295         }
2296
2297         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
2298         if (rc) {
2299                 CERROR("Insane cfg\n");
2300                 RETURN(rc);
2301         }
2302
2303         lcfg = (struct lustre_cfg *)cfg_buf;
2304
2305         if (lcfg->lcfg_command == LCFG_MARKER) {
2306                 struct cfg_marker *marker = lustre_cfg_buf(lcfg, 1);
2307
2308                 if (marker->cm_flags & CM_START)
2309                         mcd->mcd_skip = mgs_copy_skipped(marker, mti, mdtsrc);
2310
2311                 if (mcd->mcd_skip)
2312                         RETURN(0);
2313
2314                 mgs_copy_update_marker(mti->mti_fsname, marker,
2315                                        mdtsrc, mti->mti_svname);
2316                 rc = record_marker(env, mdt_llh, fsdb, marker->cm_flags,
2317                                    marker->cm_tgtname, marker->cm_comment);
2318                 if (rc)
2319                         RETURN(rc);
2320
2321                 if (marker->cm_flags & CM_END)
2322                         mcd->mcd_skip = true;
2323
2324                 RETURN(0);
2325         }
2326
2327         if (mcd->mcd_skip)
2328                 RETURN(0);
2329
2330         /* init cfg strings with the client record values */
2331         for (i = 0; i < 5; i++)
2332                 s[i] = lustre_cfg_buf(lcfg, i);
2333
2334         /* convert records with the new target name */
2335         switch (lcfg->lcfg_command) {
2336         case LCFG_ATTACH:
2337         case LCFG_DETACH:
2338         case LCFG_ADD_CONN:
2339         case LCFG_DEL_CONN:
2340         case LCFG_SETUP:
2341         case LCFG_CLEANUP:
2342         case LCFG_ADD_MDC:
2343         case LCFG_DEL_MDC:
2344         case LCFG_LOV_ADD_INA:
2345         case LCFG_LOV_ADD_OBD:
2346         case LCFG_LOV_DEL_OBD:
2347                 RETURN(0);
2348                 break;
2349         default:
2350                 rc = mgs_replace_mdtname(s[0], LUSTRE_CFG_BUFLEN(lcfg, 0),
2351                                          mdtsrc, mti->mti_svname);
2352                 if (rc)
2353                         RETURN(rc);
2354                 break;
2355         }
2356
2357         rc = record_base_raw(env, mdt_llh, s[0],
2358                              lcfg->lcfg_num, lcfg->lcfg_flags, lcfg->lcfg_nid,
2359                              lcfg->lcfg_command, s[1], s[2], s[3], s[4]);
2360
2361         RETURN(rc);
2362 }
2363
2364
2365 /* copy an existing MDT configuration records for a new MDT configuration */
2366 static int mgs_copy_llog_from_mdt(const struct lu_env *env,
2367                                    struct mgs_device *mgs,
2368                                    struct mgs_target_info *mti,
2369                                    struct fs_db *fsdb)
2370 {
2371         char *logname = NULL;
2372         struct llog_handle *loghandle;
2373         struct llog_ctxt *ctxt;
2374         struct mgs_copy_data mcd = { 0 };
2375         int i;
2376         int rc;
2377
2378         ENTRY;
2379
2380         if (fsdb->fsdb_mdt_count < 2)
2381                 RETURN(0);
2382
2383         for_each_set_bit(i, fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE) {
2384                 if (mti->mti_stripe_index == i)
2385                         continue;
2386
2387                 rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2388                 if (rc)
2389                         GOTO(out_free, rc);
2390
2391                 if (mgs_log_is_empty(env, mgs, logname)) {
2392                         name_destroy(&logname);
2393                         continue;
2394                 }
2395
2396                 break;
2397         }
2398
2399         /* is there an existing MDT configuration? */
2400         if (!logname)
2401                 RETURN(0);
2402
2403         /* check if MDT source name is valid */
2404         if (strncmp(mti->mti_svname, logname, strlen(mti->mti_fsname) + 1) != 0)
2405                 GOTO(out_free, rc = -EINVAL);
2406         if (strlen(mti->mti_svname) != strlen(logname))
2407                 GOTO(out_free, rc = -EINVAL);
2408
2409         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
2410         LASSERT(ctxt != NULL);
2411
2412         rc = llog_open(env, ctxt, &loghandle, NULL, logname,
2413                        LLOG_OPEN_EXISTS);
2414         if (rc < 0)
2415                 GOTO(out_put, rc);
2416
2417         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
2418         if (rc)
2419                 GOTO(out_close, rc);
2420
2421         mcd.mcd_mti = mti;
2422         mcd.mcd_fsdb = fsdb;
2423         mcd.mcd_skip = true;
2424
2425         rc = record_start_log(env, mgs, &mcd.mcd_llh, mti->mti_svname);
2426         if (rc)
2427                 GOTO(out_close, rc);
2428
2429         rc = llog_process_or_fork(env, loghandle, mgs_copy_mdt_llog_handler,
2430                                   &mcd, NULL, false);
2431         if (rc)
2432                 CWARN("%s: Fail to copy records from %s configuration: rc = %d\n",
2433                       mti->mti_svname, logname, rc);
2434
2435         record_end_log(env, &mcd.mcd_llh);
2436
2437         EXIT;
2438 out_close:
2439         llog_close(env, loghandle);
2440 out_put:
2441         llog_ctxt_put(ctxt);
2442 out_free:
2443         name_destroy(&logname);
2444
2445         return rc;
2446 }
2447
2448 struct mgs_steal_data {
2449         struct llog_handle      *msd_llh;
2450         struct fs_db            *msd_fsdb;
2451         struct mgs_target_info  *msd_mti;
2452         bool                    msd_skip;
2453         char                    *msd_ospname;
2454         char                    *msd_lovname;
2455         char                    *msd_lovuuid;
2456         char                    *msd_devtype;
2457 };
2458
2459 static inline
2460 int mgs_steal_update_names(char *tgtname, struct mgs_steal_data *msd)
2461 {
2462         struct mgs_target_info  *mti = msd->msd_mti;
2463         int rc = 0;
2464
2465         name_destroy(&msd->msd_ospname);
2466         name_destroy(&msd->msd_lovname);
2467         name_destroy(&msd->msd_lovuuid);
2468
2469         rc = name_create_osp(&msd->msd_ospname, &msd->msd_devtype, tgtname,
2470                              mti->mti_stripe_index);
2471         if (rc)
2472                 return rc;
2473
2474         rc = name_create_lov(&msd->msd_lovname, mti->mti_svname);
2475         if (rc)
2476                 goto err_free_osp;
2477
2478         rc = name_create(&msd->msd_lovuuid, msd->msd_lovname,  "_UUID");
2479         if (rc)
2480                 goto err_free_lov;
2481
2482         return 0;
2483
2484 err_free_lov:
2485         name_destroy(&msd->msd_lovname);
2486 err_free_osp:
2487         name_destroy(&msd->msd_ospname);
2488
2489         return rc;
2490 }
2491
2492 static inline
2493 int mgs_steal_skipped(struct mgs_steal_data *msd,
2494                       struct cfg_marker *marker)
2495 {
2496         char *tgtname = marker->cm_tgtname;
2497         char *comment = marker->cm_comment;
2498         char *fsname = msd->msd_mti->mti_fsname;
2499         size_t fsname_len = strlen(fsname);
2500
2501         if ((marker->cm_flags & CM_SKIP) ||
2502             strncmp(tgtname, fsname, fsname_len) != 0 ||
2503             tgtname[fsname_len] != '-')
2504                 return true;
2505
2506         if (strncmp(comment, "add mdc", 7) != 0 &&
2507             strncmp(comment, "add osc", 7) != 0 &&
2508             strncmp(comment, "add failnid", 11) != 0)
2509                 return true;
2510
2511         /* check for invalid osp devices (--writeconf case) */
2512         if (strcmp(tgtname, msd->msd_mti->mti_svname) == 0)
2513                 return true;
2514
2515         return false;
2516 }
2517
2518 /**
2519  * Walk through client config log for mdc/osc records and convert the related
2520  * records in osp records for the new MDT target.
2521  **/
2522 static int mgs_steal_client_llog_handler(const struct lu_env *env,
2523                                          struct llog_handle *llh,
2524                                          struct llog_rec_hdr *rec, void *data)
2525 {
2526         struct mgs_steal_data *msd = data;
2527         struct fs_db *fsdb = msd->msd_fsdb;
2528         int cfg_len = rec->lrh_len;
2529         char *cfg_buf = (char *)(rec + 1);
2530         struct lustre_cfg *lcfg;
2531         char *s[5] = { 0 };
2532         int i;
2533         int rc = 0;
2534
2535         ENTRY;
2536
2537         if (rec->lrh_type != OBD_CFG_REC) {
2538                 CERROR("%s: unhandled lrh_type %#x: rc = %d\n",
2539                        llh->lgh_name, rec->lrh_type, -EINVAL);
2540                 RETURN(-EINVAL);
2541         }
2542
2543         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
2544         if (rc) {
2545                 CERROR("%s: insane cfg: rc = %d\n", llh->lgh_name, rc);
2546                 RETURN(rc);
2547         }
2548
2549         lcfg = (struct lustre_cfg *)cfg_buf;
2550
2551         if (lcfg->lcfg_command == LCFG_MARKER) {
2552                 struct cfg_marker *marker = lustre_cfg_buf(lcfg, 1);
2553                 char *comment = marker->cm_comment;
2554
2555                 if (marker->cm_flags & CM_START) {
2556                         msd->msd_skip = mgs_steal_skipped(msd, marker);
2557                         if (msd->msd_skip)
2558                                 RETURN(0);
2559
2560                         rc = mgs_steal_update_names(marker->cm_tgtname, msd);
2561                         if (rc)
2562                                 RETURN(rc);
2563                 }
2564
2565                 if (msd->msd_skip)
2566                         RETURN(0);
2567
2568                 if (strncmp(comment, "add mdc", 7) == 0)
2569                         comment = "add osp";
2570
2571                 rc = record_marker(env, msd->msd_llh, fsdb, marker->cm_flags,
2572                                    marker->cm_tgtname, comment);
2573                 if (rc)
2574                         RETURN(rc);
2575
2576                 if (marker->cm_flags & CM_END)
2577                         msd->msd_skip = true;
2578
2579                 RETURN(0);
2580         }
2581
2582         if (msd->msd_skip)
2583                 RETURN(0);
2584
2585         /* init cfg strings with the client record values */
2586         for (i = 0; i < 5; i++)
2587                 s[i] = lustre_cfg_buf(lcfg, i);
2588
2589         /* convert the mdc records to osp */
2590         switch (lcfg->lcfg_command) {
2591         case LCFG_ADD_UUID:
2592                 break;
2593         case LCFG_ATTACH:
2594                 s[1] = msd->msd_devtype;
2595                 s[2] = msd->msd_lovuuid;
2596                 fallthrough;
2597         case LCFG_DETACH:
2598         case LCFG_ADD_CONN:
2599         case LCFG_DEL_CONN:
2600         case LCFG_SETUP:
2601         case LCFG_CLEANUP:
2602                 s[0] = msd->msd_ospname;
2603                 break;
2604         case LCFG_ADD_MDC:
2605         case LCFG_DEL_MDC:
2606         case LCFG_LOV_ADD_INA:
2607         case LCFG_LOV_ADD_OBD:
2608         case LCFG_LOV_DEL_OBD:
2609                 s[0] = msd->msd_lovname;
2610                 break;
2611         default:
2612                 RETURN(0);
2613         }
2614
2615         rc = record_base_raw(env, msd->msd_llh, s[0],
2616                              lcfg->lcfg_num, lcfg->lcfg_flags, lcfg->lcfg_nid,
2617                              lcfg->lcfg_command, s[1], s[2], s[3], s[4]);
2618
2619         RETURN(rc);
2620 }
2621
2622 /*
2623  * Steal mdc/osc records from the client configuration and convert them to
2624  * create osp devices (OST and MDT) for a new MDT configuration.
2625  *
2626  * fsdb->fsdb_mutex is already held in mgs_write_log_target
2627  * stealed from mgs_get_fsdb_from_llog
2628  */
2629 static int mgs_steal_osp_rec_from_client(const struct lu_env *env,
2630                                          struct mgs_device *mgs,
2631                                          char *client_name,
2632                                          struct mgs_target_info *mti,
2633                                          struct fs_db *fsdb)
2634 {
2635         struct llog_handle *loghandle;
2636         struct llog_ctxt *ctxt;
2637         struct mgs_steal_data msd = { 0 };
2638         int rc;
2639
2640         ENTRY;
2641
2642         if (mgs_log_is_empty(env, mgs, client_name))
2643                 RETURN(0);
2644
2645         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
2646         LASSERT(ctxt != NULL);
2647
2648         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
2649                        LLOG_OPEN_EXISTS);
2650         if (rc < 0)
2651                 GOTO(out_pop, rc);
2652
2653         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
2654         if (rc)
2655                 GOTO(out_close, rc);
2656
2657         msd.msd_fsdb = fsdb;
2658         msd.msd_skip = true;
2659         msd.msd_mti = mti;
2660
2661         rc = record_start_log(env, mgs, &msd.msd_llh, mti->mti_svname);
2662         if (rc)
2663                 GOTO(out_close, rc);
2664
2665         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
2666                                   &msd, NULL, false);
2667         if (rc)
2668                 CWARN("%s: Fail to generate osp devices from %s : rc = %d\n",
2669                       mti->mti_svname, client_name, rc);
2670
2671         record_end_log(env, &msd.msd_llh);
2672
2673         name_destroy(&msd.msd_ospname);
2674         name_destroy(&msd.msd_lovname);
2675         name_destroy(&msd.msd_lovuuid);
2676
2677 out_close:
2678         llog_close(env, loghandle);
2679 out_pop:
2680         llog_ctxt_put(ctxt);
2681
2682         RETURN(rc);
2683 }
2684
2685 /* mount opt is the third thing in client logs */
2686 static int mgs_write_log_mount_opt(const struct lu_env *env,
2687                                    struct mgs_device *mgs, struct fs_db *fsdb,
2688                                    char *logname)
2689 {
2690         struct llog_handle *llh = NULL;
2691         int rc = 0;
2692
2693         ENTRY;
2694
2695         CDEBUG(D_MGS, "Writing mount options log for %s\n", logname);
2696
2697         rc = record_start_log(env, mgs, &llh, logname);
2698         if (rc)
2699                 RETURN(rc);
2700
2701         rc = record_marker(env, llh, fsdb, CM_START, logname, "mount opts");
2702         if (rc)
2703                 GOTO(out_end, rc);
2704         rc = record_mount_opt(env, llh, logname, fsdb->fsdb_clilov,
2705                               fsdb->fsdb_clilmv);
2706         if (rc)
2707                 GOTO(out_end, rc);
2708         rc = record_marker(env, llh, fsdb, CM_END, logname, "mount opts");
2709         if (rc)
2710                 GOTO(out_end, rc);
2711 out_end:
2712         record_end_log(env, &llh);
2713         RETURN(rc);
2714 }
2715
2716 /* lmv is the second thing for client logs */
2717 /* copied from mgs_write_log_lov. Please refer to that.  */
2718 static int mgs_write_log_lmv(const struct lu_env *env,
2719                              struct mgs_device *mgs,
2720                              struct fs_db *fsdb,
2721                              struct mgs_target_info *mti,
2722                              char *logname, char *lmvname)
2723 {
2724         struct llog_handle *llh = NULL;
2725         struct lmv_desc *lmvdesc;
2726         char *uuid;
2727         int rc = 0;
2728
2729         ENTRY;
2730         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname, logname);
2731
2732         OBD_ALLOC_PTR(lmvdesc);
2733         if (lmvdesc == NULL)
2734                 RETURN(-ENOMEM);
2735         lmvdesc->ld_active_tgt_count = 0;
2736         lmvdesc->ld_tgt_count = 0;
2737         sprintf((char *)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
2738         uuid = (char *)lmvdesc->ld_uuid.uuid;
2739
2740         rc = record_start_log(env, mgs, &llh, logname);
2741         if (rc)
2742                 GOTO(out_free, rc);
2743         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
2744         if (rc)
2745                 GOTO(out_end, rc);
2746         rc = record_attach(env, llh, lmvname, "lmv", uuid);
2747         if (rc)
2748                 GOTO(out_end, rc);
2749         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
2750         if (rc)
2751                 GOTO(out_end, rc);
2752         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
2753         if (rc)
2754                 GOTO(out_end, rc);
2755 out_end:
2756         record_end_log(env, &llh);
2757 out_free:
2758         OBD_FREE_PTR(lmvdesc);
2759         RETURN(rc);
2760 }
2761
2762 /* lov is the first thing in the mdt and client logs */
2763 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
2764                              struct fs_db *fsdb, struct mgs_target_info *mti,
2765                              char *logname, char *lovname)
2766 {
2767         struct llog_handle *llh = NULL;
2768         struct lov_desc *lovdesc;
2769         char *uuid;
2770         int rc = 0;
2771
2772         ENTRY;
2773         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
2774
2775         /*
2776          * #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
2777          * #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
2778          * uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
2779          */
2780
2781         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
2782         OBD_ALLOC_PTR(lovdesc);
2783         if (lovdesc == NULL)
2784                 RETURN(-ENOMEM);
2785         lovdesc->ld_magic = LOV_DESC_MAGIC;
2786         lovdesc->ld_tgt_count = 0;
2787         /* Defaults.  Can be changed later by lcfg config_param */
2788         lovdesc->ld_default_stripe_count = 1;
2789         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
2790         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
2791         lovdesc->ld_default_stripe_offset = -1;
2792         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
2793         sprintf((char *)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
2794         /* can these be the same? */
2795         uuid = (char *)lovdesc->ld_uuid.uuid;
2796
2797         /* This should always be the first entry in a log.
2798          * rc = mgs_clear_log(obd, logname);
2799          */
2800         rc = record_start_log(env, mgs, &llh, logname);
2801         if (rc)
2802                 GOTO(out_free, rc);
2803         /* FIXME these should be a single journal transaction */
2804         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
2805         if (rc)
2806                 GOTO(out_end, rc);
2807         rc = record_attach(env, llh, lovname, "lov", uuid);
2808         if (rc)
2809                 GOTO(out_end, rc);
2810         rc = record_lov_setup(env, llh, lovname, lovdesc);
2811         if (rc)
2812                 GOTO(out_end, rc);
2813         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
2814         if (rc)
2815                 GOTO(out_end, rc);
2816         EXIT;
2817 out_end:
2818         record_end_log(env, &llh);
2819 out_free:
2820         OBD_FREE_PTR(lovdesc);
2821         return rc;
2822 }
2823
2824 /* add failnids to open log */
2825 static int mgs_write_log_failnids(const struct lu_env *env,
2826                                   struct mgs_target_info *mti,
2827                                   struct llog_handle *llh,
2828                                   char *cliname)
2829 {
2830         char *failnodeuuid = NULL;
2831         char *ptr = mti->mti_params;
2832         struct lnet_nid nid;
2833         int rc = 0;
2834
2835         /*
2836          * #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
2837          * #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
2838          * #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
2839          * #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
2840          * #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
2841          * #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
2842          */
2843
2844         /*
2845          * Pull failnid info out of params string, which may contain something
2846          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
2847          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
2848          * etc.  However, convert_hostnames() should have caught those.
2849          */
2850         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
2851                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
2852                         char nidstr[LNET_NIDSTR_SIZE];
2853
2854                         if (failnodeuuid == NULL) {
2855                                 /* We don't know the failover node name,
2856                                  * so just use the first nid as the uuid */
2857                                 libcfs_nidstr_r(&nid, nidstr, sizeof(nidstr));
2858                                 rc = name_create(&failnodeuuid, nidstr, "");
2859                                 if (rc != 0)
2860                                         return rc;
2861                         }
2862                         CDEBUG(D_MGS,
2863                                "add nid %s for failover uuid %s, client %s\n",
2864                                libcfs_nidstr_r(&nid, nidstr, sizeof(nidstr)),
2865                                failnodeuuid, cliname);
2866                         rc = record_add_uuid(env, llh, &nid, failnodeuuid);
2867                         /*
2868                          * If *ptr is ':', we have added all NIDs for
2869                          * failnodeuuid.
2870                          */
2871                         if (*ptr == ':') {
2872                                 rc = record_add_conn(env, llh, cliname,
2873                                                      failnodeuuid);
2874                                 name_destroy(&failnodeuuid);
2875                                 failnodeuuid = NULL;
2876                         }
2877                 }
2878                 if (failnodeuuid) {
2879                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2880                         name_destroy(&failnodeuuid);
2881                         failnodeuuid = NULL;
2882                 }
2883         }
2884
2885         return rc;
2886 }
2887
2888 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2889                                     struct mgs_device *mgs,
2890                                     struct fs_db *fsdb,
2891                                     struct mgs_target_info *mti,
2892                                     char *logname, char *lmvname)
2893 {
2894         char tmp[LNET_NIDSTR_SIZE], *nidstr;
2895         struct llog_handle *llh = NULL;
2896         char *mdcname = NULL;
2897         char *nodeuuid = NULL;
2898         char *mdcuuid = NULL;
2899         char *lmvuuid = NULL;
2900         char index[6];
2901         int i, rc;
2902
2903         ENTRY;
2904         if (mgs_log_is_empty(env, mgs, logname)) {
2905                 CERROR("log is empty! Logical error\n");
2906                 RETURN(-EINVAL);
2907         }
2908
2909         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2910                mti->mti_svname, logname, lmvname);
2911
2912         if (!target_supports_large_nid(mti)) {
2913                 libcfs_nid2str_r(mti->mti_nids[0], tmp, sizeof(tmp));
2914                 nidstr = tmp;
2915         } else {
2916                 nidstr = mti->mti_nidlist[0];
2917         }
2918
2919         rc = name_create(&nodeuuid, nidstr, "");
2920         if (rc)
2921                 RETURN(rc);
2922         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2923         if (rc)
2924                 GOTO(out_free, rc);
2925         rc = name_create(&mdcuuid, mdcname, "_UUID");
2926         if (rc)
2927                 GOTO(out_free, rc);
2928         rc = name_create(&lmvuuid, lmvname, "_UUID");
2929         if (rc)
2930                 GOTO(out_free, rc);
2931
2932         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2933                         "add mdc", CM_SKIP);
2934         if (rc < 0)
2935                 GOTO(out_free, rc);
2936
2937         rc = record_start_log(env, mgs, &llh, logname);
2938         if (rc)
2939                 GOTO(out_free, rc);
2940         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2941                            "add mdc");
2942         if (rc)
2943                 GOTO(out_end, rc);
2944
2945         for (i = 0; i < mti->mti_nid_count; i++) {
2946                 struct lnet_nid nid;
2947
2948                 if (target_supports_large_nid(mti)) {
2949                         rc = libcfs_strnid(&nid, mti->mti_nidlist[i]);
2950                         if (rc < 0)
2951                                 GOTO(out_end, rc);
2952                 } else {
2953                         lnet_nid4_to_nid(mti->mti_nids[i], &nid);
2954                 }
2955
2956                 CDEBUG(D_MGS, "add nid %s for mdt\n", libcfs_nidstr(&nid));
2957                 rc = record_add_uuid(env, llh, &nid, nodeuuid);
2958                 if (rc)
2959                         GOTO(out_end, rc);
2960         }
2961
2962         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2963         if (rc)
2964                 GOTO(out_end, rc);
2965         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2966                           NULL, NULL);
2967         if (rc)
2968                 GOTO(out_end, rc);
2969         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2970         if (rc)
2971                 GOTO(out_end, rc);
2972         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2973         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2974                             index, "1");
2975         if (rc)
2976                 GOTO(out_end, rc);
2977         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2978                            "add mdc");
2979         if (rc)
2980                 GOTO(out_end, rc);
2981 out_end:
2982         record_end_log(env, &llh);
2983 out_free:
2984         name_destroy(&lmvuuid);
2985         name_destroy(&mdcuuid);
2986         name_destroy(&mdcname);
2987         name_destroy(&nodeuuid);
2988         RETURN(rc);
2989 }
2990
2991 static int name_create_mdt_and_lov(char **logname, char **lovname,
2992                                    struct fs_db *fsdb, int i)
2993 {
2994         int rc;
2995
2996         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2997         if (rc)
2998                 return rc;
2999         /* COMPAT_180 */
3000         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
3001                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
3002         else
3003                 rc = name_create(lovname, *logname, "-mdtlov");
3004         if (rc) {
3005                 name_destroy(logname);
3006                 *logname = NULL;
3007         }
3008         return rc;
3009 }
3010
3011 /* add new mdc to already existent MDS */
3012 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
3013                                     struct mgs_device *mgs,
3014                                     struct fs_db *fsdb,
3015                                     struct mgs_target_info *mti,
3016                                     int mdt_index, char *logname)
3017 {
3018         char tmp[LNET_NIDSTR_SIZE], *nidstr;
3019         struct llog_handle      *llh = NULL;
3020         char    *nodeuuid = NULL;
3021         char    *ospname = NULL;
3022         char    *lovuuid = NULL;
3023         char    *mdtuuid = NULL;
3024         char    *mdtname = NULL;
3025         char    *lovname = NULL;
3026         char    index_str[16];
3027         int     i, rc;
3028
3029         ENTRY;
3030         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
3031                 CERROR("log is empty! Logical error\n");
3032                 RETURN(-EINVAL);
3033         }
3034
3035         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
3036                logname);
3037
3038         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
3039         if (rc)
3040                 RETURN(rc);
3041
3042         if (!target_supports_large_nid(mti)) {
3043                 libcfs_nid2str_r(mti->mti_nids[0], tmp, sizeof(tmp));
3044                 nidstr = tmp;
3045         } else {
3046                 nidstr = mti->mti_nidlist[0];
3047         }
3048
3049         rc = name_create(&nodeuuid, nidstr, "");
3050         if (rc)
3051                 GOTO(out_destory, rc);
3052
3053         rc = name_create_osp(&ospname, NULL, mdtname, mdt_index);
3054         if (rc)
3055                 GOTO(out_destory, rc);
3056
3057         rc = name_create_lov(&lovname, logname);
3058         if (rc)
3059                 GOTO(out_destory, rc);
3060
3061         rc = name_create(&lovuuid, lovname, "_UUID");
3062         if (rc)
3063                 GOTO(out_destory, rc);
3064
3065         rc = name_create(&mdtuuid, mdtname, "_UUID");
3066         if (rc)
3067                 GOTO(out_destory, rc);
3068
3069         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
3070                         "add osp", CM_SKIP);
3071         if (rc < 0)
3072                 GOTO(out_destory, rc);
3073
3074         rc = record_start_log(env, mgs, &llh, logname);
3075         if (rc)
3076                 GOTO(out_destory, rc);
3077
3078         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
3079                            "add osp");
3080         if (rc)
3081                 GOTO(out_destory, rc);
3082
3083         for (i = 0; i < mti->mti_nid_count; i++) {
3084                 struct lnet_nid nid;
3085
3086                 if (target_supports_large_nid(mti)) {
3087                         rc = libcfs_strnid(&nid, mti->mti_nidlist[i]);
3088                         if (rc < 0)
3089                                 GOTO(out_end, rc);
3090                 } else {
3091                         lnet_nid4_to_nid(mti->mti_nids[i], &nid);
3092                 }
3093
3094                 CDEBUG(D_MGS, "add nid %s for mdt\n", libcfs_nidstr(&nid));
3095                 rc = record_add_uuid(env, llh, &nid, nodeuuid);
3096                 if (rc)
3097                         GOTO(out_end, rc);
3098         }
3099
3100         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
3101         if (rc)
3102                 GOTO(out_end, rc);
3103
3104         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
3105                           NULL, NULL);
3106         if (rc)
3107                 GOTO(out_end, rc);
3108
3109         rc = mgs_write_log_failnids(env, mti, llh, ospname);
3110         if (rc)
3111                 GOTO(out_end, rc);
3112
3113         /* Add mdc(osp) to lod */
3114         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
3115         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
3116                          index_str, "1", NULL);
3117         if (rc)
3118                 GOTO(out_end, rc);
3119
3120         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
3121         if (rc)
3122                 GOTO(out_end, rc);
3123
3124 out_end:
3125         record_end_log(env, &llh);
3126
3127 out_destory:
3128         name_destroy(&mdtuuid);
3129         name_destroy(&lovuuid);
3130         name_destroy(&lovname);
3131         name_destroy(&ospname);
3132         name_destroy(&nodeuuid);
3133         name_destroy(&mdtname);
3134         RETURN(rc);
3135 }
3136
3137 static int mgs_write_log_mdt0(const struct lu_env *env,
3138                               struct mgs_device *mgs,
3139                               struct fs_db *fsdb,
3140                               struct mgs_target_info *mti)
3141 {
3142         char *log = mti->mti_svname;
3143         struct llog_handle *llh = NULL;
3144         struct obd_uuid *uuid;
3145         char *lovname;
3146         char mdt_index[6];
3147         char *ptr = mti->mti_params;
3148         int rc = 0, failout = 0;
3149
3150         ENTRY;
3151         OBD_ALLOC_PTR(uuid);
3152         if (uuid == NULL)
3153                 RETURN(-ENOMEM);
3154
3155         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
3156                 failout = (strncmp(ptr, "failout", 7) == 0);
3157
3158         rc = name_create(&lovname, log, "-mdtlov");
3159         if (rc)
3160                 GOTO(out_free, rc);
3161         if (mgs_log_is_empty(env, mgs, log)) {
3162                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
3163                 if (rc)
3164                         GOTO(out_lod, rc);
3165         }
3166
3167         sprintf(mdt_index, "%d", mti->mti_stripe_index);
3168
3169         rc = record_start_log(env, mgs, &llh, log);
3170         if (rc)
3171                 GOTO(out_lod, rc);
3172
3173         /* add MDT itself */
3174
3175         /* FIXME this whole fn should be a single journal transaction */
3176         sprintf(uuid->uuid, "%s_UUID", log);
3177         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
3178         if (rc)
3179                 GOTO(out_lod, rc);
3180         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid->uuid);
3181         if (rc)
3182                 GOTO(out_end, rc);
3183         rc = record_mount_opt(env, llh, log, lovname, NULL);
3184         if (rc)
3185                 GOTO(out_end, rc);
3186         rc = record_setup(env, llh, log, uuid->uuid, mdt_index, lovname,
3187                           failout ? "n" : "f");
3188         if (rc)
3189                 GOTO(out_end, rc);
3190         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
3191         if (rc)
3192                 GOTO(out_end, rc);
3193 out_end:
3194         record_end_log(env, &llh);
3195 out_lod:
3196         name_destroy(&lovname);
3197 out_free:
3198         OBD_FREE_PTR(uuid);
3199         RETURN(rc);
3200 }
3201
3202 /* envelope method for all layers log */
3203 static int mgs_write_log_mdt(const struct lu_env *env,
3204                              struct mgs_device *mgs,
3205                              struct fs_db *fsdb,
3206                              struct mgs_target_info *mti)
3207 {
3208         struct llog_handle *llh = NULL;
3209         char *cliname;
3210         int rc, i = 0;
3211
3212         ENTRY;
3213         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
3214
3215         if (mti->mti_uuid[0] == '\0') {
3216                 /* Make up our own uuid */
3217                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
3218                          "%s_UUID", mti->mti_svname);
3219         }
3220
3221         /* add mdt */
3222         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
3223         if (rc)
3224                 RETURN(rc);
3225
3226         /* Append the mdt info to the client log */
3227         rc = name_create(&cliname, mti->mti_fsname, "-client");
3228         if (rc)
3229                 RETURN(rc);
3230
3231         if (mgs_log_is_empty(env, mgs, cliname)) {
3232                 /* Start client log */
3233                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
3234                                        fsdb->fsdb_clilov);
3235                 if (rc)
3236                         GOTO(out_free, rc);
3237                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
3238                                        fsdb->fsdb_clilmv);
3239                 if (rc)
3240                         GOTO(out_free, rc);
3241                 rc = mgs_write_log_mount_opt(env, mgs, fsdb, cliname);
3242                 if (rc)
3243                         GOTO(out_free, rc);
3244         }
3245
3246         /*
3247          * #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
3248          * #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
3249          * #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
3250          * #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
3251          * #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
3252          */
3253         rc = mgs_steal_osp_rec_from_client(env, mgs, cliname, mti, fsdb);
3254         if (rc)
3255                 GOTO(out_free, rc);
3256
3257         /* Try to copy remaining configurations from an existing MDT target */
3258         rc = mgs_copy_llog_from_mdt(env, mgs, mti, fsdb);
3259         if (rc)
3260                 GOTO(out_free, rc);
3261
3262         rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
3263                                       fsdb->fsdb_clilmv);
3264         if (rc)
3265                 GOTO(out_free, rc);
3266
3267         rc = record_start_log(env, mgs, &llh, cliname);
3268         if (rc)
3269                 GOTO(out_free, rc);
3270
3271         /* for_all_existing_mdt except current one */
3272         for_each_set_bit(i, fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE) {
3273                 char *logname;
3274
3275                 if (mti->mti_stripe_index == i)
3276                         continue;
3277
3278                 rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
3279                 if (rc)
3280                         GOTO(out_end, rc);
3281
3282                 /*
3283                  * NB: If the log for the MDT is empty, it means
3284                  * the MDT is only added to the index
3285                  * map, and not being process yet, i.e. this
3286                  * is an unregistered MDT, see mgs_write_log_target().
3287                  * so we should skip it. Otherwise
3288                  *
3289                  * 1. MGS get register request for MDT1 and MDT2.
3290                  *
3291                  * 2. Then both MDT1 and MDT2 are added into
3292                  * fsdb_mdt_index_map. (see mgs_set_index()).
3293                  *
3294                  * 3. Then MDT1 get the lock of fsdb_mutex, then
3295                  * generate the config log, here, it will regard MDT2
3296                  * as an existent MDT, and generate "add osp" for
3297                  * lustre-MDT0001-osp-MDT0002. Note: at the moment
3298                  * MDT0002 config log is still empty, so it will
3299                  * add "add osp" even before "lov setup", which
3300                  * will definitly cause trouble.
3301                  *
3302                  * 4. MDT1 registeration finished, fsdb_mutex is
3303                  * released, then MDT2 get in, then in above
3304                  * mgs_steal_osp_rec_from_client(), it will
3305                  * add another osp log for lustre-MDT0001-osp-MDT0002,
3306                  * which will cause another trouble.
3307                  */
3308                 if (!mgs_log_is_empty(env, mgs, logname))
3309                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti, i,
3310                                                       logname);
3311                 name_destroy(&logname);
3312                 if (rc)
3313                         GOTO(out_end, rc);
3314         }
3315 out_end:
3316         record_end_log(env, &llh);
3317 out_free:
3318         name_destroy(&cliname);
3319         RETURN(rc);
3320 }
3321
3322 /* Add the ost info to the client/mdt lov */
3323 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
3324                                     struct mgs_device *mgs, struct fs_db *fsdb,
3325                                     struct mgs_target_info *mti,
3326                                     char *logname, char *suffix, char *lovname,
3327                                     enum lustre_sec_part sec_part, int flags)
3328 {
3329         char tmp[LNET_NIDSTR_SIZE], *nidstr;
3330         struct llog_handle *llh = NULL;
3331         char *nodeuuid = NULL;
3332         char *oscname = NULL;
3333         char *oscuuid = NULL;
3334         char *lovuuid = NULL;
3335         char *svname = NULL;
3336         char index[6];
3337         int i, rc;
3338
3339         ENTRY;
3340         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
3341                mti->mti_svname, logname);
3342
3343         if (mgs_log_is_empty(env, mgs, logname)) {
3344                 CERROR("log is empty! Logical error\n");
3345                 RETURN(-EINVAL);
3346         }
3347
3348         if (!target_supports_large_nid(mti)) {
3349                 libcfs_nid2str_r(mti->mti_nids[0], tmp, sizeof(tmp));
3350                 nidstr = tmp;
3351         } else {
3352                 nidstr = mti->mti_nidlist[0];
3353         }
3354
3355         rc = name_create(&nodeuuid, nidstr, "");
3356         if (rc)
3357                 RETURN(rc);
3358         rc = name_create(&svname, mti->mti_svname, "-osc");
3359         if (rc)
3360                 GOTO(out_free, rc);
3361
3362         /* for the system upgraded from old 1.8, keep using the old osc naming
3363          * style for mdt, see name_create_mdt_osc(). LU-1257 */
3364         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
3365                 rc = name_create(&oscname, svname, "");
3366         else
3367                 rc = name_create(&oscname, svname, suffix);
3368         if (rc)
3369                 GOTO(out_free, rc);
3370
3371         rc = name_create(&oscuuid, oscname, "_UUID");
3372         if (rc)
3373                 GOTO(out_free, rc);
3374         rc = name_create(&lovuuid, lovname, "_UUID");
3375         if (rc)
3376                 GOTO(out_free, rc);
3377
3378         /*
3379          * #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
3380          * multihomed (#4)
3381          * #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
3382          * #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
3383          * #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
3384          * failover (#6,7)
3385          * #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
3386          * #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
3387          * #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
3388          */
3389
3390         rc = record_start_log(env, mgs, &llh, logname);
3391         if (rc)
3392                 GOTO(out_free, rc);
3393
3394         /* FIXME these should be a single journal transaction */
3395         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
3396                            "add osc");
3397         if (rc)
3398                 GOTO(out_end, rc);
3399
3400         /* NB: don't change record order, because upon MDT steal OSC config
3401          * from client, it treats all nids before LCFG_SETUP as target nids
3402          * (multiple interfaces), while nids after as failover node nids.
3403          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
3404          */
3405         for (i = 0; i < mti->mti_nid_count; i++) {
3406                 struct lnet_nid nid;
3407
3408                 if (target_supports_large_nid(mti)) {
3409                         rc = libcfs_strnid(&nid, mti->mti_nidlist[i]);
3410                         if (rc < 0)
3411                                 GOTO(out_end, rc);
3412                 } else {
3413                         lnet_nid4_to_nid(mti->mti_nids[i], &nid);
3414                 }
3415
3416                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nidstr(&nid));
3417                 rc = record_add_uuid(env, llh, &nid, nodeuuid);
3418                 if (rc)
3419                         GOTO(out_end, rc);
3420         }
3421
3422         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
3423         if (rc)
3424                 GOTO(out_end, rc);
3425         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
3426                           NULL, NULL);
3427         if (rc)
3428                 GOTO(out_end, rc);
3429         rc = mgs_write_log_failnids(env, mti, llh, oscname);
3430         if (rc)
3431                 GOTO(out_end, rc);
3432
3433         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
3434
3435         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
3436         if (rc)
3437                 GOTO(out_end, rc);
3438         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
3439                            "add osc");
3440         if (rc)
3441                 GOTO(out_end, rc);
3442 out_end:
3443         record_end_log(env, &llh);
3444 out_free:
3445         name_destroy(&lovuuid);
3446         name_destroy(&oscuuid);
3447         name_destroy(&oscname);
3448         name_destroy(&svname);
3449         name_destroy(&nodeuuid);
3450         RETURN(rc);
3451 }
3452
3453 static int mgs_write_log_ost(const struct lu_env *env,
3454                              struct mgs_device *mgs, struct fs_db *fsdb,
3455                              struct mgs_target_info *mti)
3456 {
3457         struct llog_handle *llh = NULL;
3458         char *logname, *lovname;
3459         char *ptr = mti->mti_params;
3460         int rc, flags = 0, failout = 0, i;
3461
3462         ENTRY;
3463         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
3464
3465         /* The ost startup log */
3466
3467         /* If the ost log already exists, that means that someone reformatted
3468          * the ost and it called target_add again.
3469          */
3470         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
3471                 LCONSOLE_ERROR_MSG(0x141,
3472                                    "The config log for %s already exists, yet the server claims it never registered. It may have been reformatted, or the index changed. writeconf the MDT to regenerate all logs.\n",
3473                                    mti->mti_svname);
3474                 RETURN(-EALREADY);
3475         }
3476
3477         /*
3478          * attach obdfilter ost1 ost1_UUID
3479          * setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
3480          */
3481         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
3482                 failout = (strncmp(ptr, "failout", 7) == 0);
3483         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
3484         if (rc)
3485                 RETURN(rc);
3486         /* FIXME these should be a single journal transaction */
3487         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
3488         if (rc)
3489                 GOTO(out_end, rc);
3490         if (*mti->mti_uuid == '\0')
3491                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
3492                          "%s_UUID", mti->mti_svname);
3493         rc = record_attach(env, llh, mti->mti_svname,
3494                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
3495         if (rc)
3496                 GOTO(out_end, rc);
3497         rc = record_setup(env, llh, mti->mti_svname,
3498                           "dev"/*ignored*/, "type"/*ignored*/,
3499                           failout ? "n" : "f", NULL/*options*/);
3500         if (rc)
3501                 GOTO(out_end, rc);
3502         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
3503         if (rc)
3504                 GOTO(out_end, rc);
3505 out_end:
3506         record_end_log(env, &llh);
3507         if (rc)
3508                 RETURN(rc);
3509         /* We also have to update the other logs where this osc is part of
3510          * the lov
3511          */
3512
3513         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3514                 /* If we're upgrading, the old mdt log already has our
3515                  * entry. Let's do a fake one for fun.
3516                  */
3517                 /* Note that we can't add any new failnids, since we don't
3518                  * know the old osc names.
3519                  */
3520                 flags = CM_SKIP | CM_UPGRADE146;
3521         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
3522                 /* If the update flag isn't set, don't update client/mdt
3523                  * logs.
3524                  */
3525                 flags |= CM_SKIP;
3526                 LCONSOLE_WARN("Client log for %s was not updated; writeconf the MDT first to regenerate it.\n",
3527                         mti->mti_svname);
3528         }
3529
3530         /* Add ost to all MDT lov defs */
3531         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3532                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
3533                         char mdt_index[13];
3534
3535                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
3536                                                      i);
3537                         if (rc)
3538                                 RETURN(rc);
3539
3540                         snprintf(mdt_index, sizeof(mdt_index), "-MDT%04x", i);
3541                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
3542                                                       logname, mdt_index,
3543                                                       lovname, LUSTRE_SP_MDT,
3544                                                       flags);
3545                         name_destroy(&logname);
3546                         name_destroy(&lovname);
3547                         if (rc)
3548                                 RETURN(rc);
3549                 }
3550         }
3551
3552         /* Append ost info to the client log */
3553         rc = name_create(&logname, mti->mti_fsname, "-client");
3554         if (rc)
3555                 RETURN(rc);
3556         if (mgs_log_is_empty(env, mgs, logname)) {
3557                 /* Start client log */
3558                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
3559                                        fsdb->fsdb_clilov);
3560                 if (rc)
3561                         GOTO(out_free, rc);
3562                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
3563                                        fsdb->fsdb_clilmv);
3564                 if (rc)
3565                         GOTO(out_free, rc);
3566                 rc = mgs_write_log_mount_opt(env, mgs, fsdb, logname);
3567                 if (rc)
3568                         GOTO(out_free, rc);
3569         }
3570         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
3571                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
3572 out_free:
3573         name_destroy(&logname);
3574         RETURN(rc);
3575 }
3576
3577 static __inline__ int mgs_param_empty(char *ptr)
3578 {
3579         char *tmp = strchr(ptr, '=');
3580
3581         if (tmp && tmp[1] == '\0')
3582                 return 1;
3583         return 0;
3584 }
3585
3586 static int mgs_write_log_failnid_internal(const struct lu_env *env,
3587                                           struct mgs_device *mgs,
3588                                           struct fs_db *fsdb,
3589                                           struct mgs_target_info *mti,
3590                                           char *logname, char *cliname)
3591 {
3592         int rc;
3593         struct llog_handle *llh = NULL;
3594
3595         if (mgs_param_empty(mti->mti_params)) {
3596                 /* Remove _all_ failnids */
3597                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3598                                 mti->mti_svname, "add failnid", CM_SKIP);
3599                 return rc < 0 ? rc : 0;
3600         }
3601
3602         /* Otherwise failover nids are additive */
3603         rc = record_start_log(env, mgs, &llh, logname);
3604         if (rc)
3605                 return rc;
3606         /* FIXME this should be a single journal transaction */
3607         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
3608                            "add failnid");
3609         if (rc)
3610                 goto out_end;
3611         rc = mgs_write_log_failnids(env, mti, llh, cliname);
3612         if (rc)
3613                 goto out_end;
3614         rc = record_marker(env, llh, fsdb, CM_END,
3615                            mti->mti_svname, "add failnid");
3616 out_end:
3617         record_end_log(env, &llh);
3618         return rc;
3619 }
3620
3621 /* Add additional failnids to an existing log.
3622    The mdc/osc must have been added to logs first */
3623 /* tcp nids must be in dotted-quad ascii -
3624    we can't resolve hostnames from the kernel. */
3625 static int mgs_write_log_add_failnid(const struct lu_env *env,
3626                                      struct mgs_device *mgs,
3627                                      struct fs_db *fsdb,
3628                                      struct mgs_target_info *mti)
3629 {
3630         char *logname, *cliname;
3631         int i;
3632         int rc;
3633
3634         ENTRY;
3635         /* FIXME we currently can't erase the failnids
3636          * given when a target first registers, since they aren't part of
3637          * an "add uuid" stanza
3638          */
3639
3640         /* Verify that we know about this target */
3641         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
3642                 LCONSOLE_ERROR_MSG(0x142,
3643                                    "The target %s has not registered yet. It must be started before failnids can be added.\n",
3644                                    mti->mti_svname);
3645                 RETURN(-ENOENT);
3646         }
3647
3648         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
3649         if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
3650                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
3651         else if (mti->mti_flags & LDD_F_SV_TYPE_OST)
3652                 rc = name_create(&cliname, mti->mti_svname, "-osc");
3653         else
3654                 RETURN(-EINVAL);
3655
3656         if (rc)
3657                 RETURN(rc);
3658
3659         /* Add failover nids to the client log */
3660         rc = name_create(&logname, mti->mti_fsname, "-client");
3661         if (rc) {
3662                 name_destroy(&cliname);
3663                 RETURN(rc);
3664         }
3665
3666         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
3667         name_destroy(&logname);
3668         name_destroy(&cliname);
3669         if (rc)
3670                 RETURN(rc);
3671
3672
3673         /* Add OST/MDT failover nids to the MDT logs as well */
3674         for_each_set_bit(i, fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE) {
3675                 /* No osp device fsname-MDTXXXX-osp-MDTXXXX in conf*/
3676                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT
3677                     && mti->mti_stripe_index == i)
3678                         continue;
3679
3680                 rc = name_create_mdt(&logname, mti->mti_fsname, i);
3681                 if (rc)
3682                         RETURN(rc);
3683
3684                 if (mgs_log_is_empty(env, mgs, logname)) {
3685                         name_destroy(&logname);
3686                         continue;
3687                 }
3688
3689                 rc = name_create_osp(&cliname, NULL, mti->mti_svname, i);
3690                 if (rc) {
3691                         name_destroy(&logname);
3692                         RETURN(rc);
3693                 }
3694                 rc = mgs_write_log_failnid_internal(env, mgs, fsdb, mti,
3695                                                     logname, cliname);
3696                 name_destroy(&cliname);
3697                 name_destroy(&logname);
3698                 if (rc)
3699                         RETURN(rc);
3700         }
3701
3702         RETURN(rc);
3703 }
3704
3705 static int mgs_wlp_lcfg(const struct lu_env *env,
3706                         struct mgs_device *mgs, struct fs_db *fsdb,
3707                         struct mgs_target_info *mti,
3708                         char *logname, struct lustre_cfg_bufs *bufs,
3709                         char *tgtname, char *ptr)
3710 {
3711         char comment[MTI_NAME_MAXLEN];
3712         char *tmp;
3713         struct llog_cfg_rec *lcr;
3714         int rc, del;
3715
3716         /* Erase any old settings of this same parameter */
3717         strscpy(comment, ptr, sizeof(comment));
3718         /* But don't try to match the value. */
3719         tmp = strchr(comment, '=');
3720         if (tmp != NULL)
3721                 *tmp = 0;
3722         /* FIXME we should skip settings that are the same as old values */
3723         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
3724         if (rc < 0)
3725                 return rc;
3726         del = mgs_param_empty(ptr);
3727
3728         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
3729                       "Setting" : "Modifying", tgtname, comment, logname);
3730         if (del) {
3731                 /* mgs_modify() will return 1 if nothing had to be done */
3732                 if (rc == 1)
3733                         rc = 0;
3734                 return rc;
3735         }
3736
3737         lustre_cfg_bufs_reset(bufs, tgtname);
3738         lustre_cfg_bufs_set_string(bufs, 1, ptr);
3739         if (mti->mti_flags & LDD_F_PARAM2)
3740                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
3741
3742         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
3743                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
3744         if (lcr == NULL)
3745                 return -ENOMEM;
3746
3747         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
3748                                   comment);
3749         lustre_cfg_rec_free(lcr);
3750         return rc;
3751 }
3752
3753 /* write global variable settings into log */
3754 static int mgs_write_log_sys(const struct lu_env *env,
3755                              struct mgs_device *mgs, struct fs_db *fsdb,
3756                              struct mgs_target_info *mti, char *sys, char *ptr)
3757 {
3758         struct mgs_thread_info  *mgi = mgs_env_info(env);
3759         struct lustre_cfg       *lcfg;
3760         struct llog_cfg_rec     *lcr;
3761         char *tmp, sep;
3762         int rc, cmd, convert = 1;
3763
3764         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
3765                 cmd = LCFG_SET_TIMEOUT;
3766         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
3767                 cmd = LCFG_SET_LDLM_TIMEOUT;
3768         /* Check for known params here so we can return error to lctl */
3769         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
3770                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
3771                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
3772                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
3773                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
3774                 cmd = LCFG_PARAM;
3775         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
3776                 convert = 0; /* Don't convert string value to integer */
3777                 cmd = LCFG_PARAM;
3778         } else {
3779                 return -EINVAL;
3780         }
3781
3782         if (mgs_param_empty(ptr))
3783                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
3784         else
3785                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
3786
3787         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
3788         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
3789         if (!convert && *tmp != '\0')
3790                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
3791         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3792         if (lcr == NULL)
3793                 return -ENOMEM;
3794
3795         lcfg = &lcr->lcr_cfg;
3796         if (convert) {
3797                 rc = kstrtouint(tmp, 0, &lcfg->lcfg_num);
3798                 if (rc)
3799                         GOTO(out_rec_free, rc);
3800         } else {
3801                 lcfg->lcfg_num = 0;
3802         }
3803
3804         /* truncate the comment to the parameter name */
3805         ptr = tmp - 1;
3806         sep = *ptr;
3807         *ptr = '\0';
3808         /* modify all servers and clients */
3809         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3810                                       *tmp == '\0' ? NULL : lcr,
3811                                       mti->mti_fsname, sys, 0);
3812         if (rc == 0 && *tmp != '\0') {
3813                 switch (cmd) {
3814                 case LCFG_SET_TIMEOUT:
3815                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
3816                                 class_process_config(lcfg);
3817                         break;
3818                 case LCFG_SET_LDLM_TIMEOUT:
3819                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
3820                                 class_process_config(lcfg);
3821                         break;
3822                 default:
3823                         break;
3824                 }
3825         }
3826         *ptr = sep;
3827 out_rec_free:
3828         lustre_cfg_rec_free(lcr);
3829         return rc;
3830 }
3831
3832 /* write quota settings into log */
3833 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
3834                                struct fs_db *fsdb, struct mgs_target_info *mti,
3835                                char *quota, char *ptr)
3836 {
3837         struct mgs_thread_info  *mgi = mgs_env_info(env);
3838         struct llog_cfg_rec     *lcr;
3839         char                    *tmp;
3840         char                     sep;
3841         int                      rc, cmd = LCFG_PARAM;
3842
3843         /* support only 'meta' and 'data' pools so far */
3844         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
3845             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
3846                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
3847                        "& quota.ost are)\n", ptr);
3848                 return -EINVAL;
3849         }
3850
3851         if (*tmp == '\0') {
3852                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
3853         } else {
3854                 CDEBUG(D_MGS, "global '%s'\n", quota);
3855
3856                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
3857                     strchr(tmp, 'p') == NULL &&
3858                     strcmp(tmp, "none") != 0) {
3859                         CERROR("enable option(%s) isn't supported\n", tmp);
3860                         return -EINVAL;
3861                 }
3862         }
3863
3864         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
3865         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
3866         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3867         if (lcr == NULL)
3868                 return -ENOMEM;
3869
3870         /* truncate the comment to the parameter name */
3871         ptr = tmp - 1;
3872         sep = *ptr;
3873         *ptr = '\0';
3874
3875         /* XXX we duplicated quota enable information in all server
3876          *     config logs, it should be moved to a separate config
3877          *     log once we cleanup the config log for global param. */
3878         /* modify all servers */
3879         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3880                                       *tmp == '\0' ? NULL : lcr,
3881                                       mti->mti_fsname, quota, 1);
3882         *ptr = sep;
3883         lustre_cfg_rec_free(lcr);
3884         return rc < 0 ? rc : 0;
3885 }
3886
3887 static int mgs_srpc_set_param_disk(const struct lu_env *env,
3888                                    struct mgs_device *mgs,
3889                                    struct fs_db *fsdb,
3890                                    struct mgs_target_info *mti,
3891                                    char *param)
3892 {
3893         struct mgs_thread_info  *mgi = mgs_env_info(env);
3894         struct llog_cfg_rec     *lcr;
3895         struct llog_handle      *llh = NULL;
3896         char                    *logname;
3897         char                    *comment, *ptr;
3898         int                      rc, len;
3899
3900         ENTRY;
3901
3902         /* get comment */
3903         ptr = strchr(param, '=');
3904         LASSERT(ptr != NULL);
3905         len = ptr - param;
3906
3907         OBD_ALLOC(comment, len + 1);
3908         if (comment == NULL)
3909                 RETURN(-ENOMEM);
3910         strncpy(comment, param, len);
3911         comment[len] = '\0';
3912
3913         /* prepare lcfg */
3914         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
3915         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
3916         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
3917         if (lcr == NULL)
3918                 GOTO(out_comment, rc = -ENOMEM);
3919
3920         /* construct log name */
3921         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
3922         if (rc < 0)
3923                 GOTO(out_lcfg, rc);
3924
3925         if (mgs_log_is_empty(env, mgs, logname)) {
3926                 rc = record_start_log(env, mgs, &llh, logname);
3927                 if (rc < 0)
3928                         GOTO(out, rc);
3929                 record_end_log(env, &llh);
3930         }
3931
3932         /* obsolete old one */
3933         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
3934                         comment, CM_SKIP);
3935         if (rc < 0)
3936                 GOTO(out, rc);
3937         /* write the new one */
3938         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
3939                                   mti->mti_svname, comment);
3940         if (rc)
3941                 CERROR("%s: error writing log %s: rc = %d\n",
3942                        mgs->mgs_obd->obd_name, logname, rc);
3943 out:
3944         name_destroy(&logname);
3945 out_lcfg:
3946         lustre_cfg_rec_free(lcr);
3947 out_comment:
3948         OBD_FREE(comment, len + 1);
3949         RETURN(rc);
3950 }
3951
3952 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
3953                                         char *param)
3954 {
3955         char    *ptr;
3956
3957         /* disable the adjustable udesc parameter for now, i.e. use default
3958          * setting that client always ship udesc to MDT if possible. to enable
3959          * it simply remove the following line
3960          */
3961         goto error_out;
3962
3963         ptr = strchr(param, '=');
3964         if (ptr == NULL)
3965                 goto error_out;
3966         *ptr++ = '\0';
3967
3968         if (strcmp(param, PARAM_SRPC_UDESC))
3969                 goto error_out;
3970
3971         if (strcmp(ptr, "yes") == 0) {
3972                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3973                 CWARN("Enable user descriptor shipping from client to MDT\n");
3974         } else if (strcmp(ptr, "no") == 0) {
3975                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3976                 CWARN("Disable user descriptor shipping from client to MDT\n");
3977         } else {
3978                 *(ptr - 1) = '=';
3979                 goto error_out;
3980         }
3981         return 0;
3982
3983 error_out:
3984         CERROR("Invalid param: %s\n", param);
3985         return -EINVAL;
3986 }
3987
3988 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3989                                   const char *svname,
3990                                   char *param)
3991 {
3992         struct sptlrpc_rule rule;
3993         struct sptlrpc_rule_set *rset;
3994         int rc;
3995
3996         ENTRY;
3997         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3998                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3999                 RETURN(-EINVAL);
4000         }
4001
4002         if (strncmp(param, PARAM_SRPC_UDESC,
4003                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
4004                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
4005         }
4006
4007         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
4008                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
4009                 RETURN(-EINVAL);
4010         }
4011
4012         param += sizeof(PARAM_SRPC_FLVR) - 1;
4013
4014         rc = sptlrpc_parse_rule(param, &rule);
4015         if (rc)
4016                 RETURN(rc);
4017
4018         /* mgs rules implies must be mgc->mgs */
4019         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
4020                 if ((rule.sr_from != LUSTRE_SP_MGC &&
4021                      rule.sr_from != LUSTRE_SP_ANY) ||
4022                     (rule.sr_to != LUSTRE_SP_MGS &&
4023                      rule.sr_to != LUSTRE_SP_ANY))
4024                         RETURN(-EINVAL);
4025         }
4026
4027         /* prepare room for this coming rule. svcname format should be:
4028          * - fsname: general rule
4029          * - fsname-tgtname: target-specific rule
4030          */
4031         if (strchr(svname, '-')) {
4032                 struct mgs_tgt_srpc_conf *tgtconf;
4033                 int found = 0;
4034
4035                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
4036                      tgtconf = tgtconf->mtsc_next) {
4037                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
4038                                 found = 1;
4039                                 break;
4040                         }
4041                 }
4042
4043                 if (!found) {
4044                         int name_len;
4045
4046                         OBD_ALLOC_PTR(tgtconf);
4047                         if (tgtconf == NULL)
4048                                 RETURN(-ENOMEM);
4049
4050                         name_len = strlen(svname);
4051
4052                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
4053                         if (tgtconf->mtsc_tgt == NULL) {
4054                                 OBD_FREE_PTR(tgtconf);
4055                                 RETURN(-ENOMEM);
4056                         }
4057                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
4058
4059                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
4060                         fsdb->fsdb_srpc_tgt = tgtconf;
4061                 }
4062
4063                 rset = &tgtconf->mtsc_rset;
4064         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
4065                 /* put _mgs related srpc rule directly in mgs ruleset */
4066                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
4067         } else {
4068                 rset = &fsdb->fsdb_srpc_gen;
4069         }
4070
4071         rc = sptlrpc_rule_set_merge(rset, &rule);
4072
4073         RETURN(rc);
4074 }
4075
4076 static int mgs_srpc_set_param(const struct lu_env *env,
4077                               struct mgs_device *mgs,
4078                               struct fs_db *fsdb,
4079                               struct mgs_target_info *mti,
4080                               char *param)
4081 {
4082         char *copy;
4083         int rc, copy_size;
4084
4085         ENTRY;
4086 #ifndef HAVE_GSS
4087         RETURN(-EINVAL);
4088 #endif
4089         /* keep a copy of original param, which could be destroyed
4090          * during parsing
4091          */
4092         copy_size = strlen(param) + 1;
4093         OBD_ALLOC(copy, copy_size);
4094         if (copy == NULL)
4095                 return -ENOMEM;
4096         memcpy(copy, param, copy_size);
4097
4098         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
4099         if (rc)
4100                 goto out_free;
4101
4102         /* previous steps guaranteed the syntax is correct */
4103         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
4104         if (rc)
4105                 goto out_free;
4106
4107         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
4108                 /*
4109                  * for mgs rules, make them effective immediately.
4110                  */
4111                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
4112                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
4113                                                  &fsdb->fsdb_srpc_gen);
4114         }
4115
4116 out_free:
4117         OBD_FREE(copy, copy_size);
4118         RETURN(rc);
4119 }
4120
4121 struct mgs_srpc_read_data {
4122         struct fs_db   *msrd_fsdb;
4123         int             msrd_skip;
4124 };
4125
4126 static int mgs_srpc_read_handler(const struct lu_env *env,
4127                                  struct llog_handle *llh,
4128                                  struct llog_rec_hdr *rec, void *data)
4129 {
4130         struct mgs_srpc_read_data *msrd = data;
4131         struct cfg_marker         *marker;
4132         struct lustre_cfg         *lcfg = REC_DATA(rec);
4133         char                      *svname, *param;
4134         int                        cfg_len, rc;
4135
4136         ENTRY;
4137         if (rec->lrh_type != OBD_CFG_REC) {
4138                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
4139                 RETURN(-EINVAL);
4140         }
4141
4142         cfg_len = REC_DATA_LEN(rec);
4143
4144         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
4145         if (rc) {
4146                 CERROR("Insane cfg\n");
4147                 RETURN(rc);
4148         }
4149
4150         if (lcfg->lcfg_command == LCFG_MARKER) {
4151                 marker = lustre_cfg_buf(lcfg, 1);
4152
4153                 if (marker->cm_flags & CM_START &&
4154                     marker->cm_flags & CM_SKIP)
4155                         msrd->msrd_skip = 1;
4156                 if (marker->cm_flags & CM_END)
4157                         msrd->msrd_skip = 0;
4158
4159                 RETURN(0);
4160         }
4161
4162         if (msrd->msrd_skip)
4163                 RETURN(0);
4164
4165         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
4166                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
4167                 RETURN(0);
4168         }
4169
4170         svname = lustre_cfg_string(lcfg, 0);
4171         if (svname == NULL) {
4172                 CERROR("svname is empty\n");
4173                 RETURN(0);
4174         }
4175
4176         param = lustre_cfg_string(lcfg, 1);
4177         if (param == NULL) {
4178                 CERROR("param is empty\n");
4179                 RETURN(0);
4180         }
4181
4182         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
4183         if (rc)
4184                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
4185
4186         RETURN(0);
4187 }
4188
4189 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
4190                                 struct mgs_device *mgs,
4191                                 struct fs_db *fsdb)
4192 {
4193         struct llog_handle      *llh = NULL;
4194         struct llog_ctxt        *ctxt;
4195         char                    *logname;
4196         struct mgs_srpc_read_data  msrd;
4197         int                     rc;
4198
4199         ENTRY;
4200         /* construct log name */
4201         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
4202         if (rc)
4203                 RETURN(rc);
4204
4205         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
4206         LASSERT(ctxt != NULL);
4207
4208         if (mgs_log_is_empty(env, mgs, logname))
4209                 GOTO(out, rc = 0);
4210
4211         rc = llog_open(env, ctxt, &llh, NULL, logname,
4212                        LLOG_OPEN_EXISTS);
4213         if (rc < 0) {
4214                 if (rc == -ENOENT)
4215                         rc = 0;
4216                 GOTO(out, rc);
4217         }
4218
4219         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
4220         if (rc)
4221                 GOTO(out_close, rc);
4222
4223         if (llog_get_size(llh) <= 1)
4224                 GOTO(out_close, rc = 0);
4225
4226         msrd.msrd_fsdb = fsdb;
4227         msrd.msrd_skip = 0;
4228
4229         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
4230                           NULL);
4231
4232 out_close:
4233         llog_close(env, llh);
4234 out:
4235         llog_ctxt_put(ctxt);
4236         name_destroy(&logname);
4237
4238         if (rc)
4239                 CERROR("failed to read sptlrpc config database: %d\n", rc);
4240         RETURN(rc);
4241 }
4242
4243 static int mgs_write_log_param2(const struct lu_env *env,
4244                                 struct mgs_device *mgs,
4245                                 struct fs_db *fsdb,
4246                                 struct mgs_target_info *mti, char *ptr)
4247 {
4248         struct lustre_cfg_bufs bufs;
4249         int rc;
4250
4251         ENTRY;
4252         CDEBUG(D_MGS, "next param '%s'\n", ptr);
4253
4254         /* PARAM_MGSNODE and PARAM_NETWORK are set only when formating
4255          * or during the inital mount. It can never change after that.
4256          */
4257         if (!class_match_param(ptr, PARAM_MGSNODE, NULL) ||
4258             !class_match_param(ptr, PARAM_NETWORK, NULL)) {
4259                 rc = 0;
4260                 goto end;
4261         }
4262
4263         /* Processed in mgs_write_log_ost. Another value that can't
4264          * be changed by lctl set_param -P.
4265          */
4266         if (!class_match_param(ptr, PARAM_FAILMODE, NULL)) {
4267                 LCONSOLE_ERROR_MSG(0x169,
4268                                    "%s can only be changed with tunefs.lustre and --writeconf\n",
4269                                    ptr);
4270                 rc = -EPERM;
4271                 goto end;
4272         }
4273
4274         /* FIXME !!! Support for sptlrpc is incomplete. Currently the change
4275          * doesn't transmit to the client. See LU-7183.
4276          */
4277         if (!class_match_param(ptr, PARAM_SRPC, NULL)) {
4278                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
4279                 goto end;
4280         }
4281
4282         /* Can't use class_match_param since ptr doesn't start with
4283          * PARAM_FAILNODE. So we look for PARAM_FAILNODE contained in ptr.
4284          */
4285         if (strstr(ptr, PARAM_FAILNODE)) {
4286                 /* Add a failover nidlist. We already processed failovers
4287                  * params for new targets in mgs_write_log_target.
4288                  */
4289                 const char *param;
4290
4291                 /* can't use wildcards with failover.node */
4292                 if (strchr(ptr, '*')) {
4293                         rc = -ENODEV;
4294                         goto end;
4295                 }
4296
4297                 param = strstr(ptr, PARAM_FAILNODE);
4298                 rc = strscpy(mti->mti_params, param, sizeof(mti->mti_params));
4299                 if (rc < 0)
4300                         goto end;
4301
4302                 CDEBUG(D_MGS, "Adding failnode with param %s\n",
4303                        mti->mti_params);
4304                 rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
4305                 goto end;
4306         }
4307
4308         /* root squash parameters must not be set on llite subsystem, this can
4309          * lead to inconsistencies between client and server values
4310          */
4311         if ((strstr(ptr, PARAM_NOSQUASHNIDS) ||
4312              strstr(ptr, PARAM_ROOTSQUASH)) &&
4313             strncmp(ptr, "llite.", strlen("llite.")) == 0) {
4314                 rc = -EINVAL;
4315                 CWARN("%s: cannot add %s param to llite subsystem, use mdt instead: rc=%d\n",
4316                       mgs->mgs_obd->obd_name,
4317                       strstr(ptr, PARAM_ROOTSQUASH) ?
4318                         PARAM_ROOTSQUASH : PARAM_NOSQUASHNIDS,
4319                       rc);
4320                 goto end;
4321         }
4322
4323         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
4324                           mti->mti_svname, ptr);
4325 end:
4326         RETURN(rc);
4327 }
4328
4329 /* Permanent settings of all parameters by writing into the appropriate
4330  * configuration logs.
4331  * A parameter with null value ("<param>='\0'") means to erase it out of
4332  * the logs.
4333  */
4334 static int mgs_write_log_param(const struct lu_env *env,
4335                                struct mgs_device *mgs, struct fs_db *fsdb,
4336                                struct mgs_target_info *mti, char *ptr)
4337 {
4338         struct mgs_thread_info *mgi = mgs_env_info(env);
4339         char *logname;
4340         char *tmp;
4341         int rc = 0;
4342         ENTRY;
4343
4344         /* For various parameter settings, we have to figure out which logs
4345          * care about them (e.g. both mdt and client for lov settings)
4346          */
4347         CDEBUG(D_MGS, "next param '%s'\n", ptr);
4348
4349         /* The params are stored in MOUNT_DATA_FILE and modified via
4350          * tunefs.lustre, or set using lctl conf_param
4351          */
4352
4353         /* Processed in lustre_start_mgc */
4354         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
4355                 GOTO(end, rc);
4356
4357         /* Processed in ost/mdt */
4358         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
4359                 GOTO(end, rc);
4360
4361         /* Processed in mgs_write_log_ost */
4362         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
4363                 if (mti->mti_flags & LDD_F_PARAM) {
4364                         LCONSOLE_ERROR_MSG(0x169,
4365                                            "%s can only be changed with tunefs.lustre and --writeconf\n",
4366                                            ptr);
4367                         rc = -EPERM;
4368                 }
4369                 GOTO(end, rc);
4370         }
4371
4372         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
4373                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
4374                 GOTO(end, rc);
4375         }
4376
4377         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
4378                 /* Add a failover nidlist */
4379                 rc = 0;
4380                 /* We already processed failovers params for new
4381                  * targets in mgs_write_log_target
4382                  */
4383                 if (mti->mti_flags & LDD_F_PARAM) {
4384                         CDEBUG(D_MGS, "Adding failnode\n");
4385                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
4386                 }
4387                 GOTO(end, rc);
4388         }
4389
4390         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
4391                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
4392                 GOTO(end, rc);
4393         }
4394
4395         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
4396                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
4397                 GOTO(end, rc);
4398         }
4399
4400         if (class_match_param(ptr, PARAM_OSC PARAM_ACTIVE, &tmp) == 0 ||
4401             class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0) {
4402                 /* active=0 means off, anything else means on */
4403                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
4404                 bool deactive_osc = memcmp(ptr, PARAM_OSC PARAM_ACTIVE,
4405                                            strlen(PARAM_OSC PARAM_ACTIVE)) == 0;
4406                 int i;
4407
4408                 if (!deactive_osc) {
4409                         __u32   index;
4410
4411                         rc = server_name2index(mti->mti_svname, &index, NULL);
4412                         if (rc < 0)
4413                                 GOTO(end, rc);
4414
4415                         if (index == 0) {
4416                                 LCONSOLE_ERROR_MSG(0x144, "%s: MDC0 can not be"
4417                                                    " (de)activated.\n",
4418                                                    mti->mti_svname);
4419                                 GOTO(end, rc = -EPERM);
4420                         }
4421                 }
4422
4423                 LCONSOLE_WARN("Permanently %sactivating %s\n",
4424                               flag ? "de" : "re", mti->mti_svname);
4425                 /* Modify clilov */
4426                 rc = name_create(&logname, mti->mti_fsname, "-client");
4427                 if (rc < 0)
4428                         GOTO(end, rc);
4429                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4430                                 mti->mti_svname,
4431                                 deactive_osc ? "add osc" : "add mdc", flag);
4432                 name_destroy(&logname);
4433                 if (rc < 0)
4434                         goto active_err;
4435
4436                 /* Modify mdtlov */
4437                 /* Add to all MDT logs for DNE */
4438                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4439                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
4440                                 continue;
4441                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
4442                         if (rc < 0)
4443                                 GOTO(end, rc);
4444                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
4445                                         mti->mti_svname,
4446                                         deactive_osc ? "add osc" : "add osp",
4447                                         flag);
4448                         name_destroy(&logname);
4449                         if (rc < 0)
4450                                 goto active_err;
4451                 }
4452 active_err:
4453                 if (rc < 0) {
4454                         LCONSOLE_ERROR_MSG(0x145,
4455                                            "Couldn't find %s in log (%d). No permanent changes were made to the config log.\n",
4456                                            mti->mti_svname, rc);
4457                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
4458                                 LCONSOLE_ERROR_MSG(0x146,
4459                                                    "This may be because the log is in the old 1.4 style. Consider --writeconf to update the logs.\n");
4460                         GOTO(end, rc);
4461                 }
4462                 /* Fall through to osc/mdc proc for deactivating live
4463                  * OSC/OSP on running MDT / clients.
4464                  */
4465         }
4466         /* Below here, let obd's XXX_process_config methods handle it */
4467
4468         /* All lov. in proc */
4469         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
4470                 char *mdtlovname;
4471
4472                 CDEBUG(D_MGS, "lov param %s\n", ptr);
4473                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
4474                         LCONSOLE_ERROR_MSG(0x147,
4475                                            "LOV params must be set on the MDT, not %s. Ignoring.\n",
4476                                            mti->mti_svname);
4477                         GOTO(end, rc = 0);
4478                 }
4479
4480                 /* Modify mdtlov */
4481                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
4482                         GOTO(end, rc = -ENODEV);
4483
4484                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
4485                                              mti->mti_stripe_index);
4486                 if (rc)
4487                         GOTO(end, rc);
4488                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
4489                                   &mgi->mgi_bufs, mdtlovname, ptr);
4490                 name_destroy(&logname);
4491                 name_destroy(&mdtlovname);
4492                 if (rc)
4493                         GOTO(end, rc);
4494
4495                 /* Modify clilov */
4496                 rc = name_create(&logname, mti->mti_fsname, "-client");
4497                 if (rc)
4498                         GOTO(end, rc);
4499                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
4500                                   fsdb->fsdb_clilov, ptr);
4501                 name_destroy(&logname);
4502                 GOTO(end, rc);
4503         }
4504
4505         /* All osc., mdc., llite. params in proc */
4506         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
4507             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
4508             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
4509                 char *cname;
4510
4511                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
4512                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
4513                                            " cannot be modified. Consider"
4514                                            " updating the configuration with"
4515                                            " --writeconf\n",
4516                                            mti->mti_svname);
4517                         GOTO(end, rc = -EINVAL);
4518                 }
4519                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
4520                         rc = name_create(&cname, mti->mti_fsname, "-client");
4521                         /* Add the client type to match the obdname in
4522                          * class_config_llog_handler
4523                          */
4524                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
4525                         rc = name_create(&cname, mti->mti_svname, "-mdc");
4526                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
4527                         rc = name_create(&cname, mti->mti_svname, "-osc");
4528                 } else {
4529                         GOTO(end, rc = -EINVAL);
4530                 }
4531                 if (rc)
4532                         GOTO(end, rc);
4533
4534                 /* Forbid direct update of llite root squash parameters.
4535                  * These parameters are indirectly set via the MDT settings.
4536                  * See (LU-1778) */
4537                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
4538                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
4539                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
4540                         LCONSOLE_ERROR("%s: root squash parameters can only "
4541                                 "be updated through MDT component\n",
4542                                 mti->mti_fsname);
4543                         name_destroy(&cname);
4544                         GOTO(end, rc = -EINVAL);
4545                 }
4546
4547                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4548
4549                 /* Modify client */
4550                 rc = name_create(&logname, mti->mti_fsname, "-client");
4551                 if (rc) {
4552                         name_destroy(&cname);
4553                         GOTO(end, rc);
4554                 }
4555                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
4556                                   cname, ptr);
4557
4558                 /* osc params affect the MDT as well */
4559                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
4560                         int i;
4561
4562                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4563                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
4564                                         continue;
4565                                 name_destroy(&cname);
4566                                 rc = name_create_osp(&cname, NULL,
4567                                                      mti->mti_svname, i);
4568                                 name_destroy(&logname);
4569                                 if (rc)
4570                                         break;
4571                                 rc = name_create_mdt(&logname,
4572                                                      mti->mti_fsname, i);
4573                                 if (rc)
4574                                         break;
4575                                 if (!mgs_log_is_empty(env, mgs, logname)) {
4576                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
4577                                                           mti, logname,
4578                                                           &mgi->mgi_bufs,
4579                                                           cname, ptr);
4580                                         if (rc)
4581                                                 break;
4582                                 }
4583                         }
4584                 }
4585
4586                 /* For mdc activate/deactivate, it affects OSP on MDT as well */
4587                 if (class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0 &&
4588                     rc == 0) {
4589                         char suffix[16];
4590                         char *lodname = NULL;
4591                         char *param_str = NULL;
4592                         int i;
4593                         int index;
4594
4595                         /* replace mdc with osp */
4596                         memcpy(ptr, PARAM_OSP, strlen(PARAM_OSP));
4597                         rc = server_name2index(mti->mti_svname, &index, NULL);
4598                         if (rc < 0) {
4599                                 memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
4600                                 GOTO(end, rc);
4601                         }
4602
4603                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4604                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
4605                                         continue;
4606
4607                                 if (i == index)
4608                                         continue;
4609
4610                                 name_destroy(&logname);
4611                                 rc = name_create_mdt(&logname, mti->mti_fsname,
4612                                                      i);
4613                                 if (rc < 0)
4614                                         break;
4615
4616                                 if (mgs_log_is_empty(env, mgs, logname))
4617                                         continue;
4618
4619                                 snprintf(suffix, sizeof(suffix), "-osp-MDT%04x",
4620                                          i);
4621                                 name_destroy(&cname);
4622                                 rc = name_create(&cname, mti->mti_svname,
4623                                                  suffix);
4624                                 if (rc < 0)
4625                                         break;
4626
4627                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4628                                                   &mgi->mgi_bufs, cname, ptr);
4629                                 if (rc < 0)
4630                                         break;
4631
4632                                 /* Add configuration log for noitfying LOD
4633                                  * to active/deactive the OSP. */
4634                                 name_destroy(&param_str);
4635                                 rc = name_create(&param_str, cname,
4636                                                  (*tmp == '0') ?  ".active=0" :
4637                                                  ".active=1");
4638                                 if (rc < 0)
4639                                         break;
4640
4641                                 name_destroy(&lodname);
4642                                 rc = name_create(&lodname, logname, "-mdtlov");
4643                                 if (rc < 0)
4644                                         break;
4645
4646                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4647                                                   &mgi->mgi_bufs, lodname,
4648                                                   param_str);
4649                                 if (rc < 0)
4650                                         break;
4651                         }
4652                         memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
4653                         name_destroy(&lodname);
4654                         name_destroy(&param_str);
4655                 }
4656
4657                 name_destroy(&logname);
4658                 name_destroy(&cname);
4659                 GOTO(end, rc);
4660         }
4661
4662         /* All mdt. params in proc */
4663         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
4664                 int i;
4665                 __u32 idx;
4666
4667                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4668                 if (strncmp(mti->mti_svname, mti->mti_fsname,
4669                             MTI_NAME_MAXLEN) == 0)
4670                         /* device is unspecified completely? */
4671                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
4672                 else
4673                         rc = server_name2index(mti->mti_svname, &idx, NULL);
4674                 if (rc < 0)
4675                         goto active_err;
4676                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
4677                         goto active_err;
4678                 if (rc & LDD_F_SV_ALL) {
4679                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4680                                 if (!test_bit(i,
4681                                               fsdb->fsdb_mdt_index_map))
4682                                         continue;
4683                                 rc = name_create_mdt(&logname,
4684                                                      mti->mti_fsname, i);
4685                                 if (rc)
4686                                         goto active_err;
4687                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
4688                                                   logname, &mgi->mgi_bufs,
4689                                                   logname, ptr);
4690                                 name_destroy(&logname);
4691                                 if (rc)
4692                                         goto active_err;
4693                         }
4694                 } else {
4695                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
4696                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
4697                                 LCONSOLE_ERROR("%s: root squash parameters "
4698                                         "cannot be applied to a single MDT\n",
4699                                         mti->mti_fsname);
4700                                 GOTO(end, rc = -EINVAL);
4701                         }
4702                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
4703                                           mti->mti_svname, &mgi->mgi_bufs,
4704                                           mti->mti_svname, ptr);
4705                         if (rc)
4706                                 goto active_err;
4707                 }
4708
4709                 /* root squash settings are also applied to llite
4710                  * config log (see LU-1778) */
4711                 if (rc == 0 &&
4712                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
4713                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
4714                         char *cname;
4715                         char *ptr2;
4716
4717                         rc = name_create(&cname, mti->mti_fsname, "-client");
4718                         if (rc)
4719                                 GOTO(end, rc);
4720                         rc = name_create(&logname, mti->mti_fsname, "-client");
4721                         if (rc) {
4722                                 name_destroy(&cname);
4723                                 GOTO(end, rc);
4724                         }
4725                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
4726                         if (rc) {
4727                                 name_destroy(&cname);
4728                                 name_destroy(&logname);
4729                                 GOTO(end, rc);
4730                         }
4731                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4732                                           &mgi->mgi_bufs, cname, ptr2);
4733                         name_destroy(&ptr2);
4734                         name_destroy(&logname);
4735                         name_destroy(&cname);
4736                 }
4737                 GOTO(end, rc);
4738         }
4739
4740         /* All mdd., ost. and osd. params in proc */
4741         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
4742             (class_match_param(ptr, PARAM_LOD, NULL) == 0) ||
4743             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
4744             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
4745                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4746                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
4747                         GOTO(end, rc = -ENODEV);
4748
4749                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
4750                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
4751                 GOTO(end, rc);
4752         }
4753
4754         /* For handling degraded zfs OST */
4755         if (class_match_param(ptr, PARAM_AUTODEGRADE, NULL) == 0)
4756                 GOTO(end, rc);
4757
4758         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
4759
4760 end:
4761         if (rc)
4762                 CERROR("err %d on param '%s'\n", rc, ptr);
4763
4764         RETURN(rc);
4765 }
4766
4767 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
4768                          struct mgs_target_info *mti, struct fs_db *fsdb)
4769 {
4770         char    *buf, *params;
4771         int      rc = -EINVAL;
4772
4773         ENTRY;
4774
4775         /* set/check the new target index */
4776         rc = mgs_set_index(env, mgs, mti);
4777         if (rc < 0)
4778                 RETURN(rc);
4779
4780         if (rc == EALREADY) {
4781                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
4782                               mti->mti_stripe_index, mti->mti_svname);
4783                 /* We would like to mark old log sections as invalid
4784                    and add new log sections in the client and mdt logs.
4785                    But if we add new sections, then live clients will
4786                    get repeat setup instructions for already running
4787                    osc's. So don't update the client/mdt logs. */
4788                 mti->mti_flags &= ~LDD_F_UPDATE;
4789                 rc = 0;
4790         }
4791
4792         CFS_FAIL_TIMEOUT(OBD_FAIL_MGS_WRITE_TARGET_DELAY, cfs_fail_val > 0 ?
4793                          cfs_fail_val : 10);
4794
4795         mutex_lock(&fsdb->fsdb_mutex);
4796
4797         if (mti->mti_flags & (LDD_F_VIRGIN | LDD_F_WRITECONF)) {
4798                 /* Generate a log from scratch */
4799                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
4800                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
4801                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
4802                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
4803                 } else {
4804                         CERROR("Unknown target type %#x, can't create log for %s\n",
4805                                mti->mti_flags, mti->mti_svname);
4806                 }
4807                 if (rc) {
4808                         CERROR("Can't write logs for %s (%d)\n",
4809                                mti->mti_svname, rc);
4810                         GOTO(out_up, rc);
4811                 }
4812         } else {
4813                 /* Just update the params from tunefs in mgs_write_log_params */
4814                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
4815                 mti->mti_flags |= LDD_F_PARAM;
4816         }
4817
4818         /* allocate temporary buffer, where class_get_next_param will
4819          * make copy of a current  parameter
4820          */
4821         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
4822         if (buf == NULL)
4823                 GOTO(out_up, rc = -ENOMEM);
4824         params = mti->mti_params;
4825         while (params != NULL) {
4826                 rc = class_get_next_param(&params, buf);
4827                 if (rc) {
4828                         if (rc == 1)
4829                                 /* there is no next parameter, that is
4830                                  * not an error
4831                                  */
4832                                 rc = 0;
4833                         break;
4834                 }
4835                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
4836                        params, buf);
4837                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
4838                 if (rc)
4839                         break;
4840         }
4841
4842         OBD_FREE(buf, strlen(mti->mti_params) + 1);
4843
4844 out_up:
4845         mutex_unlock(&fsdb->fsdb_mutex);
4846         RETURN(rc);
4847 }
4848
4849 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
4850 {
4851         struct llog_ctxt        *ctxt;
4852         int                      rc = 0;
4853
4854         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
4855         if (ctxt == NULL) {
4856                 CERROR("%s: MGS config context doesn't exist\n",
4857                        mgs->mgs_obd->obd_name);
4858                 rc = -ENODEV;
4859         } else {
4860                 rc = llog_erase(env, ctxt, NULL, name);
4861                 /* llog may not exist */
4862                 if (rc == -ENOENT)
4863                         rc = 0;
4864                 llog_ctxt_put(ctxt);
4865         }
4866
4867         if (rc)
4868                 CERROR("%s: failed to clear log %s: %d\n",
4869                        mgs->mgs_obd->obd_name, name, rc);
4870
4871         return rc;
4872 }
4873
4874 /* erase all logs for the given fs */
4875 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs,
4876                    const char *fsname)
4877 {
4878         struct list_head log_list;
4879         struct mgs_direntry *dirent, *n;
4880         char barrier_name[20] = {};
4881         char *suffix;
4882         int count = 0;
4883         int rc, len = strlen(fsname);
4884         ENTRY;
4885
4886         mutex_lock(&mgs->mgs_mutex);
4887
4888         /* Find all the logs in the CONFIGS directory */
4889         rc = class_dentry_readdir(env, mgs, &log_list);
4890         if (rc) {
4891                 mutex_unlock(&mgs->mgs_mutex);
4892                 RETURN(rc);
4893         }
4894
4895         if (list_empty(&log_list)) {
4896                 mutex_unlock(&mgs->mgs_mutex);
4897                 RETURN(-ENOENT);
4898         }
4899
4900         snprintf(barrier_name, sizeof(barrier_name) - 1, "%s-%s",
4901                  fsname, BARRIER_FILENAME);
4902         /* Delete the barrier fsdb */
4903         mgs_remove_fsdb_by_name(mgs, barrier_name);
4904         /* Delete the fs db */
4905         mgs_remove_fsdb_by_name(mgs, fsname);
4906         mutex_unlock(&mgs->mgs_mutex);
4907
4908         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4909                 list_del_init(&dirent->mde_list);
4910                 suffix = strrchr(dirent->mde_name, '-');
4911                 if (suffix != NULL) {
4912                         if ((len == suffix - dirent->mde_name) &&
4913                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
4914                                 CDEBUG(D_MGS, "Removing log %s\n",
4915                                        dirent->mde_name);
4916                                 mgs_erase_log(env, mgs, dirent->mde_name);
4917                                 count++;
4918                         }
4919                 }
4920                 mgs_direntry_free(dirent);
4921         }
4922
4923         if (count == 0)
4924                 rc = -ENOENT;
4925
4926         RETURN(rc);
4927 }
4928
4929 /* list all logs for the given fs */
4930 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
4931                   struct obd_ioctl_data *data)
4932 {
4933         struct list_head         log_list;
4934         struct mgs_direntry     *dirent, *n;
4935         char                    *out, *suffix, prefix[] = "config_log: ";
4936         int                      prefix_len = strlen(prefix);
4937         int                      len, remains, start = 0, rc;
4938
4939         ENTRY;
4940
4941         /* Find all the logs in the CONFIGS directory */
4942         rc = class_dentry_readdir(env, mgs, &log_list);
4943         if (rc)
4944                 RETURN(rc);
4945
4946         out = data->ioc_bulk;
4947         remains = data->ioc_inllen1;
4948         /* OBD_FAIL: fetch the config_log records from the specified one */
4949         if (CFS_FAIL_CHECK(OBD_FAIL_CATLIST))
4950                 data->ioc_count = cfs_fail_val;
4951
4952         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4953                 list_del_init(&dirent->mde_list);
4954                 suffix = strrchr(dirent->mde_name, '-');
4955                 if (suffix != NULL) {
4956                         len = prefix_len + dirent->mde_len + 1;
4957                         if (remains - len < 0) {
4958                                 /* No enough space for this record */
4959                                 mgs_direntry_free(dirent);
4960                                 goto out;
4961                         }
4962                         start++;
4963                         if (start < data->ioc_count) {
4964                                 mgs_direntry_free(dirent);
4965                                 continue;
4966                         }
4967                         len = scnprintf(out, remains, "%s%s\n", prefix,
4968                                         dirent->mde_name);
4969                         out += len;
4970                         remains -= len;
4971                 }
4972                 mgs_direntry_free(dirent);
4973                 if (remains <= 1)
4974                         /* Full */
4975                         goto out;
4976         }
4977         /* Finished */
4978         start = 0;
4979 out:
4980         data->ioc_count = start;
4981         RETURN(rc);
4982 }
4983
4984 struct mgs_lcfg_fork_data {
4985         struct lustre_cfg_bufs   mlfd_bufs;
4986         struct mgs_device       *mlfd_mgs;
4987         struct llog_handle      *mlfd_llh;
4988         const char              *mlfd_oldname;
4989         const char              *mlfd_newname;
4990         char                     mlfd_data[0];
4991 };
4992
4993 static bool contain_valid_fsname(char *buf, const char *fsname,
4994                                  int buflen, int namelen)
4995 {
4996         if (buflen < namelen)
4997                 return false;
4998
4999         if (memcmp(buf, fsname, namelen) != 0)
5000                 return false;
5001
5002         if (buf[namelen] != '\0' && buf[namelen] != '-')
5003                 return false;
5004
5005         return true;
5006 }
5007
5008 static int mgs_lcfg_fork_handler(const struct lu_env *env,
5009                                  struct llog_handle *o_llh,
5010                                  struct llog_rec_hdr *o_rec, void *data)
5011 {
5012         struct mgs_lcfg_fork_data *mlfd = data;
5013         struct lustre_cfg_bufs *n_bufs = &mlfd->mlfd_bufs;
5014         struct lustre_cfg *o_lcfg = (struct lustre_cfg *)(o_rec + 1);
5015         struct llog_cfg_rec *lcr;
5016         char *o_buf;
5017         char *n_buf = mlfd->mlfd_data;
5018         int o_buflen;
5019         int o_namelen = strlen(mlfd->mlfd_oldname);
5020         int n_namelen = strlen(mlfd->mlfd_newname);
5021         int diff = n_namelen - o_namelen;
5022         __u32 cmd = o_lcfg->lcfg_command;
5023         __u32 cnt = o_lcfg->lcfg_bufcount;
5024         int rc;
5025         int i;
5026         ENTRY;
5027
5028         /* buf[0] */
5029         o_buf = lustre_cfg_buf(o_lcfg, 0);
5030         o_buflen = o_lcfg->lcfg_buflens[0];
5031         if (contain_valid_fsname(o_buf, mlfd->mlfd_oldname, o_buflen,
5032                                  o_namelen)) {
5033                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
5034                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
5035                        o_buflen - o_namelen);
5036                 lustre_cfg_bufs_reset(n_bufs, n_buf);
5037                 n_buf += round_up(o_buflen + diff, 8);
5038         } else {
5039                 lustre_cfg_bufs_reset(n_bufs, o_buflen != 0 ? o_buf : NULL);
5040         }
5041
5042         switch (cmd) {
5043         case LCFG_MARKER: {
5044                 struct cfg_marker *o_marker;
5045                 struct cfg_marker *n_marker;
5046                 int tgt_namelen;
5047
5048                 if (cnt != 2) {
5049                         CDEBUG(D_MGS, "Unknown cfg marker entry with %d "
5050                                "buffers\n", cnt);
5051                         RETURN(-EINVAL);
5052                 }
5053
5054                 /* buf[1] is marker */
5055                 o_buf = lustre_cfg_buf(o_lcfg, 1);
5056                 o_buflen = o_lcfg->lcfg_buflens[1];
5057                 o_marker = (struct cfg_marker *)o_buf;
5058                 if (!contain_valid_fsname(o_marker->cm_tgtname,
5059                                           mlfd->mlfd_oldname,
5060                                           sizeof(o_marker->cm_tgtname),
5061                                           o_namelen)) {
5062                         lustre_cfg_bufs_set(n_bufs, 1, o_marker,
5063                                             sizeof(*o_marker));
5064                         break;
5065                 }
5066
5067                 n_marker = (struct cfg_marker *)n_buf;
5068                 *n_marker = *o_marker;
5069                 memcpy(n_marker->cm_tgtname, mlfd->mlfd_newname, n_namelen);
5070                 tgt_namelen = strlen(o_marker->cm_tgtname);
5071                 if (tgt_namelen > o_namelen)
5072                         memcpy(n_marker->cm_tgtname + n_namelen,
5073                                o_marker->cm_tgtname + o_namelen,
5074                                tgt_namelen - o_namelen);
5075                 n_marker->cm_tgtname[tgt_namelen + diff] = '\0';
5076                 lustre_cfg_bufs_set(n_bufs, 1, n_marker, sizeof(*n_marker));
5077                 break;
5078         }
5079         case LCFG_PARAM:
5080         case LCFG_SET_PARAM: {
5081                 for (i = 1; i < cnt; i++)
5082                         /* buf[i] is the param value, reuse it directly */
5083                         lustre_cfg_bufs_set(n_bufs, i,
5084                                             lustre_cfg_buf(o_lcfg, i),
5085                                             o_lcfg->lcfg_buflens[i]);
5086                 break;
5087         }
5088         case LCFG_POOL_NEW:
5089         case LCFG_POOL_ADD:
5090         case LCFG_POOL_REM:
5091         case LCFG_POOL_DEL: {
5092                 if (cnt < 3 || cnt > 4) {
5093                         CDEBUG(D_MGS, "Unknown cfg pool (%x) entry with %d "
5094                                "buffers\n", cmd, cnt);
5095                         RETURN(-EINVAL);
5096                 }
5097
5098                 /* buf[1] is fsname */
5099                 o_buf = lustre_cfg_buf(o_lcfg, 1);
5100                 o_buflen = o_lcfg->lcfg_buflens[1];
5101                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
5102                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
5103                        o_buflen - o_namelen);
5104                 lustre_cfg_bufs_set(n_bufs, 1, n_buf, o_buflen + diff);
5105                 n_buf += round_up(o_buflen + diff, 8);
5106
5107                 /* buf[2] is the pool name, reuse it directly */
5108                 lustre_cfg_bufs_set(n_bufs, 2, lustre_cfg_buf(o_lcfg, 2),
5109                                     o_lcfg->lcfg_buflens[2]);
5110
5111                 if (cnt == 3)
5112                         break;
5113
5114                 /* buf[3] is ostname */
5115                 o_buf = lustre_cfg_buf(o_lcfg, 3);
5116                 o_buflen = o_lcfg->lcfg_buflens[3];
5117                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
5118                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
5119                        o_buflen - o_namelen);
5120                 lustre_cfg_bufs_set(n_bufs, 3, n_buf, o_buflen + diff);
5121                 break;
5122         }
5123         case LCFG_SETUP: {
5124                 if (cnt == 2) {
5125                         o_buflen = o_lcfg->lcfg_buflens[1];
5126                         if (o_buflen == sizeof(struct lov_desc) ||
5127                             o_buflen == sizeof(struct lmv_desc)) {
5128                                 char *o_uuid;
5129                                 char *n_uuid;
5130                                 int uuid_len;
5131
5132                                 /* buf[1] */
5133                                 o_buf = lustre_cfg_buf(o_lcfg, 1);
5134                                 if (o_buflen == sizeof(struct lov_desc)) {
5135                                         struct lov_desc *o_desc =
5136                                                 (struct lov_desc *)o_buf;
5137                                         struct lov_desc *n_desc =
5138                                                 (struct lov_desc *)n_buf;
5139
5140                                         *n_desc = *o_desc;
5141                                         o_uuid = o_desc->ld_uuid.uuid;
5142                                         n_uuid = n_desc->ld_uuid.uuid;
5143                                         uuid_len = sizeof(o_desc->ld_uuid.uuid);
5144                                 } else {
5145                                         struct lmv_desc *o_desc =
5146                                                 (struct lmv_desc *)o_buf;
5147                                         struct lmv_desc *n_desc =
5148                                                 (struct lmv_desc *)n_buf;
5149
5150                                         *n_desc = *o_desc;
5151                                         o_uuid = o_desc->ld_uuid.uuid;
5152                                         n_uuid = n_desc->ld_uuid.uuid;
5153                                         uuid_len = sizeof(o_desc->ld_uuid.uuid);
5154                                 }
5155
5156                                 if (unlikely(!contain_valid_fsname(o_uuid,
5157                                                 mlfd->mlfd_oldname, uuid_len,
5158                                                 o_namelen))) {
5159                                         lustre_cfg_bufs_set(n_bufs, 1, o_buf,
5160                                                             o_buflen);
5161                                         break;
5162                                 }
5163
5164                                 memcpy(n_uuid, mlfd->mlfd_newname, n_namelen);
5165                                 uuid_len = strlen(o_uuid);
5166                                 if (uuid_len > o_namelen)
5167                                         memcpy(n_uuid + n_namelen,
5168                                                o_uuid + o_namelen,
5169                                                uuid_len - o_namelen);
5170                                 n_uuid[uuid_len + diff] = '\0';
5171                                 lustre_cfg_bufs_set(n_bufs, 1, n_buf, o_buflen);
5172                                 break;
5173                         } /* else case fall through */
5174                 } /* else case fall through */
5175         }
5176         fallthrough;
5177         default: {
5178                 for (i = 1; i < cnt; i++) {
5179                         o_buflen = o_lcfg->lcfg_buflens[i];
5180                         if (o_buflen == 0)
5181                                 continue;
5182
5183                         o_buf = lustre_cfg_buf(o_lcfg, i);
5184                         if (!contain_valid_fsname(o_buf, mlfd->mlfd_oldname,
5185                                                   o_buflen, o_namelen)) {
5186                                 lustre_cfg_bufs_set(n_bufs, i, o_buf, o_buflen);
5187                                 continue;
5188                         }
5189
5190                         memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
5191                         if (o_buflen == o_namelen) {
5192                                 lustre_cfg_bufs_set(n_bufs, i, n_buf,
5193                                                     n_namelen);
5194                                 n_buf += round_up(n_namelen, 8);
5195                                 continue;
5196                         }
5197
5198                         memcpy(n_buf + n_namelen, o_buf + o_namelen,
5199                                o_buflen - o_namelen);
5200                         lustre_cfg_bufs_set(n_bufs, i, n_buf, o_buflen + diff);
5201                         n_buf += round_up(o_buflen + diff, 8);
5202                 }
5203                 break;
5204         }
5205         }
5206
5207         lcr = lustre_cfg_rec_new(cmd, n_bufs);
5208         if (!lcr)
5209                 RETURN(-ENOMEM);
5210
5211         lcr->lcr_cfg = *o_lcfg;
5212         rc = llog_write(env, mlfd->mlfd_llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
5213         lustre_cfg_rec_free(lcr);
5214
5215         RETURN(rc);
5216 }
5217
5218 static int mgs_lcfg_fork_one(const struct lu_env *env, struct mgs_device *mgs,
5219                              struct mgs_direntry *mde, const char *oldname,
5220                              const char *newname)
5221 {
5222         struct llog_handle *old_llh = NULL;
5223         struct llog_handle *new_llh = NULL;
5224         struct llog_ctxt *ctxt = NULL;
5225         struct mgs_lcfg_fork_data *mlfd = NULL;
5226         char *name_buf = NULL;
5227         int name_buflen;
5228         int old_namelen = strlen(oldname);
5229         int new_namelen = strlen(newname);
5230         int rc;
5231         ENTRY;
5232
5233         name_buflen = mde->mde_len + new_namelen - old_namelen;
5234         OBD_ALLOC(name_buf, name_buflen);
5235         if (!name_buf)
5236                 RETURN(-ENOMEM);
5237
5238         memcpy(name_buf, newname, new_namelen);
5239         memcpy(name_buf + new_namelen, mde->mde_name + old_namelen,
5240                mde->mde_len - old_namelen);
5241
5242         CDEBUG(D_MGS, "Fork the config-log from %s to %s\n",
5243                mde->mde_name, name_buf);
5244
5245         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
5246         LASSERT(ctxt);
5247
5248         rc = llog_open_create(env, ctxt, &new_llh, NULL, name_buf);
5249         if (rc)
5250                 GOTO(out, rc);
5251
5252         rc = llog_init_handle(env, new_llh, LLOG_F_IS_PLAIN, NULL);
5253         if (rc)
5254                 GOTO(out, rc);
5255
5256         if (unlikely(mgs_log_is_empty(env, mgs, mde->mde_name)))
5257                 GOTO(out, rc = 0);
5258
5259         rc = llog_open(env, ctxt, &old_llh, NULL, mde->mde_name,
5260                        LLOG_OPEN_EXISTS);
5261         if (rc)
5262                 GOTO(out, rc);
5263
5264         rc = llog_init_handle(env, old_llh, LLOG_F_IS_PLAIN, NULL);
5265         if (rc)
5266                 GOTO(out, rc);
5267
5268         new_llh->lgh_hdr->llh_tgtuuid = old_llh->lgh_hdr->llh_tgtuuid;
5269
5270         OBD_ALLOC(mlfd, LLOG_MIN_CHUNK_SIZE);
5271         if (!mlfd)
5272                 GOTO(out, rc = -ENOMEM);
5273
5274         mlfd->mlfd_mgs = mgs;
5275         mlfd->mlfd_llh = new_llh;
5276         mlfd->mlfd_oldname = oldname;
5277         mlfd->mlfd_newname = newname;
5278
5279         rc = llog_process(env, old_llh, mgs_lcfg_fork_handler, mlfd, NULL);
5280         OBD_FREE(mlfd, LLOG_MIN_CHUNK_SIZE);
5281
5282         GOTO(out, rc);
5283
5284 out:
5285         if (old_llh)
5286                 llog_close(env, old_llh);
5287         if (new_llh)
5288                 llog_close(env, new_llh);
5289         if (name_buf)
5290                 OBD_FREE(name_buf, name_buflen);
5291         if (ctxt)
5292                 llog_ctxt_put(ctxt);
5293
5294         return rc;
5295 }
5296
5297 int mgs_lcfg_fork(const struct lu_env *env, struct mgs_device *mgs,
5298                   const char *oldname, const char *newname)
5299 {
5300         struct list_head log_list;
5301         struct mgs_direntry *dirent, *n;
5302         int olen = strlen(oldname);
5303         int nlen = strlen(newname);
5304         int count = 0;
5305         int rc = 0;
5306         ENTRY;
5307
5308         if (unlikely(!oldname || oldname[0] == '\0' ||
5309                      !newname || newname[0] == '\0'))
5310                 RETURN(-EINVAL);
5311
5312         if (strcmp(oldname, newname) == 0)
5313                 RETURN(-EINVAL);
5314
5315         /* lock it to prevent fork/erase/register in parallel. */
5316         mutex_lock(&mgs->mgs_mutex);
5317
5318         rc = class_dentry_readdir(env, mgs, &log_list);
5319         if (rc) {
5320                 mutex_unlock(&mgs->mgs_mutex);
5321                 RETURN(rc);
5322         }
5323
5324         if (list_empty(&log_list)) {
5325                 mutex_unlock(&mgs->mgs_mutex);
5326                 RETURN(-ENOENT);
5327         }
5328
5329         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
5330                 char *ptr;
5331
5332                 ptr = strrchr(dirent->mde_name, '-');
5333                 if (ptr) {
5334                         int tlen = ptr - dirent->mde_name;
5335
5336                         if (tlen == nlen &&
5337                             strncmp(newname, dirent->mde_name, tlen) == 0)
5338                                 GOTO(out, rc = -EEXIST);
5339
5340                         if (tlen == olen &&
5341                             strncmp(oldname, dirent->mde_name, tlen) == 0)
5342                                 continue;
5343                 }
5344
5345                 list_del_init(&dirent->mde_list);
5346                 mgs_direntry_free(dirent);
5347         }
5348
5349         if (list_empty(&log_list)) {
5350                 mutex_unlock(&mgs->mgs_mutex);
5351                 RETURN(-ENOENT);
5352         }
5353
5354         list_for_each_entry(dirent, &log_list, mde_list) {
5355                 rc = mgs_lcfg_fork_one(env, mgs, dirent, oldname, newname);
5356                 if (rc)
5357                         break;
5358
5359                 count++;
5360         }
5361
5362 out:
5363         mutex_unlock(&mgs->mgs_mutex);
5364
5365         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
5366                 list_del_init(&dirent->mde_list);
5367                 mgs_direntry_free(dirent);
5368         }
5369
5370         if (rc && count > 0)
5371                 mgs_erase_logs(env, mgs, newname);
5372
5373         RETURN(rc);
5374 }
5375
5376 int mgs_lcfg_erase(const struct lu_env *env, struct mgs_device *mgs,
5377                    const char *fsname)
5378 {
5379         int rc;
5380         ENTRY;
5381
5382         if (unlikely(!fsname || fsname[0] == '\0'))
5383                 RETURN(-EINVAL);
5384
5385         rc = mgs_erase_logs(env, mgs, fsname);
5386
5387         RETURN(rc);
5388 }
5389
5390 static int mgs_xattr_del(const struct lu_env *env, struct dt_object *obj)
5391 {
5392         struct dt_device *dev;
5393         struct thandle *th = NULL;
5394         int rc = 0;
5395
5396         ENTRY;
5397
5398         dev = container_of(obj->do_lu.lo_dev, struct dt_device, dd_lu_dev);
5399         th = dt_trans_create(env, dev);
5400         if (IS_ERR(th))
5401                 RETURN(PTR_ERR(th));
5402
5403         rc = dt_declare_xattr_del(env, obj, XATTR_TARGET_RENAME, th);
5404         if (rc)
5405                 GOTO(stop, rc);
5406
5407         rc = dt_trans_start_local(env, dev, th);
5408         if (rc)
5409                 GOTO(stop, rc);
5410
5411         dt_write_lock(env, obj, 0);
5412         rc = dt_xattr_del(env, obj, XATTR_TARGET_RENAME, th);
5413
5414         GOTO(unlock, rc);
5415
5416 unlock:
5417         dt_write_unlock(env, obj);
5418
5419 stop:
5420         dt_trans_stop(env, dev, th);
5421
5422         return rc;
5423 }
5424
5425 int mgs_lcfg_rename(const struct lu_env *env, struct mgs_device *mgs)
5426 {
5427         struct list_head log_list;
5428         struct mgs_direntry *dirent, *n;
5429         char fsname[16];
5430         struct lu_buf buf = {
5431                 .lb_buf = fsname,
5432                 .lb_len = sizeof(fsname)
5433         };
5434         int rc = 0;
5435
5436         ENTRY;
5437
5438         rc = class_dentry_readdir(env, mgs, &log_list);
5439         if (rc)
5440                 RETURN(rc);
5441
5442         if (list_empty(&log_list))
5443                 RETURN(0);
5444
5445         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
5446                 struct dt_object *o = NULL;
5447                 char oldname[16];
5448                 char *ptr;
5449                 int len;
5450
5451                 list_del_init(&dirent->mde_list);
5452                 ptr = strrchr(dirent->mde_name, '-');
5453                 if (!ptr)
5454                         goto next;
5455
5456                 len = ptr - dirent->mde_name;
5457                 if (unlikely(len >= sizeof(oldname))) {
5458                         CDEBUG(D_MGS, "Skip invalid configuration file %s\n",
5459                                dirent->mde_name);
5460                         goto next;
5461                 }
5462
5463                 o = local_file_find(env, mgs->mgs_los, mgs->mgs_configs_dir,
5464                                     dirent->mde_name);
5465                 if (IS_ERR(o)) {
5466                         rc = PTR_ERR(o);
5467                         CDEBUG(D_MGS, "Fail to locate file %s: rc = %d\n",
5468                                dirent->mde_name, rc);
5469                         goto next;
5470                 }
5471
5472                 rc = dt_xattr_get(env, o, &buf, XATTR_TARGET_RENAME);
5473                 if (rc < 0) {
5474                         if (rc == -ENODATA)
5475                                 rc = 0;
5476                         else
5477                                 CDEBUG(D_MGS,
5478                                        "Fail to get EA for %s: rc = %d\n",
5479                                        dirent->mde_name, rc);
5480                         goto next;
5481                 }
5482
5483                 if (unlikely(rc == len &&
5484                              memcmp(fsname, dirent->mde_name, len) == 0)) {
5485                         /* The new fsname is the same as the old one. */
5486                         rc = mgs_xattr_del(env, o);
5487                         goto next;
5488                 }
5489
5490                 memcpy(oldname, dirent->mde_name, len);
5491                 oldname[len] = '\0';
5492                 fsname[rc] = '\0';
5493                 rc = mgs_lcfg_fork_one(env, mgs, dirent, oldname, fsname);
5494                 if (rc && rc != -EEXIST) {
5495                         CDEBUG(D_MGS, "Fail to fork %s: rc = %d\n",
5496                                dirent->mde_name, rc);
5497                         goto next;
5498                 }
5499
5500                 rc = mgs_erase_log(env, mgs, dirent->mde_name);
5501                 if (rc) {
5502                         CDEBUG(D_MGS, "Fail to erase old %s: rc = %d\n",
5503                                dirent->mde_name, rc);
5504                         /* keep it there if failed to remove it. */
5505                         rc = 0;
5506                 }
5507
5508 next:
5509                 if (o && !IS_ERR(o))
5510                         lu_object_put(env, &o->do_lu);
5511
5512                 mgs_direntry_free(dirent);
5513                 if (rc)
5514                         break;
5515         }
5516
5517         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
5518                 list_del_init(&dirent->mde_list);
5519                 mgs_direntry_free(dirent);
5520         }
5521
5522         RETURN(rc);
5523 }
5524
5525 /* Setup _mgs fsdb and log
5526  */
5527 int mgs__mgs_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
5528 {
5529         struct fs_db *fsdb = NULL;
5530         int rc;
5531         ENTRY;
5532
5533         rc = mgs_find_or_make_fsdb(env, mgs, MGSSELF_NAME, &fsdb);
5534         if (!rc)
5535                 mgs_put_fsdb(mgs, fsdb);
5536
5537         RETURN(rc);
5538 }
5539
5540 /* Setup params fsdb and log
5541  */
5542 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
5543 {
5544         struct fs_db *fsdb = NULL;
5545         struct llog_handle *params_llh = NULL;
5546         int rc;
5547         ENTRY;
5548
5549         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
5550         if (!rc) {
5551                 mutex_lock(&fsdb->fsdb_mutex);
5552                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
5553                 if (!rc)
5554                         rc = record_end_log(env, &params_llh);
5555                 mutex_unlock(&fsdb->fsdb_mutex);
5556                 mgs_put_fsdb(mgs, fsdb);
5557         }
5558
5559         RETURN(rc);
5560 }
5561
5562 /* Cleanup params fsdb and log
5563  */
5564 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
5565 {
5566         int rc;
5567
5568         rc = mgs_erase_logs(env, mgs, PARAMS_FILENAME);
5569         return rc == -ENOENT ? 0 : rc;
5570 }
5571
5572 /**
5573  * Fill in the mgs_target_info based on data devname and param provide.
5574  *
5575  * @env         thread context
5576  * @mgs         mgs device
5577  * @mti         mgs target info. We want to set this based other paramters
5578  *              passed to this function. Once setup we write it to the config
5579  *              logs.
5580  * @devname     optional OBD device name
5581  * @param       string that contains both what tunable to set and the value to
5582  *              set it to.
5583  *
5584  * RETURN       0 for success
5585  *              negative error number on failure
5586  **/
5587 static int mgs_set_conf_param(const struct lu_env *env, struct mgs_device *mgs,
5588                               struct mgs_target_info *mti, const char *devname,
5589                               const char *param)
5590 {
5591         struct fs_db *fsdb = NULL;
5592         int dev_type;
5593         int rc = 0;
5594
5595         ENTRY;
5596         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
5597         if (!devname) {
5598                 size_t len;
5599
5600                 /* We have two possible cases here:
5601                  *
5602                  * 1) the device name embedded in the param:
5603                  *    lustre-OST0000.osc.max_dirty_mb=32
5604                  *
5605                  * 2) the file system name is embedded in
5606                  *    the param: lustre.sys.at.min=0
5607                  */
5608                 len = strcspn(param, ".=");
5609                 if (!len || param[len] == '=')
5610                         RETURN(-EINVAL);
5611
5612                 if (len >= sizeof(mti->mti_svname))
5613                         RETURN(-E2BIG);
5614
5615                 snprintf(mti->mti_svname, sizeof(mti->mti_svname),
5616                          "%.*s", (int)len, param);
5617                 param += len + 1;
5618         } else {
5619                 rc = strscpy(mti->mti_svname, devname, sizeof(mti->mti_svname));
5620                 if (rc < 0)
5621                         RETURN(rc);
5622         }
5623
5624         if (!strlen(mti->mti_svname)) {
5625                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
5626                 RETURN(-ENOSYS);
5627         }
5628
5629         dev_type = mgs_parse_devname(mti->mti_svname, mti->mti_fsname,
5630                                      &mti->mti_stripe_index);
5631         switch (dev_type) {
5632         /* For this case we have an invalid obd device name */
5633         case -ENXIO:
5634                 CDEBUG(D_MGS, "%s don't contain an index\n", mti->mti_svname);
5635                 strscpy(mti->mti_fsname, mti->mti_svname, MTI_NAME_MAXLEN);
5636                 dev_type = 0;
5637                 break;
5638         /* Not an obd device, assume devname is the fsname.
5639          * User might of only provided fsname and not obd device
5640          */
5641         case -EINVAL:
5642                 CDEBUG(D_MGS, "%s is seen as a file system name\n", mti->mti_svname);
5643                 strscpy(mti->mti_fsname, mti->mti_svname, MTI_NAME_MAXLEN);
5644                 dev_type = 0;
5645                 break;
5646         default:
5647                 if (dev_type < 0)
5648                         GOTO(out, rc = dev_type);
5649
5650                 /* param related to llite isn't allowed to set by OST or MDT */
5651                 if (dev_type & LDD_F_SV_TYPE_OST ||
5652                     dev_type & LDD_F_SV_TYPE_MDT) {
5653                         /* param related to llite isn't allowed to set by OST
5654                          * or MDT
5655                          */
5656                         if (!strncmp(param, PARAM_LLITE,
5657                                      sizeof(PARAM_LLITE) - 1))
5658                                 GOTO(out, rc = -EINVAL);
5659
5660                         /* Strip -osc or -mdc suffix from svname */
5661                         if (server_make_name(dev_type, mti->mti_stripe_index,
5662                                              mti->mti_fsname, mti->mti_svname,
5663                                              sizeof(mti->mti_svname)))
5664                                 GOTO(out, rc = -EINVAL);
5665                 }
5666                 break;
5667         }
5668         rc = strscpy(mti->mti_params, param, sizeof(mti->mti_params));
5669         if (rc < 0)
5670                 GOTO(out, rc);
5671
5672         CDEBUG(D_MGS, "set_conf_param fs='%s' device='%s' param='%s'\n",
5673                mti->mti_fsname, mti->mti_svname, mti->mti_params);
5674
5675         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
5676         if (rc)
5677                 GOTO(out, rc);
5678
5679         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
5680             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
5681                 CERROR("No filesystem targets for %s. cfg_device from lctl "
5682                        "is '%s'\n", mti->mti_fsname, mti->mti_svname);
5683                 mgs_unlink_fsdb(mgs, fsdb);
5684                 GOTO(out, rc = -EINVAL);
5685         }
5686
5687         /*
5688          * Revoke lock so everyone updates.  Should be alright if
5689          * someone was already reading while we were updating the logs,
5690          * so we don't really need to hold the lock while we're
5691          * writing (above).
5692          */
5693         mti->mti_flags = dev_type | LDD_F_PARAM;
5694         mutex_lock(&fsdb->fsdb_mutex);
5695         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
5696         mutex_unlock(&fsdb->fsdb_mutex);
5697         mgs_revoke_lock(mgs, fsdb, MGS_CFG_T_CONFIG);
5698
5699 out:
5700         if (fsdb)
5701                 mgs_put_fsdb(mgs, fsdb);
5702
5703         RETURN(rc);
5704 }
5705
5706 static int mgs_set_param2(const struct lu_env *env, struct mgs_device *mgs,
5707                           struct mgs_target_info *mti, const char *param)
5708 {
5709         struct fs_db *fsdb = NULL;
5710         int dev_type;
5711         size_t len;
5712         int rc;
5713
5714         rc = strscpy(mti->mti_params, param, sizeof(mti->mti_params));
5715         if (rc < 0)
5716                 GOTO(out, rc);
5717
5718         len = strcspn(param, ".=");
5719         if (len && param[len] != '=') {
5720                 struct list_head *tmp;
5721                 char *ptr;
5722
5723                 param += len + 1;
5724                 ptr = strchr(param, '.');
5725
5726                 len = strlen(param);
5727                 if (ptr)
5728                         len -= strlen(ptr);
5729                 if (len >= sizeof(mti->mti_svname))
5730                         GOTO(out, rc = -E2BIG);
5731
5732                 snprintf(mti->mti_svname, sizeof(mti->mti_svname), "%.*s",
5733                         (int)len, param);
5734
5735                 mutex_lock(&mgs->mgs_mutex);
5736                 if (unlikely(list_empty(&mgs->mgs_fs_db_list))) {
5737                         mutex_unlock(&mgs->mgs_mutex);
5738                         GOTO(out, rc = -ENODEV);
5739                 }
5740
5741                 list_for_each(tmp, &mgs->mgs_fs_db_list) {
5742                         fsdb = list_entry(tmp, struct fs_db, fsdb_list);
5743                         if (fsdb->fsdb_has_lproc_entry &&
5744                             strcmp(fsdb->fsdb_name, "params") != 0 &&
5745                             strstr(param, fsdb->fsdb_name)) {
5746                                 snprintf(mti->mti_svname,
5747                                          sizeof(mti->mti_svname), "%s",
5748                                          fsdb->fsdb_name);
5749                                 break;
5750                         }
5751                         fsdb = NULL;
5752                 }
5753
5754                 if (!fsdb) {
5755                         snprintf(mti->mti_svname, sizeof(mti->mti_svname),
5756                                  "general");
5757                 }
5758                 mutex_unlock(&mgs->mgs_mutex);
5759         } else {
5760                 snprintf(mti->mti_svname, sizeof(mti->mti_svname), "general");
5761         }
5762
5763         CDEBUG(D_MGS, "set_param2 fs='%s' device='%s' param='%s'\n",
5764                mti->mti_fsname, mti->mti_svname, mti->mti_params);
5765
5766         /* The return value should be the device type i.e LDD_F_SV_TYPE_XXX.
5767          * A returned error tells us we don't have a target obd device.
5768          */
5769         dev_type = server_name2index(mti->mti_svname, &mti->mti_stripe_index,
5770                                      NULL);
5771         if (dev_type < 0)
5772                 dev_type = 0;
5773
5774         /* the return value should be the device type i.e LDD_F_SV_TYPE_XXX.
5775          * Strip -osc or -mdc suffix from svname
5776          */
5777         if ((dev_type & LDD_F_SV_TYPE_OST || dev_type & LDD_F_SV_TYPE_MDT) &&
5778             server_make_name(dev_type, mti->mti_stripe_index,
5779                              mti->mti_fsname, mti->mti_svname,
5780                              sizeof(mti->mti_svname)))
5781                 GOTO(out, rc = -EINVAL);
5782
5783         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
5784         if (rc)
5785                 GOTO(out, rc);
5786         /*
5787          * Revoke lock so everyone updates.  Should be alright if
5788          * someone was already reading while we were updating the logs,
5789          * so we don't really need to hold the lock while we're
5790          * writing (above).
5791          */
5792         mti->mti_flags = dev_type | LDD_F_PARAM2;
5793         mutex_lock(&fsdb->fsdb_mutex);
5794         rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
5795         mutex_unlock(&fsdb->fsdb_mutex);
5796         mgs_revoke_lock(mgs, fsdb, MGS_CFG_T_PARAMS);
5797         mgs_put_fsdb(mgs, fsdb);
5798 out:
5799         RETURN(rc);
5800 }
5801
5802 /* Set a permanent (config log) param for a target or fs
5803  *
5804  * @lcfg buf0 may contain the device (testfs-MDT0000) name
5805  *       buf1 contains the single parameter
5806  */
5807 int mgs_set_param(const struct lu_env *env, struct mgs_device *mgs,
5808                   struct lustre_cfg *lcfg)
5809 {
5810         const char *param = lustre_cfg_string(lcfg, 1);
5811         struct mgs_target_info *mti;
5812         int rc;
5813
5814         /* Create a fake mti to hold everything */
5815         OBD_ALLOC_PTR(mti);
5816         if (!mti)
5817                 return -ENOMEM;
5818
5819         print_lustre_cfg(lcfg);
5820
5821         if (lcfg->lcfg_command == LCFG_PARAM) {
5822                 /* For the case of lctl conf_param devname can be
5823                  * lustre, lustre-mdtlov, lustre-client, lustre-MDT0000
5824                  */
5825                 const char *devname = lustre_cfg_string(lcfg, 0);
5826
5827                 rc = mgs_set_conf_param(env, mgs, mti, devname, param);
5828         } else {
5829                 /* In the case of lctl set_param -P lcfg[0] will always
5830                  * be 'general'. At least for now.
5831                  */
5832                 rc = mgs_set_param2(env, mgs, mti, param);
5833         }
5834
5835         OBD_FREE_PTR(mti);
5836
5837         return rc;
5838 }
5839
5840 static int mgs_write_log_pool(const struct lu_env *env,
5841                               struct mgs_device *mgs, char *logname,
5842                               struct fs_db *fsdb, char *tgtname,
5843                               enum lcfg_command_type cmd,
5844                               char *fsname, char *poolname,
5845                               char *ostname, char *comment)
5846 {
5847         struct llog_handle *llh = NULL;
5848         int rc;
5849
5850         rc = record_start_log(env, mgs, &llh, logname);
5851         if (rc)
5852                 return rc;
5853         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
5854         if (rc)
5855                 goto out;
5856         rc = record_base(env, llh, tgtname, 0, cmd,
5857                          fsname, poolname, ostname, NULL);
5858         if (rc)
5859                 goto out;
5860         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
5861 out:
5862         record_end_log(env, &llh);
5863         return rc;
5864 }
5865
5866 static inline
5867 int mgs_pool_check_ostname(struct fs_db *fsdb, char *fsname, char *ostname)
5868 {
5869         char *ptr;
5870         unsigned int index;
5871
5872         /* check if ostname match fsname */
5873         ptr = strrchr(ostname, '-');
5874         if (!ptr || (strncmp(fsname, ostname, ptr - ostname) != 0))
5875                 RETURN(-EINVAL);
5876
5877         ptr++;
5878         if (sscanf(ptr, "OST%04x_UUID", &index) != 1)
5879                 return -EINVAL;
5880         if (index > INDEX_MAP_MAX_VALUE)
5881                 return -ERANGE;
5882         if (!test_bit(index, fsdb->fsdb_ost_index_map))
5883                 return -ENODEV;
5884
5885         return 0;
5886 }
5887
5888 static
5889 int mgs_pool_sanity(const struct lu_env *env, struct mgs_device *mgs,
5890                     struct fs_db *fsdb, struct mgs_target_info *mti,
5891                     char *logname, char *devname, enum lcfg_command_type cmd,
5892                     char *fsname, char *poolname, char *ostname)
5893 {
5894         char *lov = fsdb->fsdb_clilov;
5895         int status;
5896         int rc = 0;
5897
5898         status = mgs_search_pool(env, mgs, fsdb, mti, logname, lov,
5899                                  fsname, poolname, ostname);
5900         if (status < 0)
5901                 return status;
5902
5903         switch (cmd) {
5904         case LCFG_POOL_NEW:
5905                 if (status >= POOL_STATUS_EXIST)
5906                         rc = -EEXIST;
5907                 break;
5908         case LCFG_POOL_ADD:
5909                 if (status == POOL_STATUS_NONE)
5910                         rc = -ENOMEDIUM;
5911                 else if (status == POOL_STATUS_OST_EXIST)
5912                         rc = -EEXIST;
5913                 break;
5914         case LCFG_POOL_REM:
5915                 if (status == POOL_STATUS_NONE)
5916                         rc = -ENOMEDIUM;
5917                 if (status != POOL_STATUS_OST_EXIST)
5918                         rc = -ENOENT;
5919                 break;
5920         case LCFG_POOL_DEL:
5921                 if (status == POOL_STATUS_NONE)
5922                         rc = -ENOENT;
5923                 if (status == POOL_STATUS_OST_EXIST)
5924                         rc = -ENOTEMPTY;
5925                 break;
5926         default:
5927                 rc = -EINVAL;
5928                 break;
5929         }
5930
5931         return rc;
5932 }
5933
5934
5935 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
5936                  enum lcfg_command_type cmd, char *fsname,
5937                  char *poolname, char *ostname)
5938 {
5939         struct fs_db *fsdb;
5940         char *lovname;
5941         char *logname;
5942         char *label = NULL;
5943         char *canceled_label = NULL;
5944         int label_sz;
5945         struct mgs_target_info *mti = NULL;
5946         int rc, i;
5947         ENTRY;
5948
5949         if ((cmd == LCFG_POOL_REM || cmd == LCFG_POOL_ADD) && !ostname)
5950                 RETURN(-EINVAL);
5951         if ((cmd == LCFG_POOL_DEL || cmd == LCFG_POOL_NEW) && ostname)
5952                 ostname = NULL;
5953
5954         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
5955         if (rc) {
5956                 CERROR("Can't get db for %s\n", fsname);
5957                 RETURN(rc);
5958         }
5959         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
5960                 CERROR("%s is not defined\n", fsname);
5961                 mgs_unlink_fsdb(mgs, fsdb);
5962                 GOTO(out_fsdb, rc = -EINVAL);
5963         }
5964
5965         label_sz = 10 + strlen(fsname) + strlen(poolname);
5966         if (ostname) {
5967                 rc =  mgs_pool_check_ostname(fsdb, fsname, ostname);
5968                 if (rc)
5969                         GOTO(out_fsdb, rc);
5970                 label_sz += strlen(ostname);
5971         }
5972
5973         OBD_ALLOC(label, label_sz);
5974         if (!label)
5975                 GOTO(out_fsdb, rc = -ENOMEM);
5976
5977         switch (cmd) {
5978         case LCFG_POOL_NEW:
5979                 sprintf(label, "new %s.%s", fsname, poolname);
5980                 break;
5981         case LCFG_POOL_ADD:
5982                 sprintf(label, "add %s.%s.%s", fsname, poolname, ostname);
5983                 break;
5984         case LCFG_POOL_REM:
5985                 OBD_ALLOC(canceled_label, label_sz);
5986                 if (canceled_label == NULL)
5987                         GOTO(out_label, rc = -ENOMEM);
5988                 sprintf(label, "rem %s.%s.%s", fsname, poolname, ostname);
5989                 sprintf(canceled_label, "add %s.%s.%s",
5990                         fsname, poolname, ostname);
5991                 break;
5992         case LCFG_POOL_DEL:
5993                 OBD_ALLOC(canceled_label, label_sz);
5994                 if (canceled_label == NULL)
5995                         GOTO(out_label, rc = -ENOMEM);
5996
5997                 sprintf(label, "del %s.%s", fsname, poolname);
5998                 sprintf(canceled_label, "new %s.%s", fsname, poolname);
5999                 break;
6000         default:
6001                 break;
6002         }
6003
6004         OBD_ALLOC_PTR(mti);
6005         if (!mti)
6006                 GOTO(out_cancel, rc = -ENOMEM);
6007         strscpy(mti->mti_svname, "lov pool", sizeof(mti->mti_svname));
6008
6009         mutex_lock(&fsdb->fsdb_mutex);
6010
6011         rc = name_create(&logname, fsname, "-client");
6012         if (rc)
6013                 GOTO(out_unlock, rc);
6014
6015         rc = mgs_pool_sanity(env, mgs, fsdb, mti, logname, fsdb->fsdb_clilov,
6016                              cmd, fsname, poolname, ostname);
6017         if (rc < 0)
6018                 GOTO(out_logname, rc);
6019
6020         if (canceled_label)
6021                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
6022                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
6023
6024         if (rc < 0)
6025                 GOTO(out_logname, rc);
6026
6027         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
6028                                 cmd, fsname, poolname, ostname, label);
6029         if (rc < 0)
6030                 GOTO(out_logname, rc);
6031
6032         name_destroy(&logname);
6033
6034         /* write pool def to all MDT logs */
6035         for_each_set_bit(i, fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE) {
6036                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb, i);
6037                 if (rc)
6038                         GOTO(out_unlock, rc);
6039
6040                 if (canceled_label)
6041                         rc = mgs_modify(env, mgs, fsdb, mti, logname, lovname,
6042                                         canceled_label, CM_SKIP);
6043
6044                 if (rc < 0)
6045                         GOTO(out_names, rc);
6046
6047                 rc = mgs_write_log_pool(env, mgs, logname, fsdb, lovname, cmd,
6048                                         fsname, poolname, ostname, label);
6049                 if (rc)
6050                         GOTO(out_names, rc);
6051
6052                 name_destroy(&logname);
6053                 name_destroy(&lovname);
6054         }
6055         mutex_unlock(&fsdb->fsdb_mutex);
6056
6057         /* request for update */
6058         mgs_revoke_lock(mgs, fsdb, MGS_CFG_T_CONFIG);
6059
6060         GOTO(out_mti, rc);
6061
6062 out_names:
6063         name_destroy(&lovname);
6064 out_logname:
6065         name_destroy(&logname);
6066 out_unlock:
6067         mutex_unlock(&fsdb->fsdb_mutex);
6068 out_mti:
6069         OBD_FREE_PTR(mti);
6070 out_cancel:
6071         if (canceled_label)
6072                 OBD_FREE(canceled_label, label_sz);
6073 out_label:
6074         OBD_FREE(label, label_sz);
6075 out_fsdb:
6076         mgs_put_fsdb(mgs, fsdb);
6077
6078         return rc;
6079 }