Whamcloud - gitweb
LU-12635 build: Support for gcc -Wimplicit-fallthrough
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mgs/mgs_llog.c
33  *
34  * Lustre Management Server (mgs) config llog creation
35  *
36  * Author: Nathan Rutman <nathan@clusterfs.com>
37  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
38  * Author: Mikhail Pershin <tappro@whamcloud.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_MGS
42 #define D_MGS D_CONFIG
43
44 #include <obd.h>
45 #include <uapi/linux/lustre/lustre_ioctl.h>
46 #include <uapi/linux/lustre/lustre_param.h>
47 #include <lustre_sec.h>
48 #include <lustre_quota.h>
49 #include <lustre_sec.h>
50
51 #include "mgs_internal.h"
52
53 /********************** Class functions ********************/
54
55 /**
56  * Find all logs in CONFIG directory and link then into list.
57  *
58  * \param[in] env       pointer to the thread context
59  * \param[in] mgs       pointer to the mgs device
60  * \param[out] log_list the list to hold the found llog name entry
61  *
62  * \retval              0 for success
63  * \retval              negative error number on failure
64  **/
65 int class_dentry_readdir(const struct lu_env *env, struct mgs_device *mgs,
66                          struct list_head *log_list)
67 {
68         struct dt_object *dir = mgs->mgs_configs_dir;
69         const struct dt_it_ops *iops;
70         struct dt_it *it;
71         struct mgs_direntry *de;
72         char *key;
73         int rc, key_sz;
74         size_t suffix_len = sizeof(".bak") - 1;
75
76         INIT_LIST_HEAD(log_list);
77
78         LASSERT(dir);
79         LASSERT(dir->do_index_ops);
80
81         iops = &dir->do_index_ops->dio_it;
82         it = iops->init(env, dir, LUDA_64BITHASH);
83         if (IS_ERR(it))
84                 RETURN(PTR_ERR(it));
85
86         rc = iops->load(env, it, 0);
87         if (rc <= 0)
88                 GOTO(fini, rc = 0);
89
90         /* main cycle */
91         do {
92                 key = (void *)iops->key(env, it);
93                 if (IS_ERR(key)) {
94                         CERROR("%s: key failed when listing %s: rc = %d\n",
95                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
96                                (int) PTR_ERR(key));
97                         goto next;
98                 }
99                 key_sz = iops->key_size(env, it);
100                 LASSERT(key_sz > 0);
101
102                 /* filter out "." and ".." entries */
103                 if (key[0] == '.') {
104                         if (key_sz == 1)
105                                 goto next;
106                         if (key_sz == 2 && key[1] == '.')
107                                 goto next;
108                 }
109
110                 /* filter out ".bak" files */
111                 if (key_sz >= suffix_len &&
112                     !memcmp(".bak", key + key_sz - suffix_len, suffix_len)) {
113                         CDEBUG(D_MGS, "Skipping backup file %.*s\n",
114                                key_sz, key);
115                         goto next;
116                 }
117
118                 de = mgs_direntry_alloc(key_sz + 1);
119                 if (de == NULL) {
120                         rc = -ENOMEM;
121                         break;
122                 }
123
124                 memcpy(de->mde_name, key, key_sz);
125                 de->mde_name[key_sz] = 0;
126
127                 list_add(&de->mde_list, log_list);
128
129 next:
130                 rc = iops->next(env, it);
131         } while (rc == 0);
132         if (rc > 0)
133                 rc = 0;
134
135         iops->put(env, it);
136
137 fini:
138         iops->fini(env, it);
139         if (rc) {
140                 struct mgs_direntry *n;
141
142                 CERROR("%s: key failed when listing %s: rc = %d\n",
143                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
144
145                 list_for_each_entry_safe(de, n, log_list, mde_list) {
146                         list_del_init(&de->mde_list);
147                         mgs_direntry_free(de);
148                 }
149         }
150
151         RETURN(rc);
152 }
153
154 /******************** DB functions *********************/
155
156 static inline int name_create(char **newname, char *prefix, char *suffix)
157 {
158         LASSERT(newname);
159         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
160         if (!*newname)
161                 return -ENOMEM;
162         sprintf(*newname, "%s%s", prefix, suffix);
163         return 0;
164 }
165
166 static inline void name_destroy(char **name)
167 {
168         if (*name)
169                 OBD_FREE(*name, strlen(*name) + 1);
170         *name = NULL;
171 }
172
173 struct mgs_fsdb_handler_data
174 {
175         struct fs_db   *fsdb;
176         __u32           ver;
177 };
178
179 /* from the (client) config log, figure out:
180         1. which ost's/mdt's are configured (by index)
181         2. what the last config step is
182         3. COMPAT_18 osc name
183 */
184 /* It might be better to have a separate db file, instead of parsing the info
185    out of the client log.  This is slow and potentially error-prone. */
186 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
187                             struct llog_rec_hdr *rec, void *data)
188 {
189         struct mgs_fsdb_handler_data *d = data;
190         struct fs_db *fsdb = d->fsdb;
191         int cfg_len = rec->lrh_len;
192         char *cfg_buf = (char*) (rec + 1);
193         struct lustre_cfg *lcfg;
194         u32 index;
195         int rc = 0;
196         ENTRY;
197
198         if (rec->lrh_type != OBD_CFG_REC) {
199                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
200                 RETURN(-EINVAL);
201         }
202
203         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
204         if (rc) {
205                 CERROR("Insane cfg\n");
206                 RETURN(rc);
207         }
208
209         lcfg = (struct lustre_cfg *)cfg_buf;
210
211         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
212                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
213
214         /* Figure out ost indicies */
215         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
216         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
217             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
218                 rc = kstrtouint(lustre_cfg_string(lcfg, 2), 10, &index);
219                 if (rc)
220                         RETURN(rc);
221
222                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
223                        lustre_cfg_string(lcfg, 1), index,
224                        lustre_cfg_string(lcfg, 2));
225                 set_bit(index, fsdb->fsdb_ost_index_map);
226         }
227
228         /* Figure out mdt indicies */
229         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
230         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
231             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
232                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
233                                        &index, NULL);
234                 if (rc != LDD_F_SV_TYPE_MDT) {
235                         CWARN("Unparsable MDC name %s, assuming index 0\n",
236                               lustre_cfg_string(lcfg, 0));
237                         index = 0;
238                 }
239                 rc = 0;
240                 CDEBUG(D_MGS, "MDT index is %u\n", index);
241                 if (!test_bit(index, fsdb->fsdb_mdt_index_map)) {
242                         set_bit(index, fsdb->fsdb_mdt_index_map);
243                         fsdb->fsdb_mdt_count++;
244                 }
245         }
246
247         /**
248          * figure out the old config. fsdb_gen = 0 means old log
249          * It is obsoleted and not supported anymore
250          */
251         if (fsdb->fsdb_gen == 0) {
252                 CERROR("Old config format is not supported\n");
253                 RETURN(-EINVAL);
254         }
255
256         /*
257          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
258          */
259         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
260             lcfg->lcfg_command == LCFG_ATTACH &&
261             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
262                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
263                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
264                         CWARN("MDT using 1.8 OSC name scheme\n");
265                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
266                 }
267         }
268
269         if (lcfg->lcfg_command == LCFG_MARKER) {
270                 struct cfg_marker *marker;
271                 marker = lustre_cfg_buf(lcfg, 1);
272
273                 d->ver = marker->cm_vers;
274
275                 /* Keep track of the latest marker step */
276                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
277         }
278
279         RETURN(rc);
280 }
281
282 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
283 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
284                                   struct mgs_device *mgs,
285                                   struct fs_db *fsdb)
286 {
287         char *logname;
288         struct llog_handle *loghandle;
289         struct llog_ctxt *ctxt;
290         struct mgs_fsdb_handler_data d = {
291                 .fsdb = fsdb,
292         };
293         int rc;
294
295         ENTRY;
296
297         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
298         LASSERT(ctxt != NULL);
299         rc = name_create(&logname, fsdb->fsdb_name, "-client");
300         if (rc)
301                 GOTO(out_put, rc);
302         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
303         if (rc)
304                 GOTO(out_pop, rc);
305
306         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
307         if (rc)
308                 GOTO(out_close, rc);
309
310         if (llog_get_size(loghandle) <= 1)
311                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
312
313         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
314         CDEBUG(D_INFO, "get_db = %d\n", rc);
315 out_close:
316         llog_close(env, loghandle);
317 out_pop:
318         name_destroy(&logname);
319 out_put:
320         llog_ctxt_put(ctxt);
321
322         RETURN(rc);
323 }
324
325 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
326 {
327         struct mgs_tgt_srpc_conf *tgtconf;
328
329         /* free target-specific rules */
330         while (fsdb->fsdb_srpc_tgt) {
331                 tgtconf = fsdb->fsdb_srpc_tgt;
332                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
333
334                 LASSERT(tgtconf->mtsc_tgt);
335
336                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
337                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
338                 OBD_FREE_PTR(tgtconf);
339         }
340
341         /* free general rules */
342         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
343 }
344
345 static void mgs_unlink_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
346 {
347         mutex_lock(&mgs->mgs_mutex);
348         if (likely(!list_empty(&fsdb->fsdb_list))) {
349                 LASSERTF(atomic_read(&fsdb->fsdb_ref) >= 2,
350                          "Invalid ref %d on %s\n",
351                          atomic_read(&fsdb->fsdb_ref),
352                          fsdb->fsdb_name);
353
354                 list_del_init(&fsdb->fsdb_list);
355                 /* Drop the reference on the list.*/
356                 mgs_put_fsdb(mgs, fsdb);
357         }
358         mutex_unlock(&mgs->mgs_mutex);
359 }
360
361 /* The caller must hold mgs->mgs_mutex. */
362 static inline struct fs_db *
363 mgs_find_fsdb_noref(struct mgs_device *mgs, const char *fsname)
364 {
365         struct fs_db *fsdb;
366         struct list_head *tmp;
367
368         list_for_each(tmp, &mgs->mgs_fs_db_list) {
369                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
370                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
371                         return fsdb;
372         }
373
374         return NULL;
375 }
376
377 /* The caller must hold mgs->mgs_mutex. */
378 static void mgs_remove_fsdb_by_name(struct mgs_device *mgs, const char *name)
379 {
380         struct fs_db *fsdb;
381
382         fsdb = mgs_find_fsdb_noref(mgs, name);
383         if (fsdb) {
384                 list_del_init(&fsdb->fsdb_list);
385                 /* Drop the reference on the list.*/
386                 mgs_put_fsdb(mgs, fsdb);
387         }
388 }
389
390 /* The caller must hold mgs->mgs_mutex. */
391 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, const char *fsname)
392 {
393         struct fs_db *fsdb;
394
395         fsdb = mgs_find_fsdb_noref(mgs, fsname);
396         if (fsdb)
397                 atomic_inc(&fsdb->fsdb_ref);
398
399         return fsdb;
400 }
401
402 /* The caller must hold mgs->mgs_mutex. */
403 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
404                                   struct mgs_device *mgs, char *fsname)
405 {
406         struct fs_db *fsdb;
407         int rc;
408         ENTRY;
409
410         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
411                 CERROR("fsname %s is too long\n", fsname);
412
413                 RETURN(ERR_PTR(-EINVAL));
414         }
415
416         OBD_ALLOC_PTR(fsdb);
417         if (!fsdb)
418                 RETURN(ERR_PTR(-ENOMEM));
419
420         strncpy(fsdb->fsdb_name, fsname, sizeof(fsdb->fsdb_name));
421         mutex_init(&fsdb->fsdb_mutex);
422         INIT_LIST_HEAD(&fsdb->fsdb_list);
423         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
424         fsdb->fsdb_gen = 1;
425         INIT_LIST_HEAD(&fsdb->fsdb_clients);
426         atomic_set(&fsdb->fsdb_notify_phase, 0);
427         init_waitqueue_head(&fsdb->fsdb_notify_waitq);
428         init_completion(&fsdb->fsdb_notify_comp);
429
430         if (strcmp(fsname, MGSSELF_NAME) == 0) {
431                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
432                 fsdb->fsdb_mgs = mgs;
433                 if (logname_is_barrier(fsname))
434                         goto add;
435         } else {
436                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
437                 if (!fsdb->fsdb_mdt_index_map) {
438                         CERROR("No memory for MDT index maps\n");
439
440                         GOTO(err, rc = -ENOMEM);
441                 }
442
443                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
444                 if (!fsdb->fsdb_ost_index_map) {
445                         CERROR("No memory for OST index maps\n");
446
447                         GOTO(err, rc = -ENOMEM);
448                 }
449
450                 if (logname_is_barrier(fsname))
451                         goto add;
452
453                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
454                 if (rc)
455                         GOTO(err, rc);
456
457                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
458                 if (rc)
459                         GOTO(err, rc);
460
461                 /* initialise data for NID table */
462                 mgs_ir_init_fs(env, mgs, fsdb);
463                 lproc_mgs_add_live(mgs, fsdb);
464         }
465
466         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
467             strcmp(PARAMS_FILENAME, fsname) != 0) {
468                 /* populate the db from the client llog */
469                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
470                 if (rc) {
471                         CERROR("Can't get db from client log %d\n", rc);
472
473                         GOTO(err, rc);
474                 }
475         }
476
477         /* populate srpc rules from params llog */
478         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
479         if (rc) {
480                 CERROR("Can't get db from params log %d\n", rc);
481
482                 GOTO(err, rc);
483         }
484
485 add:
486         /* One ref is for the fsdb on the list.
487          * The other ref is for the caller. */
488         atomic_set(&fsdb->fsdb_ref, 2);
489         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
490
491         RETURN(fsdb);
492
493 err:
494         atomic_set(&fsdb->fsdb_ref, 1);
495         mgs_put_fsdb(mgs, fsdb);
496
497         RETURN(ERR_PTR(rc));
498 }
499
500 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
501 {
502         LASSERT(list_empty(&fsdb->fsdb_list));
503
504         lproc_mgs_del_live(mgs, fsdb);
505
506         /* deinitialize fsr */
507         if (fsdb->fsdb_mgs)
508                 mgs_ir_fini_fs(mgs, fsdb);
509
510         if (fsdb->fsdb_ost_index_map)
511                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
512         if (fsdb->fsdb_mdt_index_map)
513                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
514         name_destroy(&fsdb->fsdb_clilov);
515         name_destroy(&fsdb->fsdb_clilmv);
516         mgs_free_fsdb_srpc(fsdb);
517         OBD_FREE_PTR(fsdb);
518 }
519
520 void mgs_put_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
521 {
522         if (atomic_dec_and_test(&fsdb->fsdb_ref))
523                 mgs_free_fsdb(mgs, fsdb);
524 }
525
526 int mgs_init_fsdb_list(struct mgs_device *mgs)
527 {
528         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
529         return 0;
530 }
531
532 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
533 {
534         struct fs_db *fsdb;
535         struct list_head *tmp, *tmp2;
536
537         mutex_lock(&mgs->mgs_mutex);
538         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
539                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
540                 list_del_init(&fsdb->fsdb_list);
541                 mgs_put_fsdb(mgs, fsdb);
542         }
543         mutex_unlock(&mgs->mgs_mutex);
544         return 0;
545 }
546
547 /* The caller must hold mgs->mgs_mutex. */
548 int mgs_find_or_make_fsdb_nolock(const struct lu_env *env,
549                                 struct mgs_device *mgs,
550                                 char *name, struct fs_db **dbh)
551 {
552         struct fs_db *fsdb;
553         int rc = 0;
554         ENTRY;
555
556         fsdb = mgs_find_fsdb(mgs, name);
557         if (!fsdb) {
558                 fsdb = mgs_new_fsdb(env, mgs, name);
559                 if (IS_ERR(fsdb))
560                         rc = PTR_ERR(fsdb);
561
562                 CDEBUG(D_MGS, "Created new db: rc = %d\n", rc);
563         }
564
565         if (!rc)
566                 *dbh = fsdb;
567
568         RETURN(rc);
569 }
570
571 int mgs_find_or_make_fsdb(const struct lu_env *env, struct mgs_device *mgs,
572                           char *name, struct fs_db **dbh)
573 {
574         int rc;
575         ENTRY;
576
577         mutex_lock(&mgs->mgs_mutex);
578         rc = mgs_find_or_make_fsdb_nolock(env, mgs, name, dbh);
579         mutex_unlock(&mgs->mgs_mutex);
580
581         RETURN(rc);
582 }
583
584 /* 1 = index in use
585    0 = index unused
586    -1= empty client log */
587 int mgs_check_index(const struct lu_env *env,
588                     struct mgs_device *mgs,
589                     struct mgs_target_info *mti)
590 {
591         struct fs_db *fsdb;
592         void *imap;
593         int rc = 0;
594         ENTRY;
595
596         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
597
598         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
599         if (rc) {
600                 CERROR("Can't get db for %s\n", mti->mti_fsname);
601                 RETURN(rc);
602         }
603
604         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
605                 GOTO(out, rc = -1);
606
607         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
608                 imap = fsdb->fsdb_ost_index_map;
609         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
610                 imap = fsdb->fsdb_mdt_index_map;
611         else
612                 GOTO(out, rc = -EINVAL);
613
614         if (test_bit(mti->mti_stripe_index, imap))
615                 GOTO(out, rc = 1);
616
617         GOTO(out, rc = 0);
618
619 out:
620         mgs_put_fsdb(mgs, fsdb);
621         return rc;
622 }
623
624 static __inline__ int next_index(void *index_map, int map_len)
625 {
626         int i;
627         for (i = 0; i < map_len * 8; i++)
628                 if (!test_bit(i, index_map)) {
629                          return i;
630                  }
631         CERROR("max index %d exceeded.\n", i);
632         return -1;
633 }
634
635 /* Make the mdt/ost server obd name based on the filesystem name */
636 static bool server_make_name(u32 flags, u16 index, const char *fs,
637                              char *name_buf, size_t name_buf_size)
638 {
639         bool invalid_flag = false;
640
641         if (flags & (LDD_F_SV_TYPE_MDT | LDD_F_SV_TYPE_OST)) {
642                 if (!(flags & LDD_F_SV_ALL))
643                         snprintf(name_buf, name_buf_size, "%.8s%c%s%04x", fs,
644                                 (flags & LDD_F_VIRGIN) ? ':' :
645                                         ((flags & LDD_F_WRITECONF) ? '=' : '-'),
646                                 (flags & LDD_F_SV_TYPE_MDT) ? "MDT" : "OST",
647                                 index);
648         } else if (flags & LDD_F_SV_TYPE_MGS) {
649                 snprintf(name_buf, name_buf_size, "MGS");
650         } else {
651                 CERROR("unknown server type %#x\n", flags);
652                 invalid_flag = true;
653         }
654         return invalid_flag;
655 }
656
657 /* Return codes:
658         0  newly marked as in use
659         <0 err
660         +EALREADY for update of an old index */
661 static int mgs_set_index(const struct lu_env *env,
662                          struct mgs_device *mgs,
663                          struct mgs_target_info *mti)
664 {
665         struct fs_db *fsdb;
666         void *imap;
667         int rc = 0;
668         ENTRY;
669
670         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
671         if (rc) {
672                 CERROR("Can't get db for %s\n", mti->mti_fsname);
673                 RETURN(rc);
674         }
675
676         mutex_lock(&fsdb->fsdb_mutex);
677         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
678                 imap = fsdb->fsdb_ost_index_map;
679         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
680                 imap = fsdb->fsdb_mdt_index_map;
681         } else {
682                 GOTO(out_up, rc = -EINVAL);
683         }
684
685         if (mti->mti_flags & LDD_F_NEED_INDEX) {
686                 rc = next_index(imap, INDEX_MAP_SIZE);
687                 if (rc == -1)
688                         GOTO(out_up, rc = -ERANGE);
689                 mti->mti_stripe_index = rc;
690         }
691
692         /* the last index(0xffff) is reserved for default value. */
693         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
694                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
695                                    "but index must be less than %u.\n",
696                                    mti->mti_svname, mti->mti_stripe_index,
697                                    INDEX_MAP_SIZE * 8 - 1);
698                 GOTO(out_up, rc = -ERANGE);
699         }
700
701         if (test_bit(mti->mti_stripe_index, imap)) {
702                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
703                     !(mti->mti_flags & LDD_F_WRITECONF)) {
704                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
705                                            "%d, but that index is already in "
706                                            "use. Use --writeconf to force\n",
707                                            mti->mti_svname,
708                                            mti->mti_stripe_index);
709                         GOTO(out_up, rc = -EADDRINUSE);
710                 } else {
711                         CDEBUG(D_MGS, "Server %s updating index %d\n",
712                                mti->mti_svname, mti->mti_stripe_index);
713                         GOTO(out_up, rc = EALREADY);
714                 }
715         } else {
716                 set_bit(mti->mti_stripe_index, imap);
717                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
718                         fsdb->fsdb_mdt_count++;
719         }
720
721         set_bit(mti->mti_stripe_index, imap);
722         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
723         if (server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
724                              mti->mti_stripe_index, mti->mti_fsname,
725                              mti->mti_svname, sizeof(mti->mti_svname))) {
726                 CERROR("unknown server type %#x\n", mti->mti_flags);
727                 GOTO(out_up, rc = -EINVAL);
728         }
729
730         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
731                mti->mti_stripe_index);
732
733         GOTO(out_up, rc = 0);
734
735 out_up:
736         mutex_unlock(&fsdb->fsdb_mutex);
737         mgs_put_fsdb(mgs, fsdb);
738         return rc;
739 }
740
741 struct mgs_modify_lookup {
742         struct cfg_marker mml_marker;
743         int               mml_modified;
744 };
745
746 static int mgs_check_record_match(const struct lu_env *env,
747                                 struct llog_handle *llh,
748                                 struct llog_rec_hdr *rec, void *data)
749 {
750         struct cfg_marker *mc_marker = data;
751         struct cfg_marker *marker;
752         struct lustre_cfg *lcfg = REC_DATA(rec);
753         int cfg_len = REC_DATA_LEN(rec);
754         int rc;
755         ENTRY;
756
757
758         if (rec->lrh_type != OBD_CFG_REC) {
759                 CDEBUG(D_ERROR, "Unhandled lrh_type: %#x\n", rec->lrh_type);
760                 RETURN(-EINVAL);
761         }
762
763         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
764         if (rc) {
765                 CDEBUG(D_ERROR, "Insane cfg\n");
766                 RETURN(rc);
767         }
768
769         /* We only care about markers */
770         if (lcfg->lcfg_command != LCFG_MARKER)
771                 RETURN(0);
772
773         marker = lustre_cfg_buf(lcfg, 1);
774
775         if (marker->cm_flags & CM_SKIP)
776                 RETURN(0);
777
778         if ((strcmp(mc_marker->cm_comment, marker->cm_comment) == 0) &&
779                 (strcmp(mc_marker->cm_tgtname, marker->cm_tgtname) == 0)) {
780                 /* Found a non-skipped marker match */
781                 CDEBUG(D_MGS, "Matched rec %u marker %d flag %x %s %s\n",
782                         rec->lrh_index, marker->cm_step,
783                         marker->cm_flags, marker->cm_tgtname,
784                         marker->cm_comment);
785                 rc = LLOG_PROC_BREAK;
786         }
787
788         RETURN(rc);
789 }
790
791 /**
792  * Check an existing config log record with matching comment and device
793  * Return code:
794  * 0 - checked successfully,
795  * LLOG_PROC_BREAK - record matches
796  * negative - error
797  */
798 static int mgs_check_marker(const struct lu_env *env, struct mgs_device *mgs,
799                 struct fs_db *fsdb, struct mgs_target_info *mti,
800                 char *logname, char *devname, char *comment)
801 {
802         struct llog_handle *loghandle;
803         struct llog_ctxt *ctxt;
804         struct cfg_marker *mc_marker;
805         int rc;
806
807         ENTRY;
808
809         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
810         CDEBUG(D_MGS, "mgs check %s/%s/%s\n", logname, devname, comment);
811
812         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
813         LASSERT(ctxt != NULL);
814         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
815         if (rc < 0) {
816                 if (rc == -ENOENT)
817                         rc = 0;
818                 GOTO(out_pop, rc);
819         }
820
821         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
822         if (rc)
823                 GOTO(out_close, rc);
824
825         if (llog_get_size(loghandle) <= 1)
826                 GOTO(out_close, rc = 0);
827
828         OBD_ALLOC_PTR(mc_marker);
829         if (!mc_marker)
830                 GOTO(out_close, rc = -ENOMEM);
831         if (strlcpy(mc_marker->cm_comment, comment,
832                 sizeof(mc_marker->cm_comment)) >=
833                 sizeof(mc_marker->cm_comment))
834                 GOTO(out_free, rc = -E2BIG);
835         if (strlcpy(mc_marker->cm_tgtname, devname,
836                 sizeof(mc_marker->cm_tgtname)) >=
837                 sizeof(mc_marker->cm_tgtname))
838                 GOTO(out_free, rc = -E2BIG);
839
840         rc = llog_process(env, loghandle, mgs_check_record_match,
841                         (void *)mc_marker, NULL);
842
843 out_free:
844         OBD_FREE_PTR(mc_marker);
845
846 out_close:
847         llog_close(env, loghandle);
848 out_pop:
849         if (rc && rc != LLOG_PROC_BREAK)
850                 CDEBUG(D_ERROR, "%s: mgs check %s/%s failed: rc = %d\n",
851                         mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
852         llog_ctxt_put(ctxt);
853         RETURN(rc);
854 }
855
856 static int mgs_modify_handler(const struct lu_env *env,
857                               struct llog_handle *llh,
858                               struct llog_rec_hdr *rec, void *data)
859 {
860         struct mgs_modify_lookup *mml = data;
861         struct cfg_marker *marker;
862         struct lustre_cfg *lcfg = REC_DATA(rec);
863         int cfg_len = REC_DATA_LEN(rec);
864         int rc;
865         ENTRY;
866
867         if (rec->lrh_type != OBD_CFG_REC) {
868                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
869                 RETURN(-EINVAL);
870         }
871
872         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
873         if (rc) {
874                 CERROR("Insane cfg\n");
875                 RETURN(rc);
876         }
877
878         /* We only care about markers */
879         if (lcfg->lcfg_command != LCFG_MARKER)
880                 RETURN(0);
881
882         marker = lustre_cfg_buf(lcfg, 1);
883         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
884             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
885             !(marker->cm_flags & CM_SKIP)) {
886                 /* Found a non-skipped marker match */
887                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
888                        rec->lrh_index, marker->cm_step,
889                        marker->cm_flags, mml->mml_marker.cm_flags,
890                        marker->cm_tgtname, marker->cm_comment);
891                 /* Overwrite the old marker llog entry */
892                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
893                 marker->cm_flags |= mml->mml_marker.cm_flags;
894                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
895                 rc = llog_write(env, llh, rec, rec->lrh_index);
896                 if (!rc)
897                          mml->mml_modified++;
898         }
899
900         RETURN(rc);
901 }
902
903 /**
904  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
905  * Return code:
906  * 0 - modified successfully,
907  * 1 - no modification was done
908  * negative - error
909  */
910 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
911                       struct fs_db *fsdb, struct mgs_target_info *mti,
912                       char *logname, char *devname, char *comment, int flags)
913 {
914         struct llog_handle *loghandle;
915         struct llog_ctxt *ctxt;
916         struct mgs_modify_lookup *mml;
917         int rc;
918
919         ENTRY;
920
921         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
922         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
923                flags);
924
925         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
926         LASSERT(ctxt != NULL);
927         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
928         if (rc < 0) {
929                 if (rc == -ENOENT)
930                         rc = 0;
931                 GOTO(out_pop, rc);
932         }
933
934         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
935         if (rc)
936                 GOTO(out_close, rc);
937
938         if (llog_get_size(loghandle) <= 1)
939                 GOTO(out_close, rc = 0);
940
941         OBD_ALLOC_PTR(mml);
942         if (!mml)
943                 GOTO(out_close, rc = -ENOMEM);
944         if (strlcpy(mml->mml_marker.cm_comment, comment,
945                     sizeof(mml->mml_marker.cm_comment)) >=
946             sizeof(mml->mml_marker.cm_comment))
947                 GOTO(out_free, rc = -E2BIG);
948         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
949                     sizeof(mml->mml_marker.cm_tgtname)) >=
950             sizeof(mml->mml_marker.cm_tgtname))
951                 GOTO(out_free, rc = -E2BIG);
952         /* Modify mostly means cancel */
953         mml->mml_marker.cm_flags = flags;
954         mml->mml_marker.cm_canceltime = flags ? ktime_get_real_seconds() : 0;
955         mml->mml_modified = 0;
956         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
957                           NULL);
958         if (!rc && !mml->mml_modified)
959                 rc = 1;
960
961 out_free:
962         OBD_FREE_PTR(mml);
963
964 out_close:
965         llog_close(env, loghandle);
966 out_pop:
967         if (rc < 0)
968                 CERROR("%s: modify %s/%s failed: rc = %d\n",
969                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
970         llog_ctxt_put(ctxt);
971         RETURN(rc);
972 }
973
974 enum replace_state {
975         REPLACE_COPY = 0,
976         REPLACE_SKIP,
977         REPLACE_DONE,
978         REPLACE_UUID,
979         REPLACE_SETUP
980 };
981
982 /** This structure is passed to mgs_replace_handler */
983 struct mgs_replace_data {
984         /* Nids are replaced for this target device */
985         struct mgs_target_info target;
986         /* Temporary modified llog */
987         struct llog_handle *temp_llh;
988         enum replace_state state;
989         char *failover;
990         char *nodeuuid;
991 };
992
993 /**
994  * Check: a) if block should be skipped
995  * b) is it target block
996  *
997  * \param[in] lcfg
998  * \param[in] mrd
999  *
1000  * \retval 0 should not to be skipped
1001  * \retval 1 should to be skipped
1002  */
1003 static int check_markers(struct lustre_cfg *lcfg,
1004                          struct mgs_replace_data *mrd)
1005 {
1006          struct cfg_marker *marker;
1007
1008         /* Track markers. Find given device */
1009         if (lcfg->lcfg_command == LCFG_MARKER) {
1010                 marker = lustre_cfg_buf(lcfg, 1);
1011                 /* Clean llog from records marked as CM_SKIP.
1012                    CM_EXCLUDE records are used for "active" command
1013                    and can be restored if needed */
1014                 if ((marker->cm_flags & (CM_SKIP | CM_START)) ==
1015                     (CM_SKIP | CM_START)) {
1016                         mrd->state = REPLACE_SKIP;
1017                         return 1;
1018                 }
1019
1020                 if ((marker->cm_flags & (CM_SKIP | CM_END)) ==
1021                     (CM_SKIP | CM_END)) {
1022                         mrd->state = REPLACE_COPY;
1023                         return 1;
1024                 }
1025
1026                 if (strcmp(mrd->target.mti_svname, marker->cm_tgtname) == 0) {
1027                         LASSERT(!(marker->cm_flags & CM_START) ||
1028                                 !(marker->cm_flags & CM_END));
1029                         if (marker->cm_flags & CM_START) {
1030                                 mrd->state = REPLACE_UUID;
1031                                 mrd->failover = NULL;
1032                         } else if (marker->cm_flags & CM_END)
1033                                 mrd->state = REPLACE_COPY;
1034                 }
1035         }
1036
1037         return 0;
1038 }
1039
1040 static int record_base(const struct lu_env *env, struct llog_handle *llh,
1041                      char *cfgname, lnet_nid_t nid, int cmd,
1042                      char *s1, char *s2, char *s3, char *s4)
1043 {
1044         struct mgs_thread_info  *mgi = mgs_env_info(env);
1045         struct llog_cfg_rec     *lcr;
1046         int rc;
1047
1048         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
1049                cmd, s1, s2, s3, s4);
1050
1051         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
1052         if (s1)
1053                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
1054         if (s2)
1055                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
1056         if (s3)
1057                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
1058         if (s4)
1059                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
1060
1061         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
1062         if (lcr == NULL)
1063                 return -ENOMEM;
1064
1065         lcr->lcr_cfg.lcfg_nid = nid;
1066         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1067
1068         lustre_cfg_rec_free(lcr);
1069
1070         if (rc < 0)
1071                 CDEBUG(D_MGS,
1072                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
1073                        cfgname, cmd, s1, s2, s3, s4, rc);
1074         return rc;
1075 }
1076
1077 static inline int record_add_uuid(const struct lu_env *env,
1078                                   struct llog_handle *llh,
1079                                   uint64_t nid, char *uuid)
1080 {
1081         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
1082                            NULL, NULL, NULL);
1083 }
1084
1085 static inline int record_add_conn(const struct lu_env *env,
1086                                   struct llog_handle *llh,
1087                                   char *devname, char *uuid)
1088 {
1089         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
1090                            NULL, NULL, NULL);
1091 }
1092
1093 static inline int record_attach(const struct lu_env *env,
1094                                 struct llog_handle *llh, char *devname,
1095                                 char *type, char *uuid)
1096 {
1097         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
1098                            NULL, NULL);
1099 }
1100
1101 static inline int record_setup(const struct lu_env *env,
1102                                struct llog_handle *llh, char *devname,
1103                                char *s1, char *s2, char *s3, char *s4)
1104 {
1105         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
1106 }
1107
1108 /**
1109  * \retval <0 record processing error
1110  * \retval n record is processed. No need copy original one.
1111  * \retval 0 record is not processed.
1112  */
1113 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
1114                            struct mgs_replace_data *mrd)
1115 {
1116         int nids_added = 0;
1117         lnet_nid_t nid;
1118         char *ptr;
1119         int rc = 0;
1120
1121         if (mrd->state == REPLACE_UUID &&
1122             lcfg->lcfg_command == LCFG_ADD_UUID) {
1123                 /* LCFG_ADD_UUID command found. Let's skip original command
1124                    and add passed nids */
1125                 ptr = mrd->target.mti_params;
1126                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1127                         if (!mrd->nodeuuid) {
1128                                 rc = name_create(&mrd->nodeuuid,
1129                                                  libcfs_nid2str(nid), "");
1130                                 if (rc) {
1131                                         CERROR("Can't create uuid for "
1132                                                 "nid  %s, device %s\n",
1133                                                 libcfs_nid2str(nid),
1134                                                 mrd->target.mti_svname);
1135                                         return rc;
1136                                 }
1137                         }
1138                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
1139                                "device %s\n", libcfs_nid2str(nid),
1140                                 mrd->target.mti_params,
1141                                 mrd->nodeuuid);
1142                         rc = record_add_uuid(env,
1143                                              mrd->temp_llh, nid,
1144                                              mrd->nodeuuid);
1145                         if (!rc)
1146                                 nids_added++;
1147
1148                         if (*ptr == ':') {
1149                                 mrd->failover = ptr;
1150                                 break;
1151                         }
1152                 }
1153
1154                 if (nids_added == 0) {
1155                         CERROR("No new nids were added, nid %s with uuid %s, "
1156                                "device %s\n", libcfs_nid2str(nid),
1157                                mrd->nodeuuid ? mrd->nodeuuid : "NULL",
1158                                mrd->target.mti_svname);
1159                         name_destroy(&mrd->nodeuuid);
1160                         return -ENXIO;
1161                 } else {
1162                         mrd->state = REPLACE_SETUP;
1163                 }
1164
1165                 return nids_added;
1166         }
1167
1168         if (mrd->state == REPLACE_SETUP && lcfg->lcfg_command == LCFG_SETUP) {
1169                 /* LCFG_SETUP command found. UUID should be changed */
1170                 rc = record_setup(env,
1171                                   mrd->temp_llh,
1172                                   /* devname the same */
1173                                   lustre_cfg_string(lcfg, 0),
1174                                   /* s1 is not changed */
1175                                   lustre_cfg_string(lcfg, 1),
1176                                   mrd->nodeuuid,
1177                                   /* s3 is not changed */
1178                                   lustre_cfg_string(lcfg, 3),
1179                                   /* s4 is not changed */
1180                                   lustre_cfg_string(lcfg, 4));
1181
1182                 name_destroy(&mrd->nodeuuid);
1183                 if (rc)
1184                         return rc;
1185
1186                 if (mrd->failover) {
1187                         ptr = mrd->failover;
1188                         while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1189                                 if (mrd->nodeuuid == NULL) {
1190                                         rc =  name_create(&mrd->nodeuuid,
1191                                                           libcfs_nid2str(nid),
1192                                                           "");
1193                                         if (rc)
1194                                                 return rc;
1195                                 }
1196
1197                                 CDEBUG(D_MGS, "add nid %s for failover %s\n",
1198                                        libcfs_nid2str(nid), mrd->nodeuuid);
1199                                 rc = record_add_uuid(env, mrd->temp_llh, nid,
1200                                                      mrd->nodeuuid);
1201                                 if (rc) {
1202                                         name_destroy(&mrd->nodeuuid);
1203                                         return rc;
1204                                 }
1205                                 if (*ptr == ':') {
1206                                         rc = record_add_conn(env,
1207                                                 mrd->temp_llh,
1208                                                 lustre_cfg_string(lcfg, 0),
1209                                                 mrd->nodeuuid);
1210                                         name_destroy(&mrd->nodeuuid);
1211                                         if (rc)
1212                                                 return rc;
1213                                 }
1214                         }
1215                         if (mrd->nodeuuid) {
1216                                 rc = record_add_conn(env, mrd->temp_llh,
1217                                                      lustre_cfg_string(lcfg, 0),
1218                                                      mrd->nodeuuid);
1219                                 name_destroy(&mrd->nodeuuid);
1220                                 if (rc)
1221                                         return rc;
1222                         }
1223                 }
1224                 mrd->state = REPLACE_DONE;
1225                 return rc ? rc : 1;
1226         }
1227
1228         /* Another commands in target device block */
1229         return 0;
1230 }
1231
1232 /**
1233  * Handler that called for every record in llog.
1234  * Records are processed in order they placed in llog.
1235  *
1236  * \param[in] llh       log to be processed
1237  * \param[in] rec       current record
1238  * \param[in] data      mgs_replace_data structure
1239  *
1240  * \retval 0    success
1241  */
1242 static int mgs_replace_nids_handler(const struct lu_env *env,
1243                                     struct llog_handle *llh,
1244                                     struct llog_rec_hdr *rec,
1245                                     void *data)
1246 {
1247         struct mgs_replace_data *mrd;
1248         struct lustre_cfg *lcfg = REC_DATA(rec);
1249         int cfg_len = REC_DATA_LEN(rec);
1250         int rc;
1251         ENTRY;
1252
1253         mrd = (struct mgs_replace_data *)data;
1254
1255         if (rec->lrh_type != OBD_CFG_REC) {
1256                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
1257                        rec->lrh_type, lcfg->lcfg_command,
1258                        lustre_cfg_string(lcfg, 0),
1259                        lustre_cfg_string(lcfg, 1));
1260                 RETURN(-EINVAL);
1261         }
1262
1263         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1264         if (rc) {
1265                 /* Do not copy any invalidated records */
1266                 GOTO(skip_out, rc = 0);
1267         }
1268
1269         rc = check_markers(lcfg, mrd);
1270         if (rc || mrd->state == REPLACE_SKIP)
1271                 GOTO(skip_out, rc = 0);
1272
1273         /* Write to new log all commands outside target device block */
1274         if (mrd->state == REPLACE_COPY)
1275                 GOTO(copy_out, rc = 0);
1276
1277         if (mrd->state == REPLACE_DONE &&
1278             (lcfg->lcfg_command == LCFG_ADD_UUID ||
1279              lcfg->lcfg_command == LCFG_ADD_CONN)) {
1280                 if (!mrd->failover)
1281                         CWARN("Previous failover is deleted, but new one is "
1282                               "not set. This means you configure system "
1283                               "without failover or passed wrong replace_nids "
1284                               "command parameters. Device %s, passed nids %s\n",
1285                               mrd->target.mti_svname, mrd->target.mti_params);
1286                 GOTO(skip_out, rc = 0);
1287         }
1288
1289         rc = process_command(env, lcfg, mrd);
1290         if (rc < 0)
1291                 RETURN(rc);
1292
1293         if (rc)
1294                 RETURN(0);
1295 copy_out:
1296         /* Record is placed in temporary llog as is */
1297         rc = llog_write(env, mrd->temp_llh, rec, LLOG_NEXT_IDX);
1298
1299         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1300                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1301                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1302         RETURN(rc);
1303
1304 skip_out:
1305         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1306                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1307                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1308         RETURN(rc);
1309 }
1310
1311 static int mgs_log_is_empty(const struct lu_env *env,
1312                             struct mgs_device *mgs, char *name)
1313 {
1314         struct llog_ctxt        *ctxt;
1315         int                      rc;
1316
1317         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1318         LASSERT(ctxt != NULL);
1319
1320         rc = llog_is_empty(env, ctxt, name);
1321         llog_ctxt_put(ctxt);
1322         return rc;
1323 }
1324
1325 static int mgs_replace_log(const struct lu_env *env,
1326                            struct obd_device *mgs,
1327                            char *logname, char *devname,
1328                            llog_cb_t replace_handler, void *data)
1329 {
1330         struct llog_handle *orig_llh, *backup_llh;
1331         struct llog_ctxt *ctxt;
1332         struct mgs_replace_data *mrd;
1333         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1334         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1335         char *backup;
1336         int rc, rc2, buf_size;
1337         time64_t now;
1338         ENTRY;
1339
1340         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1341         LASSERT(ctxt != NULL);
1342
1343         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1344                 /* Log is empty. Nothing to replace */
1345                 GOTO(out_put, rc = 0);
1346         }
1347
1348         now = ktime_get_real_seconds();
1349
1350         /* max time64_t in decimal fits into 20 bytes long string */
1351         buf_size = strlen(logname) + 1 + 20 + 1 + strlen(".bak") + 1;
1352         OBD_ALLOC(backup, buf_size);
1353         if (backup == NULL)
1354                 GOTO(out_put, rc = -ENOMEM);
1355
1356         snprintf(backup, buf_size, "%s.%llu.bak", logname, now);
1357
1358         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1359         if (rc == 0) {
1360                 /* Now erase original log file. Connections are not allowed.
1361                    Backup is already saved */
1362                 rc = llog_erase(env, ctxt, NULL, logname);
1363                 if (rc < 0)
1364                         GOTO(out_free, rc);
1365         } else if (rc != -ENOENT) {
1366                 CERROR("%s: can't make backup for %s: rc = %d\n",
1367                        mgs->obd_name, logname, rc);
1368                 GOTO(out_free,rc);
1369         }
1370
1371         /* open local log */
1372         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1373         if (rc)
1374                 GOTO(out_restore, rc);
1375
1376         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1377         if (rc)
1378                 GOTO(out_closel, rc);
1379
1380         /* open backup llog */
1381         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1382                        LLOG_OPEN_EXISTS);
1383         if (rc)
1384                 GOTO(out_closel, rc);
1385
1386         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1387         if (rc)
1388                 GOTO(out_close, rc);
1389
1390         if (llog_get_size(backup_llh) <= 1)
1391                 GOTO(out_close, rc = 0);
1392
1393         OBD_ALLOC_PTR(mrd);
1394         if (!mrd)
1395                 GOTO(out_close, rc = -ENOMEM);
1396         /* devname is only needed information to replace UUID records */
1397         if (devname)
1398                 strlcpy(mrd->target.mti_svname, devname,
1399                         sizeof(mrd->target.mti_svname));
1400         /* data is parsed in llog callback */
1401         if (data)
1402                 strlcpy(mrd->target.mti_params, data,
1403                         sizeof(mrd->target.mti_params));
1404         /* Copy records to this temporary llog */
1405         mrd->temp_llh = orig_llh;
1406
1407         rc = llog_process(env, backup_llh, replace_handler,
1408                           (void *)mrd, NULL);
1409         OBD_FREE_PTR(mrd);
1410 out_close:
1411         rc2 = llog_close(NULL, backup_llh);
1412         if (!rc)
1413                 rc = rc2;
1414 out_closel:
1415         rc2 = llog_close(NULL, orig_llh);
1416         if (!rc)
1417                 rc = rc2;
1418
1419 out_restore:
1420         if (rc) {
1421                 CERROR("%s: llog should be restored: rc = %d\n",
1422                        mgs->obd_name, rc);
1423                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1424                                   logname);
1425                 if (rc2 < 0)
1426                         CERROR("%s: can't restore backup %s: rc = %d\n",
1427                                mgs->obd_name, logname, rc2);
1428         }
1429
1430 out_free:
1431         OBD_FREE(backup, buf_size);
1432
1433 out_put:
1434         llog_ctxt_put(ctxt);
1435
1436         if (rc)
1437                 CERROR("%s: failed to replace log %s: rc = %d\n",
1438                        mgs->obd_name, logname, rc);
1439
1440         RETURN(rc);
1441 }
1442
1443 static int mgs_replace_nids_log(const struct lu_env *env,
1444                                 struct obd_device *obd,
1445                                 char *logname, char *devname, char *nids)
1446 {
1447         CDEBUG(D_MGS, "Replace NIDs for %s in %s\n", devname, logname);
1448         return mgs_replace_log(env, obd, logname, devname,
1449                                mgs_replace_nids_handler, nids);
1450 }
1451
1452 /**
1453  * Parse device name and get file system name and/or device index
1454  *
1455  * @devname     device name (ex. lustre-MDT0000)
1456  * @fsname      file system name extracted from @devname and returned
1457  *              to the caller (optional)
1458  * @index       device index extracted from @devname and returned to
1459  *              the caller (optional)
1460  *
1461  * RETURN       0                       success if we are only interested in
1462  *                                      extracting fsname from devname.
1463  *                                      i.e index is NULL
1464  *
1465  *              LDD_F_SV_TYPE_*         Besides extracting the fsname the
1466  *                                      user also wants the index. Report to
1467  *                                      the user the type of obd device the
1468  *                                      returned index belongs too.
1469  *
1470  *              -EINVAL                 The obd device name is improper so
1471  *                                      fsname could not be extracted.
1472  *
1473  *              -ENXIO                  Failed to extract the index out of
1474  *                                      the obd device name. Most likely an
1475  *                                      invalid obd device name
1476  */
1477 static int mgs_parse_devname(char *devname, char *fsname, u32 *index)
1478 {
1479         int rc = 0;
1480         ENTRY;
1481
1482         /* Extract fsname */
1483         if (fsname) {
1484                 rc = server_name2fsname(devname, fsname, NULL);
1485                 if (rc < 0) {
1486                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1487                                devname);
1488                         RETURN(-EINVAL);
1489                 }
1490         }
1491
1492         if (index) {
1493                 rc = server_name2index(devname, index, NULL);
1494                 if (rc < 0) {
1495                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1496                                devname);
1497                         RETURN(-ENXIO);
1498                 }
1499         }
1500
1501         /* server_name2index can return LDD_F_SV_TYPE_* so always return rc */
1502         RETURN(rc);
1503 }
1504
1505 /* This is only called during replace_nids */
1506 static int only_mgs_is_running(struct obd_device *mgs_obd)
1507 {
1508         /* TDB: Is global variable with devices count exists? */
1509         int num_devices = get_devices_count();
1510         int num_exports = 0;
1511         struct obd_export *exp;
1512
1513         spin_lock(&mgs_obd->obd_dev_lock);
1514         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1515                 /* skip self export */
1516                 if (exp == mgs_obd->obd_self_export)
1517                         continue;
1518                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1519                         continue;
1520
1521                 ++num_exports;
1522
1523                 CERROR("%s: node %s still connected during replace_nids "
1524                        "connect_flags:%llx\n",
1525                        mgs_obd->obd_name,
1526                        libcfs_nid2str(exp->exp_nid_stats->nid),
1527                        exp_connect_flags(exp));
1528
1529         }
1530         spin_unlock(&mgs_obd->obd_dev_lock);
1531
1532         /* osd, MGS and MGC + self_export
1533            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1534         return (num_devices <= 3) && (num_exports == 0);
1535 }
1536
1537 static int name_create_mdt(char **logname, char *fsname, int i)
1538 {
1539         char mdt_index[9];
1540
1541         sprintf(mdt_index, "-MDT%04x", i);
1542         return name_create(logname, fsname, mdt_index);
1543 }
1544
1545 /**
1546  * Replace nids for \a device to \a nids values
1547  *
1548  * \param obd           MGS obd device
1549  * \param devname       nids need to be replaced for this device
1550  * (ex. lustre-OST0000)
1551  * \param nids          nids list (ex. nid1,nid2,nid3)
1552  *
1553  * \retval 0    success
1554  */
1555 int mgs_replace_nids(const struct lu_env *env,
1556                      struct mgs_device *mgs,
1557                      char *devname, char *nids)
1558 {
1559         /* Assume fsname is part of device name */
1560         char fsname[MTI_NAME_MAXLEN];
1561         int rc;
1562         __u32 index;
1563         char *logname;
1564         struct fs_db *fsdb = NULL;
1565         unsigned int i;
1566         int conn_state;
1567         struct obd_device *mgs_obd = mgs->mgs_obd;
1568         ENTRY;
1569
1570         /* We can only change NIDs if no other nodes are connected */
1571         spin_lock(&mgs_obd->obd_dev_lock);
1572         conn_state = mgs_obd->obd_no_conn;
1573         mgs_obd->obd_no_conn = 1;
1574         spin_unlock(&mgs_obd->obd_dev_lock);
1575
1576         /* We can not change nids if not only MGS is started */
1577         if (!only_mgs_is_running(mgs_obd)) {
1578                 CERROR("Only MGS is allowed to be started\n");
1579                 GOTO(out, rc = -EINPROGRESS);
1580         }
1581
1582         /* Get fsname and index */
1583         rc = mgs_parse_devname(devname, fsname, &index);
1584         if (rc < 0)
1585                 GOTO(out, rc);
1586
1587         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1588         if (rc) {
1589                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1590                 GOTO(out, rc);
1591         }
1592
1593         /* Process client llogs */
1594         name_create(&logname, fsname, "-client");
1595         rc = mgs_replace_nids_log(env, mgs_obd, logname, devname, nids);
1596         name_destroy(&logname);
1597         if (rc) {
1598                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1599                        fsname, devname, rc);
1600                 GOTO(out, rc);
1601         }
1602
1603         /* Process MDT llogs */
1604         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1605                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1606                         continue;
1607                 name_create_mdt(&logname, fsname, i);
1608                 rc = mgs_replace_nids_log(env, mgs_obd, logname, devname, nids);
1609                 name_destroy(&logname);
1610                 if (rc)
1611                         GOTO(out, rc);
1612         }
1613
1614 out:
1615         spin_lock(&mgs_obd->obd_dev_lock);
1616         mgs_obd->obd_no_conn = conn_state;
1617         spin_unlock(&mgs_obd->obd_dev_lock);
1618
1619         if (fsdb)
1620                 mgs_put_fsdb(mgs, fsdb);
1621
1622         RETURN(rc);
1623 }
1624
1625 /**
1626  * This is called for every record in llog. Some of records are
1627  * skipped, others are copied to new log as is.
1628  * Records to be skipped are
1629  *  marker records marked SKIP
1630  *  records enclosed between SKIP markers
1631  *
1632  * \param[in] llh       log to be processed
1633  * \param[in] rec       current record
1634  * \param[in] data      mgs_replace_data structure
1635  *
1636  * \retval 0    success
1637  **/
1638 static int mgs_clear_config_handler(const struct lu_env *env,
1639                                     struct llog_handle *llh,
1640                                     struct llog_rec_hdr *rec, void *data)
1641 {
1642         struct mgs_replace_data *mrd;
1643         struct lustre_cfg *lcfg = REC_DATA(rec);
1644         int cfg_len = REC_DATA_LEN(rec);
1645         int rc;
1646
1647         ENTRY;
1648
1649         mrd = (struct mgs_replace_data *)data;
1650
1651         if (rec->lrh_type != OBD_CFG_REC) {
1652                 CDEBUG(D_MGS, "Config llog Name=%s, Record Index=%u, "
1653                        "Unhandled Record Type=%#x\n", llh->lgh_name,
1654                        rec->lrh_index, rec->lrh_type);
1655                 RETURN(-EINVAL);
1656         }
1657
1658         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1659         if (rc) {
1660                 CDEBUG(D_MGS, "Config llog Name=%s, Invalid config file.",
1661                        llh->lgh_name);
1662                 RETURN(-EINVAL);
1663         }
1664
1665         if (lcfg->lcfg_command == LCFG_MARKER) {
1666                 struct cfg_marker *marker;
1667
1668                 marker = lustre_cfg_buf(lcfg, 1);
1669                 if (marker->cm_flags & CM_SKIP) {
1670                         if (marker->cm_flags & CM_START)
1671                                 mrd->state = REPLACE_SKIP;
1672                         if (marker->cm_flags & CM_END)
1673                                 mrd->state = REPLACE_COPY;
1674                         /* SKIP section started or finished */
1675                         CDEBUG(D_MGS, "Skip idx=%d, rc=%d, len=%d, "
1676                                "cmd %x %s %s\n", rec->lrh_index, rc,
1677                                rec->lrh_len, lcfg->lcfg_command,
1678                                lustre_cfg_string(lcfg, 0),
1679                                lustre_cfg_string(lcfg, 1));
1680                         RETURN(0);
1681                 }
1682         } else {
1683                 if (mrd->state == REPLACE_SKIP) {
1684                         /* record enclosed between SKIP markers, skip it */
1685                         CDEBUG(D_MGS, "Skip idx=%d, rc=%d, len=%d, "
1686                                "cmd %x %s %s\n", rec->lrh_index, rc,
1687                                rec->lrh_len, lcfg->lcfg_command,
1688                                lustre_cfg_string(lcfg, 0),
1689                                lustre_cfg_string(lcfg, 1));
1690                         RETURN(0);
1691                 }
1692         }
1693
1694         /* Record is placed in temporary llog as is */
1695         rc = llog_write(env, mrd->temp_llh, rec, LLOG_NEXT_IDX);
1696
1697         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1698                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1699                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1700         RETURN(rc);
1701 }
1702
1703 /*
1704  * Directory CONFIGS/ may contain files which are not config logs to
1705  * be cleared. Skip any llogs with a non-alphanumeric character after
1706  * the last '-'. For example, fsname-MDT0000.sav, fsname-MDT0000.bak,
1707  * fsname-MDT0000.orig, fsname-MDT0000~, fsname-MDT0000.20150516, etc.
1708  */
1709 static bool config_to_clear(const char *logname)
1710 {
1711         int i;
1712         char *str;
1713
1714         str = strrchr(logname, '-');
1715         if (!str)
1716                 return 0;
1717
1718         i = 0;
1719         while (isalnum(str[++i]));
1720         return str[i] == '\0';
1721 }
1722
1723 /**
1724  * Clear config logs for \a name
1725  *
1726  * \param env
1727  * \param mgs           MGS device
1728  * \param name          name of device or of filesystem
1729  *                      (ex. lustre-OST0000 or lustre) in later case all logs
1730  *                      will be cleared
1731  *
1732  * \retval 0            success
1733  */
1734 int mgs_clear_configs(const struct lu_env *env,
1735                      struct mgs_device *mgs, const char *name)
1736 {
1737         struct list_head dentry_list;
1738         struct mgs_direntry *dirent, *n;
1739         char *namedash;
1740         int conn_state;
1741         struct obd_device *mgs_obd = mgs->mgs_obd;
1742         int rc;
1743
1744         ENTRY;
1745
1746         /* Prevent clients and servers from connecting to mgs */
1747         spin_lock(&mgs_obd->obd_dev_lock);
1748         conn_state = mgs_obd->obd_no_conn;
1749         mgs_obd->obd_no_conn = 1;
1750         spin_unlock(&mgs_obd->obd_dev_lock);
1751
1752         /*
1753          * config logs cannot be cleaned if anything other than
1754          * MGS is started
1755          */
1756         if (!only_mgs_is_running(mgs_obd)) {
1757                 CERROR("Only MGS is allowed to be started\n");
1758                 GOTO(out, rc = -EBUSY);
1759         }
1760
1761         /* Find all the logs in the CONFIGS directory */
1762         rc = class_dentry_readdir(env, mgs, &dentry_list);
1763         if (rc) {
1764                 CERROR("%s: cannot read config directory '%s': rc = %d\n",
1765                        mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
1766                 GOTO(out, rc);
1767         }
1768
1769         if (list_empty(&dentry_list)) {
1770                 CERROR("%s: list empty reading config dir '%s': rc = %d\n",
1771                         mgs_obd->obd_name, MOUNT_CONFIGS_DIR, -ENOENT);
1772                 GOTO(out, rc = -ENOENT);
1773         }
1774
1775         OBD_ALLOC(namedash, strlen(name) + 2);
1776         if (namedash == NULL)
1777                 GOTO(out, rc = -ENOMEM);
1778         snprintf(namedash, strlen(name) + 2, "%s-", name);
1779
1780         list_for_each_entry(dirent, &dentry_list, mde_list) {
1781                 if (strcmp(name, dirent->mde_name) &&
1782                     strncmp(namedash, dirent->mde_name, strlen(namedash)))
1783                         continue;
1784                 if (!config_to_clear(dirent->mde_name))
1785                         continue;
1786                 CDEBUG(D_MGS, "%s: Clear config log %s\n",
1787                        mgs_obd->obd_name, dirent->mde_name);
1788                 rc = mgs_replace_log(env, mgs_obd, dirent->mde_name, NULL,
1789                                      mgs_clear_config_handler, NULL);
1790                 if (rc)
1791                         break;
1792         }
1793
1794         list_for_each_entry_safe(dirent, n, &dentry_list, mde_list) {
1795                 list_del_init(&dirent->mde_list);
1796                 mgs_direntry_free(dirent);
1797         }
1798         OBD_FREE(namedash, strlen(name) + 2);
1799 out:
1800         spin_lock(&mgs_obd->obd_dev_lock);
1801         mgs_obd->obd_no_conn = conn_state;
1802         spin_unlock(&mgs_obd->obd_dev_lock);
1803
1804         RETURN(rc);
1805 }
1806
1807 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1808                             char *devname, struct lov_desc *desc)
1809 {
1810         struct mgs_thread_info  *mgi = mgs_env_info(env);
1811         struct llog_cfg_rec     *lcr;
1812         int rc;
1813
1814         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1815         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1816         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1817         if (lcr == NULL)
1818                 return -ENOMEM;
1819
1820         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1821         lustre_cfg_rec_free(lcr);
1822         return rc;
1823 }
1824
1825 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1826                             char *devname, struct lmv_desc *desc)
1827 {
1828         struct mgs_thread_info  *mgi = mgs_env_info(env);
1829         struct llog_cfg_rec     *lcr;
1830         int rc;
1831
1832         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1833         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1834         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1835         if (lcr == NULL)
1836                 return -ENOMEM;
1837
1838         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1839         lustre_cfg_rec_free(lcr);
1840         return rc;
1841 }
1842
1843 static inline int record_mdc_add(const struct lu_env *env,
1844                                  struct llog_handle *llh,
1845                                  char *logname, char *mdcuuid,
1846                                  char *mdtuuid, char *index,
1847                                  char *gen)
1848 {
1849         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1850                            mdtuuid,index,gen,mdcuuid);
1851 }
1852
1853 static inline int record_lov_add(const struct lu_env *env,
1854                                  struct llog_handle *llh,
1855                                  char *lov_name, char *ost_uuid,
1856                                  char *index, char *gen)
1857 {
1858         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1859                            ost_uuid, index, gen, NULL);
1860 }
1861
1862 static inline int record_mount_opt(const struct lu_env *env,
1863                                    struct llog_handle *llh,
1864                                    char *profile, char *lov_name,
1865                                    char *mdc_name)
1866 {
1867         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1868                            profile, lov_name, mdc_name, NULL);
1869 }
1870
1871 static int record_marker(const struct lu_env *env,
1872                          struct llog_handle *llh,
1873                          struct fs_db *fsdb, __u32 flags,
1874                          char *tgtname, char *comment)
1875 {
1876         struct mgs_thread_info *mgi = mgs_env_info(env);
1877         struct llog_cfg_rec *lcr;
1878         int rc;
1879         int cplen = 0;
1880
1881         if (flags & CM_START)
1882                 fsdb->fsdb_gen++;
1883         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1884         mgi->mgi_marker.cm_flags = flags;
1885         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1886         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1887                         sizeof(mgi->mgi_marker.cm_tgtname));
1888         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1889                 return -E2BIG;
1890         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1891                         sizeof(mgi->mgi_marker.cm_comment));
1892         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1893                 return -E2BIG;
1894         mgi->mgi_marker.cm_createtime = ktime_get_real_seconds();
1895         mgi->mgi_marker.cm_canceltime = 0;
1896         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1897         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1898                             sizeof(mgi->mgi_marker));
1899         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1900         if (lcr == NULL)
1901                 return -ENOMEM;
1902
1903         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1904         lustre_cfg_rec_free(lcr);
1905         return rc;
1906 }
1907
1908 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1909                             struct llog_handle **llh, char *name)
1910 {
1911         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1912         struct llog_ctxt        *ctxt;
1913         int                      rc = 0;
1914         ENTRY;
1915
1916         if (*llh)
1917                 GOTO(out, rc = -EBUSY);
1918
1919         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1920         if (!ctxt)
1921                 GOTO(out, rc = -ENODEV);
1922         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1923
1924         rc = llog_open_create(env, ctxt, llh, NULL, name);
1925         if (rc)
1926                 GOTO(out_ctxt, rc);
1927         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1928         if (rc)
1929                 llog_close(env, *llh);
1930 out_ctxt:
1931         llog_ctxt_put(ctxt);
1932 out:
1933         if (rc) {
1934                 CERROR("%s: can't start log %s: rc = %d\n",
1935                        mgs->mgs_obd->obd_name, name, rc);
1936                 *llh = NULL;
1937         }
1938         RETURN(rc);
1939 }
1940
1941 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1942 {
1943         int rc;
1944
1945         rc = llog_close(env, *llh);
1946         *llh = NULL;
1947
1948         return rc;
1949 }
1950
1951 /******************** config "macros" *********************/
1952
1953 /* write an lcfg directly into a log (with markers) */
1954 static int mgs_write_log_direct(const struct lu_env *env,
1955                                 struct mgs_device *mgs, struct fs_db *fsdb,
1956                                 char *logname, struct llog_cfg_rec *lcr,
1957                                 char *devname, char *comment)
1958 {
1959         struct llog_handle *llh = NULL;
1960         int rc;
1961
1962         ENTRY;
1963
1964         rc = record_start_log(env, mgs, &llh, logname);
1965         if (rc)
1966                 RETURN(rc);
1967
1968         /* FIXME These should be a single journal transaction */
1969         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1970         if (rc)
1971                 GOTO(out_end, rc);
1972         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1973         if (rc)
1974                 GOTO(out_end, rc);
1975         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1976         if (rc)
1977                 GOTO(out_end, rc);
1978 out_end:
1979         record_end_log(env, &llh);
1980         RETURN(rc);
1981 }
1982
1983 /* write the lcfg in all logs for the given fs */
1984 static int mgs_write_log_direct_all(const struct lu_env *env,
1985                                     struct mgs_device *mgs,
1986                                     struct fs_db *fsdb,
1987                                     struct mgs_target_info *mti,
1988                                     struct llog_cfg_rec *lcr, char *devname,
1989                                     char *comment, int server_only)
1990 {
1991         struct list_head         log_list;
1992         struct mgs_direntry     *dirent, *n;
1993         char                    *fsname = mti->mti_fsname;
1994         int                      rc = 0, len = strlen(fsname);
1995
1996         ENTRY;
1997         /* Find all the logs in the CONFIGS directory */
1998         rc = class_dentry_readdir(env, mgs, &log_list);
1999         if (rc)
2000                 RETURN(rc);
2001
2002         /* Could use fsdb index maps instead of directory listing */
2003         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
2004                 list_del_init(&dirent->mde_list);
2005                 /* don't write to sptlrpc rule log */
2006                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
2007                         goto next;
2008
2009                 /* caller wants write server logs only */
2010                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
2011                         goto next;
2012
2013                 if (strlen(dirent->mde_name) <= len ||
2014                     strncmp(fsname, dirent->mde_name, len) != 0 ||
2015                     dirent->mde_name[len] != '-')
2016                         goto next;
2017
2018                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
2019                 /* Erase any old settings of this same parameter */
2020                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
2021                                 devname, comment, CM_SKIP);
2022                 if (rc < 0)
2023                         CERROR("%s: Can't modify llog %s: rc = %d\n",
2024                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
2025                 if (lcr == NULL)
2026                         goto next;
2027                 /* Write the new one */
2028                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
2029                                           lcr, devname, comment);
2030                 if (rc != 0)
2031                         CERROR("%s: writing log %s: rc = %d\n",
2032                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
2033 next:
2034                 mgs_direntry_free(dirent);
2035         }
2036
2037         RETURN(rc);
2038 }
2039
2040 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2041                                     struct mgs_device *mgs,
2042                                     struct fs_db *fsdb,
2043                                     struct mgs_target_info *mti,
2044                                     int index, char *logname);
2045 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2046                                     struct mgs_device *mgs,
2047                                     struct fs_db *fsdb,
2048                                     struct mgs_target_info *mti,
2049                                     char *logname, char *suffix, char *lovname,
2050                                     enum lustre_sec_part sec_part, int flags);
2051 static int name_create_mdt_and_lov(char **logname, char **lovname,
2052                                    struct fs_db *fsdb, int i);
2053
2054 static int add_param(char *params, char *key, char *val)
2055 {
2056         char *start = params + strlen(params);
2057         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
2058         int keylen = 0;
2059
2060         if (key != NULL)
2061                 keylen = strlen(key);
2062         if (start + 1 + keylen + strlen(val) >= end) {
2063                 CERROR("params are too long: %s %s%s\n",
2064                        params, key != NULL ? key : "", val);
2065                 return -EINVAL;
2066         }
2067
2068         sprintf(start, " %s%s", key != NULL ? key : "", val);
2069         return 0;
2070 }
2071
2072 /**
2073  * Walk through client config log record and convert the related records
2074  * into the target.
2075  **/
2076 static int mgs_steal_client_llog_handler(const struct lu_env *env,
2077                                          struct llog_handle *llh,
2078                                          struct llog_rec_hdr *rec, void *data)
2079 {
2080         struct mgs_device *mgs;
2081         struct obd_device *obd;
2082         struct mgs_target_info *mti, *tmti;
2083         struct fs_db *fsdb;
2084         int cfg_len = rec->lrh_len;
2085         char *cfg_buf = (char*) (rec + 1);
2086         struct lustre_cfg *lcfg;
2087         int rc = 0;
2088         struct llog_handle *mdt_llh = NULL;
2089         static int got_an_osc_or_mdc = 0;
2090         /* 0: not found any osc/mdc;
2091            1: found osc;
2092            2: found mdc;
2093         */
2094         static int last_step = -1;
2095         int cplen = 0;
2096
2097         ENTRY;
2098
2099         mti = ((struct temp_comp*)data)->comp_mti;
2100         tmti = ((struct temp_comp*)data)->comp_tmti;
2101         fsdb = ((struct temp_comp*)data)->comp_fsdb;
2102         obd = ((struct temp_comp *)data)->comp_obd;
2103         mgs = lu2mgs_dev(obd->obd_lu_dev);
2104         LASSERT(mgs);
2105
2106         if (rec->lrh_type != OBD_CFG_REC) {
2107                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
2108                 RETURN(-EINVAL);
2109         }
2110
2111         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
2112         if (rc) {
2113                 CERROR("Insane cfg\n");
2114                 RETURN(rc);
2115         }
2116
2117         lcfg = (struct lustre_cfg *)cfg_buf;
2118
2119         if (lcfg->lcfg_command == LCFG_MARKER) {
2120                 struct cfg_marker *marker;
2121                 marker = lustre_cfg_buf(lcfg, 1);
2122                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
2123                     (marker->cm_flags & CM_START) &&
2124                      !(marker->cm_flags & CM_SKIP)) {
2125                         got_an_osc_or_mdc = 1;
2126                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
2127                                         sizeof(tmti->mti_svname));
2128                         if (cplen >= sizeof(tmti->mti_svname))
2129                                 RETURN(-E2BIG);
2130                         rc = record_start_log(env, mgs, &mdt_llh,
2131                                               mti->mti_svname);
2132                         if (rc)
2133                                 RETURN(rc);
2134                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
2135                                            mti->mti_svname, "add osc(copied)");
2136                         record_end_log(env, &mdt_llh);
2137                         last_step = marker->cm_step;
2138                         RETURN(rc);
2139                 }
2140                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
2141                     (marker->cm_flags & CM_END) &&
2142                      !(marker->cm_flags & CM_SKIP)) {
2143                         LASSERT(last_step == marker->cm_step);
2144                         last_step = -1;
2145                         got_an_osc_or_mdc = 0;
2146                         memset(tmti, 0, sizeof(*tmti));
2147                         rc = record_start_log(env, mgs, &mdt_llh,
2148                                               mti->mti_svname);
2149                         if (rc)
2150                                 RETURN(rc);
2151                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
2152                                            mti->mti_svname, "add osc(copied)");
2153                         record_end_log(env, &mdt_llh);
2154                         RETURN(rc);
2155                 }
2156                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
2157                     (marker->cm_flags & CM_START) &&
2158                      !(marker->cm_flags & CM_SKIP)) {
2159                         got_an_osc_or_mdc = 2;
2160                         last_step = marker->cm_step;
2161                         memcpy(tmti->mti_svname, marker->cm_tgtname,
2162                                strlen(marker->cm_tgtname));
2163
2164                         RETURN(rc);
2165                 }
2166                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
2167                     (marker->cm_flags & CM_END) &&
2168                      !(marker->cm_flags & CM_SKIP)) {
2169                         LASSERT(last_step == marker->cm_step);
2170                         last_step = -1;
2171                         got_an_osc_or_mdc = 0;
2172                         memset(tmti, 0, sizeof(*tmti));
2173                         RETURN(rc);
2174                 }
2175         }
2176
2177         if (got_an_osc_or_mdc == 0 || last_step < 0)
2178                 RETURN(rc);
2179
2180         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
2181                 __u64 nodenid = lcfg->lcfg_nid;
2182
2183                 if (strlen(tmti->mti_uuid) == 0) {
2184                         /* target uuid not set, this config record is before
2185                          * LCFG_SETUP, this nid is one of target node nid.
2186                          */
2187                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
2188                         tmti->mti_nid_count++;
2189                 } else {
2190                         char nidstr[LNET_NIDSTR_SIZE];
2191
2192                         /* failover node nid */
2193                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
2194                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
2195                                         nidstr);
2196                 }
2197
2198                 RETURN(rc);
2199         }
2200
2201         if (lcfg->lcfg_command == LCFG_SETUP) {
2202                 char *target;
2203
2204                 target = lustre_cfg_string(lcfg, 1);
2205                 memcpy(tmti->mti_uuid, target, strlen(target));
2206                 RETURN(rc);
2207         }
2208
2209         /* ignore client side sptlrpc_conf_log */
2210         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
2211                 RETURN(rc);
2212
2213         if (lcfg->lcfg_command == LCFG_ADD_MDC &&
2214             strstr(lustre_cfg_string(lcfg, 0), "-clilmv") != NULL) {
2215                 int index;
2216
2217                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
2218                         RETURN (-EINVAL);
2219
2220                 memcpy(tmti->mti_fsname, mti->mti_fsname,
2221                        strlen(mti->mti_fsname));
2222                 tmti->mti_stripe_index = index;
2223
2224                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
2225                                               mti->mti_stripe_index,
2226                                               mti->mti_svname);
2227                 memset(tmti, 0, sizeof(*tmti));
2228                 RETURN(rc);
2229         }
2230
2231         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
2232                 int index;
2233                 char mdt_index[9];
2234                 char *logname, *lovname;
2235
2236                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2237                                              mti->mti_stripe_index);
2238                 if (rc)
2239                         RETURN(rc);
2240                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
2241
2242                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
2243                         name_destroy(&logname);
2244                         name_destroy(&lovname);
2245                         RETURN(-EINVAL);
2246                 }
2247
2248                 tmti->mti_stripe_index = index;
2249                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
2250                                          mdt_index, lovname,
2251                                          LUSTRE_SP_MDT, 0);
2252                 name_destroy(&logname);
2253                 name_destroy(&lovname);
2254                 RETURN(rc);
2255         }
2256         RETURN(rc);
2257 }
2258
2259 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
2260 /* stealed from mgs_get_fsdb_from_llog*/
2261 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
2262                                               struct mgs_device *mgs,
2263                                               char *client_name,
2264                                               struct temp_comp* comp)
2265 {
2266         struct llog_handle *loghandle;
2267         struct mgs_target_info *tmti;
2268         struct llog_ctxt *ctxt;
2269         int rc;
2270
2271         ENTRY;
2272
2273         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
2274         LASSERT(ctxt != NULL);
2275
2276         OBD_ALLOC_PTR(tmti);
2277         if (tmti == NULL)
2278                 GOTO(out_ctxt, rc = -ENOMEM);
2279
2280         comp->comp_tmti = tmti;
2281         comp->comp_obd = mgs->mgs_obd;
2282
2283         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
2284                        LLOG_OPEN_EXISTS);
2285         if (rc < 0) {
2286                 if (rc == -ENOENT)
2287                         rc = 0;
2288                 GOTO(out_pop, rc);
2289         }
2290
2291         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
2292         if (rc)
2293                 GOTO(out_close, rc);
2294
2295         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
2296                                   (void *)comp, NULL, false);
2297         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
2298 out_close:
2299         llog_close(env, loghandle);
2300 out_pop:
2301         OBD_FREE_PTR(tmti);
2302 out_ctxt:
2303         llog_ctxt_put(ctxt);
2304         RETURN(rc);
2305 }
2306
2307 /* lmv is the second thing for client logs */
2308 /* copied from mgs_write_log_lov. Please refer to that.  */
2309 static int mgs_write_log_lmv(const struct lu_env *env,
2310                              struct mgs_device *mgs,
2311                              struct fs_db *fsdb,
2312                              struct mgs_target_info *mti,
2313                              char *logname, char *lmvname)
2314 {
2315         struct llog_handle *llh = NULL;
2316         struct lmv_desc *lmvdesc;
2317         char *uuid;
2318         int rc = 0;
2319         ENTRY;
2320
2321         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
2322
2323         OBD_ALLOC_PTR(lmvdesc);
2324         if (lmvdesc == NULL)
2325                 RETURN(-ENOMEM);
2326         lmvdesc->ld_active_tgt_count = 0;
2327         lmvdesc->ld_tgt_count = 0;
2328         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
2329         uuid = (char *)lmvdesc->ld_uuid.uuid;
2330
2331         rc = record_start_log(env, mgs, &llh, logname);
2332         if (rc)
2333                 GOTO(out_free, rc);
2334         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
2335         if (rc)
2336                 GOTO(out_end, rc);
2337         rc = record_attach(env, llh, lmvname, "lmv", uuid);
2338         if (rc)
2339                 GOTO(out_end, rc);
2340         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
2341         if (rc)
2342                 GOTO(out_end, rc);
2343         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
2344         if (rc)
2345                 GOTO(out_end, rc);
2346 out_end:
2347         record_end_log(env, &llh);
2348 out_free:
2349         OBD_FREE_PTR(lmvdesc);
2350         RETURN(rc);
2351 }
2352
2353 /* lov is the first thing in the mdt and client logs */
2354 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
2355                              struct fs_db *fsdb, struct mgs_target_info *mti,
2356                              char *logname, char *lovname)
2357 {
2358         struct llog_handle *llh = NULL;
2359         struct lov_desc *lovdesc;
2360         char *uuid;
2361         int rc = 0;
2362         ENTRY;
2363
2364         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
2365
2366         /*
2367         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
2368         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
2369               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
2370         */
2371
2372         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
2373         OBD_ALLOC_PTR(lovdesc);
2374         if (lovdesc == NULL)
2375                 RETURN(-ENOMEM);
2376         lovdesc->ld_magic = LOV_DESC_MAGIC;
2377         lovdesc->ld_tgt_count = 0;
2378         /* Defaults.  Can be changed later by lcfg config_param */
2379         lovdesc->ld_default_stripe_count = 1;
2380         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
2381         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
2382         lovdesc->ld_default_stripe_offset = -1;
2383         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
2384         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
2385         /* can these be the same? */
2386         uuid = (char *)lovdesc->ld_uuid.uuid;
2387
2388         /* This should always be the first entry in a log.
2389         rc = mgs_clear_log(obd, logname); */
2390         rc = record_start_log(env, mgs, &llh, logname);
2391         if (rc)
2392                 GOTO(out_free, rc);
2393         /* FIXME these should be a single journal transaction */
2394         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
2395         if (rc)
2396                 GOTO(out_end, rc);
2397         rc = record_attach(env, llh, lovname, "lov", uuid);
2398         if (rc)
2399                 GOTO(out_end, rc);
2400         rc = record_lov_setup(env, llh, lovname, lovdesc);
2401         if (rc)
2402                 GOTO(out_end, rc);
2403         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
2404         if (rc)
2405                 GOTO(out_end, rc);
2406         EXIT;
2407 out_end:
2408         record_end_log(env, &llh);
2409 out_free:
2410         OBD_FREE_PTR(lovdesc);
2411         return rc;
2412 }
2413
2414 /* add failnids to open log */
2415 static int mgs_write_log_failnids(const struct lu_env *env,
2416                                   struct mgs_target_info *mti,
2417                                   struct llog_handle *llh,
2418                                   char *cliname)
2419 {
2420         char *failnodeuuid = NULL;
2421         char *ptr = mti->mti_params;
2422         lnet_nid_t nid;
2423         int rc = 0;
2424
2425         /*
2426         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
2427         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
2428         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
2429         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
2430         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
2431         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
2432         */
2433
2434         /*
2435          * Pull failnid info out of params string, which may contain something
2436          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
2437          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
2438          * etc.  However, convert_hostnames() should have caught those.
2439          */
2440         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
2441                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
2442                         char nidstr[LNET_NIDSTR_SIZE];
2443
2444                         if (failnodeuuid == NULL) {
2445                                 /* We don't know the failover node name,
2446                                  * so just use the first nid as the uuid */
2447                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
2448                                 rc = name_create(&failnodeuuid, nidstr, "");
2449                                 if (rc != 0)
2450                                         return rc;
2451                         }
2452                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
2453                                 "client %s\n",
2454                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
2455                                 failnodeuuid, cliname);
2456                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
2457                         /*
2458                          * If *ptr is ':', we have added all NIDs for
2459                          * failnodeuuid.
2460                          */
2461                         if (*ptr == ':') {
2462                                 rc = record_add_conn(env, llh, cliname,
2463                                                      failnodeuuid);
2464                                 name_destroy(&failnodeuuid);
2465                                 failnodeuuid = NULL;
2466                         }
2467                 }
2468                 if (failnodeuuid) {
2469                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2470                         name_destroy(&failnodeuuid);
2471                         failnodeuuid = NULL;
2472                 }
2473         }
2474
2475         return rc;
2476 }
2477
2478 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2479                                     struct mgs_device *mgs,
2480                                     struct fs_db *fsdb,
2481                                     struct mgs_target_info *mti,
2482                                     char *logname, char *lmvname)
2483 {
2484         struct llog_handle *llh = NULL;
2485         char *mdcname = NULL;
2486         char *nodeuuid = NULL;
2487         char *mdcuuid = NULL;
2488         char *lmvuuid = NULL;
2489         char index[6];
2490         char nidstr[LNET_NIDSTR_SIZE];
2491         int i, rc;
2492         ENTRY;
2493
2494         if (mgs_log_is_empty(env, mgs, logname)) {
2495                 CERROR("log is empty! Logical error\n");
2496                 RETURN(-EINVAL);
2497         }
2498
2499         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2500                mti->mti_svname, logname, lmvname);
2501
2502         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2503         rc = name_create(&nodeuuid, nidstr, "");
2504         if (rc)
2505                 RETURN(rc);
2506         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2507         if (rc)
2508                 GOTO(out_free, rc);
2509         rc = name_create(&mdcuuid, mdcname, "_UUID");
2510         if (rc)
2511                 GOTO(out_free, rc);
2512         rc = name_create(&lmvuuid, lmvname, "_UUID");
2513         if (rc)
2514                 GOTO(out_free, rc);
2515
2516         rc = record_start_log(env, mgs, &llh, logname);
2517         if (rc)
2518                 GOTO(out_free, rc);
2519         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2520                            "add mdc");
2521         if (rc)
2522                 GOTO(out_end, rc);
2523         for (i = 0; i < mti->mti_nid_count; i++) {
2524                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2525                         libcfs_nid2str_r(mti->mti_nids[i],
2526                                          nidstr, sizeof(nidstr)));
2527
2528                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2529                 if (rc)
2530                         GOTO(out_end, rc);
2531         }
2532
2533         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2534         if (rc)
2535                 GOTO(out_end, rc);
2536         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2537                           NULL, NULL);
2538         if (rc)
2539                 GOTO(out_end, rc);
2540         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2541         if (rc)
2542                 GOTO(out_end, rc);
2543         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2544         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2545                             index, "1");
2546         if (rc)
2547                 GOTO(out_end, rc);
2548         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2549                            "add mdc");
2550         if (rc)
2551                 GOTO(out_end, rc);
2552 out_end:
2553         record_end_log(env, &llh);
2554 out_free:
2555         name_destroy(&lmvuuid);
2556         name_destroy(&mdcuuid);
2557         name_destroy(&mdcname);
2558         name_destroy(&nodeuuid);
2559         RETURN(rc);
2560 }
2561
2562 static inline int name_create_lov(char **lovname, char *mdtname,
2563                                   struct fs_db *fsdb, int index)
2564 {
2565         /* COMPAT_180 */
2566         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2567                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2568         else
2569                 return name_create(lovname, mdtname, "-mdtlov");
2570 }
2571
2572 static int name_create_mdt_and_lov(char **logname, char **lovname,
2573                                    struct fs_db *fsdb, int i)
2574 {
2575         int rc;
2576
2577         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2578         if (rc)
2579                 return rc;
2580         /* COMPAT_180 */
2581         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2582                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2583         else
2584                 rc = name_create(lovname, *logname, "-mdtlov");
2585         if (rc) {
2586                 name_destroy(logname);
2587                 *logname = NULL;
2588         }
2589         return rc;
2590 }
2591
2592 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2593                                       struct fs_db *fsdb, int i)
2594 {
2595         char suffix[16];
2596
2597         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2598                 sprintf(suffix, "-osc");
2599         else
2600                 sprintf(suffix, "-osc-MDT%04x", i);
2601         return name_create(oscname, ostname, suffix);
2602 }
2603
2604 /* add new mdc to already existent MDS */
2605 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2606                                     struct mgs_device *mgs,
2607                                     struct fs_db *fsdb,
2608                                     struct mgs_target_info *mti,
2609                                     int mdt_index, char *logname)
2610 {
2611         struct llog_handle      *llh = NULL;
2612         char    *nodeuuid = NULL;
2613         char    *ospname = NULL;
2614         char    *lovuuid = NULL;
2615         char    *mdtuuid = NULL;
2616         char    *svname = NULL;
2617         char    *mdtname = NULL;
2618         char    *lovname = NULL;
2619         char    index_str[16];
2620         char    nidstr[LNET_NIDSTR_SIZE];
2621         int     i, rc;
2622
2623         ENTRY;
2624         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2625                 CERROR("log is empty! Logical error\n");
2626                 RETURN (-EINVAL);
2627         }
2628
2629         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2630                logname);
2631
2632         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2633         if (rc)
2634                 RETURN(rc);
2635
2636         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2637         rc = name_create(&nodeuuid, nidstr, "");
2638         if (rc)
2639                 GOTO(out_destory, rc);
2640
2641         rc = name_create(&svname, mdtname, "-osp");
2642         if (rc)
2643                 GOTO(out_destory, rc);
2644
2645         sprintf(index_str, "-MDT%04x", mdt_index);
2646         rc = name_create(&ospname, svname, index_str);
2647         if (rc)
2648                 GOTO(out_destory, rc);
2649
2650         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2651         if (rc)
2652                 GOTO(out_destory, rc);
2653
2654         rc = name_create(&lovuuid, lovname, "_UUID");
2655         if (rc)
2656                 GOTO(out_destory, rc);
2657
2658         rc = name_create(&mdtuuid, mdtname, "_UUID");
2659         if (rc)
2660                 GOTO(out_destory, rc);
2661
2662         rc = record_start_log(env, mgs, &llh, logname);
2663         if (rc)
2664                 GOTO(out_destory, rc);
2665
2666         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2667                            "add osp");
2668         if (rc)
2669                 GOTO(out_destory, rc);
2670
2671         for (i = 0; i < mti->mti_nid_count; i++) {
2672                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2673                         libcfs_nid2str_r(mti->mti_nids[i],
2674                                          nidstr, sizeof(nidstr)));
2675                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2676                 if (rc)
2677                         GOTO(out_end, rc);
2678         }
2679
2680         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2681         if (rc)
2682                 GOTO(out_end, rc);
2683
2684         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2685                           NULL, NULL);
2686         if (rc)
2687                 GOTO(out_end, rc);
2688
2689         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2690         if (rc)
2691                 GOTO(out_end, rc);
2692
2693         /* Add mdc(osp) to lod */
2694         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
2695         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2696                          index_str, "1", NULL);
2697         if (rc)
2698                 GOTO(out_end, rc);
2699
2700         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2701         if (rc)
2702                 GOTO(out_end, rc);
2703
2704 out_end:
2705         record_end_log(env, &llh);
2706
2707 out_destory:
2708         name_destroy(&mdtuuid);
2709         name_destroy(&lovuuid);
2710         name_destroy(&lovname);
2711         name_destroy(&ospname);
2712         name_destroy(&svname);
2713         name_destroy(&nodeuuid);
2714         name_destroy(&mdtname);
2715         RETURN(rc);
2716 }
2717
2718 static int mgs_write_log_mdt0(const struct lu_env *env,
2719                               struct mgs_device *mgs,
2720                               struct fs_db *fsdb,
2721                               struct mgs_target_info *mti)
2722 {
2723         char *log = mti->mti_svname;
2724         struct llog_handle *llh = NULL;
2725         char *uuid, *lovname;
2726         char mdt_index[6];
2727         char *ptr = mti->mti_params;
2728         int rc = 0, failout = 0;
2729         ENTRY;
2730
2731         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2732         if (uuid == NULL)
2733                 RETURN(-ENOMEM);
2734
2735         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2736                 failout = (strncmp(ptr, "failout", 7) == 0);
2737
2738         rc = name_create(&lovname, log, "-mdtlov");
2739         if (rc)
2740                 GOTO(out_free, rc);
2741         if (mgs_log_is_empty(env, mgs, log)) {
2742                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2743                 if (rc)
2744                         GOTO(out_lod, rc);
2745         }
2746
2747         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2748
2749         rc = record_start_log(env, mgs, &llh, log);
2750         if (rc)
2751                 GOTO(out_lod, rc);
2752
2753         /* add MDT itself */
2754
2755         /* FIXME this whole fn should be a single journal transaction */
2756         sprintf(uuid, "%s_UUID", log);
2757         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2758         if (rc)
2759                 GOTO(out_lod, rc);
2760         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2761         if (rc)
2762                 GOTO(out_end, rc);
2763         rc = record_mount_opt(env, llh, log, lovname, NULL);
2764         if (rc)
2765                 GOTO(out_end, rc);
2766         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2767                         failout ? "n" : "f");
2768         if (rc)
2769                 GOTO(out_end, rc);
2770         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2771         if (rc)
2772                 GOTO(out_end, rc);
2773 out_end:
2774         record_end_log(env, &llh);
2775 out_lod:
2776         name_destroy(&lovname);
2777 out_free:
2778         OBD_FREE(uuid, sizeof(struct obd_uuid));
2779         RETURN(rc);
2780 }
2781
2782 /* envelope method for all layers log */
2783 static int mgs_write_log_mdt(const struct lu_env *env,
2784                              struct mgs_device *mgs,
2785                              struct fs_db *fsdb,
2786                              struct mgs_target_info *mti)
2787 {
2788         struct mgs_thread_info *mgi = mgs_env_info(env);
2789         struct llog_handle *llh = NULL;
2790         char *cliname;
2791         int rc, i = 0;
2792         ENTRY;
2793
2794         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2795
2796         if (mti->mti_uuid[0] == '\0') {
2797                 /* Make up our own uuid */
2798                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2799                          "%s_UUID", mti->mti_svname);
2800         }
2801
2802         /* add mdt */
2803         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2804         if (rc)
2805                 RETURN(rc);
2806         /* Append the mdt info to the client log */
2807         rc = name_create(&cliname, mti->mti_fsname, "-client");
2808         if (rc)
2809                 RETURN(rc);
2810
2811         if (mgs_log_is_empty(env, mgs, cliname)) {
2812                 /* Start client log */
2813                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2814                                        fsdb->fsdb_clilov);
2815                 if (rc)
2816                         GOTO(out_free, rc);
2817                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2818                                        fsdb->fsdb_clilmv);
2819                 if (rc)
2820                         GOTO(out_free, rc);
2821         }
2822
2823         /*
2824         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2825         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2826         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2827         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2828         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2829         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2830         */
2831
2832         /* copy client info about lov/lmv */
2833         mgi->mgi_comp.comp_mti = mti;
2834         mgi->mgi_comp.comp_fsdb = fsdb;
2835
2836         rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2837                                                 &mgi->mgi_comp);
2838         if (rc)
2839                 GOTO(out_free, rc);
2840         rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2841                                       fsdb->fsdb_clilmv);
2842         if (rc)
2843                 GOTO(out_free, rc);
2844
2845         /* add mountopts */
2846         rc = record_start_log(env, mgs, &llh, cliname);
2847         if (rc)
2848                 GOTO(out_free, rc);
2849
2850         rc = record_marker(env, llh, fsdb, CM_START, cliname, "mount opts");
2851         if (rc)
2852                 GOTO(out_end, rc);
2853         rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2854                               fsdb->fsdb_clilmv);
2855         if (rc)
2856                 GOTO(out_end, rc);
2857         rc = record_marker(env, llh, fsdb, CM_END, cliname, "mount opts");
2858         if (rc)
2859                 GOTO(out_end, rc);
2860
2861         /* for_all_existing_mdt except current one */
2862         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2863                 if (i !=  mti->mti_stripe_index &&
2864                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2865                         char *logname;
2866
2867                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2868                         if (rc)
2869                                 GOTO(out_end, rc);
2870
2871                         /* NB: If the log for the MDT is empty, it means
2872                          * the MDT is only added to the index
2873                          * map, and not being process yet, i.e. this
2874                          * is an unregistered MDT, see mgs_write_log_target().
2875                          * so we should skip it. Otherwise
2876                          *
2877                          * 1. MGS get register request for MDT1 and MDT2.
2878                          *
2879                          * 2. Then both MDT1 and MDT2 are added into
2880                          * fsdb_mdt_index_map. (see mgs_set_index()).
2881                          *
2882                          * 3. Then MDT1 get the lock of fsdb_mutex, then
2883                          * generate the config log, here, it will regard MDT2
2884                          * as an existent MDT, and generate "add osp" for
2885                          * lustre-MDT0001-osp-MDT0002. Note: at the moment
2886                          * MDT0002 config log is still empty, so it will
2887                          * add "add osp" even before "lov setup", which
2888                          * will definitly cause trouble.
2889                          *
2890                          * 4. MDT1 registeration finished, fsdb_mutex is
2891                          * released, then MDT2 get in, then in above
2892                          * mgs_steal_llog_for_mdt_from_client(), it will
2893                          * add another osp log for lustre-MDT0001-osp-MDT0002,
2894                          * which will cause another trouble.*/
2895                         if (!mgs_log_is_empty(env, mgs, logname))
2896                                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb,
2897                                                               mti, i, logname);
2898
2899                         name_destroy(&logname);
2900                         if (rc)
2901                                 GOTO(out_end, rc);
2902                 }
2903         }
2904 out_end:
2905         record_end_log(env, &llh);
2906 out_free:
2907         name_destroy(&cliname);
2908         RETURN(rc);
2909 }
2910
2911 /* Add the ost info to the client/mdt lov */
2912 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2913                                     struct mgs_device *mgs, struct fs_db *fsdb,
2914                                     struct mgs_target_info *mti,
2915                                     char *logname, char *suffix, char *lovname,
2916                                     enum lustre_sec_part sec_part, int flags)
2917 {
2918         struct llog_handle *llh = NULL;
2919         char *nodeuuid = NULL;
2920         char *oscname = NULL;
2921         char *oscuuid = NULL;
2922         char *lovuuid = NULL;
2923         char *svname = NULL;
2924         char index[6];
2925         char nidstr[LNET_NIDSTR_SIZE];
2926         int i, rc;
2927         ENTRY;
2928
2929         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2930                 mti->mti_svname, logname);
2931
2932         if (mgs_log_is_empty(env, mgs, logname)) {
2933                 CERROR("log is empty! Logical error\n");
2934                 RETURN(-EINVAL);
2935         }
2936
2937         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2938         rc = name_create(&nodeuuid, nidstr, "");
2939         if (rc)
2940                 RETURN(rc);
2941         rc = name_create(&svname, mti->mti_svname, "-osc");
2942         if (rc)
2943                 GOTO(out_free, rc);
2944
2945         /* for the system upgraded from old 1.8, keep using the old osc naming
2946          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2947         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2948                 rc = name_create(&oscname, svname, "");
2949         else
2950                 rc = name_create(&oscname, svname, suffix);
2951         if (rc)
2952                 GOTO(out_free, rc);
2953
2954         rc = name_create(&oscuuid, oscname, "_UUID");
2955         if (rc)
2956                 GOTO(out_free, rc);
2957         rc = name_create(&lovuuid, lovname, "_UUID");
2958         if (rc)
2959                 GOTO(out_free, rc);
2960
2961
2962         /*
2963         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2964         multihomed (#4)
2965         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2966         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2967         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2968         failover (#6,7)
2969         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2970         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2971         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2972         */
2973
2974         rc = record_start_log(env, mgs, &llh, logname);
2975         if (rc)
2976                 GOTO(out_free, rc);
2977
2978         /* FIXME these should be a single journal transaction */
2979         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2980                            "add osc");
2981         if (rc)
2982                 GOTO(out_end, rc);
2983
2984         /* NB: don't change record order, because upon MDT steal OSC config
2985          * from client, it treats all nids before LCFG_SETUP as target nids
2986          * (multiple interfaces), while nids after as failover node nids.
2987          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2988          */
2989         for (i = 0; i < mti->mti_nid_count; i++) {
2990                 CDEBUG(D_MGS, "add nid %s\n",
2991                         libcfs_nid2str_r(mti->mti_nids[i],
2992                                          nidstr, sizeof(nidstr)));
2993                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2994                 if (rc)
2995                         GOTO(out_end, rc);
2996         }
2997         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2998         if (rc)
2999                 GOTO(out_end, rc);
3000         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
3001                           NULL, NULL);
3002         if (rc)
3003                 GOTO(out_end, rc);
3004         rc = mgs_write_log_failnids(env, mti, llh, oscname);
3005         if (rc)
3006                 GOTO(out_end, rc);
3007
3008         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
3009
3010         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
3011         if (rc)
3012                 GOTO(out_end, rc);
3013         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
3014                            "add osc");
3015         if (rc)
3016                 GOTO(out_end, rc);
3017 out_end:
3018         record_end_log(env, &llh);
3019 out_free:
3020         name_destroy(&lovuuid);
3021         name_destroy(&oscuuid);
3022         name_destroy(&oscname);
3023         name_destroy(&svname);
3024         name_destroy(&nodeuuid);
3025         RETURN(rc);
3026 }
3027
3028 static int mgs_write_log_ost(const struct lu_env *env,
3029                              struct mgs_device *mgs, struct fs_db *fsdb,
3030                              struct mgs_target_info *mti)
3031 {
3032         struct llog_handle *llh = NULL;
3033         char *logname, *lovname;
3034         char *ptr = mti->mti_params;
3035         int rc, flags = 0, failout = 0, i;
3036         ENTRY;
3037
3038         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
3039
3040         /* The ost startup log */
3041
3042         /* If the ost log already exists, that means that someone reformatted
3043            the ost and it called target_add again. */
3044         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
3045                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
3046                                    "exists, yet the server claims it never "
3047                                    "registered. It may have been reformatted, "
3048                                    "or the index changed. writeconf the MDT to "
3049                                    "regenerate all logs.\n", mti->mti_svname);
3050                 RETURN(-EALREADY);
3051         }
3052
3053         /*
3054         attach obdfilter ost1 ost1_UUID
3055         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
3056         */
3057         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
3058                 failout = (strncmp(ptr, "failout", 7) == 0);
3059         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
3060         if (rc)
3061                 RETURN(rc);
3062         /* FIXME these should be a single journal transaction */
3063         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
3064         if (rc)
3065                 GOTO(out_end, rc);
3066         if (*mti->mti_uuid == '\0')
3067                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
3068                          "%s_UUID", mti->mti_svname);
3069         rc = record_attach(env, llh, mti->mti_svname,
3070                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
3071         if (rc)
3072                 GOTO(out_end, rc);
3073         rc = record_setup(env, llh, mti->mti_svname,
3074                           "dev"/*ignored*/, "type"/*ignored*/,
3075                           failout ? "n" : "f", NULL/*options*/);
3076         if (rc)
3077                 GOTO(out_end, rc);
3078         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
3079         if (rc)
3080                 GOTO(out_end, rc);
3081 out_end:
3082         record_end_log(env, &llh);
3083         if (rc)
3084                 RETURN(rc);
3085         /* We also have to update the other logs where this osc is part of
3086            the lov */
3087
3088         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3089                 /* If we're upgrading, the old mdt log already has our
3090                    entry. Let's do a fake one for fun. */
3091                 /* Note that we can't add any new failnids, since we don't
3092                    know the old osc names. */
3093                 flags = CM_SKIP | CM_UPGRADE146;
3094
3095         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
3096                 /* If the update flag isn't set, don't update client/mdt
3097                    logs. */
3098                 flags |= CM_SKIP;
3099                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
3100                               "the MDT first to regenerate it.\n",
3101                               mti->mti_svname);
3102         }
3103
3104         /* Add ost to all MDT lov defs */
3105         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3106                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
3107                         char mdt_index[13];
3108
3109                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
3110                                                      i);
3111                         if (rc)
3112                                 RETURN(rc);
3113
3114                         snprintf(mdt_index, sizeof(mdt_index), "-MDT%04x", i);
3115                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
3116                                                       logname, mdt_index,
3117                                                       lovname, LUSTRE_SP_MDT,
3118                                                       flags);
3119                         name_destroy(&logname);
3120                         name_destroy(&lovname);
3121                         if (rc)
3122                                 RETURN(rc);
3123                 }
3124         }
3125
3126         /* Append ost info to the client log */
3127         rc = name_create(&logname, mti->mti_fsname, "-client");
3128         if (rc)
3129                 RETURN(rc);
3130         if (mgs_log_is_empty(env, mgs, logname)) {
3131                 /* Start client log */
3132                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
3133                                        fsdb->fsdb_clilov);
3134                 if (rc)
3135                         GOTO(out_free, rc);
3136                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
3137                                        fsdb->fsdb_clilmv);
3138                 if (rc)
3139                         GOTO(out_free, rc);
3140         }
3141         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
3142                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
3143 out_free:
3144         name_destroy(&logname);
3145         RETURN(rc);
3146 }
3147
3148 static __inline__ int mgs_param_empty(char *ptr)
3149 {
3150         char *tmp;
3151
3152         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
3153                 return 1;
3154         return 0;
3155 }
3156
3157 static int mgs_write_log_failnid_internal(const struct lu_env *env,
3158                                           struct mgs_device *mgs,
3159                                           struct fs_db *fsdb,
3160                                           struct mgs_target_info *mti,
3161                                           char *logname, char *cliname)
3162 {
3163         int rc;
3164         struct llog_handle *llh = NULL;
3165
3166         if (mgs_param_empty(mti->mti_params)) {
3167                 /* Remove _all_ failnids */
3168                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3169                                 mti->mti_svname, "add failnid", CM_SKIP);
3170                 return rc < 0 ? rc : 0;
3171         }
3172
3173         /* Otherwise failover nids are additive */
3174         rc = record_start_log(env, mgs, &llh, logname);
3175         if (rc)
3176                 return rc;
3177                 /* FIXME this should be a single journal transaction */
3178         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
3179                            "add failnid");
3180         if (rc)
3181                 goto out_end;
3182         rc = mgs_write_log_failnids(env, mti, llh, cliname);
3183         if (rc)
3184                 goto out_end;
3185         rc = record_marker(env, llh, fsdb, CM_END,
3186                            mti->mti_svname, "add failnid");
3187 out_end:
3188         record_end_log(env, &llh);
3189         return rc;
3190 }
3191
3192
3193 /* Add additional failnids to an existing log.
3194    The mdc/osc must have been added to logs first */
3195 /* tcp nids must be in dotted-quad ascii -
3196    we can't resolve hostnames from the kernel. */
3197 static int mgs_write_log_add_failnid(const struct lu_env *env,
3198                                      struct mgs_device *mgs,
3199                                      struct fs_db *fsdb,
3200                                      struct mgs_target_info *mti)
3201 {
3202         char *logname, *cliname;
3203         int rc;
3204         ENTRY;
3205
3206         /* FIXME we currently can't erase the failnids
3207          * given when a target first registers, since they aren't part of
3208          * an "add uuid" stanza
3209          */
3210
3211         /* Verify that we know about this target */
3212         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
3213                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
3214                                    "yet. It must be started before failnids "
3215                                    "can be added.\n", mti->mti_svname);
3216                 RETURN(-ENOENT);
3217         }
3218
3219         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
3220         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3221                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
3222         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3223                 rc = name_create(&cliname, mti->mti_svname, "-osc");
3224         } else {
3225                 RETURN(-EINVAL);
3226         }
3227         if (rc)
3228                 RETURN(rc);
3229
3230         /* Add failover nids to the client log */
3231         rc = name_create(&logname, mti->mti_fsname, "-client");
3232         if (rc) {
3233                 name_destroy(&cliname);
3234                 RETURN(rc);
3235         }
3236
3237         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
3238         name_destroy(&logname);
3239         name_destroy(&cliname);
3240         if (rc)
3241                 RETURN(rc);
3242
3243         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3244                 /* Add OST failover nids to the MDT logs as well */
3245                 int i;
3246
3247                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3248                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3249                                 continue;
3250                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3251                         if (rc)
3252                                 RETURN(rc);
3253                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
3254                                                  fsdb, i);
3255                         if (rc) {
3256                                 name_destroy(&logname);
3257                                 RETURN(rc);
3258                         }
3259                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
3260                                                             mti, logname,
3261                                                             cliname);
3262                         name_destroy(&cliname);
3263                         name_destroy(&logname);
3264                         if (rc)
3265                                 RETURN(rc);
3266                 }
3267         }
3268
3269         RETURN(rc);
3270 }
3271
3272 static int mgs_wlp_lcfg(const struct lu_env *env,
3273                         struct mgs_device *mgs, struct fs_db *fsdb,
3274                         struct mgs_target_info *mti,
3275                         char *logname, struct lustre_cfg_bufs *bufs,
3276                         char *tgtname, char *ptr)
3277 {
3278         char comment[MTI_NAME_MAXLEN];
3279         char *tmp;
3280         struct llog_cfg_rec *lcr;
3281         int rc, del;
3282
3283         /* Erase any old settings of this same parameter */
3284         memcpy(comment, ptr, MTI_NAME_MAXLEN);
3285         comment[MTI_NAME_MAXLEN - 1] = 0;
3286         /* But don't try to match the value. */
3287         tmp = strchr(comment, '=');
3288         if (tmp != NULL)
3289                 *tmp = 0;
3290         /* FIXME we should skip settings that are the same as old values */
3291         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
3292         if (rc < 0)
3293                 return rc;
3294         del = mgs_param_empty(ptr);
3295
3296         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
3297                       "Setting" : "Modifying", tgtname, comment, logname);
3298         if (del) {
3299                 /* mgs_modify() will return 1 if nothing had to be done */
3300                 if (rc == 1)
3301                         rc = 0;
3302                 return rc;
3303         }
3304
3305         lustre_cfg_bufs_reset(bufs, tgtname);
3306         lustre_cfg_bufs_set_string(bufs, 1, ptr);
3307         if (mti->mti_flags & LDD_F_PARAM2)
3308                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
3309
3310         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
3311                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
3312         if (lcr == NULL)
3313                 return -ENOMEM;
3314
3315         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
3316                                   comment);
3317         lustre_cfg_rec_free(lcr);
3318         return rc;
3319 }
3320
3321 /* write global variable settings into log */
3322 static int mgs_write_log_sys(const struct lu_env *env,
3323                              struct mgs_device *mgs, struct fs_db *fsdb,
3324                              struct mgs_target_info *mti, char *sys, char *ptr)
3325 {
3326         struct mgs_thread_info  *mgi = mgs_env_info(env);
3327         struct lustre_cfg       *lcfg;
3328         struct llog_cfg_rec     *lcr;
3329         char *tmp, sep;
3330         int rc, cmd, convert = 1;
3331
3332         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
3333                 cmd = LCFG_SET_TIMEOUT;
3334         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
3335                 cmd = LCFG_SET_LDLM_TIMEOUT;
3336         /* Check for known params here so we can return error to lctl */
3337         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
3338                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
3339                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
3340                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
3341                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
3342                 cmd = LCFG_PARAM;
3343         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
3344                 convert = 0; /* Don't convert string value to integer */
3345                 cmd = LCFG_PARAM;
3346         } else {
3347                 return -EINVAL;
3348         }
3349
3350         if (mgs_param_empty(ptr))
3351                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
3352         else
3353                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
3354
3355         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
3356         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
3357         if (!convert && *tmp != '\0')
3358                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
3359         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3360         if (lcr == NULL)
3361                 return -ENOMEM;
3362
3363         lcfg = &lcr->lcr_cfg;
3364         if (convert) {
3365                 rc = kstrtouint(tmp, 0, &lcfg->lcfg_num);
3366                 if (rc)
3367                         GOTO(out_rec_free, rc);
3368         } else {
3369                 lcfg->lcfg_num = 0;
3370         }
3371
3372         /* truncate the comment to the parameter name */
3373         ptr = tmp - 1;
3374         sep = *ptr;
3375         *ptr = '\0';
3376         /* modify all servers and clients */
3377         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3378                                       *tmp == '\0' ? NULL : lcr,
3379                                       mti->mti_fsname, sys, 0);
3380         if (rc == 0 && *tmp != '\0') {
3381                 switch (cmd) {
3382                 case LCFG_SET_TIMEOUT:
3383                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
3384                                 class_process_config(lcfg);
3385                         break;
3386                 case LCFG_SET_LDLM_TIMEOUT:
3387                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
3388                                 class_process_config(lcfg);
3389                         break;
3390                 default:
3391                         break;
3392                 }
3393         }
3394         *ptr = sep;
3395 out_rec_free:
3396         lustre_cfg_rec_free(lcr);
3397         return rc;
3398 }
3399
3400 /* write quota settings into log */
3401 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
3402                                struct fs_db *fsdb, struct mgs_target_info *mti,
3403                                char *quota, char *ptr)
3404 {
3405         struct mgs_thread_info  *mgi = mgs_env_info(env);
3406         struct llog_cfg_rec     *lcr;
3407         char                    *tmp;
3408         char                     sep;
3409         int                      rc, cmd = LCFG_PARAM;
3410
3411         /* support only 'meta' and 'data' pools so far */
3412         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
3413             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
3414                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
3415                        "& quota.ost are)\n", ptr);
3416                 return -EINVAL;
3417         }
3418
3419         if (*tmp == '\0') {
3420                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
3421         } else {
3422                 CDEBUG(D_MGS, "global '%s'\n", quota);
3423
3424                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
3425                     strchr(tmp, 'p') == NULL &&
3426                     strcmp(tmp, "none") != 0) {
3427                         CERROR("enable option(%s) isn't supported\n", tmp);
3428                         return -EINVAL;
3429                 }
3430         }
3431
3432         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
3433         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
3434         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3435         if (lcr == NULL)
3436                 return -ENOMEM;
3437
3438         /* truncate the comment to the parameter name */
3439         ptr = tmp - 1;
3440         sep = *ptr;
3441         *ptr = '\0';
3442
3443         /* XXX we duplicated quota enable information in all server
3444          *     config logs, it should be moved to a separate config
3445          *     log once we cleanup the config log for global param. */
3446         /* modify all servers */
3447         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3448                                       *tmp == '\0' ? NULL : lcr,
3449                                       mti->mti_fsname, quota, 1);
3450         *ptr = sep;
3451         lustre_cfg_rec_free(lcr);
3452         return rc < 0 ? rc : 0;
3453 }
3454
3455 static int mgs_srpc_set_param_disk(const struct lu_env *env,
3456                                    struct mgs_device *mgs,
3457                                    struct fs_db *fsdb,
3458                                    struct mgs_target_info *mti,
3459                                    char *param)
3460 {
3461         struct mgs_thread_info  *mgi = mgs_env_info(env);
3462         struct llog_cfg_rec     *lcr;
3463         struct llog_handle      *llh = NULL;
3464         char                    *logname;
3465         char                    *comment, *ptr;
3466         int                      rc, len;
3467
3468         ENTRY;
3469
3470         /* get comment */
3471         ptr = strchr(param, '=');
3472         LASSERT(ptr != NULL);
3473         len = ptr - param;
3474
3475         OBD_ALLOC(comment, len + 1);
3476         if (comment == NULL)
3477                 RETURN(-ENOMEM);
3478         strncpy(comment, param, len);
3479         comment[len] = '\0';
3480
3481         /* prepare lcfg */
3482         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
3483         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
3484         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
3485         if (lcr == NULL)
3486                 GOTO(out_comment, rc = -ENOMEM);
3487
3488         /* construct log name */
3489         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
3490         if (rc < 0)
3491                 GOTO(out_lcfg, rc);
3492
3493         if (mgs_log_is_empty(env, mgs, logname)) {
3494                 rc = record_start_log(env, mgs, &llh, logname);
3495                 if (rc < 0)
3496                         GOTO(out, rc);
3497                 record_end_log(env, &llh);
3498         }
3499
3500         /* obsolete old one */
3501         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
3502                         comment, CM_SKIP);
3503         if (rc < 0)
3504                 GOTO(out, rc);
3505         /* write the new one */
3506         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
3507                                   mti->mti_svname, comment);
3508         if (rc)
3509                 CERROR("%s: error writing log %s: rc = %d\n",
3510                        mgs->mgs_obd->obd_name, logname, rc);
3511 out:
3512         name_destroy(&logname);
3513 out_lcfg:
3514         lustre_cfg_rec_free(lcr);
3515 out_comment:
3516         OBD_FREE(comment, len + 1);
3517         RETURN(rc);
3518 }
3519
3520 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
3521                                         char *param)
3522 {
3523         char    *ptr;
3524
3525         /* disable the adjustable udesc parameter for now, i.e. use default
3526          * setting that client always ship udesc to MDT if possible. to enable
3527          * it simply remove the following line */
3528         goto error_out;
3529
3530         ptr = strchr(param, '=');
3531         if (ptr == NULL)
3532                 goto error_out;
3533         *ptr++ = '\0';
3534
3535         if (strcmp(param, PARAM_SRPC_UDESC))
3536                 goto error_out;
3537
3538         if (strcmp(ptr, "yes") == 0) {
3539                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3540                 CWARN("Enable user descriptor shipping from client to MDT\n");
3541         } else if (strcmp(ptr, "no") == 0) {
3542                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3543                 CWARN("Disable user descriptor shipping from client to MDT\n");
3544         } else {
3545                 *(ptr - 1) = '=';
3546                 goto error_out;
3547         }
3548         return 0;
3549
3550 error_out:
3551         CERROR("Invalid param: %s\n", param);
3552         return -EINVAL;
3553 }
3554
3555 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3556                                   const char *svname,
3557                                   char *param)
3558 {
3559         struct sptlrpc_rule      rule;
3560         struct sptlrpc_rule_set *rset;
3561         int                      rc;
3562         ENTRY;
3563
3564         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3565                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3566                 RETURN(-EINVAL);
3567         }
3568
3569         if (strncmp(param, PARAM_SRPC_UDESC,
3570                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3571                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3572         }
3573
3574         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3575                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3576                 RETURN(-EINVAL);
3577         }
3578
3579         param += sizeof(PARAM_SRPC_FLVR) - 1;
3580
3581         rc = sptlrpc_parse_rule(param, &rule);
3582         if (rc)
3583                 RETURN(rc);
3584
3585         /* mgs rules implies must be mgc->mgs */
3586         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3587                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3588                      rule.sr_from != LUSTRE_SP_ANY) ||
3589                     (rule.sr_to != LUSTRE_SP_MGS &&
3590                      rule.sr_to != LUSTRE_SP_ANY))
3591                         RETURN(-EINVAL);
3592         }
3593
3594         /* preapre room for this coming rule. svcname format should be:
3595          * - fsname: general rule
3596          * - fsname-tgtname: target-specific rule
3597          */
3598         if (strchr(svname, '-')) {
3599                 struct mgs_tgt_srpc_conf *tgtconf;
3600                 int                       found = 0;
3601
3602                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3603                      tgtconf = tgtconf->mtsc_next) {
3604                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3605                                 found = 1;
3606                                 break;
3607                         }
3608                 }
3609
3610                 if (!found) {
3611                         int name_len;
3612
3613                         OBD_ALLOC_PTR(tgtconf);
3614                         if (tgtconf == NULL)
3615                                 RETURN(-ENOMEM);
3616
3617                         name_len = strlen(svname);
3618
3619                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3620                         if (tgtconf->mtsc_tgt == NULL) {
3621                                 OBD_FREE_PTR(tgtconf);
3622                                 RETURN(-ENOMEM);
3623                         }
3624                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3625
3626                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3627                         fsdb->fsdb_srpc_tgt = tgtconf;
3628                 }
3629
3630                 rset = &tgtconf->mtsc_rset;
3631         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
3632                 /* put _mgs related srpc rule directly in mgs ruleset */
3633                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
3634         } else {
3635                 rset = &fsdb->fsdb_srpc_gen;
3636         }
3637
3638         rc = sptlrpc_rule_set_merge(rset, &rule);
3639
3640         RETURN(rc);
3641 }
3642
3643 static int mgs_srpc_set_param(const struct lu_env *env,
3644                               struct mgs_device *mgs,
3645                               struct fs_db *fsdb,
3646                               struct mgs_target_info *mti,
3647                               char *param)
3648 {
3649         char                   *copy;
3650         int                     rc, copy_size;
3651         ENTRY;
3652
3653 #ifndef HAVE_GSS
3654         RETURN(-EINVAL);
3655 #endif
3656         /* keep a copy of original param, which could be destroied
3657          * during parsing */
3658         copy_size = strlen(param) + 1;
3659         OBD_ALLOC(copy, copy_size);
3660         if (copy == NULL)
3661                 return -ENOMEM;
3662         memcpy(copy, param, copy_size);
3663
3664         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3665         if (rc)
3666                 goto out_free;
3667
3668         /* previous steps guaranteed the syntax is correct */
3669         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3670         if (rc)
3671                 goto out_free;
3672
3673         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3674                 /*
3675                  * for mgs rules, make them effective immediately.
3676                  */
3677                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3678                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3679                                                  &fsdb->fsdb_srpc_gen);
3680         }
3681
3682 out_free:
3683         OBD_FREE(copy, copy_size);
3684         RETURN(rc);
3685 }
3686
3687 struct mgs_srpc_read_data {
3688         struct fs_db   *msrd_fsdb;
3689         int             msrd_skip;
3690 };
3691
3692 static int mgs_srpc_read_handler(const struct lu_env *env,
3693                                  struct llog_handle *llh,
3694                                  struct llog_rec_hdr *rec, void *data)
3695 {
3696         struct mgs_srpc_read_data *msrd = data;
3697         struct cfg_marker         *marker;
3698         struct lustre_cfg         *lcfg = REC_DATA(rec);
3699         char                      *svname, *param;
3700         int                        cfg_len, rc;
3701         ENTRY;
3702
3703         if (rec->lrh_type != OBD_CFG_REC) {
3704                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3705                 RETURN(-EINVAL);
3706         }
3707
3708         cfg_len = REC_DATA_LEN(rec);
3709
3710         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3711         if (rc) {
3712                 CERROR("Insane cfg\n");
3713                 RETURN(rc);
3714         }
3715
3716         if (lcfg->lcfg_command == LCFG_MARKER) {
3717                 marker = lustre_cfg_buf(lcfg, 1);
3718
3719                 if (marker->cm_flags & CM_START &&
3720                     marker->cm_flags & CM_SKIP)
3721                         msrd->msrd_skip = 1;
3722                 if (marker->cm_flags & CM_END)
3723                         msrd->msrd_skip = 0;
3724
3725                 RETURN(0);
3726         }
3727
3728         if (msrd->msrd_skip)
3729                 RETURN(0);
3730
3731         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3732                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3733                 RETURN(0);
3734         }
3735
3736         svname = lustre_cfg_string(lcfg, 0);
3737         if (svname == NULL) {
3738                 CERROR("svname is empty\n");
3739                 RETURN(0);
3740         }
3741
3742         param = lustre_cfg_string(lcfg, 1);
3743         if (param == NULL) {
3744                 CERROR("param is empty\n");
3745                 RETURN(0);
3746         }
3747
3748         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3749         if (rc)
3750                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3751
3752         RETURN(0);
3753 }
3754
3755 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3756                                 struct mgs_device *mgs,
3757                                 struct fs_db *fsdb)
3758 {
3759         struct llog_handle        *llh = NULL;
3760         struct llog_ctxt          *ctxt;
3761         char                      *logname;
3762         struct mgs_srpc_read_data  msrd;
3763         int                        rc;
3764         ENTRY;
3765
3766         /* construct log name */
3767         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3768         if (rc)
3769                 RETURN(rc);
3770
3771         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3772         LASSERT(ctxt != NULL);
3773
3774         if (mgs_log_is_empty(env, mgs, logname))
3775                 GOTO(out, rc = 0);
3776
3777         rc = llog_open(env, ctxt, &llh, NULL, logname,
3778                        LLOG_OPEN_EXISTS);
3779         if (rc < 0) {
3780                 if (rc == -ENOENT)
3781                         rc = 0;
3782                 GOTO(out, rc);
3783         }
3784
3785         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3786         if (rc)
3787                 GOTO(out_close, rc);
3788
3789         if (llog_get_size(llh) <= 1)
3790                 GOTO(out_close, rc = 0);
3791
3792         msrd.msrd_fsdb = fsdb;
3793         msrd.msrd_skip = 0;
3794
3795         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3796                           NULL);
3797
3798 out_close:
3799         llog_close(env, llh);
3800 out:
3801         llog_ctxt_put(ctxt);
3802         name_destroy(&logname);
3803
3804         if (rc)
3805                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3806         RETURN(rc);
3807 }
3808
3809 static int mgs_write_log_param2(const struct lu_env *env,
3810                                 struct mgs_device *mgs,
3811                                 struct fs_db *fsdb,
3812                                 struct mgs_target_info *mti, char *ptr)
3813 {
3814         struct lustre_cfg_bufs bufs;
3815         int rc;
3816
3817         ENTRY;
3818         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3819
3820         /* PARAM_MGSNODE and PARAM_NETWORK are set only when formating
3821          * or during the inital mount. It can never change after that.
3822          */
3823         if (!class_match_param(ptr, PARAM_MGSNODE, NULL) ||
3824             !class_match_param(ptr, PARAM_NETWORK, NULL)) {
3825                 rc = 0;
3826                 goto end;
3827         }
3828
3829         /* Processed in mgs_write_log_ost. Another value that can't
3830          * be changed by lctl set_param -P.
3831          */
3832         if (!class_match_param(ptr, PARAM_FAILMODE, NULL)) {
3833                 LCONSOLE_ERROR_MSG(0x169,
3834                                    "%s can only be changed with tunefs.lustre and --writeconf\n",
3835                                    ptr);
3836                 rc = -EPERM;
3837                 goto end;
3838         }
3839
3840         /* FIXME !!! Support for sptlrpc is incomplete. Currently the change
3841          * doesn't transmit to the client. See LU-7183.
3842          */
3843         if (!class_match_param(ptr, PARAM_SRPC, NULL)) {
3844                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3845                 goto end;
3846         }
3847
3848         /* Can't use class_match_param since ptr doesn't start with
3849          * PARAM_FAILNODE. So we look for PARAM_FAILNODE contained in ptr.
3850          */
3851         if (strstr(ptr, PARAM_FAILNODE)) {
3852                 /* Add a failover nidlist. We already processed failovers
3853                  * params for new targets in mgs_write_log_target.
3854                  */
3855                 const char *param;
3856
3857                 /* can't use wildcards with failover.node */
3858                 if (strchr(ptr, '*')) {
3859                         rc = -ENODEV;
3860                         goto end;
3861                 }
3862
3863                 param = strstr(ptr, PARAM_FAILNODE);
3864                 if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
3865                     sizeof(mti->mti_params)) {
3866                         rc = -E2BIG;
3867                         goto end;
3868                 }
3869
3870                 CDEBUG(D_MGS, "Adding failnode with param %s\n",
3871                        mti->mti_params);
3872                 rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3873                 goto end;
3874         }
3875
3876         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
3877                           mti->mti_svname, ptr);
3878 end:
3879         RETURN(rc);
3880 }
3881
3882 /* Permanent settings of all parameters by writing into the appropriate
3883  * configuration logs.
3884  * A parameter with null value ("<param>='\0'") means to erase it out of
3885  * the logs.
3886  */
3887 static int mgs_write_log_param(const struct lu_env *env,
3888                                struct mgs_device *mgs, struct fs_db *fsdb,
3889                                struct mgs_target_info *mti, char *ptr)
3890 {
3891         struct mgs_thread_info *mgi = mgs_env_info(env);
3892         char *logname;
3893         char *tmp;
3894         int rc = 0;
3895         ENTRY;
3896
3897         /* For various parameter settings, we have to figure out which logs
3898            care about them (e.g. both mdt and client for lov settings) */
3899         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3900
3901         /* The params are stored in MOUNT_DATA_FILE and modified via
3902            tunefs.lustre, or set using lctl conf_param */
3903
3904         /* Processed in lustre_start_mgc */
3905         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3906                 GOTO(end, rc);
3907
3908         /* Processed in ost/mdt */
3909         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3910                 GOTO(end, rc);
3911
3912         /* Processed in mgs_write_log_ost */
3913         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3914                 if (mti->mti_flags & LDD_F_PARAM) {
3915                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3916                                            "changed with tunefs.lustre"
3917                                            "and --writeconf\n", ptr);
3918                         rc = -EPERM;
3919                 }
3920                 GOTO(end, rc);
3921         }
3922
3923         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3924                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3925                 GOTO(end, rc);
3926         }
3927
3928         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3929                 /* Add a failover nidlist */
3930                 rc = 0;
3931                 /* We already processed failovers params for new
3932                    targets in mgs_write_log_target */
3933                 if (mti->mti_flags & LDD_F_PARAM) {
3934                         CDEBUG(D_MGS, "Adding failnode\n");
3935                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3936                 }
3937                 GOTO(end, rc);
3938         }
3939
3940         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3941                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3942                 GOTO(end, rc);
3943         }
3944
3945         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3946                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3947                 GOTO(end, rc);
3948         }
3949
3950         if (class_match_param(ptr, PARAM_OSC PARAM_ACTIVE, &tmp) == 0 ||
3951             class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0) {
3952                 /* active=0 means off, anything else means on */
3953                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3954                 bool deactive_osc = memcmp(ptr, PARAM_OSC PARAM_ACTIVE,
3955                                           strlen(PARAM_OSC PARAM_ACTIVE)) == 0;
3956                 int i;
3957
3958                 if (!deactive_osc) {
3959                         __u32   index;
3960
3961                         rc = server_name2index(mti->mti_svname, &index, NULL);
3962                         if (rc < 0)
3963                                 GOTO(end, rc);
3964
3965                         if (index == 0) {
3966                                 LCONSOLE_ERROR_MSG(0x144, "%s: MDC0 can not be"
3967                                                    " (de)activated.\n",
3968                                                    mti->mti_svname);
3969                                 GOTO(end, rc = -EPERM);
3970                         }
3971                 }
3972
3973                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3974                               flag ? "de" : "re", mti->mti_svname);
3975                 /* Modify clilov */
3976                 rc = name_create(&logname, mti->mti_fsname, "-client");
3977                 if (rc < 0)
3978                         GOTO(end, rc);
3979                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3980                                 mti->mti_svname,
3981                                 deactive_osc ? "add osc" : "add mdc", flag);
3982                 name_destroy(&logname);
3983                 if (rc < 0)
3984                         goto active_err;
3985
3986                 /* Modify mdtlov */
3987                 /* Add to all MDT logs for DNE */
3988                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3989                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3990                                 continue;
3991                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3992                         if (rc < 0)
3993                                 GOTO(end, rc);
3994                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3995                                         mti->mti_svname,
3996                                         deactive_osc ? "add osc" : "add osp",
3997                                         flag);
3998                         name_destroy(&logname);
3999                         if (rc < 0)
4000                                 goto active_err;
4001                 }
4002 active_err:
4003                 if (rc < 0) {
4004                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
4005                                            "log (%d). No permanent "
4006                                            "changes were made to the "
4007                                            "config log.\n",
4008                                            mti->mti_svname, rc);
4009                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
4010                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
4011                                                    " because the log"
4012                                                    "is in the old 1.4"
4013                                                    "style. Consider "
4014                                                    " --writeconf to "
4015                                                    "update the logs.\n");
4016                         GOTO(end, rc);
4017                 }
4018                 /* Fall through to osc/mdc proc for deactivating live
4019                    OSC/OSP on running MDT / clients. */
4020         }
4021         /* Below here, let obd's XXX_process_config methods handle it */
4022
4023         /* All lov. in proc */
4024         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
4025                 char *mdtlovname;
4026
4027                 CDEBUG(D_MGS, "lov param %s\n", ptr);
4028                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
4029                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
4030                                            "set on the MDT, not %s. "
4031                                            "Ignoring.\n",
4032                                            mti->mti_svname);
4033                         GOTO(end, rc = 0);
4034                 }
4035
4036                 /* Modify mdtlov */
4037                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
4038                         GOTO(end, rc = -ENODEV);
4039
4040                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
4041                                              mti->mti_stripe_index);
4042                 if (rc)
4043                         GOTO(end, rc);
4044                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
4045                                   &mgi->mgi_bufs, mdtlovname, ptr);
4046                 name_destroy(&logname);
4047                 name_destroy(&mdtlovname);
4048                 if (rc)
4049                         GOTO(end, rc);
4050
4051                 /* Modify clilov */
4052                 rc = name_create(&logname, mti->mti_fsname, "-client");
4053                 if (rc)
4054                         GOTO(end, rc);
4055                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
4056                                   fsdb->fsdb_clilov, ptr);
4057                 name_destroy(&logname);
4058                 GOTO(end, rc);
4059         }
4060
4061         /* All osc., mdc., llite. params in proc */
4062         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
4063             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
4064             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
4065                 char *cname;
4066
4067                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
4068                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
4069                                            " cannot be modified. Consider"
4070                                            " updating the configuration with"
4071                                            " --writeconf\n",
4072                                            mti->mti_svname);
4073                         GOTO(end, rc = -EINVAL);
4074                 }
4075                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
4076                         rc = name_create(&cname, mti->mti_fsname, "-client");
4077                         /* Add the client type to match the obdname in
4078                            class_config_llog_handler */
4079                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
4080                         rc = name_create(&cname, mti->mti_svname, "-mdc");
4081                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
4082                         rc = name_create(&cname, mti->mti_svname, "-osc");
4083                 } else {
4084                         GOTO(end, rc = -EINVAL);
4085                 }
4086                 if (rc)
4087                         GOTO(end, rc);
4088
4089                 /* Forbid direct update of llite root squash parameters.
4090                  * These parameters are indirectly set via the MDT settings.
4091                  * See (LU-1778) */
4092                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
4093                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
4094                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
4095                         LCONSOLE_ERROR("%s: root squash parameters can only "
4096                                 "be updated through MDT component\n",
4097                                 mti->mti_fsname);
4098                         name_destroy(&cname);
4099                         GOTO(end, rc = -EINVAL);
4100                 }
4101
4102                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4103
4104                 /* Modify client */
4105                 rc = name_create(&logname, mti->mti_fsname, "-client");
4106                 if (rc) {
4107                         name_destroy(&cname);
4108                         GOTO(end, rc);
4109                 }
4110                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
4111                                   cname, ptr);
4112
4113                 /* osc params affect the MDT as well */
4114                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
4115                         int i;
4116
4117                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
4118                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
4119                                         continue;
4120                                 name_destroy(&cname);
4121                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
4122                                                          fsdb, i);
4123                                 name_destroy(&logname);
4124                                 if (rc)
4125                                         break;
4126                                 rc = name_create_mdt(&logname,
4127                                                      mti->mti_fsname, i);
4128                                 if (rc)
4129                                         break;
4130                                 if (!mgs_log_is_empty(env, mgs, logname)) {
4131                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
4132                                                           mti, logname,
4133                                                           &mgi->mgi_bufs,
4134                                                           cname, ptr);
4135                                         if (rc)
4136                                                 break;
4137                                 }
4138                         }
4139                 }
4140
4141                 /* For mdc activate/deactivate, it affects OSP on MDT as well */
4142                 if (class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0 &&
4143                     rc == 0) {
4144                         char suffix[16];
4145                         char *lodname = NULL;
4146                         char *param_str = NULL;
4147                         int i;
4148                         int index;
4149
4150                         /* replace mdc with osp */
4151                         memcpy(ptr, PARAM_OSP, strlen(PARAM_OSP));
4152                         rc = server_name2index(mti->mti_svname, &index, NULL);
4153                         if (rc < 0) {
4154                                 memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
4155                                 GOTO(end, rc);
4156                         }
4157
4158                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4159                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
4160                                         continue;
4161
4162                                 if (i == index)
4163                                         continue;
4164
4165                                 name_destroy(&logname);
4166                                 rc = name_create_mdt(&logname, mti->mti_fsname,
4167                                                      i);
4168                                 if (rc < 0)
4169                                         break;
4170
4171                                 if (mgs_log_is_empty(env, mgs, logname))
4172                                         continue;
4173
4174                                 snprintf(suffix, sizeof(suffix), "-osp-MDT%04x",
4175                                          i);
4176                                 name_destroy(&cname);
4177                                 rc = name_create(&cname, mti->mti_svname,
4178                                                  suffix);
4179                                 if (rc < 0)
4180                                         break;
4181
4182                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4183                                                   &mgi->mgi_bufs, cname, ptr);
4184                                 if (rc < 0)
4185                                         break;
4186
4187                                 /* Add configuration log for noitfying LOD
4188                                  * to active/deactive the OSP. */
4189                                 name_destroy(&param_str);
4190                                 rc = name_create(&param_str, cname,
4191                                                  (*tmp == '0') ?  ".active=0" :
4192                                                  ".active=1");
4193                                 if (rc < 0)
4194                                         break;
4195
4196                                 name_destroy(&lodname);
4197                                 rc = name_create(&lodname, logname, "-mdtlov");
4198                                 if (rc < 0)
4199                                         break;
4200
4201                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4202                                                   &mgi->mgi_bufs, lodname,
4203                                                   param_str);
4204                                 if (rc < 0)
4205                                         break;
4206                         }
4207                         memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
4208                         name_destroy(&lodname);
4209                         name_destroy(&param_str);
4210                 }
4211
4212                 name_destroy(&logname);
4213                 name_destroy(&cname);
4214                 GOTO(end, rc);
4215         }
4216
4217         /* All mdt. params in proc */
4218         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
4219                 int i;
4220                 __u32 idx;
4221
4222                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4223                 if (strncmp(mti->mti_svname, mti->mti_fsname,
4224                             MTI_NAME_MAXLEN) == 0)
4225                         /* device is unspecified completely? */
4226                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
4227                 else
4228                         rc = server_name2index(mti->mti_svname, &idx, NULL);
4229                 if (rc < 0)
4230                         goto active_err;
4231                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
4232                         goto active_err;
4233                 if (rc & LDD_F_SV_ALL) {
4234                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4235                                 if (!test_bit(i,
4236                                                   fsdb->fsdb_mdt_index_map))
4237                                         continue;
4238                                 rc = name_create_mdt(&logname,
4239                                                 mti->mti_fsname, i);
4240                                 if (rc)
4241                                         goto active_err;
4242                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
4243                                                   logname, &mgi->mgi_bufs,
4244                                                   logname, ptr);
4245                                 name_destroy(&logname);
4246                                 if (rc)
4247                                         goto active_err;
4248                         }
4249                 } else {
4250                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
4251                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
4252                                 LCONSOLE_ERROR("%s: root squash parameters "
4253                                         "cannot be applied to a single MDT\n",
4254                                         mti->mti_fsname);
4255                                 GOTO(end, rc = -EINVAL);
4256                         }
4257                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
4258                                           mti->mti_svname, &mgi->mgi_bufs,
4259                                           mti->mti_svname, ptr);
4260                         if (rc)
4261                                 goto active_err;
4262                 }
4263
4264                 /* root squash settings are also applied to llite
4265                  * config log (see LU-1778) */
4266                 if (rc == 0 &&
4267                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
4268                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
4269                         char *cname;
4270                         char *ptr2;
4271
4272                         rc = name_create(&cname, mti->mti_fsname, "-client");
4273                         if (rc)
4274                                 GOTO(end, rc);
4275                         rc = name_create(&logname, mti->mti_fsname, "-client");
4276                         if (rc) {
4277                                 name_destroy(&cname);
4278                                 GOTO(end, rc);
4279                         }
4280                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
4281                         if (rc) {
4282                                 name_destroy(&cname);
4283                                 name_destroy(&logname);
4284                                 GOTO(end, rc);
4285                         }
4286                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4287                                           &mgi->mgi_bufs, cname, ptr2);
4288                         name_destroy(&ptr2);
4289                         name_destroy(&logname);
4290                         name_destroy(&cname);
4291                 }
4292                 GOTO(end, rc);
4293         }
4294
4295         /* All mdd., ost. and osd. params in proc */
4296         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
4297             (class_match_param(ptr, PARAM_LOD, NULL) == 0) ||
4298             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
4299             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
4300                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4301                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
4302                         GOTO(end, rc = -ENODEV);
4303
4304                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
4305                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
4306                 GOTO(end, rc);
4307         }
4308
4309         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
4310
4311 end:
4312         if (rc)
4313                 CERROR("err %d on param '%s'\n", rc, ptr);
4314
4315         RETURN(rc);
4316 }
4317
4318 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
4319                          struct mgs_target_info *mti, struct fs_db *fsdb)
4320 {
4321         char    *buf, *params;
4322         int      rc = -EINVAL;
4323
4324         ENTRY;
4325
4326         /* set/check the new target index */
4327         rc = mgs_set_index(env, mgs, mti);
4328         if (rc < 0)
4329                 RETURN(rc);
4330
4331         if (rc == EALREADY) {
4332                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
4333                               mti->mti_stripe_index, mti->mti_svname);
4334                 /* We would like to mark old log sections as invalid
4335                    and add new log sections in the client and mdt logs.
4336                    But if we add new sections, then live clients will
4337                    get repeat setup instructions for already running
4338                    osc's. So don't update the client/mdt logs. */
4339                 mti->mti_flags &= ~LDD_F_UPDATE;
4340                 rc = 0;
4341         }
4342
4343         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_WRITE_TARGET_DELAY, cfs_fail_val > 0 ?
4344                          cfs_fail_val : 10);
4345
4346         mutex_lock(&fsdb->fsdb_mutex);
4347
4348         if (mti->mti_flags & (LDD_F_VIRGIN | LDD_F_WRITECONF)) {
4349                 /* Generate a log from scratch */
4350                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
4351                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
4352                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
4353                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
4354                 } else {
4355                         CERROR("Unknown target type %#x, can't create log for %s\n",
4356                                mti->mti_flags, mti->mti_svname);
4357                 }
4358                 if (rc) {
4359                         CERROR("Can't write logs for %s (%d)\n",
4360                                mti->mti_svname, rc);
4361                         GOTO(out_up, rc);
4362                 }
4363         } else {
4364                 /* Just update the params from tunefs in mgs_write_log_params */
4365                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
4366                 mti->mti_flags |= LDD_F_PARAM;
4367         }
4368
4369         /* allocate temporary buffer, where class_get_next_param will
4370            make copy of a current  parameter */
4371         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
4372         if (buf == NULL)
4373                 GOTO(out_up, rc = -ENOMEM);
4374         params = mti->mti_params;
4375         while (params != NULL) {
4376                 rc = class_get_next_param(&params, buf);
4377                 if (rc) {
4378                         if (rc == 1)
4379                                 /* there is no next parameter, that is
4380                                    not an error */
4381                                 rc = 0;
4382                         break;
4383                 }
4384                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
4385                        params, buf);
4386                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
4387                 if (rc)
4388                         break;
4389         }
4390
4391         OBD_FREE(buf, strlen(mti->mti_params) + 1);
4392
4393 out_up:
4394         mutex_unlock(&fsdb->fsdb_mutex);
4395         RETURN(rc);
4396 }
4397
4398 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
4399 {
4400         struct llog_ctxt        *ctxt;
4401         int                      rc = 0;
4402
4403         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
4404         if (ctxt == NULL) {
4405                 CERROR("%s: MGS config context doesn't exist\n",
4406                        mgs->mgs_obd->obd_name);
4407                 rc = -ENODEV;
4408         } else {
4409                 rc = llog_erase(env, ctxt, NULL, name);
4410                 /* llog may not exist */
4411                 if (rc == -ENOENT)
4412                         rc = 0;
4413                 llog_ctxt_put(ctxt);
4414         }
4415
4416         if (rc)
4417                 CERROR("%s: failed to clear log %s: %d\n",
4418                        mgs->mgs_obd->obd_name, name, rc);
4419
4420         return rc;
4421 }
4422
4423 /* erase all logs for the given fs */
4424 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs,
4425                    const char *fsname)
4426 {
4427         struct list_head log_list;
4428         struct mgs_direntry *dirent, *n;
4429         char barrier_name[20] = {};
4430         char *suffix;
4431         int count = 0;
4432         int rc, len = strlen(fsname);
4433         ENTRY;
4434
4435         mutex_lock(&mgs->mgs_mutex);
4436
4437         /* Find all the logs in the CONFIGS directory */
4438         rc = class_dentry_readdir(env, mgs, &log_list);
4439         if (rc) {
4440                 mutex_unlock(&mgs->mgs_mutex);
4441                 RETURN(rc);
4442         }
4443
4444         if (list_empty(&log_list)) {
4445                 mutex_unlock(&mgs->mgs_mutex);
4446                 RETURN(-ENOENT);
4447         }
4448
4449         snprintf(barrier_name, sizeof(barrier_name) - 1, "%s-%s",
4450                  fsname, BARRIER_FILENAME);
4451         /* Delete the barrier fsdb */
4452         mgs_remove_fsdb_by_name(mgs, barrier_name);
4453         /* Delete the fs db */
4454         mgs_remove_fsdb_by_name(mgs, fsname);
4455         mutex_unlock(&mgs->mgs_mutex);
4456
4457         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4458                 list_del_init(&dirent->mde_list);
4459                 suffix = strrchr(dirent->mde_name, '-');
4460                 if (suffix != NULL) {
4461                         if ((len == suffix - dirent->mde_name) &&
4462                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
4463                                 CDEBUG(D_MGS, "Removing log %s\n",
4464                                        dirent->mde_name);
4465                                 mgs_erase_log(env, mgs, dirent->mde_name);
4466                                 count++;
4467                         }
4468                 }
4469                 mgs_direntry_free(dirent);
4470         }
4471
4472         if (count == 0)
4473                 rc = -ENOENT;
4474
4475         RETURN(rc);
4476 }
4477
4478 /* list all logs for the given fs */
4479 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
4480                   struct obd_ioctl_data *data)
4481 {
4482         struct list_head         log_list;
4483         struct mgs_direntry     *dirent, *n;
4484         char                    *out, *suffix;
4485         int                      l, remains, rc;
4486
4487         ENTRY;
4488
4489         /* Find all the logs in the CONFIGS directory */
4490         rc = class_dentry_readdir(env, mgs, &log_list);
4491         if (rc)
4492                 RETURN(rc);
4493
4494         out = data->ioc_bulk;
4495         remains = data->ioc_inllen1;
4496         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4497                 list_del_init(&dirent->mde_list);
4498                 suffix = strrchr(dirent->mde_name, '-');
4499                 if (suffix != NULL) {
4500                         l = snprintf(out, remains, "config_log: %s\n",
4501                                      dirent->mde_name);
4502                         out += l;
4503                         remains -= l;
4504                 }
4505                 mgs_direntry_free(dirent);
4506                 if (remains <= 0)
4507                         break;
4508         }
4509         RETURN(rc);
4510 }
4511
4512 struct mgs_lcfg_fork_data {
4513         struct lustre_cfg_bufs   mlfd_bufs;
4514         struct mgs_device       *mlfd_mgs;
4515         struct llog_handle      *mlfd_llh;
4516         const char              *mlfd_oldname;
4517         const char              *mlfd_newname;
4518         char                     mlfd_data[0];
4519 };
4520
4521 static bool contain_valid_fsname(char *buf, const char *fsname,
4522                                  int buflen, int namelen)
4523 {
4524         if (buflen < namelen)
4525                 return false;
4526
4527         if (memcmp(buf, fsname, namelen) != 0)
4528                 return false;
4529
4530         if (buf[namelen] != '\0' && buf[namelen] != '-')
4531                 return false;
4532
4533         return true;
4534 }
4535
4536 static int mgs_lcfg_fork_handler(const struct lu_env *env,
4537                                  struct llog_handle *o_llh,
4538                                  struct llog_rec_hdr *o_rec, void *data)
4539 {
4540         struct mgs_lcfg_fork_data *mlfd = data;
4541         struct lustre_cfg_bufs *n_bufs = &mlfd->mlfd_bufs;
4542         struct lustre_cfg *o_lcfg = (struct lustre_cfg *)(o_rec + 1);
4543         struct llog_cfg_rec *lcr;
4544         char *o_buf;
4545         char *n_buf = mlfd->mlfd_data;
4546         int o_buflen;
4547         int o_namelen = strlen(mlfd->mlfd_oldname);
4548         int n_namelen = strlen(mlfd->mlfd_newname);
4549         int diff = n_namelen - o_namelen;
4550         __u32 cmd = o_lcfg->lcfg_command;
4551         __u32 cnt = o_lcfg->lcfg_bufcount;
4552         int rc;
4553         int i;
4554         ENTRY;
4555
4556         /* buf[0] */
4557         o_buf = lustre_cfg_buf(o_lcfg, 0);
4558         o_buflen = o_lcfg->lcfg_buflens[0];
4559         if (contain_valid_fsname(o_buf, mlfd->mlfd_oldname, o_buflen,
4560                                  o_namelen)) {
4561                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4562                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4563                        o_buflen - o_namelen);
4564                 lustre_cfg_bufs_reset(n_bufs, n_buf);
4565                 n_buf += cfs_size_round(o_buflen + diff);
4566         } else {
4567                 lustre_cfg_bufs_reset(n_bufs, o_buflen != 0 ? o_buf : NULL);
4568         }
4569
4570         switch (cmd) {
4571         case LCFG_MARKER: {
4572                 struct cfg_marker *o_marker;
4573                 struct cfg_marker *n_marker;
4574                 int tgt_namelen;
4575
4576                 if (cnt != 2) {
4577                         CDEBUG(D_MGS, "Unknown cfg marker entry with %d "
4578                                "buffers\n", cnt);
4579                         RETURN(-EINVAL);
4580                 }
4581
4582                 /* buf[1] is marker */
4583                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4584                 o_buflen = o_lcfg->lcfg_buflens[1];
4585                 o_marker = (struct cfg_marker *)o_buf;
4586                 if (!contain_valid_fsname(o_marker->cm_tgtname,
4587                                           mlfd->mlfd_oldname,
4588                                           sizeof(o_marker->cm_tgtname),
4589                                           o_namelen)) {
4590                         lustre_cfg_bufs_set(n_bufs, 1, o_marker,
4591                                             sizeof(*o_marker));
4592                         break;
4593                 }
4594
4595                 n_marker = (struct cfg_marker *)n_buf;
4596                 *n_marker = *o_marker;
4597                 memcpy(n_marker->cm_tgtname, mlfd->mlfd_newname, n_namelen);
4598                 tgt_namelen = strlen(o_marker->cm_tgtname);
4599                 if (tgt_namelen > o_namelen)
4600                         memcpy(n_marker->cm_tgtname + n_namelen,
4601                                o_marker->cm_tgtname + o_namelen,
4602                                tgt_namelen - o_namelen);
4603                 n_marker->cm_tgtname[tgt_namelen + diff] = '\0';
4604                 lustre_cfg_bufs_set(n_bufs, 1, n_marker, sizeof(*n_marker));
4605                 break;
4606         }
4607         case LCFG_PARAM:
4608         case LCFG_SET_PARAM: {
4609                 for (i = 1; i < cnt; i++)
4610                         /* buf[i] is the param value, reuse it directly */
4611                         lustre_cfg_bufs_set(n_bufs, i,
4612                                             lustre_cfg_buf(o_lcfg, i),
4613                                             o_lcfg->lcfg_buflens[i]);
4614                 break;
4615         }
4616         case LCFG_POOL_NEW:
4617         case LCFG_POOL_ADD:
4618         case LCFG_POOL_REM:
4619         case LCFG_POOL_DEL: {
4620                 if (cnt < 3 || cnt > 4) {
4621                         CDEBUG(D_MGS, "Unknown cfg pool (%x) entry with %d "
4622                                "buffers\n", cmd, cnt);
4623                         RETURN(-EINVAL);
4624                 }
4625
4626                 /* buf[1] is fsname */
4627                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4628                 o_buflen = o_lcfg->lcfg_buflens[1];
4629                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4630                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4631                        o_buflen - o_namelen);
4632                 lustre_cfg_bufs_set(n_bufs, 1, n_buf, o_buflen + diff);
4633                 n_buf += cfs_size_round(o_buflen + diff);
4634
4635                 /* buf[2] is the pool name, reuse it directly */
4636                 lustre_cfg_bufs_set(n_bufs, 2, lustre_cfg_buf(o_lcfg, 2),
4637                                     o_lcfg->lcfg_buflens[2]);
4638
4639                 if (cnt == 3)
4640                         break;
4641
4642                 /* buf[3] is ostname */
4643                 o_buf = lustre_cfg_buf(o_lcfg, 3);
4644                 o_buflen = o_lcfg->lcfg_buflens[3];
4645                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4646                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4647                        o_buflen - o_namelen);
4648                 lustre_cfg_bufs_set(n_bufs, 3, n_buf, o_buflen + diff);
4649                 break;
4650         }
4651         case LCFG_SETUP: {
4652                 if (cnt == 2) {
4653                         o_buflen = o_lcfg->lcfg_buflens[1];
4654                         if (o_buflen == sizeof(struct lov_desc) ||
4655                             o_buflen == sizeof(struct lmv_desc)) {
4656                                 char *o_uuid;
4657                                 char *n_uuid;
4658                                 int uuid_len;
4659
4660                                 /* buf[1] */
4661                                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4662                                 if (o_buflen == sizeof(struct lov_desc)) {
4663                                         struct lov_desc *o_desc =
4664                                                 (struct lov_desc *)o_buf;
4665                                         struct lov_desc *n_desc =
4666                                                 (struct lov_desc *)n_buf;
4667
4668                                         *n_desc = *o_desc;
4669                                         o_uuid = o_desc->ld_uuid.uuid;
4670                                         n_uuid = n_desc->ld_uuid.uuid;
4671                                         uuid_len = sizeof(o_desc->ld_uuid.uuid);
4672                                 } else {
4673                                         struct lmv_desc *o_desc =
4674                                                 (struct lmv_desc *)o_buf;
4675                                         struct lmv_desc *n_desc =
4676                                                 (struct lmv_desc *)n_buf;
4677
4678                                         *n_desc = *o_desc;
4679                                         o_uuid = o_desc->ld_uuid.uuid;
4680                                         n_uuid = n_desc->ld_uuid.uuid;
4681                                         uuid_len = sizeof(o_desc->ld_uuid.uuid);
4682                                 }
4683
4684                                 if (unlikely(!contain_valid_fsname(o_uuid,
4685                                                 mlfd->mlfd_oldname, uuid_len,
4686                                                 o_namelen))) {
4687                                         lustre_cfg_bufs_set(n_bufs, 1, o_buf,
4688                                                             o_buflen);
4689                                         break;
4690                                 }
4691
4692                                 memcpy(n_uuid, mlfd->mlfd_newname, n_namelen);
4693                                 uuid_len = strlen(o_uuid);
4694                                 if (uuid_len > o_namelen)
4695                                         memcpy(n_uuid + n_namelen,
4696                                                o_uuid + o_namelen,
4697                                                uuid_len - o_namelen);
4698                                 n_uuid[uuid_len + diff] = '\0';
4699                                 lustre_cfg_bufs_set(n_bufs, 1, n_buf, o_buflen);
4700                                 break;
4701                         } /* else case fall through */
4702                 } /* else case fall through */
4703         }
4704         /* fallthrough */
4705         default: {
4706                 for (i = 1; i < cnt; i++) {
4707                         o_buflen = o_lcfg->lcfg_buflens[i];
4708                         if (o_buflen == 0)
4709                                 continue;
4710
4711                         o_buf = lustre_cfg_buf(o_lcfg, i);
4712                         if (!contain_valid_fsname(o_buf, mlfd->mlfd_oldname,
4713                                                   o_buflen, o_namelen)) {
4714                                 lustre_cfg_bufs_set(n_bufs, i, o_buf, o_buflen);
4715                                 continue;
4716                         }
4717
4718                         memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4719                         if (o_buflen == o_namelen) {
4720                                 lustre_cfg_bufs_set(n_bufs, i, n_buf,
4721                                                     n_namelen);
4722                                 n_buf += cfs_size_round(n_namelen);
4723                                 continue;
4724                         }
4725
4726                         memcpy(n_buf + n_namelen, o_buf + o_namelen,
4727                                o_buflen - o_namelen);
4728                         lustre_cfg_bufs_set(n_bufs, i, n_buf, o_buflen + diff);
4729                         n_buf += cfs_size_round(o_buflen + diff);
4730                 }
4731                 break;
4732         }
4733         }
4734
4735         lcr = lustre_cfg_rec_new(cmd, n_bufs);
4736         if (!lcr)
4737                 RETURN(-ENOMEM);
4738
4739         lcr->lcr_cfg = *o_lcfg;
4740         rc = llog_write(env, mlfd->mlfd_llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
4741         lustre_cfg_rec_free(lcr);
4742
4743         RETURN(rc);
4744 }
4745
4746 static int mgs_lcfg_fork_one(const struct lu_env *env, struct mgs_device *mgs,
4747                              struct mgs_direntry *mde, const char *oldname,
4748                              const char *newname)
4749 {
4750         struct llog_handle *old_llh = NULL;
4751         struct llog_handle *new_llh = NULL;
4752         struct llog_ctxt *ctxt = NULL;
4753         struct mgs_lcfg_fork_data *mlfd = NULL;
4754         char *name_buf = NULL;
4755         int name_buflen;
4756         int old_namelen = strlen(oldname);
4757         int new_namelen = strlen(newname);
4758         int rc;
4759         ENTRY;
4760
4761         name_buflen = mde->mde_len + new_namelen - old_namelen;
4762         OBD_ALLOC(name_buf, name_buflen);
4763         if (!name_buf)
4764                 RETURN(-ENOMEM);
4765
4766         memcpy(name_buf, newname, new_namelen);
4767         memcpy(name_buf + new_namelen, mde->mde_name + old_namelen,
4768                mde->mde_len - old_namelen);
4769
4770         CDEBUG(D_MGS, "Fork the config-log from %s to %s\n",
4771                mde->mde_name, name_buf);
4772
4773         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
4774         LASSERT(ctxt);
4775
4776         rc = llog_open_create(env, ctxt, &new_llh, NULL, name_buf);
4777         if (rc)
4778                 GOTO(out, rc);
4779
4780         rc = llog_init_handle(env, new_llh, LLOG_F_IS_PLAIN, NULL);
4781         if (rc)
4782                 GOTO(out, rc);
4783
4784         if (unlikely(mgs_log_is_empty(env, mgs, mde->mde_name)))
4785                 GOTO(out, rc = 0);
4786
4787         rc = llog_open(env, ctxt, &old_llh, NULL, mde->mde_name,
4788                        LLOG_OPEN_EXISTS);
4789         if (rc)
4790                 GOTO(out, rc);
4791
4792         rc = llog_init_handle(env, old_llh, LLOG_F_IS_PLAIN, NULL);
4793         if (rc)
4794                 GOTO(out, rc);
4795
4796         new_llh->lgh_hdr->llh_tgtuuid = old_llh->lgh_hdr->llh_tgtuuid;
4797
4798         OBD_ALLOC(mlfd, LLOG_MIN_CHUNK_SIZE);
4799         if (!mlfd)
4800                 GOTO(out, rc = -ENOMEM);
4801
4802         mlfd->mlfd_mgs = mgs;
4803         mlfd->mlfd_llh = new_llh;
4804         mlfd->mlfd_oldname = oldname;
4805         mlfd->mlfd_newname = newname;
4806
4807         rc = llog_process(env, old_llh, mgs_lcfg_fork_handler, mlfd, NULL);
4808         OBD_FREE(mlfd, LLOG_MIN_CHUNK_SIZE);
4809
4810         GOTO(out, rc);
4811
4812 out:
4813         if (old_llh)
4814                 llog_close(env, old_llh);
4815         if (new_llh)
4816                 llog_close(env, new_llh);
4817         if (name_buf)
4818                 OBD_FREE(name_buf, name_buflen);
4819         if (ctxt)
4820                 llog_ctxt_put(ctxt);
4821
4822         return rc;
4823 }
4824
4825 int mgs_lcfg_fork(const struct lu_env *env, struct mgs_device *mgs,
4826                   const char *oldname, const char *newname)
4827 {
4828         struct list_head log_list;
4829         struct mgs_direntry *dirent, *n;
4830         int olen = strlen(oldname);
4831         int nlen = strlen(newname);
4832         int count = 0;
4833         int rc = 0;
4834         ENTRY;
4835
4836         if (unlikely(!oldname || oldname[0] == '\0' ||
4837                      !newname || newname[0] == '\0'))
4838                 RETURN(-EINVAL);
4839
4840         if (strcmp(oldname, newname) == 0)
4841                 RETURN(-EINVAL);
4842
4843         /* lock it to prevent fork/erase/register in parallel. */
4844         mutex_lock(&mgs->mgs_mutex);
4845
4846         rc = class_dentry_readdir(env, mgs, &log_list);
4847         if (rc) {
4848                 mutex_unlock(&mgs->mgs_mutex);
4849                 RETURN(rc);
4850         }
4851
4852         if (list_empty(&log_list)) {
4853                 mutex_unlock(&mgs->mgs_mutex);
4854                 RETURN(-ENOENT);
4855         }
4856
4857         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4858                 char *ptr;
4859
4860                 ptr = strrchr(dirent->mde_name, '-');
4861                 if (ptr) {
4862                         int tlen = ptr - dirent->mde_name;
4863
4864                         if (tlen == nlen &&
4865                             strncmp(newname, dirent->mde_name, tlen) == 0)
4866                                 GOTO(out, rc = -EEXIST);
4867
4868                         if (tlen == olen &&
4869                             strncmp(oldname, dirent->mde_name, tlen) == 0)
4870                                 continue;
4871                 }
4872
4873                 list_del_init(&dirent->mde_list);
4874                 mgs_direntry_free(dirent);
4875         }
4876
4877         if (list_empty(&log_list)) {
4878                 mutex_unlock(&mgs->mgs_mutex);
4879                 RETURN(-ENOENT);
4880         }
4881
4882         list_for_each_entry(dirent, &log_list, mde_list) {
4883                 rc = mgs_lcfg_fork_one(env, mgs, dirent, oldname, newname);
4884                 if (rc)
4885                         break;
4886
4887                 count++;
4888         }
4889
4890 out:
4891         mutex_unlock(&mgs->mgs_mutex);
4892
4893         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4894                 list_del_init(&dirent->mde_list);
4895                 mgs_direntry_free(dirent);
4896         }
4897
4898         if (rc && count > 0)
4899                 mgs_erase_logs(env, mgs, newname);
4900
4901         RETURN(rc);
4902 }
4903
4904 int mgs_lcfg_erase(const struct lu_env *env, struct mgs_device *mgs,
4905                    const char *fsname)
4906 {
4907         int rc;
4908         ENTRY;
4909
4910         if (unlikely(!fsname || fsname[0] == '\0'))
4911                 RETURN(-EINVAL);
4912
4913         rc = mgs_erase_logs(env, mgs, fsname);
4914
4915         RETURN(rc);
4916 }
4917
4918 static int mgs_xattr_del(const struct lu_env *env, struct dt_object *obj)
4919 {
4920         struct dt_device *dev;
4921         struct thandle *th = NULL;
4922         int rc = 0;
4923
4924         ENTRY;
4925
4926         dev = container_of0(obj->do_lu.lo_dev, struct dt_device, dd_lu_dev);
4927         th = dt_trans_create(env, dev);
4928         if (IS_ERR(th))
4929                 RETURN(PTR_ERR(th));
4930
4931         rc = dt_declare_xattr_del(env, obj, XATTR_TARGET_RENAME, th);
4932         if (rc)
4933                 GOTO(stop, rc);
4934
4935         rc = dt_trans_start_local(env, dev, th);
4936         if (rc)
4937                 GOTO(stop, rc);
4938
4939         dt_write_lock(env, obj, 0);
4940         rc = dt_xattr_del(env, obj, XATTR_TARGET_RENAME, th);
4941
4942         GOTO(unlock, rc);
4943
4944 unlock:
4945         dt_write_unlock(env, obj);
4946
4947 stop:
4948         dt_trans_stop(env, dev, th);
4949
4950         return rc;
4951 }
4952
4953 int mgs_lcfg_rename(const struct lu_env *env, struct mgs_device *mgs)
4954 {
4955         struct list_head log_list;
4956         struct mgs_direntry *dirent, *n;
4957         char fsname[16];
4958         struct lu_buf buf = {
4959                 .lb_buf = fsname,
4960                 .lb_len = sizeof(fsname)
4961         };
4962         int rc = 0;
4963
4964         ENTRY;
4965
4966         rc = class_dentry_readdir(env, mgs, &log_list);
4967         if (rc)
4968                 RETURN(rc);
4969
4970         if (list_empty(&log_list))
4971                 RETURN(0);
4972
4973         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4974                 struct dt_object *o = NULL;
4975                 char oldname[16];
4976                 char *ptr;
4977                 int len;
4978
4979                 list_del_init(&dirent->mde_list);
4980                 ptr = strrchr(dirent->mde_name, '-');
4981                 if (!ptr)
4982                         goto next;
4983
4984                 len = ptr - dirent->mde_name;
4985                 if (unlikely(len >= sizeof(oldname))) {
4986                         CDEBUG(D_MGS, "Skip invalid configuration file %s\n",
4987                                dirent->mde_name);
4988                         goto next;
4989                 }
4990
4991                 o = local_file_find(env, mgs->mgs_los, mgs->mgs_configs_dir,
4992                                     dirent->mde_name);
4993                 if (IS_ERR(o)) {
4994                         rc = PTR_ERR(o);
4995                         CDEBUG(D_MGS, "Fail to locate file %s: rc = %d\n",
4996                                dirent->mde_name, rc);
4997                         goto next;
4998                 }
4999
5000                 rc = dt_xattr_get(env, o, &buf, XATTR_TARGET_RENAME);
5001                 if (rc < 0) {
5002                         if (rc == -ENODATA)
5003                                 rc = 0;
5004                         else
5005                                 CDEBUG(D_MGS,
5006                                        "Fail to get EA for %s: rc = %d\n",
5007                                        dirent->mde_name, rc);
5008                         goto next;
5009                 }
5010
5011                 if (unlikely(rc == len &&
5012                              memcmp(fsname, dirent->mde_name, len) == 0)) {
5013                         /* The new fsname is the same as the old one. */
5014                         rc = mgs_xattr_del(env, o);
5015                         goto next;
5016                 }
5017
5018                 memcpy(oldname, dirent->mde_name, len);
5019                 oldname[len] = '\0';
5020                 fsname[rc] = '\0';
5021                 rc = mgs_lcfg_fork_one(env, mgs, dirent, oldname, fsname);
5022                 if (rc && rc != -EEXIST) {
5023                         CDEBUG(D_MGS, "Fail to fork %s: rc = %d\n",
5024                                dirent->mde_name, rc);
5025                         goto next;
5026                 }
5027
5028                 rc = mgs_erase_log(env, mgs, dirent->mde_name);
5029                 if (rc) {
5030                         CDEBUG(D_MGS, "Fail to erase old %s: rc = %d\n",
5031                                dirent->mde_name, rc);
5032                         /* keep it there if failed to remove it. */
5033                         rc = 0;
5034                 }
5035
5036 next:
5037                 if (o && !IS_ERR(o))
5038                         lu_object_put(env, &o->do_lu);
5039
5040                 mgs_direntry_free(dirent);
5041                 if (rc)
5042                         break;
5043         }
5044
5045         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
5046                 list_del_init(&dirent->mde_list);
5047                 mgs_direntry_free(dirent);
5048         }
5049
5050         RETURN(rc);
5051 }
5052
5053 /* Setup _mgs fsdb and log
5054  */
5055 int mgs__mgs_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
5056 {
5057         struct fs_db *fsdb = NULL;
5058         int rc;
5059         ENTRY;
5060
5061         rc = mgs_find_or_make_fsdb(env, mgs, MGSSELF_NAME, &fsdb);
5062         if (!rc)
5063                 mgs_put_fsdb(mgs, fsdb);
5064
5065         RETURN(rc);
5066 }
5067
5068 /* Setup params fsdb and log
5069  */
5070 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
5071 {
5072         struct fs_db *fsdb = NULL;
5073         struct llog_handle *params_llh = NULL;
5074         int rc;
5075         ENTRY;
5076
5077         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
5078         if (!rc) {
5079                 mutex_lock(&fsdb->fsdb_mutex);
5080                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
5081                 if (!rc)
5082                         rc = record_end_log(env, &params_llh);
5083                 mutex_unlock(&fsdb->fsdb_mutex);
5084                 mgs_put_fsdb(mgs, fsdb);
5085         }
5086
5087         RETURN(rc);
5088 }
5089
5090 /* Cleanup params fsdb and log
5091  */
5092 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
5093 {
5094         int rc;
5095
5096         rc = mgs_erase_logs(env, mgs, PARAMS_FILENAME);
5097         return rc == -ENOENT ? 0 : rc;
5098 }
5099
5100 /**
5101  * Fill in the mgs_target_info based on data devname and param provide.
5102  *
5103  * @env         thread context
5104  * @mgs         mgs device
5105  * @mti         mgs target info. We want to set this based other paramters
5106  *              passed to this function. Once setup we write it to the config
5107  *              logs.
5108  * @devname     optional OBD device name
5109  * @param       string that contains both what tunable to set and the value to
5110  *              set it to.
5111  *
5112  * RETURN       0 for success
5113  *              negative error number on failure
5114  **/
5115 static int mgs_set_conf_param(const struct lu_env *env, struct mgs_device *mgs,
5116                               struct mgs_target_info *mti, const char *devname,
5117                               const char *param)
5118 {
5119         struct fs_db *fsdb = NULL;
5120         int dev_type;
5121         int rc = 0;
5122
5123         ENTRY;
5124         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
5125         if (!devname) {
5126                 size_t len;
5127
5128                 /* We have two possible cases here:
5129                  *
5130                  * 1) the device name embedded in the param:
5131                  *    lustre-OST0000.osc.max_dirty_mb=32
5132                  *
5133                  * 2) the file system name is embedded in
5134                  *    the param: lustre.sys.at.min=0
5135                  */
5136                 len = strcspn(param, ".=");
5137                 if (!len || param[len] == '=')
5138                         RETURN(-EINVAL);
5139
5140                 if (len >= sizeof(mti->mti_svname))
5141                         RETURN(-E2BIG);
5142
5143                 snprintf(mti->mti_svname, sizeof(mti->mti_svname),
5144                          "%.*s", (int)len, param);
5145                 param += len + 1;
5146         } else {
5147                 if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname)) >=
5148                     sizeof(mti->mti_svname))
5149                         RETURN(-E2BIG);
5150         }
5151
5152         if (!strlen(mti->mti_svname)) {
5153                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
5154                 RETURN(-ENOSYS);
5155         }
5156
5157         dev_type = mgs_parse_devname(mti->mti_svname, mti->mti_fsname,
5158                                      &mti->mti_stripe_index);
5159         switch (dev_type) {
5160         /* For this case we have an invalid obd device name */
5161         case -ENXIO:
5162                 CDEBUG(D_MGS, "%s don't contain an index\n", mti->mti_svname);
5163                 strlcpy(mti->mti_fsname, mti->mti_svname, MTI_NAME_MAXLEN);
5164                 dev_type = 0;
5165                 break;
5166         /* Not an obd device, assume devname is the fsname.
5167          * User might of only provided fsname and not obd device
5168          */
5169         case -EINVAL:
5170                 CDEBUG(D_MGS, "%s is seen as a file system name\n", mti->mti_svname);
5171                 strlcpy(mti->mti_fsname, mti->mti_svname, MTI_NAME_MAXLEN);
5172                 dev_type = 0;
5173                 break;
5174         default:
5175                 if (dev_type < 0)
5176                         GOTO(out, rc = dev_type);
5177
5178                 /* param related to llite isn't allowed to set by OST or MDT */
5179                 if (dev_type & LDD_F_SV_TYPE_OST ||
5180                     dev_type & LDD_F_SV_TYPE_MDT) {
5181                         /* param related to llite isn't allowed to set by OST
5182                          * or MDT
5183                          */
5184                         if (!strncmp(param, PARAM_LLITE,
5185                                      sizeof(PARAM_LLITE) - 1))
5186                                 GOTO(out, rc = -EINVAL);
5187
5188                         /* Strip -osc or -mdc suffix from svname */
5189                         if (server_make_name(dev_type, mti->mti_stripe_index,
5190                                              mti->mti_fsname, mti->mti_svname,
5191                                              sizeof(mti->mti_svname)))
5192                                 GOTO(out, rc = -EINVAL);
5193                 }
5194                 break;
5195         }
5196
5197         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
5198             sizeof(mti->mti_params))
5199                 GOTO(out, rc = -E2BIG);
5200
5201         CDEBUG(D_MGS, "set_conf_param fs='%s' device='%s' param='%s'\n",
5202                mti->mti_fsname, mti->mti_svname, mti->mti_params);
5203
5204         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
5205         if (rc)
5206                 GOTO(out, rc);
5207
5208         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
5209             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
5210                 CERROR("No filesystem targets for %s. cfg_device from lctl "
5211                        "is '%s'\n", mti->mti_fsname, mti->mti_svname);
5212                 mgs_unlink_fsdb(mgs, fsdb);
5213                 GOTO(out, rc = -EINVAL);
5214         }
5215
5216         /*
5217          * Revoke lock so everyone updates.  Should be alright if
5218          * someone was already reading while we were updating the logs,
5219          * so we don't really need to hold the lock while we're
5220          * writing (above).
5221          */
5222         mti->mti_flags = dev_type | LDD_F_PARAM;
5223         mutex_lock(&fsdb->fsdb_mutex);
5224         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
5225         mutex_unlock(&fsdb->fsdb_mutex);
5226         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
5227
5228 out:
5229         if (fsdb)
5230                 mgs_put_fsdb(mgs, fsdb);
5231
5232         RETURN(rc);
5233 }
5234
5235 static int mgs_set_param2(const struct lu_env *env, struct mgs_device *mgs,
5236                           struct mgs_target_info *mti, const char *param)
5237 {
5238         struct fs_db *fsdb = NULL;
5239         int dev_type;
5240         size_t len;
5241         int rc;
5242
5243         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
5244             sizeof(mti->mti_params))
5245                 GOTO(out, rc = -E2BIG);
5246
5247         len = strcspn(param, ".=");
5248         if (len && param[len] != '=') {
5249                 struct list_head *tmp;
5250                 char *ptr;
5251
5252                 param += len + 1;
5253                 ptr = strchr(param, '.');
5254
5255                 len = strlen(param);
5256                 if (ptr)
5257                         len -= strlen(ptr);
5258                 if (len >= sizeof(mti->mti_svname))
5259                         GOTO(out, rc = -E2BIG);
5260
5261                 snprintf(mti->mti_svname, sizeof(mti->mti_svname), "%.*s",
5262                         (int)len, param);
5263
5264                 mutex_lock(&mgs->mgs_mutex);
5265                 if (unlikely(list_empty(&mgs->mgs_fs_db_list))) {
5266                         mutex_unlock(&mgs->mgs_mutex);
5267                         GOTO(out, rc = -ENODEV);
5268                 }
5269
5270                 list_for_each(tmp, &mgs->mgs_fs_db_list) {
5271                         fsdb = list_entry(tmp, struct fs_db, fsdb_list);
5272                         if (fsdb->fsdb_has_lproc_entry &&
5273                             strcmp(fsdb->fsdb_name, "params") != 0 &&
5274                             strstr(param, fsdb->fsdb_name)) {
5275                                 snprintf(mti->mti_svname,
5276                                          sizeof(mti->mti_svname), "%s",
5277                                          fsdb->fsdb_name);
5278                                 break;
5279                         }
5280                         fsdb = NULL;
5281                 }
5282
5283                 if (!fsdb) {
5284                         snprintf(mti->mti_svname, sizeof(mti->mti_svname),
5285                                  "general");
5286                 }
5287                 mutex_unlock(&mgs->mgs_mutex);
5288         } else {
5289                 snprintf(mti->mti_svname, sizeof(mti->mti_svname), "general");
5290         }
5291
5292         CDEBUG(D_MGS, "set_param2 fs='%s' device='%s' param='%s'\n",
5293                mti->mti_fsname, mti->mti_svname, mti->mti_params);
5294
5295         /* The return value should be the device type i.e LDD_F_SV_TYPE_XXX.
5296          * A returned error tells us we don't have a target obd device.
5297          */
5298         dev_type = server_name2index(mti->mti_svname, &mti->mti_stripe_index,
5299                                      NULL);
5300         if (dev_type < 0)
5301                 dev_type = 0;
5302
5303         /* the return value should be the device type i.e LDD_F_SV_TYPE_XXX.
5304          * Strip -osc or -mdc suffix from svname
5305          */
5306         if ((dev_type & LDD_F_SV_TYPE_OST || dev_type & LDD_F_SV_TYPE_MDT) &&
5307             server_make_name(dev_type, mti->mti_stripe_index,
5308                              mti->mti_fsname, mti->mti_svname,
5309                              sizeof(mti->mti_svname)))
5310                 GOTO(out, rc = -EINVAL);
5311
5312         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
5313         if (rc)
5314                 GOTO(out, rc);
5315         /*
5316          * Revoke lock so everyone updates.  Should be alright if
5317          * someone was already reading while we were updating the logs,
5318          * so we don't really need to hold the lock while we're
5319          * writing (above).
5320          */
5321         mti->mti_flags = dev_type | LDD_F_PARAM2;
5322         mutex_lock(&fsdb->fsdb_mutex);
5323         rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
5324         mutex_unlock(&fsdb->fsdb_mutex);
5325         mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
5326         mgs_put_fsdb(mgs, fsdb);
5327 out:
5328         RETURN(rc);
5329 }
5330
5331 /* Set a permanent (config log) param for a target or fs
5332  *
5333  * @lcfg buf0 may contain the device (testfs-MDT0000) name
5334  *       buf1 contains the single parameter
5335  */
5336 int mgs_set_param(const struct lu_env *env, struct mgs_device *mgs,
5337                   struct lustre_cfg *lcfg)
5338 {
5339         const char *param = lustre_cfg_string(lcfg, 1);
5340         struct mgs_target_info *mti;
5341         int rc;
5342
5343         /* Create a fake mti to hold everything */
5344         OBD_ALLOC_PTR(mti);
5345         if (!mti)
5346                 return -ENOMEM;
5347
5348         print_lustre_cfg(lcfg);
5349
5350         if (lcfg->lcfg_command == LCFG_PARAM) {
5351                 /* For the case of lctl conf_param devname can be
5352                  * lustre, lustre-mdtlov, lustre-client, lustre-MDT0000
5353                  */
5354                 const char *devname = lustre_cfg_string(lcfg, 0);
5355
5356                 rc = mgs_set_conf_param(env, mgs, mti, devname, param);
5357         } else {
5358                 /* In the case of lctl set_param -P lcfg[0] will always
5359                  * be 'general'. At least for now.
5360                  */
5361                 rc = mgs_set_param2(env, mgs, mti, param);
5362         }
5363
5364         OBD_FREE_PTR(mti);
5365
5366         return rc;
5367 }
5368
5369 static int mgs_write_log_pool(const struct lu_env *env,
5370                               struct mgs_device *mgs, char *logname,
5371                               struct fs_db *fsdb, char *tgtname,
5372                               enum lcfg_command_type cmd,
5373                               char *fsname, char *poolname,
5374                               char *ostname, char *comment)
5375 {
5376         struct llog_handle *llh = NULL;
5377         int rc;
5378
5379         rc = record_start_log(env, mgs, &llh, logname);
5380         if (rc)
5381                 return rc;
5382         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
5383         if (rc)
5384                 goto out;
5385         rc = record_base(env, llh, tgtname, 0, cmd,
5386                          fsname, poolname, ostname, NULL);
5387         if (rc)
5388                 goto out;
5389         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
5390 out:
5391         record_end_log(env, &llh);
5392         return rc;
5393 }
5394
5395 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
5396                     enum lcfg_command_type cmd, const char *nodemap_name,
5397                     char *param)
5398 {
5399         lnet_nid_t nid[2];
5400         u32 idmap[2];
5401         bool bool_switch;
5402         u32 int_id;
5403         int rc = 0;
5404
5405         ENTRY;
5406         switch (cmd) {
5407         case LCFG_NODEMAP_ADD:
5408                 rc = nodemap_add(nodemap_name);
5409                 break;
5410         case LCFG_NODEMAP_DEL:
5411                 rc = nodemap_del(nodemap_name);
5412                 break;
5413         case LCFG_NODEMAP_ADD_RANGE:
5414                 rc = nodemap_parse_range(param, nid);
5415                 if (rc != 0)
5416                         break;
5417                 rc = nodemap_add_range(nodemap_name, nid);
5418                 break;
5419         case LCFG_NODEMAP_DEL_RANGE:
5420                 rc = nodemap_parse_range(param, nid);
5421                 if (rc != 0)
5422                         break;
5423                 rc = nodemap_del_range(nodemap_name, nid);
5424                 break;
5425         case LCFG_NODEMAP_ADMIN:
5426                 rc = kstrtobool(param, &bool_switch);
5427                 if (rc)
5428                         break;
5429                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
5430                 break;
5431         case LCFG_NODEMAP_DENY_UNKNOWN:
5432                 rc = kstrtobool(param, &bool_switch);
5433                 if (rc)
5434                         break;
5435                 rc = nodemap_set_deny_unknown(nodemap_name, bool_switch);
5436                 break;
5437         case LCFG_NODEMAP_AUDIT_MODE:
5438                 rc = kstrtoul(param, 10, (unsigned long *)&bool_switch);
5439                 if (rc == 0)
5440                         rc = nodemap_set_audit_mode(nodemap_name, bool_switch);
5441                 break;
5442         case LCFG_NODEMAP_MAP_MODE:
5443                 if (strcmp("both", param) == 0)
5444                         rc = nodemap_set_mapping_mode(nodemap_name,
5445                                                       NODEMAP_MAP_BOTH);
5446                 else if (strcmp("uid_only", param) == 0)
5447                         rc = nodemap_set_mapping_mode(nodemap_name,
5448                                                       NODEMAP_MAP_UID_ONLY);
5449                 else if (strcmp("gid_only", param) == 0)
5450                         rc = nodemap_set_mapping_mode(nodemap_name,
5451                                                       NODEMAP_MAP_GID_ONLY);
5452                 else
5453                         rc = -EINVAL;
5454                 break;
5455         case LCFG_NODEMAP_TRUSTED:
5456                 rc = kstrtobool(param, &bool_switch);
5457                 if (rc)
5458                         break;
5459                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
5460                 break;
5461         case LCFG_NODEMAP_SQUASH_UID:
5462                 rc = kstrtouint(param, 10, &int_id);
5463                 if (rc)
5464                         break;
5465                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
5466                 break;
5467         case LCFG_NODEMAP_SQUASH_GID:
5468                 rc = kstrtouint(param, 10, &int_id);
5469                 if (rc)
5470                         break;
5471                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
5472                 break;
5473         case LCFG_NODEMAP_ADD_UIDMAP:
5474         case LCFG_NODEMAP_ADD_GIDMAP:
5475                 rc = nodemap_parse_idmap(param, idmap);
5476                 if (rc != 0)
5477                         break;
5478                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
5479                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
5480                                                idmap);
5481                 else
5482                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
5483                                                idmap);
5484                 break;
5485         case LCFG_NODEMAP_DEL_UIDMAP:
5486         case LCFG_NODEMAP_DEL_GIDMAP:
5487                 rc = nodemap_parse_idmap(param, idmap);
5488                 if (rc != 0)
5489                         break;
5490                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
5491                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
5492                                                idmap);
5493                 else
5494                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
5495                                                idmap);
5496                 break;
5497         case LCFG_NODEMAP_SET_FILESET:
5498                 rc = nodemap_set_fileset(nodemap_name, param);
5499                 break;
5500         case LCFG_NODEMAP_SET_SEPOL:
5501                 rc = nodemap_set_sepol(nodemap_name, param);
5502                 break;
5503         default:
5504                 rc = -EINVAL;
5505         }
5506
5507         RETURN(rc);
5508 }
5509
5510 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
5511                  enum lcfg_command_type cmd, char *fsname,
5512                  char *poolname, char *ostname)
5513 {
5514         struct fs_db *fsdb;
5515         char *lovname;
5516         char *logname;
5517         char *label = NULL, *canceled_label = NULL;
5518         int label_sz;
5519         struct mgs_target_info *mti = NULL;
5520         bool checked = false;
5521         bool locked = false;
5522         bool free = false;
5523         int rc, i;
5524         ENTRY;
5525
5526         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
5527         if (rc) {
5528                 CERROR("Can't get db for %s\n", fsname);
5529                 RETURN(rc);
5530         }
5531         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
5532                 CERROR("%s is not defined\n", fsname);
5533                 free = true;
5534                 GOTO(out_fsdb, rc = -EINVAL);
5535         }
5536
5537         label_sz = 10 + strlen(fsname) + strlen(poolname);
5538
5539         /* check if ostname match fsname */
5540         if (ostname != NULL) {
5541                 char *ptr;
5542
5543                 ptr = strrchr(ostname, '-');
5544                 if ((ptr == NULL) ||
5545                     (strncmp(fsname, ostname, ptr-ostname) != 0))
5546                         RETURN(-EINVAL);
5547                 label_sz += strlen(ostname);
5548         }
5549
5550         OBD_ALLOC(label, label_sz);
5551         if (!label)
5552                 GOTO(out_fsdb, rc = -ENOMEM);
5553
5554         switch(cmd) {
5555         case LCFG_POOL_NEW:
5556                 sprintf(label,
5557                         "new %s.%s", fsname, poolname);
5558                 break;
5559         case LCFG_POOL_ADD:
5560                 sprintf(label,
5561                         "add %s.%s.%s", fsname, poolname, ostname);
5562                 break;
5563         case LCFG_POOL_REM:
5564                 OBD_ALLOC(canceled_label, label_sz);
5565                 if (canceled_label == NULL)
5566                         GOTO(out_label, rc = -ENOMEM);
5567                 sprintf(label,
5568                         "rem %s.%s.%s", fsname, poolname, ostname);
5569                 sprintf(canceled_label,
5570                         "add %s.%s.%s", fsname, poolname, ostname);
5571                 break;
5572         case LCFG_POOL_DEL:
5573                 OBD_ALLOC(canceled_label, label_sz);
5574                 if (canceled_label == NULL)
5575                         GOTO(out_label, rc = -ENOMEM);
5576                 sprintf(label,
5577                         "del %s.%s", fsname, poolname);
5578                 sprintf(canceled_label,
5579                         "new %s.%s", fsname, poolname);
5580                 break;
5581         default:
5582                 break;
5583         }
5584
5585         OBD_ALLOC_PTR(mti);
5586         if (mti == NULL)
5587                 GOTO(out_cancel, rc = -ENOMEM);
5588         strncpy(mti->mti_svname, "lov pool", sizeof(mti->mti_svname));
5589
5590         mutex_lock(&fsdb->fsdb_mutex);
5591         locked = true;
5592         /* write pool def to all MDT logs */
5593         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
5594                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
5595                         rc = name_create_mdt_and_lov(&logname, &lovname,
5596                                                      fsdb, i);
5597                         if (rc)
5598                                 GOTO(out_mti, rc);
5599
5600                         if (!checked && (canceled_label == NULL)) {
5601                                 rc = mgs_check_marker(env, mgs, fsdb, mti,
5602                                                 logname, lovname, label);
5603                                 if (rc) {
5604                                         name_destroy(&logname);
5605                                         name_destroy(&lovname);
5606                                         GOTO(out_mti,
5607                                                 rc = (rc == LLOG_PROC_BREAK ?
5608                                                         -EEXIST : rc));
5609                                 }
5610                                 checked = true;
5611                         }
5612                         if (canceled_label != NULL)
5613                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
5614                                                 lovname, canceled_label,
5615                                                 CM_SKIP);
5616
5617                         if (rc >= 0)
5618                                 rc = mgs_write_log_pool(env, mgs, logname,
5619                                                         fsdb, lovname, cmd,
5620                                                         fsname, poolname,
5621                                                         ostname, label);
5622                         name_destroy(&logname);
5623                         name_destroy(&lovname);
5624                         if (rc)
5625                                 GOTO(out_mti, rc);
5626                 }
5627         }
5628
5629         rc = name_create(&logname, fsname, "-client");
5630         if (rc)
5631                 GOTO(out_mti, rc);
5632
5633         if (!checked && (canceled_label == NULL)) {
5634                 rc = mgs_check_marker(env, mgs, fsdb, mti, logname,
5635                                 fsdb->fsdb_clilov, label);
5636                 if (rc) {
5637                         name_destroy(&logname);
5638                         GOTO(out_mti, rc = (rc == LLOG_PROC_BREAK ?
5639                                 -EEXIST : rc));
5640                 }
5641         }
5642         if (canceled_label != NULL) {
5643                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
5644                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
5645                 if (rc < 0) {
5646                         name_destroy(&logname);
5647                         GOTO(out_mti, rc);
5648                 }
5649         }
5650
5651         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
5652                                 cmd, fsname, poolname, ostname, label);
5653         mutex_unlock(&fsdb->fsdb_mutex);
5654         locked = false;
5655         name_destroy(&logname);
5656         /* request for update */
5657         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
5658
5659         GOTO(out_mti, rc);
5660
5661 out_mti:
5662         if (locked)
5663                 mutex_unlock(&fsdb->fsdb_mutex);
5664         if (mti != NULL)
5665                 OBD_FREE_PTR(mti);
5666 out_cancel:
5667         if (canceled_label != NULL)
5668                 OBD_FREE(canceled_label, label_sz);
5669 out_label:
5670         OBD_FREE(label, label_sz);
5671 out_fsdb:
5672         if (free)
5673                 mgs_unlink_fsdb(mgs, fsdb);
5674         mgs_put_fsdb(mgs, fsdb);
5675
5676         return rc;
5677 }