Whamcloud - gitweb
LU-7004 mgs: remove using obdname2fsname() from mgs layer.
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mgs/mgs_llog.c
33  *
34  * Lustre Management Server (mgs) config llog creation
35  *
36  * Author: Nathan Rutman <nathan@clusterfs.com>
37  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
38  * Author: Mikhail Pershin <tappro@whamcloud.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_MGS
42 #define D_MGS D_CONFIG
43
44 #include <obd.h>
45 #include <uapi/linux/lustre/lustre_ioctl.h>
46 #include <uapi/linux/lustre/lustre_param.h>
47 #include <lustre_sec.h>
48 #include <lustre_quota.h>
49 #include <lustre_sec.h>
50
51 #include "mgs_internal.h"
52
53 /********************** Class functions ********************/
54
55 /**
56  * Find all logs in CONFIG directory and link then into list.
57  *
58  * \param[in] env       pointer to the thread context
59  * \param[in] mgs       pointer to the mgs device
60  * \param[out] log_list the list to hold the found llog name entry
61  *
62  * \retval              0 for success
63  * \retval              negative error number on failure
64  **/
65 int class_dentry_readdir(const struct lu_env *env, struct mgs_device *mgs,
66                          struct list_head *log_list)
67 {
68         struct dt_object *dir = mgs->mgs_configs_dir;
69         const struct dt_it_ops *iops;
70         struct dt_it *it;
71         struct mgs_direntry *de;
72         char *key;
73         int rc, key_sz;
74
75         INIT_LIST_HEAD(log_list);
76
77         LASSERT(dir);
78         LASSERT(dir->do_index_ops);
79
80         iops = &dir->do_index_ops->dio_it;
81         it = iops->init(env, dir, LUDA_64BITHASH);
82         if (IS_ERR(it))
83                 RETURN(PTR_ERR(it));
84
85         rc = iops->load(env, it, 0);
86         if (rc <= 0)
87                 GOTO(fini, rc = 0);
88
89         /* main cycle */
90         do {
91                 key = (void *)iops->key(env, it);
92                 if (IS_ERR(key)) {
93                         CERROR("%s: key failed when listing %s: rc = %d\n",
94                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
95                                (int) PTR_ERR(key));
96                         goto next;
97                 }
98                 key_sz = iops->key_size(env, it);
99                 LASSERT(key_sz > 0);
100
101                 /* filter out "." and ".." entries */
102                 if (key[0] == '.') {
103                         if (key_sz == 1)
104                                 goto next;
105                         if (key_sz == 2 && key[1] == '.')
106                                 goto next;
107                 }
108
109                 /* filter out ".bak" files */
110                 /* sizeof(".bak") - 1 == 3 */
111                 if (key_sz >= 3 &&
112                     !memcmp(".bak", key + key_sz - 3, 3)) {
113                         CDEBUG(D_MGS, "Skipping backup file %.*s\n",
114                                key_sz, key);
115                         goto next;
116                 }
117
118                 de = mgs_direntry_alloc(key_sz + 1);
119                 if (de == NULL) {
120                         rc = -ENOMEM;
121                         break;
122                 }
123
124                 memcpy(de->mde_name, key, key_sz);
125                 de->mde_name[key_sz] = 0;
126
127                 list_add(&de->mde_list, log_list);
128
129 next:
130                 rc = iops->next(env, it);
131         } while (rc == 0);
132         if (rc > 0)
133                 rc = 0;
134
135         iops->put(env, it);
136
137 fini:
138         iops->fini(env, it);
139         if (rc) {
140                 struct mgs_direntry *n;
141
142                 CERROR("%s: key failed when listing %s: rc = %d\n",
143                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
144
145                 list_for_each_entry_safe(de, n, log_list, mde_list) {
146                         list_del_init(&de->mde_list);
147                         mgs_direntry_free(de);
148                 }
149         }
150
151         RETURN(rc);
152 }
153
154 /******************** DB functions *********************/
155
156 static inline int name_create(char **newname, char *prefix, char *suffix)
157 {
158         LASSERT(newname);
159         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
160         if (!*newname)
161                 return -ENOMEM;
162         sprintf(*newname, "%s%s", prefix, suffix);
163         return 0;
164 }
165
166 static inline void name_destroy(char **name)
167 {
168         if (*name)
169                 OBD_FREE(*name, strlen(*name) + 1);
170         *name = NULL;
171 }
172
173 struct mgs_fsdb_handler_data
174 {
175         struct fs_db   *fsdb;
176         __u32           ver;
177 };
178
179 /* from the (client) config log, figure out:
180         1. which ost's/mdt's are configured (by index)
181         2. what the last config step is
182         3. COMPAT_18 osc name
183 */
184 /* It might be better to have a separate db file, instead of parsing the info
185    out of the client log.  This is slow and potentially error-prone. */
186 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
187                             struct llog_rec_hdr *rec, void *data)
188 {
189         struct mgs_fsdb_handler_data *d = data;
190         struct fs_db *fsdb = d->fsdb;
191         int cfg_len = rec->lrh_len;
192         char *cfg_buf = (char*) (rec + 1);
193         struct lustre_cfg *lcfg;
194         __u32 index;
195         int rc = 0;
196         ENTRY;
197
198         if (rec->lrh_type != OBD_CFG_REC) {
199                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
200                 RETURN(-EINVAL);
201         }
202
203         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
204         if (rc) {
205                 CERROR("Insane cfg\n");
206                 RETURN(rc);
207         }
208
209         lcfg = (struct lustre_cfg *)cfg_buf;
210
211         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
212                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
213
214         /* Figure out ost indicies */
215         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
216         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
217             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
218                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
219                                        NULL, 10);
220                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
221                        lustre_cfg_string(lcfg, 1), index,
222                        lustre_cfg_string(lcfg, 2));
223                 set_bit(index, fsdb->fsdb_ost_index_map);
224         }
225
226         /* Figure out mdt indicies */
227         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
228         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
229             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
230                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
231                                        &index, NULL);
232                 if (rc != LDD_F_SV_TYPE_MDT) {
233                         CWARN("Unparsable MDC name %s, assuming index 0\n",
234                               lustre_cfg_string(lcfg, 0));
235                         index = 0;
236                 }
237                 rc = 0;
238                 CDEBUG(D_MGS, "MDT index is %u\n", index);
239                 if (!test_bit(index, fsdb->fsdb_mdt_index_map)) {
240                         set_bit(index, fsdb->fsdb_mdt_index_map);
241                         fsdb->fsdb_mdt_count++;
242                 }
243         }
244
245         /**
246          * figure out the old config. fsdb_gen = 0 means old log
247          * It is obsoleted and not supported anymore
248          */
249         if (fsdb->fsdb_gen == 0) {
250                 CERROR("Old config format is not supported\n");
251                 RETURN(-EINVAL);
252         }
253
254         /*
255          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
256          */
257         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
258             lcfg->lcfg_command == LCFG_ATTACH &&
259             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
260                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
261                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
262                         CWARN("MDT using 1.8 OSC name scheme\n");
263                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
264                 }
265         }
266
267         if (lcfg->lcfg_command == LCFG_MARKER) {
268                 struct cfg_marker *marker;
269                 marker = lustre_cfg_buf(lcfg, 1);
270
271                 d->ver = marker->cm_vers;
272
273                 /* Keep track of the latest marker step */
274                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
275         }
276
277         RETURN(rc);
278 }
279
280 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
281 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
282                                   struct mgs_device *mgs,
283                                   struct fs_db *fsdb)
284 {
285         char *logname;
286         struct llog_handle *loghandle;
287         struct llog_ctxt *ctxt;
288         struct mgs_fsdb_handler_data d = {
289                 .fsdb = fsdb,
290         };
291         int rc;
292
293         ENTRY;
294
295         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
296         LASSERT(ctxt != NULL);
297         rc = name_create(&logname, fsdb->fsdb_name, "-client");
298         if (rc)
299                 GOTO(out_put, rc);
300         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
301         if (rc)
302                 GOTO(out_pop, rc);
303
304         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
305         if (rc)
306                 GOTO(out_close, rc);
307
308         if (llog_get_size(loghandle) <= 1)
309                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
310
311         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
312         CDEBUG(D_INFO, "get_db = %d\n", rc);
313 out_close:
314         llog_close(env, loghandle);
315 out_pop:
316         name_destroy(&logname);
317 out_put:
318         llog_ctxt_put(ctxt);
319
320         RETURN(rc);
321 }
322
323 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
324 {
325         struct mgs_tgt_srpc_conf *tgtconf;
326
327         /* free target-specific rules */
328         while (fsdb->fsdb_srpc_tgt) {
329                 tgtconf = fsdb->fsdb_srpc_tgt;
330                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
331
332                 LASSERT(tgtconf->mtsc_tgt);
333
334                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
335                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
336                 OBD_FREE_PTR(tgtconf);
337         }
338
339         /* free general rules */
340         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
341 }
342
343 static void mgs_unlink_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
344 {
345         mutex_lock(&mgs->mgs_mutex);
346         if (likely(!list_empty(&fsdb->fsdb_list))) {
347                 LASSERTF(atomic_read(&fsdb->fsdb_ref) >= 2,
348                          "Invalid ref %d on %s\n",
349                          atomic_read(&fsdb->fsdb_ref),
350                          fsdb->fsdb_name);
351
352                 list_del_init(&fsdb->fsdb_list);
353                 /* Drop the reference on the list.*/
354                 mgs_put_fsdb(mgs, fsdb);
355         }
356         mutex_unlock(&mgs->mgs_mutex);
357 }
358
359 /* The caller must hold mgs->mgs_mutex. */
360 static inline struct fs_db *
361 mgs_find_fsdb_noref(struct mgs_device *mgs, const char *fsname)
362 {
363         struct fs_db *fsdb;
364         struct list_head *tmp;
365
366         list_for_each(tmp, &mgs->mgs_fs_db_list) {
367                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
368                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
369                         return fsdb;
370         }
371
372         return NULL;
373 }
374
375 /* The caller must hold mgs->mgs_mutex. */
376 static void mgs_remove_fsdb_by_name(struct mgs_device *mgs, const char *name)
377 {
378         struct fs_db *fsdb;
379
380         fsdb = mgs_find_fsdb_noref(mgs, name);
381         if (fsdb) {
382                 list_del_init(&fsdb->fsdb_list);
383                 /* Drop the reference on the list.*/
384                 mgs_put_fsdb(mgs, fsdb);
385         }
386 }
387
388 /* The caller must hold mgs->mgs_mutex. */
389 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, const char *fsname)
390 {
391         struct fs_db *fsdb;
392
393         fsdb = mgs_find_fsdb_noref(mgs, fsname);
394         if (fsdb)
395                 atomic_inc(&fsdb->fsdb_ref);
396
397         return fsdb;
398 }
399
400 /* The caller must hold mgs->mgs_mutex. */
401 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
402                                   struct mgs_device *mgs, char *fsname)
403 {
404         struct fs_db *fsdb;
405         int rc;
406         ENTRY;
407
408         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
409                 CERROR("fsname %s is too long\n", fsname);
410
411                 RETURN(ERR_PTR(-EINVAL));
412         }
413
414         OBD_ALLOC_PTR(fsdb);
415         if (!fsdb)
416                 RETURN(ERR_PTR(-ENOMEM));
417
418         strncpy(fsdb->fsdb_name, fsname, sizeof(fsdb->fsdb_name));
419         mutex_init(&fsdb->fsdb_mutex);
420         INIT_LIST_HEAD(&fsdb->fsdb_list);
421         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
422         fsdb->fsdb_gen = 1;
423         INIT_LIST_HEAD(&fsdb->fsdb_clients);
424         atomic_set(&fsdb->fsdb_notify_phase, 0);
425         init_waitqueue_head(&fsdb->fsdb_notify_waitq);
426         init_completion(&fsdb->fsdb_notify_comp);
427
428         if (strcmp(fsname, MGSSELF_NAME) == 0) {
429                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
430                 fsdb->fsdb_mgs = mgs;
431                 if (logname_is_barrier(fsname))
432                         goto add;
433         } else {
434                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
435                 if (!fsdb->fsdb_mdt_index_map) {
436                         CERROR("No memory for MDT index maps\n");
437
438                         GOTO(err, rc = -ENOMEM);
439                 }
440
441                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
442                 if (!fsdb->fsdb_ost_index_map) {
443                         CERROR("No memory for OST index maps\n");
444
445                         GOTO(err, rc = -ENOMEM);
446                 }
447
448                 if (logname_is_barrier(fsname))
449                         goto add;
450
451                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
452                 if (rc)
453                         GOTO(err, rc);
454
455                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
456                 if (rc)
457                         GOTO(err, rc);
458
459                 /* initialise data for NID table */
460                 mgs_ir_init_fs(env, mgs, fsdb);
461                 lproc_mgs_add_live(mgs, fsdb);
462         }
463
464         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
465             strcmp(PARAMS_FILENAME, fsname) != 0) {
466                 /* populate the db from the client llog */
467                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
468                 if (rc) {
469                         CERROR("Can't get db from client log %d\n", rc);
470
471                         GOTO(err, rc);
472                 }
473         }
474
475         /* populate srpc rules from params llog */
476         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
477         if (rc) {
478                 CERROR("Can't get db from params log %d\n", rc);
479
480                 GOTO(err, rc);
481         }
482
483 add:
484         /* One ref is for the fsdb on the list.
485          * The other ref is for the caller. */
486         atomic_set(&fsdb->fsdb_ref, 2);
487         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
488
489         RETURN(fsdb);
490
491 err:
492         atomic_set(&fsdb->fsdb_ref, 1);
493         mgs_put_fsdb(mgs, fsdb);
494
495         RETURN(ERR_PTR(rc));
496 }
497
498 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
499 {
500         LASSERT(list_empty(&fsdb->fsdb_list));
501
502         lproc_mgs_del_live(mgs, fsdb);
503
504         /* deinitialize fsr */
505         if (fsdb->fsdb_mgs)
506                 mgs_ir_fini_fs(mgs, fsdb);
507
508         if (fsdb->fsdb_ost_index_map)
509                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
510         if (fsdb->fsdb_mdt_index_map)
511                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
512         name_destroy(&fsdb->fsdb_clilov);
513         name_destroy(&fsdb->fsdb_clilmv);
514         mgs_free_fsdb_srpc(fsdb);
515         OBD_FREE_PTR(fsdb);
516 }
517
518 void mgs_put_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
519 {
520         if (atomic_dec_and_test(&fsdb->fsdb_ref))
521                 mgs_free_fsdb(mgs, fsdb);
522 }
523
524 int mgs_init_fsdb_list(struct mgs_device *mgs)
525 {
526         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
527         return 0;
528 }
529
530 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
531 {
532         struct fs_db *fsdb;
533         struct list_head *tmp, *tmp2;
534
535         mutex_lock(&mgs->mgs_mutex);
536         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
537                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
538                 list_del_init(&fsdb->fsdb_list);
539                 mgs_put_fsdb(mgs, fsdb);
540         }
541         mutex_unlock(&mgs->mgs_mutex);
542         return 0;
543 }
544
545 /* The caller must hold mgs->mgs_mutex. */
546 int mgs_find_or_make_fsdb_nolock(const struct lu_env *env,
547                                 struct mgs_device *mgs,
548                                 char *name, struct fs_db **dbh)
549 {
550         struct fs_db *fsdb;
551         int rc = 0;
552         ENTRY;
553
554         fsdb = mgs_find_fsdb(mgs, name);
555         if (!fsdb) {
556                 fsdb = mgs_new_fsdb(env, mgs, name);
557                 if (IS_ERR(fsdb))
558                         rc = PTR_ERR(fsdb);
559
560                 CDEBUG(D_MGS, "Created new db: rc = %d\n", rc);
561         }
562
563         if (!rc)
564                 *dbh = fsdb;
565
566         RETURN(rc);
567 }
568
569 int mgs_find_or_make_fsdb(const struct lu_env *env, struct mgs_device *mgs,
570                           char *name, struct fs_db **dbh)
571 {
572         int rc;
573         ENTRY;
574
575         mutex_lock(&mgs->mgs_mutex);
576         rc = mgs_find_or_make_fsdb_nolock(env, mgs, name, dbh);
577         mutex_unlock(&mgs->mgs_mutex);
578
579         RETURN(rc);
580 }
581
582 /* 1 = index in use
583    0 = index unused
584    -1= empty client log */
585 int mgs_check_index(const struct lu_env *env,
586                     struct mgs_device *mgs,
587                     struct mgs_target_info *mti)
588 {
589         struct fs_db *fsdb;
590         void *imap;
591         int rc = 0;
592         ENTRY;
593
594         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
595
596         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
597         if (rc) {
598                 CERROR("Can't get db for %s\n", mti->mti_fsname);
599                 RETURN(rc);
600         }
601
602         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
603                 GOTO(out, rc = -1);
604
605         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
606                 imap = fsdb->fsdb_ost_index_map;
607         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
608                 imap = fsdb->fsdb_mdt_index_map;
609         else
610                 GOTO(out, rc = -EINVAL);
611
612         if (test_bit(mti->mti_stripe_index, imap))
613                 GOTO(out, rc = 1);
614
615         GOTO(out, rc = 0);
616
617 out:
618         mgs_put_fsdb(mgs, fsdb);
619         return rc;
620 }
621
622 static __inline__ int next_index(void *index_map, int map_len)
623 {
624         int i;
625         for (i = 0; i < map_len * 8; i++)
626                 if (!test_bit(i, index_map)) {
627                          return i;
628                  }
629         CERROR("max index %d exceeded.\n", i);
630         return -1;
631 }
632
633 /* Make the mdt/ost server obd name based on the filesystem name */
634 static bool server_make_name(u32 flags, u16 index, const char *fs,
635                              char *name_buf, size_t name_buf_size)
636 {
637         bool invalid_flag = false;
638
639         if (flags & (LDD_F_SV_TYPE_MDT | LDD_F_SV_TYPE_OST)) {
640                 if (!(flags & LDD_F_SV_ALL))
641                         snprintf(name_buf, name_buf_size, "%.8s%c%s%04x", fs,
642                                 (flags & LDD_F_VIRGIN) ? ':' :
643                                         ((flags & LDD_F_WRITECONF) ? '=' : '-'),
644                                 (flags & LDD_F_SV_TYPE_MDT) ? "MDT" : "OST",
645                                 index);
646         } else if (flags & LDD_F_SV_TYPE_MGS) {
647                 snprintf(name_buf, name_buf_size, "MGS");
648         } else {
649                 CERROR("unknown server type %#x\n", flags);
650                 invalid_flag = true;
651         }
652         return invalid_flag;
653 }
654
655 /* Return codes:
656         0  newly marked as in use
657         <0 err
658         +EALREADY for update of an old index */
659 static int mgs_set_index(const struct lu_env *env,
660                          struct mgs_device *mgs,
661                          struct mgs_target_info *mti)
662 {
663         struct fs_db *fsdb;
664         void *imap;
665         int rc = 0;
666         ENTRY;
667
668         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
669         if (rc) {
670                 CERROR("Can't get db for %s\n", mti->mti_fsname);
671                 RETURN(rc);
672         }
673
674         mutex_lock(&fsdb->fsdb_mutex);
675         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
676                 imap = fsdb->fsdb_ost_index_map;
677         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
678                 imap = fsdb->fsdb_mdt_index_map;
679         } else {
680                 GOTO(out_up, rc = -EINVAL);
681         }
682
683         if (mti->mti_flags & LDD_F_NEED_INDEX) {
684                 rc = next_index(imap, INDEX_MAP_SIZE);
685                 if (rc == -1)
686                         GOTO(out_up, rc = -ERANGE);
687                 mti->mti_stripe_index = rc;
688         }
689
690         /* the last index(0xffff) is reserved for default value. */
691         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
692                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
693                                    "but index must be less than %u.\n",
694                                    mti->mti_svname, mti->mti_stripe_index,
695                                    INDEX_MAP_SIZE * 8 - 1);
696                 GOTO(out_up, rc = -ERANGE);
697         }
698
699         if (test_bit(mti->mti_stripe_index, imap)) {
700                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
701                     !(mti->mti_flags & LDD_F_WRITECONF)) {
702                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
703                                            "%d, but that index is already in "
704                                            "use. Use --writeconf to force\n",
705                                            mti->mti_svname,
706                                            mti->mti_stripe_index);
707                         GOTO(out_up, rc = -EADDRINUSE);
708                 } else {
709                         CDEBUG(D_MGS, "Server %s updating index %d\n",
710                                mti->mti_svname, mti->mti_stripe_index);
711                         GOTO(out_up, rc = EALREADY);
712                 }
713         } else {
714                 set_bit(mti->mti_stripe_index, imap);
715                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
716                         fsdb->fsdb_mdt_count++;
717         }
718
719         set_bit(mti->mti_stripe_index, imap);
720         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
721         if (server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
722                              mti->mti_stripe_index, mti->mti_fsname,
723                              mti->mti_svname, sizeof(mti->mti_svname))) {
724                 CERROR("unknown server type %#x\n", mti->mti_flags);
725                 GOTO(out_up, rc = -EINVAL);
726         }
727
728         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
729                mti->mti_stripe_index);
730
731         GOTO(out_up, rc = 0);
732
733 out_up:
734         mutex_unlock(&fsdb->fsdb_mutex);
735         mgs_put_fsdb(mgs, fsdb);
736         return rc;
737 }
738
739 struct mgs_modify_lookup {
740         struct cfg_marker mml_marker;
741         int               mml_modified;
742 };
743
744 static int mgs_check_record_match(const struct lu_env *env,
745                                 struct llog_handle *llh,
746                                 struct llog_rec_hdr *rec, void *data)
747 {
748         struct cfg_marker *mc_marker = data;
749         struct cfg_marker *marker;
750         struct lustre_cfg *lcfg = REC_DATA(rec);
751         int cfg_len = REC_DATA_LEN(rec);
752         int rc;
753         ENTRY;
754
755
756         if (rec->lrh_type != OBD_CFG_REC) {
757                 CDEBUG(D_ERROR, "Unhandled lrh_type: %#x\n", rec->lrh_type);
758                 RETURN(-EINVAL);
759         }
760
761         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
762         if (rc) {
763                 CDEBUG(D_ERROR, "Insane cfg\n");
764                 RETURN(rc);
765         }
766
767         /* We only care about markers */
768         if (lcfg->lcfg_command != LCFG_MARKER)
769                 RETURN(0);
770
771         marker = lustre_cfg_buf(lcfg, 1);
772
773         if (marker->cm_flags & CM_SKIP)
774                 RETURN(0);
775
776         if ((strcmp(mc_marker->cm_comment, marker->cm_comment) == 0) &&
777                 (strcmp(mc_marker->cm_tgtname, marker->cm_tgtname) == 0)) {
778                 /* Found a non-skipped marker match */
779                 CDEBUG(D_MGS, "Matched rec %u marker %d flag %x %s %s\n",
780                         rec->lrh_index, marker->cm_step,
781                         marker->cm_flags, marker->cm_tgtname,
782                         marker->cm_comment);
783                 rc = LLOG_PROC_BREAK;
784         }
785
786         RETURN(rc);
787 }
788
789 /**
790  * Check an existing config log record with matching comment and device
791  * Return code:
792  * 0 - checked successfully,
793  * LLOG_PROC_BREAK - record matches
794  * negative - error
795  */
796 static int mgs_check_marker(const struct lu_env *env, struct mgs_device *mgs,
797                 struct fs_db *fsdb, struct mgs_target_info *mti,
798                 char *logname, char *devname, char *comment)
799 {
800         struct llog_handle *loghandle;
801         struct llog_ctxt *ctxt;
802         struct cfg_marker *mc_marker;
803         int rc;
804
805         ENTRY;
806
807         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
808         CDEBUG(D_MGS, "mgs check %s/%s/%s\n", logname, devname, comment);
809
810         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
811         LASSERT(ctxt != NULL);
812         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
813         if (rc < 0) {
814                 if (rc == -ENOENT)
815                         rc = 0;
816                 GOTO(out_pop, rc);
817         }
818
819         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
820         if (rc)
821                 GOTO(out_close, rc);
822
823         if (llog_get_size(loghandle) <= 1)
824                 GOTO(out_close, rc = 0);
825
826         OBD_ALLOC_PTR(mc_marker);
827         if (!mc_marker)
828                 GOTO(out_close, rc = -ENOMEM);
829         if (strlcpy(mc_marker->cm_comment, comment,
830                 sizeof(mc_marker->cm_comment)) >=
831                 sizeof(mc_marker->cm_comment))
832                 GOTO(out_free, rc = -E2BIG);
833         if (strlcpy(mc_marker->cm_tgtname, devname,
834                 sizeof(mc_marker->cm_tgtname)) >=
835                 sizeof(mc_marker->cm_tgtname))
836                 GOTO(out_free, rc = -E2BIG);
837
838         rc = llog_process(env, loghandle, mgs_check_record_match,
839                         (void *)mc_marker, NULL);
840
841 out_free:
842         OBD_FREE_PTR(mc_marker);
843
844 out_close:
845         llog_close(env, loghandle);
846 out_pop:
847         if (rc && rc != LLOG_PROC_BREAK)
848                 CDEBUG(D_ERROR, "%s: mgs check %s/%s failed: rc = %d\n",
849                         mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
850         llog_ctxt_put(ctxt);
851         RETURN(rc);
852 }
853
854 static int mgs_modify_handler(const struct lu_env *env,
855                               struct llog_handle *llh,
856                               struct llog_rec_hdr *rec, void *data)
857 {
858         struct mgs_modify_lookup *mml = data;
859         struct cfg_marker *marker;
860         struct lustre_cfg *lcfg = REC_DATA(rec);
861         int cfg_len = REC_DATA_LEN(rec);
862         int rc;
863         ENTRY;
864
865         if (rec->lrh_type != OBD_CFG_REC) {
866                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
867                 RETURN(-EINVAL);
868         }
869
870         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
871         if (rc) {
872                 CERROR("Insane cfg\n");
873                 RETURN(rc);
874         }
875
876         /* We only care about markers */
877         if (lcfg->lcfg_command != LCFG_MARKER)
878                 RETURN(0);
879
880         marker = lustre_cfg_buf(lcfg, 1);
881         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
882             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
883             !(marker->cm_flags & CM_SKIP)) {
884                 /* Found a non-skipped marker match */
885                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
886                        rec->lrh_index, marker->cm_step,
887                        marker->cm_flags, mml->mml_marker.cm_flags,
888                        marker->cm_tgtname, marker->cm_comment);
889                 /* Overwrite the old marker llog entry */
890                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
891                 marker->cm_flags |= mml->mml_marker.cm_flags;
892                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
893                 rc = llog_write(env, llh, rec, rec->lrh_index);
894                 if (!rc)
895                          mml->mml_modified++;
896         }
897
898         RETURN(rc);
899 }
900
901 /**
902  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
903  * Return code:
904  * 0 - modified successfully,
905  * 1 - no modification was done
906  * negative - error
907  */
908 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
909                       struct fs_db *fsdb, struct mgs_target_info *mti,
910                       char *logname, char *devname, char *comment, int flags)
911 {
912         struct llog_handle *loghandle;
913         struct llog_ctxt *ctxt;
914         struct mgs_modify_lookup *mml;
915         int rc;
916
917         ENTRY;
918
919         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
920         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
921                flags);
922
923         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
924         LASSERT(ctxt != NULL);
925         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
926         if (rc < 0) {
927                 if (rc == -ENOENT)
928                         rc = 0;
929                 GOTO(out_pop, rc);
930         }
931
932         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
933         if (rc)
934                 GOTO(out_close, rc);
935
936         if (llog_get_size(loghandle) <= 1)
937                 GOTO(out_close, rc = 0);
938
939         OBD_ALLOC_PTR(mml);
940         if (!mml)
941                 GOTO(out_close, rc = -ENOMEM);
942         if (strlcpy(mml->mml_marker.cm_comment, comment,
943                     sizeof(mml->mml_marker.cm_comment)) >=
944             sizeof(mml->mml_marker.cm_comment))
945                 GOTO(out_free, rc = -E2BIG);
946         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
947                     sizeof(mml->mml_marker.cm_tgtname)) >=
948             sizeof(mml->mml_marker.cm_tgtname))
949                 GOTO(out_free, rc = -E2BIG);
950         /* Modify mostly means cancel */
951         mml->mml_marker.cm_flags = flags;
952         mml->mml_marker.cm_canceltime = flags ? ktime_get_real_seconds() : 0;
953         mml->mml_modified = 0;
954         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
955                           NULL);
956         if (!rc && !mml->mml_modified)
957                 rc = 1;
958
959 out_free:
960         OBD_FREE_PTR(mml);
961
962 out_close:
963         llog_close(env, loghandle);
964 out_pop:
965         if (rc < 0)
966                 CERROR("%s: modify %s/%s failed: rc = %d\n",
967                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
968         llog_ctxt_put(ctxt);
969         RETURN(rc);
970 }
971
972 enum replace_state {
973         REPLACE_COPY = 0,
974         REPLACE_SKIP,
975         REPLACE_DONE,
976         REPLACE_UUID,
977         REPLACE_SETUP
978 };
979
980 /** This structure is passed to mgs_replace_handler */
981 struct mgs_replace_data {
982         /* Nids are replaced for this target device */
983         struct mgs_target_info target;
984         /* Temporary modified llog */
985         struct llog_handle *temp_llh;
986         enum replace_state state;
987         char *failover;
988         char *nodeuuid;
989 };
990
991 /**
992  * Check: a) if block should be skipped
993  * b) is it target block
994  *
995  * \param[in] lcfg
996  * \param[in] mrd
997  *
998  * \retval 0 should not to be skipped
999  * \retval 1 should to be skipped
1000  */
1001 static int check_markers(struct lustre_cfg *lcfg,
1002                          struct mgs_replace_data *mrd)
1003 {
1004          struct cfg_marker *marker;
1005
1006         /* Track markers. Find given device */
1007         if (lcfg->lcfg_command == LCFG_MARKER) {
1008                 marker = lustre_cfg_buf(lcfg, 1);
1009                 /* Clean llog from records marked as CM_SKIP.
1010                    CM_EXCLUDE records are used for "active" command
1011                    and can be restored if needed */
1012                 if ((marker->cm_flags & (CM_SKIP | CM_START)) ==
1013                     (CM_SKIP | CM_START)) {
1014                         mrd->state = REPLACE_SKIP;
1015                         return 1;
1016                 }
1017
1018                 if ((marker->cm_flags & (CM_SKIP | CM_END)) ==
1019                     (CM_SKIP | CM_END)) {
1020                         mrd->state = REPLACE_COPY;
1021                         return 1;
1022                 }
1023
1024                 if (strcmp(mrd->target.mti_svname, marker->cm_tgtname) == 0) {
1025                         LASSERT(!(marker->cm_flags & CM_START) ||
1026                                 !(marker->cm_flags & CM_END));
1027                         if (marker->cm_flags & CM_START) {
1028                                 mrd->state = REPLACE_UUID;
1029                                 mrd->failover = NULL;
1030                         } else if (marker->cm_flags & CM_END)
1031                                 mrd->state = REPLACE_COPY;
1032                 }
1033         }
1034
1035         return 0;
1036 }
1037
1038 static int record_base(const struct lu_env *env, struct llog_handle *llh,
1039                      char *cfgname, lnet_nid_t nid, int cmd,
1040                      char *s1, char *s2, char *s3, char *s4)
1041 {
1042         struct mgs_thread_info  *mgi = mgs_env_info(env);
1043         struct llog_cfg_rec     *lcr;
1044         int rc;
1045
1046         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
1047                cmd, s1, s2, s3, s4);
1048
1049         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
1050         if (s1)
1051                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
1052         if (s2)
1053                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
1054         if (s3)
1055                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
1056         if (s4)
1057                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
1058
1059         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
1060         if (lcr == NULL)
1061                 return -ENOMEM;
1062
1063         lcr->lcr_cfg.lcfg_nid = nid;
1064         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1065
1066         lustre_cfg_rec_free(lcr);
1067
1068         if (rc < 0)
1069                 CDEBUG(D_MGS,
1070                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
1071                        cfgname, cmd, s1, s2, s3, s4, rc);
1072         return rc;
1073 }
1074
1075 static inline int record_add_uuid(const struct lu_env *env,
1076                                   struct llog_handle *llh,
1077                                   uint64_t nid, char *uuid)
1078 {
1079         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
1080                            NULL, NULL, NULL);
1081 }
1082
1083 static inline int record_add_conn(const struct lu_env *env,
1084                                   struct llog_handle *llh,
1085                                   char *devname, char *uuid)
1086 {
1087         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
1088                            NULL, NULL, NULL);
1089 }
1090
1091 static inline int record_attach(const struct lu_env *env,
1092                                 struct llog_handle *llh, char *devname,
1093                                 char *type, char *uuid)
1094 {
1095         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
1096                            NULL, NULL);
1097 }
1098
1099 static inline int record_setup(const struct lu_env *env,
1100                                struct llog_handle *llh, char *devname,
1101                                char *s1, char *s2, char *s3, char *s4)
1102 {
1103         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
1104 }
1105
1106 /**
1107  * \retval <0 record processing error
1108  * \retval n record is processed. No need copy original one.
1109  * \retval 0 record is not processed.
1110  */
1111 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
1112                            struct mgs_replace_data *mrd)
1113 {
1114         int nids_added = 0;
1115         lnet_nid_t nid;
1116         char *ptr;
1117         int rc = 0;
1118
1119         if (mrd->state == REPLACE_UUID &&
1120             lcfg->lcfg_command == LCFG_ADD_UUID) {
1121                 /* LCFG_ADD_UUID command found. Let's skip original command
1122                    and add passed nids */
1123                 ptr = mrd->target.mti_params;
1124                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1125                         if (!mrd->nodeuuid) {
1126                                 rc = name_create(&mrd->nodeuuid,
1127                                                  libcfs_nid2str(nid), "");
1128                                 if (rc) {
1129                                         CERROR("Can't create uuid for "
1130                                                 "nid  %s, device %s\n",
1131                                                 libcfs_nid2str(nid),
1132                                                 mrd->target.mti_svname);
1133                                         return rc;
1134                                 }
1135                         }
1136                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
1137                                "device %s\n", libcfs_nid2str(nid),
1138                                 mrd->target.mti_params,
1139                                 mrd->nodeuuid);
1140                         rc = record_add_uuid(env,
1141                                              mrd->temp_llh, nid,
1142                                              mrd->nodeuuid);
1143                         if (!rc)
1144                                 nids_added++;
1145
1146                         if (*ptr == ':') {
1147                                 mrd->failover = ptr;
1148                                 break;
1149                         }
1150                 }
1151
1152                 if (nids_added == 0) {
1153                         CERROR("No new nids were added, nid %s with uuid %s, "
1154                                "device %s\n", libcfs_nid2str(nid),
1155                                mrd->nodeuuid ? mrd->nodeuuid : "NULL",
1156                                mrd->target.mti_svname);
1157                         name_destroy(&mrd->nodeuuid);
1158                         return -ENXIO;
1159                 } else {
1160                         mrd->state = REPLACE_SETUP;
1161                 }
1162
1163                 return nids_added;
1164         }
1165
1166         if (mrd->state == REPLACE_SETUP && lcfg->lcfg_command == LCFG_SETUP) {
1167                 /* LCFG_SETUP command found. UUID should be changed */
1168                 rc = record_setup(env,
1169                                   mrd->temp_llh,
1170                                   /* devname the same */
1171                                   lustre_cfg_string(lcfg, 0),
1172                                   /* s1 is not changed */
1173                                   lustre_cfg_string(lcfg, 1),
1174                                   mrd->nodeuuid,
1175                                   /* s3 is not changed */
1176                                   lustre_cfg_string(lcfg, 3),
1177                                   /* s4 is not changed */
1178                                   lustre_cfg_string(lcfg, 4));
1179
1180                 name_destroy(&mrd->nodeuuid);
1181                 if (rc)
1182                         return rc;
1183
1184                 if (mrd->failover) {
1185                         ptr = mrd->failover;
1186                         while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1187                                 if (mrd->nodeuuid == NULL) {
1188                                         rc =  name_create(&mrd->nodeuuid,
1189                                                           libcfs_nid2str(nid),
1190                                                           "");
1191                                         if (rc)
1192                                                 return rc;
1193                                 }
1194
1195                                 CDEBUG(D_MGS, "add nid %s for failover %s\n",
1196                                        libcfs_nid2str(nid), mrd->nodeuuid);
1197                                 rc = record_add_uuid(env, mrd->temp_llh, nid,
1198                                                      mrd->nodeuuid);
1199                                 if (rc) {
1200                                         name_destroy(&mrd->nodeuuid);
1201                                         return rc;
1202                                 }
1203                                 if (*ptr == ':') {
1204                                         rc = record_add_conn(env,
1205                                                 mrd->temp_llh,
1206                                                 lustre_cfg_string(lcfg, 0),
1207                                                 mrd->nodeuuid);
1208                                         name_destroy(&mrd->nodeuuid);
1209                                         if (rc)
1210                                                 return rc;
1211                                 }
1212                         }
1213                         if (mrd->nodeuuid) {
1214                                 rc = record_add_conn(env, mrd->temp_llh,
1215                                                      lustre_cfg_string(lcfg, 0),
1216                                                      mrd->nodeuuid);
1217                                 name_destroy(&mrd->nodeuuid);
1218                                 if (rc)
1219                                         return rc;
1220                         }
1221                 }
1222                 mrd->state = REPLACE_DONE;
1223                 return rc ? rc : 1;
1224         }
1225
1226         /* Another commands in target device block */
1227         return 0;
1228 }
1229
1230 /**
1231  * Handler that called for every record in llog.
1232  * Records are processed in order they placed in llog.
1233  *
1234  * \param[in] llh       log to be processed
1235  * \param[in] rec       current record
1236  * \param[in] data      mgs_replace_data structure
1237  *
1238  * \retval 0    success
1239  */
1240 static int mgs_replace_nids_handler(const struct lu_env *env,
1241                                     struct llog_handle *llh,
1242                                     struct llog_rec_hdr *rec,
1243                                     void *data)
1244 {
1245         struct mgs_replace_data *mrd;
1246         struct lustre_cfg *lcfg = REC_DATA(rec);
1247         int cfg_len = REC_DATA_LEN(rec);
1248         int rc;
1249         ENTRY;
1250
1251         mrd = (struct mgs_replace_data *)data;
1252
1253         if (rec->lrh_type != OBD_CFG_REC) {
1254                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
1255                        rec->lrh_type, lcfg->lcfg_command,
1256                        lustre_cfg_string(lcfg, 0),
1257                        lustre_cfg_string(lcfg, 1));
1258                 RETURN(-EINVAL);
1259         }
1260
1261         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1262         if (rc) {
1263                 /* Do not copy any invalidated records */
1264                 GOTO(skip_out, rc = 0);
1265         }
1266
1267         rc = check_markers(lcfg, mrd);
1268         if (rc || mrd->state == REPLACE_SKIP)
1269                 GOTO(skip_out, rc = 0);
1270
1271         /* Write to new log all commands outside target device block */
1272         if (mrd->state == REPLACE_COPY)
1273                 GOTO(copy_out, rc = 0);
1274
1275         if (mrd->state == REPLACE_DONE &&
1276             (lcfg->lcfg_command == LCFG_ADD_UUID ||
1277              lcfg->lcfg_command == LCFG_ADD_CONN)) {
1278                 if (!mrd->failover)
1279                         CWARN("Previous failover is deleted, but new one is "
1280                               "not set. This means you configure system "
1281                               "without failover or passed wrong replace_nids "
1282                               "command parameters. Device %s, passed nids %s\n",
1283                               mrd->target.mti_svname, mrd->target.mti_params);
1284                 GOTO(skip_out, rc = 0);
1285         }
1286
1287         rc = process_command(env, lcfg, mrd);
1288         if (rc < 0)
1289                 RETURN(rc);
1290
1291         if (rc)
1292                 RETURN(0);
1293 copy_out:
1294         /* Record is placed in temporary llog as is */
1295         rc = llog_write(env, mrd->temp_llh, rec, LLOG_NEXT_IDX);
1296
1297         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1298                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1299                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1300         RETURN(rc);
1301
1302 skip_out:
1303         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1304                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1305                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1306         RETURN(rc);
1307 }
1308
1309 static int mgs_log_is_empty(const struct lu_env *env,
1310                             struct mgs_device *mgs, char *name)
1311 {
1312         struct llog_ctxt        *ctxt;
1313         int                      rc;
1314
1315         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1316         LASSERT(ctxt != NULL);
1317
1318         rc = llog_is_empty(env, ctxt, name);
1319         llog_ctxt_put(ctxt);
1320         return rc;
1321 }
1322
1323 static int mgs_replace_log(const struct lu_env *env,
1324                            struct obd_device *mgs,
1325                            char *logname, char *devname,
1326                            llog_cb_t replace_handler, void *data)
1327 {
1328         struct llog_handle *orig_llh, *backup_llh;
1329         struct llog_ctxt *ctxt;
1330         struct mgs_replace_data *mrd;
1331         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1332         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1333         char *backup;
1334         int rc, rc2, buf_size;
1335         time64_t now;
1336         ENTRY;
1337
1338         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1339         LASSERT(ctxt != NULL);
1340
1341         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1342                 /* Log is empty. Nothing to replace */
1343                 GOTO(out_put, rc = 0);
1344         }
1345
1346         now = ktime_get_real_seconds();
1347
1348         /* max time64_t in decimal fits into 20 bytes long string */
1349         buf_size = strlen(logname) + 1 + 20 + 1 + strlen(".bak") + 1;
1350         OBD_ALLOC(backup, buf_size);
1351         if (backup == NULL)
1352                 GOTO(out_put, rc = -ENOMEM);
1353
1354         snprintf(backup, buf_size, "%s.%llu.bak", logname, now);
1355
1356         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1357         if (rc == 0) {
1358                 /* Now erase original log file. Connections are not allowed.
1359                    Backup is already saved */
1360                 rc = llog_erase(env, ctxt, NULL, logname);
1361                 if (rc < 0)
1362                         GOTO(out_free, rc);
1363         } else if (rc != -ENOENT) {
1364                 CERROR("%s: can't make backup for %s: rc = %d\n",
1365                        mgs->obd_name, logname, rc);
1366                 GOTO(out_free,rc);
1367         }
1368
1369         /* open local log */
1370         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1371         if (rc)
1372                 GOTO(out_restore, rc);
1373
1374         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1375         if (rc)
1376                 GOTO(out_closel, rc);
1377
1378         /* open backup llog */
1379         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1380                        LLOG_OPEN_EXISTS);
1381         if (rc)
1382                 GOTO(out_closel, rc);
1383
1384         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1385         if (rc)
1386                 GOTO(out_close, rc);
1387
1388         if (llog_get_size(backup_llh) <= 1)
1389                 GOTO(out_close, rc = 0);
1390
1391         OBD_ALLOC_PTR(mrd);
1392         if (!mrd)
1393                 GOTO(out_close, rc = -ENOMEM);
1394         /* devname is only needed information to replace UUID records */
1395         if (devname)
1396                 strlcpy(mrd->target.mti_svname, devname,
1397                         sizeof(mrd->target.mti_svname));
1398         /* data is parsed in llog callback */
1399         if (data)
1400                 strlcpy(mrd->target.mti_params, data,
1401                         sizeof(mrd->target.mti_params));
1402         /* Copy records to this temporary llog */
1403         mrd->temp_llh = orig_llh;
1404
1405         rc = llog_process(env, backup_llh, replace_handler,
1406                           (void *)mrd, NULL);
1407         OBD_FREE_PTR(mrd);
1408 out_close:
1409         rc2 = llog_close(NULL, backup_llh);
1410         if (!rc)
1411                 rc = rc2;
1412 out_closel:
1413         rc2 = llog_close(NULL, orig_llh);
1414         if (!rc)
1415                 rc = rc2;
1416
1417 out_restore:
1418         if (rc) {
1419                 CERROR("%s: llog should be restored: rc = %d\n",
1420                        mgs->obd_name, rc);
1421                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1422                                   logname);
1423                 if (rc2 < 0)
1424                         CERROR("%s: can't restore backup %s: rc = %d\n",
1425                                mgs->obd_name, logname, rc2);
1426         }
1427
1428 out_free:
1429         OBD_FREE(backup, buf_size);
1430
1431 out_put:
1432         llog_ctxt_put(ctxt);
1433
1434         if (rc)
1435                 CERROR("%s: failed to replace log %s: rc = %d\n",
1436                        mgs->obd_name, logname, rc);
1437
1438         RETURN(rc);
1439 }
1440
1441 static int mgs_replace_nids_log(const struct lu_env *env,
1442                                 struct obd_device *obd,
1443                                 char *logname, char *devname, char *nids)
1444 {
1445         CDEBUG(D_MGS, "Replace NIDs for %s in %s\n", devname, logname);
1446         return mgs_replace_log(env, obd, logname, devname,
1447                                mgs_replace_nids_handler, nids);
1448 }
1449
1450 /**
1451  * Parse device name and get file system name and/or device index
1452  *
1453  * @devname     device name (ex. lustre-MDT0000)
1454  * @fsname      file system name extracted from @devname and returned
1455  *              to the caller (optional)
1456  * @index       device index extracted from @devname and returned to
1457  *              the caller (optional)
1458  *
1459  * RETURN       0                       success if we are only interested in
1460  *                                      extracting fsname from devname.
1461  *                                      i.e index is NULL
1462  *
1463  *              LDD_F_SV_TYPE_*         Besides extracting the fsname the
1464  *                                      user also wants the index. Report to
1465  *                                      the user the type of obd device the
1466  *                                      returned index belongs too.
1467  *
1468  *              -EINVAL                 The obd device name is improper so
1469  *                                      fsname could not be extracted.
1470  *
1471  *              -ENXIO                  Failed to extract the index out of
1472  *                                      the obd device name. Most likely an
1473  *                                      invalid obd device name
1474  */
1475 static int mgs_parse_devname(char *devname, char *fsname, u32 *index)
1476 {
1477         int rc = 0;
1478         ENTRY;
1479
1480         /* Extract fsname */
1481         if (fsname) {
1482                 rc = server_name2fsname(devname, fsname, NULL);
1483                 if (rc < 0) {
1484                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1485                                devname);
1486                         RETURN(-EINVAL);
1487                 }
1488         }
1489
1490         if (index) {
1491                 rc = server_name2index(devname, index, NULL);
1492                 if (rc < 0) {
1493                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1494                                devname);
1495                         RETURN(-ENXIO);
1496                 }
1497         }
1498
1499         /* server_name2index can return LDD_F_SV_TYPE_* so always return rc */
1500         RETURN(rc);
1501 }
1502
1503 /* This is only called during replace_nids */
1504 static int only_mgs_is_running(struct obd_device *mgs_obd)
1505 {
1506         /* TDB: Is global variable with devices count exists? */
1507         int num_devices = get_devices_count();
1508         int num_exports = 0;
1509         struct obd_export *exp;
1510
1511         spin_lock(&mgs_obd->obd_dev_lock);
1512         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1513                 /* skip self export */
1514                 if (exp == mgs_obd->obd_self_export)
1515                         continue;
1516                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1517                         continue;
1518
1519                 ++num_exports;
1520
1521                 CERROR("%s: node %s still connected during replace_nids "
1522                        "connect_flags:%llx\n",
1523                        mgs_obd->obd_name,
1524                        libcfs_nid2str(exp->exp_nid_stats->nid),
1525                        exp_connect_flags(exp));
1526
1527         }
1528         spin_unlock(&mgs_obd->obd_dev_lock);
1529
1530         /* osd, MGS and MGC + self_export
1531            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1532         return (num_devices <= 3) && (num_exports == 0);
1533 }
1534
1535 static int name_create_mdt(char **logname, char *fsname, int i)
1536 {
1537         char mdt_index[9];
1538
1539         sprintf(mdt_index, "-MDT%04x", i);
1540         return name_create(logname, fsname, mdt_index);
1541 }
1542
1543 /**
1544  * Replace nids for \a device to \a nids values
1545  *
1546  * \param obd           MGS obd device
1547  * \param devname       nids need to be replaced for this device
1548  * (ex. lustre-OST0000)
1549  * \param nids          nids list (ex. nid1,nid2,nid3)
1550  *
1551  * \retval 0    success
1552  */
1553 int mgs_replace_nids(const struct lu_env *env,
1554                      struct mgs_device *mgs,
1555                      char *devname, char *nids)
1556 {
1557         /* Assume fsname is part of device name */
1558         char fsname[MTI_NAME_MAXLEN];
1559         int rc;
1560         __u32 index;
1561         char *logname;
1562         struct fs_db *fsdb = NULL;
1563         unsigned int i;
1564         int conn_state;
1565         struct obd_device *mgs_obd = mgs->mgs_obd;
1566         ENTRY;
1567
1568         /* We can only change NIDs if no other nodes are connected */
1569         spin_lock(&mgs_obd->obd_dev_lock);
1570         conn_state = mgs_obd->obd_no_conn;
1571         mgs_obd->obd_no_conn = 1;
1572         spin_unlock(&mgs_obd->obd_dev_lock);
1573
1574         /* We can not change nids if not only MGS is started */
1575         if (!only_mgs_is_running(mgs_obd)) {
1576                 CERROR("Only MGS is allowed to be started\n");
1577                 GOTO(out, rc = -EINPROGRESS);
1578         }
1579
1580         /* Get fsname and index */
1581         rc = mgs_parse_devname(devname, fsname, &index);
1582         if (rc < 0)
1583                 GOTO(out, rc);
1584
1585         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1586         if (rc) {
1587                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1588                 GOTO(out, rc);
1589         }
1590
1591         /* Process client llogs */
1592         name_create(&logname, fsname, "-client");
1593         rc = mgs_replace_nids_log(env, mgs_obd, logname, devname, nids);
1594         name_destroy(&logname);
1595         if (rc) {
1596                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1597                        fsname, devname, rc);
1598                 GOTO(out, rc);
1599         }
1600
1601         /* Process MDT llogs */
1602         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1603                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1604                         continue;
1605                 name_create_mdt(&logname, fsname, i);
1606                 rc = mgs_replace_nids_log(env, mgs_obd, logname, devname, nids);
1607                 name_destroy(&logname);
1608                 if (rc)
1609                         GOTO(out, rc);
1610         }
1611
1612 out:
1613         spin_lock(&mgs_obd->obd_dev_lock);
1614         mgs_obd->obd_no_conn = conn_state;
1615         spin_unlock(&mgs_obd->obd_dev_lock);
1616
1617         if (fsdb)
1618                 mgs_put_fsdb(mgs, fsdb);
1619
1620         RETURN(rc);
1621 }
1622
1623 /**
1624  * This is called for every record in llog. Some of records are
1625  * skipped, others are copied to new log as is.
1626  * Records to be skipped are
1627  *  marker records marked SKIP
1628  *  records enclosed between SKIP markers
1629  *
1630  * \param[in] llh       log to be processed
1631  * \param[in] rec       current record
1632  * \param[in] data      mgs_replace_data structure
1633  *
1634  * \retval 0    success
1635  **/
1636 static int mgs_clear_config_handler(const struct lu_env *env,
1637                                     struct llog_handle *llh,
1638                                     struct llog_rec_hdr *rec, void *data)
1639 {
1640         struct mgs_replace_data *mrd;
1641         struct lustre_cfg *lcfg = REC_DATA(rec);
1642         int cfg_len = REC_DATA_LEN(rec);
1643         int rc;
1644
1645         ENTRY;
1646
1647         mrd = (struct mgs_replace_data *)data;
1648
1649         if (rec->lrh_type != OBD_CFG_REC) {
1650                 CDEBUG(D_MGS, "Config llog Name=%s, Record Index=%u, "
1651                        "Unhandled Record Type=%#x\n", llh->lgh_name,
1652                        rec->lrh_index, rec->lrh_type);
1653                 RETURN(-EINVAL);
1654         }
1655
1656         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1657         if (rc) {
1658                 CDEBUG(D_MGS, "Config llog Name=%s, Invalid config file.",
1659                        llh->lgh_name);
1660                 RETURN(-EINVAL);
1661         }
1662
1663         if (lcfg->lcfg_command == LCFG_MARKER) {
1664                 struct cfg_marker *marker;
1665
1666                 marker = lustre_cfg_buf(lcfg, 1);
1667                 if (marker->cm_flags & CM_SKIP) {
1668                         if (marker->cm_flags & CM_START)
1669                                 mrd->state = REPLACE_SKIP;
1670                         if (marker->cm_flags & CM_END)
1671                                 mrd->state = REPLACE_COPY;
1672                         /* SKIP section started or finished */
1673                         CDEBUG(D_MGS, "Skip idx=%d, rc=%d, len=%d, "
1674                                "cmd %x %s %s\n", rec->lrh_index, rc,
1675                                rec->lrh_len, lcfg->lcfg_command,
1676                                lustre_cfg_string(lcfg, 0),
1677                                lustre_cfg_string(lcfg, 1));
1678                         RETURN(0);
1679                 }
1680         } else {
1681                 if (mrd->state == REPLACE_SKIP) {
1682                         /* record enclosed between SKIP markers, skip it */
1683                         CDEBUG(D_MGS, "Skip idx=%d, rc=%d, len=%d, "
1684                                "cmd %x %s %s\n", rec->lrh_index, rc,
1685                                rec->lrh_len, lcfg->lcfg_command,
1686                                lustre_cfg_string(lcfg, 0),
1687                                lustre_cfg_string(lcfg, 1));
1688                         RETURN(0);
1689                 }
1690         }
1691
1692         /* Record is placed in temporary llog as is */
1693         rc = llog_write(env, mrd->temp_llh, rec, LLOG_NEXT_IDX);
1694
1695         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1696                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1697                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1698         RETURN(rc);
1699 }
1700
1701 /*
1702  * Directory CONFIGS/ may contain files which are not config logs to
1703  * be cleared. Skip any llogs with a non-alphanumeric character after
1704  * the last '-'. For example, fsname-MDT0000.sav, fsname-MDT0000.bak,
1705  * fsname-MDT0000.orig, fsname-MDT0000~, fsname-MDT0000.20150516, etc.
1706  */
1707 static bool config_to_clear(const char *logname)
1708 {
1709         int i;
1710         char *str;
1711
1712         str = strrchr(logname, '-');
1713         if (!str)
1714                 return 0;
1715
1716         i = 0;
1717         while (isalnum(str[++i]));
1718         return str[i] == '\0';
1719 }
1720
1721 /**
1722  * Clear config logs for \a name
1723  *
1724  * \param env
1725  * \param mgs           MGS device
1726  * \param name          name of device or of filesystem
1727  *                      (ex. lustre-OST0000 or lustre) in later case all logs
1728  *                      will be cleared
1729  *
1730  * \retval 0            success
1731  */
1732 int mgs_clear_configs(const struct lu_env *env,
1733                      struct mgs_device *mgs, const char *name)
1734 {
1735         struct list_head dentry_list;
1736         struct mgs_direntry *dirent, *n;
1737         char *namedash;
1738         int conn_state;
1739         struct obd_device *mgs_obd = mgs->mgs_obd;
1740         int rc;
1741
1742         ENTRY;
1743
1744         /* Prevent clients and servers from connecting to mgs */
1745         spin_lock(&mgs_obd->obd_dev_lock);
1746         conn_state = mgs_obd->obd_no_conn;
1747         mgs_obd->obd_no_conn = 1;
1748         spin_unlock(&mgs_obd->obd_dev_lock);
1749
1750         /*
1751          * config logs cannot be cleaned if anything other than
1752          * MGS is started
1753          */
1754         if (!only_mgs_is_running(mgs_obd)) {
1755                 CERROR("Only MGS is allowed to be started\n");
1756                 GOTO(out, rc = -EBUSY);
1757         }
1758
1759         /* Find all the logs in the CONFIGS directory */
1760         rc = class_dentry_readdir(env, mgs, &dentry_list);
1761         if (rc) {
1762                 CERROR("%s: cannot read config directory '%s': rc = %d\n",
1763                        mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
1764                 GOTO(out, rc);
1765         }
1766
1767         if (list_empty(&dentry_list)) {
1768                 CERROR("%s: list empty reading config dir '%s': rc = %d\n",
1769                         mgs_obd->obd_name, MOUNT_CONFIGS_DIR, -ENOENT);
1770                 GOTO(out, rc = -ENOENT);
1771         }
1772
1773         OBD_ALLOC(namedash, strlen(name) + 2);
1774         if (namedash == NULL)
1775                 GOTO(out, rc = -ENOMEM);
1776         snprintf(namedash, strlen(name) + 2, "%s-", name);
1777
1778         list_for_each_entry(dirent, &dentry_list, mde_list) {
1779                 if (strcmp(name, dirent->mde_name) &&
1780                     strncmp(namedash, dirent->mde_name, strlen(namedash)))
1781                         continue;
1782                 if (!config_to_clear(dirent->mde_name))
1783                         continue;
1784                 CDEBUG(D_MGS, "%s: Clear config log %s\n",
1785                        mgs_obd->obd_name, dirent->mde_name);
1786                 rc = mgs_replace_log(env, mgs_obd, dirent->mde_name, NULL,
1787                                      mgs_clear_config_handler, NULL);
1788                 if (rc)
1789                         break;
1790         }
1791
1792         list_for_each_entry_safe(dirent, n, &dentry_list, mde_list) {
1793                 list_del_init(&dirent->mde_list);
1794                 mgs_direntry_free(dirent);
1795         }
1796         OBD_FREE(namedash, strlen(name) + 2);
1797 out:
1798         spin_lock(&mgs_obd->obd_dev_lock);
1799         mgs_obd->obd_no_conn = conn_state;
1800         spin_unlock(&mgs_obd->obd_dev_lock);
1801
1802         RETURN(rc);
1803 }
1804
1805 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1806                             char *devname, struct lov_desc *desc)
1807 {
1808         struct mgs_thread_info  *mgi = mgs_env_info(env);
1809         struct llog_cfg_rec     *lcr;
1810         int rc;
1811
1812         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1813         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1814         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1815         if (lcr == NULL)
1816                 return -ENOMEM;
1817
1818         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1819         lustre_cfg_rec_free(lcr);
1820         return rc;
1821 }
1822
1823 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1824                             char *devname, struct lmv_desc *desc)
1825 {
1826         struct mgs_thread_info  *mgi = mgs_env_info(env);
1827         struct llog_cfg_rec     *lcr;
1828         int rc;
1829
1830         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1831         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1832         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1833         if (lcr == NULL)
1834                 return -ENOMEM;
1835
1836         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1837         lustre_cfg_rec_free(lcr);
1838         return rc;
1839 }
1840
1841 static inline int record_mdc_add(const struct lu_env *env,
1842                                  struct llog_handle *llh,
1843                                  char *logname, char *mdcuuid,
1844                                  char *mdtuuid, char *index,
1845                                  char *gen)
1846 {
1847         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1848                            mdtuuid,index,gen,mdcuuid);
1849 }
1850
1851 static inline int record_lov_add(const struct lu_env *env,
1852                                  struct llog_handle *llh,
1853                                  char *lov_name, char *ost_uuid,
1854                                  char *index, char *gen)
1855 {
1856         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1857                            ost_uuid, index, gen, NULL);
1858 }
1859
1860 static inline int record_mount_opt(const struct lu_env *env,
1861                                    struct llog_handle *llh,
1862                                    char *profile, char *lov_name,
1863                                    char *mdc_name)
1864 {
1865         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1866                            profile, lov_name, mdc_name, NULL);
1867 }
1868
1869 static int record_marker(const struct lu_env *env,
1870                          struct llog_handle *llh,
1871                          struct fs_db *fsdb, __u32 flags,
1872                          char *tgtname, char *comment)
1873 {
1874         struct mgs_thread_info *mgi = mgs_env_info(env);
1875         struct llog_cfg_rec *lcr;
1876         int rc;
1877         int cplen = 0;
1878
1879         if (flags & CM_START)
1880                 fsdb->fsdb_gen++;
1881         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1882         mgi->mgi_marker.cm_flags = flags;
1883         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1884         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1885                         sizeof(mgi->mgi_marker.cm_tgtname));
1886         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1887                 return -E2BIG;
1888         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1889                         sizeof(mgi->mgi_marker.cm_comment));
1890         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1891                 return -E2BIG;
1892         mgi->mgi_marker.cm_createtime = ktime_get_real_seconds();
1893         mgi->mgi_marker.cm_canceltime = 0;
1894         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1895         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1896                             sizeof(mgi->mgi_marker));
1897         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1898         if (lcr == NULL)
1899                 return -ENOMEM;
1900
1901         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1902         lustre_cfg_rec_free(lcr);
1903         return rc;
1904 }
1905
1906 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1907                             struct llog_handle **llh, char *name)
1908 {
1909         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1910         struct llog_ctxt        *ctxt;
1911         int                      rc = 0;
1912         ENTRY;
1913
1914         if (*llh)
1915                 GOTO(out, rc = -EBUSY);
1916
1917         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1918         if (!ctxt)
1919                 GOTO(out, rc = -ENODEV);
1920         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1921
1922         rc = llog_open_create(env, ctxt, llh, NULL, name);
1923         if (rc)
1924                 GOTO(out_ctxt, rc);
1925         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1926         if (rc)
1927                 llog_close(env, *llh);
1928 out_ctxt:
1929         llog_ctxt_put(ctxt);
1930 out:
1931         if (rc) {
1932                 CERROR("%s: can't start log %s: rc = %d\n",
1933                        mgs->mgs_obd->obd_name, name, rc);
1934                 *llh = NULL;
1935         }
1936         RETURN(rc);
1937 }
1938
1939 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1940 {
1941         int rc;
1942
1943         rc = llog_close(env, *llh);
1944         *llh = NULL;
1945
1946         return rc;
1947 }
1948
1949 /******************** config "macros" *********************/
1950
1951 /* write an lcfg directly into a log (with markers) */
1952 static int mgs_write_log_direct(const struct lu_env *env,
1953                                 struct mgs_device *mgs, struct fs_db *fsdb,
1954                                 char *logname, struct llog_cfg_rec *lcr,
1955                                 char *devname, char *comment)
1956 {
1957         struct llog_handle *llh = NULL;
1958         int rc;
1959
1960         ENTRY;
1961
1962         rc = record_start_log(env, mgs, &llh, logname);
1963         if (rc)
1964                 RETURN(rc);
1965
1966         /* FIXME These should be a single journal transaction */
1967         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1968         if (rc)
1969                 GOTO(out_end, rc);
1970         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1971         if (rc)
1972                 GOTO(out_end, rc);
1973         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1974         if (rc)
1975                 GOTO(out_end, rc);
1976 out_end:
1977         record_end_log(env, &llh);
1978         RETURN(rc);
1979 }
1980
1981 /* write the lcfg in all logs for the given fs */
1982 static int mgs_write_log_direct_all(const struct lu_env *env,
1983                                     struct mgs_device *mgs,
1984                                     struct fs_db *fsdb,
1985                                     struct mgs_target_info *mti,
1986                                     struct llog_cfg_rec *lcr, char *devname,
1987                                     char *comment, int server_only)
1988 {
1989         struct list_head         log_list;
1990         struct mgs_direntry     *dirent, *n;
1991         char                    *fsname = mti->mti_fsname;
1992         int                      rc = 0, len = strlen(fsname);
1993
1994         ENTRY;
1995         /* Find all the logs in the CONFIGS directory */
1996         rc = class_dentry_readdir(env, mgs, &log_list);
1997         if (rc)
1998                 RETURN(rc);
1999
2000         /* Could use fsdb index maps instead of directory listing */
2001         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
2002                 list_del_init(&dirent->mde_list);
2003                 /* don't write to sptlrpc rule log */
2004                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
2005                         goto next;
2006
2007                 /* caller wants write server logs only */
2008                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
2009                         goto next;
2010
2011                 if (strlen(dirent->mde_name) <= len ||
2012                     strncmp(fsname, dirent->mde_name, len) != 0 ||
2013                     dirent->mde_name[len] != '-')
2014                         goto next;
2015
2016                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
2017                 /* Erase any old settings of this same parameter */
2018                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
2019                                 devname, comment, CM_SKIP);
2020                 if (rc < 0)
2021                         CERROR("%s: Can't modify llog %s: rc = %d\n",
2022                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
2023                 if (lcr == NULL)
2024                         goto next;
2025                 /* Write the new one */
2026                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
2027                                           lcr, devname, comment);
2028                 if (rc != 0)
2029                         CERROR("%s: writing log %s: rc = %d\n",
2030                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
2031 next:
2032                 mgs_direntry_free(dirent);
2033         }
2034
2035         RETURN(rc);
2036 }
2037
2038 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2039                                     struct mgs_device *mgs,
2040                                     struct fs_db *fsdb,
2041                                     struct mgs_target_info *mti,
2042                                     int index, char *logname);
2043 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2044                                     struct mgs_device *mgs,
2045                                     struct fs_db *fsdb,
2046                                     struct mgs_target_info *mti,
2047                                     char *logname, char *suffix, char *lovname,
2048                                     enum lustre_sec_part sec_part, int flags);
2049 static int name_create_mdt_and_lov(char **logname, char **lovname,
2050                                    struct fs_db *fsdb, int i);
2051
2052 static int add_param(char *params, char *key, char *val)
2053 {
2054         char *start = params + strlen(params);
2055         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
2056         int keylen = 0;
2057
2058         if (key != NULL)
2059                 keylen = strlen(key);
2060         if (start + 1 + keylen + strlen(val) >= end) {
2061                 CERROR("params are too long: %s %s%s\n",
2062                        params, key != NULL ? key : "", val);
2063                 return -EINVAL;
2064         }
2065
2066         sprintf(start, " %s%s", key != NULL ? key : "", val);
2067         return 0;
2068 }
2069
2070 /**
2071  * Walk through client config log record and convert the related records
2072  * into the target.
2073  **/
2074 static int mgs_steal_client_llog_handler(const struct lu_env *env,
2075                                          struct llog_handle *llh,
2076                                          struct llog_rec_hdr *rec, void *data)
2077 {
2078         struct mgs_device *mgs;
2079         struct obd_device *obd;
2080         struct mgs_target_info *mti, *tmti;
2081         struct fs_db *fsdb;
2082         int cfg_len = rec->lrh_len;
2083         char *cfg_buf = (char*) (rec + 1);
2084         struct lustre_cfg *lcfg;
2085         int rc = 0;
2086         struct llog_handle *mdt_llh = NULL;
2087         static int got_an_osc_or_mdc = 0;
2088         /* 0: not found any osc/mdc;
2089            1: found osc;
2090            2: found mdc;
2091         */
2092         static int last_step = -1;
2093         int cplen = 0;
2094
2095         ENTRY;
2096
2097         mti = ((struct temp_comp*)data)->comp_mti;
2098         tmti = ((struct temp_comp*)data)->comp_tmti;
2099         fsdb = ((struct temp_comp*)data)->comp_fsdb;
2100         obd = ((struct temp_comp *)data)->comp_obd;
2101         mgs = lu2mgs_dev(obd->obd_lu_dev);
2102         LASSERT(mgs);
2103
2104         if (rec->lrh_type != OBD_CFG_REC) {
2105                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
2106                 RETURN(-EINVAL);
2107         }
2108
2109         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
2110         if (rc) {
2111                 CERROR("Insane cfg\n");
2112                 RETURN(rc);
2113         }
2114
2115         lcfg = (struct lustre_cfg *)cfg_buf;
2116
2117         if (lcfg->lcfg_command == LCFG_MARKER) {
2118                 struct cfg_marker *marker;
2119                 marker = lustre_cfg_buf(lcfg, 1);
2120                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
2121                     (marker->cm_flags & CM_START) &&
2122                      !(marker->cm_flags & CM_SKIP)) {
2123                         got_an_osc_or_mdc = 1;
2124                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
2125                                         sizeof(tmti->mti_svname));
2126                         if (cplen >= sizeof(tmti->mti_svname))
2127                                 RETURN(-E2BIG);
2128                         rc = record_start_log(env, mgs, &mdt_llh,
2129                                               mti->mti_svname);
2130                         if (rc)
2131                                 RETURN(rc);
2132                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
2133                                            mti->mti_svname, "add osc(copied)");
2134                         record_end_log(env, &mdt_llh);
2135                         last_step = marker->cm_step;
2136                         RETURN(rc);
2137                 }
2138                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
2139                     (marker->cm_flags & CM_END) &&
2140                      !(marker->cm_flags & CM_SKIP)) {
2141                         LASSERT(last_step == marker->cm_step);
2142                         last_step = -1;
2143                         got_an_osc_or_mdc = 0;
2144                         memset(tmti, 0, sizeof(*tmti));
2145                         rc = record_start_log(env, mgs, &mdt_llh,
2146                                               mti->mti_svname);
2147                         if (rc)
2148                                 RETURN(rc);
2149                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
2150                                            mti->mti_svname, "add osc(copied)");
2151                         record_end_log(env, &mdt_llh);
2152                         RETURN(rc);
2153                 }
2154                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
2155                     (marker->cm_flags & CM_START) &&
2156                      !(marker->cm_flags & CM_SKIP)) {
2157                         got_an_osc_or_mdc = 2;
2158                         last_step = marker->cm_step;
2159                         memcpy(tmti->mti_svname, marker->cm_tgtname,
2160                                strlen(marker->cm_tgtname));
2161
2162                         RETURN(rc);
2163                 }
2164                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
2165                     (marker->cm_flags & CM_END) &&
2166                      !(marker->cm_flags & CM_SKIP)) {
2167                         LASSERT(last_step == marker->cm_step);
2168                         last_step = -1;
2169                         got_an_osc_or_mdc = 0;
2170                         memset(tmti, 0, sizeof(*tmti));
2171                         RETURN(rc);
2172                 }
2173         }
2174
2175         if (got_an_osc_or_mdc == 0 || last_step < 0)
2176                 RETURN(rc);
2177
2178         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
2179                 __u64 nodenid = lcfg->lcfg_nid;
2180
2181                 if (strlen(tmti->mti_uuid) == 0) {
2182                         /* target uuid not set, this config record is before
2183                          * LCFG_SETUP, this nid is one of target node nid.
2184                          */
2185                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
2186                         tmti->mti_nid_count++;
2187                 } else {
2188                         char nidstr[LNET_NIDSTR_SIZE];
2189
2190                         /* failover node nid */
2191                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
2192                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
2193                                         nidstr);
2194                 }
2195
2196                 RETURN(rc);
2197         }
2198
2199         if (lcfg->lcfg_command == LCFG_SETUP) {
2200                 char *target;
2201
2202                 target = lustre_cfg_string(lcfg, 1);
2203                 memcpy(tmti->mti_uuid, target, strlen(target));
2204                 RETURN(rc);
2205         }
2206
2207         /* ignore client side sptlrpc_conf_log */
2208         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
2209                 RETURN(rc);
2210
2211         if (lcfg->lcfg_command == LCFG_ADD_MDC &&
2212             strstr(lustre_cfg_string(lcfg, 0), "-clilmv") != NULL) {
2213                 int index;
2214
2215                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
2216                         RETURN (-EINVAL);
2217
2218                 memcpy(tmti->mti_fsname, mti->mti_fsname,
2219                        strlen(mti->mti_fsname));
2220                 tmti->mti_stripe_index = index;
2221
2222                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
2223                                               mti->mti_stripe_index,
2224                                               mti->mti_svname);
2225                 memset(tmti, 0, sizeof(*tmti));
2226                 RETURN(rc);
2227         }
2228
2229         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
2230                 int index;
2231                 char mdt_index[9];
2232                 char *logname, *lovname;
2233
2234                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2235                                              mti->mti_stripe_index);
2236                 if (rc)
2237                         RETURN(rc);
2238                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
2239
2240                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
2241                         name_destroy(&logname);
2242                         name_destroy(&lovname);
2243                         RETURN(-EINVAL);
2244                 }
2245
2246                 tmti->mti_stripe_index = index;
2247                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
2248                                          mdt_index, lovname,
2249                                          LUSTRE_SP_MDT, 0);
2250                 name_destroy(&logname);
2251                 name_destroy(&lovname);
2252                 RETURN(rc);
2253         }
2254         RETURN(rc);
2255 }
2256
2257 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
2258 /* stealed from mgs_get_fsdb_from_llog*/
2259 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
2260                                               struct mgs_device *mgs,
2261                                               char *client_name,
2262                                               struct temp_comp* comp)
2263 {
2264         struct llog_handle *loghandle;
2265         struct mgs_target_info *tmti;
2266         struct llog_ctxt *ctxt;
2267         int rc;
2268
2269         ENTRY;
2270
2271         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
2272         LASSERT(ctxt != NULL);
2273
2274         OBD_ALLOC_PTR(tmti);
2275         if (tmti == NULL)
2276                 GOTO(out_ctxt, rc = -ENOMEM);
2277
2278         comp->comp_tmti = tmti;
2279         comp->comp_obd = mgs->mgs_obd;
2280
2281         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
2282                        LLOG_OPEN_EXISTS);
2283         if (rc < 0) {
2284                 if (rc == -ENOENT)
2285                         rc = 0;
2286                 GOTO(out_pop, rc);
2287         }
2288
2289         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
2290         if (rc)
2291                 GOTO(out_close, rc);
2292
2293         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
2294                                   (void *)comp, NULL, false);
2295         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
2296 out_close:
2297         llog_close(env, loghandle);
2298 out_pop:
2299         OBD_FREE_PTR(tmti);
2300 out_ctxt:
2301         llog_ctxt_put(ctxt);
2302         RETURN(rc);
2303 }
2304
2305 /* lmv is the second thing for client logs */
2306 /* copied from mgs_write_log_lov. Please refer to that.  */
2307 static int mgs_write_log_lmv(const struct lu_env *env,
2308                              struct mgs_device *mgs,
2309                              struct fs_db *fsdb,
2310                              struct mgs_target_info *mti,
2311                              char *logname, char *lmvname)
2312 {
2313         struct llog_handle *llh = NULL;
2314         struct lmv_desc *lmvdesc;
2315         char *uuid;
2316         int rc = 0;
2317         ENTRY;
2318
2319         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
2320
2321         OBD_ALLOC_PTR(lmvdesc);
2322         if (lmvdesc == NULL)
2323                 RETURN(-ENOMEM);
2324         lmvdesc->ld_active_tgt_count = 0;
2325         lmvdesc->ld_tgt_count = 0;
2326         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
2327         uuid = (char *)lmvdesc->ld_uuid.uuid;
2328
2329         rc = record_start_log(env, mgs, &llh, logname);
2330         if (rc)
2331                 GOTO(out_free, rc);
2332         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
2333         if (rc)
2334                 GOTO(out_end, rc);
2335         rc = record_attach(env, llh, lmvname, "lmv", uuid);
2336         if (rc)
2337                 GOTO(out_end, rc);
2338         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
2339         if (rc)
2340                 GOTO(out_end, rc);
2341         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
2342         if (rc)
2343                 GOTO(out_end, rc);
2344 out_end:
2345         record_end_log(env, &llh);
2346 out_free:
2347         OBD_FREE_PTR(lmvdesc);
2348         RETURN(rc);
2349 }
2350
2351 /* lov is the first thing in the mdt and client logs */
2352 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
2353                              struct fs_db *fsdb, struct mgs_target_info *mti,
2354                              char *logname, char *lovname)
2355 {
2356         struct llog_handle *llh = NULL;
2357         struct lov_desc *lovdesc;
2358         char *uuid;
2359         int rc = 0;
2360         ENTRY;
2361
2362         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
2363
2364         /*
2365         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
2366         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
2367               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
2368         */
2369
2370         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
2371         OBD_ALLOC_PTR(lovdesc);
2372         if (lovdesc == NULL)
2373                 RETURN(-ENOMEM);
2374         lovdesc->ld_magic = LOV_DESC_MAGIC;
2375         lovdesc->ld_tgt_count = 0;
2376         /* Defaults.  Can be changed later by lcfg config_param */
2377         lovdesc->ld_default_stripe_count = 1;
2378         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
2379         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
2380         lovdesc->ld_default_stripe_offset = -1;
2381         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
2382         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
2383         /* can these be the same? */
2384         uuid = (char *)lovdesc->ld_uuid.uuid;
2385
2386         /* This should always be the first entry in a log.
2387         rc = mgs_clear_log(obd, logname); */
2388         rc = record_start_log(env, mgs, &llh, logname);
2389         if (rc)
2390                 GOTO(out_free, rc);
2391         /* FIXME these should be a single journal transaction */
2392         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
2393         if (rc)
2394                 GOTO(out_end, rc);
2395         rc = record_attach(env, llh, lovname, "lov", uuid);
2396         if (rc)
2397                 GOTO(out_end, rc);
2398         rc = record_lov_setup(env, llh, lovname, lovdesc);
2399         if (rc)
2400                 GOTO(out_end, rc);
2401         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
2402         if (rc)
2403                 GOTO(out_end, rc);
2404         EXIT;
2405 out_end:
2406         record_end_log(env, &llh);
2407 out_free:
2408         OBD_FREE_PTR(lovdesc);
2409         return rc;
2410 }
2411
2412 /* add failnids to open log */
2413 static int mgs_write_log_failnids(const struct lu_env *env,
2414                                   struct mgs_target_info *mti,
2415                                   struct llog_handle *llh,
2416                                   char *cliname)
2417 {
2418         char *failnodeuuid = NULL;
2419         char *ptr = mti->mti_params;
2420         lnet_nid_t nid;
2421         int rc = 0;
2422
2423         /*
2424         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
2425         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
2426         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
2427         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
2428         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
2429         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
2430         */
2431
2432         /*
2433          * Pull failnid info out of params string, which may contain something
2434          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
2435          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
2436          * etc.  However, convert_hostnames() should have caught those.
2437          */
2438         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
2439                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
2440                         char nidstr[LNET_NIDSTR_SIZE];
2441
2442                         if (failnodeuuid == NULL) {
2443                                 /* We don't know the failover node name,
2444                                  * so just use the first nid as the uuid */
2445                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
2446                                 rc = name_create(&failnodeuuid, nidstr, "");
2447                                 if (rc != 0)
2448                                         return rc;
2449                         }
2450                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
2451                                 "client %s\n",
2452                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
2453                                 failnodeuuid, cliname);
2454                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
2455                         /*
2456                          * If *ptr is ':', we have added all NIDs for
2457                          * failnodeuuid.
2458                          */
2459                         if (*ptr == ':') {
2460                                 rc = record_add_conn(env, llh, cliname,
2461                                                      failnodeuuid);
2462                                 name_destroy(&failnodeuuid);
2463                                 failnodeuuid = NULL;
2464                         }
2465                 }
2466                 if (failnodeuuid) {
2467                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2468                         name_destroy(&failnodeuuid);
2469                         failnodeuuid = NULL;
2470                 }
2471         }
2472
2473         return rc;
2474 }
2475
2476 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2477                                     struct mgs_device *mgs,
2478                                     struct fs_db *fsdb,
2479                                     struct mgs_target_info *mti,
2480                                     char *logname, char *lmvname)
2481 {
2482         struct llog_handle *llh = NULL;
2483         char *mdcname = NULL;
2484         char *nodeuuid = NULL;
2485         char *mdcuuid = NULL;
2486         char *lmvuuid = NULL;
2487         char index[6];
2488         char nidstr[LNET_NIDSTR_SIZE];
2489         int i, rc;
2490         ENTRY;
2491
2492         if (mgs_log_is_empty(env, mgs, logname)) {
2493                 CERROR("log is empty! Logical error\n");
2494                 RETURN(-EINVAL);
2495         }
2496
2497         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2498                mti->mti_svname, logname, lmvname);
2499
2500         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2501         rc = name_create(&nodeuuid, nidstr, "");
2502         if (rc)
2503                 RETURN(rc);
2504         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2505         if (rc)
2506                 GOTO(out_free, rc);
2507         rc = name_create(&mdcuuid, mdcname, "_UUID");
2508         if (rc)
2509                 GOTO(out_free, rc);
2510         rc = name_create(&lmvuuid, lmvname, "_UUID");
2511         if (rc)
2512                 GOTO(out_free, rc);
2513
2514         rc = record_start_log(env, mgs, &llh, logname);
2515         if (rc)
2516                 GOTO(out_free, rc);
2517         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2518                            "add mdc");
2519         if (rc)
2520                 GOTO(out_end, rc);
2521         for (i = 0; i < mti->mti_nid_count; i++) {
2522                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2523                         libcfs_nid2str_r(mti->mti_nids[i],
2524                                          nidstr, sizeof(nidstr)));
2525
2526                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2527                 if (rc)
2528                         GOTO(out_end, rc);
2529         }
2530
2531         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2532         if (rc)
2533                 GOTO(out_end, rc);
2534         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2535                           NULL, NULL);
2536         if (rc)
2537                 GOTO(out_end, rc);
2538         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2539         if (rc)
2540                 GOTO(out_end, rc);
2541         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2542         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2543                             index, "1");
2544         if (rc)
2545                 GOTO(out_end, rc);
2546         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2547                            "add mdc");
2548         if (rc)
2549                 GOTO(out_end, rc);
2550 out_end:
2551         record_end_log(env, &llh);
2552 out_free:
2553         name_destroy(&lmvuuid);
2554         name_destroy(&mdcuuid);
2555         name_destroy(&mdcname);
2556         name_destroy(&nodeuuid);
2557         RETURN(rc);
2558 }
2559
2560 static inline int name_create_lov(char **lovname, char *mdtname,
2561                                   struct fs_db *fsdb, int index)
2562 {
2563         /* COMPAT_180 */
2564         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2565                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2566         else
2567                 return name_create(lovname, mdtname, "-mdtlov");
2568 }
2569
2570 static int name_create_mdt_and_lov(char **logname, char **lovname,
2571                                    struct fs_db *fsdb, int i)
2572 {
2573         int rc;
2574
2575         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2576         if (rc)
2577                 return rc;
2578         /* COMPAT_180 */
2579         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2580                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2581         else
2582                 rc = name_create(lovname, *logname, "-mdtlov");
2583         if (rc) {
2584                 name_destroy(logname);
2585                 *logname = NULL;
2586         }
2587         return rc;
2588 }
2589
2590 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2591                                       struct fs_db *fsdb, int i)
2592 {
2593         char suffix[16];
2594
2595         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2596                 sprintf(suffix, "-osc");
2597         else
2598                 sprintf(suffix, "-osc-MDT%04x", i);
2599         return name_create(oscname, ostname, suffix);
2600 }
2601
2602 /* add new mdc to already existent MDS */
2603 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2604                                     struct mgs_device *mgs,
2605                                     struct fs_db *fsdb,
2606                                     struct mgs_target_info *mti,
2607                                     int mdt_index, char *logname)
2608 {
2609         struct llog_handle      *llh = NULL;
2610         char    *nodeuuid = NULL;
2611         char    *ospname = NULL;
2612         char    *lovuuid = NULL;
2613         char    *mdtuuid = NULL;
2614         char    *svname = NULL;
2615         char    *mdtname = NULL;
2616         char    *lovname = NULL;
2617         char    index_str[16];
2618         char    nidstr[LNET_NIDSTR_SIZE];
2619         int     i, rc;
2620
2621         ENTRY;
2622         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2623                 CERROR("log is empty! Logical error\n");
2624                 RETURN (-EINVAL);
2625         }
2626
2627         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2628                logname);
2629
2630         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2631         if (rc)
2632                 RETURN(rc);
2633
2634         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2635         rc = name_create(&nodeuuid, nidstr, "");
2636         if (rc)
2637                 GOTO(out_destory, rc);
2638
2639         rc = name_create(&svname, mdtname, "-osp");
2640         if (rc)
2641                 GOTO(out_destory, rc);
2642
2643         sprintf(index_str, "-MDT%04x", mdt_index);
2644         rc = name_create(&ospname, svname, index_str);
2645         if (rc)
2646                 GOTO(out_destory, rc);
2647
2648         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2649         if (rc)
2650                 GOTO(out_destory, rc);
2651
2652         rc = name_create(&lovuuid, lovname, "_UUID");
2653         if (rc)
2654                 GOTO(out_destory, rc);
2655
2656         rc = name_create(&mdtuuid, mdtname, "_UUID");
2657         if (rc)
2658                 GOTO(out_destory, rc);
2659
2660         rc = record_start_log(env, mgs, &llh, logname);
2661         if (rc)
2662                 GOTO(out_destory, rc);
2663
2664         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2665                            "add osp");
2666         if (rc)
2667                 GOTO(out_destory, rc);
2668
2669         for (i = 0; i < mti->mti_nid_count; i++) {
2670                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2671                         libcfs_nid2str_r(mti->mti_nids[i],
2672                                          nidstr, sizeof(nidstr)));
2673                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2674                 if (rc)
2675                         GOTO(out_end, rc);
2676         }
2677
2678         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2679         if (rc)
2680                 GOTO(out_end, rc);
2681
2682         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2683                           NULL, NULL);
2684         if (rc)
2685                 GOTO(out_end, rc);
2686
2687         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2688         if (rc)
2689                 GOTO(out_end, rc);
2690
2691         /* Add mdc(osp) to lod */
2692         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
2693         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2694                          index_str, "1", NULL);
2695         if (rc)
2696                 GOTO(out_end, rc);
2697
2698         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2699         if (rc)
2700                 GOTO(out_end, rc);
2701
2702 out_end:
2703         record_end_log(env, &llh);
2704
2705 out_destory:
2706         name_destroy(&mdtuuid);
2707         name_destroy(&lovuuid);
2708         name_destroy(&lovname);
2709         name_destroy(&ospname);
2710         name_destroy(&svname);
2711         name_destroy(&nodeuuid);
2712         name_destroy(&mdtname);
2713         RETURN(rc);
2714 }
2715
2716 static int mgs_write_log_mdt0(const struct lu_env *env,
2717                               struct mgs_device *mgs,
2718                               struct fs_db *fsdb,
2719                               struct mgs_target_info *mti)
2720 {
2721         char *log = mti->mti_svname;
2722         struct llog_handle *llh = NULL;
2723         char *uuid, *lovname;
2724         char mdt_index[6];
2725         char *ptr = mti->mti_params;
2726         int rc = 0, failout = 0;
2727         ENTRY;
2728
2729         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2730         if (uuid == NULL)
2731                 RETURN(-ENOMEM);
2732
2733         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2734                 failout = (strncmp(ptr, "failout", 7) == 0);
2735
2736         rc = name_create(&lovname, log, "-mdtlov");
2737         if (rc)
2738                 GOTO(out_free, rc);
2739         if (mgs_log_is_empty(env, mgs, log)) {
2740                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2741                 if (rc)
2742                         GOTO(out_lod, rc);
2743         }
2744
2745         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2746
2747         rc = record_start_log(env, mgs, &llh, log);
2748         if (rc)
2749                 GOTO(out_lod, rc);
2750
2751         /* add MDT itself */
2752
2753         /* FIXME this whole fn should be a single journal transaction */
2754         sprintf(uuid, "%s_UUID", log);
2755         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2756         if (rc)
2757                 GOTO(out_lod, rc);
2758         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2759         if (rc)
2760                 GOTO(out_end, rc);
2761         rc = record_mount_opt(env, llh, log, lovname, NULL);
2762         if (rc)
2763                 GOTO(out_end, rc);
2764         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2765                         failout ? "n" : "f");
2766         if (rc)
2767                 GOTO(out_end, rc);
2768         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2769         if (rc)
2770                 GOTO(out_end, rc);
2771 out_end:
2772         record_end_log(env, &llh);
2773 out_lod:
2774         name_destroy(&lovname);
2775 out_free:
2776         OBD_FREE(uuid, sizeof(struct obd_uuid));
2777         RETURN(rc);
2778 }
2779
2780 /* envelope method for all layers log */
2781 static int mgs_write_log_mdt(const struct lu_env *env,
2782                              struct mgs_device *mgs,
2783                              struct fs_db *fsdb,
2784                              struct mgs_target_info *mti)
2785 {
2786         struct mgs_thread_info *mgi = mgs_env_info(env);
2787         struct llog_handle *llh = NULL;
2788         char *cliname;
2789         int rc, i = 0;
2790         ENTRY;
2791
2792         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2793
2794         if (mti->mti_uuid[0] == '\0') {
2795                 /* Make up our own uuid */
2796                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2797                          "%s_UUID", mti->mti_svname);
2798         }
2799
2800         /* add mdt */
2801         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2802         if (rc)
2803                 RETURN(rc);
2804         /* Append the mdt info to the client log */
2805         rc = name_create(&cliname, mti->mti_fsname, "-client");
2806         if (rc)
2807                 RETURN(rc);
2808
2809         if (mgs_log_is_empty(env, mgs, cliname)) {
2810                 /* Start client log */
2811                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2812                                        fsdb->fsdb_clilov);
2813                 if (rc)
2814                         GOTO(out_free, rc);
2815                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2816                                        fsdb->fsdb_clilmv);
2817                 if (rc)
2818                         GOTO(out_free, rc);
2819         }
2820
2821         /*
2822         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2823         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2824         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2825         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2826         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2827         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2828         */
2829
2830         /* copy client info about lov/lmv */
2831         mgi->mgi_comp.comp_mti = mti;
2832         mgi->mgi_comp.comp_fsdb = fsdb;
2833
2834         rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2835                                                 &mgi->mgi_comp);
2836         if (rc)
2837                 GOTO(out_free, rc);
2838         rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2839                                       fsdb->fsdb_clilmv);
2840         if (rc)
2841                 GOTO(out_free, rc);
2842
2843         /* add mountopts */
2844         rc = record_start_log(env, mgs, &llh, cliname);
2845         if (rc)
2846                 GOTO(out_free, rc);
2847
2848         rc = record_marker(env, llh, fsdb, CM_START, cliname, "mount opts");
2849         if (rc)
2850                 GOTO(out_end, rc);
2851         rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2852                               fsdb->fsdb_clilmv);
2853         if (rc)
2854                 GOTO(out_end, rc);
2855         rc = record_marker(env, llh, fsdb, CM_END, cliname, "mount opts");
2856         if (rc)
2857                 GOTO(out_end, rc);
2858
2859         /* for_all_existing_mdt except current one */
2860         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2861                 if (i !=  mti->mti_stripe_index &&
2862                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2863                         char *logname;
2864
2865                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2866                         if (rc)
2867                                 GOTO(out_end, rc);
2868
2869                         /* NB: If the log for the MDT is empty, it means
2870                          * the MDT is only added to the index
2871                          * map, and not being process yet, i.e. this
2872                          * is an unregistered MDT, see mgs_write_log_target().
2873                          * so we should skip it. Otherwise
2874                          *
2875                          * 1. MGS get register request for MDT1 and MDT2.
2876                          *
2877                          * 2. Then both MDT1 and MDT2 are added into
2878                          * fsdb_mdt_index_map. (see mgs_set_index()).
2879                          *
2880                          * 3. Then MDT1 get the lock of fsdb_mutex, then
2881                          * generate the config log, here, it will regard MDT2
2882                          * as an existent MDT, and generate "add osp" for
2883                          * lustre-MDT0001-osp-MDT0002. Note: at the moment
2884                          * MDT0002 config log is still empty, so it will
2885                          * add "add osp" even before "lov setup", which
2886                          * will definitly cause trouble.
2887                          *
2888                          * 4. MDT1 registeration finished, fsdb_mutex is
2889                          * released, then MDT2 get in, then in above
2890                          * mgs_steal_llog_for_mdt_from_client(), it will
2891                          * add another osp log for lustre-MDT0001-osp-MDT0002,
2892                          * which will cause another trouble.*/
2893                         if (!mgs_log_is_empty(env, mgs, logname))
2894                                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb,
2895                                                               mti, i, logname);
2896
2897                         name_destroy(&logname);
2898                         if (rc)
2899                                 GOTO(out_end, rc);
2900                 }
2901         }
2902 out_end:
2903         record_end_log(env, &llh);
2904 out_free:
2905         name_destroy(&cliname);
2906         RETURN(rc);
2907 }
2908
2909 /* Add the ost info to the client/mdt lov */
2910 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2911                                     struct mgs_device *mgs, struct fs_db *fsdb,
2912                                     struct mgs_target_info *mti,
2913                                     char *logname, char *suffix, char *lovname,
2914                                     enum lustre_sec_part sec_part, int flags)
2915 {
2916         struct llog_handle *llh = NULL;
2917         char *nodeuuid = NULL;
2918         char *oscname = NULL;
2919         char *oscuuid = NULL;
2920         char *lovuuid = NULL;
2921         char *svname = NULL;
2922         char index[6];
2923         char nidstr[LNET_NIDSTR_SIZE];
2924         int i, rc;
2925         ENTRY;
2926
2927         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2928                 mti->mti_svname, logname);
2929
2930         if (mgs_log_is_empty(env, mgs, logname)) {
2931                 CERROR("log is empty! Logical error\n");
2932                 RETURN(-EINVAL);
2933         }
2934
2935         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2936         rc = name_create(&nodeuuid, nidstr, "");
2937         if (rc)
2938                 RETURN(rc);
2939         rc = name_create(&svname, mti->mti_svname, "-osc");
2940         if (rc)
2941                 GOTO(out_free, rc);
2942
2943         /* for the system upgraded from old 1.8, keep using the old osc naming
2944          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2945         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2946                 rc = name_create(&oscname, svname, "");
2947         else
2948                 rc = name_create(&oscname, svname, suffix);
2949         if (rc)
2950                 GOTO(out_free, rc);
2951
2952         rc = name_create(&oscuuid, oscname, "_UUID");
2953         if (rc)
2954                 GOTO(out_free, rc);
2955         rc = name_create(&lovuuid, lovname, "_UUID");
2956         if (rc)
2957                 GOTO(out_free, rc);
2958
2959
2960         /*
2961         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2962         multihomed (#4)
2963         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2964         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2965         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2966         failover (#6,7)
2967         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2968         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2969         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2970         */
2971
2972         rc = record_start_log(env, mgs, &llh, logname);
2973         if (rc)
2974                 GOTO(out_free, rc);
2975
2976         /* FIXME these should be a single journal transaction */
2977         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2978                            "add osc");
2979         if (rc)
2980                 GOTO(out_end, rc);
2981
2982         /* NB: don't change record order, because upon MDT steal OSC config
2983          * from client, it treats all nids before LCFG_SETUP as target nids
2984          * (multiple interfaces), while nids after as failover node nids.
2985          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2986          */
2987         for (i = 0; i < mti->mti_nid_count; i++) {
2988                 CDEBUG(D_MGS, "add nid %s\n",
2989                         libcfs_nid2str_r(mti->mti_nids[i],
2990                                          nidstr, sizeof(nidstr)));
2991                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2992                 if (rc)
2993                         GOTO(out_end, rc);
2994         }
2995         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2996         if (rc)
2997                 GOTO(out_end, rc);
2998         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
2999                           NULL, NULL);
3000         if (rc)
3001                 GOTO(out_end, rc);
3002         rc = mgs_write_log_failnids(env, mti, llh, oscname);
3003         if (rc)
3004                 GOTO(out_end, rc);
3005
3006         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
3007
3008         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
3009         if (rc)
3010                 GOTO(out_end, rc);
3011         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
3012                            "add osc");
3013         if (rc)
3014                 GOTO(out_end, rc);
3015 out_end:
3016         record_end_log(env, &llh);
3017 out_free:
3018         name_destroy(&lovuuid);
3019         name_destroy(&oscuuid);
3020         name_destroy(&oscname);
3021         name_destroy(&svname);
3022         name_destroy(&nodeuuid);
3023         RETURN(rc);
3024 }
3025
3026 static int mgs_write_log_ost(const struct lu_env *env,
3027                              struct mgs_device *mgs, struct fs_db *fsdb,
3028                              struct mgs_target_info *mti)
3029 {
3030         struct llog_handle *llh = NULL;
3031         char *logname, *lovname;
3032         char *ptr = mti->mti_params;
3033         int rc, flags = 0, failout = 0, i;
3034         ENTRY;
3035
3036         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
3037
3038         /* The ost startup log */
3039
3040         /* If the ost log already exists, that means that someone reformatted
3041            the ost and it called target_add again. */
3042         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
3043                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
3044                                    "exists, yet the server claims it never "
3045                                    "registered. It may have been reformatted, "
3046                                    "or the index changed. writeconf the MDT to "
3047                                    "regenerate all logs.\n", mti->mti_svname);
3048                 RETURN(-EALREADY);
3049         }
3050
3051         /*
3052         attach obdfilter ost1 ost1_UUID
3053         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
3054         */
3055         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
3056                 failout = (strncmp(ptr, "failout", 7) == 0);
3057         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
3058         if (rc)
3059                 RETURN(rc);
3060         /* FIXME these should be a single journal transaction */
3061         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
3062         if (rc)
3063                 GOTO(out_end, rc);
3064         if (*mti->mti_uuid == '\0')
3065                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
3066                          "%s_UUID", mti->mti_svname);
3067         rc = record_attach(env, llh, mti->mti_svname,
3068                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
3069         if (rc)
3070                 GOTO(out_end, rc);
3071         rc = record_setup(env, llh, mti->mti_svname,
3072                           "dev"/*ignored*/, "type"/*ignored*/,
3073                           failout ? "n" : "f", NULL/*options*/);
3074         if (rc)
3075                 GOTO(out_end, rc);
3076         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
3077         if (rc)
3078                 GOTO(out_end, rc);
3079 out_end:
3080         record_end_log(env, &llh);
3081         if (rc)
3082                 RETURN(rc);
3083         /* We also have to update the other logs where this osc is part of
3084            the lov */
3085
3086         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3087                 /* If we're upgrading, the old mdt log already has our
3088                    entry. Let's do a fake one for fun. */
3089                 /* Note that we can't add any new failnids, since we don't
3090                    know the old osc names. */
3091                 flags = CM_SKIP | CM_UPGRADE146;
3092
3093         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
3094                 /* If the update flag isn't set, don't update client/mdt
3095                    logs. */
3096                 flags |= CM_SKIP;
3097                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
3098                               "the MDT first to regenerate it.\n",
3099                               mti->mti_svname);
3100         }
3101
3102         /* Add ost to all MDT lov defs */
3103         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3104                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
3105                         char mdt_index[13];
3106
3107                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
3108                                                      i);
3109                         if (rc)
3110                                 RETURN(rc);
3111
3112                         snprintf(mdt_index, sizeof(mdt_index), "-MDT%04x", i);
3113                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
3114                                                       logname, mdt_index,
3115                                                       lovname, LUSTRE_SP_MDT,
3116                                                       flags);
3117                         name_destroy(&logname);
3118                         name_destroy(&lovname);
3119                         if (rc)
3120                                 RETURN(rc);
3121                 }
3122         }
3123
3124         /* Append ost info to the client log */
3125         rc = name_create(&logname, mti->mti_fsname, "-client");
3126         if (rc)
3127                 RETURN(rc);
3128         if (mgs_log_is_empty(env, mgs, logname)) {
3129                 /* Start client log */
3130                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
3131                                        fsdb->fsdb_clilov);
3132                 if (rc)
3133                         GOTO(out_free, rc);
3134                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
3135                                        fsdb->fsdb_clilmv);
3136                 if (rc)
3137                         GOTO(out_free, rc);
3138         }
3139         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
3140                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
3141 out_free:
3142         name_destroy(&logname);
3143         RETURN(rc);
3144 }
3145
3146 static __inline__ int mgs_param_empty(char *ptr)
3147 {
3148         char *tmp;
3149
3150         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
3151                 return 1;
3152         return 0;
3153 }
3154
3155 static int mgs_write_log_failnid_internal(const struct lu_env *env,
3156                                           struct mgs_device *mgs,
3157                                           struct fs_db *fsdb,
3158                                           struct mgs_target_info *mti,
3159                                           char *logname, char *cliname)
3160 {
3161         int rc;
3162         struct llog_handle *llh = NULL;
3163
3164         if (mgs_param_empty(mti->mti_params)) {
3165                 /* Remove _all_ failnids */
3166                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3167                                 mti->mti_svname, "add failnid", CM_SKIP);
3168                 return rc < 0 ? rc : 0;
3169         }
3170
3171         /* Otherwise failover nids are additive */
3172         rc = record_start_log(env, mgs, &llh, logname);
3173         if (rc)
3174                 return rc;
3175                 /* FIXME this should be a single journal transaction */
3176         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
3177                            "add failnid");
3178         if (rc)
3179                 goto out_end;
3180         rc = mgs_write_log_failnids(env, mti, llh, cliname);
3181         if (rc)
3182                 goto out_end;
3183         rc = record_marker(env, llh, fsdb, CM_END,
3184                            mti->mti_svname, "add failnid");
3185 out_end:
3186         record_end_log(env, &llh);
3187         return rc;
3188 }
3189
3190
3191 /* Add additional failnids to an existing log.
3192    The mdc/osc must have been added to logs first */
3193 /* tcp nids must be in dotted-quad ascii -
3194    we can't resolve hostnames from the kernel. */
3195 static int mgs_write_log_add_failnid(const struct lu_env *env,
3196                                      struct mgs_device *mgs,
3197                                      struct fs_db *fsdb,
3198                                      struct mgs_target_info *mti)
3199 {
3200         char *logname, *cliname;
3201         int rc;
3202         ENTRY;
3203
3204         /* FIXME we currently can't erase the failnids
3205          * given when a target first registers, since they aren't part of
3206          * an "add uuid" stanza
3207          */
3208
3209         /* Verify that we know about this target */
3210         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
3211                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
3212                                    "yet. It must be started before failnids "
3213                                    "can be added.\n", mti->mti_svname);
3214                 RETURN(-ENOENT);
3215         }
3216
3217         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
3218         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3219                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
3220         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3221                 rc = name_create(&cliname, mti->mti_svname, "-osc");
3222         } else {
3223                 RETURN(-EINVAL);
3224         }
3225         if (rc)
3226                 RETURN(rc);
3227
3228         /* Add failover nids to the client log */
3229         rc = name_create(&logname, mti->mti_fsname, "-client");
3230         if (rc) {
3231                 name_destroy(&cliname);
3232                 RETURN(rc);
3233         }
3234