Whamcloud - gitweb
LU-10384 mgs: replace_nids large string and failover support
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mgs/mgs_llog.c
33  *
34  * Lustre Management Server (mgs) config llog creation
35  *
36  * Author: Nathan Rutman <nathan@clusterfs.com>
37  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
38  * Author: Mikhail Pershin <tappro@whamcloud.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_MGS
42 #define D_MGS D_CONFIG
43
44 #include <obd.h>
45 #include <uapi/linux/lustre/lustre_ioctl.h>
46 #include <uapi/linux/lustre/lustre_param.h>
47 #include <lustre_sec.h>
48 #include <lustre_quota.h>
49 #include <lustre_sec.h>
50
51 #include "mgs_internal.h"
52
53 /********************** Class functions ********************/
54
55 /**
56  * Find all logs in CONFIG directory and link then into list.
57  *
58  * \param[in] env       pointer to the thread context
59  * \param[in] mgs       pointer to the mgs device
60  * \param[out] log_list the list to hold the found llog name entry
61  *
62  * \retval              0 for success
63  * \retval              negative error number on failure
64  **/
65 int class_dentry_readdir(const struct lu_env *env, struct mgs_device *mgs,
66                          struct list_head *log_list)
67 {
68         struct dt_object *dir = mgs->mgs_configs_dir;
69         const struct dt_it_ops *iops;
70         struct dt_it *it;
71         struct mgs_direntry *de;
72         char *key;
73         int rc, key_sz;
74
75         INIT_LIST_HEAD(log_list);
76
77         LASSERT(dir);
78         LASSERT(dir->do_index_ops);
79
80         iops = &dir->do_index_ops->dio_it;
81         it = iops->init(env, dir, LUDA_64BITHASH);
82         if (IS_ERR(it))
83                 RETURN(PTR_ERR(it));
84
85         rc = iops->load(env, it, 0);
86         if (rc <= 0)
87                 GOTO(fini, rc = 0);
88
89         /* main cycle */
90         do {
91                 key = (void *)iops->key(env, it);
92                 if (IS_ERR(key)) {
93                         CERROR("%s: key failed when listing %s: rc = %d\n",
94                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
95                                (int) PTR_ERR(key));
96                         goto next;
97                 }
98                 key_sz = iops->key_size(env, it);
99                 LASSERT(key_sz > 0);
100
101                 /* filter out "." and ".." entries */
102                 if (key[0] == '.') {
103                         if (key_sz == 1)
104                                 goto next;
105                         if (key_sz == 2 && key[1] == '.')
106                                 goto next;
107                 }
108
109                 /* filter out ".bak" files */
110                 /* sizeof(".bak") - 1 == 3 */
111                 if (key_sz >= 3 &&
112                     !memcmp(".bak", key + key_sz - 3, 3)) {
113                         CDEBUG(D_MGS, "Skipping backup file %.*s\n",
114                                key_sz, key);
115                         goto next;
116                 }
117
118                 de = mgs_direntry_alloc(key_sz + 1);
119                 if (de == NULL) {
120                         rc = -ENOMEM;
121                         break;
122                 }
123
124                 memcpy(de->mde_name, key, key_sz);
125                 de->mde_name[key_sz] = 0;
126
127                 list_add(&de->mde_list, log_list);
128
129 next:
130                 rc = iops->next(env, it);
131         } while (rc == 0);
132         if (rc > 0)
133                 rc = 0;
134
135         iops->put(env, it);
136
137 fini:
138         iops->fini(env, it);
139         if (rc) {
140                 struct mgs_direntry *n;
141
142                 CERROR("%s: key failed when listing %s: rc = %d\n",
143                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
144
145                 list_for_each_entry_safe(de, n, log_list, mde_list) {
146                         list_del_init(&de->mde_list);
147                         mgs_direntry_free(de);
148                 }
149         }
150
151         RETURN(rc);
152 }
153
154 /******************** DB functions *********************/
155
156 static inline int name_create(char **newname, char *prefix, char *suffix)
157 {
158         LASSERT(newname);
159         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
160         if (!*newname)
161                 return -ENOMEM;
162         sprintf(*newname, "%s%s", prefix, suffix);
163         return 0;
164 }
165
166 static inline void name_destroy(char **name)
167 {
168         if (*name)
169                 OBD_FREE(*name, strlen(*name) + 1);
170         *name = NULL;
171 }
172
173 struct mgs_fsdb_handler_data
174 {
175         struct fs_db   *fsdb;
176         __u32           ver;
177 };
178
179 /* from the (client) config log, figure out:
180         1. which ost's/mdt's are configured (by index)
181         2. what the last config step is
182         3. COMPAT_18 osc name
183 */
184 /* It might be better to have a separate db file, instead of parsing the info
185    out of the client log.  This is slow and potentially error-prone. */
186 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
187                             struct llog_rec_hdr *rec, void *data)
188 {
189         struct mgs_fsdb_handler_data *d = data;
190         struct fs_db *fsdb = d->fsdb;
191         int cfg_len = rec->lrh_len;
192         char *cfg_buf = (char*) (rec + 1);
193         struct lustre_cfg *lcfg;
194         __u32 index;
195         int rc = 0;
196         ENTRY;
197
198         if (rec->lrh_type != OBD_CFG_REC) {
199                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
200                 RETURN(-EINVAL);
201         }
202
203         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
204         if (rc) {
205                 CERROR("Insane cfg\n");
206                 RETURN(rc);
207         }
208
209         lcfg = (struct lustre_cfg *)cfg_buf;
210
211         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
212                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
213
214         /* Figure out ost indicies */
215         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
216         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
217             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
218                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
219                                        NULL, 10);
220                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
221                        lustre_cfg_string(lcfg, 1), index,
222                        lustre_cfg_string(lcfg, 2));
223                 set_bit(index, fsdb->fsdb_ost_index_map);
224         }
225
226         /* Figure out mdt indicies */
227         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
228         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
229             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
230                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
231                                        &index, NULL);
232                 if (rc != LDD_F_SV_TYPE_MDT) {
233                         CWARN("Unparsable MDC name %s, assuming index 0\n",
234                               lustre_cfg_string(lcfg, 0));
235                         index = 0;
236                 }
237                 rc = 0;
238                 CDEBUG(D_MGS, "MDT index is %u\n", index);
239                 if (!test_bit(index, fsdb->fsdb_mdt_index_map)) {
240                         set_bit(index, fsdb->fsdb_mdt_index_map);
241                         fsdb->fsdb_mdt_count++;
242                 }
243         }
244
245         /**
246          * figure out the old config. fsdb_gen = 0 means old log
247          * It is obsoleted and not supported anymore
248          */
249         if (fsdb->fsdb_gen == 0) {
250                 CERROR("Old config format is not supported\n");
251                 RETURN(-EINVAL);
252         }
253
254         /*
255          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
256          */
257         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
258             lcfg->lcfg_command == LCFG_ATTACH &&
259             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
260                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
261                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
262                         CWARN("MDT using 1.8 OSC name scheme\n");
263                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
264                 }
265         }
266
267         if (lcfg->lcfg_command == LCFG_MARKER) {
268                 struct cfg_marker *marker;
269                 marker = lustre_cfg_buf(lcfg, 1);
270
271                 d->ver = marker->cm_vers;
272
273                 /* Keep track of the latest marker step */
274                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
275         }
276
277         RETURN(rc);
278 }
279
280 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
281 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
282                                   struct mgs_device *mgs,
283                                   struct fs_db *fsdb)
284 {
285         char *logname;
286         struct llog_handle *loghandle;
287         struct llog_ctxt *ctxt;
288         struct mgs_fsdb_handler_data d = {
289                 .fsdb = fsdb,
290         };
291         int rc;
292
293         ENTRY;
294
295         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
296         LASSERT(ctxt != NULL);
297         rc = name_create(&logname, fsdb->fsdb_name, "-client");
298         if (rc)
299                 GOTO(out_put, rc);
300         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
301         if (rc)
302                 GOTO(out_pop, rc);
303
304         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
305         if (rc)
306                 GOTO(out_close, rc);
307
308         if (llog_get_size(loghandle) <= 1)
309                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
310
311         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
312         CDEBUG(D_INFO, "get_db = %d\n", rc);
313 out_close:
314         llog_close(env, loghandle);
315 out_pop:
316         name_destroy(&logname);
317 out_put:
318         llog_ctxt_put(ctxt);
319
320         RETURN(rc);
321 }
322
323 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
324 {
325         struct mgs_tgt_srpc_conf *tgtconf;
326
327         /* free target-specific rules */
328         while (fsdb->fsdb_srpc_tgt) {
329                 tgtconf = fsdb->fsdb_srpc_tgt;
330                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
331
332                 LASSERT(tgtconf->mtsc_tgt);
333
334                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
335                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
336                 OBD_FREE_PTR(tgtconf);
337         }
338
339         /* free general rules */
340         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
341 }
342
343 static void mgs_unlink_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
344 {
345         mutex_lock(&mgs->mgs_mutex);
346         if (likely(!list_empty(&fsdb->fsdb_list))) {
347                 LASSERTF(atomic_read(&fsdb->fsdb_ref) >= 2,
348                          "Invalid ref %d on %s\n",
349                          atomic_read(&fsdb->fsdb_ref),
350                          fsdb->fsdb_name);
351
352                 list_del_init(&fsdb->fsdb_list);
353                 /* Drop the reference on the list.*/
354                 mgs_put_fsdb(mgs, fsdb);
355         }
356         mutex_unlock(&mgs->mgs_mutex);
357 }
358
359 /* The caller must hold mgs->mgs_mutex. */
360 static inline struct fs_db *
361 mgs_find_fsdb_noref(struct mgs_device *mgs, const char *fsname)
362 {
363         struct fs_db *fsdb;
364         struct list_head *tmp;
365
366         list_for_each(tmp, &mgs->mgs_fs_db_list) {
367                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
368                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
369                         return fsdb;
370         }
371
372         return NULL;
373 }
374
375 /* The caller must hold mgs->mgs_mutex. */
376 static void mgs_remove_fsdb_by_name(struct mgs_device *mgs, const char *name)
377 {
378         struct fs_db *fsdb;
379
380         fsdb = mgs_find_fsdb_noref(mgs, name);
381         if (fsdb) {
382                 list_del_init(&fsdb->fsdb_list);
383                 /* Drop the reference on the list.*/
384                 mgs_put_fsdb(mgs, fsdb);
385         }
386 }
387
388 /* The caller must hold mgs->mgs_mutex. */
389 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, const char *fsname)
390 {
391         struct fs_db *fsdb;
392
393         fsdb = mgs_find_fsdb_noref(mgs, fsname);
394         if (fsdb)
395                 atomic_inc(&fsdb->fsdb_ref);
396
397         return fsdb;
398 }
399
400 /* The caller must hold mgs->mgs_mutex. */
401 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
402                                   struct mgs_device *mgs, char *fsname)
403 {
404         struct fs_db *fsdb;
405         int rc;
406         ENTRY;
407
408         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
409                 CERROR("fsname %s is too long\n", fsname);
410
411                 RETURN(ERR_PTR(-EINVAL));
412         }
413
414         OBD_ALLOC_PTR(fsdb);
415         if (!fsdb)
416                 RETURN(ERR_PTR(-ENOMEM));
417
418         strncpy(fsdb->fsdb_name, fsname, sizeof(fsdb->fsdb_name));
419         mutex_init(&fsdb->fsdb_mutex);
420         INIT_LIST_HEAD(&fsdb->fsdb_list);
421         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
422         fsdb->fsdb_gen = 1;
423         INIT_LIST_HEAD(&fsdb->fsdb_clients);
424         atomic_set(&fsdb->fsdb_notify_phase, 0);
425         init_waitqueue_head(&fsdb->fsdb_notify_waitq);
426         init_completion(&fsdb->fsdb_notify_comp);
427
428         if (strcmp(fsname, MGSSELF_NAME) == 0) {
429                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
430                 fsdb->fsdb_mgs = mgs;
431                 if (logname_is_barrier(fsname))
432                         goto add;
433         } else {
434                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
435                 if (!fsdb->fsdb_mdt_index_map) {
436                         CERROR("No memory for MDT index maps\n");
437
438                         GOTO(err, rc = -ENOMEM);
439                 }
440
441                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
442                 if (!fsdb->fsdb_ost_index_map) {
443                         CERROR("No memory for OST index maps\n");
444
445                         GOTO(err, rc = -ENOMEM);
446                 }
447
448                 if (logname_is_barrier(fsname))
449                         goto add;
450
451                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
452                 if (rc)
453                         GOTO(err, rc);
454
455                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
456                 if (rc)
457                         GOTO(err, rc);
458
459                 /* initialise data for NID table */
460                 mgs_ir_init_fs(env, mgs, fsdb);
461                 lproc_mgs_add_live(mgs, fsdb);
462         }
463
464         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
465             strcmp(PARAMS_FILENAME, fsname) != 0) {
466                 /* populate the db from the client llog */
467                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
468                 if (rc) {
469                         CERROR("Can't get db from client log %d\n", rc);
470
471                         GOTO(err, rc);
472                 }
473         }
474
475         /* populate srpc rules from params llog */
476         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
477         if (rc) {
478                 CERROR("Can't get db from params log %d\n", rc);
479
480                 GOTO(err, rc);
481         }
482
483 add:
484         /* One ref is for the fsdb on the list.
485          * The other ref is for the caller. */
486         atomic_set(&fsdb->fsdb_ref, 2);
487         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
488
489         RETURN(fsdb);
490
491 err:
492         atomic_set(&fsdb->fsdb_ref, 1);
493         mgs_put_fsdb(mgs, fsdb);
494
495         RETURN(ERR_PTR(rc));
496 }
497
498 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
499 {
500         LASSERT(list_empty(&fsdb->fsdb_list));
501
502         lproc_mgs_del_live(mgs, fsdb);
503
504         /* deinitialize fsr */
505         if (fsdb->fsdb_mgs)
506                 mgs_ir_fini_fs(mgs, fsdb);
507
508         if (fsdb->fsdb_ost_index_map)
509                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
510         if (fsdb->fsdb_mdt_index_map)
511                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
512         name_destroy(&fsdb->fsdb_clilov);
513         name_destroy(&fsdb->fsdb_clilmv);
514         mgs_free_fsdb_srpc(fsdb);
515         OBD_FREE_PTR(fsdb);
516 }
517
518 void mgs_put_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
519 {
520         if (atomic_dec_and_test(&fsdb->fsdb_ref))
521                 mgs_free_fsdb(mgs, fsdb);
522 }
523
524 int mgs_init_fsdb_list(struct mgs_device *mgs)
525 {
526         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
527         return 0;
528 }
529
530 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
531 {
532         struct fs_db *fsdb;
533         struct list_head *tmp, *tmp2;
534
535         mutex_lock(&mgs->mgs_mutex);
536         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
537                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
538                 list_del_init(&fsdb->fsdb_list);
539                 mgs_put_fsdb(mgs, fsdb);
540         }
541         mutex_unlock(&mgs->mgs_mutex);
542         return 0;
543 }
544
545 /* The caller must hold mgs->mgs_mutex. */
546 int mgs_find_or_make_fsdb_nolock(const struct lu_env *env,
547                                 struct mgs_device *mgs,
548                                 char *name, struct fs_db **dbh)
549 {
550         struct fs_db *fsdb;
551         int rc = 0;
552         ENTRY;
553
554         fsdb = mgs_find_fsdb(mgs, name);
555         if (!fsdb) {
556                 fsdb = mgs_new_fsdb(env, mgs, name);
557                 if (IS_ERR(fsdb))
558                         rc = PTR_ERR(fsdb);
559
560                 CDEBUG(D_MGS, "Created new db: rc = %d\n", rc);
561         }
562
563         if (!rc)
564                 *dbh = fsdb;
565
566         RETURN(rc);
567 }
568
569 int mgs_find_or_make_fsdb(const struct lu_env *env, struct mgs_device *mgs,
570                           char *name, struct fs_db **dbh)
571 {
572         int rc;
573         ENTRY;
574
575         mutex_lock(&mgs->mgs_mutex);
576         rc = mgs_find_or_make_fsdb_nolock(env, mgs, name, dbh);
577         mutex_unlock(&mgs->mgs_mutex);
578
579         RETURN(rc);
580 }
581
582 /* 1 = index in use
583    0 = index unused
584    -1= empty client log */
585 int mgs_check_index(const struct lu_env *env,
586                     struct mgs_device *mgs,
587                     struct mgs_target_info *mti)
588 {
589         struct fs_db *fsdb;
590         void *imap;
591         int rc = 0;
592         ENTRY;
593
594         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
595
596         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
597         if (rc) {
598                 CERROR("Can't get db for %s\n", mti->mti_fsname);
599                 RETURN(rc);
600         }
601
602         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
603                 GOTO(out, rc = -1);
604
605         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
606                 imap = fsdb->fsdb_ost_index_map;
607         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
608                 imap = fsdb->fsdb_mdt_index_map;
609         else
610                 GOTO(out, rc = -EINVAL);
611
612         if (test_bit(mti->mti_stripe_index, imap))
613                 GOTO(out, rc = 1);
614
615         GOTO(out, rc = 0);
616
617 out:
618         mgs_put_fsdb(mgs, fsdb);
619         return rc;
620 }
621
622 static __inline__ int next_index(void *index_map, int map_len)
623 {
624         int i;
625         for (i = 0; i < map_len * 8; i++)
626                 if (!test_bit(i, index_map)) {
627                          return i;
628                  }
629         CERROR("max index %d exceeded.\n", i);
630         return -1;
631 }
632
633 /* Make the mdt/ost server obd name based on the filesystem name */
634 static bool server_make_name(u32 flags, u16 index, const char *fs,
635                              char *name_buf, size_t name_buf_size)
636 {
637         bool invalid_flag = false;
638
639         if (flags & (LDD_F_SV_TYPE_MDT | LDD_F_SV_TYPE_OST)) {
640                 if (!(flags & LDD_F_SV_ALL))
641                         snprintf(name_buf, name_buf_size, "%.8s%c%s%04x", fs,
642                                 (flags & LDD_F_VIRGIN) ? ':' :
643                                         ((flags & LDD_F_WRITECONF) ? '=' : '-'),
644                                 (flags & LDD_F_SV_TYPE_MDT) ? "MDT" : "OST",
645                                 index);
646         } else if (flags & LDD_F_SV_TYPE_MGS) {
647                 snprintf(name_buf, name_buf_size, "MGS");
648         } else {
649                 CERROR("unknown server type %#x\n", flags);
650                 invalid_flag = true;
651         }
652         return invalid_flag;
653 }
654
655 /* Return codes:
656         0  newly marked as in use
657         <0 err
658         +EALREADY for update of an old index */
659 static int mgs_set_index(const struct lu_env *env,
660                          struct mgs_device *mgs,
661                          struct mgs_target_info *mti)
662 {
663         struct fs_db *fsdb;
664         void *imap;
665         int rc = 0;
666         ENTRY;
667
668         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
669         if (rc) {
670                 CERROR("Can't get db for %s\n", mti->mti_fsname);
671                 RETURN(rc);
672         }
673
674         mutex_lock(&fsdb->fsdb_mutex);
675         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
676                 imap = fsdb->fsdb_ost_index_map;
677         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
678                 imap = fsdb->fsdb_mdt_index_map;
679         } else {
680                 GOTO(out_up, rc = -EINVAL);
681         }
682
683         if (mti->mti_flags & LDD_F_NEED_INDEX) {
684                 rc = next_index(imap, INDEX_MAP_SIZE);
685                 if (rc == -1)
686                         GOTO(out_up, rc = -ERANGE);
687                 mti->mti_stripe_index = rc;
688         }
689
690         /* the last index(0xffff) is reserved for default value. */
691         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
692                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
693                                    "but index must be less than %u.\n",
694                                    mti->mti_svname, mti->mti_stripe_index,
695                                    INDEX_MAP_SIZE * 8 - 1);
696                 GOTO(out_up, rc = -ERANGE);
697         }
698
699         if (test_bit(mti->mti_stripe_index, imap)) {
700                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
701                     !(mti->mti_flags & LDD_F_WRITECONF)) {
702                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
703                                            "%d, but that index is already in "
704                                            "use. Use --writeconf to force\n",
705                                            mti->mti_svname,
706                                            mti->mti_stripe_index);
707                         GOTO(out_up, rc = -EADDRINUSE);
708                 } else {
709                         CDEBUG(D_MGS, "Server %s updating index %d\n",
710                                mti->mti_svname, mti->mti_stripe_index);
711                         GOTO(out_up, rc = EALREADY);
712                 }
713         } else {
714                 set_bit(mti->mti_stripe_index, imap);
715                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
716                         fsdb->fsdb_mdt_count++;
717         }
718
719         set_bit(mti->mti_stripe_index, imap);
720         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
721         if (server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
722                              mti->mti_stripe_index, mti->mti_fsname,
723                              mti->mti_svname, sizeof(mti->mti_svname))) {
724                 CERROR("unknown server type %#x\n", mti->mti_flags);
725                 GOTO(out_up, rc = -EINVAL);
726         }
727
728         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
729                mti->mti_stripe_index);
730
731         GOTO(out_up, rc = 0);
732
733 out_up:
734         mutex_unlock(&fsdb->fsdb_mutex);
735         mgs_put_fsdb(mgs, fsdb);
736         return rc;
737 }
738
739 struct mgs_modify_lookup {
740         struct cfg_marker mml_marker;
741         int               mml_modified;
742 };
743
744 static int mgs_check_record_match(const struct lu_env *env,
745                                 struct llog_handle *llh,
746                                 struct llog_rec_hdr *rec, void *data)
747 {
748         struct cfg_marker *mc_marker = data;
749         struct cfg_marker *marker;
750         struct lustre_cfg *lcfg = REC_DATA(rec);
751         int cfg_len = REC_DATA_LEN(rec);
752         int rc;
753         ENTRY;
754
755
756         if (rec->lrh_type != OBD_CFG_REC) {
757                 CDEBUG(D_ERROR, "Unhandled lrh_type: %#x\n", rec->lrh_type);
758                 RETURN(-EINVAL);
759         }
760
761         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
762         if (rc) {
763                 CDEBUG(D_ERROR, "Insane cfg\n");
764                 RETURN(rc);
765         }
766
767         /* We only care about markers */
768         if (lcfg->lcfg_command != LCFG_MARKER)
769                 RETURN(0);
770
771         marker = lustre_cfg_buf(lcfg, 1);
772
773         if (marker->cm_flags & CM_SKIP)
774                 RETURN(0);
775
776         if ((strcmp(mc_marker->cm_comment, marker->cm_comment) == 0) &&
777                 (strcmp(mc_marker->cm_tgtname, marker->cm_tgtname) == 0)) {
778                 /* Found a non-skipped marker match */
779                 CDEBUG(D_MGS, "Matched rec %u marker %d flag %x %s %s\n",
780                         rec->lrh_index, marker->cm_step,
781                         marker->cm_flags, marker->cm_tgtname,
782                         marker->cm_comment);
783                 rc = LLOG_PROC_BREAK;
784         }
785
786         RETURN(rc);
787 }
788
789 /**
790  * Check an existing config log record with matching comment and device
791  * Return code:
792  * 0 - checked successfully,
793  * LLOG_PROC_BREAK - record matches
794  * negative - error
795  */
796 static int mgs_check_marker(const struct lu_env *env, struct mgs_device *mgs,
797                 struct fs_db *fsdb, struct mgs_target_info *mti,
798                 char *logname, char *devname, char *comment)
799 {
800         struct llog_handle *loghandle;
801         struct llog_ctxt *ctxt;
802         struct cfg_marker *mc_marker;
803         int rc;
804
805         ENTRY;
806
807         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
808         CDEBUG(D_MGS, "mgs check %s/%s/%s\n", logname, devname, comment);
809
810         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
811         LASSERT(ctxt != NULL);
812         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
813         if (rc < 0) {
814                 if (rc == -ENOENT)
815                         rc = 0;
816                 GOTO(out_pop, rc);
817         }
818
819         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
820         if (rc)
821                 GOTO(out_close, rc);
822
823         if (llog_get_size(loghandle) <= 1)
824                 GOTO(out_close, rc = 0);
825
826         OBD_ALLOC_PTR(mc_marker);
827         if (!mc_marker)
828                 GOTO(out_close, rc = -ENOMEM);
829         if (strlcpy(mc_marker->cm_comment, comment,
830                 sizeof(mc_marker->cm_comment)) >=
831                 sizeof(mc_marker->cm_comment))
832                 GOTO(out_free, rc = -E2BIG);
833         if (strlcpy(mc_marker->cm_tgtname, devname,
834                 sizeof(mc_marker->cm_tgtname)) >=
835                 sizeof(mc_marker->cm_tgtname))
836                 GOTO(out_free, rc = -E2BIG);
837
838         rc = llog_process(env, loghandle, mgs_check_record_match,
839                         (void *)mc_marker, NULL);
840
841 out_free:
842         OBD_FREE_PTR(mc_marker);
843
844 out_close:
845         llog_close(env, loghandle);
846 out_pop:
847         if (rc && rc != LLOG_PROC_BREAK)
848                 CDEBUG(D_ERROR, "%s: mgs check %s/%s failed: rc = %d\n",
849                         mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
850         llog_ctxt_put(ctxt);
851         RETURN(rc);
852 }
853
854 static int mgs_modify_handler(const struct lu_env *env,
855                               struct llog_handle *llh,
856                               struct llog_rec_hdr *rec, void *data)
857 {
858         struct mgs_modify_lookup *mml = data;
859         struct cfg_marker *marker;
860         struct lustre_cfg *lcfg = REC_DATA(rec);
861         int cfg_len = REC_DATA_LEN(rec);
862         int rc;
863         ENTRY;
864
865         if (rec->lrh_type != OBD_CFG_REC) {
866                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
867                 RETURN(-EINVAL);
868         }
869
870         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
871         if (rc) {
872                 CERROR("Insane cfg\n");
873                 RETURN(rc);
874         }
875
876         /* We only care about markers */
877         if (lcfg->lcfg_command != LCFG_MARKER)
878                 RETURN(0);
879
880         marker = lustre_cfg_buf(lcfg, 1);
881         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
882             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
883             !(marker->cm_flags & CM_SKIP)) {
884                 /* Found a non-skipped marker match */
885                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
886                        rec->lrh_index, marker->cm_step,
887                        marker->cm_flags, mml->mml_marker.cm_flags,
888                        marker->cm_tgtname, marker->cm_comment);
889                 /* Overwrite the old marker llog entry */
890                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
891                 marker->cm_flags |= mml->mml_marker.cm_flags;
892                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
893                 rc = llog_write(env, llh, rec, rec->lrh_index);
894                 if (!rc)
895                          mml->mml_modified++;
896         }
897
898         RETURN(rc);
899 }
900
901 /**
902  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
903  * Return code:
904  * 0 - modified successfully,
905  * 1 - no modification was done
906  * negative - error
907  */
908 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
909                       struct fs_db *fsdb, struct mgs_target_info *mti,
910                       char *logname, char *devname, char *comment, int flags)
911 {
912         struct llog_handle *loghandle;
913         struct llog_ctxt *ctxt;
914         struct mgs_modify_lookup *mml;
915         int rc;
916
917         ENTRY;
918
919         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
920         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
921                flags);
922
923         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
924         LASSERT(ctxt != NULL);
925         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
926         if (rc < 0) {
927                 if (rc == -ENOENT)
928                         rc = 0;
929                 GOTO(out_pop, rc);
930         }
931
932         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
933         if (rc)
934                 GOTO(out_close, rc);
935
936         if (llog_get_size(loghandle) <= 1)
937                 GOTO(out_close, rc = 0);
938
939         OBD_ALLOC_PTR(mml);
940         if (!mml)
941                 GOTO(out_close, rc = -ENOMEM);
942         if (strlcpy(mml->mml_marker.cm_comment, comment,
943                     sizeof(mml->mml_marker.cm_comment)) >=
944             sizeof(mml->mml_marker.cm_comment))
945                 GOTO(out_free, rc = -E2BIG);
946         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
947                     sizeof(mml->mml_marker.cm_tgtname)) >=
948             sizeof(mml->mml_marker.cm_tgtname))
949                 GOTO(out_free, rc = -E2BIG);
950         /* Modify mostly means cancel */
951         mml->mml_marker.cm_flags = flags;
952         mml->mml_marker.cm_canceltime = flags ? ktime_get_real_seconds() : 0;
953         mml->mml_modified = 0;
954         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
955                           NULL);
956         if (!rc && !mml->mml_modified)
957                 rc = 1;
958
959 out_free:
960         OBD_FREE_PTR(mml);
961
962 out_close:
963         llog_close(env, loghandle);
964 out_pop:
965         if (rc < 0)
966                 CERROR("%s: modify %s/%s failed: rc = %d\n",
967                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
968         llog_ctxt_put(ctxt);
969         RETURN(rc);
970 }
971
972 enum replace_state {
973         REPLACE_COPY = 0,
974         REPLACE_SKIP,
975         REPLACE_DONE,
976         REPLACE_UUID,
977         REPLACE_SETUP
978 };
979
980 /** This structure is passed to mgs_replace_handler */
981 struct mgs_replace_data {
982         /* Nids are replaced for this target device */
983         struct mgs_target_info target;
984         /* Temporary modified llog */
985         struct llog_handle *temp_llh;
986         enum replace_state state;
987         char *failover;
988         char *nodeuuid;
989 };
990
991 /**
992  * Check: a) if block should be skipped
993  * b) is it target block
994  *
995  * \param[in] lcfg
996  * \param[in] mrd
997  *
998  * \retval 0 should not to be skipped
999  * \retval 1 should to be skipped
1000  */
1001 static int check_markers(struct lustre_cfg *lcfg,
1002                          struct mgs_replace_data *mrd)
1003 {
1004          struct cfg_marker *marker;
1005
1006         /* Track markers. Find given device */
1007         if (lcfg->lcfg_command == LCFG_MARKER) {
1008                 marker = lustre_cfg_buf(lcfg, 1);
1009                 /* Clean llog from records marked as CM_SKIP.
1010                    CM_EXCLUDE records are used for "active" command
1011                    and can be restored if needed */
1012                 if ((marker->cm_flags & (CM_SKIP | CM_START)) ==
1013                     (CM_SKIP | CM_START)) {
1014                         mrd->state = REPLACE_SKIP;
1015                         return 1;
1016                 }
1017
1018                 if ((marker->cm_flags & (CM_SKIP | CM_END)) ==
1019                     (CM_SKIP | CM_END)) {
1020                         mrd->state = REPLACE_COPY;
1021                         return 1;
1022                 }
1023
1024                 if (strcmp(mrd->target.mti_svname, marker->cm_tgtname) == 0) {
1025                         LASSERT(!(marker->cm_flags & CM_START) ||
1026                                 !(marker->cm_flags & CM_END));
1027                         if (marker->cm_flags & CM_START) {
1028                                 mrd->state = REPLACE_UUID;
1029                                 mrd->failover = NULL;
1030                         } else if (marker->cm_flags & CM_END)
1031                                 mrd->state = REPLACE_COPY;
1032                 }
1033         }
1034
1035         return 0;
1036 }
1037
1038 static int record_base(const struct lu_env *env, struct llog_handle *llh,
1039                      char *cfgname, lnet_nid_t nid, int cmd,
1040                      char *s1, char *s2, char *s3, char *s4)
1041 {
1042         struct mgs_thread_info  *mgi = mgs_env_info(env);
1043         struct llog_cfg_rec     *lcr;
1044         int rc;
1045
1046         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
1047                cmd, s1, s2, s3, s4);
1048
1049         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
1050         if (s1)
1051                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
1052         if (s2)
1053                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
1054         if (s3)
1055                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
1056         if (s4)
1057                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
1058
1059         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
1060         if (lcr == NULL)
1061                 return -ENOMEM;
1062
1063         lcr->lcr_cfg.lcfg_nid = nid;
1064         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1065
1066         lustre_cfg_rec_free(lcr);
1067
1068         if (rc < 0)
1069                 CDEBUG(D_MGS,
1070                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
1071                        cfgname, cmd, s1, s2, s3, s4, rc);
1072         return rc;
1073 }
1074
1075 static inline int record_add_uuid(const struct lu_env *env,
1076                                   struct llog_handle *llh,
1077                                   uint64_t nid, char *uuid)
1078 {
1079         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
1080                            NULL, NULL, NULL);
1081 }
1082
1083 static inline int record_add_conn(const struct lu_env *env,
1084                                   struct llog_handle *llh,
1085                                   char *devname, char *uuid)
1086 {
1087         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
1088                            NULL, NULL, NULL);
1089 }
1090
1091 static inline int record_attach(const struct lu_env *env,
1092                                 struct llog_handle *llh, char *devname,
1093                                 char *type, char *uuid)
1094 {
1095         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
1096                            NULL, NULL);
1097 }
1098
1099 static inline int record_setup(const struct lu_env *env,
1100                                struct llog_handle *llh, char *devname,
1101                                char *s1, char *s2, char *s3, char *s4)
1102 {
1103         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
1104 }
1105
1106 /**
1107  * \retval <0 record processing error
1108  * \retval n record is processed. No need copy original one.
1109  * \retval 0 record is not processed.
1110  */
1111 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
1112                            struct mgs_replace_data *mrd)
1113 {
1114         int nids_added = 0;
1115         lnet_nid_t nid;
1116         char *ptr;
1117         int rc = 0;
1118
1119         if (mrd->state == REPLACE_UUID &&
1120             lcfg->lcfg_command == LCFG_ADD_UUID) {
1121                 /* LCFG_ADD_UUID command found. Let's skip original command
1122                    and add passed nids */
1123                 ptr = mrd->target.mti_params;
1124                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1125                         if (!mrd->nodeuuid) {
1126                                 rc = name_create(&mrd->nodeuuid,
1127                                                  libcfs_nid2str(nid), "");
1128                                 if (rc) {
1129                                         CERROR("Can't create uuid for "
1130                                                 "nid  %s, device %s\n",
1131                                                 libcfs_nid2str(nid),
1132                                                 mrd->target.mti_svname);
1133                                         return rc;
1134                                 }
1135                         }
1136                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
1137                                "device %s\n", libcfs_nid2str(nid),
1138                                 mrd->target.mti_params,
1139                                 mrd->nodeuuid);
1140                         rc = record_add_uuid(env,
1141                                              mrd->temp_llh, nid,
1142                                              mrd->nodeuuid);
1143                         if (!rc)
1144                                 nids_added++;
1145
1146                         if (*ptr == ':') {
1147                                 mrd->failover = ptr;
1148                                 break;
1149                         }
1150                 }
1151
1152                 if (nids_added == 0) {
1153                         CERROR("No new nids were added, nid %s with uuid %s, "
1154                                "device %s\n", libcfs_nid2str(nid),
1155                                mrd->nodeuuid ? mrd->nodeuuid : "NULL",
1156                                mrd->target.mti_svname);
1157                         name_destroy(&mrd->nodeuuid);
1158                         return -ENXIO;
1159                 } else {
1160                         mrd->state = REPLACE_SETUP;
1161                 }
1162
1163                 return nids_added;
1164         }
1165
1166         if (mrd->state == REPLACE_SETUP && lcfg->lcfg_command == LCFG_SETUP) {
1167                 /* LCFG_SETUP command found. UUID should be changed */
1168                 rc = record_setup(env,
1169                                   mrd->temp_llh,
1170                                   /* devname the same */
1171                                   lustre_cfg_string(lcfg, 0),
1172                                   /* s1 is not changed */
1173                                   lustre_cfg_string(lcfg, 1),
1174                                   mrd->nodeuuid,
1175                                   /* s3 is not changed */
1176                                   lustre_cfg_string(lcfg, 3),
1177                                   /* s4 is not changed */
1178                                   lustre_cfg_string(lcfg, 4));
1179
1180                 name_destroy(&mrd->nodeuuid);
1181                 if (rc)
1182                         return rc;
1183
1184                 if (mrd->failover) {
1185                         ptr = mrd->failover;
1186                         while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1187                                 if (mrd->nodeuuid == NULL) {
1188                                         rc =  name_create(&mrd->nodeuuid,
1189                                                           libcfs_nid2str(nid),
1190                                                           "");
1191                                         if (rc)
1192                                                 return rc;
1193                                 }
1194
1195                                 CDEBUG(D_MGS, "add nid %s for failover %s\n",
1196                                        libcfs_nid2str(nid), mrd->nodeuuid);
1197                                 rc = record_add_uuid(env, mrd->temp_llh, nid,
1198                                                      mrd->nodeuuid);
1199                                 if (rc) {
1200                                         name_destroy(&mrd->nodeuuid);
1201                                         return rc;
1202                                 }
1203                                 if (*ptr == ':') {
1204                                         rc = record_add_conn(env,
1205                                                 mrd->temp_llh,
1206                                                 lustre_cfg_string(lcfg, 0),
1207                                                 mrd->nodeuuid);
1208                                         name_destroy(&mrd->nodeuuid);
1209                                         if (rc)
1210                                                 return rc;
1211                                 }
1212                         }
1213                         if (mrd->nodeuuid) {
1214                                 rc = record_add_conn(env, mrd->temp_llh,
1215                                                      lustre_cfg_string(lcfg, 0),
1216                                                      mrd->nodeuuid);
1217                                 name_destroy(&mrd->nodeuuid);
1218                                 if (rc)
1219                                         return rc;
1220                         }
1221                 }
1222                 mrd->state = REPLACE_DONE;
1223                 return rc ? rc : 1;
1224         }
1225
1226         /* Another commands in target device block */
1227         return 0;
1228 }
1229
1230 /**
1231  * Handler that called for every record in llog.
1232  * Records are processed in order they placed in llog.
1233  *
1234  * \param[in] llh       log to be processed
1235  * \param[in] rec       current record
1236  * \param[in] data      mgs_replace_data structure
1237  *
1238  * \retval 0    success
1239  */
1240 static int mgs_replace_nids_handler(const struct lu_env *env,
1241                                     struct llog_handle *llh,
1242                                     struct llog_rec_hdr *rec,
1243                                     void *data)
1244 {
1245         struct mgs_replace_data *mrd;
1246         struct lustre_cfg *lcfg = REC_DATA(rec);
1247         int cfg_len = REC_DATA_LEN(rec);
1248         int rc;
1249         ENTRY;
1250
1251         mrd = (struct mgs_replace_data *)data;
1252
1253         if (rec->lrh_type != OBD_CFG_REC) {
1254                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
1255                        rec->lrh_type, lcfg->lcfg_command,
1256                        lustre_cfg_string(lcfg, 0),
1257                        lustre_cfg_string(lcfg, 1));
1258                 RETURN(-EINVAL);
1259         }
1260
1261         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1262         if (rc) {
1263                 /* Do not copy any invalidated records */
1264                 GOTO(skip_out, rc = 0);
1265         }
1266
1267         rc = check_markers(lcfg, mrd);
1268         if (rc || mrd->state == REPLACE_SKIP)
1269                 GOTO(skip_out, rc = 0);
1270
1271         /* Write to new log all commands outside target device block */
1272         if (mrd->state == REPLACE_COPY)
1273                 GOTO(copy_out, rc = 0);
1274
1275         if (mrd->state == REPLACE_DONE &&
1276             (lcfg->lcfg_command == LCFG_ADD_UUID ||
1277              lcfg->lcfg_command == LCFG_ADD_CONN)) {
1278                 if (!mrd->failover)
1279                         CWARN("Previous failover is deleted, but new one is "
1280                               "not set. This means you configure system "
1281                               "without failover or passed wrong replace_nids "
1282                               "command parameters. Device %s, passed nids %s\n",
1283                               mrd->target.mti_svname, mrd->target.mti_params);
1284                 GOTO(skip_out, rc = 0);
1285         }
1286
1287         rc = process_command(env, lcfg, mrd);
1288         if (rc < 0)
1289                 RETURN(rc);
1290
1291         if (rc)
1292                 RETURN(0);
1293 copy_out:
1294         /* Record is placed in temporary llog as is */
1295         rc = llog_write(env, mrd->temp_llh, rec, LLOG_NEXT_IDX);
1296
1297         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1298                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1299                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1300         RETURN(rc);
1301
1302 skip_out:
1303         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1304                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1305                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1306         RETURN(rc);
1307 }
1308
1309 static int mgs_log_is_empty(const struct lu_env *env,
1310                             struct mgs_device *mgs, char *name)
1311 {
1312         struct llog_ctxt        *ctxt;
1313         int                      rc;
1314
1315         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1316         LASSERT(ctxt != NULL);
1317
1318         rc = llog_is_empty(env, ctxt, name);
1319         llog_ctxt_put(ctxt);
1320         return rc;
1321 }
1322
1323 static int mgs_replace_log(const struct lu_env *env,
1324                            struct obd_device *mgs,
1325                            char *logname, char *devname,
1326                            llog_cb_t replace_handler, void *data)
1327 {
1328         struct llog_handle *orig_llh, *backup_llh;
1329         struct llog_ctxt *ctxt;
1330         struct mgs_replace_data *mrd;
1331         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1332         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1333         char *backup;
1334         int rc, rc2, buf_size;
1335         time64_t now;
1336         ENTRY;
1337
1338         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1339         LASSERT(ctxt != NULL);
1340
1341         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1342                 /* Log is empty. Nothing to replace */
1343                 GOTO(out_put, rc = 0);
1344         }
1345
1346         now = ktime_get_real_seconds();
1347
1348         /* max time64_t in decimal fits into 20 bytes long string */
1349         buf_size = strlen(logname) + 1 + 20 + 1 + strlen(".bak") + 1;
1350         OBD_ALLOC(backup, buf_size);
1351         if (backup == NULL)
1352                 GOTO(out_put, rc = -ENOMEM);
1353
1354         snprintf(backup, buf_size, "%s.%llu.bak", logname, now);
1355
1356         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1357         if (rc == 0) {
1358                 /* Now erase original log file. Connections are not allowed.
1359                    Backup is already saved */
1360                 rc = llog_erase(env, ctxt, NULL, logname);
1361                 if (rc < 0)
1362                         GOTO(out_free, rc);
1363         } else if (rc != -ENOENT) {
1364                 CERROR("%s: can't make backup for %s: rc = %d\n",
1365                        mgs->obd_name, logname, rc);
1366                 GOTO(out_free,rc);
1367         }
1368
1369         /* open local log */
1370         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1371         if (rc)
1372                 GOTO(out_restore, rc);
1373
1374         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1375         if (rc)
1376                 GOTO(out_closel, rc);
1377
1378         /* open backup llog */
1379         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1380                        LLOG_OPEN_EXISTS);
1381         if (rc)
1382                 GOTO(out_closel, rc);
1383
1384         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1385         if (rc)
1386                 GOTO(out_close, rc);
1387
1388         if (llog_get_size(backup_llh) <= 1)
1389                 GOTO(out_close, rc = 0);
1390
1391         OBD_ALLOC_PTR(mrd);
1392         if (!mrd)
1393                 GOTO(out_close, rc = -ENOMEM);
1394         /* devname is only needed information to replace UUID records */
1395         if (devname)
1396                 strlcpy(mrd->target.mti_svname, devname,
1397                         sizeof(mrd->target.mti_svname));
1398         /* data is parsed in llog callback */
1399         if (data)
1400                 strlcpy(mrd->target.mti_params, data,
1401                         sizeof(mrd->target.mti_params));
1402         /* Copy records to this temporary llog */
1403         mrd->temp_llh = orig_llh;
1404
1405         rc = llog_process(env, backup_llh, replace_handler,
1406                           (void *)mrd, NULL);
1407         OBD_FREE_PTR(mrd);
1408 out_close:
1409         rc2 = llog_close(NULL, backup_llh);
1410         if (!rc)
1411                 rc = rc2;
1412 out_closel:
1413         rc2 = llog_close(NULL, orig_llh);
1414         if (!rc)
1415                 rc = rc2;
1416
1417 out_restore:
1418         if (rc) {
1419                 CERROR("%s: llog should be restored: rc = %d\n",
1420                        mgs->obd_name, rc);
1421                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1422                                   logname);
1423                 if (rc2 < 0)
1424                         CERROR("%s: can't restore backup %s: rc = %d\n",
1425                                mgs->obd_name, logname, rc2);
1426         }
1427
1428 out_free:
1429         OBD_FREE(backup, buf_size);
1430
1431 out_put:
1432         llog_ctxt_put(ctxt);
1433
1434         if (rc)
1435                 CERROR("%s: failed to replace log %s: rc = %d\n",
1436                        mgs->obd_name, logname, rc);
1437
1438         RETURN(rc);
1439 }
1440
1441 static int mgs_replace_nids_log(const struct lu_env *env,
1442                                 struct obd_device *obd,
1443                                 char *logname, char *devname, char *nids)
1444 {
1445         CDEBUG(D_MGS, "Replace NIDs for %s in %s\n", devname, logname);
1446         return mgs_replace_log(env, obd, logname, devname,
1447                                mgs_replace_nids_handler, nids);
1448 }
1449
1450 /**
1451  * Parse device name and get file system name and/or device index
1452  *
1453  * @devname     device name (ex. lustre-MDT0000)
1454  * @fsname      file system name extracted from @devname and returned
1455  *              to the caller (optional)
1456  * @index       device index extracted from @devname and returned to
1457  *              the caller (optional)
1458  *
1459  * RETURN       0                       success if we are only interested in
1460  *                                      extracting fsname from devname.
1461  *                                      i.e index is NULL
1462  *
1463  *              LDD_F_SV_TYPE_*         Besides extracting the fsname the
1464  *                                      user also wants the index. Report to
1465  *                                      the user the type of obd device the
1466  *                                      returned index belongs too.
1467  *
1468  *              -EINVAL                 The obd device name is improper so
1469  *                                      fsname could not be extracted.
1470  *
1471  *              -ENXIO                  Failed to extract the index out of
1472  *                                      the obd device name. Most likely an
1473  *                                      invalid obd device name
1474  */
1475 static int mgs_parse_devname(char *devname, char *fsname, u32 *index)
1476 {
1477         int rc = 0;
1478         ENTRY;
1479
1480         /* Extract fsname */
1481         if (fsname) {
1482                 rc = server_name2fsname(devname, fsname, NULL);
1483                 if (rc < 0) {
1484                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1485                                devname);
1486                         RETURN(-EINVAL);
1487                 }
1488         }
1489
1490         if (index) {
1491                 rc = server_name2index(devname, index, NULL);
1492                 if (rc < 0) {
1493                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1494                                devname);
1495                         RETURN(-ENXIO);
1496                 }
1497         }
1498
1499         /* server_name2index can return LDD_F_SV_TYPE_* so always return rc */
1500         RETURN(rc);
1501 }
1502
1503 /* This is only called during replace_nids */
1504 static int only_mgs_is_running(struct obd_device *mgs_obd)
1505 {
1506         /* TDB: Is global variable with devices count exists? */
1507         int num_devices = get_devices_count();
1508         int num_exports = 0;
1509         struct obd_export *exp;
1510
1511         spin_lock(&mgs_obd->obd_dev_lock);
1512         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1513                 /* skip self export */
1514                 if (exp == mgs_obd->obd_self_export)
1515                         continue;
1516                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1517                         continue;
1518
1519                 ++num_exports;
1520
1521                 CERROR("%s: node %s still connected during replace_nids "
1522                        "connect_flags:%llx\n",
1523                        mgs_obd->obd_name,
1524                        libcfs_nid2str(exp->exp_nid_stats->nid),
1525                        exp_connect_flags(exp));
1526
1527         }
1528         spin_unlock(&mgs_obd->obd_dev_lock);
1529
1530         /* osd, MGS and MGC + self_export
1531            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1532         return (num_devices <= 3) && (num_exports == 0);
1533 }
1534
1535 static int name_create_mdt(char **logname, char *fsname, int i)
1536 {
1537         char mdt_index[9];
1538
1539         sprintf(mdt_index, "-MDT%04x", i);
1540         return name_create(logname, fsname, mdt_index);
1541 }
1542
1543 /**
1544  * Replace nids for \a device to \a nids values
1545  *
1546  * \param obd           MGS obd device
1547  * \param devname       nids need to be replaced for this device
1548  * (ex. lustre-OST0000)
1549  * \param nids          nids list (ex. nid1,nid2,nid3)
1550  *
1551  * \retval 0    success
1552  */
1553 int mgs_replace_nids(const struct lu_env *env,
1554                      struct mgs_device *mgs,
1555                      char *devname, char *nids)
1556 {
1557         /* Assume fsname is part of device name */
1558         char fsname[MTI_NAME_MAXLEN];
1559         int rc;
1560         __u32 index;
1561         char *logname;
1562         struct fs_db *fsdb = NULL;
1563         unsigned int i;
1564         int conn_state;
1565         struct obd_device *mgs_obd = mgs->mgs_obd;
1566         ENTRY;
1567
1568         /* We can only change NIDs if no other nodes are connected */
1569         spin_lock(&mgs_obd->obd_dev_lock);
1570         conn_state = mgs_obd->obd_no_conn;
1571         mgs_obd->obd_no_conn = 1;
1572         spin_unlock(&mgs_obd->obd_dev_lock);
1573
1574         /* We can not change nids if not only MGS is started */
1575         if (!only_mgs_is_running(mgs_obd)) {
1576                 CERROR("Only MGS is allowed to be started\n");
1577                 GOTO(out, rc = -EINPROGRESS);
1578         }
1579
1580         /* Get fsname and index */
1581         rc = mgs_parse_devname(devname, fsname, &index);
1582         if (rc < 0)
1583                 GOTO(out, rc);
1584
1585         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1586         if (rc) {
1587                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1588                 GOTO(out, rc);
1589         }
1590
1591         /* Process client llogs */
1592         name_create(&logname, fsname, "-client");
1593         rc = mgs_replace_nids_log(env, mgs_obd, logname, devname, nids);
1594         name_destroy(&logname);
1595         if (rc) {
1596                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1597                        fsname, devname, rc);
1598                 GOTO(out, rc);
1599         }
1600
1601         /* Process MDT llogs */
1602         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1603                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1604                         continue;
1605                 name_create_mdt(&logname, fsname, i);
1606                 rc = mgs_replace_nids_log(env, mgs_obd, logname, devname, nids);
1607                 name_destroy(&logname);
1608                 if (rc)
1609                         GOTO(out, rc);
1610         }
1611
1612 out:
1613         spin_lock(&mgs_obd->obd_dev_lock);
1614         mgs_obd->obd_no_conn = conn_state;
1615         spin_unlock(&mgs_obd->obd_dev_lock);
1616
1617         if (fsdb)
1618                 mgs_put_fsdb(mgs, fsdb);
1619
1620         RETURN(rc);
1621 }
1622
1623 /**
1624  * This is called for every record in llog. Some of records are
1625  * skipped, others are copied to new log as is.
1626  * Records to be skipped are
1627  *  marker records marked SKIP
1628  *  records enclosed between SKIP markers
1629  *
1630  * \param[in] llh       log to be processed
1631  * \param[in] rec       current record
1632  * \param[in] data      mgs_replace_data structure
1633  *
1634  * \retval 0    success
1635  **/
1636 static int mgs_clear_config_handler(const struct lu_env *env,
1637                                     struct llog_handle *llh,
1638                                     struct llog_rec_hdr *rec, void *data)
1639 {
1640         struct mgs_replace_data *mrd;
1641         struct lustre_cfg *lcfg = REC_DATA(rec);
1642         int cfg_len = REC_DATA_LEN(rec);
1643         int rc;
1644
1645         ENTRY;
1646
1647         mrd = (struct mgs_replace_data *)data;
1648
1649         if (rec->lrh_type != OBD_CFG_REC) {
1650                 CDEBUG(D_MGS, "Config llog Name=%s, Record Index=%u, "
1651                        "Unhandled Record Type=%#x\n", llh->lgh_name,
1652                        rec->lrh_index, rec->lrh_type);
1653                 RETURN(-EINVAL);
1654         }
1655
1656         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1657         if (rc) {
1658                 CDEBUG(D_MGS, "Config llog Name=%s, Invalid config file.",
1659                        llh->lgh_name);
1660                 RETURN(-EINVAL);
1661         }
1662
1663         if (lcfg->lcfg_command == LCFG_MARKER) {
1664                 struct cfg_marker *marker;
1665
1666                 marker = lustre_cfg_buf(lcfg, 1);
1667                 if (marker->cm_flags & CM_SKIP) {
1668                         if (marker->cm_flags & CM_START)
1669                                 mrd->state = REPLACE_SKIP;
1670                         if (marker->cm_flags & CM_END)
1671                                 mrd->state = REPLACE_COPY;
1672                         /* SKIP section started or finished */
1673                         CDEBUG(D_MGS, "Skip idx=%d, rc=%d, len=%d, "
1674                                "cmd %x %s %s\n", rec->lrh_index, rc,
1675                                rec->lrh_len, lcfg->lcfg_command,
1676                                lustre_cfg_string(lcfg, 0),
1677                                lustre_cfg_string(lcfg, 1));
1678                         RETURN(0);
1679                 }
1680         } else {
1681                 if (mrd->state == REPLACE_SKIP) {
1682                         /* record enclosed between SKIP markers, skip it */
1683                         CDEBUG(D_MGS, "Skip idx=%d, rc=%d, len=%d, "
1684                                "cmd %x %s %s\n", rec->lrh_index, rc,
1685                                rec->lrh_len, lcfg->lcfg_command,
1686                                lustre_cfg_string(lcfg, 0),
1687                                lustre_cfg_string(lcfg, 1));
1688                         RETURN(0);
1689                 }
1690         }
1691
1692         /* Record is placed in temporary llog as is */
1693         rc = llog_write(env, mrd->temp_llh, rec, LLOG_NEXT_IDX);
1694
1695         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1696                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1697                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1698         RETURN(rc);
1699 }
1700
1701 /*
1702  * Directory CONFIGS/ may contain files which are not config logs to
1703  * be cleared. Skip any llogs with a non-alphanumeric character after
1704  * the last '-'. For example, fsname-MDT0000.sav, fsname-MDT0000.bak,
1705  * fsname-MDT0000.orig, fsname-MDT0000~, fsname-MDT0000.20150516, etc.
1706  */
1707 static bool config_to_clear(const char *logname)
1708 {
1709         int i;
1710         char *str;
1711
1712         str = strrchr(logname, '-');
1713         if (!str)
1714                 return 0;
1715
1716         i = 0;
1717         while (isalnum(str[++i]));
1718         return str[i] == '\0';
1719 }
1720
1721 /**
1722  * Clear config logs for \a name
1723  *
1724  * \param env
1725  * \param mgs           MGS device
1726  * \param name          name of device or of filesystem
1727  *                      (ex. lustre-OST0000 or lustre) in later case all logs
1728  *                      will be cleared
1729  *
1730  * \retval 0            success
1731  */
1732 int mgs_clear_configs(const struct lu_env *env,
1733                      struct mgs_device *mgs, const char *name)
1734 {
1735         struct list_head dentry_list;
1736         struct mgs_direntry *dirent, *n;
1737         char *namedash;
1738         int conn_state;
1739         struct obd_device *mgs_obd = mgs->mgs_obd;
1740         int rc;
1741
1742         ENTRY;
1743
1744         /* Prevent clients and servers from connecting to mgs */
1745         spin_lock(&mgs_obd->obd_dev_lock);
1746         conn_state = mgs_obd->obd_no_conn;
1747         mgs_obd->obd_no_conn = 1;
1748         spin_unlock(&mgs_obd->obd_dev_lock);
1749
1750         /*
1751          * config logs cannot be cleaned if anything other than
1752          * MGS is started
1753          */
1754         if (!only_mgs_is_running(mgs_obd)) {
1755                 CERROR("Only MGS is allowed to be started\n");
1756                 GOTO(out, rc = -EBUSY);
1757         }
1758
1759         /* Find all the logs in the CONFIGS directory */
1760         rc = class_dentry_readdir(env, mgs, &dentry_list);
1761         if (rc) {
1762                 CERROR("%s: cannot read config directory '%s': rc = %d\n",
1763                        mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
1764                 GOTO(out, rc);
1765         }
1766
1767         if (list_empty(&dentry_list)) {
1768                 CERROR("%s: list empty reading config dir '%s': rc = %d\n",
1769                         mgs_obd->obd_name, MOUNT_CONFIGS_DIR, -ENOENT);
1770                 GOTO(out, rc = -ENOENT);
1771         }
1772
1773         OBD_ALLOC(namedash, strlen(name) + 2);
1774         if (namedash == NULL)
1775                 GOTO(out, rc = -ENOMEM);
1776         snprintf(namedash, strlen(name) + 2, "%s-", name);
1777
1778         list_for_each_entry(dirent, &dentry_list, mde_list) {
1779                 if (strcmp(name, dirent->mde_name) &&
1780                     strncmp(namedash, dirent->mde_name, strlen(namedash)))
1781                         continue;
1782                 if (!config_to_clear(dirent->mde_name))
1783                         continue;
1784                 CDEBUG(D_MGS, "%s: Clear config log %s\n",
1785                        mgs_obd->obd_name, dirent->mde_name);
1786                 rc = mgs_replace_log(env, mgs_obd, dirent->mde_name, NULL,
1787                                      mgs_clear_config_handler, NULL);
1788                 if (rc)
1789                         break;
1790         }
1791
1792         list_for_each_entry_safe(dirent, n, &dentry_list, mde_list) {
1793                 list_del_init(&dirent->mde_list);
1794                 mgs_direntry_free(dirent);
1795         }
1796         OBD_FREE(namedash, strlen(name) + 2);
1797 out:
1798         spin_lock(&mgs_obd->obd_dev_lock);
1799         mgs_obd->obd_no_conn = conn_state;
1800         spin_unlock(&mgs_obd->obd_dev_lock);
1801
1802         RETURN(rc);
1803 }
1804
1805 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1806                             char *devname, struct lov_desc *desc)
1807 {
1808         struct mgs_thread_info  *mgi = mgs_env_info(env);
1809         struct llog_cfg_rec     *lcr;
1810         int rc;
1811
1812         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1813         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1814         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1815         if (lcr == NULL)
1816                 return -ENOMEM;
1817
1818         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1819         lustre_cfg_rec_free(lcr);
1820         return rc;
1821 }
1822
1823 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1824                             char *devname, struct lmv_desc *desc)
1825 {
1826         struct mgs_thread_info  *mgi = mgs_env_info(env);
1827         struct llog_cfg_rec     *lcr;
1828         int rc;
1829
1830         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1831         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1832         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1833         if (lcr == NULL)
1834                 return -ENOMEM;
1835
1836         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1837         lustre_cfg_rec_free(lcr);
1838         return rc;
1839 }
1840
1841 static inline int record_mdc_add(const struct lu_env *env,
1842                                  struct llog_handle *llh,
1843                                  char *logname, char *mdcuuid,
1844                                  char *mdtuuid, char *index,
1845                                  char *gen)
1846 {
1847         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1848                            mdtuuid,index,gen,mdcuuid);
1849 }
1850
1851 static inline int record_lov_add(const struct lu_env *env,
1852                                  struct llog_handle *llh,
1853                                  char *lov_name, char *ost_uuid,
1854                                  char *index, char *gen)
1855 {
1856         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1857                            ost_uuid, index, gen, NULL);
1858 }
1859
1860 static inline int record_mount_opt(const struct lu_env *env,
1861                                    struct llog_handle *llh,
1862                                    char *profile, char *lov_name,
1863                                    char *mdc_name)
1864 {
1865         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1866                            profile, lov_name, mdc_name, NULL);
1867 }
1868
1869 static int record_marker(const struct lu_env *env,
1870                          struct llog_handle *llh,
1871                          struct fs_db *fsdb, __u32 flags,
1872                          char *tgtname, char *comment)
1873 {
1874         struct mgs_thread_info *mgi = mgs_env_info(env);
1875         struct llog_cfg_rec *lcr;
1876         int rc;
1877         int cplen = 0;
1878
1879         if (flags & CM_START)
1880                 fsdb->fsdb_gen++;
1881         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1882         mgi->mgi_marker.cm_flags = flags;
1883         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1884         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1885                         sizeof(mgi->mgi_marker.cm_tgtname));
1886         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1887                 return -E2BIG;
1888         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1889                         sizeof(mgi->mgi_marker.cm_comment));
1890         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1891                 return -E2BIG;
1892         mgi->mgi_marker.cm_createtime = ktime_get_real_seconds();
1893         mgi->mgi_marker.cm_canceltime = 0;
1894         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1895         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1896                             sizeof(mgi->mgi_marker));
1897         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1898         if (lcr == NULL)
1899                 return -ENOMEM;
1900
1901         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1902         lustre_cfg_rec_free(lcr);
1903         return rc;
1904 }
1905
1906 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1907                             struct llog_handle **llh, char *name)
1908 {
1909         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1910         struct llog_ctxt        *ctxt;
1911         int                      rc = 0;
1912         ENTRY;
1913
1914         if (*llh)
1915                 GOTO(out, rc = -EBUSY);
1916
1917         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1918         if (!ctxt)
1919                 GOTO(out, rc = -ENODEV);
1920         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1921
1922         rc = llog_open_create(env, ctxt, llh, NULL, name);
1923         if (rc)
1924                 GOTO(out_ctxt, rc);
1925         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1926         if (rc)
1927                 llog_close(env, *llh);
1928 out_ctxt:
1929         llog_ctxt_put(ctxt);
1930 out:
1931         if (rc) {
1932                 CERROR("%s: can't start log %s: rc = %d\n",
1933                        mgs->mgs_obd->obd_name, name, rc);
1934                 *llh = NULL;
1935         }
1936         RETURN(rc);
1937 }
1938
1939 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1940 {
1941         int rc;
1942
1943         rc = llog_close(env, *llh);
1944         *llh = NULL;
1945
1946         return rc;
1947 }
1948
1949 /******************** config "macros" *********************/
1950
1951 /* write an lcfg directly into a log (with markers) */
1952 static int mgs_write_log_direct(const struct lu_env *env,
1953                                 struct mgs_device *mgs, struct fs_db *fsdb,
1954                                 char *logname, struct llog_cfg_rec *lcr,
1955                                 char *devname, char *comment)
1956 {
1957         struct llog_handle *llh = NULL;
1958         int rc;
1959
1960         ENTRY;
1961
1962         rc = record_start_log(env, mgs, &llh, logname);
1963         if (rc)
1964                 RETURN(rc);
1965
1966         /* FIXME These should be a single journal transaction */
1967         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1968         if (rc)
1969                 GOTO(out_end, rc);
1970         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1971         if (rc)
1972                 GOTO(out_end, rc);
1973         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1974         if (rc)
1975                 GOTO(out_end, rc);
1976 out_end:
1977         record_end_log(env, &llh);
1978         RETURN(rc);
1979 }
1980
1981 /* write the lcfg in all logs for the given fs */
1982 static int mgs_write_log_direct_all(const struct lu_env *env,
1983                                     struct mgs_device *mgs,
1984                                     struct fs_db *fsdb,
1985                                     struct mgs_target_info *mti,
1986                                     struct llog_cfg_rec *lcr, char *devname,
1987                                     char *comment, int server_only)
1988 {
1989         struct list_head         log_list;
1990         struct mgs_direntry     *dirent, *n;
1991         char                    *fsname = mti->mti_fsname;
1992         int                      rc = 0, len = strlen(fsname);
1993
1994         ENTRY;
1995         /* Find all the logs in the CONFIGS directory */
1996         rc = class_dentry_readdir(env, mgs, &log_list);
1997         if (rc)
1998                 RETURN(rc);
1999
2000         /* Could use fsdb index maps instead of directory listing */
2001         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
2002                 list_del_init(&dirent->mde_list);
2003                 /* don't write to sptlrpc rule log */
2004                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
2005                         goto next;
2006
2007                 /* caller wants write server logs only */
2008                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
2009                         goto next;
2010
2011                 if (strlen(dirent->mde_name) <= len ||
2012                     strncmp(fsname, dirent->mde_name, len) != 0 ||
2013                     dirent->mde_name[len] != '-')
2014                         goto next;
2015
2016                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
2017                 /* Erase any old settings of this same parameter */
2018                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
2019                                 devname, comment, CM_SKIP);
2020                 if (rc < 0)
2021                         CERROR("%s: Can't modify llog %s: rc = %d\n",
2022                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
2023                 if (lcr == NULL)
2024                         goto next;
2025                 /* Write the new one */
2026                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
2027                                           lcr, devname, comment);
2028                 if (rc != 0)
2029                         CERROR("%s: writing log %s: rc = %d\n",
2030                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
2031 next:
2032                 mgs_direntry_free(dirent);
2033         }
2034
2035         RETURN(rc);
2036 }
2037
2038 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2039                                     struct mgs_device *mgs,
2040                                     struct fs_db *fsdb,
2041                                     struct mgs_target_info *mti,
2042                                     int index, char *logname);
2043 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2044                                     struct mgs_device *mgs,
2045                                     struct fs_db *fsdb,
2046                                     struct mgs_target_info *mti,
2047                                     char *logname, char *suffix, char *lovname,
2048                                     enum lustre_sec_part sec_part, int flags);
2049 static int name_create_mdt_and_lov(char **logname, char **lovname,
2050                                    struct fs_db *fsdb, int i);
2051
2052 static int add_param(char *params, char *key, char *val)
2053 {
2054         char *start = params + strlen(params);
2055         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
2056         int keylen = 0;
2057
2058         if (key != NULL)
2059                 keylen = strlen(key);
2060         if (start + 1 + keylen + strlen(val) >= end) {
2061                 CERROR("params are too long: %s %s%s\n",
2062                        params, key != NULL ? key : "", val);
2063                 return -EINVAL;
2064         }
2065
2066         sprintf(start, " %s%s", key != NULL ? key : "", val);
2067         return 0;
2068 }
2069
2070 /**
2071  * Walk through client config log record and convert the related records
2072  * into the target.
2073  **/
2074 static int mgs_steal_client_llog_handler(const struct lu_env *env,
2075                                          struct llog_handle *llh,
2076                                          struct llog_rec_hdr *rec, void *data)
2077 {
2078         struct mgs_device *mgs;
2079         struct obd_device *obd;
2080         struct mgs_target_info *mti, *tmti;
2081         struct fs_db *fsdb;
2082         int cfg_len = rec->lrh_len;
2083         char *cfg_buf = (char*) (rec + 1);
2084         struct lustre_cfg *lcfg;
2085         int rc = 0;
2086         struct llog_handle *mdt_llh = NULL;
2087         static int got_an_osc_or_mdc = 0;
2088         /* 0: not found any osc/mdc;
2089            1: found osc;
2090            2: found mdc;
2091         */
2092         static int last_step = -1;
2093         int cplen = 0;
2094
2095         ENTRY;
2096
2097         mti = ((struct temp_comp*)data)->comp_mti;
2098         tmti = ((struct temp_comp*)data)->comp_tmti;
2099         fsdb = ((struct temp_comp*)data)->comp_fsdb;
2100         obd = ((struct temp_comp *)data)->comp_obd;
2101         mgs = lu2mgs_dev(obd->obd_lu_dev);
2102         LASSERT(mgs);
2103
2104         if (rec->lrh_type != OBD_CFG_REC) {
2105                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
2106                 RETURN(-EINVAL);
2107         }
2108
2109         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
2110         if (rc) {
2111                 CERROR("Insane cfg\n");
2112                 RETURN(rc);
2113         }
2114
2115         lcfg = (struct lustre_cfg *)cfg_buf;
2116
2117         if (lcfg->lcfg_command == LCFG_MARKER) {
2118                 struct cfg_marker *marker;
2119                 marker = lustre_cfg_buf(lcfg, 1);
2120                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
2121                     (marker->cm_flags & CM_START) &&
2122                      !(marker->cm_flags & CM_SKIP)) {
2123                         got_an_osc_or_mdc = 1;
2124                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
2125                                         sizeof(tmti->mti_svname));
2126                         if (cplen >= sizeof(tmti->mti_svname))
2127                                 RETURN(-E2BIG);
2128                         rc = record_start_log(env, mgs, &mdt_llh,
2129                                               mti->mti_svname);
2130                         if (rc)
2131                                 RETURN(rc);
2132                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
2133                                            mti->mti_svname, "add osc(copied)");
2134                         record_end_log(env, &mdt_llh);
2135                         last_step = marker->cm_step;
2136                         RETURN(rc);
2137                 }
2138                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
2139                     (marker->cm_flags & CM_END) &&
2140                      !(marker->cm_flags & CM_SKIP)) {
2141                         LASSERT(last_step == marker->cm_step);
2142                         last_step = -1;
2143                         got_an_osc_or_mdc = 0;
2144                         memset(tmti, 0, sizeof(*tmti));
2145                         rc = record_start_log(env, mgs, &mdt_llh,
2146                                               mti->mti_svname);
2147                         if (rc)
2148                                 RETURN(rc);
2149                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
2150                                            mti->mti_svname, "add osc(copied)");
2151                         record_end_log(env, &mdt_llh);
2152                         RETURN(rc);
2153                 }
2154                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
2155                     (marker->cm_flags & CM_START) &&
2156                      !(marker->cm_flags & CM_SKIP)) {
2157                         got_an_osc_or_mdc = 2;
2158                         last_step = marker->cm_step;
2159                         memcpy(tmti->mti_svname, marker->cm_tgtname,
2160                                strlen(marker->cm_tgtname));
2161
2162                         RETURN(rc);
2163                 }
2164                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
2165                     (marker->cm_flags & CM_END) &&
2166                      !(marker->cm_flags & CM_SKIP)) {
2167                         LASSERT(last_step == marker->cm_step);
2168                         last_step = -1;
2169                         got_an_osc_or_mdc = 0;
2170                         memset(tmti, 0, sizeof(*tmti));
2171                         RETURN(rc);
2172                 }
2173         }
2174
2175         if (got_an_osc_or_mdc == 0 || last_step < 0)
2176                 RETURN(rc);
2177
2178         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
2179                 __u64 nodenid = lcfg->lcfg_nid;
2180
2181                 if (strlen(tmti->mti_uuid) == 0) {
2182                         /* target uuid not set, this config record is before
2183                          * LCFG_SETUP, this nid is one of target node nid.
2184                          */
2185                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
2186                         tmti->mti_nid_count++;
2187                 } else {
2188                         char nidstr[LNET_NIDSTR_SIZE];
2189
2190                         /* failover node nid */
2191                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
2192                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
2193                                         nidstr);
2194                 }
2195
2196                 RETURN(rc);
2197         }
2198
2199         if (lcfg->lcfg_command == LCFG_SETUP) {
2200                 char *target;
2201
2202                 target = lustre_cfg_string(lcfg, 1);
2203                 memcpy(tmti->mti_uuid, target, strlen(target));
2204                 RETURN(rc);
2205         }
2206
2207         /* ignore client side sptlrpc_conf_log */
2208         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
2209                 RETURN(rc);
2210
2211         if (lcfg->lcfg_command == LCFG_ADD_MDC &&
2212             strstr(lustre_cfg_string(lcfg, 0), "-clilmv") != NULL) {
2213                 int index;
2214
2215                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
2216                         RETURN (-EINVAL);
2217
2218                 memcpy(tmti->mti_fsname, mti->mti_fsname,
2219                        strlen(mti->mti_fsname));
2220                 tmti->mti_stripe_index = index;
2221
2222                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
2223                                               mti->mti_stripe_index,
2224                                               mti->mti_svname);
2225                 memset(tmti, 0, sizeof(*tmti));
2226                 RETURN(rc);
2227         }
2228
2229         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
2230                 int index;
2231                 char mdt_index[9];
2232                 char *logname, *lovname;
2233
2234                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2235                                              mti->mti_stripe_index);
2236                 if (rc)
2237                         RETURN(rc);
2238                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
2239
2240                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
2241                         name_destroy(&logname);
2242                         name_destroy(&lovname);
2243                         RETURN(-EINVAL);
2244                 }
2245
2246                 tmti->mti_stripe_index = index;
2247                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
2248                                          mdt_index, lovname,
2249                                          LUSTRE_SP_MDT, 0);
2250                 name_destroy(&logname);
2251                 name_destroy(&lovname);
2252                 RETURN(rc);
2253         }
2254         RETURN(rc);
2255 }
2256
2257 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
2258 /* stealed from mgs_get_fsdb_from_llog*/
2259 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
2260                                               struct mgs_device *mgs,
2261                                               char *client_name,
2262                                               struct temp_comp* comp)
2263 {
2264         struct llog_handle *loghandle;
2265         struct mgs_target_info *tmti;
2266         struct llog_ctxt *ctxt;
2267         int rc;
2268
2269         ENTRY;
2270
2271         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
2272         LASSERT(ctxt != NULL);
2273
2274         OBD_ALLOC_PTR(tmti);
2275         if (tmti == NULL)
2276                 GOTO(out_ctxt, rc = -ENOMEM);
2277
2278         comp->comp_tmti = tmti;
2279         comp->comp_obd = mgs->mgs_obd;
2280
2281         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
2282                        LLOG_OPEN_EXISTS);
2283         if (rc < 0) {
2284                 if (rc == -ENOENT)
2285                         rc = 0;
2286                 GOTO(out_pop, rc);
2287         }
2288
2289         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
2290         if (rc)
2291                 GOTO(out_close, rc);
2292
2293         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
2294                                   (void *)comp, NULL, false);
2295         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
2296 out_close:
2297         llog_close(env, loghandle);
2298 out_pop:
2299         OBD_FREE_PTR(tmti);
2300 out_ctxt:
2301         llog_ctxt_put(ctxt);
2302         RETURN(rc);
2303 }
2304
2305 /* lmv is the second thing for client logs */
2306 /* copied from mgs_write_log_lov. Please refer to that.  */
2307 static int mgs_write_log_lmv(const struct lu_env *env,
2308                              struct mgs_device *mgs,
2309                              struct fs_db *fsdb,
2310                              struct mgs_target_info *mti,
2311                              char *logname, char *lmvname)
2312 {
2313         struct llog_handle *llh = NULL;
2314         struct lmv_desc *lmvdesc;
2315         char *uuid;
2316         int rc = 0;
2317         ENTRY;
2318
2319         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
2320
2321         OBD_ALLOC_PTR(lmvdesc);
2322         if (lmvdesc == NULL)
2323                 RETURN(-ENOMEM);
2324         lmvdesc->ld_active_tgt_count = 0;
2325         lmvdesc->ld_tgt_count = 0;
2326         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
2327         uuid = (char *)lmvdesc->ld_uuid.uuid;
2328
2329         rc = record_start_log(env, mgs, &llh, logname);
2330         if (rc)
2331                 GOTO(out_free, rc);
2332         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
2333         if (rc)
2334                 GOTO(out_end, rc);
2335         rc = record_attach(env, llh, lmvname, "lmv", uuid);
2336         if (rc)
2337                 GOTO(out_end, rc);
2338         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
2339         if (rc)
2340                 GOTO(out_end, rc);
2341         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
2342         if (rc)
2343                 GOTO(out_end, rc);
2344 out_end:
2345         record_end_log(env, &llh);
2346 out_free:
2347         OBD_FREE_PTR(lmvdesc);
2348         RETURN(rc);
2349 }
2350
2351 /* lov is the first thing in the mdt and client logs */
2352 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
2353                              struct fs_db *fsdb, struct mgs_target_info *mti,
2354                              char *logname, char *lovname)
2355 {
2356         struct llog_handle *llh = NULL;
2357         struct lov_desc *lovdesc;
2358         char *uuid;
2359         int rc = 0;
2360         ENTRY;
2361
2362         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
2363
2364         /*
2365         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
2366         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
2367               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
2368         */
2369
2370         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
2371         OBD_ALLOC_PTR(lovdesc);
2372         if (lovdesc == NULL)
2373                 RETURN(-ENOMEM);
2374         lovdesc->ld_magic = LOV_DESC_MAGIC;
2375         lovdesc->ld_tgt_count = 0;
2376         /* Defaults.  Can be changed later by lcfg config_param */
2377         lovdesc->ld_default_stripe_count = 1;
2378         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
2379         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
2380         lovdesc->ld_default_stripe_offset = -1;
2381         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
2382         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
2383         /* can these be the same? */
2384         uuid = (char *)lovdesc->ld_uuid.uuid;
2385
2386         /* This should always be the first entry in a log.
2387         rc = mgs_clear_log(obd, logname); */
2388         rc = record_start_log(env, mgs, &llh, logname);
2389         if (rc)
2390                 GOTO(out_free, rc);
2391         /* FIXME these should be a single journal transaction */
2392         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
2393         if (rc)
2394                 GOTO(out_end, rc);
2395         rc = record_attach(env, llh, lovname, "lov", uuid);
2396         if (rc)
2397                 GOTO(out_end, rc);
2398         rc = record_lov_setup(env, llh, lovname, lovdesc);
2399         if (rc)
2400                 GOTO(out_end, rc);
2401         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
2402         if (rc)
2403                 GOTO(out_end, rc);
2404         EXIT;
2405 out_end:
2406         record_end_log(env, &llh);
2407 out_free:
2408         OBD_FREE_PTR(lovdesc);
2409         return rc;
2410 }
2411
2412 /* add failnids to open log */
2413 static int mgs_write_log_failnids(const struct lu_env *env,
2414                                   struct mgs_target_info *mti,
2415                                   struct llog_handle *llh,
2416                                   char *cliname)
2417 {
2418         char *failnodeuuid = NULL;
2419         char *ptr = mti->mti_params;
2420         lnet_nid_t nid;
2421         int rc = 0;
2422
2423         /*
2424         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
2425         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
2426         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
2427         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
2428         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
2429         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
2430         */
2431
2432         /*
2433          * Pull failnid info out of params string, which may contain something
2434          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
2435          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
2436          * etc.  However, convert_hostnames() should have caught those.
2437          */
2438         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
2439                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
2440                         char nidstr[LNET_NIDSTR_SIZE];
2441
2442                         if (failnodeuuid == NULL) {
2443                                 /* We don't know the failover node name,
2444                                  * so just use the first nid as the uuid */
2445                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
2446                                 rc = name_create(&failnodeuuid, nidstr, "");
2447                                 if (rc != 0)
2448                                         return rc;
2449                         }
2450                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
2451                                 "client %s\n",
2452                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
2453                                 failnodeuuid, cliname);
2454                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
2455                         /*
2456                          * If *ptr is ':', we have added all NIDs for
2457                          * failnodeuuid.
2458                          */
2459                         if (*ptr == ':') {
2460                                 rc = record_add_conn(env, llh, cliname,
2461                                                      failnodeuuid);
2462                                 name_destroy(&failnodeuuid);
2463                                 failnodeuuid = NULL;
2464                         }
2465                 }
2466                 if (failnodeuuid) {
2467                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2468                         name_destroy(&failnodeuuid);
2469                         failnodeuuid = NULL;
2470                 }
2471         }
2472
2473         return rc;
2474 }
2475
2476 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2477                                     struct mgs_device *mgs,
2478                                     struct fs_db *fsdb,
2479                                     struct mgs_target_info *mti,
2480                                     char *logname, char *lmvname)
2481 {
2482         struct llog_handle *llh = NULL;
2483         char *mdcname = NULL;
2484         char *nodeuuid = NULL;
2485         char *mdcuuid = NULL;
2486         char *lmvuuid = NULL;
2487         char index[6];
2488         char nidstr[LNET_NIDSTR_SIZE];
2489         int i, rc;
2490         ENTRY;
2491
2492         if (mgs_log_is_empty(env, mgs, logname)) {
2493                 CERROR("log is empty! Logical error\n");
2494                 RETURN(-EINVAL);
2495         }
2496
2497         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2498                mti->mti_svname, logname, lmvname);
2499
2500         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2501         rc = name_create(&nodeuuid, nidstr, "");
2502         if (rc)
2503                 RETURN(rc);
2504         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2505         if (rc)
2506                 GOTO(out_free, rc);
2507         rc = name_create(&mdcuuid, mdcname, "_UUID");
2508         if (rc)
2509                 GOTO(out_free, rc);
2510         rc = name_create(&lmvuuid, lmvname, "_UUID");
2511         if (rc)
2512                 GOTO(out_free, rc);
2513
2514         rc = record_start_log(env, mgs, &llh, logname);
2515         if (rc)
2516                 GOTO(out_free, rc);
2517         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2518                            "add mdc");
2519         if (rc)
2520                 GOTO(out_end, rc);
2521         for (i = 0; i < mti->mti_nid_count; i++) {
2522                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2523                         libcfs_nid2str_r(mti->mti_nids[i],
2524                                          nidstr, sizeof(nidstr)));
2525
2526                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2527                 if (rc)
2528                         GOTO(out_end, rc);
2529         }
2530
2531         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2532         if (rc)
2533                 GOTO(out_end, rc);
2534         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2535                           NULL, NULL);
2536         if (rc)
2537                 GOTO(out_end, rc);
2538         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2539         if (rc)
2540                 GOTO(out_end, rc);
2541         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2542         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2543                             index, "1");
2544         if (rc)
2545                 GOTO(out_end, rc);
2546         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2547                            "add mdc");
2548         if (rc)
2549                 GOTO(out_end, rc);
2550 out_end:
2551         record_end_log(env, &llh);
2552 out_free:
2553         name_destroy(&lmvuuid);
2554         name_destroy(&mdcuuid);
2555         name_destroy(&mdcname);
2556         name_destroy(&nodeuuid);
2557         RETURN(rc);
2558 }
2559
2560 static inline int name_create_lov(char **lovname, char *mdtname,
2561                                   struct fs_db *fsdb, int index)
2562 {
2563         /* COMPAT_180 */
2564         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2565                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2566         else
2567                 return name_create(lovname, mdtname, "-mdtlov");
2568 }
2569
2570 static int name_create_mdt_and_lov(char **logname, char **lovname,
2571                                    struct fs_db *fsdb, int i)
2572 {
2573         int rc;
2574
2575         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2576         if (rc)
2577                 return rc;
2578         /* COMPAT_180 */
2579         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2580                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2581         else
2582                 rc = name_create(lovname, *logname, "-mdtlov");
2583         if (rc) {
2584                 name_destroy(logname);
2585                 *logname = NULL;
2586         }
2587         return rc;
2588 }
2589
2590 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2591                                       struct fs_db *fsdb, int i)
2592 {
2593         char suffix[16];
2594
2595         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2596                 sprintf(suffix, "-osc");
2597         else
2598                 sprintf(suffix, "-osc-MDT%04x", i);
2599         return name_create(oscname, ostname, suffix);
2600 }
2601
2602 /* add new mdc to already existent MDS */
2603 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2604                                     struct mgs_device *mgs,
2605                                     struct fs_db *fsdb,
2606                                     struct mgs_target_info *mti,
2607                                     int mdt_index, char *logname)
2608 {
2609         struct llog_handle      *llh = NULL;
2610         char    *nodeuuid = NULL;
2611         char    *ospname = NULL;
2612         char    *lovuuid = NULL;
2613         char    *mdtuuid = NULL;
2614         char    *svname = NULL;
2615         char    *mdtname = NULL;
2616         char    *lovname = NULL;
2617         char    index_str[16];
2618         char    nidstr[LNET_NIDSTR_SIZE];
2619         int     i, rc;
2620
2621         ENTRY;
2622         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2623                 CERROR("log is empty! Logical error\n");
2624                 RETURN (-EINVAL);
2625         }
2626
2627         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2628                logname);
2629
2630         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2631         if (rc)
2632                 RETURN(rc);
2633
2634         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2635         rc = name_create(&nodeuuid, nidstr, "");
2636         if (rc)
2637                 GOTO(out_destory, rc);
2638
2639         rc = name_create(&svname, mdtname, "-osp");
2640         if (rc)
2641                 GOTO(out_destory, rc);
2642
2643         sprintf(index_str, "-MDT%04x", mdt_index);
2644         rc = name_create(&ospname, svname, index_str);
2645         if (rc)
2646                 GOTO(out_destory, rc);
2647
2648         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2649         if (rc)
2650                 GOTO(out_destory, rc);
2651
2652         rc = name_create(&lovuuid, lovname, "_UUID");
2653         if (rc)
2654                 GOTO(out_destory, rc);
2655
2656         rc = name_create(&mdtuuid, mdtname, "_UUID");
2657         if (rc)
2658                 GOTO(out_destory, rc);
2659
2660         rc = record_start_log(env, mgs, &llh, logname);
2661         if (rc)
2662                 GOTO(out_destory, rc);
2663
2664         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2665                            "add osp");
2666         if (rc)
2667                 GOTO(out_destory, rc);
2668
2669         for (i = 0; i < mti->mti_nid_count; i++) {
2670                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2671                         libcfs_nid2str_r(mti->mti_nids[i],
2672                                          nidstr, sizeof(nidstr)));
2673                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2674                 if (rc)
2675                         GOTO(out_end, rc);
2676         }
2677
2678         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2679         if (rc)
2680                 GOTO(out_end, rc);
2681
2682         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2683                           NULL, NULL);
2684         if (rc)
2685                 GOTO(out_end, rc);
2686
2687         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2688         if (rc)
2689                 GOTO(out_end, rc);
2690
2691         /* Add mdc(osp) to lod */
2692         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
2693         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2694                          index_str, "1", NULL);
2695         if (rc)
2696                 GOTO(out_end, rc);
2697
2698         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2699         if (rc)
2700                 GOTO(out_end, rc);
2701
2702 out_end:
2703         record_end_log(env, &llh);
2704
2705 out_destory:
2706         name_destroy(&mdtuuid);
2707         name_destroy(&lovuuid);
2708         name_destroy(&lovname);
2709         name_destroy(&ospname);
2710         name_destroy(&svname);
2711         name_destroy(&nodeuuid);
2712         name_destroy(&mdtname);
2713         RETURN(rc);
2714 }
2715
2716 static int mgs_write_log_mdt0(const struct lu_env *env,
2717                               struct mgs_device *mgs,
2718                               struct fs_db *fsdb,
2719                               struct mgs_target_info *mti)
2720 {
2721         char *log = mti->mti_svname;
2722         struct llog_handle *llh = NULL;
2723         char *uuid, *lovname;
2724         char mdt_index[6];
2725         char *ptr = mti->mti_params;
2726         int rc = 0, failout = 0;
2727         ENTRY;
2728
2729         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2730         if (uuid == NULL)
2731                 RETURN(-ENOMEM);
2732
2733         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2734                 failout = (strncmp(ptr, "failout", 7) == 0);
2735
2736         rc = name_create(&lovname, log, "-mdtlov");
2737         if (rc)
2738                 GOTO(out_free, rc);
2739         if (mgs_log_is_empty(env, mgs, log)) {
2740                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2741                 if (rc)
2742                         GOTO(out_lod, rc);
2743         }
2744
2745         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2746
2747         rc = record_start_log(env, mgs, &llh, log);
2748         if (rc)
2749                 GOTO(out_lod, rc);
2750
2751         /* add MDT itself */
2752
2753         /* FIXME this whole fn should be a single journal transaction */
2754         sprintf(uuid, "%s_UUID", log);
2755         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2756         if (rc)
2757                 GOTO(out_lod, rc);
2758         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2759         if (rc)
2760                 GOTO(out_end, rc);
2761         rc = record_mount_opt(env, llh, log, lovname, NULL);
2762         if (rc)
2763                 GOTO(out_end, rc);
2764         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2765                         failout ? "n" : "f");
2766         if (rc)
2767                 GOTO(out_end, rc);
2768         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2769         if (rc)
2770                 GOTO(out_end, rc);
2771 out_end:
2772         record_end_log(env, &llh);
2773 out_lod:
2774         name_destroy(&lovname);
2775 out_free:
2776         OBD_FREE(uuid, sizeof(struct obd_uuid));
2777         RETURN(rc);
2778 }
2779
2780 /* envelope method for all layers log */
2781 static int mgs_write_log_mdt(const struct lu_env *env,
2782                              struct mgs_device *mgs,
2783                              struct fs_db *fsdb,
2784                              struct mgs_target_info *mti)
2785 {
2786         struct mgs_thread_info *mgi = mgs_env_info(env);
2787         struct llog_handle *llh = NULL;
2788         char *cliname;
2789         int rc, i = 0;
2790         ENTRY;
2791
2792         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2793
2794         if (mti->mti_uuid[0] == '\0') {
2795                 /* Make up our own uuid */
2796                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2797                          "%s_UUID", mti->mti_svname);
2798         }
2799
2800         /* add mdt */
2801         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2802         if (rc)
2803                 RETURN(rc);
2804         /* Append the mdt info to the client log */
2805         rc = name_create(&cliname, mti->mti_fsname, "-client");
2806         if (rc)
2807                 RETURN(rc);
2808
2809         if (mgs_log_is_empty(env, mgs, cliname)) {
2810                 /* Start client log */
2811                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2812                                        fsdb->fsdb_clilov);
2813                 if (rc)
2814                         GOTO(out_free, rc);
2815                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2816                                        fsdb->fsdb_clilmv);
2817                 if (rc)
2818                         GOTO(out_free, rc);
2819         }
2820
2821         /*
2822         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2823         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2824         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2825         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2826         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2827         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2828         */
2829
2830         /* copy client info about lov/lmv */
2831         mgi->mgi_comp.comp_mti = mti;
2832         mgi->mgi_comp.comp_fsdb = fsdb;
2833
2834         rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2835                                                 &mgi->mgi_comp);
2836         if (rc)
2837                 GOTO(out_free, rc);
2838         rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2839                                       fsdb->fsdb_clilmv);
2840         if (rc)
2841                 GOTO(out_free, rc);
2842
2843         /* add mountopts */
2844         rc = record_start_log(env, mgs, &llh, cliname);
2845         if (rc)
2846                 GOTO(out_free, rc);
2847
2848         rc = record_marker(env, llh, fsdb, CM_START, cliname, "mount opts");
2849         if (rc)
2850                 GOTO(out_end, rc);
2851         rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2852                               fsdb->fsdb_clilmv);
2853         if (rc)
2854                 GOTO(out_end, rc);
2855         rc = record_marker(env, llh, fsdb, CM_END, cliname, "mount opts");
2856         if (rc)
2857                 GOTO(out_end, rc);
2858
2859         /* for_all_existing_mdt except current one */
2860         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2861                 if (i !=  mti->mti_stripe_index &&
2862                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2863                         char *logname;
2864
2865                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2866                         if (rc)
2867                                 GOTO(out_end, rc);
2868
2869                         /* NB: If the log for the MDT is empty, it means
2870                          * the MDT is only added to the index
2871                          * map, and not being process yet, i.e. this
2872                          * is an unregistered MDT, see mgs_write_log_target().
2873                          * so we should skip it. Otherwise
2874                          *
2875                          * 1. MGS get register request for MDT1 and MDT2.
2876                          *
2877                          * 2. Then both MDT1 and MDT2 are added into
2878                          * fsdb_mdt_index_map. (see mgs_set_index()).
2879                          *
2880                          * 3. Then MDT1 get the lock of fsdb_mutex, then
2881                          * generate the config log, here, it will regard MDT2
2882                          * as an existent MDT, and generate "add osp" for
2883                          * lustre-MDT0001-osp-MDT0002. Note: at the moment
2884                          * MDT0002 config log is still empty, so it will
2885                          * add "add osp" even before "lov setup", which
2886                          * will definitly cause trouble.
2887                          *
2888                          * 4. MDT1 registeration finished, fsdb_mutex is
2889                          * released, then MDT2 get in, then in above
2890                          * mgs_steal_llog_for_mdt_from_client(), it will
2891                          * add another osp log for lustre-MDT0001-osp-MDT0002,
2892                          * which will cause another trouble.*/
2893                         if (!mgs_log_is_empty(env, mgs, logname))
2894                                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb,
2895                                                               mti, i, logname);
2896
2897                         name_destroy(&logname);
2898                         if (rc)
2899                                 GOTO(out_end, rc);
2900                 }
2901         }
2902 out_end:
2903         record_end_log(env, &llh);
2904 out_free:
2905         name_destroy(&cliname);
2906         RETURN(rc);
2907 }
2908
2909 /* Add the ost info to the client/mdt lov */
2910 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2911                                     struct mgs_device *mgs, struct fs_db *fsdb,
2912                                     struct mgs_target_info *mti,
2913                                     char *logname, char *suffix, char *lovname,
2914                                     enum lustre_sec_part sec_part, int flags)
2915 {
2916         struct llog_handle *llh = NULL;
2917         char *nodeuuid = NULL;
2918         char *oscname = NULL;
2919         char *oscuuid = NULL;
2920         char *lovuuid = NULL;
2921         char *svname = NULL;
2922         char index[6];
2923         char nidstr[LNET_NIDSTR_SIZE];
2924         int i, rc;
2925         ENTRY;
2926
2927         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2928                 mti->mti_svname, logname);
2929
2930         if (mgs_log_is_empty(env, mgs, logname)) {
2931                 CERROR("log is empty! Logical error\n");
2932                 RETURN(-EINVAL);
2933         }
2934
2935         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2936         rc = name_create(&nodeuuid, nidstr, "");
2937         if (rc)
2938                 RETURN(rc);
2939         rc = name_create(&svname, mti->mti_svname, "-osc");
2940         if (rc)
2941                 GOTO(out_free, rc);
2942
2943         /* for the system upgraded from old 1.8, keep using the old osc naming
2944          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2945         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2946                 rc = name_create(&oscname, svname, "");
2947         else
2948                 rc = name_create(&oscname, svname, suffix);
2949         if (rc)
2950                 GOTO(out_free, rc);
2951
2952         rc = name_create(&oscuuid, oscname, "_UUID");
2953         if (rc)
2954                 GOTO(out_free, rc);
2955         rc = name_create(&lovuuid, lovname, "_UUID");
2956         if (rc)
2957                 GOTO(out_free, rc);
2958
2959
2960         /*
2961         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2962         multihomed (#4)
2963         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2964         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2965         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2966         failover (#6,7)
2967         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2968         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2969         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2970         */
2971
2972         rc = record_start_log(env, mgs, &llh, logname);
2973         if (rc)
2974                 GOTO(out_free, rc);
2975
2976         /* FIXME these should be a single journal transaction */
2977         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2978                            "add osc");
2979         if (rc)
2980                 GOTO(out_end, rc);
2981
2982         /* NB: don't change record order, because upon MDT steal OSC config
2983          * from client, it treats all nids before LCFG_SETUP as target nids
2984          * (multiple interfaces), while nids after as failover node nids.
2985          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2986          */
2987         for (i = 0; i < mti->mti_nid_count; i++) {
2988                 CDEBUG(D_MGS, "add nid %s\n",
2989                         libcfs_nid2str_r(mti->mti_nids[i],
2990                                          nidstr, sizeof(nidstr)));
2991                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2992                 if (rc)
2993                         GOTO(out_end, rc);
2994         }
2995         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2996         if (rc)
2997                 GOTO(out_end, rc);
2998         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
2999                           NULL, NULL);
3000         if (rc)
3001                 GOTO(out_end, rc);
3002         rc = mgs_write_log_failnids(env, mti, llh, oscname);
3003         if (rc)
3004                 GOTO(out_end, rc);
3005
3006         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
3007
3008         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
3009         if (rc)
3010                 GOTO(out_end, rc);
3011         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
3012                            "add osc");
3013         if (rc)
3014                 GOTO(out_end, rc);
3015 out_end:
3016         record_end_log(env, &llh);
3017 out_free:
3018         name_destroy(&lovuuid);
3019         name_destroy(&oscuuid);
3020         name_destroy(&oscname);
3021         name_destroy(&svname);
3022         name_destroy(&nodeuuid);
3023         RETURN(rc);
3024 }
3025
3026 static int mgs_write_log_ost(const struct lu_env *env,
3027                              struct mgs_device *mgs, struct fs_db *fsdb,
3028                              struct mgs_target_info *mti)
3029 {
3030         struct llog_handle *llh = NULL;
3031         char *logname, *lovname;
3032         char *ptr = mti->mti_params;
3033         int rc, flags = 0, failout = 0, i;
3034         ENTRY;
3035
3036         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
3037
3038         /* The ost startup log */
3039
3040         /* If the ost log already exists, that means that someone reformatted
3041            the ost and it called target_add again. */
3042         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
3043                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
3044                                    "exists, yet the server claims it never "
3045                                    "registered. It may have been reformatted, "
3046                                    "or the index changed. writeconf the MDT to "
3047                                    "regenerate all logs.\n", mti->mti_svname);
3048                 RETURN(-EALREADY);
3049         }
3050
3051         /*
3052         attach obdfilter ost1 ost1_UUID
3053         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
3054         */
3055         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
3056                 failout = (strncmp(ptr, "failout", 7) == 0);
3057         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
3058         if (rc)
3059                 RETURN(rc);
3060         /* FIXME these should be a single journal transaction */
3061         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
3062         if (rc)
3063                 GOTO(out_end, rc);
3064         if (*mti->mti_uuid == '\0')
3065                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
3066                          "%s_UUID", mti->mti_svname);
3067         rc = record_attach(env, llh, mti->mti_svname,
3068                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
3069         if (rc)
3070                 GOTO(out_end, rc);
3071         rc = record_setup(env, llh, mti->mti_svname,
3072                           "dev"/*ignored*/, "type"/*ignored*/,
3073                           failout ? "n" : "f", NULL/*options*/);
3074         if (rc)
3075                 GOTO(out_end, rc);
3076         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
3077         if (rc)
3078                 GOTO(out_end, rc);
3079 out_end:
3080         record_end_log(env, &llh);
3081         if (rc)
3082                 RETURN(rc);
3083         /* We also have to update the other logs where this osc is part of
3084            the lov */
3085
3086         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3087                 /* If we're upgrading, the old mdt log already has our
3088                    entry. Let's do a fake one for fun. */
3089                 /* Note that we can't add any new failnids, since we don't
3090                    know the old osc names. */
3091                 flags = CM_SKIP | CM_UPGRADE146;
3092
3093         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
3094                 /* If the update flag isn't set, don't update client/mdt
3095                    logs. */
3096                 flags |= CM_SKIP;
3097                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
3098                               "the MDT first to regenerate it.\n",
3099                               mti->mti_svname);
3100         }
3101
3102         /* Add ost to all MDT lov defs */
3103         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3104                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
3105                         char mdt_index[13];
3106
3107                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
3108                                                      i);
3109                         if (rc)
3110                                 RETURN(rc);
3111
3112                         snprintf(mdt_index, sizeof(mdt_index), "-MDT%04x", i);
3113                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
3114                                                       logname, mdt_index,
3115                                                       lovname, LUSTRE_SP_MDT,
3116                                                       flags);
3117                         name_destroy(&logname);
3118                         name_destroy(&lovname);
3119                         if (rc)
3120                                 RETURN(rc);
3121                 }
3122         }
3123
3124         /* Append ost info to the client log */
3125         rc = name_create(&logname, mti->mti_fsname, "-client");
3126         if (rc)
3127                 RETURN(rc);
3128         if (mgs_log_is_empty(env, mgs, logname)) {
3129                 /* Start client log */
3130                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
3131                                        fsdb->fsdb_clilov);
3132                 if (rc)
3133                         GOTO(out_free, rc);
3134                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
3135                                        fsdb->fsdb_clilmv);
3136                 if (rc)
3137                         GOTO(out_free, rc);
3138         }
3139         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
3140                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
3141 out_free:
3142         name_destroy(&logname);
3143         RETURN(rc);
3144 }
3145
3146 static __inline__ int mgs_param_empty(char *ptr)
3147 {
3148         char *tmp;
3149
3150         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
3151                 return 1;
3152         return 0;
3153 }
3154
3155 static int mgs_write_log_failnid_internal(const struct lu_env *env,
3156                                           struct mgs_device *mgs,
3157                                           struct fs_db *fsdb,
3158                                           struct mgs_target_info *mti,
3159                                           char *logname, char *cliname)
3160 {
3161         int rc;
3162         struct llog_handle *llh = NULL;
3163
3164         if (mgs_param_empty(mti->mti_params)) {
3165                 /* Remove _all_ failnids */
3166                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3167                                 mti->mti_svname, "add failnid", CM_SKIP);
3168                 return rc < 0 ? rc : 0;
3169         }
3170
3171         /* Otherwise failover nids are additive */
3172         rc = record_start_log(env, mgs, &llh, logname);
3173         if (rc)
3174                 return rc;
3175                 /* FIXME this should be a single journal transaction */
3176         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
3177                            "add failnid");
3178         if (rc)
3179                 goto out_end;
3180         rc = mgs_write_log_failnids(env, mti, llh, cliname);
3181         if (rc)
3182                 goto out_end;
3183         rc = record_marker(env, llh, fsdb, CM_END,
3184                            mti->mti_svname, "add failnid");
3185 out_end:
3186         record_end_log(env, &llh);
3187         return rc;
3188 }
3189
3190
3191 /* Add additional failnids to an existing log.
3192    The mdc/osc must have been added to logs first */
3193 /* tcp nids must be in dotted-quad ascii -
3194    we can't resolve hostnames from the kernel. */
3195 static int mgs_write_log_add_failnid(const struct lu_env *env,
3196                                      struct mgs_device *mgs,
3197                                      struct fs_db *fsdb,
3198                                      struct mgs_target_info *mti)
3199 {
3200         char *logname, *cliname;
3201         int rc;
3202         ENTRY;
3203
3204         /* FIXME we currently can't erase the failnids
3205          * given when a target first registers, since they aren't part of
3206          * an "add uuid" stanza
3207          */
3208
3209         /* Verify that we know about this target */
3210         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
3211                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
3212                                    "yet. It must be started before failnids "
3213                                    "can be added.\n", mti->mti_svname);
3214                 RETURN(-ENOENT);
3215         }
3216
3217         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
3218         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3219                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
3220         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3221                 rc = name_create(&cliname, mti->mti_svname, "-osc");
3222         } else {
3223                 RETURN(-EINVAL);
3224         }
3225         if (rc)
3226                 RETURN(rc);
3227
3228         /* Add failover nids to the client log */
3229         rc = name_create(&logname, mti->mti_fsname, "-client");
3230         if (rc) {
3231                 name_destroy(&cliname);
3232                 RETURN(rc);
3233         }
3234
3235         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
3236         name_destroy(&logname);
3237         name_destroy(&cliname);
3238         if (rc)
3239                 RETURN(rc);
3240
3241         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3242                 /* Add OST failover nids to the MDT logs as well */
3243                 int i;
3244
3245                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3246                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3247                                 continue;
3248                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3249                         if (rc)
3250                                 RETURN(rc);
3251                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
3252                                                  fsdb, i);
3253                         if (rc) {
3254                                 name_destroy(&logname);
3255                                 RETURN(rc);
3256                         }
3257                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
3258                                                             mti, logname,
3259                                                             cliname);
3260                         name_destroy(&cliname);
3261                         name_destroy(&logname);
3262                         if (rc)
3263                                 RETURN(rc);
3264                 }
3265         }
3266
3267         RETURN(rc);
3268 }
3269
3270 static int mgs_wlp_lcfg(const struct lu_env *env,
3271                         struct mgs_device *mgs, struct fs_db *fsdb,
3272                         struct mgs_target_info *mti,
3273                         char *logname, struct lustre_cfg_bufs *bufs,
3274                         char *tgtname, char *ptr)
3275 {
3276         char comment[MTI_NAME_MAXLEN];
3277         char *tmp;
3278         struct llog_cfg_rec *lcr;
3279         int rc, del;
3280
3281         /* Erase any old settings of this same parameter */
3282         memcpy(comment, ptr, MTI_NAME_MAXLEN);
3283         comment[MTI_NAME_MAXLEN - 1] = 0;
3284         /* But don't try to match the value. */
3285         tmp = strchr(comment, '=');
3286         if (tmp != NULL)
3287                 *tmp = 0;
3288         /* FIXME we should skip settings that are the same as old values */
3289         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
3290         if (rc < 0)
3291                 return rc;
3292         del = mgs_param_empty(ptr);
3293
3294         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
3295                       "Setting" : "Modifying", tgtname, comment, logname);
3296         if (del) {
3297                 /* mgs_modify() will return 1 if nothing had to be done */
3298                 if (rc == 1)
3299                         rc = 0;
3300                 return rc;
3301         }
3302
3303         lustre_cfg_bufs_reset(bufs, tgtname);
3304         lustre_cfg_bufs_set_string(bufs, 1, ptr);
3305         if (mti->mti_flags & LDD_F_PARAM2)
3306                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
3307
3308         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
3309                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
3310         if (lcr == NULL)
3311                 return -ENOMEM;
3312
3313         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
3314                                   comment);
3315         lustre_cfg_rec_free(lcr);
3316         return rc;
3317 }
3318
3319 /* write global variable settings into log */
3320 static int mgs_write_log_sys(const struct lu_env *env,
3321                              struct mgs_device *mgs, struct fs_db *fsdb,
3322                              struct mgs_target_info *mti, char *sys, char *ptr)
3323 {
3324         struct mgs_thread_info  *mgi = mgs_env_info(env);
3325         struct lustre_cfg       *lcfg;
3326         struct llog_cfg_rec     *lcr;
3327         char *tmp, sep;
3328         int rc, cmd, convert = 1;
3329
3330         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
3331                 cmd = LCFG_SET_TIMEOUT;
3332         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
3333                 cmd = LCFG_SET_LDLM_TIMEOUT;
3334         /* Check for known params here so we can return error to lctl */
3335         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
3336                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
3337                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
3338                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
3339                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
3340                 cmd = LCFG_PARAM;
3341         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
3342                 convert = 0; /* Don't convert string value to integer */
3343                 cmd = LCFG_PARAM;
3344         } else {
3345                 return -EINVAL;
3346         }
3347
3348         if (mgs_param_empty(ptr))
3349                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
3350         else
3351                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
3352
3353         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
3354         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
3355         if (!convert && *tmp != '\0')
3356                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
3357         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3358         if (lcr == NULL)
3359                 return -ENOMEM;
3360
3361         lcfg = &lcr->lcr_cfg;
3362         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
3363         /* truncate the comment to the parameter name */
3364         ptr = tmp - 1;
3365         sep = *ptr;
3366         *ptr = '\0';
3367         /* modify all servers and clients */
3368         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3369                                       *tmp == '\0' ? NULL : lcr,
3370                                       mti->mti_fsname, sys, 0);
3371         if (rc == 0 && *tmp != '\0') {
3372                 switch (cmd) {
3373                 case LCFG_SET_TIMEOUT:
3374                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
3375                                 class_process_config(lcfg);
3376                         break;
3377                 case LCFG_SET_LDLM_TIMEOUT:
3378                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
3379                                 class_process_config(lcfg);
3380                         break;
3381                 default:
3382                         break;
3383                 }
3384         }
3385         *ptr = sep;
3386         lustre_cfg_rec_free(lcr);
3387         return rc;
3388 }
3389
3390 /* write quota settings into log */
3391 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
3392                                struct fs_db *fsdb, struct mgs_target_info *mti,
3393                                char *quota, char *ptr)
3394 {
3395         struct mgs_thread_info  *mgi = mgs_env_info(env);
3396         struct llog_cfg_rec     *lcr;
3397         char                    *tmp;
3398         char                     sep;
3399         int                      rc, cmd = LCFG_PARAM;
3400
3401         /* support only 'meta' and 'data' pools so far */
3402         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
3403             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
3404                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
3405                        "& quota.ost are)\n", ptr);
3406                 return -EINVAL;
3407         }
3408
3409         if (*tmp == '\0') {
3410                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
3411         } else {
3412                 CDEBUG(D_MGS, "global '%s'\n", quota);
3413
3414                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
3415                     strchr(tmp, 'p') == NULL &&
3416                     strcmp(tmp, "none") != 0) {
3417                         CERROR("enable option(%s) isn't supported\n", tmp);
3418                         return -EINVAL;
3419                 }
3420         }
3421
3422         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
3423         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
3424         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3425         if (lcr == NULL)
3426                 return -ENOMEM;
3427
3428         /* truncate the comment to the parameter name */
3429         ptr = tmp - 1;
3430         sep = *ptr;
3431         *ptr = '\0';
3432
3433         /* XXX we duplicated quota enable information in all server
3434          *     config logs, it should be moved to a separate config
3435          *     log once we cleanup the config log for global param. */
3436         /* modify all servers */
3437         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3438                                       *tmp == '\0' ? NULL : lcr,
3439                                       mti->mti_fsname, quota, 1);
3440         *ptr = sep;
3441         lustre_cfg_rec_free(lcr);
3442         return rc < 0 ? rc : 0;
3443 }
3444
3445 static int mgs_srpc_set_param_disk(const struct lu_env *env,
3446                                    struct mgs_device *mgs,
3447                                    struct fs_db *fsdb,
3448                                    struct mgs_target_info *mti,
3449                                    char *param)
3450 {
3451         struct mgs_thread_info  *mgi = mgs_env_info(env);
3452         struct llog_cfg_rec     *lcr;
3453         struct llog_handle      *llh = NULL;
3454         char                    *logname;
3455         char                    *comment, *ptr;
3456         int                      rc, len;
3457
3458         ENTRY;
3459
3460         /* get comment */
3461         ptr = strchr(param, '=');
3462         LASSERT(ptr != NULL);
3463         len = ptr - param;
3464
3465         OBD_ALLOC(comment, len + 1);
3466         if (comment == NULL)
3467                 RETURN(-ENOMEM);
3468         strncpy(comment, param, len);
3469         comment[len] = '\0';
3470
3471         /* prepare lcfg */
3472         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
3473         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
3474         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
3475         if (lcr == NULL)
3476                 GOTO(out_comment, rc = -ENOMEM);
3477
3478         /* construct log name */
3479         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
3480         if (rc < 0)
3481                 GOTO(out_lcfg, rc);
3482
3483         if (mgs_log_is_empty(env, mgs, logname)) {
3484                 rc = record_start_log(env, mgs, &llh, logname);
3485                 if (rc < 0)
3486                         GOTO(out, rc);
3487                 record_end_log(env, &llh);
3488         }
3489
3490         /* obsolete old one */
3491         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
3492                         comment, CM_SKIP);
3493         if (rc < 0)
3494                 GOTO(out, rc);
3495         /* write the new one */
3496         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
3497                                   mti->mti_svname, comment);
3498         if (rc)
3499                 CERROR("%s: error writing log %s: rc = %d\n",
3500                        mgs->mgs_obd->obd_name, logname, rc);
3501 out:
3502         name_destroy(&logname);
3503 out_lcfg:
3504         lustre_cfg_rec_free(lcr);
3505 out_comment:
3506         OBD_FREE(comment, len + 1);
3507         RETURN(rc);
3508 }
3509
3510 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
3511                                         char *param)
3512 {
3513         char    *ptr;
3514
3515         /* disable the adjustable udesc parameter for now, i.e. use default
3516          * setting that client always ship udesc to MDT if possible. to enable
3517          * it simply remove the following line */
3518         goto error_out;
3519
3520         ptr = strchr(param, '=');
3521         if (ptr == NULL)
3522                 goto error_out;
3523         *ptr++ = '\0';
3524
3525         if (strcmp(param, PARAM_SRPC_UDESC))
3526                 goto error_out;
3527
3528         if (strcmp(ptr, "yes") == 0) {
3529                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3530                 CWARN("Enable user descriptor shipping from client to MDT\n");
3531         } else if (strcmp(ptr, "no") == 0) {
3532                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3533                 CWARN("Disable user descriptor shipping from client to MDT\n");
3534         } else {
3535                 *(ptr - 1) = '=';
3536                 goto error_out;
3537         }
3538         return 0;
3539
3540 error_out:
3541         CERROR("Invalid param: %s\n", param);
3542         return -EINVAL;
3543 }
3544
3545 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3546                                   const char *svname,
3547                                   char *param)
3548 {
3549         struct sptlrpc_rule      rule;
3550         struct sptlrpc_rule_set *rset;
3551         int                      rc;
3552         ENTRY;
3553
3554         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3555                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3556                 RETURN(-EINVAL);
3557         }
3558
3559         if (strncmp(param, PARAM_SRPC_UDESC,
3560                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3561                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3562         }
3563
3564         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3565                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3566                 RETURN(-EINVAL);
3567         }
3568
3569         param += sizeof(PARAM_SRPC_FLVR) - 1;
3570
3571         rc = sptlrpc_parse_rule(param, &rule);
3572         if (rc)
3573                 RETURN(rc);
3574
3575         /* mgs rules implies must be mgc->mgs */
3576         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3577                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3578                      rule.sr_from != LUSTRE_SP_ANY) ||
3579                     (rule.sr_to != LUSTRE_SP_MGS &&
3580                      rule.sr_to != LUSTRE_SP_ANY))
3581                         RETURN(-EINVAL);
3582         }
3583
3584         /* preapre room for this coming rule. svcname format should be:
3585          * - fsname: general rule
3586          * - fsname-tgtname: target-specific rule
3587          */
3588         if (strchr(svname, '-')) {
3589                 struct mgs_tgt_srpc_conf *tgtconf;
3590                 int                       found = 0;
3591
3592                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3593                      tgtconf = tgtconf->mtsc_next) {
3594                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3595                                 found = 1;
3596                                 break;
3597                         }
3598                 }
3599
3600                 if (!found) {
3601                         int name_len;
3602
3603                         OBD_ALLOC_PTR(tgtconf);
3604                         if (tgtconf == NULL)
3605                                 RETURN(-ENOMEM);
3606
3607                         name_len = strlen(svname);
3608
3609                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3610                         if (tgtconf->mtsc_tgt == NULL) {
3611                                 OBD_FREE_PTR(tgtconf);
3612                                 RETURN(-ENOMEM);
3613                         }
3614                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3615
3616                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3617                         fsdb->fsdb_srpc_tgt = tgtconf;
3618                 }
3619
3620                 rset = &tgtconf->mtsc_rset;
3621         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
3622                 /* put _mgs related srpc rule directly in mgs ruleset */
3623                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
3624         } else {
3625                 rset = &fsdb->fsdb_srpc_gen;
3626         }
3627
3628         rc = sptlrpc_rule_set_merge(rset, &rule);
3629
3630         RETURN(rc);
3631 }
3632
3633 static int mgs_srpc_set_param(const struct lu_env *env,
3634                               struct mgs_device *mgs,
3635                               struct fs_db *fsdb,
3636                               struct mgs_target_info *mti,
3637                               char *param)
3638 {
3639         char                   *copy;
3640         int                     rc, copy_size;
3641         ENTRY;
3642
3643 #ifndef HAVE_GSS
3644         RETURN(-EINVAL);
3645 #endif
3646         /* keep a copy of original param, which could be destroied
3647          * during parsing */
3648         copy_size = strlen(param) + 1;
3649         OBD_ALLOC(copy, copy_size);
3650         if (copy == NULL)
3651                 return -ENOMEM;
3652         memcpy(copy, param, copy_size);
3653
3654         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3655         if (rc)
3656                 goto out_free;
3657
3658         /* previous steps guaranteed the syntax is correct */
3659         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3660         if (rc)
3661                 goto out_free;
3662
3663         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3664                 /*
3665                  * for mgs rules, make them effective immediately.
3666                  */
3667                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3668                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3669                                                  &fsdb->fsdb_srpc_gen);
3670         }
3671
3672 out_free:
3673         OBD_FREE(copy, copy_size);
3674         RETURN(rc);
3675 }
3676
3677 struct mgs_srpc_read_data {
3678         struct fs_db   *msrd_fsdb;
3679         int             msrd_skip;
3680 };
3681
3682 static int mgs_srpc_read_handler(const struct lu_env *env,
3683                                  struct llog_handle *llh,
3684                                  struct llog_rec_hdr *rec, void *data)
3685 {
3686         struct mgs_srpc_read_data *msrd = data;
3687         struct cfg_marker         *marker;
3688         struct lustre_cfg         *lcfg = REC_DATA(rec);
3689         char                      *svname, *param;
3690         int                        cfg_len, rc;
3691         ENTRY;
3692
3693         if (rec->lrh_type != OBD_CFG_REC) {
3694                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3695                 RETURN(-EINVAL);
3696         }
3697
3698         cfg_len = REC_DATA_LEN(rec);
3699
3700         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3701         if (rc) {
3702                 CERROR("Insane cfg\n");
3703                 RETURN(rc);
3704         }
3705
3706         if (lcfg->lcfg_command == LCFG_MARKER) {
3707                 marker = lustre_cfg_buf(lcfg, 1);
3708
3709                 if (marker->cm_flags & CM_START &&
3710                     marker->cm_flags & CM_SKIP)
3711                         msrd->msrd_skip = 1;
3712                 if (marker->cm_flags & CM_END)
3713                         msrd->msrd_skip = 0;
3714
3715                 RETURN(0);
3716         }
3717
3718         if (msrd->msrd_skip)
3719                 RETURN(0);
3720
3721         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3722                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3723                 RETURN(0);
3724         }
3725
3726         svname = lustre_cfg_string(lcfg, 0);
3727         if (svname == NULL) {
3728                 CERROR("svname is empty\n");
3729                 RETURN(0);
3730         }
3731
3732         param = lustre_cfg_string(lcfg, 1);
3733         if (param == NULL) {
3734                 CERROR("param is empty\n");
3735                 RETURN(0);
3736         }
3737
3738         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3739         if (rc)
3740                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3741
3742         RETURN(0);
3743 }
3744
3745 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3746                                 struct mgs_device *mgs,
3747                                 struct fs_db *fsdb)
3748 {
3749         struct llog_handle        *llh = NULL;
3750         struct llog_ctxt          *ctxt;
3751         char                      *logname;
3752         struct mgs_srpc_read_data  msrd;
3753         int                        rc;
3754         ENTRY;
3755
3756         /* construct log name */
3757         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3758         if (rc)
3759                 RETURN(rc);
3760
3761         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3762         LASSERT(ctxt != NULL);
3763
3764         if (mgs_log_is_empty(env, mgs, logname))
3765                 GOTO(out, rc = 0);
3766
3767         rc = llog_open(env, ctxt, &llh, NULL, logname,
3768                        LLOG_OPEN_EXISTS);
3769         if (rc < 0) {
3770                 if (rc == -ENOENT)
3771                         rc = 0;
3772                 GOTO(out, rc);
3773         }
3774
3775         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3776         if (rc)
3777                 GOTO(out_close, rc);
3778
3779         if (llog_get_size(llh) <= 1)
3780                 GOTO(out_close, rc = 0);
3781
3782         msrd.msrd_fsdb = fsdb;
3783         msrd.msrd_skip = 0;
3784
3785         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3786                           NULL);
3787
3788 out_close:
3789         llog_close(env, llh);
3790 out:
3791         llog_ctxt_put(ctxt);
3792         name_destroy(&logname);
3793
3794         if (rc)
3795                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3796         RETURN(rc);
3797 }
3798
3799 static int mgs_write_log_param2(const struct lu_env *env,
3800                                 struct mgs_device *mgs,
3801                                 struct fs_db *fsdb,
3802                                 struct mgs_target_info *mti, char *ptr)
3803 {
3804         struct lustre_cfg_bufs bufs;
3805         int rc;
3806
3807         ENTRY;
3808         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3809
3810         /* PARAM_MGSNODE and PARAM_NETWORK are set only when formating
3811          * or during the inital mount. It can never change after that.
3812          */
3813         if (!class_match_param(ptr, PARAM_MGSNODE, NULL) ||
3814             !class_match_param(ptr, PARAM_NETWORK, NULL)) {
3815                 rc = 0;
3816                 goto end;
3817         }
3818
3819         /* Processed in mgs_write_log_ost. Another value that can't
3820          * be changed by lctl set_param -P.
3821          */
3822         if (!class_match_param(ptr, PARAM_FAILMODE, NULL)) {
3823                 LCONSOLE_ERROR_MSG(0x169,
3824                                    "%s can only be changed with tunefs.lustre and --writeconf\n",
3825                                    ptr);
3826                 rc = -EPERM;
3827                 goto end;
3828         }
3829
3830         /* FIXME !!! Support for sptlrpc is incomplete. Currently the change
3831          * doesn't transmit to the client. See LU-7183.
3832          */
3833         if (!class_match_param(ptr, PARAM_SRPC, NULL)) {
3834                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3835                 goto end;
3836         }
3837
3838         /* Can't use class_match_param since ptr doesn't start with
3839          * PARAM_FAILNODE. So we look for PARAM_FAILNODE contained in ptr.
3840          */
3841         if (strstr(ptr, PARAM_FAILNODE)) {
3842                 /* Add a failover nidlist. We already processed failovers
3843                  * params for new targets in mgs_write_log_target.
3844                  */
3845                 const char *param;
3846
3847                 /* can't use wildcards with failover.node */
3848                 if (strchr(ptr, '*')) {
3849                         rc = -ENODEV;
3850                         goto end;
3851                 }
3852
3853                 param = strstr(ptr, PARAM_FAILNODE);
3854                 if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
3855                     sizeof(mti->mti_params)) {
3856                         rc = -E2BIG;
3857                         goto end;
3858                 }
3859
3860                 CDEBUG(D_MGS, "Adding failnode with param %s\n",
3861                        mti->mti_params);
3862                 rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3863                 goto end;
3864         }
3865
3866         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
3867                           mti->mti_svname, ptr);
3868 end:
3869         RETURN(rc);
3870 }
3871
3872 /* Permanent settings of all parameters by writing into the appropriate
3873  * configuration logs.
3874  * A parameter with null value ("<param>='\0'") means to erase it out of
3875  * the logs.
3876  */
3877 static int mgs_write_log_param(const struct lu_env *env,
3878                                struct mgs_device *mgs, struct fs_db *fsdb,
3879                                struct mgs_target_info *mti, char *ptr)
3880 {
3881         struct mgs_thread_info *mgi = mgs_env_info(env);
3882         char *logname;
3883         char *tmp;
3884         int rc = 0;
3885         ENTRY;
3886
3887         /* For various parameter settings, we have to figure out which logs
3888            care about them (e.g. both mdt and client for lov settings) */
3889         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3890
3891         /* The params are stored in MOUNT_DATA_FILE and modified via
3892            tunefs.lustre, or set using lctl conf_param */
3893
3894         /* Processed in lustre_start_mgc */
3895         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3896                 GOTO(end, rc);
3897
3898         /* Processed in ost/mdt */
3899         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3900                 GOTO(end, rc);
3901
3902         /* Processed in mgs_write_log_ost */
3903         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3904                 if (mti->mti_flags & LDD_F_PARAM) {
3905                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3906                                            "changed with tunefs.lustre"
3907                                            "and --writeconf\n", ptr);
3908                         rc = -EPERM;
3909                 }
3910                 GOTO(end, rc);
3911         }
3912
3913         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3914                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3915                 GOTO(end, rc);
3916         }
3917
3918         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3919                 /* Add a failover nidlist */
3920                 rc = 0;
3921                 /* We already processed failovers params for new
3922                    targets in mgs_write_log_target */
3923                 if (mti->mti_flags & LDD_F_PARAM) {
3924                         CDEBUG(D_MGS, "Adding failnode\n");
3925                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3926                 }
3927                 GOTO(end, rc);
3928         }
3929
3930         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3931                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3932                 GOTO(end, rc);
3933         }
3934
3935         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3936                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3937                 GOTO(end, rc);
3938         }
3939
3940         if (class_match_param(ptr, PARAM_OSC PARAM_ACTIVE, &tmp) == 0 ||
3941             class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0) {
3942                 /* active=0 means off, anything else means on */
3943                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3944                 bool deactive_osc = memcmp(ptr, PARAM_OSC PARAM_ACTIVE,
3945                                           strlen(PARAM_OSC PARAM_ACTIVE)) == 0;
3946                 int i;
3947
3948                 if (!deactive_osc) {
3949                         __u32   index;
3950
3951                         rc = server_name2index(mti->mti_svname, &index, NULL);
3952                         if (rc < 0)
3953                                 GOTO(end, rc);
3954
3955                         if (index == 0) {
3956                                 LCONSOLE_ERROR_MSG(0x144, "%s: MDC0 can not be"
3957                                                    " (de)activated.\n",
3958                                                    mti->mti_svname);
3959                                 GOTO(end, rc = -EPERM);
3960                         }
3961                 }
3962
3963                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3964                               flag ? "de" : "re", mti->mti_svname);
3965                 /* Modify clilov */
3966                 rc = name_create(&logname, mti->mti_fsname, "-client");
3967                 if (rc < 0)
3968                         GOTO(end, rc);
3969                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3970                                 mti->mti_svname,
3971                                 deactive_osc ? "add osc" : "add mdc", flag);
3972                 name_destroy(&logname);
3973                 if (rc < 0)
3974                         goto active_err;
3975
3976                 /* Modify mdtlov */
3977                 /* Add to all MDT logs for DNE */
3978                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3979                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3980                                 continue;
3981                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3982                         if (rc < 0)
3983                                 GOTO(end, rc);
3984                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3985                                         mti->mti_svname,
3986                                         deactive_osc ? "add osc" : "add osp",
3987                                         flag);
3988                         name_destroy(&logname);
3989                         if (rc < 0)
3990                                 goto active_err;
3991                 }
3992 active_err:
3993                 if (rc < 0) {
3994                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3995                                            "log (%d). No permanent "
3996                                            "changes were made to the "
3997                                            "config log.\n",
3998                                            mti->mti_svname, rc);
3999                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
4000                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
4001                                                    " because the log"
4002                                                    "is in the old 1.4"
4003                                                    "style. Consider "
4004                                                    " --writeconf to "
4005                                                    "update the logs.\n");
4006                         GOTO(end, rc);
4007                 }
4008                 /* Fall through to osc/mdc proc for deactivating live
4009                    OSC/OSP on running MDT / clients. */
4010         }
4011         /* Below here, let obd's XXX_process_config methods handle it */
4012
4013         /* All lov. in proc */
4014         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
4015                 char *mdtlovname;
4016
4017                 CDEBUG(D_MGS, "lov param %s\n", ptr);
4018                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
4019                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
4020                                            "set on the MDT, not %s. "
4021                                            "Ignoring.\n",
4022                                            mti->mti_svname);
4023                         GOTO(end, rc = 0);
4024                 }
4025
4026                 /* Modify mdtlov */
4027                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
4028                         GOTO(end, rc = -ENODEV);
4029
4030                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
4031                                              mti->mti_stripe_index);
4032                 if (rc)
4033                         GOTO(end, rc);
4034                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
4035                                   &mgi->mgi_bufs, mdtlovname, ptr);
4036                 name_destroy(&logname);
4037                 name_destroy(&mdtlovname);
4038                 if (rc)
4039                         GOTO(end, rc);
4040
4041                 /* Modify clilov */
4042                 rc = name_create(&logname, mti->mti_fsname, "-client");
4043                 if (rc)
4044                         GOTO(end, rc);
4045                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
4046                                   fsdb->fsdb_clilov, ptr);
4047                 name_destroy(&logname);
4048                 GOTO(end, rc);
4049         }
4050
4051         /* All osc., mdc., llite. params in proc */
4052         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
4053             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
4054             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
4055                 char *cname;
4056
4057                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
4058                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
4059                                            " cannot be modified. Consider"
4060                                            " updating the configuration with"
4061                                            " --writeconf\n",
4062                                            mti->mti_svname);
4063                         GOTO(end, rc = -EINVAL);
4064                 }
4065                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
4066                         rc = name_create(&cname, mti->mti_fsname, "-client");
4067                         /* Add the client type to match the obdname in
4068                            class_config_llog_handler */
4069                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
4070                         rc = name_create(&cname, mti->mti_svname, "-mdc");
4071                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
4072                         rc = name_create(&cname, mti->mti_svname, "-osc");
4073                 } else {
4074                         GOTO(end, rc = -EINVAL);
4075                 }
4076                 if (rc)
4077                         GOTO(end, rc);
4078
4079                 /* Forbid direct update of llite root squash parameters.
4080                  * These parameters are indirectly set via the MDT settings.
4081                  * See (LU-1778) */
4082                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
4083                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
4084                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
4085                         LCONSOLE_ERROR("%s: root squash parameters can only "
4086                                 "be updated through MDT component\n",
4087                                 mti->mti_fsname);
4088                         name_destroy(&cname);
4089                         GOTO(end, rc = -EINVAL);
4090                 }
4091
4092                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4093
4094                 /* Modify client */
4095                 rc = name_create(&logname, mti->mti_fsname, "-client");
4096                 if (rc) {
4097                         name_destroy(&cname);
4098                         GOTO(end, rc);
4099                 }
4100                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
4101                                   cname, ptr);
4102
4103                 /* osc params affect the MDT as well */
4104                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
4105                         int i;
4106
4107                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
4108                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
4109                                         continue;
4110                                 name_destroy(&cname);
4111                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
4112                                                          fsdb, i);
4113                                 name_destroy(&logname);
4114                                 if (rc)
4115                                         break;
4116                                 rc = name_create_mdt(&logname,
4117                                                      mti->mti_fsname, i);
4118                                 if (rc)
4119                                         break;
4120                                 if (!mgs_log_is_empty(env, mgs, logname)) {
4121                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
4122                                                           mti, logname,
4123                                                           &mgi->mgi_bufs,
4124                                                           cname, ptr);
4125                                         if (rc)
4126                                                 break;
4127                                 }
4128                         }
4129                 }
4130
4131                 /* For mdc activate/deactivate, it affects OSP on MDT as well */
4132                 if (class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0 &&
4133                     rc == 0) {
4134                         char suffix[16];
4135                         char *lodname = NULL;
4136                         char *param_str = NULL;
4137                         int i;
4138                         int index;
4139
4140                         /* replace mdc with osp */
4141                         memcpy(ptr, PARAM_OSP, strlen(PARAM_OSP));
4142                         rc = server_name2index(mti->mti_svname, &index, NULL);
4143                         if (rc < 0) {
4144                                 memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
4145                                 GOTO(end, rc);
4146                         }
4147
4148                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4149                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
4150                                         continue;
4151
4152                                 if (i == index)
4153                                         continue;
4154
4155                                 name_destroy(&logname);
4156                                 rc = name_create_mdt(&logname, mti->mti_fsname,
4157                                                      i);
4158                                 if (rc < 0)
4159                                         break;
4160
4161                                 if (mgs_log_is_empty(env, mgs, logname))
4162                                         continue;
4163
4164                                 snprintf(suffix, sizeof(suffix), "-osp-MDT%04x",
4165                                          i);
4166                                 name_destroy(&cname);
4167                                 rc = name_create(&cname, mti->mti_svname,
4168                                                  suffix);
4169                                 if (rc < 0)
4170                                         break;
4171
4172                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4173                                                   &mgi->mgi_bufs, cname, ptr);
4174                                 if (rc < 0)
4175                                         break;
4176
4177                                 /* Add configuration log for noitfying LOD
4178                                  * to active/deactive the OSP. */
4179                                 name_destroy(&param_str);
4180                                 rc = name_create(&param_str, cname,
4181                                                  (*tmp == '0') ?  ".active=0" :
4182                                                  ".active=1");
4183                                 if (rc < 0)
4184                                         break;
4185
4186                                 name_destroy(&lodname);
4187                                 rc = name_create(&lodname, logname, "-mdtlov");
4188                                 if (rc < 0)
4189                                         break;
4190
4191                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4192                                                   &mgi->mgi_bufs, lodname,
4193                                                   param_str);
4194                                 if (rc < 0)
4195                                         break;
4196                         }
4197                         memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
4198                         name_destroy(&lodname);
4199                         name_destroy(&param_str);
4200                 }
4201
4202                 name_destroy(&logname);
4203                 name_destroy(&cname);
4204                 GOTO(end, rc);
4205         }
4206
4207         /* All mdt. params in proc */
4208         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
4209                 int i;
4210                 __u32 idx;
4211
4212                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4213                 if (strncmp(mti->mti_svname, mti->mti_fsname,
4214                             MTI_NAME_MAXLEN) == 0)
4215                         /* device is unspecified completely? */
4216                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
4217                 else
4218                         rc = server_name2index(mti->mti_svname, &idx, NULL);
4219                 if (rc < 0)
4220                         goto active_err;
4221                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
4222                         goto active_err;
4223                 if (rc & LDD_F_SV_ALL) {
4224                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4225                                 if (!test_bit(i,
4226                                                   fsdb->fsdb_mdt_index_map))
4227                                         continue;
4228                                 rc = name_create_mdt(&logname,
4229                                                 mti->mti_fsname, i);
4230                                 if (rc)
4231                                         goto active_err;
4232                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
4233                                                   logname, &mgi->mgi_bufs,
4234                                                   logname, ptr);
4235                                 name_destroy(&logname);
4236                                 if (rc)
4237                                         goto active_err;
4238                         }
4239                 } else {
4240                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
4241                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
4242                                 LCONSOLE_ERROR("%s: root squash parameters "
4243                                         "cannot be applied to a single MDT\n",
4244                                         mti->mti_fsname);
4245                                 GOTO(end, rc = -EINVAL);
4246                         }
4247                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
4248                                           mti->mti_svname, &mgi->mgi_bufs,
4249                                           mti->mti_svname, ptr);
4250                         if (rc)
4251                                 goto active_err;
4252                 }
4253
4254                 /* root squash settings are also applied to llite
4255                  * config log (see LU-1778) */
4256                 if (rc == 0 &&
4257                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
4258                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
4259                         char *cname;
4260                         char *ptr2;
4261
4262                         rc = name_create(&cname, mti->mti_fsname, "-client");
4263                         if (rc)
4264                                 GOTO(end, rc);
4265                         rc = name_create(&logname, mti->mti_fsname, "-client");
4266                         if (rc) {
4267                                 name_destroy(&cname);
4268                                 GOTO(end, rc);
4269                         }
4270                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
4271                         if (rc) {
4272                                 name_destroy(&cname);
4273                                 name_destroy(&logname);
4274                                 GOTO(end, rc);
4275                         }
4276                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4277                                           &mgi->mgi_bufs, cname, ptr2);
4278                         name_destroy(&ptr2);
4279                         name_destroy(&logname);
4280                         name_destroy(&cname);
4281                 }
4282                 GOTO(end, rc);
4283         }
4284
4285         /* All mdd., ost. and osd. params in proc */
4286         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
4287             (class_match_param(ptr, PARAM_LOD, NULL) == 0) ||
4288             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
4289             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
4290                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4291                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
4292                         GOTO(end, rc = -ENODEV);
4293
4294                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
4295                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
4296                 GOTO(end, rc);
4297         }
4298
4299         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
4300
4301 end:
4302         if (rc)
4303                 CERROR("err %d on param '%s'\n", rc, ptr);
4304
4305         RETURN(rc);
4306 }
4307
4308 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
4309                          struct mgs_target_info *mti, struct fs_db *fsdb)
4310 {
4311         char    *buf, *params;
4312         int      rc = -EINVAL;
4313
4314         ENTRY;
4315
4316         /* set/check the new target index */
4317         rc = mgs_set_index(env, mgs, mti);
4318         if (rc < 0)
4319                 RETURN(rc);
4320
4321         if (rc == EALREADY) {
4322                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
4323                               mti->mti_stripe_index, mti->mti_svname);
4324                 /* We would like to mark old log sections as invalid
4325                    and add new log sections in the client and mdt logs.
4326                    But if we add new sections, then live clients will
4327                    get repeat setup instructions for already running
4328                    osc's. So don't update the client/mdt logs. */
4329                 mti->mti_flags &= ~LDD_F_UPDATE;
4330                 rc = 0;
4331         }
4332
4333         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_WRITE_TARGET_DELAY, cfs_fail_val > 0 ?
4334                          cfs_fail_val : 10);
4335
4336         mutex_lock(&fsdb->fsdb_mutex);
4337
4338         if (mti->mti_flags & (LDD_F_VIRGIN | LDD_F_WRITECONF)) {
4339                 /* Generate a log from scratch */
4340                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
4341                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
4342                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
4343                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
4344                 } else {
4345                         CERROR("Unknown target type %#x, can't create log for %s\n",
4346                                mti->mti_flags, mti->mti_svname);
4347                 }
4348                 if (rc) {
4349                         CERROR("Can't write logs for %s (%d)\n",
4350                                mti->mti_svname, rc);
4351                         GOTO(out_up, rc);
4352                 }
4353         } else {
4354                 /* Just update the params from tunefs in mgs_write_log_params */
4355                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
4356                 mti->mti_flags |= LDD_F_PARAM;
4357         }
4358
4359         /* allocate temporary buffer, where class_get_next_param will
4360            make copy of a current  parameter */
4361         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
4362         if (buf == NULL)
4363                 GOTO(out_up, rc = -ENOMEM);
4364         params = mti->mti_params;
4365         while (params != NULL) {
4366                 rc = class_get_next_param(&params, buf);
4367                 if (rc) {
4368                         if (rc == 1)
4369                                 /* there is no next parameter, that is
4370                                    not an error */
4371                                 rc = 0;
4372                         break;
4373                 }
4374                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
4375                        params, buf);
4376                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
4377                 if (rc)
4378                         break;
4379         }
4380
4381         OBD_FREE(buf, strlen(mti->mti_params) + 1);
4382
4383 out_up:
4384         mutex_unlock(&fsdb->fsdb_mutex);
4385         RETURN(rc);
4386 }
4387
4388 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
4389 {
4390         struct llog_ctxt        *ctxt;
4391         int                      rc = 0;
4392
4393         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
4394         if (ctxt == NULL) {
4395                 CERROR("%s: MGS config context doesn't exist\n",
4396                        mgs->mgs_obd->obd_name);
4397                 rc = -ENODEV;
4398         } else {
4399                 rc = llog_erase(env, ctxt, NULL, name);
4400                 /* llog may not exist */
4401                 if (rc == -ENOENT)
4402                         rc = 0;
4403                 llog_ctxt_put(ctxt);
4404         }
4405
4406         if (rc)
4407                 CERROR("%s: failed to clear log %s: %d\n",
4408                        mgs->mgs_obd->obd_name, name, rc);
4409
4410         return rc;
4411 }
4412
4413 /* erase all logs for the given fs */
4414 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs,
4415                    const char *fsname)
4416 {
4417         struct list_head log_list;
4418         struct mgs_direntry *dirent, *n;
4419         char barrier_name[20] = {};
4420         char *suffix;
4421         int count = 0;
4422         int rc, len = strlen(fsname);
4423         ENTRY;
4424
4425         mutex_lock(&mgs->mgs_mutex);
4426
4427         /* Find all the logs in the CONFIGS directory */
4428         rc = class_dentry_readdir(env, mgs, &log_list);
4429         if (rc) {
4430                 mutex_unlock(&mgs->mgs_mutex);
4431                 RETURN(rc);
4432         }
4433
4434         if (list_empty(&log_list)) {
4435                 mutex_unlock(&mgs->mgs_mutex);
4436                 RETURN(-ENOENT);
4437         }
4438
4439         snprintf(barrier_name, sizeof(barrier_name) - 1, "%s-%s",
4440                  fsname, BARRIER_FILENAME);
4441         /* Delete the barrier fsdb */
4442         mgs_remove_fsdb_by_name(mgs, barrier_name);
4443         /* Delete the fs db */
4444         mgs_remove_fsdb_by_name(mgs, fsname);
4445         mutex_unlock(&mgs->mgs_mutex);
4446
4447         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4448                 list_del_init(&dirent->mde_list);
4449                 suffix = strrchr(dirent->mde_name, '-');
4450                 if (suffix != NULL) {
4451                         if ((len == suffix - dirent->mde_name) &&
4452                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
4453                                 CDEBUG(D_MGS, "Removing log %s\n",
4454                                        dirent->mde_name);
4455                                 mgs_erase_log(env, mgs, dirent->mde_name);
4456                                 count++;
4457                         }
4458                 }
4459                 mgs_direntry_free(dirent);
4460         }
4461
4462         if (count == 0)
4463                 rc = -ENOENT;
4464
4465         RETURN(rc);
4466 }
4467
4468 /* list all logs for the given fs */
4469 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
4470                   struct obd_ioctl_data *data)
4471 {
4472         struct list_head         log_list;
4473         struct mgs_direntry     *dirent, *n;
4474         char                    *out, *suffix;
4475         int                      l, remains, rc;
4476
4477         ENTRY;
4478
4479         /* Find all the logs in the CONFIGS directory */
4480         rc = class_dentry_readdir(env, mgs, &log_list);
4481         if (rc)
4482                 RETURN(rc);
4483
4484         out = data->ioc_bulk;
4485         remains = data->ioc_inllen1;
4486         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4487                 list_del_init(&dirent->mde_list);
4488                 suffix = strrchr(dirent->mde_name, '-');
4489                 if (suffix != NULL) {
4490                         l = snprintf(out, remains, "config_log: %s\n",
4491                                      dirent->mde_name);
4492                         out += l;
4493                         remains -= l;
4494                 }
4495                 mgs_direntry_free(dirent);
4496                 if (remains <= 0)
4497                         break;
4498         }
4499         RETURN(rc);
4500 }
4501
4502 struct mgs_lcfg_fork_data {
4503         struct lustre_cfg_bufs   mlfd_bufs;
4504         struct mgs_device       *mlfd_mgs;
4505         struct llog_handle      *mlfd_llh;
4506         const char              *mlfd_oldname;
4507         const char              *mlfd_newname;
4508         char                     mlfd_data[0];
4509 };
4510
4511 static bool contain_valid_fsname(char *buf, const char *fsname,
4512                                  int buflen, int namelen)
4513 {
4514         if (buflen < namelen)
4515                 return false;
4516
4517         if (memcmp(buf, fsname, namelen) != 0)
4518                 return false;
4519
4520         if (buf[namelen] != '\0' && buf[namelen] != '-')
4521                 return false;
4522
4523         return true;
4524 }
4525
4526 static int mgs_lcfg_fork_handler(const struct lu_env *env,
4527                                  struct llog_handle *o_llh,
4528                                  struct llog_rec_hdr *o_rec, void *data)
4529 {
4530         struct mgs_lcfg_fork_data *mlfd = data;
4531         struct lustre_cfg_bufs *n_bufs = &mlfd->mlfd_bufs;
4532         struct lustre_cfg *o_lcfg = (struct lustre_cfg *)(o_rec + 1);
4533         struct llog_cfg_rec *lcr;
4534         char *o_buf;
4535         char *n_buf = mlfd->mlfd_data;
4536         int o_buflen;
4537         int o_namelen = strlen(mlfd->mlfd_oldname);
4538         int n_namelen = strlen(mlfd->mlfd_newname);
4539         int diff = n_namelen - o_namelen;
4540         __u32 cmd = o_lcfg->lcfg_command;
4541         __u32 cnt = o_lcfg->lcfg_bufcount;
4542         int rc;
4543         int i;
4544         ENTRY;
4545
4546         /* buf[0] */
4547         o_buf = lustre_cfg_buf(o_lcfg, 0);
4548         o_buflen = o_lcfg->lcfg_buflens[0];
4549         if (contain_valid_fsname(o_buf, mlfd->mlfd_oldname, o_buflen,
4550                                  o_namelen)) {
4551                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4552                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4553                        o_buflen - o_namelen);
4554                 lustre_cfg_bufs_reset(n_bufs, n_buf);
4555                 n_buf += cfs_size_round(o_buflen + diff);
4556         } else {
4557                 lustre_cfg_bufs_reset(n_bufs, o_buflen != 0 ? o_buf : NULL);
4558         }
4559
4560         switch (cmd) {
4561         case LCFG_MARKER: {
4562                 struct cfg_marker *o_marker;
4563                 struct cfg_marker *n_marker;
4564                 int tgt_namelen;
4565
4566                 if (cnt != 2) {
4567                         CDEBUG(D_MGS, "Unknown cfg marker entry with %d "
4568                                "buffers\n", cnt);
4569                         RETURN(-EINVAL);
4570                 }
4571
4572                 /* buf[1] is marker */
4573                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4574                 o_buflen = o_lcfg->lcfg_buflens[1];
4575                 o_marker = (struct cfg_marker *)o_buf;
4576                 if (!contain_valid_fsname(o_marker->cm_tgtname,
4577                                           mlfd->mlfd_oldname,
4578                                           sizeof(o_marker->cm_tgtname),
4579                                           o_namelen)) {
4580                         lustre_cfg_bufs_set(n_bufs, 1, o_marker,
4581                                             sizeof(*o_marker));
4582                         break;
4583                 }
4584
4585                 n_marker = (struct cfg_marker *)n_buf;
4586                 *n_marker = *o_marker;
4587                 memcpy(n_marker->cm_tgtname, mlfd->mlfd_newname, n_namelen);
4588                 tgt_namelen = strlen(o_marker->cm_tgtname);
4589                 if (tgt_namelen > o_namelen)
4590                         memcpy(n_marker->cm_tgtname + n_namelen,
4591                                o_marker->cm_tgtname + o_namelen,
4592                                tgt_namelen - o_namelen);
4593                 n_marker->cm_tgtname[tgt_namelen + diff] = '\0';
4594                 lustre_cfg_bufs_set(n_bufs, 1, n_marker, sizeof(*n_marker));
4595                 break;
4596         }
4597         case LCFG_PARAM:
4598         case LCFG_SET_PARAM: {
4599                 for (i = 1; i < cnt; i++)
4600                         /* buf[i] is the param value, reuse it directly */
4601                         lustre_cfg_bufs_set(n_bufs, i,
4602                                             lustre_cfg_buf(o_lcfg, i),
4603                                             o_lcfg->lcfg_buflens[i]);
4604                 break;
4605         }
4606         case LCFG_POOL_NEW:
4607         case LCFG_POOL_ADD:
4608         case LCFG_POOL_REM:
4609         case LCFG_POOL_DEL: {
4610                 if (cnt < 3 || cnt > 4) {
4611                         CDEBUG(D_MGS, "Unknown cfg pool (%x) entry with %d "
4612                                "buffers\n", cmd, cnt);
4613                         RETURN(-EINVAL);
4614                 }
4615
4616                 /* buf[1] is fsname */
4617                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4618                 o_buflen = o_lcfg->lcfg_buflens[1];
4619                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4620                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4621                        o_buflen - o_namelen);
4622                 lustre_cfg_bufs_set(n_bufs, 1, n_buf, o_buflen + diff);
4623                 n_buf += cfs_size_round(o_buflen + diff);
4624
4625                 /* buf[2] is the pool name, reuse it directly */
4626                 lustre_cfg_bufs_set(n_bufs, 2, lustre_cfg_buf(o_lcfg, 2),
4627                                     o_lcfg->lcfg_buflens[2]);
4628
4629                 if (cnt == 3)
4630                         break;
4631
4632                 /* buf[3] is ostname */
4633                 o_buf = lustre_cfg_buf(o_lcfg, 3);
4634                 o_buflen = o_lcfg->lcfg_buflens[3];
4635                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4636                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4637                        o_buflen - o_namelen);
4638                 lustre_cfg_bufs_set(n_bufs, 3, n_buf, o_buflen + diff);
4639                 break;
4640         }
4641         case LCFG_SETUP: {
4642                 if (cnt == 2) {
4643                         o_buflen = o_lcfg->lcfg_buflens[1];
4644                         if (o_buflen == sizeof(struct lov_desc) ||
4645                             o_buflen == sizeof(struct lmv_desc)) {
4646                                 char *o_uuid;
4647                                 char *n_uuid;
4648                                 int uuid_len;
4649
4650                                 /* buf[1] */
4651                                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4652                                 if (o_buflen == sizeof(struct lov_desc)) {
4653                                         struct lov_desc *o_desc =
4654                                                 (struct lov_desc *)o_buf;
4655                                         struct lov_desc *n_desc =
4656                                                 (struct lov_desc *)n_buf;
4657
4658                                         *n_desc = *o_desc;
4659                                         o_uuid = o_desc->ld_uuid.uuid;
4660                                         n_uuid = n_desc->ld_uuid.uuid;
4661                                         uuid_len = sizeof(o_desc->ld_uuid.uuid);
4662                                 } else {
4663                                         struct lmv_desc *o_desc =
4664                                                 (struct lmv_desc *)o_buf;
4665                                         struct lmv_desc *n_desc =
4666                                                 (struct lmv_desc *)n_buf;
4667
4668                                         *n_desc = *o_desc;
4669                                         o_uuid = o_desc->ld_uuid.uuid;
4670                                         n_uuid = n_desc->ld_uuid.uuid;
4671                                         uuid_len = sizeof(o_desc->ld_uuid.uuid);
4672                                 }
4673
4674                                 if (unlikely(!contain_valid_fsname(o_uuid,
4675                                                 mlfd->mlfd_oldname, uuid_len,
4676                                                 o_namelen))) {
4677                                         lustre_cfg_bufs_set(n_bufs, 1, o_buf,
4678                                                             o_buflen);
4679                                         break;
4680                                 }
4681
4682                                 memcpy(n_uuid, mlfd->mlfd_newname, n_namelen);
4683                                 uuid_len = strlen(o_uuid);
4684                                 if (uuid_len > o_namelen)
4685                                         memcpy(n_uuid + n_namelen,
4686                                                o_uuid + o_namelen,
4687                                                uuid_len - o_namelen);
4688                                 n_uuid[uuid_len + diff] = '\0';
4689                                 lustre_cfg_bufs_set(n_bufs, 1, n_buf, o_buflen);
4690                                 break;
4691                         } /* else case fall through */
4692                 } /* else case fall through */
4693         }
4694         default: {
4695                 for (i = 1; i < cnt; i++) {
4696                         o_buflen = o_lcfg->lcfg_buflens[i];
4697                         if (o_buflen == 0)
4698                                 continue;
4699
4700                         o_buf = lustre_cfg_buf(o_lcfg, i);
4701                         if (!contain_valid_fsname(o_buf, mlfd->mlfd_oldname,
4702                                                   o_buflen, o_namelen)) {
4703                                 lustre_cfg_bufs_set(n_bufs, i, o_buf, o_buflen);
4704                                 continue;
4705                         }
4706
4707                         memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4708                         if (o_buflen == o_namelen) {
4709                                 lustre_cfg_bufs_set(n_bufs, i, n_buf,
4710                                                     n_namelen);
4711                                 n_buf += cfs_size_round(n_namelen);
4712                                 continue;
4713                         }
4714
4715                         memcpy(n_buf + n_namelen, o_buf + o_namelen,
4716                                o_buflen - o_namelen);
4717                         lustre_cfg_bufs_set(n_bufs, i, n_buf, o_buflen + diff);
4718                         n_buf += cfs_size_round(o_buflen + diff);
4719                 }
4720                 break;
4721         }
4722         }
4723
4724         lcr = lustre_cfg_rec_new(cmd, n_bufs);
4725         if (!lcr)
4726                 RETURN(-ENOMEM);
4727
4728         lcr->lcr_cfg = *o_lcfg;
4729         rc = llog_write(env, mlfd->mlfd_llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
4730         lustre_cfg_rec_free(lcr);
4731
4732         RETURN(rc);
4733 }
4734
4735 static int mgs_lcfg_fork_one(const struct lu_env *env, struct mgs_device *mgs,
4736                              struct mgs_direntry *mde, const char *oldname,
4737                              const char *newname)
4738 {
4739         struct llog_handle *old_llh = NULL;
4740         struct llog_handle *new_llh = NULL;
4741         struct llog_ctxt *ctxt = NULL;
4742         struct mgs_lcfg_fork_data *mlfd = NULL;
4743         char *name_buf = NULL;
4744         int name_buflen;
4745         int old_namelen = strlen(oldname);
4746         int new_namelen = strlen(newname);
4747         int rc;
4748         ENTRY;
4749
4750         name_buflen = mde->mde_len + new_namelen - old_namelen;
4751         OBD_ALLOC(name_buf, name_buflen);
4752         if (!name_buf)
4753                 RETURN(-ENOMEM);
4754
4755         memcpy(name_buf, newname, new_namelen);
4756         memcpy(name_buf + new_namelen, mde->mde_name + old_namelen,
4757                mde->mde_len - old_namelen);
4758
4759         CDEBUG(D_MGS, "Fork the config-log from %s to %s\n",
4760                mde->mde_name, name_buf);
4761
4762         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
4763         LASSERT(ctxt);
4764
4765         rc = llog_open_create(env, ctxt, &new_llh, NULL, name_buf);
4766         if (rc)
4767                 GOTO(out, rc);
4768
4769         rc = llog_init_handle(env, new_llh, LLOG_F_IS_PLAIN, NULL);
4770         if (rc)
4771                 GOTO(out, rc);
4772
4773         if (unlikely(mgs_log_is_empty(env, mgs, mde->mde_name)))
4774                 GOTO(out, rc = 0);
4775
4776         rc = llog_open(env, ctxt, &old_llh, NULL, mde->mde_name,
4777                        LLOG_OPEN_EXISTS);
4778         if (rc)
4779                 GOTO(out, rc);
4780
4781         rc = llog_init_handle(env, old_llh, LLOG_F_IS_PLAIN, NULL);
4782         if (rc)
4783                 GOTO(out, rc);
4784
4785         new_llh->lgh_hdr->llh_tgtuuid = old_llh->lgh_hdr->llh_tgtuuid;
4786
4787         OBD_ALLOC(mlfd, LLOG_MIN_CHUNK_SIZE);
4788         if (!mlfd)
4789                 GOTO(out, rc = -ENOMEM);
4790
4791         mlfd->mlfd_mgs = mgs;
4792         mlfd->mlfd_llh = new_llh;
4793         mlfd->mlfd_oldname = oldname;
4794         mlfd->mlfd_newname = newname;
4795
4796         rc = llog_process(env, old_llh, mgs_lcfg_fork_handler, mlfd, NULL);
4797         OBD_FREE(mlfd, LLOG_MIN_CHUNK_SIZE);
4798
4799         GOTO(out, rc);
4800
4801 out:
4802         if (old_llh)
4803                 llog_close(env, old_llh);
4804         if (new_llh)
4805                 llog_close(env, new_llh);
4806         if (name_buf)
4807                 OBD_FREE(name_buf, name_buflen);
4808         if (ctxt)
4809                 llog_ctxt_put(ctxt);
4810
4811         return rc;
4812 }
4813
4814 int mgs_lcfg_fork(const struct lu_env *env, struct mgs_device *mgs,
4815                   const char *oldname, const char *newname)
4816 {
4817         struct list_head log_list;
4818         struct mgs_direntry *dirent, *n;
4819         int olen = strlen(oldname);
4820         int nlen = strlen(newname);
4821         int count = 0;
4822         int rc = 0;
4823         ENTRY;
4824
4825         if (unlikely(!oldname || oldname[0] == '\0' ||
4826                      !newname || newname[0] == '\0'))
4827                 RETURN(-EINVAL);
4828
4829         if (strcmp(oldname, newname) == 0)
4830                 RETURN(-EINVAL);
4831
4832         /* lock it to prevent fork/erase/register in parallel. */
4833         mutex_lock(&mgs->mgs_mutex);
4834
4835         rc = class_dentry_readdir(env, mgs, &log_list);
4836         if (rc) {
4837                 mutex_unlock(&mgs->mgs_mutex);
4838                 RETURN(rc);
4839         }
4840
4841         if (list_empty(&log_list)) {
4842                 mutex_unlock(&mgs->mgs_mutex);
4843                 RETURN(-ENOENT);
4844         }
4845
4846         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4847                 char *ptr;
4848
4849                 ptr = strrchr(dirent->mde_name, '-');
4850                 if (ptr) {
4851                         int tlen = ptr - dirent->mde_name;
4852
4853                         if (tlen == nlen &&
4854                             strncmp(newname, dirent->mde_name, tlen) == 0)
4855                                 GOTO(out, rc = -EEXIST);
4856
4857                         if (tlen == olen &&
4858                             strncmp(oldname, dirent->mde_name, tlen) == 0)
4859                                 continue;
4860                 }
4861
4862                 list_del_init(&dirent->mde_list);
4863                 mgs_direntry_free(dirent);
4864         }
4865
4866         if (list_empty(&log_list)) {
4867                 mutex_unlock(&mgs->mgs_mutex);
4868                 RETURN(-ENOENT);
4869         }
4870
4871         list_for_each_entry(dirent, &log_list, mde_list) {
4872                 rc = mgs_lcfg_fork_one(env, mgs, dirent, oldname, newname);
4873                 if (rc)
4874                         break;
4875
4876                 count++;
4877         }
4878
4879 out:
4880         mutex_unlock(&mgs->mgs_mutex);
4881
4882         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4883                 list_del_init(&dirent->mde_list);
4884                 mgs_direntry_free(dirent);
4885         }
4886
4887         if (rc && count > 0)
4888                 mgs_erase_logs(env, mgs, newname);
4889
4890         RETURN(rc);
4891 }
4892
4893 int mgs_lcfg_erase(const struct lu_env *env, struct mgs_device *mgs,
4894                    const char *fsname)
4895 {
4896         int rc;
4897         ENTRY;
4898
4899         if (unlikely(!fsname || fsname[0] == '\0'))
4900                 RETURN(-EINVAL);
4901
4902         rc = mgs_erase_logs(env, mgs, fsname);
4903
4904         RETURN(rc);
4905 }
4906
4907 static int mgs_xattr_del(const struct lu_env *env, struct dt_object *obj)
4908 {
4909         struct dt_device *dev;
4910         struct thandle *th = NULL;
4911         int rc = 0;
4912
4913         ENTRY;
4914
4915         dev = container_of0(obj->do_lu.lo_dev, struct dt_device, dd_lu_dev);
4916         th = dt_trans_create(env, dev);
4917         if (IS_ERR(th))
4918                 RETURN(PTR_ERR(th));
4919
4920         rc = dt_declare_xattr_del(env, obj, XATTR_TARGET_RENAME, th);
4921         if (rc)
4922                 GOTO(stop, rc);
4923
4924         rc = dt_trans_start_local(env, dev, th);
4925         if (rc)
4926                 GOTO(stop, rc);
4927
4928         dt_write_lock(env, obj, 0);
4929         rc = dt_xattr_del(env, obj, XATTR_TARGET_RENAME, th);
4930
4931         GOTO(unlock, rc);
4932
4933 unlock:
4934         dt_write_unlock(env, obj);
4935
4936 stop:
4937         dt_trans_stop(env, dev, th);
4938
4939         return rc;
4940 }
4941
4942 int mgs_lcfg_rename(const struct lu_env *env, struct mgs_device *mgs)
4943 {
4944         struct list_head log_list;
4945         struct mgs_direntry *dirent, *n;
4946         char fsname[16];
4947         struct lu_buf buf = {
4948                 .lb_buf = fsname,
4949                 .lb_len = sizeof(fsname)
4950         };
4951         int rc = 0;
4952
4953         ENTRY;
4954
4955         rc = class_dentry_readdir(env, mgs, &log_list);
4956         if (rc)
4957                 RETURN(rc);
4958
4959         if (list_empty(&log_list))
4960                 RETURN(0);
4961
4962         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4963                 struct dt_object *o = NULL;
4964                 char oldname[16];
4965                 char *ptr;
4966                 int len;
4967
4968                 list_del_init(&dirent->mde_list);
4969                 ptr = strrchr(dirent->mde_name, '-');
4970                 if (!ptr)
4971                         goto next;
4972
4973                 len = ptr - dirent->mde_name;
4974                 if (unlikely(len >= sizeof(oldname))) {
4975                         CDEBUG(D_MGS, "Skip invalid configuration file %s\n",
4976                                dirent->mde_name);
4977                         goto next;
4978                 }
4979
4980                 o = local_file_find(env, mgs->mgs_los, mgs->mgs_configs_dir,
4981                                     dirent->mde_name);
4982                 if (IS_ERR(o)) {
4983                         rc = PTR_ERR(o);
4984                         CDEBUG(D_MGS, "Fail to locate file %s: rc = %d\n",
4985                                dirent->mde_name, rc);
4986                         goto next;
4987                 }
4988
4989                 rc = dt_xattr_get(env, o, &buf, XATTR_TARGET_RENAME);
4990                 if (rc < 0) {
4991                         if (rc == -ENODATA)
4992                                 rc = 0;
4993                         else
4994                                 CDEBUG(D_MGS,
4995                                        "Fail to get EA for %s: rc = %d\n",
4996                                        dirent->mde_name, rc);
4997                         goto next;
4998                 }
4999
5000                 if (unlikely(rc == len &&
5001                              memcmp(fsname, dirent->mde_name, len) == 0)) {
5002                         /* The new fsname is the same as the old one. */
5003                         rc = mgs_xattr_del(env, o);
5004                         goto next;
5005                 }
5006
5007                 memcpy(oldname, dirent->mde_name, len);
5008                 oldname[len] = '\0';
5009                 fsname[rc] = '\0';
5010                 rc = mgs_lcfg_fork_one(env, mgs, dirent, oldname, fsname);
5011                 if (rc && rc != -EEXIST) {
5012                         CDEBUG(D_MGS, "Fail to fork %s: rc = %d\n",
5013                                dirent->mde_name, rc);
5014                         goto next;
5015                 }
5016
5017                 rc = mgs_erase_log(env, mgs, dirent->mde_name);
5018                 if (rc) {
5019                         CDEBUG(D_MGS, "Fail to erase old %s: rc = %d\n",
5020                                dirent->mde_name, rc);
5021                         /* keep it there if failed to remove it. */
5022                         rc = 0;
5023                 }
5024
5025 next:
5026                 if (o && !IS_ERR(o))
5027                         lu_object_put(env, &o->do_lu);
5028
5029                 mgs_direntry_free(dirent);
5030                 if (rc)
5031                         break;
5032         }
5033
5034         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
5035                 list_del_init(&dirent->mde_list);
5036                 mgs_direntry_free(dirent);
5037         }
5038
5039         RETURN(rc);
5040 }
5041
5042 /* Setup _mgs fsdb and log
5043  */
5044 int mgs__mgs_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
5045 {
5046         struct fs_db *fsdb = NULL;
5047         int rc;
5048         ENTRY;
5049
5050         rc = mgs_find_or_make_fsdb(env, mgs, MGSSELF_NAME, &fsdb);
5051         if (!rc)
5052                 mgs_put_fsdb(mgs, fsdb);
5053
5054         RETURN(rc);
5055 }
5056
5057 /* Setup params fsdb and log
5058  */
5059 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
5060 {
5061         struct fs_db *fsdb = NULL;
5062         struct llog_handle *params_llh = NULL;
5063         int rc;
5064         ENTRY;
5065
5066         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
5067         if (!rc) {
5068                 mutex_lock(&fsdb->fsdb_mutex);
5069                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
5070                 if (!rc)
5071                         rc = record_end_log(env, &params_llh);
5072                 mutex_unlock(&fsdb->fsdb_mutex);
5073                 mgs_put_fsdb(mgs, fsdb);
5074         }
5075
5076         RETURN(rc);
5077 }
5078
5079 /* Cleanup params fsdb and log
5080  */
5081 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
5082 {
5083         int rc;
5084
5085         rc = mgs_erase_logs(env, mgs, PARAMS_FILENAME);
5086         return rc == -ENOENT ? 0 : rc;
5087 }
5088
5089 /**
5090  * Fill in the mgs_target_info based on data devname and param provide.
5091  *
5092  * @env         thread context
5093  * @mgs         mgs device
5094  * @mti         mgs target info. We want to set this based other paramters
5095  *              passed to this function. Once setup we write it to the config
5096  *              logs.
5097  * @devname     optional OBD device name
5098  * @param       string that contains both what tunable to set and the value to
5099  *              set it to.
5100  *
5101  * RETURN       0 for success
5102  *              negative error number on failure
5103  **/
5104 static int mgs_set_conf_param(const struct lu_env *env, struct mgs_device *mgs,
5105                               struct mgs_target_info *mti, const char *devname,
5106                               const char *param)
5107 {
5108         struct fs_db *fsdb = NULL;
5109         int dev_type;
5110         int rc = 0;
5111
5112         ENTRY;
5113         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
5114         if (!devname) {
5115                 size_t len;
5116
5117                 /* We have two possible cases here:
5118                  *
5119                  * 1) the device name embedded in the param:
5120                  *    lustre-OST0000.osc.max_dirty_mb=32
5121                  *
5122                  * 2) the file system name is embedded in
5123                  *    the param: lustre.sys.at.min=0
5124                  */
5125                 len = strcspn(param, ".=");
5126                 if (!len || param[len] == '=')
5127                         RETURN(-EINVAL);
5128
5129                 if (len >= sizeof(mti->mti_svname))
5130                         RETURN(-E2BIG);
5131
5132                 snprintf(mti->mti_svname, sizeof(mti->mti_svname),
5133                          "%.*s", (int)len, param);
5134                 param += len + 1;
5135         } else {
5136                 if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname)) >=
5137                     sizeof(mti->mti_svname))
5138                         RETURN(-E2BIG);
5139         }
5140
5141         if (!strlen(mti->mti_svname)) {
5142                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
5143                 RETURN(-ENOSYS);
5144         }
5145
5146         dev_type = mgs_parse_devname(mti->mti_svname, mti->mti_fsname,
5147                                      &mti->mti_stripe_index);
5148         switch (dev_type) {
5149         /* For this case we have an invalid obd device name */
5150         case -ENXIO:
5151                 CDEBUG(D_MGS, "%s don't contain an index\n", mti->mti_svname);
5152                 strlcpy(mti->mti_fsname, mti->mti_svname, MTI_NAME_MAXLEN);
5153                 dev_type = 0;
5154                 break;
5155         /* Not an obd device, assume devname is the fsname.
5156          * User might of only provided fsname and not obd device
5157          */
5158         case -EINVAL:
5159                 CDEBUG(D_MGS, "%s is seen as a file system name\n", mti->mti_svname);
5160                 strlcpy(mti->mti_fsname, mti->mti_svname, MTI_NAME_MAXLEN);
5161                 dev_type = 0;
5162                 break;
5163         default:
5164                 if (dev_type < 0)
5165                         GOTO(out, rc = dev_type);
5166
5167                 /* param related to llite isn't allowed to set by OST or MDT */
5168                 if (dev_type & LDD_F_SV_TYPE_OST ||
5169                     dev_type & LDD_F_SV_TYPE_MDT) {
5170                         /* param related to llite isn't allowed to set by OST
5171                          * or MDT
5172                          */
5173                         if (!strncmp(param, PARAM_LLITE,
5174                                      sizeof(PARAM_LLITE) - 1))
5175                                 GOTO(out, rc = -EINVAL);
5176
5177                         /* Strip -osc or -mdc suffix from svname */
5178                         if (server_make_name(dev_type, mti->mti_stripe_index,
5179                                              mti->mti_fsname, mti->mti_svname,
5180                                              sizeof(mti->mti_svname)))
5181                                 GOTO(out, rc = -EINVAL);
5182                 }
5183                 break;
5184         }
5185
5186         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
5187             sizeof(mti->mti_params))
5188                 GOTO(out, rc = -E2BIG);
5189
5190         CDEBUG(D_MGS, "set_conf_param fs='%s' device='%s' param='%s'\n",
5191                mti->mti_fsname, mti->mti_svname, mti->mti_params);
5192
5193         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
5194         if (rc)
5195                 GOTO(out, rc);
5196
5197         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
5198             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
5199                 CERROR("No filesystem targets for %s. cfg_device from lctl "
5200                        "is '%s'\n", mti->mti_fsname, mti->mti_svname);
5201                 mgs_unlink_fsdb(mgs, fsdb);
5202                 GOTO(out, rc = -EINVAL);
5203         }
5204
5205         /*
5206          * Revoke lock so everyone updates.  Should be alright if
5207          * someone was already reading while we were updating the logs,
5208          * so we don't really need to hold the lock while we're
5209          * writing (above).
5210          */
5211         mti->mti_flags = dev_type | LDD_F_PARAM;
5212         mutex_lock(&fsdb->fsdb_mutex);
5213         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
5214         mutex_unlock(&fsdb->fsdb_mutex);
5215         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
5216
5217 out:
5218         if (fsdb)
5219                 mgs_put_fsdb(mgs, fsdb);
5220
5221         RETURN(rc);
5222 }
5223
5224 static int mgs_set_param2(const struct lu_env *env, struct mgs_device *mgs,
5225                           struct mgs_target_info *mti, const char *param)
5226 {
5227         struct fs_db *fsdb = NULL;
5228         int dev_type;
5229         size_t len;
5230         int rc;
5231
5232         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
5233             sizeof(mti->mti_params))
5234                 GOTO(out, rc = -E2BIG);
5235
5236         /* obdname2fsname reports devname as an obd device */
5237         len = strcspn(param, ".=");
5238         if (len && param[len] != '=') {
5239                 char *ptr;
5240
5241                 param += len + 1;
5242                 ptr = strchr(param, '.');
5243
5244                 len = strlen(param);
5245                 if (ptr)
5246                         len -= strlen(ptr);
5247                 if (len >= sizeof(mti->mti_svname))
5248                         GOTO(out, rc = -E2BIG);
5249
5250                 snprintf(mti->mti_svname, sizeof(mti->mti_svname), "%.*s",
5251                         (int)len, param);
5252
5253                 obdname2fsname(mti->mti_svname, mti->mti_fsname,
5254                                sizeof(mti->mti_fsname));
5255         } else {
5256                 snprintf(mti->mti_svname, sizeof(mti->mti_svname), "general");
5257         }
5258
5259         CDEBUG(D_MGS, "set_param2 fs='%s' device='%s' param='%s'\n",
5260                mti->mti_fsname, mti->mti_svname, mti->mti_params);
5261
5262         /* The return value should be the device type i.e LDD_F_SV_TYPE_XXX.
5263          * A returned error tells us we don't have a target obd device.
5264          */
5265         dev_type = server_name2index(mti->mti_svname, &mti->mti_stripe_index,
5266                                      NULL);
5267         if (dev_type < 0)
5268                 dev_type = 0;
5269
5270         /* the return value should be the device type i.e LDD_F_SV_TYPE_XXX.
5271          * Strip -osc or -mdc suffix from svname
5272          */
5273         if ((dev_type & LDD_F_SV_TYPE_OST || dev_type & LDD_F_SV_TYPE_MDT) &&
5274             server_make_name(dev_type, mti->mti_stripe_index,
5275                              mti->mti_fsname, mti->mti_svname,
5276                              sizeof(mti->mti_svname)))
5277                 GOTO(out, rc = -EINVAL);
5278
5279         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
5280         if (rc)
5281                 GOTO(out, rc);
5282         /*
5283          * Revoke lock so everyone updates.  Should be alright if
5284          * someone was already reading while we were updating the logs,
5285          * so we don't really need to hold the lock while we're
5286          * writing (above).
5287          */
5288         mti->mti_flags = dev_type | LDD_F_PARAM2;
5289         mutex_lock(&fsdb->fsdb_mutex);
5290         rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
5291         mutex_unlock(&fsdb->fsdb_mutex);
5292         mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
5293         mgs_put_fsdb(mgs, fsdb);
5294 out:
5295         RETURN(rc);
5296 }
5297
5298 /* Set a permanent (config log) param for a target or fs
5299  *
5300  * @lcfg buf0 may contain the device (testfs-MDT0000) name
5301  *       buf1 contains the single parameter
5302  */
5303 int mgs_set_param(const struct lu_env *env, struct mgs_device *mgs,
5304                   struct lustre_cfg *lcfg)
5305 {
5306         const char *param = lustre_cfg_string(lcfg, 1);
5307         struct mgs_target_info *mti;
5308         int rc;
5309
5310         /* Create a fake mti to hold everything */
5311         OBD_ALLOC_PTR(mti);
5312         if (!mti)
5313                 return -ENOMEM;
5314
5315         print_lustre_cfg(lcfg);
5316
5317         if (lcfg->lcfg_command == LCFG_PARAM) {
5318                 /* For the case of lctl conf_param devname can be
5319                  * lustre, lustre-mdtlov, lustre-client, lustre-MDT0000
5320                  */
5321                 const char *devname = lustre_cfg_string(lcfg, 0);
5322
5323                 rc = mgs_set_conf_param(env, mgs, mti, devname, param);
5324         } else {
5325                 /* In the case of lctl set_param -P lcfg[0] will always
5326                  * be 'general'. At least for now.
5327                  */
5328                 rc = mgs_set_param2(env, mgs, mti, param);
5329         }
5330
5331         OBD_FREE_PTR(mti);
5332
5333         return rc;
5334 }
5335
5336 static int mgs_write_log_pool(const struct lu_env *env,
5337                               struct mgs_device *mgs, char *logname,
5338                               struct fs_db *fsdb, char *tgtname,
5339                               enum lcfg_command_type cmd,
5340                               char *fsname, char *poolname,
5341                               char *ostname, char *comment)
5342 {
5343         struct llog_handle *llh = NULL;
5344         int rc;
5345
5346         rc = record_start_log(env, mgs, &llh, logname);
5347         if (rc)
5348                 return rc;
5349         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
5350         if (rc)
5351                 goto out;
5352         rc = record_base(env, llh, tgtname, 0, cmd,
5353                          fsname, poolname, ostname, NULL);
5354         if (rc)
5355                 goto out;
5356         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
5357 out:
5358         record_end_log(env, &llh);
5359         return rc;
5360 }
5361
5362 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
5363                     enum lcfg_command_type cmd, const char *nodemap_name,
5364                     char *param)
5365 {
5366         lnet_nid_t      nid[2];
5367         __u32           idmap[2];
5368         bool            bool_switch;
5369         __u32           int_id;
5370         int             rc = 0;
5371         ENTRY;
5372
5373         switch (cmd) {
5374         case LCFG_NODEMAP_ADD:
5375                 rc = nodemap_add(nodemap_name);
5376                 break;
5377         case LCFG_NODEMAP_DEL:
5378                 rc = nodemap_del(nodemap_name);
5379                 break;
5380         case LCFG_NODEMAP_ADD_RANGE:
5381                 rc = nodemap_parse_range(param, nid);
5382                 if (rc != 0)
5383                         break;
5384                 rc = nodemap_add_range(nodemap_name, nid);
5385                 break;
5386         case LCFG_NODEMAP_DEL_RANGE:
5387                 rc = nodemap_parse_range(param, nid);
5388                 if (rc != 0)
5389                         break;
5390                 rc = nodemap_del_range(nodemap_name, nid);
5391                 break;
5392         case LCFG_NODEMAP_ADMIN:
5393                 bool_switch = simple_strtoul(param, NULL, 10);
5394                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
5395                 break;
5396         case LCFG_NODEMAP_DENY_UNKNOWN:
5397                 bool_switch = simple_strtoul(param, NULL, 10);
5398                 rc = nodemap_set_deny_unknown(nodemap_name, bool_switch);
5399                 break;
5400         case LCFG_NODEMAP_AUDIT_MODE:
5401                 rc = kstrtoul(param, 10, (unsigned long *)&bool_switch);
5402                 if (rc == 0)
5403                         rc = nodemap_set_audit_mode(nodemap_name, bool_switch);
5404                 break;
5405         case LCFG_NODEMAP_MAP_MODE:
5406                 if (strcmp("both", param) == 0)
5407                         rc = nodemap_set_mapping_mode(nodemap_name,
5408                                                       NODEMAP_MAP_BOTH);
5409                 else if (strcmp("uid_only", param) == 0)
5410                         rc = nodemap_set_mapping_mode(nodemap_name,
5411                                                       NODEMAP_MAP_UID_ONLY);
5412                 else if (strcmp("gid_only", param) == 0)
5413                         rc = nodemap_set_mapping_mode(nodemap_name,
5414                                                       NODEMAP_MAP_GID_ONLY);
5415                 else
5416                         rc = -EINVAL;
5417                 break;
5418         case LCFG_NODEMAP_TRUSTED:
5419                 bool_switch = simple_strtoul(param, NULL, 10);
5420                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
5421                 break;
5422         case LCFG_NODEMAP_SQUASH_UID:
5423                 int_id = simple_strtoul(param, NULL, 10);
5424                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
5425                 break;
5426         case LCFG_NODEMAP_SQUASH_GID:
5427                 int_id = simple_strtoul(param, NULL, 10);
5428                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
5429                 break;
5430         case LCFG_NODEMAP_ADD_UIDMAP:
5431         case LCFG_NODEMAP_ADD_GIDMAP:
5432                 rc = nodemap_parse_idmap(param, idmap);
5433                 if (rc != 0)
5434                         break;
5435                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
5436                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
5437                                                idmap);
5438                 else
5439                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
5440                                                idmap);
5441                 break;
5442         case LCFG_NODEMAP_DEL_UIDMAP:
5443         case LCFG_NODEMAP_DEL_GIDMAP:
5444                 rc = nodemap_parse_idmap(param, idmap);
5445                 if (rc != 0)
5446                         break;
5447                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
5448                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
5449                                                idmap);
5450                 else
5451                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
5452                                                idmap);
5453                 break;
5454         case LCFG_NODEMAP_SET_FILESET:
5455                 rc = nodemap_set_fileset(nodemap_name, param);
5456                 break;
5457         default:
5458                 rc = -EINVAL;
5459         }
5460
5461         RETURN(rc);
5462 }
5463
5464 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
5465                  enum lcfg_command_type cmd, char *fsname,
5466                  char *poolname, char *ostname)
5467 {
5468         struct fs_db *fsdb;
5469         char *lovname;
5470         char *logname;
5471         char *label = NULL, *canceled_label = NULL;
5472         int label_sz;
5473         struct mgs_target_info *mti = NULL;
5474         bool checked = false;
5475         bool locked = false;
5476         bool free = false;
5477         int rc, i;
5478         ENTRY;
5479
5480         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
5481         if (rc) {
5482                 CERROR("Can't get db for %s\n", fsname);
5483                 RETURN(rc);
5484         }
5485         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
5486                 CERROR("%s is not defined\n", fsname);
5487                 free = true;
5488                 GOTO(out_fsdb, rc = -EINVAL);
5489         }
5490
5491         label_sz = 10 + strlen(fsname) + strlen(poolname);
5492
5493         /* check if ostname match fsname */
5494         if (ostname != NULL) {
5495                 char *ptr;
5496
5497                 ptr = strrchr(ostname, '-');
5498                 if ((ptr == NULL) ||
5499                     (strncmp(fsname, ostname, ptr-ostname) != 0))
5500                         RETURN(-EINVAL);
5501                 label_sz += strlen(ostname);
5502         }
5503
5504         OBD_ALLOC(label, label_sz);
5505         if (!label)
5506                 GOTO(out_fsdb, rc = -ENOMEM);
5507
5508         switch(cmd) {
5509         case LCFG_POOL_NEW:
5510                 sprintf(label,
5511                         "new %s.%s", fsname, poolname);
5512                 break;
5513         case LCFG_POOL_ADD:
5514                 sprintf(label,
5515                         "add %s.%s.%s", fsname, poolname, ostname);
5516                 break;
5517         case LCFG_POOL_REM:
5518                 OBD_ALLOC(canceled_label, label_sz);
5519                 if (canceled_label == NULL)
5520                         GOTO(out_label, rc = -ENOMEM);
5521                 sprintf(label,
5522                         "rem %s.%s.%s", fsname, poolname, ostname);
5523                 sprintf(canceled_label,
5524                         "add %s.%s.%s", fsname, poolname, ostname);
5525                 break;
5526         case LCFG_POOL_DEL:
5527                 OBD_ALLOC(canceled_label, label_sz);
5528                 if (canceled_label == NULL)
5529                         GOTO(out_label, rc = -ENOMEM);
5530                 sprintf(label,
5531                         "del %s.%s", fsname, poolname);
5532                 sprintf(canceled_label,
5533                         "new %s.%s", fsname, poolname);
5534                 break;
5535         default:
5536                 break;
5537         }
5538
5539         OBD_ALLOC_PTR(mti);
5540         if (mti == NULL)
5541                 GOTO(out_cancel, rc = -ENOMEM);
5542         strncpy(mti->mti_svname, "lov pool", sizeof(mti->mti_svname));
5543
5544         mutex_lock(&fsdb->fsdb_mutex);
5545         locked = true;
5546         /* write pool def to all MDT logs */
5547         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
5548                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
5549                         rc = name_create_mdt_and_lov(&logname, &lovname,
5550                                                      fsdb, i);
5551                         if (rc)
5552                                 GOTO(out_mti, rc);
5553
5554                         if (!checked && (canceled_label == NULL)) {
5555                                 rc = mgs_check_marker(env, mgs, fsdb, mti,
5556                                                 logname, lovname, label);
5557                                 if (rc) {
5558                                         name_destroy(&logname);
5559                                         name_destroy(&lovname);
5560                                         GOTO(out_mti,
5561                                                 rc = (rc == LLOG_PROC_BREAK ?
5562                                                         -EEXIST : rc));
5563                                 }
5564                                 checked = true;
5565                         }
5566                         if (canceled_label != NULL)
5567                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
5568                                                 lovname, canceled_label,
5569                                                 CM_SKIP);
5570
5571                         if (rc >= 0)
5572                                 rc = mgs_write_log_pool(env, mgs, logname,
5573                                                         fsdb, lovname, cmd,
5574                                                         fsname, poolname,
5575                                                         ostname, label);
5576                         name_destroy(&logname);
5577                         name_destroy(&lovname);
5578                         if (rc)
5579                                 GOTO(out_mti, rc);
5580                 }
5581         }
5582
5583         rc = name_create(&logname, fsname, "-client");
5584         if (rc)
5585                 GOTO(out_mti, rc);
5586
5587         if (!checked && (canceled_label == NULL)) {
5588                 rc = mgs_check_marker(env, mgs, fsdb, mti, logname,
5589                                 fsdb->fsdb_clilov, label);
5590                 if (rc) {
5591                         name_destroy(&logname);
5592                         GOTO(out_mti, rc = (rc == LLOG_PROC_BREAK ?
5593                                 -EEXIST : rc));
5594                 }
5595         }
5596         if (canceled_label != NULL) {
5597                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
5598                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
5599                 if (rc < 0) {
5600                         name_destroy(&logname);
5601                         GOTO(out_mti, rc);
5602                 }
5603         }
5604
5605         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
5606                                 cmd, fsname, poolname, ostname, label);
5607         mutex_unlock(&fsdb->fsdb_mutex);
5608         locked = false;
5609         name_destroy(&logname);
5610         /* request for update */
5611         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
5612
5613         GOTO(out_mti, rc);
5614
5615 out_mti:
5616         if (locked)
5617                 mutex_unlock(&fsdb->fsdb_mutex);
5618         if (mti != NULL)
5619                 OBD_FREE_PTR(mti);
5620 out_cancel:
5621         if (canceled_label != NULL)
5622                 OBD_FREE(canceled_label, label_sz);
5623 out_label:
5624         OBD_FREE(label, label_sz);
5625 out_fsdb:
5626         if (free)
5627                 mgs_unlink_fsdb(mgs, fsdb);
5628         mgs_put_fsdb(mgs, fsdb);
5629
5630         return rc;
5631 }