Whamcloud - gitweb
LU-14928 mgs: allow md target re-register
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mgs/mgs_llog.c
32  *
33  * Lustre Management Server (mgs) config llog creation
34  *
35  * Author: Nathan Rutman <nathan@clusterfs.com>
36  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
37  * Author: Mikhail Pershin <tappro@whamcloud.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_MGS
41 #define D_MGS D_CONFIG
42
43 #include <obd.h>
44 #include <uapi/linux/lustre/lustre_ioctl.h>
45 #include <uapi/linux/lustre/lustre_param.h>
46 #include <lustre_sec.h>
47 #include <lustre_quota.h>
48 #include <lustre_sec.h>
49
50 #include "mgs_internal.h"
51
52 /********************** Class functions ********************/
53
54 /**
55  * Find all logs in CONFIG directory and link then into list.
56  *
57  * \param[in] env       pointer to the thread context
58  * \param[in] mgs       pointer to the mgs device
59  * \param[out] log_list the list to hold the found llog name entry
60  *
61  * \retval              0 for success
62  * \retval              negative error number on failure
63  **/
64 int class_dentry_readdir(const struct lu_env *env, struct mgs_device *mgs,
65                          struct list_head *log_list)
66 {
67         struct dt_object *dir = mgs->mgs_configs_dir;
68         const struct dt_it_ops *iops;
69         struct dt_it *it;
70         struct mgs_direntry *de;
71         char *key;
72         int rc, key_sz;
73
74         INIT_LIST_HEAD(log_list);
75
76         LASSERT(dir);
77         LASSERT(dir->do_index_ops);
78
79         iops = &dir->do_index_ops->dio_it;
80         it = iops->init(env, dir, LUDA_64BITHASH);
81         if (IS_ERR(it))
82                 RETURN(PTR_ERR(it));
83
84         rc = iops->load(env, it, 0);
85         if (rc <= 0)
86                 GOTO(fini, rc = 0);
87
88         /* main cycle */
89         do {
90                 key = (void *)iops->key(env, it);
91                 if (IS_ERR(key)) {
92                         CERROR("%s: key failed when listing %s: rc = %d\n",
93                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
94                                (int) PTR_ERR(key));
95                         goto next;
96                 }
97                 key_sz = iops->key_size(env, it);
98                 LASSERT(key_sz > 0);
99
100                 /* filter out "." and ".." entries */
101                 if (key[0] == '.') {
102                         if (key_sz == 1)
103                                 goto next;
104                         if (key_sz == 2 && key[1] == '.')
105                                 goto next;
106                 }
107
108                 /* filter out backup files */
109                 if (lu_name_is_backup_file(key, key_sz, NULL)) {
110                         CDEBUG(D_MGS, "Skipping backup file %.*s\n",
111                                key_sz, key);
112                         goto next;
113                 }
114
115                 de = mgs_direntry_alloc(key_sz + 1);
116                 if (de == NULL) {
117                         rc = -ENOMEM;
118                         break;
119                 }
120
121                 memcpy(de->mde_name, key, key_sz);
122                 de->mde_name[key_sz] = 0;
123
124                 list_add(&de->mde_list, log_list);
125
126 next:
127                 rc = iops->next(env, it);
128         } while (rc == 0);
129         if (rc > 0)
130                 rc = 0;
131
132         iops->put(env, it);
133
134 fini:
135         iops->fini(env, it);
136         if (rc) {
137                 struct mgs_direntry *n;
138
139                 CERROR("%s: key failed when listing %s: rc = %d\n",
140                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
141
142                 list_for_each_entry_safe(de, n, log_list, mde_list) {
143                         list_del_init(&de->mde_list);
144                         mgs_direntry_free(de);
145                 }
146         }
147
148         RETURN(rc);
149 }
150
151 /******************** DB functions *********************/
152
153 static inline int name_create(char **newname, char *prefix, char *suffix)
154 {
155         LASSERT(newname);
156         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
157         if (!*newname)
158                 return -ENOMEM;
159         sprintf(*newname, "%s%s", prefix, suffix);
160         return 0;
161 }
162
163 static inline void name_destroy(char **name)
164 {
165         if (*name)
166                 OBD_FREE(*name, strlen(*name) + 1);
167         *name = NULL;
168 }
169
170 struct mgs_fsdb_handler_data
171 {
172         struct fs_db   *fsdb;
173         __u32           ver;
174 };
175
176 /* from the (client) config log, figure out:
177         1. which ost's/mdt's are configured (by index)
178         2. what the last config step is
179         3. COMPAT_18 osc name
180 */
181 /* It might be better to have a separate db file, instead of parsing the info
182    out of the client log.  This is slow and potentially error-prone. */
183 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
184                             struct llog_rec_hdr *rec, void *data)
185 {
186         struct mgs_fsdb_handler_data *d = data;
187         struct fs_db *fsdb = d->fsdb;
188         int cfg_len = rec->lrh_len;
189         char *cfg_buf = (char*) (rec + 1);
190         struct lustre_cfg *lcfg;
191         u32 index;
192         int rc = 0;
193         ENTRY;
194
195         if (rec->lrh_type != OBD_CFG_REC) {
196                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
197                 RETURN(-EINVAL);
198         }
199
200         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
201         if (rc) {
202                 CERROR("Insane cfg\n");
203                 RETURN(rc);
204         }
205
206         lcfg = (struct lustre_cfg *)cfg_buf;
207
208         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
209                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
210
211         /* Figure out ost indicies */
212         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
213         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
214             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
215                 rc = kstrtouint(lustre_cfg_string(lcfg, 2), 10, &index);
216                 if (rc)
217                         RETURN(rc);
218
219                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
220                        lustre_cfg_string(lcfg, 1), index,
221                        lustre_cfg_string(lcfg, 2));
222                 set_bit(index, fsdb->fsdb_ost_index_map);
223         }
224
225         /* Figure out mdt indicies */
226         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
227         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
228             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
229                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
230                                        &index, NULL);
231                 if (rc != LDD_F_SV_TYPE_MDT) {
232                         CWARN("Unparsable MDC name %s, assuming index 0\n",
233                               lustre_cfg_string(lcfg, 0));
234                         index = 0;
235                 }
236                 rc = 0;
237                 CDEBUG(D_MGS, "MDT index is %u\n", index);
238                 if (!test_bit(index, fsdb->fsdb_mdt_index_map)) {
239                         set_bit(index, fsdb->fsdb_mdt_index_map);
240                         fsdb->fsdb_mdt_count++;
241                 }
242         }
243
244         /**
245          * figure out the old config. fsdb_gen = 0 means old log
246          * It is obsoleted and not supported anymore
247          */
248         if (fsdb->fsdb_gen == 0) {
249                 CERROR("Old config format is not supported\n");
250                 RETURN(-EINVAL);
251         }
252
253         /*
254          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
255          */
256         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
257             lcfg->lcfg_command == LCFG_ATTACH &&
258             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
259                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
260                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
261                         CWARN("MDT using 1.8 OSC name scheme\n");
262                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
263                 }
264         }
265
266         if (lcfg->lcfg_command == LCFG_MARKER) {
267                 struct cfg_marker *marker;
268                 marker = lustre_cfg_buf(lcfg, 1);
269
270                 d->ver = marker->cm_vers;
271
272                 /* Keep track of the latest marker step */
273                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
274         }
275
276         RETURN(rc);
277 }
278
279 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
280 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
281                                   struct mgs_device *mgs,
282                                   struct fs_db *fsdb)
283 {
284         char *logname;
285         struct llog_handle *loghandle;
286         struct llog_ctxt *ctxt;
287         struct mgs_fsdb_handler_data d = {
288                 .fsdb = fsdb,
289         };
290         int rc;
291
292         ENTRY;
293
294         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
295         LASSERT(ctxt != NULL);
296         rc = name_create(&logname, fsdb->fsdb_name, "-client");
297         if (rc)
298                 GOTO(out_put, rc);
299         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
300         if (rc)
301                 GOTO(out_pop, rc);
302
303         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
304         if (rc)
305                 GOTO(out_close, rc);
306
307         if (llog_get_size(loghandle) <= 1)
308                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
309
310         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
311         CDEBUG(D_INFO, "get_db = %d\n", rc);
312 out_close:
313         llog_close(env, loghandle);
314 out_pop:
315         name_destroy(&logname);
316 out_put:
317         llog_ctxt_put(ctxt);
318
319         RETURN(rc);
320 }
321
322 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
323 {
324         struct mgs_tgt_srpc_conf *tgtconf;
325
326         /* free target-specific rules */
327         while (fsdb->fsdb_srpc_tgt) {
328                 tgtconf = fsdb->fsdb_srpc_tgt;
329                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
330
331                 LASSERT(tgtconf->mtsc_tgt);
332
333                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
334                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
335                 OBD_FREE_PTR(tgtconf);
336         }
337
338         /* free general rules */
339         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
340 }
341
342 static void mgs_unlink_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
343 {
344         mutex_lock(&mgs->mgs_mutex);
345         if (likely(!list_empty(&fsdb->fsdb_list))) {
346                 LASSERTF(atomic_read(&fsdb->fsdb_ref) >= 2,
347                          "Invalid ref %d on %s\n",
348                          atomic_read(&fsdb->fsdb_ref),
349                          fsdb->fsdb_name);
350
351                 list_del_init(&fsdb->fsdb_list);
352                 /* Drop the reference on the list.*/
353                 mgs_put_fsdb(mgs, fsdb);
354         }
355         mutex_unlock(&mgs->mgs_mutex);
356 }
357
358 /* The caller must hold mgs->mgs_mutex. */
359 static inline struct fs_db *
360 mgs_find_fsdb_noref(struct mgs_device *mgs, const char *fsname)
361 {
362         struct fs_db *fsdb;
363         struct list_head *tmp;
364
365         list_for_each(tmp, &mgs->mgs_fs_db_list) {
366                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
367                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
368                         return fsdb;
369         }
370
371         return NULL;
372 }
373
374 /* The caller must hold mgs->mgs_mutex. */
375 static void mgs_remove_fsdb_by_name(struct mgs_device *mgs, const char *name)
376 {
377         struct fs_db *fsdb;
378
379         fsdb = mgs_find_fsdb_noref(mgs, name);
380         if (fsdb) {
381                 list_del_init(&fsdb->fsdb_list);
382                 /* Drop the reference on the list.*/
383                 mgs_put_fsdb(mgs, fsdb);
384         }
385 }
386
387 /* The caller must hold mgs->mgs_mutex. */
388 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, const char *fsname)
389 {
390         struct fs_db *fsdb;
391
392         fsdb = mgs_find_fsdb_noref(mgs, fsname);
393         if (fsdb)
394                 atomic_inc(&fsdb->fsdb_ref);
395
396         return fsdb;
397 }
398
399 /* The caller must hold mgs->mgs_mutex. */
400 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
401                                   struct mgs_device *mgs, char *fsname)
402 {
403         struct fs_db *fsdb;
404         int rc;
405         ENTRY;
406
407         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
408                 CERROR("fsname %s is too long\n", fsname);
409
410                 RETURN(ERR_PTR(-EINVAL));
411         }
412
413         OBD_ALLOC_PTR(fsdb);
414         if (!fsdb)
415                 RETURN(ERR_PTR(-ENOMEM));
416
417         strncpy(fsdb->fsdb_name, fsname, sizeof(fsdb->fsdb_name));
418         mutex_init(&fsdb->fsdb_mutex);
419         INIT_LIST_HEAD(&fsdb->fsdb_list);
420         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
421         fsdb->fsdb_gen = 1;
422         INIT_LIST_HEAD(&fsdb->fsdb_clients);
423         atomic_set(&fsdb->fsdb_notify_phase, 0);
424         init_waitqueue_head(&fsdb->fsdb_notify_waitq);
425         init_completion(&fsdb->fsdb_notify_comp);
426
427         if (strcmp(fsname, MGSSELF_NAME) == 0) {
428                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
429                 fsdb->fsdb_mgs = mgs;
430                 if (logname_is_barrier(fsname))
431                         goto add;
432         } else {
433                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
434                 if (!fsdb->fsdb_mdt_index_map) {
435                         CERROR("No memory for MDT index maps\n");
436
437                         GOTO(err, rc = -ENOMEM);
438                 }
439
440                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
441                 if (!fsdb->fsdb_ost_index_map) {
442                         CERROR("No memory for OST index maps\n");
443
444                         GOTO(err, rc = -ENOMEM);
445                 }
446
447                 if (logname_is_barrier(fsname))
448                         goto add;
449
450                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
451                 if (rc)
452                         GOTO(err, rc);
453
454                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
455                 if (rc)
456                         GOTO(err, rc);
457
458                 /* initialise data for NID table */
459                 mgs_ir_init_fs(env, mgs, fsdb);
460                 lproc_mgs_add_live(mgs, fsdb);
461         }
462
463         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
464             strcmp(PARAMS_FILENAME, fsname) != 0) {
465                 /* populate the db from the client llog */
466                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
467                 if (rc) {
468                         CERROR("Can't get db from client log %d\n", rc);
469
470                         GOTO(err, rc);
471                 }
472         }
473
474         /* populate srpc rules from params llog */
475         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
476         if (rc) {
477                 CERROR("Can't get db from params log %d\n", rc);
478
479                 GOTO(err, rc);
480         }
481
482 add:
483         /* One ref is for the fsdb on the list.
484          * The other ref is for the caller. */
485         atomic_set(&fsdb->fsdb_ref, 2);
486         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
487
488         RETURN(fsdb);
489
490 err:
491         atomic_set(&fsdb->fsdb_ref, 1);
492         mgs_put_fsdb(mgs, fsdb);
493
494         RETURN(ERR_PTR(rc));
495 }
496
497 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
498 {
499         LASSERT(list_empty(&fsdb->fsdb_list));
500
501         lproc_mgs_del_live(mgs, fsdb);
502
503         /* deinitialize fsr */
504         if (fsdb->fsdb_mgs)
505                 mgs_ir_fini_fs(mgs, fsdb);
506
507         if (fsdb->fsdb_ost_index_map)
508                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
509         if (fsdb->fsdb_mdt_index_map)
510                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
511         name_destroy(&fsdb->fsdb_clilov);
512         name_destroy(&fsdb->fsdb_clilmv);
513         mgs_free_fsdb_srpc(fsdb);
514         OBD_FREE_PTR(fsdb);
515 }
516
517 void mgs_put_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
518 {
519         if (atomic_dec_and_test(&fsdb->fsdb_ref))
520                 mgs_free_fsdb(mgs, fsdb);
521 }
522
523 int mgs_init_fsdb_list(struct mgs_device *mgs)
524 {
525         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
526         return 0;
527 }
528
529 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
530 {
531         struct fs_db *fsdb;
532         struct list_head *tmp, *tmp2;
533
534         mutex_lock(&mgs->mgs_mutex);
535         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
536                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
537                 list_del_init(&fsdb->fsdb_list);
538                 mgs_put_fsdb(mgs, fsdb);
539         }
540         mutex_unlock(&mgs->mgs_mutex);
541         return 0;
542 }
543
544 /* The caller must hold mgs->mgs_mutex. */
545 int mgs_find_or_make_fsdb_nolock(const struct lu_env *env,
546                                 struct mgs_device *mgs,
547                                 char *name, struct fs_db **dbh)
548 {
549         struct fs_db *fsdb;
550         int rc = 0;
551         ENTRY;
552
553         fsdb = mgs_find_fsdb(mgs, name);
554         if (!fsdb) {
555                 fsdb = mgs_new_fsdb(env, mgs, name);
556                 if (IS_ERR(fsdb))
557                         rc = PTR_ERR(fsdb);
558
559                 CDEBUG(D_MGS, "Created new db: rc = %d\n", rc);
560         }
561
562         if (!rc)
563                 *dbh = fsdb;
564
565         RETURN(rc);
566 }
567
568 int mgs_find_or_make_fsdb(const struct lu_env *env, struct mgs_device *mgs,
569                           char *name, struct fs_db **dbh)
570 {
571         int rc;
572         ENTRY;
573
574         mutex_lock(&mgs->mgs_mutex);
575         rc = mgs_find_or_make_fsdb_nolock(env, mgs, name, dbh);
576         mutex_unlock(&mgs->mgs_mutex);
577
578         RETURN(rc);
579 }
580
581 /* 1 = index in use
582    0 = index unused
583    -1= empty client log */
584 int mgs_check_index(const struct lu_env *env,
585                     struct mgs_device *mgs,
586                     struct mgs_target_info *mti)
587 {
588         struct fs_db *fsdb;
589         void *imap;
590         int rc = 0;
591         ENTRY;
592
593         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
594
595         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
596         if (rc) {
597                 CERROR("Can't get db for %s\n", mti->mti_fsname);
598                 RETURN(rc);
599         }
600
601         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
602                 GOTO(out, rc = -1);
603
604         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
605                 imap = fsdb->fsdb_ost_index_map;
606         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
607                 imap = fsdb->fsdb_mdt_index_map;
608         else
609                 GOTO(out, rc = -EINVAL);
610
611         if (test_bit(mti->mti_stripe_index, imap))
612                 GOTO(out, rc = 1);
613
614         GOTO(out, rc = 0);
615
616 out:
617         mgs_put_fsdb(mgs, fsdb);
618         return rc;
619 }
620
621 static __inline__ int next_index(void *index_map, int map_len)
622 {
623         int i;
624         for (i = 0; i < map_len * 8; i++)
625                 if (!test_bit(i, index_map)) {
626                          return i;
627                  }
628         CERROR("max index %d exceeded.\n", i);
629         return -1;
630 }
631
632 /* Make the mdt/ost server obd name based on the filesystem name */
633 static bool server_make_name(u32 flags, u16 index, const char *fs,
634                              char *name_buf, size_t name_buf_size)
635 {
636         bool invalid_flag = false;
637
638         if (flags & (LDD_F_SV_TYPE_MDT | LDD_F_SV_TYPE_OST)) {
639                 if (!(flags & LDD_F_SV_ALL))
640                         snprintf(name_buf, name_buf_size, "%.8s%c%s%04x", fs,
641                                 (flags & LDD_F_VIRGIN) ? ':' :
642                                         ((flags & LDD_F_WRITECONF) ? '=' : '-'),
643                                 (flags & LDD_F_SV_TYPE_MDT) ? "MDT" : "OST",
644                                 index);
645         } else if (flags & LDD_F_SV_TYPE_MGS) {
646                 snprintf(name_buf, name_buf_size, "MGS");
647         } else {
648                 CERROR("unknown server type %#x\n", flags);
649                 invalid_flag = true;
650         }
651         return invalid_flag;
652 }
653
654 /* Return codes:
655         0  newly marked as in use
656         <0 err
657         +EALREADY for update of an old index */
658 static int mgs_set_index(const struct lu_env *env,
659                          struct mgs_device *mgs,
660                          struct mgs_target_info *mti)
661 {
662         struct fs_db *fsdb;
663         void *imap;
664         int rc = 0;
665         ENTRY;
666
667         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
668         if (rc) {
669                 CERROR("Can't get db for %s\n", mti->mti_fsname);
670                 RETURN(rc);
671         }
672
673         mutex_lock(&fsdb->fsdb_mutex);
674         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
675                 imap = fsdb->fsdb_ost_index_map;
676         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
677                 imap = fsdb->fsdb_mdt_index_map;
678         } else {
679                 GOTO(out_up, rc = -EINVAL);
680         }
681
682         if (mti->mti_flags & LDD_F_NEED_INDEX) {
683                 rc = next_index(imap, INDEX_MAP_SIZE);
684                 if (rc == -1)
685                         GOTO(out_up, rc = -ERANGE);
686                 mti->mti_stripe_index = rc;
687         }
688
689         /* the last index(0xffff) is reserved for default value. */
690         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
691                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
692                                    "but index must be less than %u.\n",
693                                    mti->mti_svname, mti->mti_stripe_index,
694                                    INDEX_MAP_SIZE * 8 - 1);
695                 GOTO(out_up, rc = -ERANGE);
696         }
697
698         if (test_bit(mti->mti_stripe_index, imap)) {
699                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
700                     !(mti->mti_flags & LDD_F_WRITECONF)) {
701                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
702                                            "%d, but that index is already in "
703                                            "use. Use --writeconf to force\n",
704                                            mti->mti_svname,
705                                            mti->mti_stripe_index);
706                         GOTO(out_up, rc = -EADDRINUSE);
707                 } else {
708                         CDEBUG(D_MGS, "Server %s updating index %d\n",
709                                mti->mti_svname, mti->mti_stripe_index);
710                         GOTO(out_up, rc = EALREADY);
711                 }
712         } else {
713                 set_bit(mti->mti_stripe_index, imap);
714                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
715                         fsdb->fsdb_mdt_count++;
716         }
717
718         set_bit(mti->mti_stripe_index, imap);
719         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
720         if (server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
721                              mti->mti_stripe_index, mti->mti_fsname,
722                              mti->mti_svname, sizeof(mti->mti_svname))) {
723                 CERROR("unknown server type %#x\n", mti->mti_flags);
724                 GOTO(out_up, rc = -EINVAL);
725         }
726
727         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
728                mti->mti_stripe_index);
729
730         GOTO(out_up, rc = 0);
731
732 out_up:
733         mutex_unlock(&fsdb->fsdb_mutex);
734         mgs_put_fsdb(mgs, fsdb);
735         return rc;
736 }
737
738 struct mgs_modify_lookup {
739         struct cfg_marker mml_marker;
740         int               mml_modified;
741 };
742
743 static int mgs_check_record_match(const struct lu_env *env,
744                                 struct llog_handle *llh,
745                                 struct llog_rec_hdr *rec, void *data)
746 {
747         struct cfg_marker *mc_marker = data;
748         struct cfg_marker *marker;
749         struct lustre_cfg *lcfg = REC_DATA(rec);
750         int cfg_len = REC_DATA_LEN(rec);
751         int rc;
752         ENTRY;
753
754
755         if (rec->lrh_type != OBD_CFG_REC) {
756                 CDEBUG(D_ERROR, "Unhandled lrh_type: %#x\n", rec->lrh_type);
757                 RETURN(-EINVAL);
758         }
759
760         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
761         if (rc) {
762                 CDEBUG(D_ERROR, "Insane cfg\n");
763                 RETURN(rc);
764         }
765
766         /* We only care about markers */
767         if (lcfg->lcfg_command != LCFG_MARKER)
768                 RETURN(0);
769
770         marker = lustre_cfg_buf(lcfg, 1);
771
772         if (marker->cm_flags & CM_SKIP)
773                 RETURN(0);
774
775         if ((strcmp(mc_marker->cm_comment, marker->cm_comment) == 0) &&
776                 (strcmp(mc_marker->cm_tgtname, marker->cm_tgtname) == 0)) {
777                 /* Found a non-skipped marker match */
778                 CDEBUG(D_MGS, "Matched rec %u marker %d flag %x %s %s\n",
779                         rec->lrh_index, marker->cm_step,
780                         marker->cm_flags, marker->cm_tgtname,
781                         marker->cm_comment);
782                 rc = LLOG_PROC_BREAK;
783         }
784
785         RETURN(rc);
786 }
787
788 /**
789  * Check an existing config log record with matching comment and device
790  * Return code:
791  * 0 - checked successfully,
792  * LLOG_PROC_BREAK - record matches
793  * negative - error
794  */
795 static int mgs_check_marker(const struct lu_env *env, struct mgs_device *mgs,
796                 struct fs_db *fsdb, struct mgs_target_info *mti,
797                 char *logname, char *devname, char *comment)
798 {
799         struct llog_handle *loghandle;
800         struct llog_ctxt *ctxt;
801         struct cfg_marker *mc_marker;
802         int rc;
803
804         ENTRY;
805
806         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
807         CDEBUG(D_MGS, "mgs check %s/%s/%s\n", logname, devname, comment);
808
809         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
810         LASSERT(ctxt != NULL);
811         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
812         if (rc < 0) {
813                 if (rc == -ENOENT)
814                         rc = 0;
815                 GOTO(out_pop, rc);
816         }
817
818         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
819         if (rc)
820                 GOTO(out_close, rc);
821
822         if (llog_get_size(loghandle) <= 1)
823                 GOTO(out_close, rc = 0);
824
825         OBD_ALLOC_PTR(mc_marker);
826         if (!mc_marker)
827                 GOTO(out_close, rc = -ENOMEM);
828         if (strlcpy(mc_marker->cm_comment, comment,
829                 sizeof(mc_marker->cm_comment)) >=
830                 sizeof(mc_marker->cm_comment))
831                 GOTO(out_free, rc = -E2BIG);
832         if (strlcpy(mc_marker->cm_tgtname, devname,
833                 sizeof(mc_marker->cm_tgtname)) >=
834                 sizeof(mc_marker->cm_tgtname))
835                 GOTO(out_free, rc = -E2BIG);
836
837         rc = llog_process(env, loghandle, mgs_check_record_match,
838                         (void *)mc_marker, NULL);
839
840 out_free:
841         OBD_FREE_PTR(mc_marker);
842
843 out_close:
844         llog_close(env, loghandle);
845 out_pop:
846         if (rc && rc != LLOG_PROC_BREAK)
847                 CDEBUG(D_ERROR, "%s: mgs check %s/%s failed: rc = %d\n",
848                         mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
849         llog_ctxt_put(ctxt);
850         RETURN(rc);
851 }
852
853 static int mgs_modify_handler(const struct lu_env *env,
854                               struct llog_handle *llh,
855                               struct llog_rec_hdr *rec, void *data)
856 {
857         struct mgs_modify_lookup *mml = data;
858         struct cfg_marker *marker;
859         struct lustre_cfg *lcfg = REC_DATA(rec);
860         int cfg_len = REC_DATA_LEN(rec);
861         int rc;
862         ENTRY;
863
864         if (rec->lrh_type != OBD_CFG_REC) {
865                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
866                 RETURN(-EINVAL);
867         }
868
869         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
870         if (rc) {
871                 CERROR("Insane cfg\n");
872                 RETURN(rc);
873         }
874
875         /* We only care about markers */
876         if (lcfg->lcfg_command != LCFG_MARKER)
877                 RETURN(0);
878
879         marker = lustre_cfg_buf(lcfg, 1);
880         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
881             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
882             !(marker->cm_flags & CM_SKIP)) {
883                 /* Found a non-skipped marker match */
884                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
885                        rec->lrh_index, marker->cm_step,
886                        marker->cm_flags, mml->mml_marker.cm_flags,
887                        marker->cm_tgtname, marker->cm_comment);
888                 /* Overwrite the old marker llog entry */
889                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
890                 marker->cm_flags |= mml->mml_marker.cm_flags;
891                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
892                 rc = llog_write(env, llh, rec, rec->lrh_index);
893                 if (!rc)
894                          mml->mml_modified++;
895         }
896
897         RETURN(rc);
898 }
899
900 /**
901  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
902  * Return code:
903  * 0 - modified successfully,
904  * 1 - no modification was done
905  * negative - error
906  */
907 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
908                       struct fs_db *fsdb, struct mgs_target_info *mti,
909                       char *logname, char *devname, char *comment, int flags)
910 {
911         struct llog_handle *loghandle;
912         struct llog_ctxt *ctxt;
913         struct mgs_modify_lookup *mml;
914         int rc;
915
916         ENTRY;
917
918         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
919         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
920                flags);
921
922         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
923         LASSERT(ctxt != NULL);
924         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
925         if (rc < 0) {
926                 if (rc == -ENOENT)
927                         rc = 0;
928                 GOTO(out_pop, rc);
929         }
930
931         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
932         if (rc)
933                 GOTO(out_close, rc);
934
935         if (llog_get_size(loghandle) <= 1)
936                 GOTO(out_close, rc = 0);
937
938         OBD_ALLOC_PTR(mml);
939         if (!mml)
940                 GOTO(out_close, rc = -ENOMEM);
941         if (strlcpy(mml->mml_marker.cm_comment, comment,
942                     sizeof(mml->mml_marker.cm_comment)) >=
943             sizeof(mml->mml_marker.cm_comment))
944                 GOTO(out_free, rc = -E2BIG);
945         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
946                     sizeof(mml->mml_marker.cm_tgtname)) >=
947             sizeof(mml->mml_marker.cm_tgtname))
948                 GOTO(out_free, rc = -E2BIG);
949         /* Modify mostly means cancel */
950         mml->mml_marker.cm_flags = flags;
951         mml->mml_marker.cm_canceltime = flags ? ktime_get_real_seconds() : 0;
952         mml->mml_modified = 0;
953         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
954                           NULL);
955         if (!rc && !mml->mml_modified)
956                 rc = 1;
957
958 out_free:
959         OBD_FREE_PTR(mml);
960
961 out_close:
962         llog_close(env, loghandle);
963 out_pop:
964         if (rc < 0)
965                 CERROR("%s: modify %s/%s failed: rc = %d\n",
966                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
967         llog_ctxt_put(ctxt);
968         RETURN(rc);
969 }
970
971 enum replace_state {
972         REPLACE_COPY = 0,
973         REPLACE_SKIP,
974         REPLACE_DONE,
975         REPLACE_UUID,
976         REPLACE_SETUP
977 };
978
979 /** This structure is passed to mgs_replace_handler */
980 struct mgs_replace_data {
981         /* Nids are replaced for this target device */
982         struct mgs_target_info target;
983         /* Temporary modified llog */
984         struct llog_handle *temp_llh;
985         enum replace_state state;
986         char *failover;
987         char *nodeuuid;
988 };
989
990 /**
991  * Check: a) if block should be skipped
992  * b) is it target block
993  *
994  * \param[in] lcfg
995  * \param[in] mrd
996  *
997  * \retval 0 should not to be skipped
998  * \retval 1 should to be skipped
999  */
1000 static int check_markers(struct lustre_cfg *lcfg,
1001                          struct mgs_replace_data *mrd)
1002 {
1003          struct cfg_marker *marker;
1004
1005         /* Track markers. Find given device */
1006         if (lcfg->lcfg_command == LCFG_MARKER) {
1007                 marker = lustre_cfg_buf(lcfg, 1);
1008                 /* Clean llog from records marked as CM_SKIP.
1009                    CM_EXCLUDE records are used for "active" command
1010                    and can be restored if needed */
1011                 if ((marker->cm_flags & (CM_SKIP | CM_START)) ==
1012                     (CM_SKIP | CM_START)) {
1013                         mrd->state = REPLACE_SKIP;
1014                         return 1;
1015                 }
1016
1017                 if ((marker->cm_flags & (CM_SKIP | CM_END)) ==
1018                     (CM_SKIP | CM_END)) {
1019                         mrd->state = REPLACE_COPY;
1020                         return 1;
1021                 }
1022
1023                 if (strcmp(mrd->target.mti_svname, marker->cm_tgtname) == 0) {
1024                         LASSERT(!(marker->cm_flags & CM_START) ||
1025                                 !(marker->cm_flags & CM_END));
1026                         if (marker->cm_flags & CM_START) {
1027                                 if (!strncmp(marker->cm_comment,
1028                                              "add failnid", 11)) {
1029                                         mrd->state = REPLACE_SKIP;
1030                                 } else {
1031                                         mrd->state = REPLACE_UUID;
1032                                         mrd->failover = NULL;
1033                                 }
1034                         } else if (marker->cm_flags & CM_END)
1035                                 mrd->state = REPLACE_COPY;
1036
1037                         if (!strncmp(marker->cm_comment,
1038                                 "add failnid", 11))
1039                                 return 1;
1040                 }
1041         }
1042
1043         return 0;
1044 }
1045
1046 static int record_base(const struct lu_env *env, struct llog_handle *llh,
1047                      char *cfgname, lnet_nid_t nid, int cmd,
1048                      char *s1, char *s2, char *s3, char *s4)
1049 {
1050         struct mgs_thread_info  *mgi = mgs_env_info(env);
1051         struct llog_cfg_rec     *lcr;
1052         int rc;
1053
1054         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
1055                cmd, s1, s2, s3, s4);
1056
1057         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
1058         if (s1)
1059                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
1060         if (s2)
1061                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
1062         if (s3)
1063                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
1064         if (s4)
1065                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
1066
1067         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
1068         if (lcr == NULL)
1069                 return -ENOMEM;
1070
1071         lcr->lcr_cfg.lcfg_nid = nid;
1072         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1073
1074         lustre_cfg_rec_free(lcr);
1075
1076         if (rc < 0)
1077                 CDEBUG(D_MGS,
1078                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
1079                        cfgname, cmd, s1, s2, s3, s4, rc);
1080         return rc;
1081 }
1082
1083 static inline int record_add_uuid(const struct lu_env *env,
1084                                   struct llog_handle *llh,
1085                                   uint64_t nid, char *uuid)
1086 {
1087         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
1088                            NULL, NULL, NULL);
1089 }
1090
1091 static inline int record_add_conn(const struct lu_env *env,
1092                                   struct llog_handle *llh,
1093                                   char *devname, char *uuid)
1094 {
1095         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
1096                            NULL, NULL, NULL);
1097 }
1098
1099 static inline int record_attach(const struct lu_env *env,
1100                                 struct llog_handle *llh, char *devname,
1101                                 char *type, char *uuid)
1102 {
1103         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
1104                            NULL, NULL);
1105 }
1106
1107 static inline int record_setup(const struct lu_env *env,
1108                                struct llog_handle *llh, char *devname,
1109                                char *s1, char *s2, char *s3, char *s4)
1110 {
1111         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
1112 }
1113
1114 /**
1115  * \retval <0 record processing error
1116  * \retval n record is processed. No need copy original one.
1117  * \retval 0 record is not processed.
1118  */
1119 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
1120                            struct mgs_replace_data *mrd)
1121 {
1122         int nids_added = 0;
1123         lnet_nid_t nid;
1124         char *ptr;
1125         int rc = 0;
1126
1127         if (mrd->state == REPLACE_UUID &&
1128             lcfg->lcfg_command == LCFG_ADD_UUID) {
1129                 /* LCFG_ADD_UUID command found. Let's skip original command
1130                    and add passed nids */
1131                 ptr = mrd->target.mti_params;
1132                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1133                         if (!mrd->nodeuuid) {
1134                                 rc = name_create(&mrd->nodeuuid,
1135                                                  libcfs_nid2str(nid), "");
1136                                 if (rc) {
1137                                         CERROR("Can't create uuid for "
1138                                                 "nid  %s, device %s\n",
1139                                                 libcfs_nid2str(nid),
1140                                                 mrd->target.mti_svname);
1141                                         return rc;
1142                                 }
1143                         }
1144                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
1145                                "device %s\n", libcfs_nid2str(nid),
1146                                 mrd->target.mti_params,
1147                                 mrd->nodeuuid);
1148                         rc = record_add_uuid(env,
1149                                              mrd->temp_llh, nid,
1150                                              mrd->nodeuuid);
1151                         if (rc)
1152                                 CWARN("%s: Can't add nid %s for uuid %s :rc=%d\n",
1153                                         mrd->target.mti_svname,
1154                                         libcfs_nid2str(nid),
1155                                         mrd->nodeuuid, rc);
1156                         else
1157                                 nids_added++;
1158
1159                         if (*ptr == ':') {
1160                                 mrd->failover = ptr;
1161                                 break;
1162                         }
1163                 }
1164
1165                 if (nids_added == 0) {
1166                         CERROR("No new nids were added, nid %s with uuid %s, "
1167                                "device %s\n", libcfs_nid2str(nid),
1168                                mrd->nodeuuid ? mrd->nodeuuid : "NULL",
1169                                mrd->target.mti_svname);
1170                         name_destroy(&mrd->nodeuuid);
1171                         return -ENXIO;
1172                 } else {
1173                         mrd->state = REPLACE_SETUP;
1174                 }
1175
1176                 return nids_added;
1177         }
1178
1179         if (mrd->state == REPLACE_SETUP && lcfg->lcfg_command == LCFG_SETUP) {
1180                 /* LCFG_SETUP command found. UUID should be changed */
1181                 rc = record_setup(env,
1182                                   mrd->temp_llh,
1183                                   /* devname the same */
1184                                   lustre_cfg_string(lcfg, 0),
1185                                   /* s1 is not changed */
1186                                   lustre_cfg_string(lcfg, 1),
1187                                   mrd->nodeuuid,
1188                                   /* s3 is not changed */
1189                                   lustre_cfg_string(lcfg, 3),
1190                                   /* s4 is not changed */
1191                                   lustre_cfg_string(lcfg, 4));
1192
1193                 name_destroy(&mrd->nodeuuid);
1194                 if (rc)
1195                         return rc;
1196
1197                 if (mrd->failover) {
1198                         ptr = mrd->failover;
1199                         while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1200                                 if (mrd->nodeuuid == NULL) {
1201                                         rc =  name_create(&mrd->nodeuuid,
1202                                                           libcfs_nid2str(nid),
1203                                                           "");
1204                                         if (rc)
1205                                                 return rc;
1206                                 }
1207
1208                                 CDEBUG(D_MGS, "add nid %s for failover %s\n",
1209                                        libcfs_nid2str(nid), mrd->nodeuuid);
1210                                 rc = record_add_uuid(env, mrd->temp_llh, nid,
1211                                                      mrd->nodeuuid);
1212                                 if (rc) {
1213                                         CWARN("%s: Can't add nid %s for failover %s :rc = %d\n",
1214                                                 mrd->target.mti_svname,
1215                                                 libcfs_nid2str(nid),
1216                                                 mrd->nodeuuid, rc);
1217                                         name_destroy(&mrd->nodeuuid);
1218                                         return rc;
1219                                 }
1220                                 if (*ptr == ':') {
1221                                         rc = record_add_conn(env,
1222                                                 mrd->temp_llh,
1223                                                 lustre_cfg_string(lcfg, 0),
1224                                                 mrd->nodeuuid);
1225                                         name_destroy(&mrd->nodeuuid);
1226                                         if (rc)
1227                                                 return rc;
1228                                 }
1229                         }
1230                         if (mrd->nodeuuid) {
1231                                 rc = record_add_conn(env, mrd->temp_llh,
1232                                                      lustre_cfg_string(lcfg, 0),
1233                                                      mrd->nodeuuid);
1234                                 name_destroy(&mrd->nodeuuid);
1235                                 if (rc)
1236                                         return rc;
1237                         }
1238                 }
1239                 mrd->state = REPLACE_DONE;
1240                 return rc ? rc : 1;
1241         }
1242
1243         /* All new UUID are added. Skip. */
1244         if (mrd->state == REPLACE_SETUP &&
1245                 lcfg->lcfg_command == LCFG_ADD_UUID)
1246                 return 1;
1247
1248         /* Another commands in target device block */
1249         return 0;
1250 }
1251
1252 /**
1253  * Handler that called for every record in llog.
1254  * Records are processed in order they placed in llog.
1255  *
1256  * \param[in] llh       log to be processed
1257  * \param[in] rec       current record
1258  * \param[in] data      mgs_replace_data structure
1259  *
1260  * \retval 0    success
1261  */
1262 static int mgs_replace_nids_handler(const struct lu_env *env,
1263                                     struct llog_handle *llh,
1264                                     struct llog_rec_hdr *rec,
1265                                     void *data)
1266 {
1267         struct mgs_replace_data *mrd;
1268         struct lustre_cfg *lcfg = REC_DATA(rec);
1269         int cfg_len = REC_DATA_LEN(rec);
1270         int rc;
1271         ENTRY;
1272
1273         mrd = (struct mgs_replace_data *)data;
1274
1275         if (rec->lrh_type != OBD_CFG_REC) {
1276                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
1277                        rec->lrh_type, lcfg->lcfg_command,
1278                        lustre_cfg_string(lcfg, 0),
1279                        lustre_cfg_string(lcfg, 1));
1280                 RETURN(-EINVAL);
1281         }
1282
1283         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1284         if (rc) {
1285                 /* Do not copy any invalidated records */
1286                 GOTO(skip_out, rc = 0);
1287         }
1288
1289         rc = check_markers(lcfg, mrd);
1290         if (rc || mrd->state == REPLACE_SKIP)
1291                 GOTO(skip_out, rc = 0);
1292
1293         /* Write to new log all commands outside target device block */
1294         if (mrd->state == REPLACE_COPY)
1295                 GOTO(copy_out, rc = 0);
1296
1297         if (mrd->state == REPLACE_DONE &&
1298             (lcfg->lcfg_command == LCFG_ADD_UUID ||
1299              lcfg->lcfg_command == LCFG_ADD_CONN)) {
1300                 if (!mrd->failover)
1301                         CWARN("Previous failover is deleted, but new one is "
1302                               "not set. This means you configure system "
1303                               "without failover or passed wrong replace_nids "
1304                               "command parameters. Device %s, passed nids %s\n",
1305                               mrd->target.mti_svname, mrd->target.mti_params);
1306                 GOTO(skip_out, rc = 0);
1307         }
1308
1309         rc = process_command(env, lcfg, mrd);
1310         if (rc < 0)
1311                 RETURN(rc);
1312
1313         if (rc)
1314                 RETURN(0);
1315 copy_out:
1316         /* Record is placed in temporary llog as is */
1317         rc = llog_write(env, mrd->temp_llh, rec, LLOG_NEXT_IDX);
1318
1319         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1320                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1321                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1322         RETURN(rc);
1323
1324 skip_out:
1325         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1326                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1327                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1328         RETURN(rc);
1329 }
1330
1331 static int mgs_log_is_empty(const struct lu_env *env,
1332                             struct mgs_device *mgs, char *name)
1333 {
1334         struct llog_ctxt        *ctxt;
1335         int                      rc;
1336
1337         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1338         LASSERT(ctxt != NULL);
1339
1340         rc = llog_is_empty(env, ctxt, name);
1341         llog_ctxt_put(ctxt);
1342         return rc;
1343 }
1344
1345 static int mgs_replace_log(const struct lu_env *env,
1346                            struct obd_device *mgs,
1347                            char *logname, char *devname,
1348                            llog_cb_t replace_handler, void *data)
1349 {
1350         struct llog_handle *orig_llh, *backup_llh;
1351         struct llog_ctxt *ctxt;
1352         struct mgs_replace_data *mrd;
1353         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1354         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1355         char *backup;
1356         int rc, rc2, buf_size;
1357         time64_t now;
1358         ENTRY;
1359
1360         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1361         LASSERT(ctxt != NULL);
1362
1363         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1364                 /* Log is empty. Nothing to replace */
1365                 GOTO(out_put, rc = 0);
1366         }
1367
1368         now = ktime_get_real_seconds();
1369
1370         /* max time64_t in decimal fits into 20 bytes long string */
1371         buf_size = strlen(logname) + 1 + 20 + 1 + strlen(".bak") + 1;
1372         OBD_ALLOC(backup, buf_size);
1373         if (backup == NULL)
1374                 GOTO(out_put, rc = -ENOMEM);
1375
1376         snprintf(backup, buf_size, "%s.%llu.bak", logname, now);
1377
1378         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1379         if (rc == 0) {
1380                 /* Now erase original log file. Connections are not allowed.
1381                    Backup is already saved */
1382                 rc = llog_erase(env, ctxt, NULL, logname);
1383                 if (rc < 0)
1384                         GOTO(out_free, rc);
1385         } else if (rc != -ENOENT) {
1386                 CERROR("%s: can't make backup for %s: rc = %d\n",
1387                        mgs->obd_name, logname, rc);
1388                 GOTO(out_free,rc);
1389         }
1390
1391         /* open local log */
1392         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1393         if (rc)
1394                 GOTO(out_restore, rc);
1395
1396         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1397         if (rc)
1398                 GOTO(out_closel, rc);
1399
1400         /* open backup llog */
1401         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1402                        LLOG_OPEN_EXISTS);
1403         if (rc)
1404                 GOTO(out_closel, rc);
1405
1406         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1407         if (rc)
1408                 GOTO(out_close, rc);
1409
1410         if (llog_get_size(backup_llh) <= 1)
1411                 GOTO(out_close, rc = 0);
1412
1413         OBD_ALLOC_PTR(mrd);
1414         if (!mrd)
1415                 GOTO(out_close, rc = -ENOMEM);
1416         /* devname is only needed information to replace UUID records */
1417         if (devname)
1418                 strlcpy(mrd->target.mti_svname, devname,
1419                         sizeof(mrd->target.mti_svname));
1420         /* data is parsed in llog callback */
1421         if (data)
1422                 strlcpy(mrd->target.mti_params, data,
1423                         sizeof(mrd->target.mti_params));
1424         /* Copy records to this temporary llog */
1425         mrd->temp_llh = orig_llh;
1426
1427         rc = llog_process(env, backup_llh, replace_handler,
1428                           (void *)mrd, NULL);
1429         OBD_FREE_PTR(mrd);
1430 out_close:
1431         rc2 = llog_close(NULL, backup_llh);
1432         if (!rc)
1433                 rc = rc2;
1434 out_closel:
1435         rc2 = llog_close(NULL, orig_llh);
1436         if (!rc)
1437                 rc = rc2;
1438
1439 out_restore:
1440         if (rc) {
1441                 CERROR("%s: llog should be restored: rc = %d\n",
1442                        mgs->obd_name, rc);
1443                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1444                                   logname);
1445                 if (rc2 < 0)
1446                         CERROR("%s: can't restore backup %s: rc = %d\n",
1447                                mgs->obd_name, logname, rc2);
1448         }
1449
1450 out_free:
1451         OBD_FREE(backup, buf_size);
1452
1453 out_put:
1454         llog_ctxt_put(ctxt);
1455
1456         if (rc)
1457                 CERROR("%s: failed to replace log %s: rc = %d\n",
1458                        mgs->obd_name, logname, rc);
1459
1460         RETURN(rc);
1461 }
1462
1463 static int mgs_replace_nids_log(const struct lu_env *env,
1464                                 struct obd_device *obd,
1465                                 char *logname, char *devname, char *nids)
1466 {
1467         CDEBUG(D_MGS, "Replace NIDs for %s in %s\n", devname, logname);
1468         return mgs_replace_log(env, obd, logname, devname,
1469                                mgs_replace_nids_handler, nids);
1470 }
1471
1472 /**
1473  * Parse device name and get file system name and/or device index
1474  *
1475  * @devname     device name (ex. lustre-MDT0000)
1476  * @fsname      file system name extracted from @devname and returned
1477  *              to the caller (optional)
1478  * @index       device index extracted from @devname and returned to
1479  *              the caller (optional)
1480  *
1481  * RETURN       0                       success if we are only interested in
1482  *                                      extracting fsname from devname.
1483  *                                      i.e index is NULL
1484  *
1485  *              LDD_F_SV_TYPE_*         Besides extracting the fsname the
1486  *                                      user also wants the index. Report to
1487  *                                      the user the type of obd device the
1488  *                                      returned index belongs too.
1489  *
1490  *              -EINVAL                 The obd device name is improper so
1491  *                                      fsname could not be extracted.
1492  *
1493  *              -ENXIO                  Failed to extract the index out of
1494  *                                      the obd device name. Most likely an
1495  *                                      invalid obd device name
1496  */
1497 static int mgs_parse_devname(char *devname, char *fsname, u32 *index)
1498 {
1499         int rc = 0;
1500         ENTRY;
1501
1502         /* Extract fsname */
1503         if (fsname) {
1504                 rc = server_name2fsname(devname, fsname, NULL);
1505                 if (rc < 0) {
1506                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1507                                devname);
1508                         RETURN(-EINVAL);
1509                 }
1510         }
1511
1512         if (index) {
1513                 rc = server_name2index(devname, index, NULL);
1514                 if (rc < 0) {
1515                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1516                                devname);
1517                         RETURN(-ENXIO);
1518                 }
1519         }
1520
1521         /* server_name2index can return LDD_F_SV_TYPE_* so always return rc */
1522         RETURN(rc);
1523 }
1524
1525 /* This is only called during replace_nids */
1526 static int only_mgs_is_running(struct obd_device *mgs_obd)
1527 {
1528         /* TDB: Is global variable with devices count exists? */
1529         int num_devices = get_devices_count();
1530         int num_exports = 0;
1531         struct obd_export *exp;
1532
1533         spin_lock(&mgs_obd->obd_dev_lock);
1534         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1535                 /* skip self export */
1536                 if (exp == mgs_obd->obd_self_export)
1537                         continue;
1538
1539                 ++num_exports;
1540
1541                 if (num_exports > 1)
1542                         CERROR("%s: node %s still connected during replace_nids connect_flags:%llx\n",
1543                                mgs_obd->obd_name,
1544                                libcfs_nid2str(exp->exp_nid_stats->nid),
1545                                exp_connect_flags(exp));
1546         }
1547         spin_unlock(&mgs_obd->obd_dev_lock);
1548
1549         /* osd, MGS and MGC + MGC export (nosvc starts MGC)
1550          *  (wc -l /proc/fs/lustre/devices <= 3) && (non self exports == 1)
1551          */
1552         return (num_devices <= 3) && (num_exports <= 1);
1553 }
1554
1555 static int name_create_mdt(char **logname, char *fsname, int mdt_idx)
1556 {
1557         char postfix[9];
1558
1559         if (mdt_idx > INDEX_MAP_MAX_VALUE)
1560                 return -E2BIG;
1561
1562         snprintf(postfix, sizeof(postfix), "-MDT%04x", mdt_idx);
1563         return name_create(logname, fsname, postfix);
1564 }
1565
1566 /**
1567  * Replace nids for \a device to \a nids values
1568  *
1569  * \param obd           MGS obd device
1570  * \param devname       nids need to be replaced for this device
1571  * (ex. lustre-OST0000)
1572  * \param nids          nids list (ex. nid1,nid2,nid3)
1573  *
1574  * \retval 0    success
1575  */
1576 int mgs_replace_nids(const struct lu_env *env,
1577                      struct mgs_device *mgs,
1578                      char *devname, char *nids)
1579 {
1580         /* Assume fsname is part of device name */
1581         char fsname[MTI_NAME_MAXLEN];
1582         int rc;
1583         __u32 index;
1584         char *logname;
1585         struct fs_db *fsdb = NULL;
1586         unsigned int i;
1587         int conn_state;
1588         struct obd_device *mgs_obd = mgs->mgs_obd;
1589         ENTRY;
1590
1591         /* We can only change NIDs if no other nodes are connected */
1592         spin_lock(&mgs_obd->obd_dev_lock);
1593         conn_state = mgs_obd->obd_no_conn;
1594         mgs_obd->obd_no_conn = 1;
1595         spin_unlock(&mgs_obd->obd_dev_lock);
1596
1597         /* We can not change nids if not only MGS is started */
1598         if (!only_mgs_is_running(mgs_obd)) {
1599                 CERROR("Only MGS is allowed to be started\n");
1600                 GOTO(out, rc = -EINPROGRESS);
1601         }
1602
1603         /* Get fsname and index */
1604         rc = mgs_parse_devname(devname, fsname, &index);
1605         if (rc < 0)
1606                 GOTO(out, rc);
1607
1608         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1609         if (rc) {
1610                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1611                 GOTO(out, rc);
1612         }
1613
1614         /* Process client llogs */
1615         rc = name_create(&logname, fsname, "-client");
1616         if (rc)
1617                 GOTO(out, rc);
1618         rc = mgs_replace_nids_log(env, mgs_obd, logname, devname, nids);
1619         name_destroy(&logname);
1620         if (rc) {
1621                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1622                        fsname, devname, rc);
1623                 GOTO(out, rc);
1624         }
1625
1626         /* Process MDT llogs */
1627         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1628                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1629                         continue;
1630                 rc = name_create_mdt(&logname, fsname, i);
1631                 if (rc)
1632                         GOTO(out, rc);
1633                 rc = mgs_replace_nids_log(env, mgs_obd, logname, devname, nids);
1634                 name_destroy(&logname);
1635                 if (rc)
1636                         GOTO(out, rc);
1637         }
1638
1639 out:
1640         spin_lock(&mgs_obd->obd_dev_lock);
1641         mgs_obd->obd_no_conn = conn_state;
1642         spin_unlock(&mgs_obd->obd_dev_lock);
1643
1644         if (fsdb)
1645                 mgs_put_fsdb(mgs, fsdb);
1646
1647         RETURN(rc);
1648 }
1649
1650 /**
1651  * This is called for every record in llog. Some of records are
1652  * skipped, others are copied to new log as is.
1653  * Records to be skipped are
1654  *  marker records marked SKIP
1655  *  records enclosed between SKIP markers
1656  *
1657  * \param[in] llh       log to be processed
1658  * \param[in] rec       current record
1659  * \param[in] data      mgs_replace_data structure
1660  *
1661  * \retval 0    success
1662  **/
1663 static int mgs_clear_config_handler(const struct lu_env *env,
1664                                     struct llog_handle *llh,
1665                                     struct llog_rec_hdr *rec, void *data)
1666 {
1667         struct mgs_replace_data *mrd;
1668         struct lustre_cfg *lcfg = REC_DATA(rec);
1669         int cfg_len = REC_DATA_LEN(rec);
1670         int rc;
1671
1672         ENTRY;
1673
1674         mrd = (struct mgs_replace_data *)data;
1675
1676         if (rec->lrh_type != OBD_CFG_REC) {
1677                 CDEBUG(D_MGS, "Config llog Name=%s, Record Index=%u, "
1678                        "Unhandled Record Type=%#x\n", llh->lgh_name,
1679                        rec->lrh_index, rec->lrh_type);
1680                 RETURN(-EINVAL);
1681         }
1682
1683         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1684         if (rc) {
1685                 CDEBUG(D_MGS, "Config llog Name=%s, Invalid config file.",
1686                        llh->lgh_name);
1687                 RETURN(-EINVAL);
1688         }
1689
1690         if (lcfg->lcfg_command == LCFG_MARKER) {
1691                 struct cfg_marker *marker;
1692
1693                 marker = lustre_cfg_buf(lcfg, 1);
1694                 if (marker->cm_flags & CM_SKIP) {
1695                         if (marker->cm_flags & CM_START)
1696                                 mrd->state = REPLACE_SKIP;
1697                         if (marker->cm_flags & CM_END)
1698                                 mrd->state = REPLACE_COPY;
1699                         /* SKIP section started or finished */
1700                         CDEBUG(D_MGS, "Skip idx=%d, rc=%d, len=%d, "
1701                                "cmd %x %s %s\n", rec->lrh_index, rc,
1702                                rec->lrh_len, lcfg->lcfg_command,
1703                                lustre_cfg_string(lcfg, 0),
1704                                lustre_cfg_string(lcfg, 1));
1705                         RETURN(0);
1706                 }
1707         } else {
1708                 if (mrd->state == REPLACE_SKIP) {
1709                         /* record enclosed between SKIP markers, skip it */
1710                         CDEBUG(D_MGS, "Skip idx=%d, rc=%d, len=%d, "
1711                                "cmd %x %s %s\n", rec->lrh_index, rc,
1712                                rec->lrh_len, lcfg->lcfg_command,
1713                                lustre_cfg_string(lcfg, 0),
1714                                lustre_cfg_string(lcfg, 1));
1715                         RETURN(0);
1716                 }
1717         }
1718
1719         /* Record is placed in temporary llog as is */
1720         rc = llog_write(env, mrd->temp_llh, rec, LLOG_NEXT_IDX);
1721
1722         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1723                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1724                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1725         RETURN(rc);
1726 }
1727
1728 /*
1729  * Directory CONFIGS/ may contain files which are not config logs to
1730  * be cleared. Skip any llogs with a non-alphanumeric character after
1731  * the last '-'. For example, fsname-MDT0000.sav, fsname-MDT0000.bak,
1732  * fsname-MDT0000.orig, fsname-MDT0000~, fsname-MDT0000.20150516, etc.
1733  */
1734 static bool config_to_clear(const char *logname)
1735 {
1736         int i;
1737         char *str;
1738
1739         str = strrchr(logname, '-');
1740         if (!str)
1741                 return 0;
1742
1743         i = 0;
1744         while (isalnum(str[++i]));
1745         return str[i] == '\0';
1746 }
1747
1748 /**
1749  * Clear config logs for \a name
1750  *
1751  * \param env
1752  * \param mgs           MGS device
1753  * \param name          name of device or of filesystem
1754  *                      (ex. lustre-OST0000 or lustre) in later case all logs
1755  *                      will be cleared
1756  *
1757  * \retval 0            success
1758  */
1759 int mgs_clear_configs(const struct lu_env *env,
1760                      struct mgs_device *mgs, const char *name)
1761 {
1762         struct list_head dentry_list;
1763         struct mgs_direntry *dirent, *n;
1764         char *namedash;
1765         int conn_state;
1766         struct obd_device *mgs_obd = mgs->mgs_obd;
1767         int rc;
1768
1769         ENTRY;
1770
1771         /* Prevent clients and servers from connecting to mgs */
1772         spin_lock(&mgs_obd->obd_dev_lock);
1773         conn_state = mgs_obd->obd_no_conn;
1774         mgs_obd->obd_no_conn = 1;
1775         spin_unlock(&mgs_obd->obd_dev_lock);
1776
1777         /*
1778          * config logs cannot be cleaned if anything other than
1779          * MGS is started
1780          */
1781         if (!only_mgs_is_running(mgs_obd)) {
1782                 CERROR("Only MGS is allowed to be started\n");
1783                 GOTO(out, rc = -EBUSY);
1784         }
1785
1786         /* Find all the logs in the CONFIGS directory */
1787         rc = class_dentry_readdir(env, mgs, &dentry_list);
1788         if (rc) {
1789                 CERROR("%s: cannot read config directory '%s': rc = %d\n",
1790                        mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
1791                 GOTO(out, rc);
1792         }
1793
1794         if (list_empty(&dentry_list)) {
1795                 CERROR("%s: list empty reading config dir '%s': rc = %d\n",
1796                         mgs_obd->obd_name, MOUNT_CONFIGS_DIR, -ENOENT);
1797                 GOTO(out, rc = -ENOENT);
1798         }
1799
1800         OBD_ALLOC(namedash, strlen(name) + 2);
1801         if (namedash == NULL)
1802                 GOTO(out, rc = -ENOMEM);
1803         snprintf(namedash, strlen(name) + 2, "%s-", name);
1804
1805         list_for_each_entry(dirent, &dentry_list, mde_list) {
1806                 if (strcmp(name, dirent->mde_name) &&
1807                     strncmp(namedash, dirent->mde_name, strlen(namedash)))
1808                         continue;
1809                 if (!config_to_clear(dirent->mde_name))
1810                         continue;
1811                 CDEBUG(D_MGS, "%s: Clear config log %s\n",
1812                        mgs_obd->obd_name, dirent->mde_name);
1813                 rc = mgs_replace_log(env, mgs_obd, dirent->mde_name, NULL,
1814                                      mgs_clear_config_handler, NULL);
1815                 if (rc)
1816                         break;
1817         }
1818
1819         list_for_each_entry_safe(dirent, n, &dentry_list, mde_list) {
1820                 list_del_init(&dirent->mde_list);
1821                 mgs_direntry_free(dirent);
1822         }
1823         OBD_FREE(namedash, strlen(name) + 2);
1824 out:
1825         spin_lock(&mgs_obd->obd_dev_lock);
1826         mgs_obd->obd_no_conn = conn_state;
1827         spin_unlock(&mgs_obd->obd_dev_lock);
1828
1829         RETURN(rc);
1830 }
1831
1832 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1833                             char *devname, struct lov_desc *desc)
1834 {
1835         struct mgs_thread_info  *mgi = mgs_env_info(env);
1836         struct llog_cfg_rec     *lcr;
1837         int rc;
1838
1839         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1840         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1841         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1842         if (lcr == NULL)
1843                 return -ENOMEM;
1844
1845         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1846         lustre_cfg_rec_free(lcr);
1847         return rc;
1848 }
1849
1850 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1851                             char *devname, struct lmv_desc *desc)
1852 {
1853         struct mgs_thread_info  *mgi = mgs_env_info(env);
1854         struct llog_cfg_rec     *lcr;
1855         int rc;
1856
1857         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1858         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1859         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1860         if (lcr == NULL)
1861                 return -ENOMEM;
1862
1863         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1864         lustre_cfg_rec_free(lcr);
1865         return rc;
1866 }
1867
1868 static inline int record_mdc_add(const struct lu_env *env,
1869                                  struct llog_handle *llh,
1870                                  char *logname, char *mdcuuid,
1871                                  char *mdtuuid, char *index,
1872                                  char *gen)
1873 {
1874         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1875                            mdtuuid,index,gen,mdcuuid);
1876 }
1877
1878 static inline int record_lov_add(const struct lu_env *env,
1879                                  struct llog_handle *llh,
1880                                  char *lov_name, char *ost_uuid,
1881                                  char *index, char *gen)
1882 {
1883         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1884                            ost_uuid, index, gen, NULL);
1885 }
1886
1887 static inline int record_mount_opt(const struct lu_env *env,
1888                                    struct llog_handle *llh,
1889                                    char *profile, char *lov_name,
1890                                    char *mdc_name)
1891 {
1892         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1893                            profile, lov_name, mdc_name, NULL);
1894 }
1895
1896 static int record_marker(const struct lu_env *env,
1897                          struct llog_handle *llh,
1898                          struct fs_db *fsdb, __u32 flags,
1899                          char *tgtname, char *comment)
1900 {
1901         struct mgs_thread_info *mgi = mgs_env_info(env);
1902         struct llog_cfg_rec *lcr;
1903         int rc;
1904         int cplen = 0;
1905
1906         if (flags & CM_START)
1907                 fsdb->fsdb_gen++;
1908         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1909         mgi->mgi_marker.cm_flags = flags;
1910         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1911         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1912                         sizeof(mgi->mgi_marker.cm_tgtname));
1913         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1914                 return -E2BIG;
1915         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1916                         sizeof(mgi->mgi_marker.cm_comment));
1917         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1918                 return -E2BIG;
1919         mgi->mgi_marker.cm_createtime = ktime_get_real_seconds();
1920         mgi->mgi_marker.cm_canceltime = 0;
1921         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1922         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1923                             sizeof(mgi->mgi_marker));
1924         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1925         if (lcr == NULL)
1926                 return -ENOMEM;
1927
1928         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1929         lustre_cfg_rec_free(lcr);
1930         return rc;
1931 }
1932
1933 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1934                             struct llog_handle **llh, char *name)
1935 {
1936         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1937         struct llog_ctxt        *ctxt;
1938         int                      rc = 0;
1939         ENTRY;
1940
1941         if (*llh)
1942                 GOTO(out, rc = -EBUSY);
1943
1944         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1945         if (!ctxt)
1946                 GOTO(out, rc = -ENODEV);
1947         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1948
1949         rc = llog_open_create(env, ctxt, llh, NULL, name);
1950         if (rc)
1951                 GOTO(out_ctxt, rc);
1952         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1953         if (rc)
1954                 llog_close(env, *llh);
1955 out_ctxt:
1956         llog_ctxt_put(ctxt);
1957 out:
1958         if (rc) {
1959                 CERROR("%s: can't start log %s: rc = %d\n",
1960                        mgs->mgs_obd->obd_name, name, rc);
1961                 *llh = NULL;
1962         }
1963         RETURN(rc);
1964 }
1965
1966 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1967 {
1968         int rc;
1969
1970         rc = llog_close(env, *llh);
1971         *llh = NULL;
1972
1973         return rc;
1974 }
1975
1976 /******************** config "macros" *********************/
1977
1978 /* write an lcfg directly into a log (with markers) */
1979 static int mgs_write_log_direct(const struct lu_env *env,
1980                                 struct mgs_device *mgs, struct fs_db *fsdb,
1981                                 char *logname, struct llog_cfg_rec *lcr,
1982                                 char *devname, char *comment)
1983 {
1984         struct llog_handle *llh = NULL;
1985         int rc;
1986
1987         ENTRY;
1988
1989         rc = record_start_log(env, mgs, &llh, logname);
1990         if (rc)
1991                 RETURN(rc);
1992
1993         /* FIXME These should be a single journal transaction */
1994         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1995         if (rc)
1996                 GOTO(out_end, rc);
1997         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1998         if (rc)
1999                 GOTO(out_end, rc);
2000         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
2001         if (rc)
2002                 GOTO(out_end, rc);
2003 out_end:
2004         record_end_log(env, &llh);
2005         RETURN(rc);
2006 }
2007
2008 /* write the lcfg in all logs for the given fs */
2009 static int mgs_write_log_direct_all(const struct lu_env *env,
2010                                     struct mgs_device *mgs,
2011                                     struct fs_db *fsdb,
2012                                     struct mgs_target_info *mti,
2013                                     struct llog_cfg_rec *lcr, char *devname,
2014                                     char *comment, int server_only)
2015 {
2016         struct list_head         log_list;
2017         struct mgs_direntry     *dirent, *n;
2018         char                    *fsname = mti->mti_fsname;
2019         int                      rc = 0, len = strlen(fsname);
2020
2021         ENTRY;
2022         /* Find all the logs in the CONFIGS directory */
2023         rc = class_dentry_readdir(env, mgs, &log_list);
2024         if (rc)
2025                 RETURN(rc);
2026
2027         /* Could use fsdb index maps instead of directory listing */
2028         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
2029                 list_del_init(&dirent->mde_list);
2030                 /* don't write to sptlrpc rule log */
2031                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
2032                         goto next;
2033
2034                 /* caller wants write server logs only */
2035                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
2036                         goto next;
2037
2038                 if (strlen(dirent->mde_name) <= len ||
2039                     strncmp(fsname, dirent->mde_name, len) != 0 ||
2040                     dirent->mde_name[len] != '-')
2041                         goto next;
2042
2043                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
2044                 /* Erase any old settings of this same parameter */
2045                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
2046                                 devname, comment, CM_SKIP);
2047                 if (rc < 0)
2048                         CERROR("%s: Can't modify llog %s: rc = %d\n",
2049                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
2050                 if (lcr == NULL)
2051                         goto next;
2052                 /* Write the new one */
2053                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
2054                                           lcr, devname, comment);
2055                 if (rc != 0)
2056                         CERROR("%s: writing log %s: rc = %d\n",
2057                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
2058 next:
2059                 mgs_direntry_free(dirent);
2060         }
2061
2062         RETURN(rc);
2063 }
2064
2065 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2066                                     struct mgs_device *mgs,
2067                                     struct fs_db *fsdb,
2068                                     struct mgs_target_info *mti,
2069                                     int index, char *logname);
2070 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2071                                     struct mgs_device *mgs,
2072                                     struct fs_db *fsdb,
2073                                     struct mgs_target_info *mti,
2074                                     char *logname, char *suffix, char *lovname,
2075                                     enum lustre_sec_part sec_part, int flags);
2076 static int name_create_mdt_and_lov(char **logname, char **lovname,
2077                                    struct fs_db *fsdb, int i);
2078
2079 static int add_param(char *params, char *key, char *val)
2080 {
2081         char *start = params + strlen(params);
2082         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
2083         int keylen = 0;
2084
2085         if (key != NULL)
2086                 keylen = strlen(key);
2087         if (start + 1 + keylen + strlen(val) >= end) {
2088                 CERROR("params are too long: %s %s%s\n",
2089                        params, key != NULL ? key : "", val);
2090                 return -EINVAL;
2091         }
2092
2093         sprintf(start, " %s%s", key != NULL ? key : "", val);
2094         return 0;
2095 }
2096
2097 /**
2098  * Walk through client config log record and convert the related records
2099  * into the target.
2100  **/
2101 static int mgs_steal_client_llog_handler(const struct lu_env *env,
2102                                          struct llog_handle *llh,
2103                                          struct llog_rec_hdr *rec, void *data)
2104 {
2105         struct mgs_device *mgs;
2106         struct obd_device *obd;
2107         struct mgs_target_info *mti, *tmti;
2108         struct fs_db *fsdb;
2109         int cfg_len = rec->lrh_len;
2110         char *cfg_buf = (char*) (rec + 1);
2111         struct lustre_cfg *lcfg;
2112         int rc = 0;
2113         struct llog_handle *mdt_llh = NULL;
2114         static int got_an_osc_or_mdc = 0;
2115         /* 0: not found any osc/mdc;
2116            1: found osc;
2117            2: found mdc;
2118         */
2119         static int last_step = -1;
2120         int cplen = 0;
2121
2122         ENTRY;
2123
2124         mti = ((struct temp_comp*)data)->comp_mti;
2125         tmti = ((struct temp_comp*)data)->comp_tmti;
2126         fsdb = ((struct temp_comp*)data)->comp_fsdb;
2127         obd = ((struct temp_comp *)data)->comp_obd;
2128         mgs = lu2mgs_dev(obd->obd_lu_dev);
2129         LASSERT(mgs);
2130
2131         if (rec->lrh_type != OBD_CFG_REC) {
2132                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
2133                 RETURN(-EINVAL);
2134         }
2135
2136         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
2137         if (rc) {
2138                 CERROR("Insane cfg\n");
2139                 RETURN(rc);
2140         }
2141
2142         lcfg = (struct lustre_cfg *)cfg_buf;
2143
2144         if (lcfg->lcfg_command == LCFG_MARKER) {
2145                 struct cfg_marker *marker;
2146                 marker = lustre_cfg_buf(lcfg, 1);
2147                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
2148                     (marker->cm_flags & CM_START) &&
2149                      !(marker->cm_flags & CM_SKIP)) {
2150                         got_an_osc_or_mdc = 1;
2151                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
2152                                         sizeof(tmti->mti_svname));
2153                         if (cplen >= sizeof(tmti->mti_svname))
2154                                 RETURN(-E2BIG);
2155                         rc = record_start_log(env, mgs, &mdt_llh,
2156                                               mti->mti_svname);
2157                         if (rc)
2158                                 RETURN(rc);
2159                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
2160                                            mti->mti_svname, "add osc(copied)");
2161                         record_end_log(env, &mdt_llh);
2162                         last_step = marker->cm_step;
2163                         RETURN(rc);
2164                 }
2165                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
2166                     (marker->cm_flags & CM_END) &&
2167                      !(marker->cm_flags & CM_SKIP)) {
2168                         LASSERT(last_step == marker->cm_step);
2169                         last_step = -1;
2170                         got_an_osc_or_mdc = 0;
2171                         memset(tmti, 0, sizeof(*tmti));
2172                         rc = record_start_log(env, mgs, &mdt_llh,
2173                                               mti->mti_svname);
2174                         if (rc)
2175                                 RETURN(rc);
2176                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
2177                                            mti->mti_svname, "add osc(copied)");
2178                         record_end_log(env, &mdt_llh);
2179                         RETURN(rc);
2180                 }
2181                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
2182                     (marker->cm_flags & CM_START) &&
2183                      !(marker->cm_flags & CM_SKIP)) {
2184                         got_an_osc_or_mdc = 2;
2185                         last_step = marker->cm_step;
2186                         memcpy(tmti->mti_svname, marker->cm_tgtname,
2187                                strlen(marker->cm_tgtname));
2188
2189                         RETURN(rc);
2190                 }
2191                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
2192                     (marker->cm_flags & CM_END) &&
2193                      !(marker->cm_flags & CM_SKIP)) {
2194                         LASSERT(last_step == marker->cm_step);
2195                         last_step = -1;
2196                         got_an_osc_or_mdc = 0;
2197                         memset(tmti, 0, sizeof(*tmti));
2198                         RETURN(rc);
2199                 }
2200         }
2201
2202         if (got_an_osc_or_mdc == 0 || last_step < 0)
2203                 RETURN(rc);
2204
2205         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
2206                 __u64 nodenid = lcfg->lcfg_nid;
2207
2208                 if (strlen(tmti->mti_uuid) == 0) {
2209                         /* target uuid not set, this config record is before
2210                          * LCFG_SETUP, this nid is one of target node nid.
2211                          */
2212                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
2213                         tmti->mti_nid_count++;
2214                 } else {
2215                         char nidstr[LNET_NIDSTR_SIZE];
2216
2217                         /* failover node nid */
2218                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
2219                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
2220                                         nidstr);
2221                 }
2222
2223                 RETURN(rc);
2224         }
2225
2226         if (lcfg->lcfg_command == LCFG_SETUP) {
2227                 char *target;
2228
2229                 target = lustre_cfg_string(lcfg, 1);
2230                 memcpy(tmti->mti_uuid, target, strlen(target));
2231                 RETURN(rc);
2232         }
2233
2234         /* ignore client side sptlrpc_conf_log */
2235         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
2236                 RETURN(rc);
2237
2238         if (lcfg->lcfg_command == LCFG_ADD_MDC &&
2239             strstr(lustre_cfg_string(lcfg, 0), "-clilmv") != NULL) {
2240                 int index;
2241
2242                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
2243                         RETURN (-EINVAL);
2244                 if (index == mti->mti_stripe_index) {
2245                         CDEBUG(D_INFO,
2246                                "attempt to create MDT%04x->MDT%04x osp device\n",
2247                                index, index);
2248                         RETURN(0);
2249                 }
2250                 memcpy(tmti->mti_fsname, mti->mti_fsname,
2251                        strlen(mti->mti_fsname));
2252                 tmti->mti_stripe_index = index;
2253
2254                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
2255                                               mti->mti_stripe_index,
2256                                               mti->mti_svname);
2257                 memset(tmti, 0, sizeof(*tmti));
2258                 RETURN(rc);
2259         }
2260
2261         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
2262                 int index;
2263                 char mdt_index[9];
2264                 char *logname, *lovname;
2265
2266                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2267                                              mti->mti_stripe_index);
2268                 if (rc)
2269                         RETURN(rc);
2270                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
2271
2272                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
2273                         name_destroy(&logname);
2274                         name_destroy(&lovname);
2275                         RETURN(-EINVAL);
2276                 }
2277
2278                 tmti->mti_stripe_index = index;
2279                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
2280                                          mdt_index, lovname,
2281                                          LUSTRE_SP_MDT, 0);
2282                 name_destroy(&logname);
2283                 name_destroy(&lovname);
2284                 RETURN(rc);
2285         }
2286         RETURN(rc);
2287 }
2288
2289 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
2290 /* stealed from mgs_get_fsdb_from_llog*/
2291 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
2292                                               struct mgs_device *mgs,
2293                                               char *client_name,
2294                                               struct temp_comp* comp)
2295 {
2296         struct llog_handle *loghandle;
2297         struct mgs_target_info *tmti;
2298         struct llog_ctxt *ctxt;
2299         int rc;
2300
2301         ENTRY;
2302
2303         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
2304         LASSERT(ctxt != NULL);
2305
2306         OBD_ALLOC_PTR(tmti);
2307         if (tmti == NULL)
2308                 GOTO(out_ctxt, rc = -ENOMEM);
2309
2310         comp->comp_tmti = tmti;
2311         comp->comp_obd = mgs->mgs_obd;
2312
2313         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
2314                        LLOG_OPEN_EXISTS);
2315         if (rc < 0) {
2316                 if (rc == -ENOENT)
2317                         rc = 0;
2318                 GOTO(out_pop, rc);
2319         }
2320
2321         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
2322         if (rc)
2323                 GOTO(out_close, rc);
2324
2325         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
2326                                   (void *)comp, NULL, false);
2327         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
2328 out_close:
2329         llog_close(env, loghandle);
2330 out_pop:
2331         OBD_FREE_PTR(tmti);
2332 out_ctxt:
2333         llog_ctxt_put(ctxt);
2334         RETURN(rc);
2335 }
2336
2337 /* lmv is the second thing for client logs */
2338 /* copied from mgs_write_log_lov. Please refer to that.  */
2339 static int mgs_write_log_lmv(const struct lu_env *env,
2340                              struct mgs_device *mgs,
2341                              struct fs_db *fsdb,
2342                              struct mgs_target_info *mti,
2343                              char *logname, char *lmvname)
2344 {
2345         struct llog_handle *llh = NULL;
2346         struct lmv_desc *lmvdesc;
2347         char *uuid;
2348         int rc = 0;
2349         ENTRY;
2350
2351         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
2352
2353         OBD_ALLOC_PTR(lmvdesc);
2354         if (lmvdesc == NULL)
2355                 RETURN(-ENOMEM);
2356         lmvdesc->ld_active_tgt_count = 0;
2357         lmvdesc->ld_tgt_count = 0;
2358         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
2359         uuid = (char *)lmvdesc->ld_uuid.uuid;
2360
2361         rc = record_start_log(env, mgs, &llh, logname);
2362         if (rc)
2363                 GOTO(out_free, rc);
2364         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
2365         if (rc)
2366                 GOTO(out_end, rc);
2367         rc = record_attach(env, llh, lmvname, "lmv", uuid);
2368         if (rc)
2369                 GOTO(out_end, rc);
2370         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
2371         if (rc)
2372                 GOTO(out_end, rc);
2373         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
2374         if (rc)
2375                 GOTO(out_end, rc);
2376 out_end:
2377         record_end_log(env, &llh);
2378 out_free:
2379         OBD_FREE_PTR(lmvdesc);
2380         RETURN(rc);
2381 }
2382
2383 /* lov is the first thing in the mdt and client logs */
2384 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
2385                              struct fs_db *fsdb, struct mgs_target_info *mti,
2386                              char *logname, char *lovname)
2387 {
2388         struct llog_handle *llh = NULL;
2389         struct lov_desc *lovdesc;
2390         char *uuid;
2391         int rc = 0;
2392         ENTRY;
2393
2394         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
2395
2396         /*
2397         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
2398         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
2399               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
2400         */
2401
2402         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
2403         OBD_ALLOC_PTR(lovdesc);
2404         if (lovdesc == NULL)
2405                 RETURN(-ENOMEM);
2406         lovdesc->ld_magic = LOV_DESC_MAGIC;
2407         lovdesc->ld_tgt_count = 0;
2408         /* Defaults.  Can be changed later by lcfg config_param */
2409         lovdesc->ld_default_stripe_count = 1;
2410         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
2411         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
2412         lovdesc->ld_default_stripe_offset = -1;
2413         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
2414         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
2415         /* can these be the same? */
2416         uuid = (char *)lovdesc->ld_uuid.uuid;
2417
2418         /* This should always be the first entry in a log.
2419         rc = mgs_clear_log(obd, logname); */
2420         rc = record_start_log(env, mgs, &llh, logname);
2421         if (rc)
2422                 GOTO(out_free, rc);
2423         /* FIXME these should be a single journal transaction */
2424         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
2425         if (rc)
2426                 GOTO(out_end, rc);
2427         rc = record_attach(env, llh, lovname, "lov", uuid);
2428         if (rc)
2429                 GOTO(out_end, rc);
2430         rc = record_lov_setup(env, llh, lovname, lovdesc);
2431         if (rc)
2432                 GOTO(out_end, rc);
2433         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
2434         if (rc)
2435                 GOTO(out_end, rc);
2436         EXIT;
2437 out_end:
2438         record_end_log(env, &llh);
2439 out_free:
2440         OBD_FREE_PTR(lovdesc);
2441         return rc;
2442 }
2443
2444 /* add failnids to open log */
2445 static int mgs_write_log_failnids(const struct lu_env *env,
2446                                   struct mgs_target_info *mti,
2447                                   struct llog_handle *llh,
2448                                   char *cliname)
2449 {
2450         char *failnodeuuid = NULL;
2451         char *ptr = mti->mti_params;
2452         lnet_nid_t nid;
2453         int rc = 0;
2454
2455         /*
2456         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
2457         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
2458         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
2459         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
2460         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
2461         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
2462         */
2463
2464         /*
2465          * Pull failnid info out of params string, which may contain something
2466          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
2467          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
2468          * etc.  However, convert_hostnames() should have caught those.
2469          */
2470         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
2471                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
2472                         char nidstr[LNET_NIDSTR_SIZE];
2473
2474                         if (failnodeuuid == NULL) {
2475                                 /* We don't know the failover node name,
2476                                  * so just use the first nid as the uuid */
2477                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
2478                                 rc = name_create(&failnodeuuid, nidstr, "");
2479                                 if (rc != 0)
2480                                         return rc;
2481                         }
2482                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
2483                                 "client %s\n",
2484                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
2485                                 failnodeuuid, cliname);
2486                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
2487                         /*
2488                          * If *ptr is ':', we have added all NIDs for
2489                          * failnodeuuid.
2490                          */
2491                         if (*ptr == ':') {
2492                                 rc = record_add_conn(env, llh, cliname,
2493                                                      failnodeuuid);
2494                                 name_destroy(&failnodeuuid);
2495                                 failnodeuuid = NULL;
2496                         }
2497                 }
2498                 if (failnodeuuid) {
2499                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2500                         name_destroy(&failnodeuuid);
2501                         failnodeuuid = NULL;
2502                 }
2503         }
2504
2505         return rc;
2506 }
2507
2508 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2509                                     struct mgs_device *mgs,
2510                                     struct fs_db *fsdb,
2511                                     struct mgs_target_info *mti,
2512                                     char *logname, char *lmvname)
2513 {
2514         struct llog_handle *llh = NULL;
2515         char *mdcname = NULL;
2516         char *nodeuuid = NULL;
2517         char *mdcuuid = NULL;
2518         char *lmvuuid = NULL;
2519         char index[6];
2520         char nidstr[LNET_NIDSTR_SIZE];
2521         int i, rc;
2522         ENTRY;
2523
2524         if (mgs_log_is_empty(env, mgs, logname)) {
2525                 CERROR("log is empty! Logical error\n");
2526                 RETURN(-EINVAL);
2527         }
2528
2529         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2530                mti->mti_svname, logname, lmvname);
2531
2532         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2533         rc = name_create(&nodeuuid, nidstr, "");
2534         if (rc)
2535                 RETURN(rc);
2536         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2537         if (rc)
2538                 GOTO(out_free, rc);
2539         rc = name_create(&mdcuuid, mdcname, "_UUID");
2540         if (rc)
2541                 GOTO(out_free, rc);
2542         rc = name_create(&lmvuuid, lmvname, "_UUID");
2543         if (rc)
2544                 GOTO(out_free, rc);
2545
2546         rc = record_start_log(env, mgs, &llh, logname);
2547         if (rc)
2548                 GOTO(out_free, rc);
2549         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2550                            "add mdc");
2551         if (rc)
2552                 GOTO(out_end, rc);
2553         for (i = 0; i < mti->mti_nid_count; i++) {
2554                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2555                         libcfs_nid2str_r(mti->mti_nids[i],
2556                                          nidstr, sizeof(nidstr)));
2557
2558                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2559                 if (rc)
2560                         GOTO(out_end, rc);
2561         }
2562
2563         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2564         if (rc)
2565                 GOTO(out_end, rc);
2566         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2567                           NULL, NULL);
2568         if (rc)
2569                 GOTO(out_end, rc);
2570         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2571         if (rc)
2572                 GOTO(out_end, rc);
2573         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2574         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2575                             index, "1");
2576         if (rc)
2577                 GOTO(out_end, rc);
2578         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2579                            "add mdc");
2580         if (rc)
2581                 GOTO(out_end, rc);
2582 out_end:
2583         record_end_log(env, &llh);
2584 out_free:
2585         name_destroy(&lmvuuid);
2586         name_destroy(&mdcuuid);
2587         name_destroy(&mdcname);
2588         name_destroy(&nodeuuid);
2589         RETURN(rc);
2590 }
2591
2592 static inline int name_create_lov(char **lovname, char *mdtname,
2593                                   struct fs_db *fsdb, int index)
2594 {
2595         /* COMPAT_180 */
2596         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2597                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2598         else
2599                 return name_create(lovname, mdtname, "-mdtlov");
2600 }
2601
2602 static int name_create_mdt_and_lov(char **logname, char **lovname,
2603                                    struct fs_db *fsdb, int i)
2604 {
2605         int rc;
2606
2607         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2608         if (rc)
2609                 return rc;
2610         /* COMPAT_180 */
2611         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2612                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2613         else
2614                 rc = name_create(lovname, *logname, "-mdtlov");
2615         if (rc) {
2616                 name_destroy(logname);
2617                 *logname = NULL;
2618         }
2619         return rc;
2620 }
2621
2622 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2623                                       struct fs_db *fsdb, int i)
2624 {
2625         char suffix[16];
2626
2627         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2628                 sprintf(suffix, "-osc");
2629         else
2630                 sprintf(suffix, "-osc-MDT%04x", i);
2631         return name_create(oscname, ostname, suffix);
2632 }
2633
2634 /* add new mdc to already existent MDS */
2635 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2636                                     struct mgs_device *mgs,
2637                                     struct fs_db *fsdb,
2638                                     struct mgs_target_info *mti,
2639                                     int mdt_index, char *logname)
2640 {
2641         struct llog_handle      *llh = NULL;
2642         char    *nodeuuid = NULL;
2643         char    *ospname = NULL;
2644         char    *lovuuid = NULL;
2645         char    *mdtuuid = NULL;
2646         char    *svname = NULL;
2647         char    *mdtname = NULL;
2648         char    *lovname = NULL;
2649         char    index_str[16];
2650         char    nidstr[LNET_NIDSTR_SIZE];
2651         int     i, rc;
2652
2653         ENTRY;
2654         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2655                 CERROR("log is empty! Logical error\n");
2656                 RETURN (-EINVAL);
2657         }
2658
2659         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2660                logname);
2661
2662         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2663         if (rc)
2664                 RETURN(rc);
2665
2666         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2667         rc = name_create(&nodeuuid, nidstr, "");
2668         if (rc)
2669                 GOTO(out_destory, rc);
2670
2671         rc = name_create(&svname, mdtname, "-osp");
2672         if (rc)
2673                 GOTO(out_destory, rc);
2674
2675         sprintf(index_str, "-MDT%04x", mdt_index);
2676         rc = name_create(&ospname, svname, index_str);
2677         if (rc)
2678                 GOTO(out_destory, rc);
2679
2680         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2681         if (rc)
2682                 GOTO(out_destory, rc);
2683
2684         rc = name_create(&lovuuid, lovname, "_UUID");
2685         if (rc)
2686                 GOTO(out_destory, rc);
2687
2688         rc = name_create(&mdtuuid, mdtname, "_UUID");
2689         if (rc)
2690                 GOTO(out_destory, rc);
2691
2692         rc = record_start_log(env, mgs, &llh, logname);
2693         if (rc)
2694                 GOTO(out_destory, rc);
2695
2696         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2697                            "add osp");
2698         if (rc)
2699                 GOTO(out_destory, rc);
2700
2701         for (i = 0; i < mti->mti_nid_count; i++) {
2702                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2703                         libcfs_nid2str_r(mti->mti_nids[i],
2704                                          nidstr, sizeof(nidstr)));
2705                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2706                 if (rc)
2707                         GOTO(out_end, rc);
2708         }
2709
2710         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2711         if (rc)
2712                 GOTO(out_end, rc);
2713
2714         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2715                           NULL, NULL);
2716         if (rc)
2717                 GOTO(out_end, rc);
2718
2719         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2720         if (rc)
2721                 GOTO(out_end, rc);
2722
2723         /* Add mdc(osp) to lod */
2724         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
2725         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2726                          index_str, "1", NULL);
2727         if (rc)
2728                 GOTO(out_end, rc);
2729
2730         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2731         if (rc)
2732                 GOTO(out_end, rc);
2733
2734 out_end:
2735         record_end_log(env, &llh);
2736
2737 out_destory:
2738         name_destroy(&mdtuuid);
2739         name_destroy(&lovuuid);
2740         name_destroy(&lovname);
2741         name_destroy(&ospname);
2742         name_destroy(&svname);
2743         name_destroy(&nodeuuid);
2744         name_destroy(&mdtname);
2745         RETURN(rc);
2746 }
2747
2748 static int mgs_write_log_mdt0(const struct lu_env *env,
2749                               struct mgs_device *mgs,
2750                               struct fs_db *fsdb,
2751                               struct mgs_target_info *mti)
2752 {
2753         char *log = mti->mti_svname;
2754         struct llog_handle *llh = NULL;
2755         struct obd_uuid *uuid;
2756         char *lovname;
2757         char mdt_index[6];
2758         char *ptr = mti->mti_params;
2759         int rc = 0, failout = 0;
2760         ENTRY;
2761
2762         OBD_ALLOC_PTR(uuid);
2763         if (uuid == NULL)
2764                 RETURN(-ENOMEM);
2765
2766         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2767                 failout = (strncmp(ptr, "failout", 7) == 0);
2768
2769         rc = name_create(&lovname, log, "-mdtlov");
2770         if (rc)
2771                 GOTO(out_free, rc);
2772         if (mgs_log_is_empty(env, mgs, log)) {
2773                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2774                 if (rc)
2775                         GOTO(out_lod, rc);
2776         }
2777
2778         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2779
2780         rc = record_start_log(env, mgs, &llh, log);
2781         if (rc)
2782                 GOTO(out_lod, rc);
2783
2784         /* add MDT itself */
2785
2786         /* FIXME this whole fn should be a single journal transaction */
2787         sprintf(uuid->uuid, "%s_UUID", log);
2788         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2789         if (rc)
2790                 GOTO(out_lod, rc);
2791         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid->uuid);
2792         if (rc)
2793                 GOTO(out_end, rc);
2794         rc = record_mount_opt(env, llh, log, lovname, NULL);
2795         if (rc)
2796                 GOTO(out_end, rc);
2797         rc = record_setup(env, llh, log, uuid->uuid, mdt_index, lovname,
2798                         failout ? "n" : "f");
2799         if (rc)
2800                 GOTO(out_end, rc);
2801         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2802         if (rc)
2803                 GOTO(out_end, rc);
2804 out_end:
2805         record_end_log(env, &llh);
2806 out_lod:
2807         name_destroy(&lovname);
2808 out_free:
2809         OBD_FREE_PTR(uuid);
2810         RETURN(rc);
2811 }
2812
2813 /* envelope method for all layers log */
2814 static int mgs_write_log_mdt(const struct lu_env *env,
2815                              struct mgs_device *mgs,
2816                              struct fs_db *fsdb,
2817                              struct mgs_target_info *mti)
2818 {
2819         struct mgs_thread_info *mgi = mgs_env_info(env);
2820         struct llog_handle *llh = NULL;
2821         char *cliname;
2822         int rc, i = 0;
2823         ENTRY;
2824
2825         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2826
2827         if (mti->mti_uuid[0] == '\0') {
2828                 /* Make up our own uuid */
2829                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2830                          "%s_UUID", mti->mti_svname);
2831         }
2832
2833         /* add mdt */
2834         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2835         if (rc)
2836                 RETURN(rc);
2837         /* Append the mdt info to the client log */
2838         rc = name_create(&cliname, mti->mti_fsname, "-client");
2839         if (rc)
2840                 RETURN(rc);
2841
2842         if (mgs_log_is_empty(env, mgs, cliname)) {
2843                 /* Start client log */
2844                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2845                                        fsdb->fsdb_clilov);
2846                 if (rc)
2847                         GOTO(out_free, rc);
2848                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2849                                        fsdb->fsdb_clilmv);
2850                 if (rc)
2851                         GOTO(out_free, rc);
2852         }
2853
2854         /*
2855         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2856         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2857         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2858         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2859         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2860         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2861         */
2862
2863         /* copy client info about lov/lmv */
2864         mgi->mgi_comp.comp_mti = mti;
2865         mgi->mgi_comp.comp_fsdb = fsdb;
2866
2867         rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2868                                                 &mgi->mgi_comp);
2869         if (rc)
2870                 GOTO(out_free, rc);
2871         rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2872                                       fsdb->fsdb_clilmv);
2873         if (rc)
2874                 GOTO(out_free, rc);
2875
2876         /* add mountopts */
2877         rc = record_start_log(env, mgs, &llh, cliname);
2878         if (rc)
2879                 GOTO(out_free, rc);
2880
2881         rc = record_marker(env, llh, fsdb, CM_START, cliname, "mount opts");
2882         if (rc)
2883                 GOTO(out_end, rc);
2884         rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2885                               fsdb->fsdb_clilmv);
2886         if (rc)
2887                 GOTO(out_end, rc);
2888         rc = record_marker(env, llh, fsdb, CM_END, cliname, "mount opts");
2889         if (rc)
2890                 GOTO(out_end, rc);
2891
2892         /* for_all_existing_mdt except current one */
2893         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2894                 if (i !=  mti->mti_stripe_index &&
2895                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2896                         char *logname;
2897
2898                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2899                         if (rc)
2900                                 GOTO(out_end, rc);
2901
2902                         /* NB: If the log for the MDT is empty, it means
2903                          * the MDT is only added to the index
2904                          * map, and not being process yet, i.e. this
2905                          * is an unregistered MDT, see mgs_write_log_target().
2906                          * so we should skip it. Otherwise
2907                          *
2908                          * 1. MGS get register request for MDT1 and MDT2.
2909                          *
2910                          * 2. Then both MDT1 and MDT2 are added into
2911                          * fsdb_mdt_index_map. (see mgs_set_index()).
2912                          *
2913                          * 3. Then MDT1 get the lock of fsdb_mutex, then
2914                          * generate the config log, here, it will regard MDT2
2915                          * as an existent MDT, and generate "add osp" for
2916                          * lustre-MDT0001-osp-MDT0002. Note: at the moment
2917                          * MDT0002 config log is still empty, so it will
2918                          * add "add osp" even before "lov setup", which
2919                          * will definitly cause trouble.
2920                          *
2921                          * 4. MDT1 registeration finished, fsdb_mutex is
2922                          * released, then MDT2 get in, then in above
2923                          * mgs_steal_llog_for_mdt_from_client(), it will
2924                          * add another osp log for lustre-MDT0001-osp-MDT0002,
2925                          * which will cause another trouble.*/
2926                         if (!mgs_log_is_empty(env, mgs, logname))
2927                                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb,
2928                                                               mti, i, logname);
2929
2930                         name_destroy(&logname);
2931                         if (rc)
2932                                 GOTO(out_end, rc);
2933                 }
2934         }
2935 out_end:
2936         record_end_log(env, &llh);
2937 out_free:
2938         name_destroy(&cliname);
2939         RETURN(rc);
2940 }
2941
2942 /* Add the ost info to the client/mdt lov */
2943 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2944                                     struct mgs_device *mgs, struct fs_db *fsdb,
2945                                     struct mgs_target_info *mti,
2946                                     char *logname, char *suffix, char *lovname,
2947                                     enum lustre_sec_part sec_part, int flags)
2948 {
2949         struct llog_handle *llh = NULL;
2950         char *nodeuuid = NULL;
2951         char *oscname = NULL;
2952         char *oscuuid = NULL;
2953         char *lovuuid = NULL;
2954         char *svname = NULL;
2955         char index[6];
2956         char nidstr[LNET_NIDSTR_SIZE];
2957         int i, rc;
2958         ENTRY;
2959
2960         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2961                 mti->mti_svname, logname);
2962
2963         if (mgs_log_is_empty(env, mgs, logname)) {
2964                 CERROR("log is empty! Logical error\n");
2965                 RETURN(-EINVAL);
2966         }
2967
2968         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2969         rc = name_create(&nodeuuid, nidstr, "");
2970         if (rc)
2971                 RETURN(rc);
2972         rc = name_create(&svname, mti->mti_svname, "-osc");
2973         if (rc)
2974                 GOTO(out_free, rc);
2975
2976         /* for the system upgraded from old 1.8, keep using the old osc naming
2977          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2978         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2979                 rc = name_create(&oscname, svname, "");
2980         else
2981                 rc = name_create(&oscname, svname, suffix);
2982         if (rc)
2983                 GOTO(out_free, rc);
2984
2985         rc = name_create(&oscuuid, oscname, "_UUID");
2986         if (rc)
2987                 GOTO(out_free, rc);
2988         rc = name_create(&lovuuid, lovname, "_UUID");
2989         if (rc)
2990                 GOTO(out_free, rc);
2991
2992
2993         /*
2994         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2995         multihomed (#4)
2996         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2997         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2998         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2999         failover (#6,7)
3000         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
3001         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
3002         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
3003         */
3004
3005         rc = record_start_log(env, mgs, &llh, logname);
3006         if (rc)
3007                 GOTO(out_free, rc);
3008
3009         /* FIXME these should be a single journal transaction */
3010         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
3011                            "add osc");
3012         if (rc)
3013                 GOTO(out_end, rc);
3014
3015         /* NB: don't change record order, because upon MDT steal OSC config
3016          * from client, it treats all nids before LCFG_SETUP as target nids
3017          * (multiple interfaces), while nids after as failover node nids.
3018          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
3019          */
3020         for (i = 0; i < mti->mti_nid_count; i++) {
3021                 CDEBUG(D_MGS, "add nid %s\n",
3022                         libcfs_nid2str_r(mti->mti_nids[i],
3023                                          nidstr, sizeof(nidstr)));
3024                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
3025                 if (rc)
3026                         GOTO(out_end, rc);
3027         }
3028         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
3029         if (rc)
3030                 GOTO(out_end, rc);
3031         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
3032                           NULL, NULL);
3033         if (rc)
3034                 GOTO(out_end, rc);
3035         rc = mgs_write_log_failnids(env, mti, llh, oscname);
3036         if (rc)
3037                 GOTO(out_end, rc);
3038
3039         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
3040
3041         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
3042         if (rc)
3043                 GOTO(out_end, rc);
3044         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
3045                            "add osc");
3046         if (rc)
3047                 GOTO(out_end, rc);
3048 out_end:
3049         record_end_log(env, &llh);
3050 out_free:
3051         name_destroy(&lovuuid);
3052         name_destroy(&oscuuid);
3053         name_destroy(&oscname);
3054         name_destroy(&svname);
3055         name_destroy(&nodeuuid);
3056         RETURN(rc);
3057 }
3058
3059 static int mgs_write_log_ost(const struct lu_env *env,
3060                              struct mgs_device *mgs, struct fs_db *fsdb,
3061                              struct mgs_target_info *mti)
3062 {
3063         struct llog_handle *llh = NULL;
3064         char *logname, *lovname;
3065         char *ptr = mti->mti_params;
3066         int rc, flags = 0, failout = 0, i;
3067         ENTRY;
3068
3069         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
3070
3071         /* The ost startup log */
3072
3073         /* If the ost log already exists, that means that someone reformatted
3074            the ost and it called target_add again. */
3075         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
3076                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
3077                                    "exists, yet the server claims it never "
3078                                    "registered. It may have been reformatted, "
3079                                    "or the index changed. writeconf the MDT to "
3080                                    "regenerate all logs.\n", mti->mti_svname);
3081                 RETURN(-EALREADY);
3082         }
3083
3084         /*
3085         attach obdfilter ost1 ost1_UUID
3086         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
3087         */
3088         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
3089                 failout = (strncmp(ptr, "failout", 7) == 0);
3090         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
3091         if (rc)
3092                 RETURN(rc);
3093         /* FIXME these should be a single journal transaction */
3094         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
3095         if (rc)
3096                 GOTO(out_end, rc);
3097         if (*mti->mti_uuid == '\0')
3098                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
3099                          "%s_UUID", mti->mti_svname);
3100         rc = record_attach(env, llh, mti->mti_svname,
3101                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
3102         if (rc)
3103                 GOTO(out_end, rc);
3104         rc = record_setup(env, llh, mti->mti_svname,
3105                           "dev"/*ignored*/, "type"/*ignored*/,
3106                           failout ? "n" : "f", NULL/*options*/);
3107         if (rc)
3108                 GOTO(out_end, rc);
3109         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
3110         if (rc)
3111                 GOTO(out_end, rc);
3112 out_end:
3113         record_end_log(env, &llh);
3114         if (rc)
3115                 RETURN(rc);
3116         /* We also have to update the other logs where this osc is part of
3117            the lov */
3118
3119         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3120                 /* If we're upgrading, the old mdt log already has our
3121                    entry. Let's do a fake one for fun. */
3122                 /* Note that we can't add any new failnids, since we don't
3123                    know the old osc names. */
3124                 flags = CM_SKIP | CM_UPGRADE146;
3125
3126         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
3127                 /* If the update flag isn't set, don't update client/mdt
3128                    logs. */
3129                 flags |= CM_SKIP;
3130                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
3131                               "the MDT first to regenerate it.\n",
3132                               mti->mti_svname);
3133         }
3134
3135         /* Add ost to all MDT lov defs */
3136         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3137                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
3138                         char mdt_index[13];
3139
3140                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
3141                                                      i);
3142                         if (rc)
3143                                 RETURN(rc);
3144
3145                         snprintf(mdt_index, sizeof(mdt_index), "-MDT%04x", i);
3146                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
3147                                                       logname, mdt_index,
3148                                                       lovname, LUSTRE_SP_MDT,
3149                                                       flags);
3150                         name_destroy(&logname);
3151                         name_destroy(&lovname);
3152                         if (rc)
3153                                 RETURN(rc);
3154                 }
3155         }
3156
3157         /* Append ost info to the client log */
3158         rc = name_create(&logname, mti->mti_fsname, "-client");
3159         if (rc)
3160                 RETURN(rc);
3161         if (mgs_log_is_empty(env, mgs, logname)) {
3162                 /* Start client log */
3163                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
3164                                        fsdb->fsdb_clilov);
3165                 if (rc)
3166                         GOTO(out_free, rc);
3167                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
3168                                        fsdb->fsdb_clilmv);
3169                 if (rc)
3170                         GOTO(out_free, rc);
3171         }
3172         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
3173                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
3174 out_free:
3175         name_destroy(&logname);
3176         RETURN(rc);
3177 }
3178
3179 static __inline__ int mgs_param_empty(char *ptr)
3180 {
3181         char *tmp;
3182
3183         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
3184                 return 1;
3185         return 0;
3186 }
3187
3188 static int mgs_write_log_failnid_internal(const struct lu_env *env,
3189                                           struct mgs_device *mgs,
3190                                           struct fs_db *fsdb,
3191                                           struct mgs_target_info *mti,
3192                                           char *logname, char *cliname)
3193 {
3194         int rc;
3195         struct llog_handle *llh = NULL;
3196
3197         if (mgs_param_empty(mti->mti_params)) {
3198                 /* Remove _all_ failnids */
3199                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3200                                 mti->mti_svname, "add failnid", CM_SKIP);
3201                 return rc < 0 ? rc : 0;
3202         }
3203
3204         /* Otherwise failover nids are additive */
3205         rc = record_start_log(env, mgs, &llh, logname);
3206         if (rc)
3207                 return rc;
3208                 /* FIXME this should be a single journal transaction */
3209         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
3210                            "add failnid");
3211         if (rc)
3212                 goto out_end;
3213         rc = mgs_write_log_failnids(env, mti, llh, cliname);
3214         if (rc)
3215                 goto out_end;
3216         rc = record_marker(env, llh, fsdb, CM_END,
3217                            mti->mti_svname, "add failnid");
3218 out_end:
3219         record_end_log(env, &llh);
3220         return rc;
3221 }
3222
3223
3224 /* Add additional failnids to an existing log.
3225    The mdc/osc must have been added to logs first */
3226 /* tcp nids must be in dotted-quad ascii -
3227    we can't resolve hostnames from the kernel. */
3228 static int mgs_write_log_add_failnid(const struct lu_env *env,
3229                                      struct mgs_device *mgs,
3230                                      struct fs_db *fsdb,
3231                                      struct mgs_target_info *mti)
3232 {
3233         char *logname, *cliname;
3234         int rc;
3235         ENTRY;
3236
3237         /* FIXME we currently can't erase the failnids
3238          * given when a target first registers, since they aren't part of
3239          * an "add uuid" stanza
3240          */
3241
3242         /* Verify that we know about this target */
3243         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
3244                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
3245                                    "yet. It must be started before failnids "
3246                                    "can be added.\n", mti->mti_svname);
3247                 RETURN(-ENOENT);
3248         }
3249
3250         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
3251         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3252                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
3253         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3254                 rc = name_create(&cliname, mti->mti_svname, "-osc");
3255         } else {
3256                 RETURN(-EINVAL);
3257         }
3258         if (rc)
3259                 RETURN(rc);
3260
3261         /* Add failover nids to the client log */
3262         rc = name_create(&logname, mti->mti_fsname, "-client");
3263         if (rc) {
3264                 name_destroy(&cliname);
3265                 RETURN(rc);
3266         }
3267
3268         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
3269         name_destroy(&logname);
3270         name_destroy(&cliname);
3271         if (rc)
3272                 RETURN(rc);
3273
3274         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3275                 /* Add OST failover nids to the MDT logs as well */
3276                 int i;
3277
3278                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3279                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3280                                 continue;
3281                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3282                         if (rc)
3283                                 RETURN(rc);
3284                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
3285                                                  fsdb, i);
3286                         if (rc) {
3287                                 name_destroy(&logname);
3288                                 RETURN(rc);
3289                         }
3290                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
3291                                                             mti, logname,
3292                                                             cliname);
3293                         name_destroy(&cliname);
3294                         name_destroy(&logname);
3295                         if (rc)
3296                                 RETURN(rc);
3297                 }
3298         }
3299
3300         RETURN(rc);
3301 }
3302
3303 static int mgs_wlp_lcfg(const struct lu_env *env,
3304                         struct mgs_device *mgs, struct fs_db *fsdb,
3305                         struct mgs_target_info *mti,
3306                         char *logname, struct lustre_cfg_bufs *bufs,
3307                         char *tgtname, char *ptr)
3308 {
3309         char comment[MTI_NAME_MAXLEN];
3310         char *tmp;
3311         struct llog_cfg_rec *lcr;
3312         int rc, del;
3313
3314         /* Erase any old settings of this same parameter */
3315         strlcpy(comment, ptr, sizeof(comment));
3316         /* But don't try to match the value. */
3317         tmp = strchr(comment, '=');
3318         if (tmp != NULL)
3319                 *tmp = 0;
3320         /* FIXME we should skip settings that are the same as old values */
3321         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
3322         if (rc < 0)
3323                 return rc;
3324         del = mgs_param_empty(ptr);
3325
3326         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
3327                       "Setting" : "Modifying", tgtname, comment, logname);
3328         if (del) {
3329                 /* mgs_modify() will return 1 if nothing had to be done */
3330                 if (rc == 1)
3331                         rc = 0;
3332                 return rc;
3333         }
3334
3335         lustre_cfg_bufs_reset(bufs, tgtname);
3336         lustre_cfg_bufs_set_string(bufs, 1, ptr);
3337         if (mti->mti_flags & LDD_F_PARAM2)
3338                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
3339
3340         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
3341                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
3342         if (lcr == NULL)
3343                 return -ENOMEM;
3344
3345         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
3346                                   comment);
3347         lustre_cfg_rec_free(lcr);
3348         return rc;
3349 }
3350
3351 /* write global variable settings into log */
3352 static int mgs_write_log_sys(const struct lu_env *env,
3353                              struct mgs_device *mgs, struct fs_db *fsdb,
3354                              struct mgs_target_info *mti, char *sys, char *ptr)
3355 {
3356         struct mgs_thread_info  *mgi = mgs_env_info(env);
3357         struct lustre_cfg       *lcfg;
3358         struct llog_cfg_rec     *lcr;
3359         char *tmp, sep;
3360         int rc, cmd, convert = 1;
3361
3362         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
3363                 cmd = LCFG_SET_TIMEOUT;
3364         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
3365                 cmd = LCFG_SET_LDLM_TIMEOUT;
3366         /* Check for known params here so we can return error to lctl */
3367         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
3368                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
3369                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
3370                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
3371                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
3372                 cmd = LCFG_PARAM;
3373         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
3374                 convert = 0; /* Don't convert string value to integer */
3375                 cmd = LCFG_PARAM;
3376         } else {
3377                 return -EINVAL;
3378         }
3379
3380         if (mgs_param_empty(ptr))
3381                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
3382         else
3383                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
3384
3385         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
3386         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
3387         if (!convert && *tmp != '\0')
3388                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
3389         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3390         if (lcr == NULL)
3391                 return -ENOMEM;
3392
3393         lcfg = &lcr->lcr_cfg;
3394         if (convert) {
3395                 rc = kstrtouint(tmp, 0, &lcfg->lcfg_num);
3396                 if (rc)
3397                         GOTO(out_rec_free, rc);
3398         } else {
3399                 lcfg->lcfg_num = 0;
3400         }
3401
3402         /* truncate the comment to the parameter name */
3403         ptr = tmp - 1;
3404         sep = *ptr;
3405         *ptr = '\0';
3406         /* modify all servers and clients */
3407         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3408                                       *tmp == '\0' ? NULL : lcr,
3409                                       mti->mti_fsname, sys, 0);
3410         if (rc == 0 && *tmp != '\0') {
3411                 switch (cmd) {
3412                 case LCFG_SET_TIMEOUT:
3413                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
3414                                 class_process_config(lcfg);
3415                         break;
3416                 case LCFG_SET_LDLM_TIMEOUT:
3417                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
3418                                 class_process_config(lcfg);
3419                         break;
3420                 default:
3421                         break;
3422                 }
3423         }
3424         *ptr = sep;
3425 out_rec_free:
3426         lustre_cfg_rec_free(lcr);
3427         return rc;
3428 }
3429
3430 /* write quota settings into log */
3431 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
3432                                struct fs_db *fsdb, struct mgs_target_info *mti,
3433                                char *quota, char *ptr)
3434 {
3435         struct mgs_thread_info  *mgi = mgs_env_info(env);
3436         struct llog_cfg_rec     *lcr;
3437         char                    *tmp;
3438         char                     sep;
3439         int                      rc, cmd = LCFG_PARAM;
3440
3441         /* support only 'meta' and 'data' pools so far */
3442         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
3443             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
3444                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
3445                        "& quota.ost are)\n", ptr);
3446                 return -EINVAL;
3447         }
3448
3449         if (*tmp == '\0') {
3450                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
3451         } else {
3452                 CDEBUG(D_MGS, "global '%s'\n", quota);
3453
3454                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
3455                     strchr(tmp, 'p') == NULL &&
3456                     strcmp(tmp, "none") != 0) {
3457                         CERROR("enable option(%s) isn't supported\n", tmp);
3458                         return -EINVAL;
3459                 }
3460         }
3461
3462         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
3463         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
3464         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3465         if (lcr == NULL)
3466                 return -ENOMEM;
3467
3468         /* truncate the comment to the parameter name */
3469         ptr = tmp - 1;
3470         sep = *ptr;
3471         *ptr = '\0';
3472
3473         /* XXX we duplicated quota enable information in all server
3474          *     config logs, it should be moved to a separate config
3475          *     log once we cleanup the config log for global param. */
3476         /* modify all servers */
3477         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3478                                       *tmp == '\0' ? NULL : lcr,
3479                                       mti->mti_fsname, quota, 1);
3480         *ptr = sep;
3481         lustre_cfg_rec_free(lcr);
3482         return rc < 0 ? rc : 0;
3483 }
3484
3485 static int mgs_srpc_set_param_disk(const struct lu_env *env,
3486                                    struct mgs_device *mgs,
3487                                    struct fs_db *fsdb,
3488                                    struct mgs_target_info *mti,
3489                                    char *param)
3490 {
3491         struct mgs_thread_info  *mgi = mgs_env_info(env);
3492         struct llog_cfg_rec     *lcr;
3493         struct llog_handle      *llh = NULL;
3494         char                    *logname;
3495         char                    *comment, *ptr;
3496         int                      rc, len;
3497
3498         ENTRY;
3499
3500         /* get comment */
3501         ptr = strchr(param, '=');
3502         LASSERT(ptr != NULL);
3503         len = ptr - param;
3504
3505         OBD_ALLOC(comment, len + 1);
3506         if (comment == NULL)
3507                 RETURN(-ENOMEM);
3508         strncpy(comment, param, len);
3509         comment[len] = '\0';
3510
3511         /* prepare lcfg */
3512         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
3513         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
3514         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
3515         if (lcr == NULL)
3516                 GOTO(out_comment, rc = -ENOMEM);
3517
3518         /* construct log name */
3519         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
3520         if (rc < 0)
3521                 GOTO(out_lcfg, rc);
3522
3523         if (mgs_log_is_empty(env, mgs, logname)) {
3524                 rc = record_start_log(env, mgs, &llh, logname);
3525                 if (rc < 0)
3526                         GOTO(out, rc);
3527                 record_end_log(env, &llh);
3528         }
3529
3530         /* obsolete old one */
3531         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
3532                         comment, CM_SKIP);
3533         if (rc < 0)
3534                 GOTO(out, rc);
3535         /* write the new one */
3536         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
3537                                   mti->mti_svname, comment);
3538         if (rc)
3539                 CERROR("%s: error writing log %s: rc = %d\n",
3540                        mgs->mgs_obd->obd_name, logname, rc);
3541 out:
3542         name_destroy(&logname);
3543 out_lcfg:
3544         lustre_cfg_rec_free(lcr);
3545 out_comment:
3546         OBD_FREE(comment, len + 1);
3547         RETURN(rc);
3548 }
3549
3550 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
3551                                         char *param)
3552 {
3553         char    *ptr;
3554
3555         /* disable the adjustable udesc parameter for now, i.e. use default
3556          * setting that client always ship udesc to MDT if possible. to enable
3557          * it simply remove the following line */
3558         goto error_out;
3559
3560         ptr = strchr(param, '=');
3561         if (ptr == NULL)
3562                 goto error_out;
3563         *ptr++ = '\0';
3564
3565         if (strcmp(param, PARAM_SRPC_UDESC))
3566                 goto error_out;
3567
3568         if (strcmp(ptr, "yes") == 0) {
3569                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3570                 CWARN("Enable user descriptor shipping from client to MDT\n");
3571         } else if (strcmp(ptr, "no") == 0) {
3572                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3573                 CWARN("Disable user descriptor shipping from client to MDT\n");
3574         } else {
3575                 *(ptr - 1) = '=';
3576                 goto error_out;
3577         }
3578         return 0;
3579
3580 error_out:
3581         CERROR("Invalid param: %s\n", param);
3582         return -EINVAL;
3583 }
3584
3585 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3586                                   const char *svname,
3587                                   char *param)
3588 {
3589         struct sptlrpc_rule      rule;
3590         struct sptlrpc_rule_set *rset;
3591         int                      rc;
3592         ENTRY;
3593
3594         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3595                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3596                 RETURN(-EINVAL);
3597         }
3598
3599         if (strncmp(param, PARAM_SRPC_UDESC,
3600                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3601                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3602         }
3603
3604         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3605                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3606                 RETURN(-EINVAL);
3607         }
3608
3609         param += sizeof(PARAM_SRPC_FLVR) - 1;
3610
3611         rc = sptlrpc_parse_rule(param, &rule);
3612         if (rc)
3613                 RETURN(rc);
3614
3615         /* mgs rules implies must be mgc->mgs */
3616         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3617                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3618                      rule.sr_from != LUSTRE_SP_ANY) ||
3619                     (rule.sr_to != LUSTRE_SP_MGS &&
3620                      rule.sr_to != LUSTRE_SP_ANY))
3621                         RETURN(-EINVAL);
3622         }
3623
3624         /* preapre room for this coming rule. svcname format should be:
3625          * - fsname: general rule
3626          * - fsname-tgtname: target-specific rule
3627          */
3628         if (strchr(svname, '-')) {
3629                 struct mgs_tgt_srpc_conf *tgtconf;
3630                 int                       found = 0;
3631
3632                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3633                      tgtconf = tgtconf->mtsc_next) {
3634                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3635                                 found = 1;
3636                                 break;
3637                         }
3638                 }
3639
3640                 if (!found) {
3641                         int name_len;
3642
3643                         OBD_ALLOC_PTR(tgtconf);
3644                         if (tgtconf == NULL)
3645                                 RETURN(-ENOMEM);
3646
3647                         name_len = strlen(svname);
3648
3649                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3650                         if (tgtconf->mtsc_tgt == NULL) {
3651                                 OBD_FREE_PTR(tgtconf);
3652                                 RETURN(-ENOMEM);
3653                         }
3654                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3655
3656                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3657                         fsdb->fsdb_srpc_tgt = tgtconf;
3658                 }
3659
3660                 rset = &tgtconf->mtsc_rset;
3661         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
3662                 /* put _mgs related srpc rule directly in mgs ruleset */
3663                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
3664         } else {
3665                 rset = &fsdb->fsdb_srpc_gen;
3666         }
3667
3668         rc = sptlrpc_rule_set_merge(rset, &rule);
3669
3670         RETURN(rc);
3671 }
3672
3673 static int mgs_srpc_set_param(const struct lu_env *env,
3674                               struct mgs_device *mgs,
3675                               struct fs_db *fsdb,
3676                               struct mgs_target_info *mti,
3677                               char *param)
3678 {
3679         char                   *copy;
3680         int                     rc, copy_size;
3681         ENTRY;
3682
3683 #ifndef HAVE_GSS
3684         RETURN(-EINVAL);
3685 #endif
3686         /* keep a copy of original param, which could be destroied
3687          * during parsing */
3688         copy_size = strlen(param) + 1;
3689         OBD_ALLOC(copy, copy_size);
3690         if (copy == NULL)
3691                 return -ENOMEM;
3692         memcpy(copy, param, copy_size);
3693
3694         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3695         if (rc)
3696                 goto out_free;
3697
3698         /* previous steps guaranteed the syntax is correct */
3699         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3700         if (rc)
3701                 goto out_free;
3702
3703         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3704                 /*
3705                  * for mgs rules, make them effective immediately.
3706                  */
3707                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3708                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3709                                                  &fsdb->fsdb_srpc_gen);
3710         }
3711
3712 out_free:
3713         OBD_FREE(copy, copy_size);
3714         RETURN(rc);
3715 }
3716
3717 struct mgs_srpc_read_data {
3718         struct fs_db   *msrd_fsdb;
3719         int             msrd_skip;
3720 };
3721
3722 static int mgs_srpc_read_handler(const struct lu_env *env,
3723                                  struct llog_handle *llh,
3724                                  struct llog_rec_hdr *rec, void *data)
3725 {
3726         struct mgs_srpc_read_data *msrd = data;
3727         struct cfg_marker         *marker;
3728         struct lustre_cfg         *lcfg = REC_DATA(rec);
3729         char                      *svname, *param;
3730         int                        cfg_len, rc;
3731         ENTRY;
3732
3733         if (rec->lrh_type != OBD_CFG_REC) {
3734                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3735                 RETURN(-EINVAL);
3736         }
3737
3738         cfg_len = REC_DATA_LEN(rec);
3739
3740         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3741         if (rc) {
3742                 CERROR("Insane cfg\n");
3743                 RETURN(rc);
3744         }
3745
3746         if (lcfg->lcfg_command == LCFG_MARKER) {
3747                 marker = lustre_cfg_buf(lcfg, 1);
3748
3749                 if (marker->cm_flags & CM_START &&
3750                     marker->cm_flags & CM_SKIP)
3751                         msrd->msrd_skip = 1;
3752                 if (marker->cm_flags & CM_END)
3753                         msrd->msrd_skip = 0;
3754
3755                 RETURN(0);
3756         }
3757
3758         if (msrd->msrd_skip)
3759                 RETURN(0);
3760
3761         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3762                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3763                 RETURN(0);
3764         }
3765
3766         svname = lustre_cfg_string(lcfg, 0);
3767         if (svname == NULL) {
3768                 CERROR("svname is empty\n");
3769                 RETURN(0);
3770         }
3771
3772         param = lustre_cfg_string(lcfg, 1);
3773         if (param == NULL) {
3774                 CERROR("param is empty\n");
3775                 RETURN(0);
3776         }
3777
3778         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3779         if (rc)
3780                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3781
3782         RETURN(0);
3783 }
3784
3785 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3786                                 struct mgs_device *mgs,
3787                                 struct fs_db *fsdb)
3788 {
3789         struct llog_handle        *llh = NULL;
3790         struct llog_ctxt          *ctxt;
3791         char                      *logname;
3792         struct mgs_srpc_read_data  msrd;
3793         int                        rc;
3794         ENTRY;
3795
3796         /* construct log name */
3797         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3798         if (rc)
3799                 RETURN(rc);
3800
3801         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3802         LASSERT(ctxt != NULL);
3803
3804         if (mgs_log_is_empty(env, mgs, logname))
3805                 GOTO(out, rc = 0);
3806
3807         rc = llog_open(env, ctxt, &llh, NULL, logname,
3808                        LLOG_OPEN_EXISTS);
3809         if (rc < 0) {
3810                 if (rc == -ENOENT)
3811                         rc = 0;
3812                 GOTO(out, rc);
3813         }
3814
3815         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3816         if (rc)
3817                 GOTO(out_close, rc);
3818
3819         if (llog_get_size(llh) <= 1)
3820                 GOTO(out_close, rc = 0);
3821
3822         msrd.msrd_fsdb = fsdb;
3823         msrd.msrd_skip = 0;
3824
3825         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3826                           NULL);
3827
3828 out_close:
3829         llog_close(env, llh);
3830 out:
3831         llog_ctxt_put(ctxt);
3832         name_destroy(&logname);
3833
3834         if (rc)
3835                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3836         RETURN(rc);
3837 }
3838
3839 static int mgs_write_log_param2(const struct lu_env *env,
3840                                 struct mgs_device *mgs,
3841                                 struct fs_db *fsdb,
3842                                 struct mgs_target_info *mti, char *ptr)
3843 {
3844         struct lustre_cfg_bufs bufs;
3845         int rc;
3846
3847         ENTRY;
3848         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3849
3850         /* PARAM_MGSNODE and PARAM_NETWORK are set only when formating
3851          * or during the inital mount. It can never change after that.
3852          */
3853         if (!class_match_param(ptr, PARAM_MGSNODE, NULL) ||
3854             !class_match_param(ptr, PARAM_NETWORK, NULL)) {
3855                 rc = 0;
3856                 goto end;
3857         }
3858
3859         /* Processed in mgs_write_log_ost. Another value that can't
3860          * be changed by lctl set_param -P.
3861          */
3862         if (!class_match_param(ptr, PARAM_FAILMODE, NULL)) {
3863                 LCONSOLE_ERROR_MSG(0x169,
3864                                    "%s can only be changed with tunefs.lustre and --writeconf\n",
3865                                    ptr);
3866                 rc = -EPERM;
3867                 goto end;
3868         }
3869
3870         /* FIXME !!! Support for sptlrpc is incomplete. Currently the change
3871          * doesn't transmit to the client. See LU-7183.
3872          */
3873         if (!class_match_param(ptr, PARAM_SRPC, NULL)) {
3874                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3875                 goto end;
3876         }
3877
3878         /* Can't use class_match_param since ptr doesn't start with
3879          * PARAM_FAILNODE. So we look for PARAM_FAILNODE contained in ptr.
3880          */
3881         if (strstr(ptr, PARAM_FAILNODE)) {
3882                 /* Add a failover nidlist. We already processed failovers
3883                  * params for new targets in mgs_write_log_target.
3884                  */
3885                 const char *param;
3886
3887                 /* can't use wildcards with failover.node */
3888                 if (strchr(ptr, '*')) {
3889                         rc = -ENODEV;
3890                         goto end;
3891                 }
3892
3893                 param = strstr(ptr, PARAM_FAILNODE);
3894                 if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
3895                     sizeof(mti->mti_params)) {
3896                         rc = -E2BIG;
3897                         goto end;
3898                 }
3899
3900                 CDEBUG(D_MGS, "Adding failnode with param %s\n",
3901                        mti->mti_params);
3902                 rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3903                 goto end;
3904         }
3905
3906         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
3907                           mti->mti_svname, ptr);
3908 end:
3909         RETURN(rc);
3910 }
3911
3912 /* Permanent settings of all parameters by writing into the appropriate
3913  * configuration logs.
3914  * A parameter with null value ("<param>='\0'") means to erase it out of
3915  * the logs.
3916  */
3917 static int mgs_write_log_param(const struct lu_env *env,
3918                                struct mgs_device *mgs, struct fs_db *fsdb,
3919                                struct mgs_target_info *mti, char *ptr)
3920 {
3921         struct mgs_thread_info *mgi = mgs_env_info(env);
3922         char *logname;
3923         char *tmp;
3924         int rc = 0;
3925         ENTRY;
3926
3927         /* For various parameter settings, we have to figure out which logs
3928            care about them (e.g. both mdt and client for lov settings) */
3929         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3930
3931         /* The params are stored in MOUNT_DATA_FILE and modified via
3932            tunefs.lustre, or set using lctl conf_param */
3933
3934         /* Processed in lustre_start_mgc */
3935         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3936                 GOTO(end, rc);
3937
3938         /* Processed in ost/mdt */
3939         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3940                 GOTO(end, rc);
3941
3942         /* Processed in mgs_write_log_ost */
3943         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3944                 if (mti->mti_flags & LDD_F_PARAM) {
3945                         LCONSOLE_ERROR_MSG(0x169,
3946                                            "%s can only be changed with tunefs.lustre and --writeconf\n",
3947                                            ptr);
3948                         rc = -EPERM;
3949                 }
3950                 GOTO(end, rc);
3951         }
3952
3953         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3954                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3955                 GOTO(end, rc);
3956         }
3957
3958         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3959                 /* Add a failover nidlist */
3960                 rc = 0;
3961                 /* We already processed failovers params for new
3962                    targets in mgs_write_log_target */
3963                 if (mti->mti_flags & LDD_F_PARAM) {
3964                         CDEBUG(D_MGS, "Adding failnode\n");
3965                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3966                 }
3967                 GOTO(end, rc);
3968         }
3969
3970         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3971                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3972                 GOTO(end, rc);
3973         }
3974
3975         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3976                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3977                 GOTO(end, rc);
3978         }
3979
3980         if (class_match_param(ptr, PARAM_OSC PARAM_ACTIVE, &tmp) == 0 ||
3981             class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0) {
3982                 /* active=0 means off, anything else means on */
3983                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3984                 bool deactive_osc = memcmp(ptr, PARAM_OSC PARAM_ACTIVE,
3985                                           strlen(PARAM_OSC PARAM_ACTIVE)) == 0;
3986                 int i;
3987
3988                 if (!deactive_osc) {
3989                         __u32   index;
3990
3991                         rc = server_name2index(mti->mti_svname, &index, NULL);
3992                         if (rc < 0)
3993                                 GOTO(end, rc);
3994
3995                         if (index == 0) {
3996                                 LCONSOLE_ERROR_MSG(0x144, "%s: MDC0 can not be"
3997                                                    " (de)activated.\n",
3998                                                    mti->mti_svname);
3999                                 GOTO(end, rc = -EPERM);
4000                         }
4001                 }
4002
4003                 LCONSOLE_WARN("Permanently %sactivating %s\n",
4004                               flag ? "de" : "re", mti->mti_svname);
4005                 /* Modify clilov */
4006                 rc = name_create(&logname, mti->mti_fsname, "-client");
4007                 if (rc < 0)
4008                         GOTO(end, rc);
4009                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4010                                 mti->mti_svname,
4011                                 deactive_osc ? "add osc" : "add mdc", flag);
4012                 name_destroy(&logname);
4013                 if (rc < 0)
4014                         goto active_err;
4015
4016                 /* Modify mdtlov */
4017                 /* Add to all MDT logs for DNE */
4018                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4019                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
4020                                 continue;
4021                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
4022                         if (rc < 0)
4023                                 GOTO(end, rc);
4024                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
4025                                         mti->mti_svname,
4026                                         deactive_osc ? "add osc" : "add osp",
4027                                         flag);
4028                         name_destroy(&logname);
4029                         if (rc < 0)
4030                                 goto active_err;
4031                 }
4032 active_err:
4033                 if (rc < 0) {
4034                         LCONSOLE_ERROR_MSG(0x145,
4035                                            "Couldn't find %s in log (%d). No permanent changes were made to the config log.\n",
4036                                            mti->mti_svname, rc);
4037                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
4038                                 LCONSOLE_ERROR_MSG(0x146,
4039                                                    "This may be because the log is in the old 1.4 style. Consider --writeconf to update the logs.\n");
4040                         GOTO(end, rc);
4041                 }
4042                 /* Fall through to osc/mdc proc for deactivating live
4043                    OSC/OSP on running MDT / clients. */
4044         }
4045         /* Below here, let obd's XXX_process_config methods handle it */
4046
4047         /* All lov. in proc */
4048         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
4049                 char *mdtlovname;
4050
4051                 CDEBUG(D_MGS, "lov param %s\n", ptr);
4052                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
4053                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
4054                                            "set on the MDT, not %s. "
4055                                            "Ignoring.\n",
4056                                            mti->mti_svname);
4057                         GOTO(end, rc = 0);
4058                 }
4059
4060                 /* Modify mdtlov */
4061                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
4062                         GOTO(end, rc = -ENODEV);
4063
4064                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
4065                                              mti->mti_stripe_index);
4066                 if (rc)
4067                         GOTO(end, rc);
4068                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
4069                                   &mgi->mgi_bufs, mdtlovname, ptr);
4070                 name_destroy(&logname);
4071                 name_destroy(&mdtlovname);
4072                 if (rc)
4073                         GOTO(end, rc);
4074
4075                 /* Modify clilov */
4076                 rc = name_create(&logname, mti->mti_fsname, "-client");
4077                 if (rc)
4078                         GOTO(end, rc);
4079                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
4080                                   fsdb->fsdb_clilov, ptr);
4081                 name_destroy(&logname);
4082                 GOTO(end, rc);
4083         }
4084
4085         /* All osc., mdc., llite. params in proc */
4086         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
4087             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
4088             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
4089                 char *cname;
4090
4091                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
4092                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
4093                                            " cannot be modified. Consider"
4094                                            " updating the configuration with"
4095                                            " --writeconf\n",
4096                                            mti->mti_svname);
4097                         GOTO(end, rc = -EINVAL);
4098                 }
4099                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
4100                         rc = name_create(&cname, mti->mti_fsname, "-client");
4101                         /* Add the client type to match the obdname in
4102                            class_config_llog_handler */
4103                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
4104                         rc = name_create(&cname, mti->mti_svname, "-mdc");
4105                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
4106                         rc = name_create(&cname, mti->mti_svname, "-osc");
4107                 } else {
4108                         GOTO(end, rc = -EINVAL);
4109                 }
4110                 if (rc)
4111                         GOTO(end, rc);
4112
4113                 /* Forbid direct update of llite root squash parameters.
4114                  * These parameters are indirectly set via the MDT settings.
4115                  * See (LU-1778) */
4116                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
4117                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
4118                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
4119                         LCONSOLE_ERROR("%s: root squash parameters can only "
4120                                 "be updated through MDT component\n",
4121                                 mti->mti_fsname);
4122                         name_destroy(&cname);
4123                         GOTO(end, rc = -EINVAL);
4124                 }
4125
4126                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4127
4128                 /* Modify client */
4129                 rc = name_create(&logname, mti->mti_fsname, "-client");
4130                 if (rc) {
4131                         name_destroy(&cname);
4132                         GOTO(end, rc);
4133                 }
4134                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
4135                                   cname, ptr);
4136
4137                 /* osc params affect the MDT as well */
4138                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
4139                         int i;
4140
4141                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
4142                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
4143                                         continue;
4144                                 name_destroy(&cname);
4145                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
4146                                                          fsdb, i);
4147                                 name_destroy(&logname);
4148                                 if (rc)
4149                                         break;
4150                                 rc = name_create_mdt(&logname,
4151                                                      mti->mti_fsname, i);
4152                                 if (rc)
4153                                         break;
4154                                 if (!mgs_log_is_empty(env, mgs, logname)) {
4155                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
4156                                                           mti, logname,
4157                                                           &mgi->mgi_bufs,
4158                                                           cname, ptr);
4159                                         if (rc)
4160                                                 break;
4161                                 }
4162                         }
4163                 }
4164
4165                 /* For mdc activate/deactivate, it affects OSP on MDT as well */
4166                 if (class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0 &&
4167                     rc == 0) {
4168                         char suffix[16];
4169                         char *lodname = NULL;
4170                         char *param_str = NULL;
4171                         int i;
4172                         int index;
4173
4174                         /* replace mdc with osp */
4175                         memcpy(ptr, PARAM_OSP, strlen(PARAM_OSP));
4176                         rc = server_name2index(mti->mti_svname, &index, NULL);
4177                         if (rc < 0) {
4178                                 memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
4179                                 GOTO(end, rc);
4180                         }
4181
4182                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4183                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
4184                                         continue;
4185
4186                                 if (i == index)
4187                                         continue;
4188
4189                                 name_destroy(&logname);
4190                                 rc = name_create_mdt(&logname, mti->mti_fsname,
4191                                                      i);
4192                                 if (rc < 0)
4193                                         break;
4194
4195                                 if (mgs_log_is_empty(env, mgs, logname))
4196                                         continue;
4197
4198                                 snprintf(suffix, sizeof(suffix), "-osp-MDT%04x",
4199                                          i);
4200                                 name_destroy(&cname);
4201                                 rc = name_create(&cname, mti->mti_svname,
4202                                                  suffix);
4203                                 if (rc < 0)
4204                                         break;
4205
4206                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4207                                                   &mgi->mgi_bufs, cname, ptr);
4208                                 if (rc < 0)
4209                                         break;
4210
4211                                 /* Add configuration log for noitfying LOD
4212                                  * to active/deactive the OSP. */
4213                                 name_destroy(&param_str);
4214                                 rc = name_create(&param_str, cname,
4215                                                  (*tmp == '0') ?  ".active=0" :
4216                                                  ".active=1");
4217                                 if (rc < 0)
4218                                         break;
4219
4220                                 name_destroy(&lodname);
4221                                 rc = name_create(&lodname, logname, "-mdtlov");
4222                                 if (rc < 0)
4223                                         break;
4224
4225                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4226                                                   &mgi->mgi_bufs, lodname,
4227                                                   param_str);
4228                                 if (rc < 0)
4229                                         break;
4230                         }
4231                         memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
4232                         name_destroy(&lodname);
4233                         name_destroy(&param_str);
4234                 }
4235
4236                 name_destroy(&logname);
4237                 name_destroy(&cname);
4238                 GOTO(end, rc);
4239         }
4240
4241         /* All mdt. params in proc */
4242         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
4243                 int i;
4244                 __u32 idx;
4245
4246                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4247                 if (strncmp(mti->mti_svname, mti->mti_fsname,
4248                             MTI_NAME_MAXLEN) == 0)
4249                         /* device is unspecified completely? */
4250                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
4251                 else
4252                         rc = server_name2index(mti->mti_svname, &idx, NULL);
4253                 if (rc < 0)
4254                         goto active_err;
4255                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
4256                         goto active_err;
4257                 if (rc & LDD_F_SV_ALL) {
4258                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4259                                 if (!test_bit(i,
4260                                                   fsdb->fsdb_mdt_index_map))
4261                                         continue;
4262                                 rc = name_create_mdt(&logname,
4263                                                 mti->mti_fsname, i);
4264                                 if (rc)
4265                                         goto active_err;
4266                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
4267                                                   logname, &mgi->mgi_bufs,
4268                                                   logname, ptr);
4269                                 name_destroy(&logname);
4270                                 if (rc)
4271                                         goto active_err;
4272                         }
4273                 } else {
4274                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
4275                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
4276                                 LCONSOLE_ERROR("%s: root squash parameters "
4277                                         "cannot be applied to a single MDT\n",
4278                                         mti->mti_fsname);
4279                                 GOTO(end, rc = -EINVAL);
4280                         }
4281                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
4282                                           mti->mti_svname, &mgi->mgi_bufs,
4283                                           mti->mti_svname, ptr);
4284                         if (rc)
4285                                 goto active_err;
4286                 }
4287
4288                 /* root squash settings are also applied to llite
4289                  * config log (see LU-1778) */
4290                 if (rc == 0 &&
4291                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
4292                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
4293                         char *cname;
4294                         char *ptr2;
4295
4296                         rc = name_create(&cname, mti->mti_fsname, "-client");
4297                         if (rc)
4298                                 GOTO(end, rc);
4299                         rc = name_create(&logname, mti->mti_fsname, "-client");
4300                         if (rc) {
4301                                 name_destroy(&cname);
4302                                 GOTO(end, rc);
4303                         }
4304                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
4305                         if (rc) {
4306                                 name_destroy(&cname);
4307                                 name_destroy(&logname);
4308                                 GOTO(end, rc);
4309                         }
4310                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4311                                           &mgi->mgi_bufs, cname, ptr2);
4312                         name_destroy(&ptr2);
4313                         name_destroy(&logname);
4314                         name_destroy(&cname);
4315                 }
4316                 GOTO(end, rc);
4317         }
4318
4319         /* All mdd., ost. and osd. params in proc */
4320         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
4321             (class_match_param(ptr, PARAM_LOD, NULL) == 0) ||
4322             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
4323             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
4324                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4325                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
4326                         GOTO(end, rc = -ENODEV);
4327
4328                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
4329                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
4330                 GOTO(end, rc);
4331         }
4332
4333         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
4334
4335 end:
4336         if (rc)
4337                 CERROR("err %d on param '%s'\n", rc, ptr);
4338
4339         RETURN(rc);
4340 }
4341
4342 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
4343                          struct mgs_target_info *mti, struct fs_db *fsdb)
4344 {
4345         char    *buf, *params;
4346         int      rc = -EINVAL;
4347
4348         ENTRY;
4349
4350         /* set/check the new target index */
4351         rc = mgs_set_index(env, mgs, mti);
4352         if (rc < 0)
4353                 RETURN(rc);
4354
4355         if (rc == EALREADY) {
4356                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
4357                               mti->mti_stripe_index, mti->mti_svname);
4358                 /* We would like to mark old log sections as invalid
4359                    and add new log sections in the client and mdt logs.
4360                    But if we add new sections, then live clients will
4361                    get repeat setup instructions for already running
4362                    osc's. So don't update the client/mdt logs. */
4363                 mti->mti_flags &= ~LDD_F_UPDATE;
4364                 rc = 0;
4365         }
4366
4367         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_WRITE_TARGET_DELAY, cfs_fail_val > 0 ?
4368                          cfs_fail_val : 10);
4369
4370         mutex_lock(&fsdb->fsdb_mutex);
4371
4372         if (mti->mti_flags & (LDD_F_VIRGIN | LDD_F_WRITECONF)) {
4373                 /* Generate a log from scratch */
4374                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
4375                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
4376                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
4377                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
4378                 } else {
4379                         CERROR("Unknown target type %#x, can't create log for %s\n",
4380                                mti->mti_flags, mti->mti_svname);
4381                 }
4382                 if (rc) {
4383                         CERROR("Can't write logs for %s (%d)\n",
4384                                mti->mti_svname, rc);
4385                         GOTO(out_up, rc);
4386                 }
4387         } else {
4388                 /* Just update the params from tunefs in mgs_write_log_params */
4389                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
4390                 mti->mti_flags |= LDD_F_PARAM;
4391         }
4392
4393         /* allocate temporary buffer, where class_get_next_param will
4394            make copy of a current  parameter */
4395         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
4396         if (buf == NULL)
4397                 GOTO(out_up, rc = -ENOMEM);
4398         params = mti->mti_params;
4399         while (params != NULL) {
4400                 rc = class_get_next_param(&params, buf);
4401                 if (rc) {
4402                         if (rc == 1)
4403                                 /* there is no next parameter, that is
4404                                    not an error */
4405                                 rc = 0;
4406                         break;
4407                 }
4408                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
4409                        params, buf);
4410                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
4411                 if (rc)
4412                         break;
4413         }
4414
4415         OBD_FREE(buf, strlen(mti->mti_params) + 1);
4416
4417 out_up:
4418         mutex_unlock(&fsdb->fsdb_mutex);
4419         RETURN(rc);
4420 }
4421
4422 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
4423 {
4424         struct llog_ctxt        *ctxt;
4425         int                      rc = 0;
4426
4427         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
4428         if (ctxt == NULL) {
4429                 CERROR("%s: MGS config context doesn't exist\n",
4430                        mgs->mgs_obd->obd_name);
4431                 rc = -ENODEV;
4432         } else {
4433                 rc = llog_erase(env, ctxt, NULL, name);
4434                 /* llog may not exist */
4435                 if (rc == -ENOENT)
4436                         rc = 0;
4437                 llog_ctxt_put(ctxt);
4438         }
4439
4440         if (rc)
4441                 CERROR("%s: failed to clear log %s: %d\n",
4442                        mgs->mgs_obd->obd_name, name, rc);
4443
4444         return rc;
4445 }
4446
4447 /* erase all logs for the given fs */
4448 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs,
4449                    const char *fsname)
4450 {
4451         struct list_head log_list;
4452         struct mgs_direntry *dirent, *n;
4453         char barrier_name[20] = {};
4454         char *suffix;
4455         int count = 0;
4456         int rc, len = strlen(fsname);
4457         ENTRY;
4458
4459         mutex_lock(&mgs->mgs_mutex);
4460
4461         /* Find all the logs in the CONFIGS directory */
4462         rc = class_dentry_readdir(env, mgs, &log_list);
4463         if (rc) {
4464                 mutex_unlock(&mgs->mgs_mutex);
4465                 RETURN(rc);
4466         }
4467
4468         if (list_empty(&log_list)) {
4469                 mutex_unlock(&mgs->mgs_mutex);
4470                 RETURN(-ENOENT);
4471         }
4472
4473         snprintf(barrier_name, sizeof(barrier_name) - 1, "%s-%s",
4474                  fsname, BARRIER_FILENAME);
4475         /* Delete the barrier fsdb */
4476         mgs_remove_fsdb_by_name(mgs, barrier_name);
4477         /* Delete the fs db */
4478         mgs_remove_fsdb_by_name(mgs, fsname);
4479         mutex_unlock(&mgs->mgs_mutex);
4480
4481         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4482                 list_del_init(&dirent->mde_list);
4483                 suffix = strrchr(dirent->mde_name, '-');
4484                 if (suffix != NULL) {
4485                         if ((len == suffix - dirent->mde_name) &&
4486                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
4487                                 CDEBUG(D_MGS, "Removing log %s\n",
4488                                        dirent->mde_name);
4489                                 mgs_erase_log(env, mgs, dirent->mde_name);
4490                                 count++;
4491                         }
4492                 }
4493                 mgs_direntry_free(dirent);
4494         }
4495
4496         if (count == 0)
4497                 rc = -ENOENT;
4498
4499         RETURN(rc);
4500 }
4501
4502 /* list all logs for the given fs */
4503 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
4504                   struct obd_ioctl_data *data)
4505 {
4506         struct list_head         log_list;
4507         struct mgs_direntry     *dirent, *n;
4508         char                    *out, *suffix, prefix[] = "config_log: ";
4509         int                      prefix_len = strlen(prefix);
4510         int                      len, remains, start = 0, rc;
4511
4512         ENTRY;
4513
4514         /* Find all the logs in the CONFIGS directory */
4515         rc = class_dentry_readdir(env, mgs, &log_list);
4516         if (rc)
4517                 RETURN(rc);
4518
4519         out = data->ioc_bulk;
4520         remains = data->ioc_inllen1;
4521         /* OBD_FAIL: fetch the config_log records from the specified one */
4522         if (OBD_FAIL_CHECK(OBD_FAIL_CATLIST))
4523                 data->ioc_count = cfs_fail_val;
4524
4525         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4526                 list_del_init(&dirent->mde_list);
4527                 suffix = strrchr(dirent->mde_name, '-');
4528                 if (suffix != NULL) {
4529                         len = prefix_len + dirent->mde_len + 1;
4530                         if (remains - len < 0) {
4531                                 /* No enough space for this record */
4532                                 mgs_direntry_free(dirent);
4533                                 goto out;
4534                         }
4535                         start++;
4536                         if (start < data->ioc_count) {
4537                                 mgs_direntry_free(dirent);
4538                                 continue;
4539                         }
4540                         len = scnprintf(out, remains, "%s%s\n", prefix,
4541                                         dirent->mde_name);
4542                         out += len;
4543                         remains -= len;
4544                 }
4545                 mgs_direntry_free(dirent);
4546                 if (remains <= 1)
4547                         /* Full */
4548                         goto out;
4549         }
4550         /* Finished */
4551         start = 0;
4552 out:
4553         data->ioc_count = start;
4554         RETURN(rc);
4555 }
4556
4557 struct mgs_lcfg_fork_data {
4558         struct lustre_cfg_bufs   mlfd_bufs;
4559         struct mgs_device       *mlfd_mgs;
4560         struct llog_handle      *mlfd_llh;
4561         const char              *mlfd_oldname;
4562         const char              *mlfd_newname;
4563         char                     mlfd_data[0];
4564 };
4565
4566 static bool contain_valid_fsname(char *buf, const char *fsname,
4567                                  int buflen, int namelen)
4568 {
4569         if (buflen < namelen)
4570                 return false;
4571
4572         if (memcmp(buf, fsname, namelen) != 0)
4573                 return false;
4574
4575         if (buf[namelen] != '\0' && buf[namelen] != '-')
4576                 return false;
4577
4578         return true;
4579 }
4580
4581 static int mgs_lcfg_fork_handler(const struct lu_env *env,
4582                                  struct llog_handle *o_llh,
4583                                  struct llog_rec_hdr *o_rec, void *data)
4584 {
4585         struct mgs_lcfg_fork_data *mlfd = data;
4586         struct lustre_cfg_bufs *n_bufs = &mlfd->mlfd_bufs;
4587         struct lustre_cfg *o_lcfg = (struct lustre_cfg *)(o_rec + 1);
4588         struct llog_cfg_rec *lcr;
4589         char *o_buf;
4590         char *n_buf = mlfd->mlfd_data;
4591         int o_buflen;
4592         int o_namelen = strlen(mlfd->mlfd_oldname);
4593         int n_namelen = strlen(mlfd->mlfd_newname);
4594         int diff = n_namelen - o_namelen;
4595         __u32 cmd = o_lcfg->lcfg_command;
4596         __u32 cnt = o_lcfg->lcfg_bufcount;
4597         int rc;
4598         int i;
4599         ENTRY;
4600
4601         /* buf[0] */
4602         o_buf = lustre_cfg_buf(o_lcfg, 0);
4603         o_buflen = o_lcfg->lcfg_buflens[0];
4604         if (contain_valid_fsname(o_buf, mlfd->mlfd_oldname, o_buflen,
4605                                  o_namelen)) {
4606                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4607                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4608                        o_buflen - o_namelen);
4609                 lustre_cfg_bufs_reset(n_bufs, n_buf);
4610                 n_buf += cfs_size_round(o_buflen + diff);
4611         } else {
4612                 lustre_cfg_bufs_reset(n_bufs, o_buflen != 0 ? o_buf : NULL);
4613         }
4614
4615         switch (cmd) {
4616         case LCFG_MARKER: {
4617                 struct cfg_marker *o_marker;
4618                 struct cfg_marker *n_marker;
4619                 int tgt_namelen;
4620
4621                 if (cnt != 2) {
4622                         CDEBUG(D_MGS, "Unknown cfg marker entry with %d "
4623                                "buffers\n", cnt);
4624                         RETURN(-EINVAL);
4625                 }
4626
4627                 /* buf[1] is marker */
4628                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4629                 o_buflen = o_lcfg->lcfg_buflens[1];
4630                 o_marker = (struct cfg_marker *)o_buf;
4631                 if (!contain_valid_fsname(o_marker->cm_tgtname,
4632                                           mlfd->mlfd_oldname,
4633                                           sizeof(o_marker->cm_tgtname),
4634                                           o_namelen)) {
4635                         lustre_cfg_bufs_set(n_bufs, 1, o_marker,
4636                                             sizeof(*o_marker));
4637                         break;
4638                 }
4639
4640                 n_marker = (struct cfg_marker *)n_buf;
4641                 *n_marker = *o_marker;
4642                 memcpy(n_marker->cm_tgtname, mlfd->mlfd_newname, n_namelen);
4643                 tgt_namelen = strlen(o_marker->cm_tgtname);
4644                 if (tgt_namelen > o_namelen)
4645                         memcpy(n_marker->cm_tgtname + n_namelen,
4646                                o_marker->cm_tgtname + o_namelen,
4647                                tgt_namelen - o_namelen);
4648                 n_marker->cm_tgtname[tgt_namelen + diff] = '\0';
4649                 lustre_cfg_bufs_set(n_bufs, 1, n_marker, sizeof(*n_marker));
4650                 break;
4651         }
4652         case LCFG_PARAM:
4653         case LCFG_SET_PARAM: {
4654                 for (i = 1; i < cnt; i++)
4655                         /* buf[i] is the param value, reuse it directly */
4656                         lustre_cfg_bufs_set(n_bufs, i,
4657                                             lustre_cfg_buf(o_lcfg, i),
4658                                             o_lcfg->lcfg_buflens[i]);
4659                 break;
4660         }
4661         case LCFG_POOL_NEW:
4662         case LCFG_POOL_ADD:
4663         case LCFG_POOL_REM:
4664         case LCFG_POOL_DEL: {
4665                 if (cnt < 3 || cnt > 4) {
4666                         CDEBUG(D_MGS, "Unknown cfg pool (%x) entry with %d "
4667                                "buffers\n", cmd, cnt);
4668                         RETURN(-EINVAL);
4669                 }
4670
4671                 /* buf[1] is fsname */
4672                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4673                 o_buflen = o_lcfg->lcfg_buflens[1];
4674                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4675                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4676                        o_buflen - o_namelen);
4677                 lustre_cfg_bufs_set(n_bufs, 1, n_buf, o_buflen + diff);
4678                 n_buf += cfs_size_round(o_buflen + diff);
4679
4680                 /* buf[2] is the pool name, reuse it directly */
4681                 lustre_cfg_bufs_set(n_bufs, 2, lustre_cfg_buf(o_lcfg, 2),
4682                                     o_lcfg->lcfg_buflens[2]);
4683
4684                 if (cnt == 3)
4685                         break;
4686
4687                 /* buf[3] is ostname */
4688                 o_buf = lustre_cfg_buf(o_lcfg, 3);
4689                 o_buflen = o_lcfg->lcfg_buflens[3];
4690                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4691                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4692                        o_buflen - o_namelen);
4693                 lustre_cfg_bufs_set(n_bufs, 3, n_buf, o_buflen + diff);
4694                 break;
4695         }
4696         case LCFG_SETUP: {
4697                 if (cnt == 2) {
4698                         o_buflen = o_lcfg->lcfg_buflens[1];
4699                         if (o_buflen == sizeof(struct lov_desc) ||
4700                             o_buflen == sizeof(struct lmv_desc)) {
4701                                 char *o_uuid;
4702                                 char *n_uuid;
4703                                 int uuid_len;
4704
4705                                 /* buf[1] */
4706                                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4707                                 if (o_buflen == sizeof(struct lov_desc)) {
4708                                         struct lov_desc *o_desc =
4709                                                 (struct lov_desc *)o_buf;
4710                                         struct lov_desc *n_desc =
4711                                                 (struct lov_desc *)n_buf;
4712
4713                                         *n_desc = *o_desc;
4714                                         o_uuid = o_desc->ld_uuid.uuid;
4715                                         n_uuid = n_desc->ld_uuid.uuid;
4716                                         uuid_len = sizeof(o_desc->ld_uuid.uuid);
4717                                 } else {
4718                                         struct lmv_desc *o_desc =
4719                                                 (struct lmv_desc *)o_buf;
4720                                         struct lmv_desc *n_desc =
4721                                                 (struct lmv_desc *)n_buf;
4722
4723                                         *n_desc = *o_desc;
4724                                         o_uuid = o_desc->ld_uuid.uuid;
4725                                         n_uuid = n_desc->ld_uuid.uuid;
4726                                         uuid_len = sizeof(o_desc->ld_uuid.uuid);
4727                                 }
4728
4729                                 if (unlikely(!contain_valid_fsname(o_uuid,
4730                                                 mlfd->mlfd_oldname, uuid_len,
4731                                                 o_namelen))) {
4732                                         lustre_cfg_bufs_set(n_bufs, 1, o_buf,
4733                                                             o_buflen);
4734                                         break;
4735                                 }
4736
4737                                 memcpy(n_uuid, mlfd->mlfd_newname, n_namelen);
4738                                 uuid_len = strlen(o_uuid);
4739                                 if (uuid_len > o_namelen)
4740                                         memcpy(n_uuid + n_namelen,
4741                                                o_uuid + o_namelen,
4742                                                uuid_len - o_namelen);
4743                                 n_uuid[uuid_len + diff] = '\0';
4744                                 lustre_cfg_bufs_set(n_bufs, 1, n_buf, o_buflen);
4745                                 break;
4746                         } /* else case fall through */
4747                 } /* else case fall through */
4748         }
4749         /* fallthrough */
4750         default: {
4751                 for (i = 1; i < cnt; i++) {
4752                         o_buflen = o_lcfg->lcfg_buflens[i];
4753                         if (o_buflen == 0)
4754                                 continue;
4755
4756                         o_buf = lustre_cfg_buf(o_lcfg, i);
4757                         if (!contain_valid_fsname(o_buf, mlfd->mlfd_oldname,
4758                                                   o_buflen, o_namelen)) {
4759                                 lustre_cfg_bufs_set(n_bufs, i, o_buf, o_buflen);
4760                                 continue;
4761                         }
4762
4763                         memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4764                         if (o_buflen == o_namelen) {
4765                                 lustre_cfg_bufs_set(n_bufs, i, n_buf,
4766                                                     n_namelen);
4767                                 n_buf += cfs_size_round(n_namelen);
4768                                 continue;
4769                         }
4770
4771                         memcpy(n_buf + n_namelen, o_buf + o_namelen,
4772                                o_buflen - o_namelen);
4773                         lustre_cfg_bufs_set(n_bufs, i, n_buf, o_buflen + diff);
4774                         n_buf += cfs_size_round(o_buflen + diff);
4775                 }
4776                 break;
4777         }
4778         }
4779
4780         lcr = lustre_cfg_rec_new(cmd, n_bufs);
4781         if (!lcr)
4782                 RETURN(-ENOMEM);
4783
4784         lcr->lcr_cfg = *o_lcfg;
4785         rc = llog_write(env, mlfd->mlfd_llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
4786         lustre_cfg_rec_free(lcr);
4787
4788         RETURN(rc);
4789 }
4790
4791 static int mgs_lcfg_fork_one(const struct lu_env *env, struct mgs_device *mgs,
4792                              struct mgs_direntry *mde, const char *oldname,
4793                              const char *newname)
4794 {
4795         struct llog_handle *old_llh = NULL;
4796         struct llog_handle *new_llh = NULL;
4797         struct llog_ctxt *ctxt = NULL;
4798         struct mgs_lcfg_fork_data *mlfd = NULL;
4799         char *name_buf = NULL;
4800         int name_buflen;
4801         int old_namelen = strlen(oldname);
4802         int new_namelen = strlen(newname);
4803         int rc;
4804         ENTRY;
4805
4806         name_buflen = mde->mde_len + new_namelen - old_namelen;
4807         OBD_ALLOC(name_buf, name_buflen);
4808         if (!name_buf)
4809                 RETURN(-ENOMEM);
4810
4811         memcpy(name_buf, newname, new_namelen);
4812         memcpy(name_buf + new_namelen, mde->mde_name + old_namelen,
4813                mde->mde_len - old_namelen);
4814
4815         CDEBUG(D_MGS, "Fork the config-log from %s to %s\n",
4816                mde->mde_name, name_buf);
4817
4818         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
4819         LASSERT(ctxt);
4820
4821         rc = llog_open_create(env, ctxt, &new_llh, NULL, name_buf);
4822         if (rc)
4823                 GOTO(out, rc);
4824
4825         rc = llog_init_handle(env, new_llh, LLOG_F_IS_PLAIN, NULL);
4826         if (rc)
4827                 GOTO(out, rc);
4828
4829         if (unlikely(mgs_log_is_empty(env, mgs, mde->mde_name)))
4830                 GOTO(out, rc = 0);
4831
4832         rc = llog_open(env, ctxt, &old_llh, NULL, mde->mde_name,
4833                        LLOG_OPEN_EXISTS);
4834         if (rc)
4835                 GOTO(out, rc);
4836
4837         rc = llog_init_handle(env, old_llh, LLOG_F_IS_PLAIN, NULL);
4838         if (rc)
4839                 GOTO(out, rc);
4840
4841         new_llh->lgh_hdr->llh_tgtuuid = old_llh->lgh_hdr->llh_tgtuuid;
4842
4843         OBD_ALLOC(mlfd, LLOG_MIN_CHUNK_SIZE);
4844         if (!mlfd)
4845                 GOTO(out, rc = -ENOMEM);
4846
4847         mlfd->mlfd_mgs = mgs;
4848         mlfd->mlfd_llh = new_llh;
4849         mlfd->mlfd_oldname = oldname;
4850         mlfd->mlfd_newname = newname;
4851
4852         rc = llog_process(env, old_llh, mgs_lcfg_fork_handler, mlfd, NULL);
4853         OBD_FREE(mlfd, LLOG_MIN_CHUNK_SIZE);
4854
4855         GOTO(out, rc);
4856
4857 out:
4858         if (old_llh)
4859                 llog_close(env, old_llh);
4860         if (new_llh)
4861                 llog_close(env, new_llh);
4862         if (name_buf)
4863                 OBD_FREE(name_buf, name_buflen);
4864         if (ctxt)
4865                 llog_ctxt_put(ctxt);
4866
4867         return rc;
4868 }
4869
4870 int mgs_lcfg_fork(const struct lu_env *env, struct mgs_device *mgs,
4871                   const char *oldname, const char *newname)
4872 {
4873         struct list_head log_list;
4874         struct mgs_direntry *dirent, *n;
4875         int olen = strlen(oldname);
4876         int nlen = strlen(newname);
4877         int count = 0;
4878         int rc = 0;
4879         ENTRY;
4880
4881         if (unlikely(!oldname || oldname[0] == '\0' ||
4882                      !newname || newname[0] == '\0'))
4883                 RETURN(-EINVAL);
4884
4885         if (strcmp(oldname, newname) == 0)
4886                 RETURN(-EINVAL);
4887
4888         /* lock it to prevent fork/erase/register in parallel. */
4889         mutex_lock(&mgs->mgs_mutex);
4890
4891         rc = class_dentry_readdir(env, mgs, &log_list);
4892         if (rc) {
4893                 mutex_unlock(&mgs->mgs_mutex);
4894                 RETURN(rc);
4895         }
4896
4897         if (list_empty(&log_list)) {
4898                 mutex_unlock(&mgs->mgs_mutex);
4899                 RETURN(-ENOENT);
4900         }
4901
4902         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4903                 char *ptr;
4904
4905                 ptr = strrchr(dirent->mde_name, '-');
4906                 if (ptr) {
4907                         int tlen = ptr - dirent->mde_name;
4908
4909                         if (tlen == nlen &&
4910                             strncmp(newname, dirent->mde_name, tlen) == 0)
4911                                 GOTO(out, rc = -EEXIST);
4912
4913                         if (tlen == olen &&
4914                             strncmp(oldname, dirent->mde_name, tlen) == 0)
4915                                 continue;
4916                 }
4917
4918                 list_del_init(&dirent->mde_list);
4919                 mgs_direntry_free(dirent);
4920         }
4921
4922         if (list_empty(&log_list)) {
4923                 mutex_unlock(&mgs->mgs_mutex);
4924                 RETURN(-ENOENT);
4925         }
4926
4927         list_for_each_entry(dirent, &log_list, mde_list) {
4928                 rc = mgs_lcfg_fork_one(env, mgs, dirent, oldname, newname);
4929                 if (rc)
4930                         break;
4931
4932                 count++;
4933         }
4934
4935 out:
4936         mutex_unlock(&mgs->mgs_mutex);
4937
4938         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4939                 list_del_init(&dirent->mde_list);
4940                 mgs_direntry_free(dirent);
4941         }
4942
4943         if (rc && count > 0)
4944                 mgs_erase_logs(env, mgs, newname);
4945
4946         RETURN(rc);
4947 }
4948
4949 int mgs_lcfg_erase(const struct lu_env *env, struct mgs_device *mgs,
4950                    const char *fsname)
4951 {
4952         int rc;
4953         ENTRY;
4954
4955         if (unlikely(!fsname || fsname[0] == '\0'))
4956                 RETURN(-EINVAL);
4957
4958         rc = mgs_erase_logs(env, mgs, fsname);
4959
4960         RETURN(rc);
4961 }
4962
4963 static int mgs_xattr_del(const struct lu_env *env, struct dt_object *obj)
4964 {
4965         struct dt_device *dev;
4966         struct thandle *th = NULL;
4967         int rc = 0;
4968
4969         ENTRY;
4970
4971         dev = container_of(obj->do_lu.lo_dev, struct dt_device, dd_lu_dev);
4972         th = dt_trans_create(env, dev);
4973         if (IS_ERR(th))
4974                 RETURN(PTR_ERR(th));
4975
4976         rc = dt_declare_xattr_del(env, obj, XATTR_TARGET_RENAME, th);
4977         if (rc)
4978                 GOTO(stop, rc);
4979
4980         rc = dt_trans_start_local(env, dev, th);
4981         if (rc)
4982                 GOTO(stop, rc);
4983
4984         dt_write_lock(env, obj, 0);
4985         rc = dt_xattr_del(env, obj, XATTR_TARGET_RENAME, th);
4986
4987         GOTO(unlock, rc);
4988
4989 unlock:
4990         dt_write_unlock(env, obj);
4991
4992 stop:
4993         dt_trans_stop(env, dev, th);
4994
4995         return rc;
4996 }
4997
4998 int mgs_lcfg_rename(const struct lu_env *env, struct mgs_device *mgs)
4999 {
5000         struct list_head log_list;
5001         struct mgs_direntry *dirent, *n;
5002         char fsname[16];
5003         struct lu_buf buf = {
5004                 .lb_buf = fsname,
5005                 .lb_len = sizeof(fsname)
5006         };
5007         int rc = 0;
5008
5009         ENTRY;
5010
5011         rc = class_dentry_readdir(env, mgs, &log_list);
5012         if (rc)
5013                 RETURN(rc);
5014
5015         if (list_empty(&log_list))
5016                 RETURN(0);
5017
5018         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
5019                 struct dt_object *o = NULL;
5020                 char oldname[16];
5021                 char *ptr;
5022                 int len;
5023
5024                 list_del_init(&dirent->mde_list);
5025                 ptr = strrchr(dirent->mde_name, '-');
5026                 if (!ptr)
5027                         goto next;
5028
5029                 len = ptr - dirent->mde_name;
5030                 if (unlikely(len >= sizeof(oldname))) {
5031                         CDEBUG(D_MGS, "Skip invalid configuration file %s\n",
5032                                dirent->mde_name);
5033                         goto next;
5034                 }
5035
5036                 o = local_file_find(env, mgs->mgs_los, mgs->mgs_configs_dir,
5037                                     dirent->mde_name);
5038                 if (IS_ERR(o)) {
5039                         rc = PTR_ERR(o);
5040                         CDEBUG(D_MGS, "Fail to locate file %s: rc = %d\n",
5041                                dirent->mde_name, rc);
5042                         goto next;
5043                 }
5044
5045                 rc = dt_xattr_get(env, o, &buf, XATTR_TARGET_RENAME);
5046                 if (rc < 0) {
5047                         if (rc == -ENODATA)
5048                                 rc = 0;
5049                         else
5050                                 CDEBUG(D_MGS,
5051                                        "Fail to get EA for %s: rc = %d\n",
5052                                        dirent->mde_name, rc);
5053                         goto next;
5054                 }
5055
5056                 if (unlikely(rc == len &&
5057                              memcmp(fsname, dirent->mde_name, len) == 0)) {
5058                         /* The new fsname is the same as the old one. */
5059                         rc = mgs_xattr_del(env, o);
5060                         goto next;
5061                 }
5062
5063                 memcpy(oldname, dirent->mde_name, len);
5064                 oldname[len] = '\0';
5065                 fsname[rc] = '\0';
5066                 rc = mgs_lcfg_fork_one(env, mgs, dirent, oldname, fsname);
5067                 if (rc && rc != -EEXIST) {
5068                         CDEBUG(D_MGS, "Fail to fork %s: rc = %d\n",
5069                                dirent->mde_name, rc);
5070                         goto next;
5071                 }
5072
5073                 rc = mgs_erase_log(env, mgs, dirent->mde_name);
5074                 if (rc) {
5075                         CDEBUG(D_MGS, "Fail to erase old %s: rc = %d\n",
5076                                dirent->mde_name, rc);
5077                         /* keep it there if failed to remove it. */
5078                         rc = 0;
5079                 }
5080
5081 next:
5082                 if (o && !IS_ERR(o))
5083                         lu_object_put(env, &o->do_lu);
5084
5085                 mgs_direntry_free(dirent);
5086                 if (rc)
5087                         break;
5088         }
5089
5090         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
5091                 list_del_init(&dirent->mde_list);
5092                 mgs_direntry_free(dirent);
5093         }
5094
5095         RETURN(rc);
5096 }
5097
5098 /* Setup _mgs fsdb and log
5099  */
5100 int mgs__mgs_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
5101 {
5102         struct fs_db *fsdb = NULL;
5103         int rc;
5104         ENTRY;
5105
5106         rc = mgs_find_or_make_fsdb(env, mgs, MGSSELF_NAME, &fsdb);
5107         if (!rc)
5108                 mgs_put_fsdb(mgs, fsdb);
5109
5110         RETURN(rc);
5111 }
5112
5113 /* Setup params fsdb and log
5114  */
5115 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
5116 {
5117         struct fs_db *fsdb = NULL;
5118         struct llog_handle *params_llh = NULL;
5119         int rc;
5120         ENTRY;
5121
5122         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
5123         if (!rc) {
5124                 mutex_lock(&fsdb->fsdb_mutex);
5125                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
5126                 if (!rc)
5127                         rc = record_end_log(env, &params_llh);
5128                 mutex_unlock(&fsdb->fsdb_mutex);
5129                 mgs_put_fsdb(mgs, fsdb);
5130         }
5131
5132         RETURN(rc);
5133 }
5134
5135 /* Cleanup params fsdb and log
5136  */
5137 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
5138 {
5139         int rc;
5140
5141         rc = mgs_erase_logs(env, mgs, PARAMS_FILENAME);
5142         return rc == -ENOENT ? 0 : rc;
5143 }
5144
5145 /**
5146  * Fill in the mgs_target_info based on data devname and param provide.
5147  *
5148  * @env         thread context
5149  * @mgs         mgs device
5150  * @mti         mgs target info. We want to set this based other paramters
5151  *              passed to this function. Once setup we write it to the config
5152  *              logs.
5153  * @devname     optional OBD device name
5154  * @param       string that contains both what tunable to set and the value to
5155  *              set it to.
5156  *
5157  * RETURN       0 for success
5158  *              negative error number on failure
5159  **/
5160 static int mgs_set_conf_param(const struct lu_env *env, struct mgs_device *mgs,
5161                               struct mgs_target_info *mti, const char *devname,
5162                               const char *param)
5163 {
5164         struct fs_db *fsdb = NULL;
5165         int dev_type;
5166         int rc = 0;
5167
5168         ENTRY;
5169         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
5170         if (!devname) {
5171                 size_t len;
5172
5173                 /* We have two possible cases here:
5174                  *
5175                  * 1) the device name embedded in the param:
5176                  *    lustre-OST0000.osc.max_dirty_mb=32
5177                  *
5178                  * 2) the file system name is embedded in
5179                  *    the param: lustre.sys.at.min=0
5180                  */
5181                 len = strcspn(param, ".=");
5182                 if (!len || param[len] == '=')
5183                         RETURN(-EINVAL);
5184
5185                 if (len >= sizeof(mti->mti_svname))
5186                         RETURN(-E2BIG);
5187
5188                 snprintf(mti->mti_svname, sizeof(mti->mti_svname),
5189                          "%.*s", (int)len, param);
5190                 param += len + 1;
5191         } else {
5192                 if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname)) >=
5193                     sizeof(mti->mti_svname))
5194                         RETURN(-E2BIG);
5195         }
5196
5197         if (!strlen(mti->mti_svname)) {
5198                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
5199                 RETURN(-ENOSYS);
5200         }
5201
5202         dev_type = mgs_parse_devname(mti->mti_svname, mti->mti_fsname,
5203                                      &mti->mti_stripe_index);
5204         switch (dev_type) {
5205         /* For this case we have an invalid obd device name */
5206         case -ENXIO:
5207                 CDEBUG(D_MGS, "%s don't contain an index\n", mti->mti_svname);
5208                 strlcpy(mti->mti_fsname, mti->mti_svname, MTI_NAME_MAXLEN);
5209                 dev_type = 0;
5210                 break;
5211         /* Not an obd device, assume devname is the fsname.
5212          * User might of only provided fsname and not obd device
5213          */
5214         case -EINVAL:
5215                 CDEBUG(D_MGS, "%s is seen as a file system name\n", mti->mti_svname);
5216                 strlcpy(mti->mti_fsname, mti->mti_svname, MTI_NAME_MAXLEN);
5217                 dev_type = 0;
5218                 break;
5219         default:
5220                 if (dev_type < 0)
5221                         GOTO(out, rc = dev_type);
5222
5223                 /* param related to llite isn't allowed to set by OST or MDT */
5224                 if (dev_type & LDD_F_SV_TYPE_OST ||
5225                     dev_type & LDD_F_SV_TYPE_MDT) {
5226                         /* param related to llite isn't allowed to set by OST
5227                          * or MDT
5228                          */
5229                         if (!strncmp(param, PARAM_LLITE,
5230                                      sizeof(PARAM_LLITE) - 1))
5231                                 GOTO(out, rc = -EINVAL);
5232
5233                         /* Strip -osc or -mdc suffix from svname */
5234                         if (server_make_name(dev_type, mti->mti_stripe_index,
5235                                              mti->mti_fsname, mti->mti_svname,
5236                                              sizeof(mti->mti_svname)))
5237                                 GOTO(out, rc = -EINVAL);
5238                 }
5239                 break;
5240         }
5241
5242         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
5243             sizeof(mti->mti_params))
5244                 GOTO(out, rc = -E2BIG);
5245
5246         CDEBUG(D_MGS, "set_conf_param fs='%s' device='%s' param='%s'\n",
5247                mti->mti_fsname, mti->mti_svname, mti->mti_params);
5248
5249         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
5250         if (rc)
5251                 GOTO(out, rc);
5252
5253         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
5254             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
5255                 CERROR("No filesystem targets for %s. cfg_device from lctl "
5256                        "is '%s'\n", mti->mti_fsname, mti->mti_svname);
5257                 mgs_unlink_fsdb(mgs, fsdb);
5258                 GOTO(out, rc = -EINVAL);
5259         }
5260
5261         /*
5262          * Revoke lock so everyone updates.  Should be alright if
5263          * someone was already reading while we were updating the logs,
5264          * so we don't really need to hold the lock while we're
5265          * writing (above).
5266          */
5267         mti->mti_flags = dev_type | LDD_F_PARAM;
5268         mutex_lock(&fsdb->fsdb_mutex);
5269         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
5270         mutex_unlock(&fsdb->fsdb_mutex);
5271         mgs_revoke_lock(mgs, fsdb, MGS_CFG_T_CONFIG);
5272
5273 out:
5274         if (fsdb)
5275                 mgs_put_fsdb(mgs, fsdb);
5276
5277         RETURN(rc);
5278 }
5279
5280 static int mgs_set_param2(const struct lu_env *env, struct mgs_device *mgs,
5281                           struct mgs_target_info *mti, const char *param)
5282 {
5283         struct fs_db *fsdb = NULL;
5284         int dev_type;
5285         size_t len;
5286         int rc;
5287
5288         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
5289             sizeof(mti->mti_params))
5290                 GOTO(out, rc = -E2BIG);
5291
5292         len = strcspn(param, ".=");
5293         if (len && param[len] != '=') {
5294                 struct list_head *tmp;
5295                 char *ptr;
5296
5297                 param += len + 1;
5298                 ptr = strchr(param, '.');
5299
5300                 len = strlen(param);
5301                 if (ptr)
5302                         len -= strlen(ptr);
5303                 if (len >= sizeof(mti->mti_svname))
5304                         GOTO(out, rc = -E2BIG);
5305
5306                 snprintf(mti->mti_svname, sizeof(mti->mti_svname), "%.*s",
5307                         (int)len, param);
5308
5309                 mutex_lock(&mgs->mgs_mutex);
5310                 if (unlikely(list_empty(&mgs->mgs_fs_db_list))) {
5311                         mutex_unlock(&mgs->mgs_mutex);
5312                         GOTO(out, rc = -ENODEV);
5313                 }
5314
5315                 list_for_each(tmp, &mgs->mgs_fs_db_list) {
5316                         fsdb = list_entry(tmp, struct fs_db, fsdb_list);
5317                         if (fsdb->fsdb_has_lproc_entry &&
5318                             strcmp(fsdb->fsdb_name, "params") != 0 &&
5319                             strstr(param, fsdb->fsdb_name)) {
5320                                 snprintf(mti->mti_svname,
5321                                          sizeof(mti->mti_svname), "%s",
5322                                          fsdb->fsdb_name);
5323                                 break;
5324                         }
5325                         fsdb = NULL;
5326                 }
5327
5328                 if (!fsdb) {
5329                         snprintf(mti->mti_svname, sizeof(mti->mti_svname),
5330                                  "general");
5331                 }
5332                 mutex_unlock(&mgs->mgs_mutex);
5333         } else {
5334                 snprintf(mti->mti_svname, sizeof(mti->mti_svname), "general");
5335         }
5336
5337         CDEBUG(D_MGS, "set_param2 fs='%s' device='%s' param='%s'\n",
5338                mti->mti_fsname, mti->mti_svname, mti->mti_params);
5339
5340         /* The return value should be the device type i.e LDD_F_SV_TYPE_XXX.
5341          * A returned error tells us we don't have a target obd device.
5342          */
5343         dev_type = server_name2index(mti->mti_svname, &mti->mti_stripe_index,
5344                                      NULL);
5345         if (dev_type < 0)
5346                 dev_type = 0;
5347
5348         /* the return value should be the device type i.e LDD_F_SV_TYPE_XXX.
5349          * Strip -osc or -mdc suffix from svname
5350          */
5351         if ((dev_type & LDD_F_SV_TYPE_OST || dev_type & LDD_F_SV_TYPE_MDT) &&
5352             server_make_name(dev_type, mti->mti_stripe_index,
5353                              mti->mti_fsname, mti->mti_svname,
5354                              sizeof(mti->mti_svname)))
5355                 GOTO(out, rc = -EINVAL);
5356
5357         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
5358         if (rc)
5359                 GOTO(out, rc);
5360         /*
5361          * Revoke lock so everyone updates.  Should be alright if
5362          * someone was already reading while we were updating the logs,
5363          * so we don't really need to hold the lock while we're
5364          * writing (above).
5365          */
5366         mti->mti_flags = dev_type | LDD_F_PARAM2;
5367         mutex_lock(&fsdb->fsdb_mutex);
5368         rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
5369         mutex_unlock(&fsdb->fsdb_mutex);
5370         mgs_revoke_lock(mgs, fsdb, MGS_CFG_T_PARAMS);
5371         mgs_put_fsdb(mgs, fsdb);
5372 out:
5373         RETURN(rc);
5374 }
5375
5376 /* Set a permanent (config log) param for a target or fs
5377  *
5378  * @lcfg buf0 may contain the device (testfs-MDT0000) name
5379  *       buf1 contains the single parameter
5380  */
5381 int mgs_set_param(const struct lu_env *env, struct mgs_device *mgs,
5382                   struct lustre_cfg *lcfg)
5383 {
5384         const char *param = lustre_cfg_string(lcfg, 1);
5385         struct mgs_target_info *mti;
5386         int rc;
5387
5388         /* Create a fake mti to hold everything */
5389         OBD_ALLOC_PTR(mti);
5390         if (!mti)
5391                 return -ENOMEM;
5392
5393         print_lustre_cfg(lcfg);
5394
5395         if (lcfg->lcfg_command == LCFG_PARAM) {
5396                 /* For the case of lctl conf_param devname can be
5397                  * lustre, lustre-mdtlov, lustre-client, lustre-MDT0000
5398                  */
5399                 const char *devname = lustre_cfg_string(lcfg, 0);
5400
5401                 rc = mgs_set_conf_param(env, mgs, mti, devname, param);
5402         } else {
5403                 /* In the case of lctl set_param -P lcfg[0] will always
5404                  * be 'general'. At least for now.
5405                  */
5406                 rc = mgs_set_param2(env, mgs, mti, param);
5407         }
5408
5409         OBD_FREE_PTR(mti);
5410
5411         return rc;
5412 }
5413
5414 static int mgs_write_log_pool(const struct lu_env *env,
5415                               struct mgs_device *mgs, char *logname,
5416                               struct fs_db *fsdb, char *tgtname,
5417                               enum lcfg_command_type cmd,
5418                               char *fsname, char *poolname,
5419                               char *ostname, char *comment)
5420 {
5421         struct llog_handle *llh = NULL;
5422         int rc;
5423
5424         rc = record_start_log(env, mgs, &llh, logname);
5425         if (rc)
5426                 return rc;
5427         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
5428         if (rc)
5429                 goto out;
5430         rc = record_base(env, llh, tgtname, 0, cmd,
5431                          fsname, poolname, ostname, NULL);
5432         if (rc)
5433                 goto out;
5434         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
5435 out:
5436         record_end_log(env, &llh);
5437         return rc;
5438 }
5439
5440 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
5441                     enum lcfg_command_type cmd, const char *nodemap_name,
5442                     char *param)
5443 {
5444         lnet_nid_t nid[2];
5445         u32 idmap[2];
5446         bool bool_switch;
5447         u32 int_id;
5448         int rc = 0;
5449
5450         ENTRY;
5451         switch (cmd) {
5452         case LCFG_NODEMAP_ADD:
5453                 rc = nodemap_add(nodemap_name);
5454                 break;
5455         case LCFG_NODEMAP_DEL:
5456                 rc = nodemap_del(nodemap_name);
5457                 break;
5458         case LCFG_NODEMAP_ADD_RANGE:
5459                 rc = nodemap_parse_range(param, nid);
5460                 if (rc != 0)
5461                         break;
5462                 rc = nodemap_add_range(nodemap_name, nid);
5463                 break;
5464         case LCFG_NODEMAP_DEL_RANGE:
5465                 rc = nodemap_parse_range(param, nid);
5466                 if (rc != 0)
5467                         break;
5468                 rc = nodemap_del_range(nodemap_name, nid);
5469                 break;
5470         case LCFG_NODEMAP_ADMIN:
5471                 rc = kstrtobool(param, &bool_switch);
5472                 if (rc)
5473                         break;
5474                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
5475                 break;
5476         case LCFG_NODEMAP_DENY_UNKNOWN:
5477                 rc = kstrtobool(param, &bool_switch);
5478                 if (rc)
5479                         break;
5480                 rc = nodemap_set_deny_unknown(nodemap_name, bool_switch);
5481                 break;
5482         case LCFG_NODEMAP_AUDIT_MODE:
5483                 rc = kstrtobool(param, &bool_switch);
5484                 if (rc == 0)
5485                         rc = nodemap_set_audit_mode(nodemap_name, bool_switch);
5486                 break;
5487         case LCFG_NODEMAP_FORBID_ENCRYPT:
5488                 rc = kstrtobool(param, &bool_switch);
5489                 if (rc == 0)
5490                         rc = nodemap_set_forbid_encryption(nodemap_name,
5491                                                            bool_switch);
5492                 break;
5493         case LCFG_NODEMAP_MAP_MODE:
5494                 if (strcmp("both", param) == 0)
5495                         rc = nodemap_set_mapping_mode(nodemap_name,
5496                                                       NODEMAP_MAP_BOTH);
5497                 else if (strcmp("uid_only", param) == 0)
5498                         rc = nodemap_set_mapping_mode(nodemap_name,
5499                                                       NODEMAP_MAP_UID_ONLY);
5500                 else if (strcmp("gid_only", param) == 0)
5501                         rc = nodemap_set_mapping_mode(nodemap_name,
5502                                                       NODEMAP_MAP_GID_ONLY);
5503                 else
5504                         rc = -EINVAL;
5505                 break;
5506         case LCFG_NODEMAP_TRUSTED:
5507                 rc = kstrtobool(param, &bool_switch);
5508                 if (rc)
5509                         break;
5510                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
5511                 break;
5512         case LCFG_NODEMAP_SQUASH_UID:
5513                 rc = kstrtouint(param, 10, &int_id);
5514                 if (rc)
5515                         break;
5516                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
5517                 break;
5518         case LCFG_NODEMAP_SQUASH_GID:
5519                 rc = kstrtouint(param, 10, &int_id);
5520                 if (rc)
5521                         break;
5522                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
5523                 break;
5524         case LCFG_NODEMAP_ADD_UIDMAP:
5525         case LCFG_NODEMAP_ADD_GIDMAP:
5526                 rc = nodemap_parse_idmap(param, idmap);
5527                 if (rc != 0)
5528                         break;
5529                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
5530                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
5531                                                idmap);
5532                 else
5533                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
5534                                                idmap);
5535                 break;
5536         case LCFG_NODEMAP_DEL_UIDMAP:
5537         case LCFG_NODEMAP_DEL_GIDMAP:
5538                 rc = nodemap_parse_idmap(param, idmap);
5539                 if (rc != 0)
5540                         break;
5541                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
5542                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
5543                                                idmap);
5544                 else
5545                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
5546                                                idmap);
5547                 break;
5548         case LCFG_NODEMAP_SET_FILESET:
5549                 rc = nodemap_set_fileset(nodemap_name, param);
5550                 break;
5551         case LCFG_NODEMAP_SET_SEPOL:
5552                 rc = nodemap_set_sepol(nodemap_name, param);
5553                 break;
5554         default:
5555                 rc = -EINVAL;
5556         }
5557
5558         RETURN(rc);
5559 }
5560
5561 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
5562                  enum lcfg_command_type cmd, char *fsname,
5563                  char *poolname, char *ostname)
5564 {
5565         struct fs_db *fsdb;
5566         char *lovname;
5567         char *logname;
5568         char *label = NULL, *canceled_label = NULL;
5569         int label_sz;
5570         struct mgs_target_info *mti = NULL;
5571         bool checked = false;
5572         bool locked = false;
5573         bool free = false;
5574         int rc, i;
5575         ENTRY;
5576
5577         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
5578         if (rc) {
5579                 CERROR("Can't get db for %s\n", fsname);
5580                 RETURN(rc);
5581         }
5582         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
5583                 CERROR("%s is not defined\n", fsname);
5584                 free = true;
5585                 GOTO(out_fsdb, rc = -EINVAL);
5586         }
5587
5588         label_sz = 10 + strlen(fsname) + strlen(poolname);
5589
5590         /* check if ostname match fsname */
5591         if (ostname != NULL) {
5592                 char *ptr;
5593
5594                 ptr = strrchr(ostname, '-');
5595                 if ((ptr == NULL) ||
5596                     (strncmp(fsname, ostname, ptr-ostname) != 0))
5597                         RETURN(-EINVAL);
5598                 label_sz += strlen(ostname);
5599         }
5600
5601         OBD_ALLOC(label, label_sz);
5602         if (!label)
5603                 GOTO(out_fsdb, rc = -ENOMEM);
5604
5605         switch(cmd) {
5606         case LCFG_POOL_NEW:
5607                 sprintf(label,
5608                         "new %s.%s", fsname, poolname);
5609                 break;
5610         case LCFG_POOL_ADD:
5611                 sprintf(label,
5612                         "add %s.%s.%s", fsname, poolname, ostname);
5613                 break;
5614         case LCFG_POOL_REM:
5615                 OBD_ALLOC(canceled_label, label_sz);
5616                 if (canceled_label == NULL)
5617                         GOTO(out_label, rc = -ENOMEM);
5618                 sprintf(label,
5619                         "rem %s.%s.%s", fsname, poolname, ostname);
5620                 sprintf(canceled_label,
5621                         "add %s.%s.%s", fsname, poolname, ostname);
5622                 break;
5623         case LCFG_POOL_DEL:
5624                 OBD_ALLOC(canceled_label, label_sz);
5625                 if (canceled_label == NULL)
5626                         GOTO(out_label, rc = -ENOMEM);
5627                 sprintf(label,
5628                         "del %s.%s", fsname, poolname);
5629                 sprintf(canceled_label,
5630                         "new %s.%s", fsname, poolname);
5631                 break;
5632         default:
5633                 break;
5634         }
5635
5636         OBD_ALLOC_PTR(mti);
5637         if (mti == NULL)
5638                 GOTO(out_cancel, rc = -ENOMEM);
5639         strncpy(mti->mti_svname, "lov pool", sizeof(mti->mti_svname));
5640
5641         mutex_lock(&fsdb->fsdb_mutex);
5642         locked = true;
5643         /* write pool def to all MDT logs */
5644         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
5645                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
5646                         rc = name_create_mdt_and_lov(&logname, &lovname,
5647                                                      fsdb, i);
5648                         if (rc)
5649                                 GOTO(out_mti, rc);
5650
5651                         if (!checked && (canceled_label == NULL)) {
5652                                 rc = mgs_check_marker(env, mgs, fsdb, mti,
5653                                                 logname, lovname, label);
5654                                 if (rc) {
5655                                         name_destroy(&logname);
5656                                         name_destroy(&lovname);
5657                                         GOTO(out_mti,
5658                                                 rc = (rc == LLOG_PROC_BREAK ?
5659                                                         -EEXIST : rc));
5660                                 }
5661                                 checked = true;
5662                         }
5663                         if (canceled_label != NULL)
5664                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
5665                                                 lovname, canceled_label,
5666                                                 CM_SKIP);
5667
5668                         if (rc >= 0)
5669                                 rc = mgs_write_log_pool(env, mgs, logname,
5670                                                         fsdb, lovname, cmd,
5671                                                         fsname, poolname,
5672                                                         ostname, label);
5673                         name_destroy(&logname);
5674                         name_destroy(&lovname);
5675                         if (rc)
5676                                 GOTO(out_mti, rc);
5677                 }
5678         }
5679
5680         rc = name_create(&logname, fsname, "-client");
5681         if (rc)
5682                 GOTO(out_mti, rc);
5683
5684         if (!checked && (canceled_label == NULL)) {
5685                 rc = mgs_check_marker(env, mgs, fsdb, mti, logname,
5686                                 fsdb->fsdb_clilov, label);
5687                 if (rc) {
5688                         name_destroy(&logname);
5689                         GOTO(out_mti, rc = (rc == LLOG_PROC_BREAK ?
5690                                 -EEXIST : rc));
5691                 }
5692         }
5693         if (canceled_label != NULL) {
5694                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
5695                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
5696                 if (rc < 0) {
5697                         name_destroy(&logname);
5698                         GOTO(out_mti, rc);
5699                 }
5700         }
5701
5702         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
5703                                 cmd, fsname, poolname, ostname, label);
5704         mutex_unlock(&fsdb->fsdb_mutex);
5705         locked = false;
5706         name_destroy(&logname);
5707         /* request for update */
5708         mgs_revoke_lock(mgs, fsdb, MGS_CFG_T_CONFIG);
5709
5710         GOTO(out_mti, rc);
5711
5712 out_mti:
5713         if (locked)
5714                 mutex_unlock(&fsdb->fsdb_mutex);
5715         if (mti != NULL)
5716                 OBD_FREE_PTR(mti);
5717 out_cancel:
5718         if (canceled_label != NULL)
5719                 OBD_FREE(canceled_label, label_sz);
5720 out_label:
5721         OBD_FREE(label, label_sz);
5722 out_fsdb:
5723         if (free)
5724                 mgs_unlink_fsdb(mgs, fsdb);
5725         mgs_put_fsdb(mgs, fsdb);
5726
5727         return rc;
5728 }