Whamcloud - gitweb
2dc1a8ab87aa31cedf6e89539a7b158630aa138c
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mgs/mgs_llog.c
33  *
34  * Lustre Management Server (mgs) config llog creation
35  *
36  * Author: Nathan Rutman <nathan@clusterfs.com>
37  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
38  * Author: Mikhail Pershin <tappro@whamcloud.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_MGS
42 #define D_MGS D_CONFIG
43
44 #include <obd.h>
45 #include <uapi/linux/lustre/lustre_ioctl.h>
46 #include <uapi/linux/lustre/lustre_param.h>
47 #include <lustre_sec.h>
48 #include <lustre_quota.h>
49 #include <lustre_sec.h>
50
51 #include "mgs_internal.h"
52
53 /********************** Class functions ********************/
54
55 /**
56  * Find all logs in CONFIG directory and link then into list.
57  *
58  * \param[in] env       pointer to the thread context
59  * \param[in] mgs       pointer to the mgs device
60  * \param[out] log_list the list to hold the found llog name entry
61  *
62  * \retval              0 for success
63  * \retval              negative error number on failure
64  **/
65 int class_dentry_readdir(const struct lu_env *env, struct mgs_device *mgs,
66                          struct list_head *log_list)
67 {
68         struct dt_object *dir = mgs->mgs_configs_dir;
69         const struct dt_it_ops *iops;
70         struct dt_it *it;
71         struct mgs_direntry *de;
72         char *key;
73         int rc, key_sz;
74
75         INIT_LIST_HEAD(log_list);
76
77         LASSERT(dir);
78         LASSERT(dir->do_index_ops);
79
80         iops = &dir->do_index_ops->dio_it;
81         it = iops->init(env, dir, LUDA_64BITHASH);
82         if (IS_ERR(it))
83                 RETURN(PTR_ERR(it));
84
85         rc = iops->load(env, it, 0);
86         if (rc <= 0)
87                 GOTO(fini, rc = 0);
88
89         /* main cycle */
90         do {
91                 key = (void *)iops->key(env, it);
92                 if (IS_ERR(key)) {
93                         CERROR("%s: key failed when listing %s: rc = %d\n",
94                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
95                                (int) PTR_ERR(key));
96                         goto next;
97                 }
98                 key_sz = iops->key_size(env, it);
99                 LASSERT(key_sz > 0);
100
101                 /* filter out "." and ".." entries */
102                 if (key[0] == '.') {
103                         if (key_sz == 1)
104                                 goto next;
105                         if (key_sz == 2 && key[1] == '.')
106                                 goto next;
107                 }
108
109                 /* filter out ".bak" files */
110                 /* sizeof(".bak") - 1 == 3 */
111                 if (key_sz >= 3 &&
112                     !memcmp(".bak", key + key_sz - 3, 3)) {
113                         CDEBUG(D_MGS, "Skipping backup file %.*s\n",
114                                key_sz, key);
115                         goto next;
116                 }
117
118                 de = mgs_direntry_alloc(key_sz + 1);
119                 if (de == NULL) {
120                         rc = -ENOMEM;
121                         break;
122                 }
123
124                 memcpy(de->mde_name, key, key_sz);
125                 de->mde_name[key_sz] = 0;
126
127                 list_add(&de->mde_list, log_list);
128
129 next:
130                 rc = iops->next(env, it);
131         } while (rc == 0);
132         if (rc > 0)
133                 rc = 0;
134
135         iops->put(env, it);
136
137 fini:
138         iops->fini(env, it);
139         if (rc) {
140                 struct mgs_direntry *n;
141
142                 CERROR("%s: key failed when listing %s: rc = %d\n",
143                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
144
145                 list_for_each_entry_safe(de, n, log_list, mde_list) {
146                         list_del_init(&de->mde_list);
147                         mgs_direntry_free(de);
148                 }
149         }
150
151         RETURN(rc);
152 }
153
154 /******************** DB functions *********************/
155
156 static inline int name_create(char **newname, char *prefix, char *suffix)
157 {
158         LASSERT(newname);
159         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
160         if (!*newname)
161                 return -ENOMEM;
162         sprintf(*newname, "%s%s", prefix, suffix);
163         return 0;
164 }
165
166 static inline void name_destroy(char **name)
167 {
168         if (*name)
169                 OBD_FREE(*name, strlen(*name) + 1);
170         *name = NULL;
171 }
172
173 struct mgs_fsdb_handler_data
174 {
175         struct fs_db   *fsdb;
176         __u32           ver;
177 };
178
179 /* from the (client) config log, figure out:
180         1. which ost's/mdt's are configured (by index)
181         2. what the last config step is
182         3. COMPAT_18 osc name
183 */
184 /* It might be better to have a separate db file, instead of parsing the info
185    out of the client log.  This is slow and potentially error-prone. */
186 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
187                             struct llog_rec_hdr *rec, void *data)
188 {
189         struct mgs_fsdb_handler_data *d = data;
190         struct fs_db *fsdb = d->fsdb;
191         int cfg_len = rec->lrh_len;
192         char *cfg_buf = (char*) (rec + 1);
193         struct lustre_cfg *lcfg;
194         __u32 index;
195         int rc = 0;
196         ENTRY;
197
198         if (rec->lrh_type != OBD_CFG_REC) {
199                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
200                 RETURN(-EINVAL);
201         }
202
203         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
204         if (rc) {
205                 CERROR("Insane cfg\n");
206                 RETURN(rc);
207         }
208
209         lcfg = (struct lustre_cfg *)cfg_buf;
210
211         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
212                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
213
214         /* Figure out ost indicies */
215         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
216         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
217             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
218                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
219                                        NULL, 10);
220                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
221                        lustre_cfg_string(lcfg, 1), index,
222                        lustre_cfg_string(lcfg, 2));
223                 set_bit(index, fsdb->fsdb_ost_index_map);
224         }
225
226         /* Figure out mdt indicies */
227         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
228         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
229             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
230                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
231                                        &index, NULL);
232                 if (rc != LDD_F_SV_TYPE_MDT) {
233                         CWARN("Unparsable MDC name %s, assuming index 0\n",
234                               lustre_cfg_string(lcfg, 0));
235                         index = 0;
236                 }
237                 rc = 0;
238                 CDEBUG(D_MGS, "MDT index is %u\n", index);
239                 if (!test_bit(index, fsdb->fsdb_mdt_index_map)) {
240                         set_bit(index, fsdb->fsdb_mdt_index_map);
241                         fsdb->fsdb_mdt_count++;
242                 }
243         }
244
245         /**
246          * figure out the old config. fsdb_gen = 0 means old log
247          * It is obsoleted and not supported anymore
248          */
249         if (fsdb->fsdb_gen == 0) {
250                 CERROR("Old config format is not supported\n");
251                 RETURN(-EINVAL);
252         }
253
254         /*
255          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
256          */
257         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
258             lcfg->lcfg_command == LCFG_ATTACH &&
259             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
260                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
261                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
262                         CWARN("MDT using 1.8 OSC name scheme\n");
263                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
264                 }
265         }
266
267         if (lcfg->lcfg_command == LCFG_MARKER) {
268                 struct cfg_marker *marker;
269                 marker = lustre_cfg_buf(lcfg, 1);
270
271                 d->ver = marker->cm_vers;
272
273                 /* Keep track of the latest marker step */
274                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
275         }
276
277         RETURN(rc);
278 }
279
280 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
281 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
282                                   struct mgs_device *mgs,
283                                   struct fs_db *fsdb)
284 {
285         char *logname;
286         struct llog_handle *loghandle;
287         struct llog_ctxt *ctxt;
288         struct mgs_fsdb_handler_data d = {
289                 .fsdb = fsdb,
290         };
291         int rc;
292
293         ENTRY;
294
295         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
296         LASSERT(ctxt != NULL);
297         rc = name_create(&logname, fsdb->fsdb_name, "-client");
298         if (rc)
299                 GOTO(out_put, rc);
300         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
301         if (rc)
302                 GOTO(out_pop, rc);
303
304         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
305         if (rc)
306                 GOTO(out_close, rc);
307
308         if (llog_get_size(loghandle) <= 1)
309                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
310
311         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
312         CDEBUG(D_INFO, "get_db = %d\n", rc);
313 out_close:
314         llog_close(env, loghandle);
315 out_pop:
316         name_destroy(&logname);
317 out_put:
318         llog_ctxt_put(ctxt);
319
320         RETURN(rc);
321 }
322
323 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
324 {
325         struct mgs_tgt_srpc_conf *tgtconf;
326
327         /* free target-specific rules */
328         while (fsdb->fsdb_srpc_tgt) {
329                 tgtconf = fsdb->fsdb_srpc_tgt;
330                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
331
332                 LASSERT(tgtconf->mtsc_tgt);
333
334                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
335                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
336                 OBD_FREE_PTR(tgtconf);
337         }
338
339         /* free general rules */
340         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
341 }
342
343 static void mgs_unlink_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
344 {
345         mutex_lock(&mgs->mgs_mutex);
346         if (likely(!list_empty(&fsdb->fsdb_list))) {
347                 LASSERTF(atomic_read(&fsdb->fsdb_ref) >= 2,
348                          "Invalid ref %d on %s\n",
349                          atomic_read(&fsdb->fsdb_ref),
350                          fsdb->fsdb_name);
351
352                 list_del_init(&fsdb->fsdb_list);
353                 /* Drop the reference on the list.*/
354                 mgs_put_fsdb(mgs, fsdb);
355         }
356         mutex_unlock(&mgs->mgs_mutex);
357 }
358
359 /* The caller must hold mgs->mgs_mutex. */
360 static inline struct fs_db *
361 mgs_find_fsdb_noref(struct mgs_device *mgs, const char *fsname)
362 {
363         struct fs_db *fsdb;
364         struct list_head *tmp;
365
366         list_for_each(tmp, &mgs->mgs_fs_db_list) {
367                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
368                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
369                         return fsdb;
370         }
371
372         return NULL;
373 }
374
375 /* The caller must hold mgs->mgs_mutex. */
376 static void mgs_remove_fsdb_by_name(struct mgs_device *mgs, const char *name)
377 {
378         struct fs_db *fsdb;
379
380         fsdb = mgs_find_fsdb_noref(mgs, name);
381         if (fsdb) {
382                 list_del_init(&fsdb->fsdb_list);
383                 /* Drop the reference on the list.*/
384                 mgs_put_fsdb(mgs, fsdb);
385         }
386 }
387
388 /* The caller must hold mgs->mgs_mutex. */
389 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, const char *fsname)
390 {
391         struct fs_db *fsdb;
392
393         fsdb = mgs_find_fsdb_noref(mgs, fsname);
394         if (fsdb)
395                 atomic_inc(&fsdb->fsdb_ref);
396
397         return fsdb;
398 }
399
400 /* The caller must hold mgs->mgs_mutex. */
401 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
402                                   struct mgs_device *mgs, char *fsname)
403 {
404         struct fs_db *fsdb;
405         int rc;
406         ENTRY;
407
408         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
409                 CERROR("fsname %s is too long\n", fsname);
410
411                 RETURN(ERR_PTR(-EINVAL));
412         }
413
414         OBD_ALLOC_PTR(fsdb);
415         if (!fsdb)
416                 RETURN(ERR_PTR(-ENOMEM));
417
418         strncpy(fsdb->fsdb_name, fsname, sizeof(fsdb->fsdb_name));
419         mutex_init(&fsdb->fsdb_mutex);
420         INIT_LIST_HEAD(&fsdb->fsdb_list);
421         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
422         fsdb->fsdb_gen = 1;
423         INIT_LIST_HEAD(&fsdb->fsdb_clients);
424         atomic_set(&fsdb->fsdb_notify_phase, 0);
425         init_waitqueue_head(&fsdb->fsdb_notify_waitq);
426         init_completion(&fsdb->fsdb_notify_comp);
427
428         if (strcmp(fsname, MGSSELF_NAME) == 0) {
429                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
430                 fsdb->fsdb_mgs = mgs;
431                 if (logname_is_barrier(fsname))
432                         goto add;
433         } else {
434                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
435                 if (!fsdb->fsdb_mdt_index_map) {
436                         CERROR("No memory for MDT index maps\n");
437
438                         GOTO(err, rc = -ENOMEM);
439                 }
440
441                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
442                 if (!fsdb->fsdb_ost_index_map) {
443                         CERROR("No memory for OST index maps\n");
444
445                         GOTO(err, rc = -ENOMEM);
446                 }
447
448                 if (logname_is_barrier(fsname))
449                         goto add;
450
451                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
452                 if (rc)
453                         GOTO(err, rc);
454
455                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
456                 if (rc)
457                         GOTO(err, rc);
458
459                 /* initialise data for NID table */
460                 mgs_ir_init_fs(env, mgs, fsdb);
461                 lproc_mgs_add_live(mgs, fsdb);
462         }
463
464         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
465             strcmp(PARAMS_FILENAME, fsname) != 0) {
466                 /* populate the db from the client llog */
467                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
468                 if (rc) {
469                         CERROR("Can't get db from client log %d\n", rc);
470
471                         GOTO(err, rc);
472                 }
473         }
474
475         /* populate srpc rules from params llog */
476         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
477         if (rc) {
478                 CERROR("Can't get db from params log %d\n", rc);
479
480                 GOTO(err, rc);
481         }
482
483 add:
484         /* One ref is for the fsdb on the list.
485          * The other ref is for the caller. */
486         atomic_set(&fsdb->fsdb_ref, 2);
487         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
488
489         RETURN(fsdb);
490
491 err:
492         atomic_set(&fsdb->fsdb_ref, 1);
493         mgs_put_fsdb(mgs, fsdb);
494
495         RETURN(ERR_PTR(rc));
496 }
497
498 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
499 {
500         LASSERT(list_empty(&fsdb->fsdb_list));
501
502         lproc_mgs_del_live(mgs, fsdb);
503
504         /* deinitialize fsr */
505         if (fsdb->fsdb_mgs)
506                 mgs_ir_fini_fs(mgs, fsdb);
507
508         if (fsdb->fsdb_ost_index_map)
509                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
510         if (fsdb->fsdb_mdt_index_map)
511                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
512         name_destroy(&fsdb->fsdb_clilov);
513         name_destroy(&fsdb->fsdb_clilmv);
514         mgs_free_fsdb_srpc(fsdb);
515         OBD_FREE_PTR(fsdb);
516 }
517
518 void mgs_put_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
519 {
520         if (atomic_dec_and_test(&fsdb->fsdb_ref))
521                 mgs_free_fsdb(mgs, fsdb);
522 }
523
524 int mgs_init_fsdb_list(struct mgs_device *mgs)
525 {
526         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
527         return 0;
528 }
529
530 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
531 {
532         struct fs_db *fsdb;
533         struct list_head *tmp, *tmp2;
534
535         mutex_lock(&mgs->mgs_mutex);
536         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
537                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
538                 list_del_init(&fsdb->fsdb_list);
539                 mgs_put_fsdb(mgs, fsdb);
540         }
541         mutex_unlock(&mgs->mgs_mutex);
542         return 0;
543 }
544
545 /* The caller must hold mgs->mgs_mutex. */
546 int mgs_find_or_make_fsdb_nolock(const struct lu_env *env,
547                                 struct mgs_device *mgs,
548                                 char *name, struct fs_db **dbh)
549 {
550         struct fs_db *fsdb;
551         int rc = 0;
552         ENTRY;
553
554         fsdb = mgs_find_fsdb(mgs, name);
555         if (!fsdb) {
556                 fsdb = mgs_new_fsdb(env, mgs, name);
557                 if (IS_ERR(fsdb))
558                         rc = PTR_ERR(fsdb);
559
560                 CDEBUG(D_MGS, "Created new db: rc = %d\n", rc);
561         }
562
563         if (!rc)
564                 *dbh = fsdb;
565
566         RETURN(rc);
567 }
568
569 int mgs_find_or_make_fsdb(const struct lu_env *env, struct mgs_device *mgs,
570                           char *name, struct fs_db **dbh)
571 {
572         int rc;
573         ENTRY;
574
575         mutex_lock(&mgs->mgs_mutex);
576         rc = mgs_find_or_make_fsdb_nolock(env, mgs, name, dbh);
577         mutex_unlock(&mgs->mgs_mutex);
578
579         RETURN(rc);
580 }
581
582 /* 1 = index in use
583    0 = index unused
584    -1= empty client log */
585 int mgs_check_index(const struct lu_env *env,
586                     struct mgs_device *mgs,
587                     struct mgs_target_info *mti)
588 {
589         struct fs_db *fsdb;
590         void *imap;
591         int rc = 0;
592         ENTRY;
593
594         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
595
596         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
597         if (rc) {
598                 CERROR("Can't get db for %s\n", mti->mti_fsname);
599                 RETURN(rc);
600         }
601
602         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
603                 GOTO(out, rc = -1);
604
605         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
606                 imap = fsdb->fsdb_ost_index_map;
607         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
608                 imap = fsdb->fsdb_mdt_index_map;
609         else
610                 GOTO(out, rc = -EINVAL);
611
612         if (test_bit(mti->mti_stripe_index, imap))
613                 GOTO(out, rc = 1);
614
615         GOTO(out, rc = 0);
616
617 out:
618         mgs_put_fsdb(mgs, fsdb);
619         return rc;
620 }
621
622 static __inline__ int next_index(void *index_map, int map_len)
623 {
624         int i;
625         for (i = 0; i < map_len * 8; i++)
626                 if (!test_bit(i, index_map)) {
627                          return i;
628                  }
629         CERROR("max index %d exceeded.\n", i);
630         return -1;
631 }
632
633 /* Make the mdt/ost server obd name based on the filesystem name */
634 static bool server_make_name(u32 flags, u16 index, const char *fs,
635                              char *name_buf, size_t name_buf_size)
636 {
637         bool invalid_flag = false;
638
639         if (flags & (LDD_F_SV_TYPE_MDT | LDD_F_SV_TYPE_OST)) {
640                 if (!(flags & LDD_F_SV_ALL))
641                         snprintf(name_buf, name_buf_size, "%.8s%c%s%04x", fs,
642                                 (flags & LDD_F_VIRGIN) ? ':' :
643                                         ((flags & LDD_F_WRITECONF) ? '=' : '-'),
644                                 (flags & LDD_F_SV_TYPE_MDT) ? "MDT" : "OST",
645                                 index);
646         } else if (flags & LDD_F_SV_TYPE_MGS) {
647                 snprintf(name_buf, name_buf_size, "MGS");
648         } else {
649                 CERROR("unknown server type %#x\n", flags);
650                 invalid_flag = true;
651         }
652         return invalid_flag;
653 }
654
655 /* Return codes:
656         0  newly marked as in use
657         <0 err
658         +EALREADY for update of an old index */
659 static int mgs_set_index(const struct lu_env *env,
660                          struct mgs_device *mgs,
661                          struct mgs_target_info *mti)
662 {
663         struct fs_db *fsdb;
664         void *imap;
665         int rc = 0;
666         ENTRY;
667
668         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
669         if (rc) {
670                 CERROR("Can't get db for %s\n", mti->mti_fsname);
671                 RETURN(rc);
672         }
673
674         mutex_lock(&fsdb->fsdb_mutex);
675         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
676                 imap = fsdb->fsdb_ost_index_map;
677         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
678                 imap = fsdb->fsdb_mdt_index_map;
679         } else {
680                 GOTO(out_up, rc = -EINVAL);
681         }
682
683         if (mti->mti_flags & LDD_F_NEED_INDEX) {
684                 rc = next_index(imap, INDEX_MAP_SIZE);
685                 if (rc == -1)
686                         GOTO(out_up, rc = -ERANGE);
687                 mti->mti_stripe_index = rc;
688         }
689
690         /* the last index(0xffff) is reserved for default value. */
691         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
692                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
693                                    "but index must be less than %u.\n",
694                                    mti->mti_svname, mti->mti_stripe_index,
695                                    INDEX_MAP_SIZE * 8 - 1);
696                 GOTO(out_up, rc = -ERANGE);
697         }
698
699         if (test_bit(mti->mti_stripe_index, imap)) {
700                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
701                     !(mti->mti_flags & LDD_F_WRITECONF)) {
702                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
703                                            "%d, but that index is already in "
704                                            "use. Use --writeconf to force\n",
705                                            mti->mti_svname,
706                                            mti->mti_stripe_index);
707                         GOTO(out_up, rc = -EADDRINUSE);
708                 } else {
709                         CDEBUG(D_MGS, "Server %s updating index %d\n",
710                                mti->mti_svname, mti->mti_stripe_index);
711                         GOTO(out_up, rc = EALREADY);
712                 }
713         } else {
714                 set_bit(mti->mti_stripe_index, imap);
715                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
716                         fsdb->fsdb_mdt_count++;
717         }
718
719         set_bit(mti->mti_stripe_index, imap);
720         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
721         if (server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
722                              mti->mti_stripe_index, mti->mti_fsname,
723                              mti->mti_svname, sizeof(mti->mti_svname))) {
724                 CERROR("unknown server type %#x\n", mti->mti_flags);
725                 GOTO(out_up, rc = -EINVAL);
726         }
727
728         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
729                mti->mti_stripe_index);
730
731         GOTO(out_up, rc = 0);
732
733 out_up:
734         mutex_unlock(&fsdb->fsdb_mutex);
735         mgs_put_fsdb(mgs, fsdb);
736         return rc;
737 }
738
739 struct mgs_modify_lookup {
740         struct cfg_marker mml_marker;
741         int               mml_modified;
742 };
743
744 static int mgs_check_record_match(const struct lu_env *env,
745                                 struct llog_handle *llh,
746                                 struct llog_rec_hdr *rec, void *data)
747 {
748         struct cfg_marker *mc_marker = data;
749         struct cfg_marker *marker;
750         struct lustre_cfg *lcfg = REC_DATA(rec);
751         int cfg_len = REC_DATA_LEN(rec);
752         int rc;
753         ENTRY;
754
755
756         if (rec->lrh_type != OBD_CFG_REC) {
757                 CDEBUG(D_ERROR, "Unhandled lrh_type: %#x\n", rec->lrh_type);
758                 RETURN(-EINVAL);
759         }
760
761         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
762         if (rc) {
763                 CDEBUG(D_ERROR, "Insane cfg\n");
764                 RETURN(rc);
765         }
766
767         /* We only care about markers */
768         if (lcfg->lcfg_command != LCFG_MARKER)
769                 RETURN(0);
770
771         marker = lustre_cfg_buf(lcfg, 1);
772
773         if (marker->cm_flags & CM_SKIP)
774                 RETURN(0);
775
776         if ((strcmp(mc_marker->cm_comment, marker->cm_comment) == 0) &&
777                 (strcmp(mc_marker->cm_tgtname, marker->cm_tgtname) == 0)) {
778                 /* Found a non-skipped marker match */
779                 CDEBUG(D_MGS, "Matched rec %u marker %d flag %x %s %s\n",
780                         rec->lrh_index, marker->cm_step,
781                         marker->cm_flags, marker->cm_tgtname,
782                         marker->cm_comment);
783                 rc = LLOG_PROC_BREAK;
784         }
785
786         RETURN(rc);
787 }
788
789 /**
790  * Check an existing config log record with matching comment and device
791  * Return code:
792  * 0 - checked successfully,
793  * LLOG_PROC_BREAK - record matches
794  * negative - error
795  */
796 static int mgs_check_marker(const struct lu_env *env, struct mgs_device *mgs,
797                 struct fs_db *fsdb, struct mgs_target_info *mti,
798                 char *logname, char *devname, char *comment)
799 {
800         struct llog_handle *loghandle;
801         struct llog_ctxt *ctxt;
802         struct cfg_marker *mc_marker;
803         int rc;
804
805         ENTRY;
806
807         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
808         CDEBUG(D_MGS, "mgs check %s/%s/%s\n", logname, devname, comment);
809
810         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
811         LASSERT(ctxt != NULL);
812         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
813         if (rc < 0) {
814                 if (rc == -ENOENT)
815                         rc = 0;
816                 GOTO(out_pop, rc);
817         }
818
819         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
820         if (rc)
821                 GOTO(out_close, rc);
822
823         if (llog_get_size(loghandle) <= 1)
824                 GOTO(out_close, rc = 0);
825
826         OBD_ALLOC_PTR(mc_marker);
827         if (!mc_marker)
828                 GOTO(out_close, rc = -ENOMEM);
829         if (strlcpy(mc_marker->cm_comment, comment,
830                 sizeof(mc_marker->cm_comment)) >=
831                 sizeof(mc_marker->cm_comment))
832                 GOTO(out_free, rc = -E2BIG);
833         if (strlcpy(mc_marker->cm_tgtname, devname,
834                 sizeof(mc_marker->cm_tgtname)) >=
835                 sizeof(mc_marker->cm_tgtname))
836                 GOTO(out_free, rc = -E2BIG);
837
838         rc = llog_process(env, loghandle, mgs_check_record_match,
839                         (void *)mc_marker, NULL);
840
841 out_free:
842         OBD_FREE_PTR(mc_marker);
843
844 out_close:
845         llog_close(env, loghandle);
846 out_pop:
847         if (rc && rc != LLOG_PROC_BREAK)
848                 CDEBUG(D_ERROR, "%s: mgs check %s/%s failed: rc = %d\n",
849                         mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
850         llog_ctxt_put(ctxt);
851         RETURN(rc);
852 }
853
854 static int mgs_modify_handler(const struct lu_env *env,
855                               struct llog_handle *llh,
856                               struct llog_rec_hdr *rec, void *data)
857 {
858         struct mgs_modify_lookup *mml = data;
859         struct cfg_marker *marker;
860         struct lustre_cfg *lcfg = REC_DATA(rec);
861         int cfg_len = REC_DATA_LEN(rec);
862         int rc;
863         ENTRY;
864
865         if (rec->lrh_type != OBD_CFG_REC) {
866                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
867                 RETURN(-EINVAL);
868         }
869
870         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
871         if (rc) {
872                 CERROR("Insane cfg\n");
873                 RETURN(rc);
874         }
875
876         /* We only care about markers */
877         if (lcfg->lcfg_command != LCFG_MARKER)
878                 RETURN(0);
879
880         marker = lustre_cfg_buf(lcfg, 1);
881         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
882             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
883             !(marker->cm_flags & CM_SKIP)) {
884                 /* Found a non-skipped marker match */
885                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
886                        rec->lrh_index, marker->cm_step,
887                        marker->cm_flags, mml->mml_marker.cm_flags,
888                        marker->cm_tgtname, marker->cm_comment);
889                 /* Overwrite the old marker llog entry */
890                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
891                 marker->cm_flags |= mml->mml_marker.cm_flags;
892                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
893                 rc = llog_write(env, llh, rec, rec->lrh_index);
894                 if (!rc)
895                          mml->mml_modified++;
896         }
897
898         RETURN(rc);
899 }
900
901 /**
902  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
903  * Return code:
904  * 0 - modified successfully,
905  * 1 - no modification was done
906  * negative - error
907  */
908 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
909                       struct fs_db *fsdb, struct mgs_target_info *mti,
910                       char *logname, char *devname, char *comment, int flags)
911 {
912         struct llog_handle *loghandle;
913         struct llog_ctxt *ctxt;
914         struct mgs_modify_lookup *mml;
915         int rc;
916
917         ENTRY;
918
919         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
920         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
921                flags);
922
923         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
924         LASSERT(ctxt != NULL);
925         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
926         if (rc < 0) {
927                 if (rc == -ENOENT)
928                         rc = 0;
929                 GOTO(out_pop, rc);
930         }
931
932         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
933         if (rc)
934                 GOTO(out_close, rc);
935
936         if (llog_get_size(loghandle) <= 1)
937                 GOTO(out_close, rc = 0);
938
939         OBD_ALLOC_PTR(mml);
940         if (!mml)
941                 GOTO(out_close, rc = -ENOMEM);
942         if (strlcpy(mml->mml_marker.cm_comment, comment,
943                     sizeof(mml->mml_marker.cm_comment)) >=
944             sizeof(mml->mml_marker.cm_comment))
945                 GOTO(out_free, rc = -E2BIG);
946         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
947                     sizeof(mml->mml_marker.cm_tgtname)) >=
948             sizeof(mml->mml_marker.cm_tgtname))
949                 GOTO(out_free, rc = -E2BIG);
950         /* Modify mostly means cancel */
951         mml->mml_marker.cm_flags = flags;
952         mml->mml_marker.cm_canceltime = flags ? ktime_get_real_seconds() : 0;
953         mml->mml_modified = 0;
954         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
955                           NULL);
956         if (!rc && !mml->mml_modified)
957                 rc = 1;
958
959 out_free:
960         OBD_FREE_PTR(mml);
961
962 out_close:
963         llog_close(env, loghandle);
964 out_pop:
965         if (rc < 0)
966                 CERROR("%s: modify %s/%s failed: rc = %d\n",
967                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
968         llog_ctxt_put(ctxt);
969         RETURN(rc);
970 }
971
972 enum replace_state {
973         REPLACE_COPY = 0,
974         REPLACE_SKIP,
975         REPLACE_DONE,
976         REPLACE_UUID,
977         REPLACE_SETUP
978 };
979
980 /** This structure is passed to mgs_replace_handler */
981 struct mgs_replace_data {
982         /* Nids are replaced for this target device */
983         struct mgs_target_info target;
984         /* Temporary modified llog */
985         struct llog_handle *temp_llh;
986         enum replace_state state;
987         char *failover;
988         char *nodeuuid;
989 };
990
991 /**
992  * Check: a) if block should be skipped
993  * b) is it target block
994  *
995  * \param[in] lcfg
996  * \param[in] mrd
997  *
998  * \retval 0 should not to be skipped
999  * \retval 1 should to be skipped
1000  */
1001 static int check_markers(struct lustre_cfg *lcfg,
1002                          struct mgs_replace_data *mrd)
1003 {
1004          struct cfg_marker *marker;
1005
1006         /* Track markers. Find given device */
1007         if (lcfg->lcfg_command == LCFG_MARKER) {
1008                 marker = lustre_cfg_buf(lcfg, 1);
1009                 /* Clean llog from records marked as CM_SKIP.
1010                    CM_EXCLUDE records are used for "active" command
1011                    and can be restored if needed */
1012                 if ((marker->cm_flags & (CM_SKIP | CM_START)) ==
1013                     (CM_SKIP | CM_START)) {
1014                         mrd->state = REPLACE_SKIP;
1015                         return 1;
1016                 }
1017
1018                 if ((marker->cm_flags & (CM_SKIP | CM_END)) ==
1019                     (CM_SKIP | CM_END)) {
1020                         mrd->state = REPLACE_COPY;
1021                         return 1;
1022                 }
1023
1024                 if (strcmp(mrd->target.mti_svname, marker->cm_tgtname) == 0) {
1025                         LASSERT(!(marker->cm_flags & CM_START) ||
1026                                 !(marker->cm_flags & CM_END));
1027                         if (marker->cm_flags & CM_START) {
1028                                 mrd->state = REPLACE_UUID;
1029                                 mrd->failover = NULL;
1030                         } else if (marker->cm_flags & CM_END)
1031                                 mrd->state = REPLACE_COPY;
1032                 }
1033         }
1034
1035         return 0;
1036 }
1037
1038 static int record_base(const struct lu_env *env, struct llog_handle *llh,
1039                      char *cfgname, lnet_nid_t nid, int cmd,
1040                      char *s1, char *s2, char *s3, char *s4)
1041 {
1042         struct mgs_thread_info  *mgi = mgs_env_info(env);
1043         struct llog_cfg_rec     *lcr;
1044         int rc;
1045
1046         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
1047                cmd, s1, s2, s3, s4);
1048
1049         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
1050         if (s1)
1051                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
1052         if (s2)
1053                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
1054         if (s3)
1055                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
1056         if (s4)
1057                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
1058
1059         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
1060         if (lcr == NULL)
1061                 return -ENOMEM;
1062
1063         lcr->lcr_cfg.lcfg_nid = nid;
1064         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1065
1066         lustre_cfg_rec_free(lcr);
1067
1068         if (rc < 0)
1069                 CDEBUG(D_MGS,
1070                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
1071                        cfgname, cmd, s1, s2, s3, s4, rc);
1072         return rc;
1073 }
1074
1075 static inline int record_add_uuid(const struct lu_env *env,
1076                                   struct llog_handle *llh,
1077                                   uint64_t nid, char *uuid)
1078 {
1079         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
1080                            NULL, NULL, NULL);
1081 }
1082
1083 static inline int record_add_conn(const struct lu_env *env,
1084                                   struct llog_handle *llh,
1085                                   char *devname, char *uuid)
1086 {
1087         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
1088                            NULL, NULL, NULL);
1089 }
1090
1091 static inline int record_attach(const struct lu_env *env,
1092                                 struct llog_handle *llh, char *devname,
1093                                 char *type, char *uuid)
1094 {
1095         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
1096                            NULL, NULL);
1097 }
1098
1099 static inline int record_setup(const struct lu_env *env,
1100                                struct llog_handle *llh, char *devname,
1101                                char *s1, char *s2, char *s3, char *s4)
1102 {
1103         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
1104 }
1105
1106 /**
1107  * \retval <0 record processing error
1108  * \retval n record is processed. No need copy original one.
1109  * \retval 0 record is not processed.
1110  */
1111 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
1112                            struct mgs_replace_data *mrd)
1113 {
1114         int nids_added = 0;
1115         lnet_nid_t nid;
1116         char *ptr;
1117         int rc = 0;
1118
1119         if (mrd->state == REPLACE_UUID &&
1120             lcfg->lcfg_command == LCFG_ADD_UUID) {
1121                 /* LCFG_ADD_UUID command found. Let's skip original command
1122                    and add passed nids */
1123                 ptr = mrd->target.mti_params;
1124                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1125                         if (!mrd->nodeuuid) {
1126                                 rc = name_create(&mrd->nodeuuid,
1127                                                  libcfs_nid2str(nid), "");
1128                                 if (rc) {
1129                                         CERROR("Can't create uuid for "
1130                                                 "nid  %s, device %s\n",
1131                                                 libcfs_nid2str(nid),
1132                                                 mrd->target.mti_svname);
1133                                         return rc;
1134                                 }
1135                         }
1136                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
1137                                "device %s\n", libcfs_nid2str(nid),
1138                                 mrd->target.mti_params,
1139                                 mrd->nodeuuid);
1140                         rc = record_add_uuid(env,
1141                                              mrd->temp_llh, nid,
1142                                              mrd->nodeuuid);
1143                         if (!rc)
1144                                 nids_added++;
1145
1146                         if (*ptr == ':') {
1147                                 mrd->failover = ptr;
1148                                 break;
1149                         }
1150                 }
1151
1152                 if (nids_added == 0) {
1153                         CERROR("No new nids were added, nid %s with uuid %s, "
1154                                "device %s\n", libcfs_nid2str(nid),
1155                                mrd->nodeuuid ? mrd->nodeuuid : "NULL",
1156                                mrd->target.mti_svname);
1157                         name_destroy(&mrd->nodeuuid);
1158                         return -ENXIO;
1159                 } else {
1160                         mrd->state = REPLACE_SETUP;
1161                 }
1162
1163                 return nids_added;
1164         }
1165
1166         if (mrd->state == REPLACE_SETUP && lcfg->lcfg_command == LCFG_SETUP) {
1167                 /* LCFG_SETUP command found. UUID should be changed */
1168                 rc = record_setup(env,
1169                                   mrd->temp_llh,
1170                                   /* devname the same */
1171                                   lustre_cfg_string(lcfg, 0),
1172                                   /* s1 is not changed */
1173                                   lustre_cfg_string(lcfg, 1),
1174                                   mrd->nodeuuid,
1175                                   /* s3 is not changed */
1176                                   lustre_cfg_string(lcfg, 3),
1177                                   /* s4 is not changed */
1178                                   lustre_cfg_string(lcfg, 4));
1179
1180                 name_destroy(&mrd->nodeuuid);
1181                 if (rc)
1182                         return rc;
1183
1184                 if (mrd->failover) {
1185                         ptr = mrd->failover;
1186                         while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1187                                 if (mrd->nodeuuid == NULL) {
1188                                         rc =  name_create(&mrd->nodeuuid,
1189                                                           libcfs_nid2str(nid),
1190                                                           "");
1191                                         if (rc)
1192                                                 return rc;
1193                                 }
1194
1195                                 CDEBUG(D_MGS, "add nid %s for failover %s\n",
1196                                        libcfs_nid2str(nid), mrd->nodeuuid);
1197                                 rc = record_add_uuid(env, mrd->temp_llh, nid,
1198                                                      mrd->nodeuuid);
1199                                 if (rc) {
1200                                         name_destroy(&mrd->nodeuuid);
1201                                         return rc;
1202                                 }
1203                                 if (*ptr == ':') {
1204                                         rc = record_add_conn(env,
1205                                                 mrd->temp_llh,
1206                                                 lustre_cfg_string(lcfg, 0),
1207                                                 mrd->nodeuuid);
1208                                         name_destroy(&mrd->nodeuuid);
1209                                         if (rc)
1210                                                 return rc;
1211                                 }
1212                         }
1213                         if (mrd->nodeuuid) {
1214                                 rc = record_add_conn(env, mrd->temp_llh,
1215                                                      lustre_cfg_string(lcfg, 0),
1216                                                      mrd->nodeuuid);
1217                                 name_destroy(&mrd->nodeuuid);
1218                                 if (rc)
1219                                         return rc;
1220                         }
1221                 }
1222                 mrd->state = REPLACE_DONE;
1223                 return rc ? rc : 1;
1224         }
1225
1226         /* Another commands in target device block */
1227         return 0;
1228 }
1229
1230 /**
1231  * Handler that called for every record in llog.
1232  * Records are processed in order they placed in llog.
1233  *
1234  * \param[in] llh       log to be processed
1235  * \param[in] rec       current record
1236  * \param[in] data      mgs_replace_data structure
1237  *
1238  * \retval 0    success
1239  */
1240 static int mgs_replace_nids_handler(const struct lu_env *env,
1241                                     struct llog_handle *llh,
1242                                     struct llog_rec_hdr *rec,
1243                                     void *data)
1244 {
1245         struct mgs_replace_data *mrd;
1246         struct lustre_cfg *lcfg = REC_DATA(rec);
1247         int cfg_len = REC_DATA_LEN(rec);
1248         int rc;
1249         ENTRY;
1250
1251         mrd = (struct mgs_replace_data *)data;
1252
1253         if (rec->lrh_type != OBD_CFG_REC) {
1254                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
1255                        rec->lrh_type, lcfg->lcfg_command,
1256                        lustre_cfg_string(lcfg, 0),
1257                        lustre_cfg_string(lcfg, 1));
1258                 RETURN(-EINVAL);
1259         }
1260
1261         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1262         if (rc) {
1263                 /* Do not copy any invalidated records */
1264                 GOTO(skip_out, rc = 0);
1265         }
1266
1267         rc = check_markers(lcfg, mrd);
1268         if (rc || mrd->state == REPLACE_SKIP)
1269                 GOTO(skip_out, rc = 0);
1270
1271         /* Write to new log all commands outside target device block */
1272         if (mrd->state == REPLACE_COPY)
1273                 GOTO(copy_out, rc = 0);
1274
1275         if (mrd->state == REPLACE_DONE &&
1276             (lcfg->lcfg_command == LCFG_ADD_UUID ||
1277              lcfg->lcfg_command == LCFG_ADD_CONN)) {
1278                 if (!mrd->failover)
1279                         CWARN("Previous failover is deleted, but new one is "
1280                               "not set. This means you configure system "
1281                               "without failover or passed wrong replace_nids "
1282                               "command parameters. Device %s, passed nids %s\n",
1283                               mrd->target.mti_svname, mrd->target.mti_params);
1284                 GOTO(skip_out, rc = 0);
1285         }
1286
1287         rc = process_command(env, lcfg, mrd);
1288         if (rc < 0)
1289                 RETURN(rc);
1290
1291         if (rc)
1292                 RETURN(0);
1293 copy_out:
1294         /* Record is placed in temporary llog as is */
1295         rc = llog_write(env, mrd->temp_llh, rec, LLOG_NEXT_IDX);
1296
1297         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1298                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1299                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1300         RETURN(rc);
1301
1302 skip_out:
1303         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1304                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1305                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1306         RETURN(rc);
1307 }
1308
1309 static int mgs_log_is_empty(const struct lu_env *env,
1310                             struct mgs_device *mgs, char *name)
1311 {
1312         struct llog_ctxt        *ctxt;
1313         int                      rc;
1314
1315         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1316         LASSERT(ctxt != NULL);
1317
1318         rc = llog_is_empty(env, ctxt, name);
1319         llog_ctxt_put(ctxt);
1320         return rc;
1321 }
1322
1323 static int mgs_replace_log(const struct lu_env *env,
1324                            struct obd_device *mgs,
1325                            char *logname, char *devname,
1326                            llog_cb_t replace_handler, void *data)
1327 {
1328         struct llog_handle *orig_llh, *backup_llh;
1329         struct llog_ctxt *ctxt;
1330         struct mgs_replace_data *mrd;
1331         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1332         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1333         char *backup;
1334         int rc, rc2, buf_size;
1335         time64_t now;
1336         ENTRY;
1337
1338         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1339         LASSERT(ctxt != NULL);
1340
1341         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1342                 /* Log is empty. Nothing to replace */
1343                 GOTO(out_put, rc = 0);
1344         }
1345
1346         now = ktime_get_real_seconds();
1347
1348         /* max time64_t in decimal fits into 20 bytes long string */
1349         buf_size = strlen(logname) + 1 + 20 + 1 + strlen(".bak") + 1;
1350         OBD_ALLOC(backup, buf_size);
1351         if (backup == NULL)
1352                 GOTO(out_put, rc = -ENOMEM);
1353
1354         snprintf(backup, buf_size, "%s.%llu.bak", logname, now);
1355
1356         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1357         if (rc == 0) {
1358                 /* Now erase original log file. Connections are not allowed.
1359                    Backup is already saved */
1360                 rc = llog_erase(env, ctxt, NULL, logname);
1361                 if (rc < 0)
1362                         GOTO(out_free, rc);
1363         } else if (rc != -ENOENT) {
1364                 CERROR("%s: can't make backup for %s: rc = %d\n",
1365                        mgs->obd_name, logname, rc);
1366                 GOTO(out_free,rc);
1367         }
1368
1369         /* open local log */
1370         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1371         if (rc)
1372                 GOTO(out_restore, rc);
1373
1374         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1375         if (rc)
1376                 GOTO(out_closel, rc);
1377
1378         /* open backup llog */
1379         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1380                        LLOG_OPEN_EXISTS);
1381         if (rc)
1382                 GOTO(out_closel, rc);
1383
1384         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1385         if (rc)
1386                 GOTO(out_close, rc);
1387
1388         if (llog_get_size(backup_llh) <= 1)
1389                 GOTO(out_close, rc = 0);
1390
1391         OBD_ALLOC_PTR(mrd);
1392         if (!mrd)
1393                 GOTO(out_close, rc = -ENOMEM);
1394         /* devname is only needed information to replace UUID records */
1395         if (devname)
1396                 strlcpy(mrd->target.mti_svname, devname,
1397                         sizeof(mrd->target.mti_svname));
1398         /* data is parsed in llog callback */
1399         if (data)
1400                 strlcpy(mrd->target.mti_params, data,
1401                         sizeof(mrd->target.mti_params));
1402         /* Copy records to this temporary llog */
1403         mrd->temp_llh = orig_llh;
1404
1405         rc = llog_process(env, backup_llh, replace_handler,
1406                           (void *)mrd, NULL);
1407         OBD_FREE_PTR(mrd);
1408 out_close:
1409         rc2 = llog_close(NULL, backup_llh);
1410         if (!rc)
1411                 rc = rc2;
1412 out_closel:
1413         rc2 = llog_close(NULL, orig_llh);
1414         if (!rc)
1415                 rc = rc2;
1416
1417 out_restore:
1418         if (rc) {
1419                 CERROR("%s: llog should be restored: rc = %d\n",
1420                        mgs->obd_name, rc);
1421                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1422                                   logname);
1423                 if (rc2 < 0)
1424                         CERROR("%s: can't restore backup %s: rc = %d\n",
1425                                mgs->obd_name, logname, rc2);
1426         }
1427
1428 out_free:
1429         OBD_FREE(backup, buf_size);
1430
1431 out_put:
1432         llog_ctxt_put(ctxt);
1433
1434         if (rc)
1435                 CERROR("%s: failed to replace log %s: rc = %d\n",
1436                        mgs->obd_name, logname, rc);
1437
1438         RETURN(rc);
1439 }
1440
1441 static int mgs_replace_nids_log(const struct lu_env *env,
1442                                 struct obd_device *obd,
1443                                 char *logname, char *devname, char *nids)
1444 {
1445         CDEBUG(D_MGS, "Replace NIDs for %s in %s\n", devname, logname);
1446         return mgs_replace_log(env, obd, logname, devname,
1447                                mgs_replace_nids_handler, nids);
1448 }
1449
1450 /**
1451  * Parse device name and get file system name and/or device index
1452  *
1453  * @devname     device name (ex. lustre-MDT0000)
1454  * @fsname      file system name extracted from @devname and returned
1455  *              to the caller (optional)
1456  * @index       device index extracted from @devname and returned to
1457  *              the caller (optional)
1458  *
1459  * RETURN       0                       success if we are only interested in
1460  *                                      extracting fsname from devname.
1461  *                                      i.e index is NULL
1462  *
1463  *              LDD_F_SV_TYPE_*         Besides extracting the fsname the
1464  *                                      user also wants the index. Report to
1465  *                                      the user the type of obd device the
1466  *                                      returned index belongs too.
1467  *
1468  *              -EINVAL                 The obd device name is improper so
1469  *                                      fsname could not be extracted.
1470  *
1471  *              -ENXIO                  Failed to extract the index out of
1472  *                                      the obd device name. Most likely an
1473  *                                      invalid obd device name
1474  */
1475 static int mgs_parse_devname(char *devname, char *fsname, u32 *index)
1476 {
1477         int rc = 0;
1478         ENTRY;
1479
1480         /* Extract fsname */
1481         if (fsname) {
1482                 rc = server_name2fsname(devname, fsname, NULL);
1483                 if (rc < 0) {
1484                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1485                                devname);
1486                         RETURN(-EINVAL);
1487                 }
1488         }
1489
1490         if (index) {
1491                 rc = server_name2index(devname, index, NULL);
1492                 if (rc < 0) {
1493                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1494                                devname);
1495                         RETURN(-ENXIO);
1496                 }
1497         }
1498
1499         /* server_name2index can return LDD_F_SV_TYPE_* so always return rc */
1500         RETURN(rc);
1501 }
1502
1503 /* This is only called during replace_nids */
1504 static int only_mgs_is_running(struct obd_device *mgs_obd)
1505 {
1506         /* TDB: Is global variable with devices count exists? */
1507         int num_devices = get_devices_count();
1508         int num_exports = 0;
1509         struct obd_export *exp;
1510
1511         spin_lock(&mgs_obd->obd_dev_lock);
1512         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1513                 /* skip self export */
1514                 if (exp == mgs_obd->obd_self_export)
1515                         continue;
1516
1517                 ++num_exports;
1518
1519                 if (num_exports > 1)
1520                         CERROR("%s: node %s still connected during replace_nids connect_flags:%llx\n",
1521                                mgs_obd->obd_name,
1522                                libcfs_nid2str(exp->exp_nid_stats->nid),
1523                                exp_connect_flags(exp));
1524         }
1525         spin_unlock(&mgs_obd->obd_dev_lock);
1526
1527         /* osd, MGS and MGC + MGC export (nosvc starts MGC)
1528          *  (wc -l /proc/fs/lustre/devices <= 3) && (non self exports == 1)
1529          */
1530         return (num_devices <= 3) && (num_exports <= 1);
1531 }
1532
1533 static int name_create_mdt(char **logname, char *fsname, int mdt_idx)
1534 {
1535         char postfix[9];
1536
1537         if (mdt_idx > INDEX_MAP_MAX_VALUE)
1538                 return -E2BIG;
1539
1540         snprintf(postfix, sizeof(postfix), "-MDT%04x", mdt_idx);
1541         return name_create(logname, fsname, postfix);
1542 }
1543
1544 /**
1545  * Replace nids for \a device to \a nids values
1546  *
1547  * \param obd           MGS obd device
1548  * \param devname       nids need to be replaced for this device
1549  * (ex. lustre-OST0000)
1550  * \param nids          nids list (ex. nid1,nid2,nid3)
1551  *
1552  * \retval 0    success
1553  */
1554 int mgs_replace_nids(const struct lu_env *env,
1555                      struct mgs_device *mgs,
1556                      char *devname, char *nids)
1557 {
1558         /* Assume fsname is part of device name */
1559         char fsname[MTI_NAME_MAXLEN];
1560         int rc;
1561         __u32 index;
1562         char *logname;
1563         struct fs_db *fsdb = NULL;
1564         unsigned int i;
1565         int conn_state;
1566         struct obd_device *mgs_obd = mgs->mgs_obd;
1567         ENTRY;
1568
1569         /* We can only change NIDs if no other nodes are connected */
1570         spin_lock(&mgs_obd->obd_dev_lock);
1571         conn_state = mgs_obd->obd_no_conn;
1572         mgs_obd->obd_no_conn = 1;
1573         spin_unlock(&mgs_obd->obd_dev_lock);
1574
1575         /* We can not change nids if not only MGS is started */
1576         if (!only_mgs_is_running(mgs_obd)) {
1577                 CERROR("Only MGS is allowed to be started\n");
1578                 GOTO(out, rc = -EINPROGRESS);
1579         }
1580
1581         /* Get fsname and index */
1582         rc = mgs_parse_devname(devname, fsname, &index);
1583         if (rc < 0)
1584                 GOTO(out, rc);
1585
1586         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1587         if (rc) {
1588                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1589                 GOTO(out, rc);
1590         }
1591
1592         /* Process client llogs */
1593         name_create(&logname, fsname, "-client");
1594         rc = mgs_replace_nids_log(env, mgs_obd, logname, devname, nids);
1595         name_destroy(&logname);
1596         if (rc) {
1597                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1598                        fsname, devname, rc);
1599                 GOTO(out, rc);
1600         }
1601
1602         /* Process MDT llogs */
1603         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1604                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1605                         continue;
1606                 name_create_mdt(&logname, fsname, i);
1607                 rc = mgs_replace_nids_log(env, mgs_obd, logname, devname, nids);
1608                 name_destroy(&logname);
1609                 if (rc)
1610                         GOTO(out, rc);
1611         }
1612
1613 out:
1614         spin_lock(&mgs_obd->obd_dev_lock);
1615         mgs_obd->obd_no_conn = conn_state;
1616         spin_unlock(&mgs_obd->obd_dev_lock);
1617
1618         if (fsdb)
1619                 mgs_put_fsdb(mgs, fsdb);
1620
1621         RETURN(rc);
1622 }
1623
1624 /**
1625  * This is called for every record in llog. Some of records are
1626  * skipped, others are copied to new log as is.
1627  * Records to be skipped are
1628  *  marker records marked SKIP
1629  *  records enclosed between SKIP markers
1630  *
1631  * \param[in] llh       log to be processed
1632  * \param[in] rec       current record
1633  * \param[in] data      mgs_replace_data structure
1634  *
1635  * \retval 0    success
1636  **/
1637 static int mgs_clear_config_handler(const struct lu_env *env,
1638                                     struct llog_handle *llh,
1639                                     struct llog_rec_hdr *rec, void *data)
1640 {
1641         struct mgs_replace_data *mrd;
1642         struct lustre_cfg *lcfg = REC_DATA(rec);
1643         int cfg_len = REC_DATA_LEN(rec);
1644         int rc;
1645
1646         ENTRY;
1647
1648         mrd = (struct mgs_replace_data *)data;
1649
1650         if (rec->lrh_type != OBD_CFG_REC) {
1651                 CDEBUG(D_MGS, "Config llog Name=%s, Record Index=%u, "
1652                        "Unhandled Record Type=%#x\n", llh->lgh_name,
1653                        rec->lrh_index, rec->lrh_type);
1654                 RETURN(-EINVAL);
1655         }
1656
1657         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1658         if (rc) {
1659                 CDEBUG(D_MGS, "Config llog Name=%s, Invalid config file.",
1660                        llh->lgh_name);
1661                 RETURN(-EINVAL);
1662         }
1663
1664         if (lcfg->lcfg_command == LCFG_MARKER) {
1665                 struct cfg_marker *marker;
1666
1667                 marker = lustre_cfg_buf(lcfg, 1);
1668                 if (marker->cm_flags & CM_SKIP) {
1669                         if (marker->cm_flags & CM_START)
1670                                 mrd->state = REPLACE_SKIP;
1671                         if (marker->cm_flags & CM_END)
1672                                 mrd->state = REPLACE_COPY;
1673                         /* SKIP section started or finished */
1674                         CDEBUG(D_MGS, "Skip idx=%d, rc=%d, len=%d, "
1675                                "cmd %x %s %s\n", rec->lrh_index, rc,
1676                                rec->lrh_len, lcfg->lcfg_command,
1677                                lustre_cfg_string(lcfg, 0),
1678                                lustre_cfg_string(lcfg, 1));
1679                         RETURN(0);
1680                 }
1681         } else {
1682                 if (mrd->state == REPLACE_SKIP) {
1683                         /* record enclosed between SKIP markers, skip it */
1684                         CDEBUG(D_MGS, "Skip idx=%d, rc=%d, len=%d, "
1685                                "cmd %x %s %s\n", rec->lrh_index, rc,
1686                                rec->lrh_len, lcfg->lcfg_command,
1687                                lustre_cfg_string(lcfg, 0),
1688                                lustre_cfg_string(lcfg, 1));
1689                         RETURN(0);
1690                 }
1691         }
1692
1693         /* Record is placed in temporary llog as is */
1694         rc = llog_write(env, mrd->temp_llh, rec, LLOG_NEXT_IDX);
1695
1696         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1697                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1698                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1699         RETURN(rc);
1700 }
1701
1702 /*
1703  * Directory CONFIGS/ may contain files which are not config logs to
1704  * be cleared. Skip any llogs with a non-alphanumeric character after
1705  * the last '-'. For example, fsname-MDT0000.sav, fsname-MDT0000.bak,
1706  * fsname-MDT0000.orig, fsname-MDT0000~, fsname-MDT0000.20150516, etc.
1707  */
1708 static bool config_to_clear(const char *logname)
1709 {
1710         int i;
1711         char *str;
1712
1713         str = strrchr(logname, '-');
1714         if (!str)
1715                 return 0;
1716
1717         i = 0;
1718         while (isalnum(str[++i]));
1719         return str[i] == '\0';
1720 }
1721
1722 /**
1723  * Clear config logs for \a name
1724  *
1725  * \param env
1726  * \param mgs           MGS device
1727  * \param name          name of device or of filesystem
1728  *                      (ex. lustre-OST0000 or lustre) in later case all logs
1729  *                      will be cleared
1730  *
1731  * \retval 0            success
1732  */
1733 int mgs_clear_configs(const struct lu_env *env,
1734                      struct mgs_device *mgs, const char *name)
1735 {
1736         struct list_head dentry_list;
1737         struct mgs_direntry *dirent, *n;
1738         char *namedash;
1739         int conn_state;
1740         struct obd_device *mgs_obd = mgs->mgs_obd;
1741         int rc;
1742
1743         ENTRY;
1744
1745         /* Prevent clients and servers from connecting to mgs */
1746         spin_lock(&mgs_obd->obd_dev_lock);
1747         conn_state = mgs_obd->obd_no_conn;
1748         mgs_obd->obd_no_conn = 1;
1749         spin_unlock(&mgs_obd->obd_dev_lock);
1750
1751         /*
1752          * config logs cannot be cleaned if anything other than
1753          * MGS is started
1754          */
1755         if (!only_mgs_is_running(mgs_obd)) {
1756                 CERROR("Only MGS is allowed to be started\n");
1757                 GOTO(out, rc = -EBUSY);
1758         }
1759
1760         /* Find all the logs in the CONFIGS directory */
1761         rc = class_dentry_readdir(env, mgs, &dentry_list);
1762         if (rc) {
1763                 CERROR("%s: cannot read config directory '%s': rc = %d\n",
1764                        mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
1765                 GOTO(out, rc);
1766         }
1767
1768         if (list_empty(&dentry_list)) {
1769                 CERROR("%s: list empty reading config dir '%s': rc = %d\n",
1770                         mgs_obd->obd_name, MOUNT_CONFIGS_DIR, -ENOENT);
1771                 GOTO(out, rc = -ENOENT);
1772         }
1773
1774         OBD_ALLOC(namedash, strlen(name) + 2);
1775         if (namedash == NULL)
1776                 GOTO(out, rc = -ENOMEM);
1777         snprintf(namedash, strlen(name) + 2, "%s-", name);
1778
1779         list_for_each_entry(dirent, &dentry_list, mde_list) {
1780                 if (strcmp(name, dirent->mde_name) &&
1781                     strncmp(namedash, dirent->mde_name, strlen(namedash)))
1782                         continue;
1783                 if (!config_to_clear(dirent->mde_name))
1784                         continue;
1785                 CDEBUG(D_MGS, "%s: Clear config log %s\n",
1786                        mgs_obd->obd_name, dirent->mde_name);
1787                 rc = mgs_replace_log(env, mgs_obd, dirent->mde_name, NULL,
1788                                      mgs_clear_config_handler, NULL);
1789                 if (rc)
1790                         break;
1791         }
1792
1793         list_for_each_entry_safe(dirent, n, &dentry_list, mde_list) {
1794                 list_del_init(&dirent->mde_list);
1795                 mgs_direntry_free(dirent);
1796         }
1797         OBD_FREE(namedash, strlen(name) + 2);
1798 out:
1799         spin_lock(&mgs_obd->obd_dev_lock);
1800         mgs_obd->obd_no_conn = conn_state;
1801         spin_unlock(&mgs_obd->obd_dev_lock);
1802
1803         RETURN(rc);
1804 }
1805
1806 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1807                             char *devname, struct lov_desc *desc)
1808 {
1809         struct mgs_thread_info  *mgi = mgs_env_info(env);
1810         struct llog_cfg_rec     *lcr;
1811         int rc;
1812
1813         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1814         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1815         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1816         if (lcr == NULL)
1817                 return -ENOMEM;
1818
1819         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1820         lustre_cfg_rec_free(lcr);
1821         return rc;
1822 }
1823
1824 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1825                             char *devname, struct lmv_desc *desc)
1826 {
1827         struct mgs_thread_info  *mgi = mgs_env_info(env);
1828         struct llog_cfg_rec     *lcr;
1829         int rc;
1830
1831         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1832         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1833         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1834         if (lcr == NULL)
1835                 return -ENOMEM;
1836
1837         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1838         lustre_cfg_rec_free(lcr);
1839         return rc;
1840 }
1841
1842 static inline int record_mdc_add(const struct lu_env *env,
1843                                  struct llog_handle *llh,
1844                                  char *logname, char *mdcuuid,
1845                                  char *mdtuuid, char *index,
1846                                  char *gen)
1847 {
1848         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1849                            mdtuuid,index,gen,mdcuuid);
1850 }
1851
1852 static inline int record_lov_add(const struct lu_env *env,
1853                                  struct llog_handle *llh,
1854                                  char *lov_name, char *ost_uuid,
1855                                  char *index, char *gen)
1856 {
1857         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1858                            ost_uuid, index, gen, NULL);
1859 }
1860
1861 static inline int record_mount_opt(const struct lu_env *env,
1862                                    struct llog_handle *llh,
1863                                    char *profile, char *lov_name,
1864                                    char *mdc_name)
1865 {
1866         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1867                            profile, lov_name, mdc_name, NULL);
1868 }
1869
1870 static int record_marker(const struct lu_env *env,
1871                          struct llog_handle *llh,
1872                          struct fs_db *fsdb, __u32 flags,
1873                          char *tgtname, char *comment)
1874 {
1875         struct mgs_thread_info *mgi = mgs_env_info(env);
1876         struct llog_cfg_rec *lcr;
1877         int rc;
1878         int cplen = 0;
1879
1880         if (flags & CM_START)
1881                 fsdb->fsdb_gen++;
1882         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1883         mgi->mgi_marker.cm_flags = flags;
1884         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1885         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1886                         sizeof(mgi->mgi_marker.cm_tgtname));
1887         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1888                 return -E2BIG;
1889         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1890                         sizeof(mgi->mgi_marker.cm_comment));
1891         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1892                 return -E2BIG;
1893         mgi->mgi_marker.cm_createtime = ktime_get_real_seconds();
1894         mgi->mgi_marker.cm_canceltime = 0;
1895         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1896         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1897                             sizeof(mgi->mgi_marker));
1898         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1899         if (lcr == NULL)
1900                 return -ENOMEM;
1901
1902         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1903         lustre_cfg_rec_free(lcr);
1904         return rc;
1905 }
1906
1907 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1908                             struct llog_handle **llh, char *name)
1909 {
1910         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1911         struct llog_ctxt        *ctxt;
1912         int                      rc = 0;
1913         ENTRY;
1914
1915         if (*llh)
1916                 GOTO(out, rc = -EBUSY);
1917
1918         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1919         if (!ctxt)
1920                 GOTO(out, rc = -ENODEV);
1921         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1922
1923         rc = llog_open_create(env, ctxt, llh, NULL, name);
1924         if (rc)
1925                 GOTO(out_ctxt, rc);
1926         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1927         if (rc)
1928                 llog_close(env, *llh);
1929 out_ctxt:
1930         llog_ctxt_put(ctxt);
1931 out:
1932         if (rc) {
1933                 CERROR("%s: can't start log %s: rc = %d\n",
1934                        mgs->mgs_obd->obd_name, name, rc);
1935                 *llh = NULL;
1936         }
1937         RETURN(rc);
1938 }
1939
1940 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1941 {
1942         int rc;
1943
1944         rc = llog_close(env, *llh);
1945         *llh = NULL;
1946
1947         return rc;
1948 }
1949
1950 /******************** config "macros" *********************/
1951
1952 /* write an lcfg directly into a log (with markers) */
1953 static int mgs_write_log_direct(const struct lu_env *env,
1954                                 struct mgs_device *mgs, struct fs_db *fsdb,
1955                                 char *logname, struct llog_cfg_rec *lcr,
1956                                 char *devname, char *comment)
1957 {
1958         struct llog_handle *llh = NULL;
1959         int rc;
1960
1961         ENTRY;
1962
1963         rc = record_start_log(env, mgs, &llh, logname);
1964         if (rc)
1965                 RETURN(rc);
1966
1967         /* FIXME These should be a single journal transaction */
1968         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1969         if (rc)
1970                 GOTO(out_end, rc);
1971         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1972         if (rc)
1973                 GOTO(out_end, rc);
1974         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1975         if (rc)
1976                 GOTO(out_end, rc);
1977 out_end:
1978         record_end_log(env, &llh);
1979         RETURN(rc);
1980 }
1981
1982 /* write the lcfg in all logs for the given fs */
1983 static int mgs_write_log_direct_all(const struct lu_env *env,
1984                                     struct mgs_device *mgs,
1985                                     struct fs_db *fsdb,
1986                                     struct mgs_target_info *mti,
1987                                     struct llog_cfg_rec *lcr, char *devname,
1988                                     char *comment, int server_only)
1989 {
1990         struct list_head         log_list;
1991         struct mgs_direntry     *dirent, *n;
1992         char                    *fsname = mti->mti_fsname;
1993         int                      rc = 0, len = strlen(fsname);
1994
1995         ENTRY;
1996         /* Find all the logs in the CONFIGS directory */
1997         rc = class_dentry_readdir(env, mgs, &log_list);
1998         if (rc)
1999                 RETURN(rc);
2000
2001         /* Could use fsdb index maps instead of directory listing */
2002         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
2003                 list_del_init(&dirent->mde_list);
2004                 /* don't write to sptlrpc rule log */
2005                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
2006                         goto next;
2007
2008                 /* caller wants write server logs only */
2009                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
2010                         goto next;
2011
2012                 if (strlen(dirent->mde_name) <= len ||
2013                     strncmp(fsname, dirent->mde_name, len) != 0 ||
2014                     dirent->mde_name[len] != '-')
2015                         goto next;
2016
2017                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
2018                 /* Erase any old settings of this same parameter */
2019                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
2020                                 devname, comment, CM_SKIP);
2021                 if (rc < 0)
2022                         CERROR("%s: Can't modify llog %s: rc = %d\n",
2023                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
2024                 if (lcr == NULL)
2025                         goto next;
2026                 /* Write the new one */
2027                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
2028                                           lcr, devname, comment);
2029                 if (rc != 0)
2030                         CERROR("%s: writing log %s: rc = %d\n",
2031                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
2032 next:
2033                 mgs_direntry_free(dirent);
2034         }
2035
2036         RETURN(rc);
2037 }
2038
2039 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2040                                     struct mgs_device *mgs,
2041                                     struct fs_db *fsdb,
2042                                     struct mgs_target_info *mti,
2043                                     int index, char *logname);
2044 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2045                                     struct mgs_device *mgs,
2046                                     struct fs_db *fsdb,
2047                                     struct mgs_target_info *mti,
2048                                     char *logname, char *suffix, char *lovname,
2049                                     enum lustre_sec_part sec_part, int flags);
2050 static int name_create_mdt_and_lov(char **logname, char **lovname,
2051                                    struct fs_db *fsdb, int i);
2052
2053 static int add_param(char *params, char *key, char *val)
2054 {
2055         char *start = params + strlen(params);
2056         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
2057         int keylen = 0;
2058
2059         if (key != NULL)
2060                 keylen = strlen(key);
2061         if (start + 1 + keylen + strlen(val) >= end) {
2062                 CERROR("params are too long: %s %s%s\n",
2063                        params, key != NULL ? key : "", val);
2064                 return -EINVAL;
2065         }
2066
2067         sprintf(start, " %s%s", key != NULL ? key : "", val);
2068         return 0;
2069 }
2070
2071 /**
2072  * Walk through client config log record and convert the related records
2073  * into the target.
2074  **/
2075 static int mgs_steal_client_llog_handler(const struct lu_env *env,
2076                                          struct llog_handle *llh,
2077                                          struct llog_rec_hdr *rec, void *data)
2078 {
2079         struct mgs_device *mgs;
2080         struct obd_device *obd;
2081         struct mgs_target_info *mti, *tmti;
2082         struct fs_db *fsdb;
2083         int cfg_len = rec->lrh_len;
2084         char *cfg_buf = (char*) (rec + 1);
2085         struct lustre_cfg *lcfg;
2086         int rc = 0;
2087         struct llog_handle *mdt_llh = NULL;
2088         static int got_an_osc_or_mdc = 0;
2089         /* 0: not found any osc/mdc;
2090            1: found osc;
2091            2: found mdc;
2092         */
2093         static int last_step = -1;
2094         int cplen = 0;
2095
2096         ENTRY;
2097
2098         mti = ((struct temp_comp*)data)->comp_mti;
2099         tmti = ((struct temp_comp*)data)->comp_tmti;
2100         fsdb = ((struct temp_comp*)data)->comp_fsdb;
2101         obd = ((struct temp_comp *)data)->comp_obd;
2102         mgs = lu2mgs_dev(obd->obd_lu_dev);
2103         LASSERT(mgs);
2104
2105         if (rec->lrh_type != OBD_CFG_REC) {
2106                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
2107                 RETURN(-EINVAL);
2108         }
2109
2110         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
2111         if (rc) {
2112                 CERROR("Insane cfg\n");
2113                 RETURN(rc);
2114         }
2115
2116         lcfg = (struct lustre_cfg *)cfg_buf;
2117
2118         if (lcfg->lcfg_command == LCFG_MARKER) {
2119                 struct cfg_marker *marker;
2120                 marker = lustre_cfg_buf(lcfg, 1);
2121                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
2122                     (marker->cm_flags & CM_START) &&
2123                      !(marker->cm_flags & CM_SKIP)) {
2124                         got_an_osc_or_mdc = 1;
2125                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
2126                                         sizeof(tmti->mti_svname));
2127                         if (cplen >= sizeof(tmti->mti_svname))
2128                                 RETURN(-E2BIG);
2129                         rc = record_start_log(env, mgs, &mdt_llh,
2130                                               mti->mti_svname);
2131                         if (rc)
2132                                 RETURN(rc);
2133                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
2134                                            mti->mti_svname, "add osc(copied)");
2135                         record_end_log(env, &mdt_llh);
2136                         last_step = marker->cm_step;
2137                         RETURN(rc);
2138                 }
2139                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
2140                     (marker->cm_flags & CM_END) &&
2141                      !(marker->cm_flags & CM_SKIP)) {
2142                         LASSERT(last_step == marker->cm_step);
2143                         last_step = -1;
2144                         got_an_osc_or_mdc = 0;
2145                         memset(tmti, 0, sizeof(*tmti));
2146                         rc = record_start_log(env, mgs, &mdt_llh,
2147                                               mti->mti_svname);
2148                         if (rc)
2149                                 RETURN(rc);
2150                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
2151                                            mti->mti_svname, "add osc(copied)");
2152                         record_end_log(env, &mdt_llh);
2153                         RETURN(rc);
2154                 }
2155                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
2156                     (marker->cm_flags & CM_START) &&
2157                      !(marker->cm_flags & CM_SKIP)) {
2158                         got_an_osc_or_mdc = 2;
2159                         last_step = marker->cm_step;
2160                         memcpy(tmti->mti_svname, marker->cm_tgtname,
2161                                strlen(marker->cm_tgtname));
2162
2163                         RETURN(rc);
2164                 }
2165                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
2166                     (marker->cm_flags & CM_END) &&
2167                      !(marker->cm_flags & CM_SKIP)) {
2168                         LASSERT(last_step == marker->cm_step);
2169                         last_step = -1;
2170                         got_an_osc_or_mdc = 0;
2171                         memset(tmti, 0, sizeof(*tmti));
2172                         RETURN(rc);
2173                 }
2174         }
2175
2176         if (got_an_osc_or_mdc == 0 || last_step < 0)
2177                 RETURN(rc);
2178
2179         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
2180                 __u64 nodenid = lcfg->lcfg_nid;
2181
2182                 if (strlen(tmti->mti_uuid) == 0) {
2183                         /* target uuid not set, this config record is before
2184                          * LCFG_SETUP, this nid is one of target node nid.
2185                          */
2186                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
2187                         tmti->mti_nid_count++;
2188                 } else {
2189                         char nidstr[LNET_NIDSTR_SIZE];
2190
2191                         /* failover node nid */
2192                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
2193                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
2194                                         nidstr);
2195                 }
2196
2197                 RETURN(rc);
2198         }
2199
2200         if (lcfg->lcfg_command == LCFG_SETUP) {
2201                 char *target;
2202
2203                 target = lustre_cfg_string(lcfg, 1);
2204                 memcpy(tmti->mti_uuid, target, strlen(target));
2205                 RETURN(rc);
2206         }
2207
2208         /* ignore client side sptlrpc_conf_log */
2209         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
2210                 RETURN(rc);
2211
2212         if (lcfg->lcfg_command == LCFG_ADD_MDC &&
2213             strstr(lustre_cfg_string(lcfg, 0), "-clilmv") != NULL) {
2214                 int index;
2215
2216                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
2217                         RETURN (-EINVAL);
2218
2219                 memcpy(tmti->mti_fsname, mti->mti_fsname,
2220                        strlen(mti->mti_fsname));
2221                 tmti->mti_stripe_index = index;
2222
2223                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
2224                                               mti->mti_stripe_index,
2225                                               mti->mti_svname);
2226                 memset(tmti, 0, sizeof(*tmti));
2227                 RETURN(rc);
2228         }
2229
2230         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
2231                 int index;
2232                 char mdt_index[9];
2233                 char *logname, *lovname;
2234
2235                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2236                                              mti->mti_stripe_index);
2237                 if (rc)
2238                         RETURN(rc);
2239                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
2240
2241                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
2242                         name_destroy(&logname);
2243                         name_destroy(&lovname);
2244                         RETURN(-EINVAL);
2245                 }
2246
2247                 tmti->mti_stripe_index = index;
2248                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
2249                                          mdt_index, lovname,
2250                                          LUSTRE_SP_MDT, 0);
2251                 name_destroy(&logname);
2252                 name_destroy(&lovname);
2253                 RETURN(rc);
2254         }
2255         RETURN(rc);
2256 }
2257
2258 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
2259 /* stealed from mgs_get_fsdb_from_llog*/
2260 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
2261                                               struct mgs_device *mgs,
2262                                               char *client_name,
2263                                               struct temp_comp* comp)
2264 {
2265         struct llog_handle *loghandle;
2266         struct mgs_target_info *tmti;
2267         struct llog_ctxt *ctxt;
2268         int rc;
2269
2270         ENTRY;
2271
2272         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
2273         LASSERT(ctxt != NULL);
2274
2275         OBD_ALLOC_PTR(tmti);
2276         if (tmti == NULL)
2277                 GOTO(out_ctxt, rc = -ENOMEM);
2278
2279         comp->comp_tmti = tmti;
2280         comp->comp_obd = mgs->mgs_obd;
2281
2282         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
2283                        LLOG_OPEN_EXISTS);
2284         if (rc < 0) {
2285                 if (rc == -ENOENT)
2286                         rc = 0;
2287                 GOTO(out_pop, rc);
2288         }
2289
2290         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
2291         if (rc)
2292                 GOTO(out_close, rc);
2293
2294         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
2295                                   (void *)comp, NULL, false);
2296         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
2297 out_close:
2298         llog_close(env, loghandle);
2299 out_pop:
2300         OBD_FREE_PTR(tmti);
2301 out_ctxt:
2302         llog_ctxt_put(ctxt);
2303         RETURN(rc);
2304 }
2305
2306 /* lmv is the second thing for client logs */
2307 /* copied from mgs_write_log_lov. Please refer to that.  */
2308 static int mgs_write_log_lmv(const struct lu_env *env,
2309                              struct mgs_device *mgs,
2310                              struct fs_db *fsdb,
2311                              struct mgs_target_info *mti,
2312                              char *logname, char *lmvname)
2313 {
2314         struct llog_handle *llh = NULL;
2315         struct lmv_desc *lmvdesc;
2316         char *uuid;
2317         int rc = 0;
2318         ENTRY;
2319
2320         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
2321
2322         OBD_ALLOC_PTR(lmvdesc);
2323         if (lmvdesc == NULL)
2324                 RETURN(-ENOMEM);
2325         lmvdesc->ld_active_tgt_count = 0;
2326         lmvdesc->ld_tgt_count = 0;
2327         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
2328         uuid = (char *)lmvdesc->ld_uuid.uuid;
2329
2330         rc = record_start_log(env, mgs, &llh, logname);
2331         if (rc)
2332                 GOTO(out_free, rc);
2333         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
2334         if (rc)
2335                 GOTO(out_end, rc);
2336         rc = record_attach(env, llh, lmvname, "lmv", uuid);
2337         if (rc)
2338                 GOTO(out_end, rc);
2339         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
2340         if (rc)
2341                 GOTO(out_end, rc);
2342         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
2343         if (rc)
2344                 GOTO(out_end, rc);
2345 out_end:
2346         record_end_log(env, &llh);
2347 out_free:
2348         OBD_FREE_PTR(lmvdesc);
2349         RETURN(rc);
2350 }
2351
2352 /* lov is the first thing in the mdt and client logs */
2353 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
2354                              struct fs_db *fsdb, struct mgs_target_info *mti,
2355                              char *logname, char *lovname)
2356 {
2357         struct llog_handle *llh = NULL;
2358         struct lov_desc *lovdesc;
2359         char *uuid;
2360         int rc = 0;
2361         ENTRY;
2362
2363         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
2364
2365         /*
2366         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
2367         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
2368               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
2369         */
2370
2371         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
2372         OBD_ALLOC_PTR(lovdesc);
2373         if (lovdesc == NULL)
2374                 RETURN(-ENOMEM);
2375         lovdesc->ld_magic = LOV_DESC_MAGIC;
2376         lovdesc->ld_tgt_count = 0;
2377         /* Defaults.  Can be changed later by lcfg config_param */
2378         lovdesc->ld_default_stripe_count = 1;
2379         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
2380         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
2381         lovdesc->ld_default_stripe_offset = -1;
2382         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
2383         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
2384         /* can these be the same? */
2385         uuid = (char *)lovdesc->ld_uuid.uuid;
2386
2387         /* This should always be the first entry in a log.
2388         rc = mgs_clear_log(obd, logname); */
2389         rc = record_start_log(env, mgs, &llh, logname);
2390         if (rc)
2391                 GOTO(out_free, rc);
2392         /* FIXME these should be a single journal transaction */
2393         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
2394         if (rc)
2395                 GOTO(out_end, rc);
2396         rc = record_attach(env, llh, lovname, "lov", uuid);
2397         if (rc)
2398                 GOTO(out_end, rc);
2399         rc = record_lov_setup(env, llh, lovname, lovdesc);
2400         if (rc)
2401                 GOTO(out_end, rc);
2402         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
2403         if (rc)
2404                 GOTO(out_end, rc);
2405         EXIT;
2406 out_end:
2407         record_end_log(env, &llh);
2408 out_free:
2409         OBD_FREE_PTR(lovdesc);
2410         return rc;
2411 }
2412
2413 /* add failnids to open log */
2414 static int mgs_write_log_failnids(const struct lu_env *env,
2415                                   struct mgs_target_info *mti,
2416                                   struct llog_handle *llh,
2417                                   char *cliname)
2418 {
2419         char *failnodeuuid = NULL;
2420         char *ptr = mti->mti_params;
2421         lnet_nid_t nid;
2422         int rc = 0;
2423
2424         /*
2425         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
2426         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
2427         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
2428         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
2429         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
2430         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
2431         */
2432
2433         /*
2434          * Pull failnid info out of params string, which may contain something
2435          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
2436          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
2437          * etc.  However, convert_hostnames() should have caught those.
2438          */
2439         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
2440                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
2441                         char nidstr[LNET_NIDSTR_SIZE];
2442
2443                         if (failnodeuuid == NULL) {
2444                                 /* We don't know the failover node name,
2445                                  * so just use the first nid as the uuid */
2446                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
2447                                 rc = name_create(&failnodeuuid, nidstr, "");
2448                                 if (rc != 0)
2449                                         return rc;
2450                         }
2451                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
2452                                 "client %s\n",
2453                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
2454                                 failnodeuuid, cliname);
2455                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
2456                         /*
2457                          * If *ptr is ':', we have added all NIDs for
2458                          * failnodeuuid.
2459                          */
2460                         if (*ptr == ':') {
2461                                 rc = record_add_conn(env, llh, cliname,
2462                                                      failnodeuuid);
2463                                 name_destroy(&failnodeuuid);
2464                                 failnodeuuid = NULL;
2465                         }
2466                 }
2467                 if (failnodeuuid) {
2468                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2469                         name_destroy(&failnodeuuid);
2470                         failnodeuuid = NULL;
2471                 }
2472         }
2473
2474         return rc;
2475 }
2476
2477 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2478                                     struct mgs_device *mgs,
2479                                     struct fs_db *fsdb,
2480                                     struct mgs_target_info *mti,
2481                                     char *logname, char *lmvname)
2482 {
2483         struct llog_handle *llh = NULL;
2484         char *mdcname = NULL;
2485         char *nodeuuid = NULL;
2486         char *mdcuuid = NULL;
2487         char *lmvuuid = NULL;
2488         char index[6];
2489         char nidstr[LNET_NIDSTR_SIZE];
2490         int i, rc;
2491         ENTRY;
2492
2493         if (mgs_log_is_empty(env, mgs, logname)) {
2494                 CERROR("log is empty! Logical error\n");
2495                 RETURN(-EINVAL);
2496         }
2497
2498         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2499                mti->mti_svname, logname, lmvname);
2500
2501         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2502         rc = name_create(&nodeuuid, nidstr, "");
2503         if (rc)
2504                 RETURN(rc);
2505         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2506         if (rc)
2507                 GOTO(out_free, rc);
2508         rc = name_create(&mdcuuid, mdcname, "_UUID");
2509         if (rc)
2510                 GOTO(out_free, rc);
2511         rc = name_create(&lmvuuid, lmvname, "_UUID");
2512         if (rc)
2513                 GOTO(out_free, rc);
2514
2515         rc = record_start_log(env, mgs, &llh, logname);
2516         if (rc)
2517                 GOTO(out_free, rc);
2518         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2519                            "add mdc");
2520         if (rc)
2521                 GOTO(out_end, rc);
2522         for (i = 0; i < mti->mti_nid_count; i++) {
2523                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2524                         libcfs_nid2str_r(mti->mti_nids[i],
2525                                          nidstr, sizeof(nidstr)));
2526
2527                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2528                 if (rc)
2529                         GOTO(out_end, rc);
2530         }
2531
2532         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2533         if (rc)
2534                 GOTO(out_end, rc);
2535         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2536                           NULL, NULL);
2537         if (rc)
2538                 GOTO(out_end, rc);
2539         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2540         if (rc)
2541                 GOTO(out_end, rc);
2542         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2543         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2544                             index, "1");
2545         if (rc)
2546                 GOTO(out_end, rc);
2547         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2548                            "add mdc");
2549         if (rc)
2550                 GOTO(out_end, rc);
2551 out_end:
2552         record_end_log(env, &llh);
2553 out_free:
2554         name_destroy(&lmvuuid);
2555         name_destroy(&mdcuuid);
2556         name_destroy(&mdcname);
2557         name_destroy(&nodeuuid);
2558         RETURN(rc);
2559 }
2560
2561 static inline int name_create_lov(char **lovname, char *mdtname,
2562                                   struct fs_db *fsdb, int index)
2563 {
2564         /* COMPAT_180 */
2565         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2566                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2567         else
2568                 return name_create(lovname, mdtname, "-mdtlov");
2569 }
2570
2571 static int name_create_mdt_and_lov(char **logname, char **lovname,
2572                                    struct fs_db *fsdb, int i)
2573 {
2574         int rc;
2575
2576         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2577         if (rc)
2578                 return rc;
2579         /* COMPAT_180 */
2580         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2581                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2582         else
2583                 rc = name_create(lovname, *logname, "-mdtlov");
2584         if (rc) {
2585                 name_destroy(logname);
2586                 *logname = NULL;
2587         }
2588         return rc;
2589 }
2590
2591 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2592                                       struct fs_db *fsdb, int i)
2593 {
2594         char suffix[16];
2595
2596         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2597                 sprintf(suffix, "-osc");
2598         else
2599                 sprintf(suffix, "-osc-MDT%04x", i);
2600         return name_create(oscname, ostname, suffix);
2601 }
2602
2603 /* add new mdc to already existent MDS */
2604 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2605                                     struct mgs_device *mgs,
2606                                     struct fs_db *fsdb,
2607                                     struct mgs_target_info *mti,
2608                                     int mdt_index, char *logname)
2609 {
2610         struct llog_handle      *llh = NULL;
2611         char    *nodeuuid = NULL;
2612         char    *ospname = NULL;
2613         char    *lovuuid = NULL;
2614         char    *mdtuuid = NULL;
2615         char    *svname = NULL;
2616         char    *mdtname = NULL;
2617         char    *lovname = NULL;
2618         char    index_str[16];
2619         char    nidstr[LNET_NIDSTR_SIZE];
2620         int     i, rc;
2621
2622         ENTRY;
2623         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2624                 CERROR("log is empty! Logical error\n");
2625                 RETURN (-EINVAL);
2626         }
2627
2628         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2629                logname);
2630
2631         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2632         if (rc)
2633                 RETURN(rc);
2634
2635         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2636         rc = name_create(&nodeuuid, nidstr, "");
2637         if (rc)
2638                 GOTO(out_destory, rc);
2639
2640         rc = name_create(&svname, mdtname, "-osp");
2641         if (rc)
2642                 GOTO(out_destory, rc);
2643
2644         sprintf(index_str, "-MDT%04x", mdt_index);
2645         rc = name_create(&ospname, svname, index_str);
2646         if (rc)
2647                 GOTO(out_destory, rc);
2648
2649         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2650         if (rc)
2651                 GOTO(out_destory, rc);
2652
2653         rc = name_create(&lovuuid, lovname, "_UUID");
2654         if (rc)
2655                 GOTO(out_destory, rc);
2656
2657         rc = name_create(&mdtuuid, mdtname, "_UUID");
2658         if (rc)
2659                 GOTO(out_destory, rc);
2660
2661         rc = record_start_log(env, mgs, &llh, logname);
2662         if (rc)
2663                 GOTO(out_destory, rc);
2664
2665         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2666                            "add osp");
2667         if (rc)
2668                 GOTO(out_destory, rc);
2669
2670         for (i = 0; i < mti->mti_nid_count; i++) {
2671                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2672                         libcfs_nid2str_r(mti->mti_nids[i],
2673                                          nidstr, sizeof(nidstr)));
2674                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2675                 if (rc)
2676                         GOTO(out_end, rc);
2677         }
2678
2679         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2680         if (rc)
2681                 GOTO(out_end, rc);
2682
2683         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2684                           NULL, NULL);
2685         if (rc)
2686                 GOTO(out_end, rc);
2687
2688         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2689         if (rc)
2690                 GOTO(out_end, rc);
2691
2692         /* Add mdc(osp) to lod */
2693         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
2694         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2695                          index_str, "1", NULL);
2696         if (rc)
2697                 GOTO(out_end, rc);
2698
2699         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2700         if (rc)
2701                 GOTO(out_end, rc);
2702
2703 out_end:
2704         record_end_log(env, &llh);
2705
2706 out_destory:
2707         name_destroy(&mdtuuid);
2708         name_destroy(&lovuuid);
2709         name_destroy(&lovname);
2710         name_destroy(&ospname);
2711         name_destroy(&svname);
2712         name_destroy(&nodeuuid);
2713         name_destroy(&mdtname);
2714         RETURN(rc);
2715 }
2716
2717 static int mgs_write_log_mdt0(const struct lu_env *env,
2718                               struct mgs_device *mgs,
2719                               struct fs_db *fsdb,
2720                               struct mgs_target_info *mti)
2721 {
2722         char *log = mti->mti_svname;
2723         struct llog_handle *llh = NULL;
2724         char *uuid, *lovname;
2725         char mdt_index[6];
2726         char *ptr = mti->mti_params;
2727         int rc = 0, failout = 0;
2728         ENTRY;
2729
2730         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2731         if (uuid == NULL)
2732                 RETURN(-ENOMEM);
2733
2734         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2735                 failout = (strncmp(ptr, "failout", 7) == 0);
2736
2737         rc = name_create(&lovname, log, "-mdtlov");
2738         if (rc)
2739                 GOTO(out_free, rc);
2740         if (mgs_log_is_empty(env, mgs, log)) {
2741                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2742                 if (rc)
2743                         GOTO(out_lod, rc);
2744         }
2745
2746         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2747
2748         rc = record_start_log(env, mgs, &llh, log);
2749         if (rc)
2750                 GOTO(out_lod, rc);
2751
2752         /* add MDT itself */
2753
2754         /* FIXME this whole fn should be a single journal transaction */
2755         sprintf(uuid, "%s_UUID", log);
2756         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2757         if (rc)
2758                 GOTO(out_lod, rc);
2759         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2760         if (rc)
2761                 GOTO(out_end, rc);
2762         rc = record_mount_opt(env, llh, log, lovname, NULL);
2763         if (rc)
2764                 GOTO(out_end, rc);
2765         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2766                         failout ? "n" : "f");
2767         if (rc)
2768                 GOTO(out_end, rc);
2769         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2770         if (rc)
2771                 GOTO(out_end, rc);
2772 out_end:
2773         record_end_log(env, &llh);
2774 out_lod:
2775         name_destroy(&lovname);
2776 out_free:
2777         OBD_FREE(uuid, sizeof(struct obd_uuid));
2778         RETURN(rc);
2779 }
2780
2781 /* envelope method for all layers log */
2782 static int mgs_write_log_mdt(const struct lu_env *env,
2783                              struct mgs_device *mgs,
2784                              struct fs_db *fsdb,
2785                              struct mgs_target_info *mti)
2786 {
2787         struct mgs_thread_info *mgi = mgs_env_info(env);
2788         struct llog_handle *llh = NULL;
2789         char *cliname;
2790         int rc, i = 0;
2791         ENTRY;
2792
2793         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2794
2795         if (mti->mti_uuid[0] == '\0') {
2796                 /* Make up our own uuid */
2797                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2798                          "%s_UUID", mti->mti_svname);
2799         }
2800
2801         /* add mdt */
2802         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2803         if (rc)
2804                 RETURN(rc);
2805         /* Append the mdt info to the client log */
2806         rc = name_create(&cliname, mti->mti_fsname, "-client");
2807         if (rc)
2808                 RETURN(rc);
2809
2810         if (mgs_log_is_empty(env, mgs, cliname)) {
2811                 /* Start client log */
2812                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2813                                        fsdb->fsdb_clilov);
2814                 if (rc)
2815                         GOTO(out_free, rc);
2816                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2817                                        fsdb->fsdb_clilmv);
2818                 if (rc)
2819                         GOTO(out_free, rc);
2820         }
2821
2822         /*
2823         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2824         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2825         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2826         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2827         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2828         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2829         */
2830
2831         /* copy client info about lov/lmv */
2832         mgi->mgi_comp.comp_mti = mti;
2833         mgi->mgi_comp.comp_fsdb = fsdb;
2834
2835         rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2836                                                 &mgi->mgi_comp);
2837         if (rc)
2838                 GOTO(out_free, rc);
2839         rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2840                                       fsdb->fsdb_clilmv);
2841         if (rc)
2842                 GOTO(out_free, rc);
2843
2844         /* add mountopts */
2845         rc = record_start_log(env, mgs, &llh, cliname);
2846         if (rc)
2847                 GOTO(out_free, rc);
2848
2849         rc = record_marker(env, llh, fsdb, CM_START, cliname, "mount opts");
2850         if (rc)
2851                 GOTO(out_end, rc);
2852         rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2853                               fsdb->fsdb_clilmv);
2854         if (rc)
2855                 GOTO(out_end, rc);
2856         rc = record_marker(env, llh, fsdb, CM_END, cliname, "mount opts");
2857         if (rc)
2858                 GOTO(out_end, rc);
2859
2860         /* for_all_existing_mdt except current one */
2861         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2862                 if (i !=  mti->mti_stripe_index &&
2863                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2864                         char *logname;
2865
2866                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2867                         if (rc)
2868                                 GOTO(out_end, rc);
2869
2870                         /* NB: If the log for the MDT is empty, it means
2871                          * the MDT is only added to the index
2872                          * map, and not being process yet, i.e. this
2873                          * is an unregistered MDT, see mgs_write_log_target().
2874                          * so we should skip it. Otherwise
2875                          *
2876                          * 1. MGS get register request for MDT1 and MDT2.
2877                          *
2878                          * 2. Then both MDT1 and MDT2 are added into
2879                          * fsdb_mdt_index_map. (see mgs_set_index()).
2880                          *
2881                          * 3. Then MDT1 get the lock of fsdb_mutex, then
2882                          * generate the config log, here, it will regard MDT2
2883                          * as an existent MDT, and generate "add osp" for
2884                          * lustre-MDT0001-osp-MDT0002. Note: at the moment
2885                          * MDT0002 config log is still empty, so it will
2886                          * add "add osp" even before "lov setup", which
2887                          * will definitly cause trouble.
2888                          *
2889                          * 4. MDT1 registeration finished, fsdb_mutex is
2890                          * released, then MDT2 get in, then in above
2891                          * mgs_steal_llog_for_mdt_from_client(), it will
2892                          * add another osp log for lustre-MDT0001-osp-MDT0002,
2893                          * which will cause another trouble.*/
2894                         if (!mgs_log_is_empty(env, mgs, logname))
2895                                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb,
2896                                                               mti, i, logname);
2897
2898                         name_destroy(&logname);
2899                         if (rc)
2900                                 GOTO(out_end, rc);
2901                 }
2902         }
2903 out_end:
2904         record_end_log(env, &llh);
2905 out_free:
2906         name_destroy(&cliname);
2907         RETURN(rc);
2908 }
2909
2910 /* Add the ost info to the client/mdt lov */
2911 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2912                                     struct mgs_device *mgs, struct fs_db *fsdb,
2913                                     struct mgs_target_info *mti,
2914                                     char *logname, char *suffix, char *lovname,
2915                                     enum lustre_sec_part sec_part, int flags)
2916 {
2917         struct llog_handle *llh = NULL;
2918         char *nodeuuid = NULL;
2919         char *oscname = NULL;
2920         char *oscuuid = NULL;
2921         char *lovuuid = NULL;
2922         char *svname = NULL;
2923         char index[6];
2924         char nidstr[LNET_NIDSTR_SIZE];
2925         int i, rc;
2926         ENTRY;
2927
2928         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2929                 mti->mti_svname, logname);
2930
2931         if (mgs_log_is_empty(env, mgs, logname)) {
2932                 CERROR("log is empty! Logical error\n");
2933                 RETURN(-EINVAL);
2934         }
2935
2936         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2937         rc = name_create(&nodeuuid, nidstr, "");
2938         if (rc)
2939                 RETURN(rc);
2940         rc = name_create(&svname, mti->mti_svname, "-osc");
2941         if (rc)
2942                 GOTO(out_free, rc);
2943
2944         /* for the system upgraded from old 1.8, keep using the old osc naming
2945          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2946         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2947                 rc = name_create(&oscname, svname, "");
2948         else
2949                 rc = name_create(&oscname, svname, suffix);
2950         if (rc)
2951                 GOTO(out_free, rc);
2952
2953         rc = name_create(&oscuuid, oscname, "_UUID");
2954         if (rc)
2955                 GOTO(out_free, rc);
2956         rc = name_create(&lovuuid, lovname, "_UUID");
2957         if (rc)
2958                 GOTO(out_free, rc);
2959
2960
2961         /*
2962         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2963         multihomed (#4)
2964         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2965         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2966         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2967         failover (#6,7)
2968         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2969         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2970         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2971         */
2972
2973         rc = record_start_log(env, mgs, &llh, logname);
2974         if (rc)
2975                 GOTO(out_free, rc);
2976
2977         /* FIXME these should be a single journal transaction */
2978         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2979                            "add osc");
2980         if (rc)
2981                 GOTO(out_end, rc);
2982
2983         /* NB: don't change record order, because upon MDT steal OSC config
2984          * from client, it treats all nids before LCFG_SETUP as target nids
2985          * (multiple interfaces), while nids after as failover node nids.
2986          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2987          */
2988         for (i = 0; i < mti->mti_nid_count; i++) {
2989                 CDEBUG(D_MGS, "add nid %s\n",
2990                         libcfs_nid2str_r(mti->mti_nids[i],
2991                                          nidstr, sizeof(nidstr)));
2992                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2993                 if (rc)
2994                         GOTO(out_end, rc);
2995         }
2996         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2997         if (rc)
2998                 GOTO(out_end, rc);
2999         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
3000                           NULL, NULL);
3001         if (rc)
3002                 GOTO(out_end, rc);
3003         rc = mgs_write_log_failnids(env, mti, llh, oscname);
3004         if (rc)
3005                 GOTO(out_end, rc);
3006
3007         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
3008
3009         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
3010         if (rc)
3011                 GOTO(out_end, rc);
3012         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
3013                            "add osc");
3014         if (rc)
3015                 GOTO(out_end, rc);
3016 out_end:
3017         record_end_log(env, &llh);
3018 out_free:
3019         name_destroy(&lovuuid);
3020         name_destroy(&oscuuid);
3021         name_destroy(&oscname);
3022         name_destroy(&svname);
3023         name_destroy(&nodeuuid);
3024         RETURN(rc);
3025 }
3026
3027 static int mgs_write_log_ost(const struct lu_env *env,
3028                              struct mgs_device *mgs, struct fs_db *fsdb,
3029                              struct mgs_target_info *mti)
3030 {
3031         struct llog_handle *llh = NULL;
3032         char *logname, *lovname;
3033         char *ptr = mti->mti_params;
3034         int rc, flags = 0, failout = 0, i;
3035         ENTRY;
3036
3037         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
3038
3039         /* The ost startup log */
3040
3041         /* If the ost log already exists, that means that someone reformatted
3042            the ost and it called target_add again. */
3043         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
3044                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
3045                                    "exists, yet the server claims it never "
3046                                    "registered. It may have been reformatted, "
3047                                    "or the index changed. writeconf the MDT to "
3048                                    "regenerate all logs.\n", mti->mti_svname);
3049                 RETURN(-EALREADY);
3050         }
3051
3052         /*
3053         attach obdfilter ost1 ost1_UUID
3054         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
3055         */
3056         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
3057                 failout = (strncmp(ptr, "failout", 7) == 0);
3058         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
3059         if (rc)
3060                 RETURN(rc);
3061         /* FIXME these should be a single journal transaction */
3062         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
3063         if (rc)
3064                 GOTO(out_end, rc);
3065         if (*mti->mti_uuid == '\0')
3066                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
3067                          "%s_UUID", mti->mti_svname);
3068         rc = record_attach(env, llh, mti->mti_svname,
3069                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
3070         if (rc)
3071                 GOTO(out_end, rc);
3072         rc = record_setup(env, llh, mti->mti_svname,
3073                           "dev"/*ignored*/, "type"/*ignored*/,
3074                           failout ? "n" : "f", NULL/*options*/);
3075         if (rc)
3076                 GOTO(out_end, rc);
3077         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
3078         if (rc)
3079                 GOTO(out_end, rc);
3080 out_end:
3081         record_end_log(env, &llh);
3082         if (rc)
3083                 RETURN(rc);
3084         /* We also have to update the other logs where this osc is part of
3085            the lov */
3086
3087         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3088                 /* If we're upgrading, the old mdt log already has our
3089                    entry. Let's do a fake one for fun. */
3090                 /* Note that we can't add any new failnids, since we don't
3091                    know the old osc names. */
3092                 flags = CM_SKIP | CM_UPGRADE146;
3093
3094         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
3095                 /* If the update flag isn't set, don't update client/mdt
3096                    logs. */
3097                 flags |= CM_SKIP;
3098                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
3099                               "the MDT first to regenerate it.\n",
3100                               mti->mti_svname);
3101         }
3102
3103         /* Add ost to all MDT lov defs */
3104         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3105                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
3106                         char mdt_index[13];
3107
3108                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
3109                                                      i);
3110                         if (rc)
3111                                 RETURN(rc);
3112
3113                         snprintf(mdt_index, sizeof(mdt_index), "-MDT%04x", i);
3114                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
3115                                                       logname, mdt_index,
3116                                                       lovname, LUSTRE_SP_MDT,
3117                                                       flags);
3118                         name_destroy(&logname);
3119                         name_destroy(&lovname);
3120                         if (rc)
3121                                 RETURN(rc);
3122                 }
3123         }
3124
3125         /* Append ost info to the client log */
3126         rc = name_create(&logname, mti->mti_fsname, "-client");
3127         if (rc)
3128                 RETURN(rc);
3129         if (mgs_log_is_empty(env, mgs, logname)) {
3130                 /* Start client log */
3131                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
3132                                        fsdb->fsdb_clilov);
3133                 if (rc)
3134                         GOTO(out_free, rc);
3135                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
3136                                        fsdb->fsdb_clilmv);
3137                 if (rc)
3138                         GOTO(out_free, rc);
3139         }
3140         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
3141                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
3142 out_free:
3143         name_destroy(&logname);
3144         RETURN(rc);
3145 }
3146
3147 static __inline__ int mgs_param_empty(char *ptr)
3148 {
3149         char *tmp;
3150
3151         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
3152                 return 1;
3153         return 0;
3154 }
3155
3156 static int mgs_write_log_failnid_internal(const struct lu_env *env,
3157                                           struct mgs_device *mgs,
3158                                           struct fs_db *fsdb,
3159                                           struct mgs_target_info *mti,
3160                                           char *logname, char *cliname)
3161 {
3162         int rc;
3163         struct llog_handle *llh = NULL;
3164
3165         if (mgs_param_empty(mti->mti_params)) {
3166                 /* Remove _all_ failnids */
3167                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3168                                 mti->mti_svname, "add failnid", CM_SKIP);
3169                 return rc < 0 ? rc : 0;
3170         }
3171
3172         /* Otherwise failover nids are additive */
3173         rc = record_start_log(env, mgs, &llh, logname);
3174         if (rc)
3175                 return rc;
3176                 /* FIXME this should be a single journal transaction */
3177         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
3178                            "add failnid");
3179         if (rc)
3180                 goto out_end;
3181         rc = mgs_write_log_failnids(env, mti, llh, cliname);
3182         if (rc)
3183                 goto out_end;
3184         rc = record_marker(env, llh, fsdb, CM_END,
3185                            mti->mti_svname, "add failnid");
3186 out_end:
3187         record_end_log(env, &llh);
3188         return rc;
3189 }
3190
3191
3192 /* Add additional failnids to an existing log.
3193    The mdc/osc must have been added to logs first */
3194 /* tcp nids must be in dotted-quad ascii -
3195    we can't resolve hostnames from the kernel. */
3196 static int mgs_write_log_add_failnid(const struct lu_env *env,
3197                                      struct mgs_device *mgs,
3198                                      struct fs_db *fsdb,
3199                                      struct mgs_target_info *mti)
3200 {
3201         char *logname, *cliname;
3202         int rc;
3203         ENTRY;
3204
3205         /* FIXME we currently can't erase the failnids
3206          * given when a target first registers, since they aren't part of
3207          * an "add uuid" stanza
3208          */
3209
3210         /* Verify that we know about this target */
3211         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
3212                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
3213                                    "yet. It must be started before failnids "
3214                                    "can be added.\n", mti->mti_svname);
3215                 RETURN(-ENOENT);
3216         }
3217
3218         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
3219         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3220                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
3221         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3222                 rc = name_create(&cliname, mti->mti_svname, "-osc");
3223         } else {
3224                 RETURN(-EINVAL);
3225         }
3226         if (rc)
3227                 RETURN(rc);
3228
3229         /* Add failover nids to the client log */
3230         rc = name_create(&logname, mti->mti_fsname, "-client");
3231         if (rc) {
3232                 name_destroy(&cliname);
3233                 RETURN(rc);
3234         }
3235
3236         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
3237         name_destroy(&logname);
3238         name_destroy(&cliname);
3239         if (rc)
3240                 RETURN(rc);
3241
3242         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3243                 /* Add OST failover nids to the MDT logs as well */
3244                 int i;
3245
3246                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3247                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3248                                 continue;
3249                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3250                         if (rc)
3251                                 RETURN(rc);
3252                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
3253                                                  fsdb, i);
3254                         if (rc) {
3255                                 name_destroy(&logname);
3256                                 RETURN(rc);
3257                         }
3258                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
3259                                                             mti, logname,
3260                                                             cliname);
3261                         name_destroy(&cliname);
3262                         name_destroy(&logname);
3263                         if (rc)
3264                                 RETURN(rc);
3265                 }
3266         }
3267
3268         RETURN(rc);
3269 }
3270
3271 static int mgs_wlp_lcfg(const struct lu_env *env,
3272                         struct mgs_device *mgs, struct fs_db *fsdb,
3273                         struct mgs_target_info *mti,
3274                         char *logname, struct lustre_cfg_bufs *bufs,
3275                         char *tgtname, char *ptr)
3276 {
3277         char comment[MTI_NAME_MAXLEN];
3278         char *tmp;
3279         struct llog_cfg_rec *lcr;
3280         int rc, del;
3281
3282         /* Erase any old settings of this same parameter */
3283         memcpy(comment, ptr, MTI_NAME_MAXLEN);
3284         comment[MTI_NAME_MAXLEN - 1] = 0;
3285         /* But don't try to match the value. */
3286         tmp = strchr(comment, '=');
3287         if (tmp != NULL)
3288                 *tmp = 0;
3289         /* FIXME we should skip settings that are the same as old values */
3290         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
3291         if (rc < 0)
3292                 return rc;
3293         del = mgs_param_empty(ptr);
3294
3295         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
3296                       "Setting" : "Modifying", tgtname, comment, logname);
3297         if (del) {
3298                 /* mgs_modify() will return 1 if nothing had to be done */
3299                 if (rc == 1)
3300                         rc = 0;
3301                 return rc;
3302         }
3303
3304         lustre_cfg_bufs_reset(bufs, tgtname);
3305         lustre_cfg_bufs_set_string(bufs, 1, ptr);
3306         if (mti->mti_flags & LDD_F_PARAM2)
3307                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
3308
3309         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
3310                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
3311         if (lcr == NULL)
3312                 return -ENOMEM;
3313
3314         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
3315                                   comment);
3316         lustre_cfg_rec_free(lcr);
3317         return rc;
3318 }
3319
3320 /* write global variable settings into log */
3321 static int mgs_write_log_sys(const struct lu_env *env,
3322                              struct mgs_device *mgs, struct fs_db *fsdb,
3323                              struct mgs_target_info *mti, char *sys, char *ptr)
3324 {
3325         struct mgs_thread_info  *mgi = mgs_env_info(env);
3326         struct lustre_cfg       *lcfg;
3327         struct llog_cfg_rec     *lcr;
3328         char *tmp, sep;
3329         int rc, cmd, convert = 1;
3330
3331         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
3332                 cmd = LCFG_SET_TIMEOUT;
3333         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
3334                 cmd = LCFG_SET_LDLM_TIMEOUT;
3335         /* Check for known params here so we can return error to lctl */
3336         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
3337                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
3338                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
3339                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
3340                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
3341                 cmd = LCFG_PARAM;
3342         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
3343                 convert = 0; /* Don't convert string value to integer */
3344                 cmd = LCFG_PARAM;
3345         } else {
3346                 return -EINVAL;
3347         }
3348
3349         if (mgs_param_empty(ptr))
3350                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
3351         else
3352                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
3353
3354         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
3355         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
3356         if (!convert && *tmp != '\0')
3357                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
3358         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3359         if (lcr == NULL)
3360                 return -ENOMEM;
3361
3362         lcfg = &lcr->lcr_cfg;
3363         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
3364         /* truncate the comment to the parameter name */
3365         ptr = tmp - 1;
3366         sep = *ptr;
3367         *ptr = '\0';
3368         /* modify all servers and clients */
3369         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3370                                       *tmp == '\0' ? NULL : lcr,
3371                                       mti->mti_fsname, sys, 0);
3372         if (rc == 0 && *tmp != '\0') {
3373                 switch (cmd) {
3374                 case LCFG_SET_TIMEOUT:
3375                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
3376                                 class_process_config(lcfg);
3377                         break;
3378                 case LCFG_SET_LDLM_TIMEOUT:
3379                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
3380                                 class_process_config(lcfg);
3381                         break;
3382                 default:
3383                         break;
3384                 }
3385         }
3386         *ptr = sep;
3387         lustre_cfg_rec_free(lcr);
3388         return rc;
3389 }
3390
3391 /* write quota settings into log */
3392 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
3393                                struct fs_db *fsdb, struct mgs_target_info *mti,
3394                                char *quota, char *ptr)
3395 {
3396         struct mgs_thread_info  *mgi = mgs_env_info(env);
3397         struct llog_cfg_rec     *lcr;
3398         char                    *tmp;
3399         char                     sep;
3400         int                      rc, cmd = LCFG_PARAM;
3401
3402         /* support only 'meta' and 'data' pools so far */
3403         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
3404             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
3405                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
3406                        "& quota.ost are)\n", ptr);
3407                 return -EINVAL;
3408         }
3409
3410         if (*tmp == '\0') {
3411                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
3412         } else {
3413                 CDEBUG(D_MGS, "global '%s'\n", quota);
3414
3415                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
3416                     strchr(tmp, 'p') == NULL &&
3417                     strcmp(tmp, "none") != 0) {
3418                         CERROR("enable option(%s) isn't supported\n", tmp);
3419                         return -EINVAL;
3420                 }
3421         }
3422
3423         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
3424         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
3425         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3426         if (lcr == NULL)
3427                 return -ENOMEM;
3428
3429         /* truncate the comment to the parameter name */
3430         ptr = tmp - 1;
3431         sep = *ptr;
3432         *ptr = '\0';
3433
3434         /* XXX we duplicated quota enable information in all server
3435          *     config logs, it should be moved to a separate config
3436          *     log once we cleanup the config log for global param. */
3437         /* modify all servers */
3438         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3439                                       *tmp == '\0' ? NULL : lcr,
3440                                       mti->mti_fsname, quota, 1);
3441         *ptr = sep;
3442         lustre_cfg_rec_free(lcr);
3443         return rc < 0 ? rc : 0;
3444 }
3445
3446 static int mgs_srpc_set_param_disk(const struct lu_env *env,
3447                                    struct mgs_device *mgs,
3448                                    struct fs_db *fsdb,
3449                                    struct mgs_target_info *mti,
3450                                    char *param)
3451 {
3452         struct mgs_thread_info  *mgi = mgs_env_info(env);
3453         struct llog_cfg_rec     *lcr;
3454         struct llog_handle      *llh = NULL;
3455         char                    *logname;
3456         char                    *comment, *ptr;
3457         int                      rc, len;
3458
3459         ENTRY;
3460
3461         /* get comment */
3462         ptr = strchr(param, '=');
3463         LASSERT(ptr != NULL);
3464         len = ptr - param;
3465
3466         OBD_ALLOC(comment, len + 1);
3467         if (comment == NULL)
3468                 RETURN(-ENOMEM);
3469         strncpy(comment, param, len);
3470         comment[len] = '\0';
3471
3472         /* prepare lcfg */
3473         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
3474         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
3475         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
3476         if (lcr == NULL)
3477                 GOTO(out_comment, rc = -ENOMEM);
3478
3479         /* construct log name */
3480         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
3481         if (rc < 0)
3482                 GOTO(out_lcfg, rc);
3483
3484         if (mgs_log_is_empty(env, mgs, logname)) {
3485                 rc = record_start_log(env, mgs, &llh, logname);
3486                 if (rc < 0)
3487                         GOTO(out, rc);
3488                 record_end_log(env, &llh);
3489         }
3490
3491         /* obsolete old one */
3492         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
3493                         comment, CM_SKIP);
3494         if (rc < 0)
3495                 GOTO(out, rc);
3496         /* write the new one */
3497         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
3498                                   mti->mti_svname, comment);
3499         if (rc)
3500                 CERROR("%s: error writing log %s: rc = %d\n",
3501                        mgs->mgs_obd->obd_name, logname, rc);
3502 out:
3503         name_destroy(&logname);
3504 out_lcfg:
3505         lustre_cfg_rec_free(lcr);
3506 out_comment:
3507         OBD_FREE(comment, len + 1);
3508         RETURN(rc);
3509 }
3510
3511 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
3512                                         char *param)
3513 {
3514         char    *ptr;
3515
3516         /* disable the adjustable udesc parameter for now, i.e. use default
3517          * setting that client always ship udesc to MDT if possible. to enable
3518          * it simply remove the following line */
3519         goto error_out;
3520
3521         ptr = strchr(param, '=');
3522         if (ptr == NULL)
3523                 goto error_out;
3524         *ptr++ = '\0';
3525
3526         if (strcmp(param, PARAM_SRPC_UDESC))
3527                 goto error_out;
3528
3529         if (strcmp(ptr, "yes") == 0) {
3530                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3531                 CWARN("Enable user descriptor shipping from client to MDT\n");
3532         } else if (strcmp(ptr, "no") == 0) {
3533                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3534                 CWARN("Disable user descriptor shipping from client to MDT\n");
3535         } else {
3536                 *(ptr - 1) = '=';
3537                 goto error_out;
3538         }
3539         return 0;
3540
3541 error_out:
3542         CERROR("Invalid param: %s\n", param);
3543         return -EINVAL;
3544 }
3545
3546 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3547                                   const char *svname,
3548                                   char *param)
3549 {
3550         struct sptlrpc_rule      rule;
3551         struct sptlrpc_rule_set *rset;
3552         int                      rc;
3553         ENTRY;
3554
3555         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3556                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3557                 RETURN(-EINVAL);
3558         }
3559
3560         if (strncmp(param, PARAM_SRPC_UDESC,
3561                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3562                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3563         }
3564
3565         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3566                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3567                 RETURN(-EINVAL);
3568         }
3569
3570         param += sizeof(PARAM_SRPC_FLVR) - 1;
3571
3572         rc = sptlrpc_parse_rule(param, &rule);
3573         if (rc)
3574                 RETURN(rc);
3575
3576         /* mgs rules implies must be mgc->mgs */
3577         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3578                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3579                      rule.sr_from != LUSTRE_SP_ANY) ||
3580                     (rule.sr_to != LUSTRE_SP_MGS &&
3581                      rule.sr_to != LUSTRE_SP_ANY))
3582                         RETURN(-EINVAL);
3583         }
3584
3585         /* preapre room for this coming rule. svcname format should be:
3586          * - fsname: general rule
3587          * - fsname-tgtname: target-specific rule
3588          */
3589         if (strchr(svname, '-')) {
3590                 struct mgs_tgt_srpc_conf *tgtconf;
3591                 int                       found = 0;
3592
3593                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3594                      tgtconf = tgtconf->mtsc_next) {
3595                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3596                                 found = 1;
3597                                 break;
3598                         }
3599                 }
3600
3601                 if (!found) {
3602                         int name_len;
3603
3604                         OBD_ALLOC_PTR(tgtconf);
3605                         if (tgtconf == NULL)
3606                                 RETURN(-ENOMEM);
3607
3608                         name_len = strlen(svname);
3609
3610                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3611                         if (tgtconf->mtsc_tgt == NULL) {
3612                                 OBD_FREE_PTR(tgtconf);
3613                                 RETURN(-ENOMEM);
3614                         }
3615                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3616
3617                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3618                         fsdb->fsdb_srpc_tgt = tgtconf;
3619                 }
3620
3621                 rset = &tgtconf->mtsc_rset;
3622         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
3623                 /* put _mgs related srpc rule directly in mgs ruleset */
3624                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
3625         } else {
3626                 rset = &fsdb->fsdb_srpc_gen;
3627         }
3628
3629         rc = sptlrpc_rule_set_merge(rset, &rule);
3630
3631         RETURN(rc);
3632 }
3633
3634 static int mgs_srpc_set_param(const struct lu_env *env,
3635                               struct mgs_device *mgs,
3636                               struct fs_db *fsdb,
3637                               struct mgs_target_info *mti,
3638                               char *param)
3639 {
3640         char                   *copy;
3641         int                     rc, copy_size;
3642         ENTRY;
3643
3644 #ifndef HAVE_GSS
3645         RETURN(-EINVAL);
3646 #endif
3647         /* keep a copy of original param, which could be destroied
3648          * during parsing */
3649         copy_size = strlen(param) + 1;
3650         OBD_ALLOC(copy, copy_size);
3651         if (copy == NULL)
3652                 return -ENOMEM;
3653         memcpy(copy, param, copy_size);
3654
3655         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3656         if (rc)
3657                 goto out_free;
3658
3659         /* previous steps guaranteed the syntax is correct */
3660         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3661         if (rc)
3662                 goto out_free;
3663
3664         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3665                 /*
3666                  * for mgs rules, make them effective immediately.
3667                  */
3668                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3669                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3670                                                  &fsdb->fsdb_srpc_gen);
3671         }
3672
3673 out_free:
3674         OBD_FREE(copy, copy_size);
3675         RETURN(rc);
3676 }
3677
3678 struct mgs_srpc_read_data {
3679         struct fs_db   *msrd_fsdb;
3680         int             msrd_skip;
3681 };
3682
3683 static int mgs_srpc_read_handler(const struct lu_env *env,
3684                                  struct llog_handle *llh,
3685                                  struct llog_rec_hdr *rec, void *data)
3686 {
3687         struct mgs_srpc_read_data *msrd = data;
3688         struct cfg_marker         *marker;
3689         struct lustre_cfg         *lcfg = REC_DATA(rec);
3690         char                      *svname, *param;
3691         int                        cfg_len, rc;
3692         ENTRY;
3693
3694         if (rec->lrh_type != OBD_CFG_REC) {
3695                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3696                 RETURN(-EINVAL);
3697         }
3698
3699         cfg_len = REC_DATA_LEN(rec);
3700
3701         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3702         if (rc) {
3703                 CERROR("Insane cfg\n");
3704                 RETURN(rc);
3705         }
3706
3707         if (lcfg->lcfg_command == LCFG_MARKER) {
3708                 marker = lustre_cfg_buf(lcfg, 1);
3709
3710                 if (marker->cm_flags & CM_START &&
3711                     marker->cm_flags & CM_SKIP)
3712                         msrd->msrd_skip = 1;
3713                 if (marker->cm_flags & CM_END)
3714                         msrd->msrd_skip = 0;
3715
3716                 RETURN(0);
3717         }
3718
3719         if (msrd->msrd_skip)
3720                 RETURN(0);
3721
3722         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3723                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3724                 RETURN(0);
3725         }
3726
3727         svname = lustre_cfg_string(lcfg, 0);
3728         if (svname == NULL) {
3729                 CERROR("svname is empty\n");
3730                 RETURN(0);
3731         }
3732
3733         param = lustre_cfg_string(lcfg, 1);
3734         if (param == NULL) {
3735                 CERROR("param is empty\n");
3736                 RETURN(0);
3737         }
3738
3739         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3740         if (rc)
3741                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3742
3743         RETURN(0);
3744 }
3745
3746 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3747                                 struct mgs_device *mgs,
3748                                 struct fs_db *fsdb)
3749 {
3750         struct llog_handle        *llh = NULL;
3751         struct llog_ctxt          *ctxt;
3752         char                      *logname;
3753         struct mgs_srpc_read_data  msrd;
3754         int                        rc;
3755         ENTRY;
3756
3757         /* construct log name */
3758         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3759         if (rc)
3760                 RETURN(rc);
3761
3762         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3763         LASSERT(ctxt != NULL);
3764
3765         if (mgs_log_is_empty(env, mgs, logname))
3766                 GOTO(out, rc = 0);
3767
3768         rc = llog_open(env, ctxt, &llh, NULL, logname,
3769                        LLOG_OPEN_EXISTS);
3770         if (rc < 0) {
3771                 if (rc == -ENOENT)
3772                         rc = 0;
3773                 GOTO(out, rc);
3774         }
3775
3776         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3777         if (rc)
3778                 GOTO(out_close, rc);
3779
3780         if (llog_get_size(llh) <= 1)
3781                 GOTO(out_close, rc = 0);
3782
3783         msrd.msrd_fsdb = fsdb;
3784         msrd.msrd_skip = 0;
3785
3786         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3787                           NULL);
3788
3789 out_close:
3790         llog_close(env, llh);
3791 out:
3792         llog_ctxt_put(ctxt);
3793         name_destroy(&logname);
3794
3795         if (rc)
3796                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3797         RETURN(rc);
3798 }
3799
3800 static int mgs_write_log_param2(const struct lu_env *env,
3801                                 struct mgs_device *mgs,
3802                                 struct fs_db *fsdb,
3803                                 struct mgs_target_info *mti, char *ptr)
3804 {
3805         struct lustre_cfg_bufs bufs;
3806         int rc;
3807
3808         ENTRY;
3809         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3810
3811         /* PARAM_MGSNODE and PARAM_NETWORK are set only when formating
3812          * or during the inital mount. It can never change after that.
3813          */
3814         if (!class_match_param(ptr, PARAM_MGSNODE, NULL) ||
3815             !class_match_param(ptr, PARAM_NETWORK, NULL)) {
3816                 rc = 0;
3817                 goto end;
3818         }
3819
3820         /* Processed in mgs_write_log_ost. Another value that can't
3821          * be changed by lctl set_param -P.
3822          */
3823         if (!class_match_param(ptr, PARAM_FAILMODE, NULL)) {
3824                 LCONSOLE_ERROR_MSG(0x169,
3825                                    "%s can only be changed with tunefs.lustre and --writeconf\n",
3826                                    ptr);
3827                 rc = -EPERM;
3828                 goto end;
3829         }
3830
3831         /* FIXME !!! Support for sptlrpc is incomplete. Currently the change
3832          * doesn't transmit to the client. See LU-7183.
3833          */
3834         if (!class_match_param(ptr, PARAM_SRPC, NULL)) {
3835                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3836                 goto end;
3837         }
3838
3839         /* Can't use class_match_param since ptr doesn't start with
3840          * PARAM_FAILNODE. So we look for PARAM_FAILNODE contained in ptr.
3841          */
3842         if (strstr(ptr, PARAM_FAILNODE)) {
3843                 /* Add a failover nidlist. We already processed failovers
3844                  * params for new targets in mgs_write_log_target.
3845                  */
3846                 const char *param;
3847
3848                 /* can't use wildcards with failover.node */
3849                 if (strchr(ptr, '*')) {
3850                         rc = -ENODEV;
3851                         goto end;
3852                 }
3853
3854                 param = strstr(ptr, PARAM_FAILNODE);
3855                 if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
3856                     sizeof(mti->mti_params)) {
3857                         rc = -E2BIG;
3858                         goto end;
3859                 }
3860
3861                 CDEBUG(D_MGS, "Adding failnode with param %s\n",
3862                        mti->mti_params);
3863                 rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3864                 goto end;
3865         }
3866
3867         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
3868                           mti->mti_svname, ptr);
3869 end:
3870         RETURN(rc);
3871 }
3872
3873 /* Permanent settings of all parameters by writing into the appropriate
3874  * configuration logs.
3875  * A parameter with null value ("<param>='\0'") means to erase it out of
3876  * the logs.
3877  */
3878 static int mgs_write_log_param(const struct lu_env *env,
3879                                struct mgs_device *mgs, struct fs_db *fsdb,
3880                                struct mgs_target_info *mti, char *ptr)
3881 {
3882         struct mgs_thread_info *mgi = mgs_env_info(env);
3883         char *logname;
3884         char *tmp;
3885         int rc = 0;
3886         ENTRY;
3887
3888         /* For various parameter settings, we have to figure out which logs
3889            care about them (e.g. both mdt and client for lov settings) */
3890         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3891
3892         /* The params are stored in MOUNT_DATA_FILE and modified via
3893            tunefs.lustre, or set using lctl conf_param */
3894
3895         /* Processed in lustre_start_mgc */
3896         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3897                 GOTO(end, rc);
3898
3899         /* Processed in ost/mdt */
3900         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3901                 GOTO(end, rc);
3902
3903         /* Processed in mgs_write_log_ost */
3904         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3905                 if (mti->mti_flags & LDD_F_PARAM) {
3906                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3907                                            "changed with tunefs.lustre"
3908                                            "and --writeconf\n", ptr);
3909                         rc = -EPERM;
3910                 }
3911                 GOTO(end, rc);
3912         }
3913
3914         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3915                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3916                 GOTO(end, rc);
3917         }
3918
3919         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3920                 /* Add a failover nidlist */
3921                 rc = 0;
3922                 /* We already processed failovers params for new
3923                    targets in mgs_write_log_target */
3924                 if (mti->mti_flags & LDD_F_PARAM) {
3925                         CDEBUG(D_MGS, "Adding failnode\n");
3926                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3927                 }
3928                 GOTO(end, rc);
3929         }
3930
3931         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3932                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3933                 GOTO(end, rc);
3934         }
3935
3936         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3937                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3938                 GOTO(end, rc);
3939         }
3940
3941         if (class_match_param(ptr, PARAM_OSC PARAM_ACTIVE, &tmp) == 0 ||
3942             class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0) {
3943                 /* active=0 means off, anything else means on */
3944                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3945                 bool deactive_osc = memcmp(ptr, PARAM_OSC PARAM_ACTIVE,
3946                                           strlen(PARAM_OSC PARAM_ACTIVE)) == 0;
3947                 int i;
3948
3949                 if (!deactive_osc) {
3950                         __u32   index;
3951
3952                         rc = server_name2index(mti->mti_svname, &index, NULL);
3953                         if (rc < 0)
3954                                 GOTO(end, rc);
3955
3956                         if (index == 0) {
3957                                 LCONSOLE_ERROR_MSG(0x144, "%s: MDC0 can not be"
3958                                                    " (de)activated.\n",
3959                                                    mti->mti_svname);
3960                                 GOTO(end, rc = -EPERM);
3961                         }
3962                 }
3963
3964                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3965                               flag ? "de" : "re", mti->mti_svname);
3966                 /* Modify clilov */
3967                 rc = name_create(&logname, mti->mti_fsname, "-client");
3968                 if (rc < 0)
3969                         GOTO(end, rc);
3970                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3971                                 mti->mti_svname,
3972                                 deactive_osc ? "add osc" : "add mdc", flag);
3973                 name_destroy(&logname);
3974                 if (rc < 0)
3975                         goto active_err;
3976
3977                 /* Modify mdtlov */
3978                 /* Add to all MDT logs for DNE */
3979                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3980                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3981                                 continue;
3982                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3983                         if (rc < 0)
3984                                 GOTO(end, rc);
3985                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3986                                         mti->mti_svname,
3987                                         deactive_osc ? "add osc" : "add osp",
3988                                         flag);
3989                         name_destroy(&logname);
3990                         if (rc < 0)
3991                                 goto active_err;
3992                 }
3993 active_err:
3994                 if (rc < 0) {
3995                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3996                                            "log (%d). No permanent "
3997                                            "changes were made to the "
3998                                            "config log.\n",
3999                                            mti->mti_svname, rc);
4000                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
4001                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
4002                                                    " because the log"
4003                                                    "is in the old 1.4"
4004                                                    "style. Consider "
4005                                                    " --writeconf to "
4006                                                    "update the logs.\n");
4007                         GOTO(end, rc);
4008                 }
4009                 /* Fall through to osc/mdc proc for deactivating live
4010                    OSC/OSP on running MDT / clients. */
4011         }
4012         /* Below here, let obd's XXX_process_config methods handle it */
4013
4014         /* All lov. in proc */
4015         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
4016                 char *mdtlovname;
4017
4018                 CDEBUG(D_MGS, "lov param %s\n", ptr);
4019                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
4020                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
4021                                            "set on the MDT, not %s. "
4022                                            "Ignoring.\n",
4023                                            mti->mti_svname);
4024                         GOTO(end, rc = 0);
4025                 }
4026
4027                 /* Modify mdtlov */
4028                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
4029                         GOTO(end, rc = -ENODEV);
4030
4031                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
4032                                              mti->mti_stripe_index);
4033                 if (rc)
4034                         GOTO(end, rc);
4035                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
4036                                   &mgi->mgi_bufs, mdtlovname, ptr);
4037                 name_destroy(&logname);
4038                 name_destroy(&mdtlovname);
4039                 if (rc)
4040                         GOTO(end, rc);
4041
4042                 /* Modify clilov */
4043                 rc = name_create(&logname, mti->mti_fsname, "-client");
4044                 if (rc)
4045                         GOTO(end, rc);
4046                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
4047                                   fsdb->fsdb_clilov, ptr);
4048                 name_destroy(&logname);
4049                 GOTO(end, rc);
4050         }
4051
4052         /* All osc., mdc., llite. params in proc */
4053         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
4054             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
4055             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
4056                 char *cname;
4057
4058                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
4059                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
4060                                            " cannot be modified. Consider"
4061                                            " updating the configuration with"
4062                                            " --writeconf\n",
4063                                            mti->mti_svname);
4064                         GOTO(end, rc = -EINVAL);
4065                 }
4066                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
4067                         rc = name_create(&cname, mti->mti_fsname, "-client");
4068                         /* Add the client type to match the obdname in
4069                            class_config_llog_handler */
4070                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
4071                         rc = name_create(&cname, mti->mti_svname, "-mdc");
4072                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
4073                         rc = name_create(&cname, mti->mti_svname, "-osc");
4074                 } else {
4075                         GOTO(end, rc = -EINVAL);
4076                 }
4077                 if (rc)
4078                         GOTO(end, rc);
4079
4080                 /* Forbid direct update of llite root squash parameters.
4081                  * These parameters are indirectly set via the MDT settings.
4082                  * See (LU-1778) */
4083                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
4084                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
4085                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
4086                         LCONSOLE_ERROR("%s: root squash parameters can only "
4087                                 "be updated through MDT component\n",
4088                                 mti->mti_fsname);
4089                         name_destroy(&cname);
4090                         GOTO(end, rc = -EINVAL);
4091                 }
4092
4093                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4094
4095                 /* Modify client */
4096                 rc = name_create(&logname, mti->mti_fsname, "-client");
4097                 if (rc) {
4098                         name_destroy(&cname);
4099                         GOTO(end, rc);
4100                 }
4101                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
4102                                   cname, ptr);
4103
4104                 /* osc params affect the MDT as well */
4105                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
4106                         int i;
4107
4108                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
4109                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
4110                                         continue;
4111                                 name_destroy(&cname);
4112                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
4113                                                          fsdb, i);
4114                                 name_destroy(&logname);
4115                                 if (rc)
4116                                         break;
4117                                 rc = name_create_mdt(&logname,
4118                                                      mti->mti_fsname, i);
4119                                 if (rc)
4120                                         break;
4121                                 if (!mgs_log_is_empty(env, mgs, logname)) {
4122                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
4123                                                           mti, logname,
4124                                                           &mgi->mgi_bufs,
4125                                                           cname, ptr);
4126                                         if (rc)
4127                                                 break;
4128                                 }
4129                         }
4130                 }
4131
4132                 /* For mdc activate/deactivate, it affects OSP on MDT as well */
4133                 if (class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0 &&
4134                     rc == 0) {
4135                         char suffix[16];
4136                         char *lodname = NULL;
4137                         char *param_str = NULL;
4138                         int i;
4139                         int index;
4140
4141                         /* replace mdc with osp */
4142                         memcpy(ptr, PARAM_OSP, strlen(PARAM_OSP));
4143                         rc = server_name2index(mti->mti_svname, &index, NULL);
4144                         if (rc < 0) {
4145                                 memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
4146                                 GOTO(end, rc);
4147                         }
4148
4149                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4150                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
4151                                         continue;
4152
4153                                 if (i == index)
4154                                         continue;
4155
4156                                 name_destroy(&logname);
4157                                 rc = name_create_mdt(&logname, mti->mti_fsname,
4158                                                      i);
4159                                 if (rc < 0)
4160                                         break;
4161
4162                                 if (mgs_log_is_empty(env, mgs, logname))
4163                                         continue;
4164
4165                                 snprintf(suffix, sizeof(suffix), "-osp-MDT%04x",
4166                                          i);
4167                                 name_destroy(&cname);
4168                                 rc = name_create(&cname, mti->mti_svname,
4169                                                  suffix);
4170                                 if (rc < 0)
4171                                         break;
4172
4173                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4174                                                   &mgi->mgi_bufs, cname, ptr);
4175                                 if (rc < 0)
4176                                         break;
4177
4178                                 /* Add configuration log for noitfying LOD
4179                                  * to active/deactive the OSP. */
4180                                 name_destroy(&param_str);
4181                                 rc = name_create(&param_str, cname,
4182                                                  (*tmp == '0') ?  ".active=0" :
4183                                                  ".active=1");
4184                                 if (rc < 0)
4185                                         break;
4186
4187                                 name_destroy(&lodname);
4188                                 rc = name_create(&lodname, logname, "-mdtlov");
4189                                 if (rc < 0)
4190                                         break;
4191
4192                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4193                                                   &mgi->mgi_bufs, lodname,
4194                                                   param_str);
4195                                 if (rc < 0)
4196                                         break;
4197                         }
4198                         memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
4199                         name_destroy(&lodname);
4200                         name_destroy(&param_str);
4201                 }
4202
4203                 name_destroy(&logname);
4204                 name_destroy(&cname);
4205                 GOTO(end, rc);
4206         }
4207
4208         /* All mdt. params in proc */
4209         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
4210                 int i;
4211                 __u32 idx;
4212
4213                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4214                 if (strncmp(mti->mti_svname, mti->mti_fsname,
4215                             MTI_NAME_MAXLEN) == 0)
4216                         /* device is unspecified completely? */
4217                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
4218                 else
4219                         rc = server_name2index(mti->mti_svname, &idx, NULL);
4220                 if (rc < 0)
4221                         goto active_err;
4222                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
4223                         goto active_err;
4224                 if (rc & LDD_F_SV_ALL) {
4225                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4226                                 if (!test_bit(i,
4227                                                   fsdb->fsdb_mdt_index_map))
4228                                         continue;
4229                                 rc = name_create_mdt(&logname,
4230                                                 mti->mti_fsname, i);
4231                                 if (rc)
4232                                         goto active_err;
4233                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
4234                                                   logname, &mgi->mgi_bufs,
4235                                                   logname, ptr);
4236                                 name_destroy(&logname);
4237                                 if (rc)
4238                                         goto active_err;
4239                         }
4240                 } else {
4241                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
4242                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
4243                                 LCONSOLE_ERROR("%s: root squash parameters "
4244                                         "cannot be applied to a single MDT\n",
4245                                         mti->mti_fsname);
4246                                 GOTO(end, rc = -EINVAL);
4247                         }
4248                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
4249                                           mti->mti_svname, &mgi->mgi_bufs,
4250                                           mti->mti_svname, ptr);
4251                         if (rc)
4252                                 goto active_err;
4253                 }
4254
4255                 /* root squash settings are also applied to llite
4256                  * config log (see LU-1778) */
4257                 if (rc == 0 &&
4258                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
4259                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
4260                         char *cname;
4261                         char *ptr2;
4262
4263                         rc = name_create(&cname, mti->mti_fsname, "-client");
4264                         if (rc)
4265                                 GOTO(end, rc);
4266                         rc = name_create(&logname, mti->mti_fsname, "-client");
4267                         if (rc) {
4268                                 name_destroy(&cname);
4269                                 GOTO(end, rc);
4270                         }
4271                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
4272                         if (rc) {
4273                                 name_destroy(&cname);
4274                                 name_destroy(&logname);
4275                                 GOTO(end, rc);
4276                         }
4277                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4278                                           &mgi->mgi_bufs, cname, ptr2);
4279                         name_destroy(&ptr2);
4280                         name_destroy(&logname);
4281                         name_destroy(&cname);
4282                 }
4283                 GOTO(end, rc);
4284         }
4285
4286         /* All mdd., ost. and osd. params in proc */
4287         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
4288             (class_match_param(ptr, PARAM_LOD, NULL) == 0) ||
4289             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
4290             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
4291                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4292                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
4293                         GOTO(end, rc = -ENODEV);
4294
4295                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
4296                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
4297                 GOTO(end, rc);
4298         }
4299
4300         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
4301
4302 end:
4303         if (rc)
4304                 CERROR("err %d on param '%s'\n", rc, ptr);
4305
4306         RETURN(rc);
4307 }
4308
4309 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
4310                          struct mgs_target_info *mti, struct fs_db *fsdb)
4311 {
4312         char    *buf, *params;
4313         int      rc = -EINVAL;
4314
4315         ENTRY;
4316
4317         /* set/check the new target index */
4318         rc = mgs_set_index(env, mgs, mti);
4319         if (rc < 0)
4320                 RETURN(rc);
4321
4322         if (rc == EALREADY) {
4323                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
4324                               mti->mti_stripe_index, mti->mti_svname);
4325                 /* We would like to mark old log sections as invalid
4326                    and add new log sections in the client and mdt logs.
4327                    But if we add new sections, then live clients will
4328                    get repeat setup instructions for already running
4329                    osc's. So don't update the client/mdt logs. */
4330                 mti->mti_flags &= ~LDD_F_UPDATE;
4331                 rc = 0;
4332         }
4333
4334         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_WRITE_TARGET_DELAY, cfs_fail_val > 0 ?
4335                          cfs_fail_val : 10);
4336
4337         mutex_lock(&fsdb->fsdb_mutex);
4338
4339         if (mti->mti_flags & (LDD_F_VIRGIN | LDD_F_WRITECONF)) {
4340                 /* Generate a log from scratch */
4341                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
4342                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
4343                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
4344                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
4345                 } else {
4346                         CERROR("Unknown target type %#x, can't create log for %s\n",
4347                                mti->mti_flags, mti->mti_svname);
4348                 }
4349                 if (rc) {
4350                         CERROR("Can't write logs for %s (%d)\n",
4351                                mti->mti_svname, rc);
4352                         GOTO(out_up, rc);
4353                 }
4354         } else {
4355                 /* Just update the params from tunefs in mgs_write_log_params */
4356                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
4357                 mti->mti_flags |= LDD_F_PARAM;
4358         }
4359
4360         /* allocate temporary buffer, where class_get_next_param will
4361            make copy of a current  parameter */
4362         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
4363         if (buf == NULL)
4364                 GOTO(out_up, rc = -ENOMEM);
4365         params = mti->mti_params;
4366         while (params != NULL) {
4367                 rc = class_get_next_param(&params, buf);
4368                 if (rc) {
4369                         if (rc == 1)
4370                                 /* there is no next parameter, that is
4371                                    not an error */
4372                                 rc = 0;
4373                         break;
4374                 }
4375                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
4376                        params, buf);
4377                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
4378                 if (rc)
4379                         break;
4380         }
4381
4382         OBD_FREE(buf, strlen(mti->mti_params) + 1);
4383
4384 out_up:
4385         mutex_unlock(&fsdb->fsdb_mutex);
4386         RETURN(rc);
4387 }
4388
4389 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
4390 {
4391         struct llog_ctxt        *ctxt;
4392         int                      rc = 0;
4393
4394         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
4395         if (ctxt == NULL) {
4396                 CERROR("%s: MGS config context doesn't exist\n",
4397                        mgs->mgs_obd->obd_name);
4398                 rc = -ENODEV;
4399         } else {
4400                 rc = llog_erase(env, ctxt, NULL, name);
4401                 /* llog may not exist */
4402                 if (rc == -ENOENT)
4403                         rc = 0;
4404                 llog_ctxt_put(ctxt);
4405         }
4406
4407         if (rc)
4408                 CERROR("%s: failed to clear log %s: %d\n",
4409                        mgs->mgs_obd->obd_name, name, rc);
4410
4411         return rc;
4412 }
4413
4414 /* erase all logs for the given fs */
4415 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs,
4416                    const char *fsname)
4417 {
4418         struct list_head log_list;
4419         struct mgs_direntry *dirent, *n;
4420         char barrier_name[20] = {};
4421         char *suffix;
4422         int count = 0;
4423         int rc, len = strlen(fsname);
4424         ENTRY;
4425
4426         mutex_lock(&mgs->mgs_mutex);
4427
4428         /* Find all the logs in the CONFIGS directory */
4429         rc = class_dentry_readdir(env, mgs, &log_list);
4430         if (rc) {
4431                 mutex_unlock(&mgs->mgs_mutex);
4432                 RETURN(rc);
4433         }
4434
4435         if (list_empty(&log_list)) {
4436                 mutex_unlock(&mgs->mgs_mutex);
4437                 RETURN(-ENOENT);
4438         }
4439
4440         snprintf(barrier_name, sizeof(barrier_name) - 1, "%s-%s",
4441                  fsname, BARRIER_FILENAME);
4442         /* Delete the barrier fsdb */
4443         mgs_remove_fsdb_by_name(mgs, barrier_name);
4444         /* Delete the fs db */
4445         mgs_remove_fsdb_by_name(mgs, fsname);
4446         mutex_unlock(&mgs->mgs_mutex);
4447
4448         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4449                 list_del_init(&dirent->mde_list);
4450                 suffix = strrchr(dirent->mde_name, '-');
4451                 if (suffix != NULL) {
4452                         if ((len == suffix - dirent->mde_name) &&
4453                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
4454                                 CDEBUG(D_MGS, "Removing log %s\n",
4455                                        dirent->mde_name);
4456                                 mgs_erase_log(env, mgs, dirent->mde_name);
4457                                 count++;
4458                         }
4459                 }
4460                 mgs_direntry_free(dirent);
4461         }
4462
4463         if (count == 0)
4464                 rc = -ENOENT;
4465
4466         RETURN(rc);
4467 }
4468
4469 /* list all logs for the given fs */
4470 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
4471                   struct obd_ioctl_data *data)
4472 {
4473         struct list_head         log_list;
4474         struct mgs_direntry     *dirent, *n;
4475         char                    *out, *suffix, prefix[] = "config_log: ";
4476         int                      prefix_len = strlen(prefix);
4477         int                      len, remains, start = 0, rc;
4478
4479         ENTRY;
4480
4481         /* Find all the logs in the CONFIGS directory */
4482         rc = class_dentry_readdir(env, mgs, &log_list);
4483         if (rc)
4484                 RETURN(rc);
4485
4486         out = data->ioc_bulk;
4487         remains = data->ioc_inllen1;
4488         /* OBD_FAIL: fetch the config_log records from the specified one */
4489         if (OBD_FAIL_CHECK(OBD_FAIL_CATLIST))
4490                 data->ioc_count = cfs_fail_val;
4491
4492         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4493                 list_del_init(&dirent->mde_list);
4494                 suffix = strrchr(dirent->mde_name, '-');
4495                 if (suffix != NULL) {
4496                         len = prefix_len + dirent->mde_len + 1;
4497                         if (remains - len < 0) {
4498                                 /* No enough space for this record */
4499                                 mgs_direntry_free(dirent);
4500                                 goto out;
4501                         }
4502                         start++;
4503                         if (start < data->ioc_count) {
4504                                 mgs_direntry_free(dirent);
4505                                 continue;
4506                         }
4507                         len = scnprintf(out, remains, "%s%s\n", prefix,
4508                                         dirent->mde_name);
4509                         out += len;
4510                         remains -= len;
4511                 }
4512                 mgs_direntry_free(dirent);
4513                 if (remains <= 1)
4514                         /* Full */
4515                         goto out;
4516         }
4517         /* Finished */
4518         start = 0;
4519 out:
4520         data->ioc_count = start;
4521         RETURN(rc);
4522 }
4523
4524 struct mgs_lcfg_fork_data {
4525         struct lustre_cfg_bufs   mlfd_bufs;
4526         struct mgs_device       *mlfd_mgs;
4527         struct llog_handle      *mlfd_llh;
4528         const char              *mlfd_oldname;
4529         const char              *mlfd_newname;
4530         char                     mlfd_data[0];
4531 };
4532
4533 static bool contain_valid_fsname(char *buf, const char *fsname,
4534                                  int buflen, int namelen)
4535 {
4536         if (buflen < namelen)
4537                 return false;
4538
4539         if (memcmp(buf, fsname, namelen) != 0)
4540                 return false;
4541
4542         if (buf[namelen] != '\0' && buf[namelen] != '-')
4543                 return false;
4544
4545         return true;
4546 }
4547
4548 static int mgs_lcfg_fork_handler(const struct lu_env *env,
4549                                  struct llog_handle *o_llh,
4550                                  struct llog_rec_hdr *o_rec, void *data)
4551 {
4552         struct mgs_lcfg_fork_data *mlfd = data;
4553         struct lustre_cfg_bufs *n_bufs = &mlfd->mlfd_bufs;
4554         struct lustre_cfg *o_lcfg = (struct lustre_cfg *)(o_rec + 1);
4555         struct llog_cfg_rec *lcr;
4556         char *o_buf;
4557         char *n_buf = mlfd->mlfd_data;
4558         int o_buflen;
4559         int o_namelen = strlen(mlfd->mlfd_oldname);
4560         int n_namelen = strlen(mlfd->mlfd_newname);
4561         int diff = n_namelen - o_namelen;
4562         __u32 cmd = o_lcfg->lcfg_command;
4563         __u32 cnt = o_lcfg->lcfg_bufcount;
4564         int rc;
4565         int i;
4566         ENTRY;
4567
4568         /* buf[0] */
4569         o_buf = lustre_cfg_buf(o_lcfg, 0);
4570         o_buflen = o_lcfg->lcfg_buflens[0];
4571         if (contain_valid_fsname(o_buf, mlfd->mlfd_oldname, o_buflen,
4572                                  o_namelen)) {
4573                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4574                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4575                        o_buflen - o_namelen);
4576                 lustre_cfg_bufs_reset(n_bufs, n_buf);
4577                 n_buf += cfs_size_round(o_buflen + diff);
4578         } else {
4579                 lustre_cfg_bufs_reset(n_bufs, o_buflen != 0 ? o_buf : NULL);
4580         }
4581
4582         switch (cmd) {
4583         case LCFG_MARKER: {
4584                 struct cfg_marker *o_marker;
4585                 struct cfg_marker *n_marker;
4586                 int tgt_namelen;
4587
4588                 if (cnt != 2) {
4589                         CDEBUG(D_MGS, "Unknown cfg marker entry with %d "
4590                                "buffers\n", cnt);
4591                         RETURN(-EINVAL);
4592                 }
4593
4594                 /* buf[1] is marker */
4595                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4596                 o_buflen = o_lcfg->lcfg_buflens[1];
4597                 o_marker = (struct cfg_marker *)o_buf;
4598                 if (!contain_valid_fsname(o_marker->cm_tgtname,
4599                                           mlfd->mlfd_oldname,
4600                                           sizeof(o_marker->cm_tgtname),
4601                                           o_namelen)) {
4602                         lustre_cfg_bufs_set(n_bufs, 1, o_marker,
4603                                             sizeof(*o_marker));
4604                         break;
4605                 }
4606
4607                 n_marker = (struct cfg_marker *)n_buf;
4608                 *n_marker = *o_marker;
4609                 memcpy(n_marker->cm_tgtname, mlfd->mlfd_newname, n_namelen);
4610                 tgt_namelen = strlen(o_marker->cm_tgtname);
4611                 if (tgt_namelen > o_namelen)
4612                         memcpy(n_marker->cm_tgtname + n_namelen,
4613                                o_marker->cm_tgtname + o_namelen,
4614                                tgt_namelen - o_namelen);
4615                 n_marker->cm_tgtname[tgt_namelen + diff] = '\0';
4616                 lustre_cfg_bufs_set(n_bufs, 1, n_marker, sizeof(*n_marker));
4617                 break;
4618         }
4619         case LCFG_PARAM:
4620         case LCFG_SET_PARAM: {
4621                 for (i = 1; i < cnt; i++)
4622                         /* buf[i] is the param value, reuse it directly */
4623                         lustre_cfg_bufs_set(n_bufs, i,
4624                                             lustre_cfg_buf(o_lcfg, i),
4625                                             o_lcfg->lcfg_buflens[i]);
4626                 break;
4627         }
4628         case LCFG_POOL_NEW:
4629         case LCFG_POOL_ADD:
4630         case LCFG_POOL_REM:
4631         case LCFG_POOL_DEL: {
4632                 if (cnt < 3 || cnt > 4) {
4633                         CDEBUG(D_MGS, "Unknown cfg pool (%x) entry with %d "
4634                                "buffers\n", cmd, cnt);
4635                         RETURN(-EINVAL);
4636                 }
4637
4638                 /* buf[1] is fsname */
4639                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4640                 o_buflen = o_lcfg->lcfg_buflens[1];
4641                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4642                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4643                        o_buflen - o_namelen);
4644                 lustre_cfg_bufs_set(n_bufs, 1, n_buf, o_buflen + diff);
4645                 n_buf += cfs_size_round(o_buflen + diff);
4646
4647                 /* buf[2] is the pool name, reuse it directly */
4648                 lustre_cfg_bufs_set(n_bufs, 2, lustre_cfg_buf(o_lcfg, 2),
4649                                     o_lcfg->lcfg_buflens[2]);
4650
4651                 if (cnt == 3)
4652                         break;
4653
4654                 /* buf[3] is ostname */
4655                 o_buf = lustre_cfg_buf(o_lcfg, 3);
4656                 o_buflen = o_lcfg->lcfg_buflens[3];
4657                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4658                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4659                        o_buflen - o_namelen);
4660                 lustre_cfg_bufs_set(n_bufs, 3, n_buf, o_buflen + diff);
4661                 break;
4662         }
4663         case LCFG_SETUP: {
4664                 if (cnt == 2) {
4665                         o_buflen = o_lcfg->lcfg_buflens[1];
4666                         if (o_buflen == sizeof(struct lov_desc) ||
4667                             o_buflen == sizeof(struct lmv_desc)) {
4668                                 char *o_uuid;
4669                                 char *n_uuid;
4670                                 int uuid_len;
4671
4672                                 /* buf[1] */
4673                                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4674                                 if (o_buflen == sizeof(struct lov_desc)) {
4675                                         struct lov_desc *o_desc =
4676                                                 (struct lov_desc *)o_buf;
4677                                         struct lov_desc *n_desc =
4678                                                 (struct lov_desc *)n_buf;
4679
4680                                         *n_desc = *o_desc;
4681                                         o_uuid = o_desc->ld_uuid.uuid;
4682                                         n_uuid = n_desc->ld_uuid.uuid;
4683                                         uuid_len = sizeof(o_desc->ld_uuid.uuid);
4684                                 } else {
4685                                         struct lmv_desc *o_desc =
4686                                                 (struct lmv_desc *)o_buf;
4687                                         struct lmv_desc *n_desc =
4688                                                 (struct lmv_desc *)n_buf;
4689
4690                                         *n_desc = *o_desc;
4691                                         o_uuid = o_desc->ld_uuid.uuid;
4692                                         n_uuid = n_desc->ld_uuid.uuid;
4693                                         uuid_len = sizeof(o_desc->ld_uuid.uuid);
4694                                 }
4695
4696                                 if (unlikely(!contain_valid_fsname(o_uuid,
4697                                                 mlfd->mlfd_oldname, uuid_len,
4698                                                 o_namelen))) {
4699                                         lustre_cfg_bufs_set(n_bufs, 1, o_buf,
4700                                                             o_buflen);
4701                                         break;
4702                                 }
4703
4704                                 memcpy(n_uuid, mlfd->mlfd_newname, n_namelen);
4705                                 uuid_len = strlen(o_uuid);
4706                                 if (uuid_len > o_namelen)
4707                                         memcpy(n_uuid + n_namelen,
4708                                                o_uuid + o_namelen,
4709                                                uuid_len - o_namelen);
4710                                 n_uuid[uuid_len + diff] = '\0';
4711                                 lustre_cfg_bufs_set(n_bufs, 1, n_buf, o_buflen);
4712                                 break;
4713                         } /* else case fall through */
4714                 } /* else case fall through */
4715         }
4716         default: {
4717                 for (i = 1; i < cnt; i++) {
4718                         o_buflen = o_lcfg->lcfg_buflens[i];
4719                         if (o_buflen == 0)
4720                                 continue;
4721
4722                         o_buf = lustre_cfg_buf(o_lcfg, i);
4723                         if (!contain_valid_fsname(o_buf, mlfd->mlfd_oldname,
4724                                                   o_buflen, o_namelen)) {
4725                                 lustre_cfg_bufs_set(n_bufs, i, o_buf, o_buflen);
4726                                 continue;
4727                         }
4728
4729                         memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4730                         if (o_buflen == o_namelen) {
4731                                 lustre_cfg_bufs_set(n_bufs, i, n_buf,
4732                                                     n_namelen);
4733                                 n_buf += cfs_size_round(n_namelen);
4734                                 continue;
4735                         }
4736
4737                         memcpy(n_buf + n_namelen, o_buf + o_namelen,
4738                                o_buflen - o_namelen);
4739                         lustre_cfg_bufs_set(n_bufs, i, n_buf, o_buflen + diff);
4740                         n_buf += cfs_size_round(o_buflen + diff);
4741                 }
4742                 break;
4743         }
4744         }
4745
4746         lcr = lustre_cfg_rec_new(cmd, n_bufs);
4747         if (!lcr)
4748                 RETURN(-ENOMEM);
4749
4750         lcr->lcr_cfg = *o_lcfg;
4751         rc = llog_write(env, mlfd->mlfd_llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
4752         lustre_cfg_rec_free(lcr);
4753
4754         RETURN(rc);
4755 }
4756
4757 static int mgs_lcfg_fork_one(const struct lu_env *env, struct mgs_device *mgs,
4758                              struct mgs_direntry *mde, const char *oldname,
4759                              const char *newname)
4760 {
4761         struct llog_handle *old_llh = NULL;
4762         struct llog_handle *new_llh = NULL;
4763         struct llog_ctxt *ctxt = NULL;
4764         struct mgs_lcfg_fork_data *mlfd = NULL;
4765         char *name_buf = NULL;
4766         int name_buflen;
4767         int old_namelen = strlen(oldname);
4768         int new_namelen = strlen(newname);
4769         int rc;
4770         ENTRY;
4771
4772         name_buflen = mde->mde_len + new_namelen - old_namelen;
4773         OBD_ALLOC(name_buf, name_buflen);
4774         if (!name_buf)
4775                 RETURN(-ENOMEM);
4776
4777         memcpy(name_buf, newname, new_namelen);
4778         memcpy(name_buf + new_namelen, mde->mde_name + old_namelen,
4779                mde->mde_len - old_namelen);
4780
4781         CDEBUG(D_MGS, "Fork the config-log from %s to %s\n",
4782                mde->mde_name, name_buf);
4783
4784         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
4785         LASSERT(ctxt);
4786
4787         rc = llog_open_create(env, ctxt, &new_llh, NULL, name_buf);
4788         if (rc)
4789                 GOTO(out, rc);
4790
4791         rc = llog_init_handle(env, new_llh, LLOG_F_IS_PLAIN, NULL);
4792         if (rc)
4793                 GOTO(out, rc);
4794
4795         if (unlikely(mgs_log_is_empty(env, mgs, mde->mde_name)))
4796                 GOTO(out, rc = 0);
4797
4798         rc = llog_open(env, ctxt, &old_llh, NULL, mde->mde_name,
4799                        LLOG_OPEN_EXISTS);
4800         if (rc)
4801                 GOTO(out, rc);
4802
4803         rc = llog_init_handle(env, old_llh, LLOG_F_IS_PLAIN, NULL);
4804         if (rc)
4805                 GOTO(out, rc);
4806
4807         new_llh->lgh_hdr->llh_tgtuuid = old_llh->lgh_hdr->llh_tgtuuid;
4808
4809         OBD_ALLOC(mlfd, LLOG_MIN_CHUNK_SIZE);
4810         if (!mlfd)
4811                 GOTO(out, rc = -ENOMEM);
4812
4813         mlfd->mlfd_mgs = mgs;
4814         mlfd->mlfd_llh = new_llh;
4815         mlfd->mlfd_oldname = oldname;
4816         mlfd->mlfd_newname = newname;
4817
4818         rc = llog_process(env, old_llh, mgs_lcfg_fork_handler, mlfd, NULL);
4819         OBD_FREE(mlfd, LLOG_MIN_CHUNK_SIZE);
4820
4821         GOTO(out, rc);
4822
4823 out:
4824         if (old_llh)
4825                 llog_close(env, old_llh);
4826         if (new_llh)
4827                 llog_close(env, new_llh);
4828         if (name_buf)
4829                 OBD_FREE(name_buf, name_buflen);
4830         if (ctxt)
4831                 llog_ctxt_put(ctxt);
4832
4833         return rc;
4834 }
4835
4836 int mgs_lcfg_fork(const struct lu_env *env, struct mgs_device *mgs,
4837                   const char *oldname, const char *newname)
4838 {
4839         struct list_head log_list;
4840         struct mgs_direntry *dirent, *n;
4841         int olen = strlen(oldname);
4842         int nlen = strlen(newname);
4843         int count = 0;
4844         int rc = 0;
4845         ENTRY;
4846
4847         if (unlikely(!oldname || oldname[0] == '\0' ||
4848                      !newname || newname[0] == '\0'))
4849                 RETURN(-EINVAL);
4850
4851         if (strcmp(oldname, newname) == 0)
4852                 RETURN(-EINVAL);
4853
4854         /* lock it to prevent fork/erase/register in parallel. */
4855         mutex_lock(&mgs->mgs_mutex);
4856
4857         rc = class_dentry_readdir(env, mgs, &log_list);
4858         if (rc) {
4859                 mutex_unlock(&mgs->mgs_mutex);
4860                 RETURN(rc);
4861         }
4862
4863         if (list_empty(&log_list)) {
4864                 mutex_unlock(&mgs->mgs_mutex);
4865                 RETURN(-ENOENT);
4866         }
4867
4868         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4869                 char *ptr;
4870
4871                 ptr = strrchr(dirent->mde_name, '-');
4872                 if (ptr) {
4873                         int tlen = ptr - dirent->mde_name;
4874
4875                         if (tlen == nlen &&
4876                             strncmp(newname, dirent->mde_name, tlen) == 0)
4877                                 GOTO(out, rc = -EEXIST);
4878
4879                         if (tlen == olen &&
4880                             strncmp(oldname, dirent->mde_name, tlen) == 0)
4881                                 continue;
4882                 }
4883
4884                 list_del_init(&dirent->mde_list);
4885                 mgs_direntry_free(dirent);
4886         }
4887
4888         if (list_empty(&log_list)) {
4889                 mutex_unlock(&mgs->mgs_mutex);
4890                 RETURN(-ENOENT);
4891         }
4892
4893         list_for_each_entry(dirent, &log_list, mde_list) {
4894                 rc = mgs_lcfg_fork_one(env, mgs, dirent, oldname, newname);
4895                 if (rc)
4896                         break;
4897
4898                 count++;
4899         }
4900
4901 out:
4902         mutex_unlock(&mgs->mgs_mutex);
4903
4904         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4905                 list_del_init(&dirent->mde_list);
4906                 mgs_direntry_free(dirent);
4907         }
4908
4909         if (rc && count > 0)
4910                 mgs_erase_logs(env, mgs, newname);
4911
4912         RETURN(rc);
4913 }
4914
4915 int mgs_lcfg_erase(const struct lu_env *env, struct mgs_device *mgs,
4916                    const char *fsname)
4917 {
4918         int rc;
4919         ENTRY;
4920
4921         if (unlikely(!fsname || fsname[0] == '\0'))
4922                 RETURN(-EINVAL);
4923
4924         rc = mgs_erase_logs(env, mgs, fsname);
4925
4926         RETURN(rc);
4927 }
4928
4929 static int mgs_xattr_del(const struct lu_env *env, struct dt_object *obj)
4930 {
4931         struct dt_device *dev;
4932         struct thandle *th = NULL;
4933         int rc = 0;
4934
4935         ENTRY;
4936
4937         dev = container_of0(obj->do_lu.lo_dev, struct dt_device, dd_lu_dev);
4938         th = dt_trans_create(env, dev);
4939         if (IS_ERR(th))
4940                 RETURN(PTR_ERR(th));
4941
4942         rc = dt_declare_xattr_del(env, obj, XATTR_TARGET_RENAME, th);
4943         if (rc)
4944                 GOTO(stop, rc);
4945
4946         rc = dt_trans_start_local(env, dev, th);
4947         if (rc)
4948                 GOTO(stop, rc);
4949
4950         dt_write_lock(env, obj, 0);
4951         rc = dt_xattr_del(env, obj, XATTR_TARGET_RENAME, th);
4952
4953         GOTO(unlock, rc);
4954
4955 unlock:
4956         dt_write_unlock(env, obj);
4957
4958 stop:
4959         dt_trans_stop(env, dev, th);
4960
4961         return rc;
4962 }
4963
4964 int mgs_lcfg_rename(const struct lu_env *env, struct mgs_device *mgs)
4965 {
4966         struct list_head log_list;
4967         struct mgs_direntry *dirent, *n;
4968         char fsname[16];
4969         struct lu_buf buf = {
4970                 .lb_buf = fsname,
4971                 .lb_len = sizeof(fsname)
4972         };
4973         int rc = 0;
4974
4975         ENTRY;
4976
4977         rc = class_dentry_readdir(env, mgs, &log_list);
4978         if (rc)
4979                 RETURN(rc);
4980
4981         if (list_empty(&log_list))
4982                 RETURN(0);
4983
4984         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4985                 struct dt_object *o = NULL;
4986                 char oldname[16];
4987                 char *ptr;
4988                 int len;
4989
4990                 list_del_init(&dirent->mde_list);
4991                 ptr = strrchr(dirent->mde_name, '-');
4992                 if (!ptr)
4993                         goto next;
4994
4995                 len = ptr - dirent->mde_name;
4996                 if (unlikely(len >= sizeof(oldname))) {
4997                         CDEBUG(D_MGS, "Skip invalid configuration file %s\n",
4998                                dirent->mde_name);
4999                         goto next;
5000                 }
5001
5002                 o = local_file_find(env, mgs->mgs_los, mgs->mgs_configs_dir,
5003                                     dirent->mde_name);
5004                 if (IS_ERR(o)) {
5005                         rc = PTR_ERR(o);
5006                         CDEBUG(D_MGS, "Fail to locate file %s: rc = %d\n",
5007                                dirent->mde_name, rc);
5008                         goto next;
5009                 }
5010
5011                 rc = dt_xattr_get(env, o, &buf, XATTR_TARGET_RENAME);
5012                 if (rc < 0) {
5013                         if (rc == -ENODATA)
5014                                 rc = 0;
5015                         else
5016                                 CDEBUG(D_MGS,
5017                                        "Fail to get EA for %s: rc = %d\n",
5018                                        dirent->mde_name, rc);
5019                         goto next;
5020                 }
5021
5022                 if (unlikely(rc == len &&
5023                              memcmp(fsname, dirent->mde_name, len) == 0)) {
5024                         /* The new fsname is the same as the old one. */
5025                         rc = mgs_xattr_del(env, o);
5026                         goto next;
5027                 }
5028
5029                 memcpy(oldname, dirent->mde_name, len);
5030                 oldname[len] = '\0';
5031                 fsname[rc] = '\0';
5032                 rc = mgs_lcfg_fork_one(env, mgs, dirent, oldname, fsname);
5033                 if (rc && rc != -EEXIST) {
5034                         CDEBUG(D_MGS, "Fail to fork %s: rc = %d\n",
5035                                dirent->mde_name, rc);
5036                         goto next;
5037                 }
5038
5039                 rc = mgs_erase_log(env, mgs, dirent->mde_name);
5040                 if (rc) {
5041                         CDEBUG(D_MGS, "Fail to erase old %s: rc = %d\n",
5042                                dirent->mde_name, rc);
5043                         /* keep it there if failed to remove it. */
5044                         rc = 0;
5045                 }
5046
5047 next:
5048                 if (o && !IS_ERR(o))
5049                         lu_object_put(env, &o->do_lu);
5050
5051                 mgs_direntry_free(dirent);
5052                 if (rc)
5053                         break;
5054         }
5055
5056         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
5057                 list_del_init(&dirent->mde_list);
5058                 mgs_direntry_free(dirent);
5059         }
5060
5061         RETURN(rc);
5062 }
5063
5064 /* Setup _mgs fsdb and log
5065  */
5066 int mgs__mgs_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
5067 {
5068         struct fs_db *fsdb = NULL;
5069         int rc;
5070         ENTRY;
5071
5072         rc = mgs_find_or_make_fsdb(env, mgs, MGSSELF_NAME, &fsdb);
5073         if (!rc)
5074                 mgs_put_fsdb(mgs, fsdb);
5075
5076         RETURN(rc);
5077 }
5078
5079 /* Setup params fsdb and log
5080  */
5081 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
5082 {
5083         struct fs_db *fsdb = NULL;
5084         struct llog_handle *params_llh = NULL;
5085         int rc;
5086         ENTRY;
5087
5088         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
5089         if (!rc) {
5090                 mutex_lock(&fsdb->fsdb_mutex);
5091                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
5092                 if (!rc)
5093                         rc = record_end_log(env, &params_llh);
5094                 mutex_unlock(&fsdb->fsdb_mutex);
5095                 mgs_put_fsdb(mgs, fsdb);
5096         }
5097
5098         RETURN(rc);
5099 }
5100
5101 /* Cleanup params fsdb and log
5102  */
5103 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
5104 {
5105         int rc;
5106
5107         rc = mgs_erase_logs(env, mgs, PARAMS_FILENAME);
5108         return rc == -ENOENT ? 0 : rc;
5109 }
5110
5111 /**
5112  * Fill in the mgs_target_info based on data devname and param provide.
5113  *
5114  * @env         thread context
5115  * @mgs         mgs device
5116  * @mti         mgs target info. We want to set this based other paramters
5117  *              passed to this function. Once setup we write it to the config
5118  *              logs.
5119  * @devname     optional OBD device name
5120  * @param       string that contains both what tunable to set and the value to
5121  *              set it to.
5122  *
5123  * RETURN       0 for success
5124  *              negative error number on failure
5125  **/
5126 static int mgs_set_conf_param(const struct lu_env *env, struct mgs_device *mgs,
5127                               struct mgs_target_info *mti, const char *devname,
5128                               const char *param)
5129 {
5130         struct fs_db *fsdb = NULL;
5131         int dev_type;
5132         int rc = 0;
5133
5134         ENTRY;
5135         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
5136         if (!devname) {
5137                 size_t len;
5138
5139                 /* We have two possible cases here:
5140                  *
5141                  * 1) the device name embedded in the param:
5142                  *    lustre-OST0000.osc.max_dirty_mb=32
5143                  *
5144                  * 2) the file system name is embedded in
5145                  *    the param: lustre.sys.at.min=0
5146                  */
5147                 len = strcspn(param, ".=");
5148                 if (!len || param[len] == '=')
5149                         RETURN(-EINVAL);
5150
5151                 if (len >= sizeof(mti->mti_svname))
5152                         RETURN(-E2BIG);
5153
5154                 snprintf(mti->mti_svname, sizeof(mti->mti_svname),
5155                          "%.*s", (int)len, param);
5156                 param += len + 1;
5157         } else {
5158                 if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname)) >=
5159                     sizeof(mti->mti_svname))
5160                         RETURN(-E2BIG);
5161         }
5162
5163         if (!strlen(mti->mti_svname)) {
5164                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
5165                 RETURN(-ENOSYS);
5166         }
5167
5168         dev_type = mgs_parse_devname(mti->mti_svname, mti->mti_fsname,
5169                                      &mti->mti_stripe_index);
5170         switch (dev_type) {
5171         /* For this case we have an invalid obd device name */
5172         case -ENXIO:
5173                 CDEBUG(D_MGS, "%s don't contain an index\n", mti->mti_svname);
5174                 strlcpy(mti->mti_fsname, mti->mti_svname, MTI_NAME_MAXLEN);
5175                 dev_type = 0;
5176                 break;
5177         /* Not an obd device, assume devname is the fsname.
5178          * User might of only provided fsname and not obd device
5179          */
5180         case -EINVAL:
5181                 CDEBUG(D_MGS, "%s is seen as a file system name\n", mti->mti_svname);
5182                 strlcpy(mti->mti_fsname, mti->mti_svname, MTI_NAME_MAXLEN);
5183                 dev_type = 0;
5184                 break;
5185         default:
5186                 if (dev_type < 0)
5187                         GOTO(out, rc = dev_type);
5188
5189                 /* param related to llite isn't allowed to set by OST or MDT */
5190                 if (dev_type & LDD_F_SV_TYPE_OST ||
5191                     dev_type & LDD_F_SV_TYPE_MDT) {
5192                         /* param related to llite isn't allowed to set by OST
5193                          * or MDT
5194                          */
5195                         if (!strncmp(param, PARAM_LLITE,
5196                                      sizeof(PARAM_LLITE) - 1))
5197                                 GOTO(out, rc = -EINVAL);
5198
5199                         /* Strip -osc or -mdc suffix from svname */
5200                         if (server_make_name(dev_type, mti->mti_stripe_index,
5201                                              mti->mti_fsname, mti->mti_svname,
5202                                              sizeof(mti->mti_svname)))
5203                                 GOTO(out, rc = -EINVAL);
5204                 }
5205                 break;
5206         }
5207
5208         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
5209             sizeof(mti->mti_params))
5210                 GOTO(out, rc = -E2BIG);
5211
5212         CDEBUG(D_MGS, "set_conf_param fs='%s' device='%s' param='%s'\n",
5213                mti->mti_fsname, mti->mti_svname, mti->mti_params);
5214
5215         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
5216         if (rc)
5217                 GOTO(out, rc);
5218
5219         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
5220             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
5221                 CERROR("No filesystem targets for %s. cfg_device from lctl "
5222                        "is '%s'\n", mti->mti_fsname, mti->mti_svname);
5223                 mgs_unlink_fsdb(mgs, fsdb);
5224                 GOTO(out, rc = -EINVAL);
5225         }
5226
5227         /*
5228          * Revoke lock so everyone updates.  Should be alright if
5229          * someone was already reading while we were updating the logs,
5230          * so we don't really need to hold the lock while we're
5231          * writing (above).
5232          */
5233         mti->mti_flags = dev_type | LDD_F_PARAM;
5234         mutex_lock(&fsdb->fsdb_mutex);
5235         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
5236         mutex_unlock(&fsdb->fsdb_mutex);
5237         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
5238
5239 out:
5240         if (fsdb)
5241                 mgs_put_fsdb(mgs, fsdb);
5242
5243         RETURN(rc);
5244 }
5245
5246 static int mgs_set_param2(const struct lu_env *env, struct mgs_device *mgs,
5247                           struct mgs_target_info *mti, const char *param)
5248 {
5249         struct fs_db *fsdb = NULL;
5250         int dev_type;
5251         size_t len;
5252         int rc;
5253
5254         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
5255             sizeof(mti->mti_params))
5256                 GOTO(out, rc = -E2BIG);
5257
5258         /* obdname2fsname reports devname as an obd device */
5259         len = strcspn(param, ".=");
5260         if (len && param[len] != '=') {
5261                 char *ptr;
5262
5263                 param += len + 1;
5264                 ptr = strchr(param, '.');
5265
5266                 len = strlen(param);
5267                 if (ptr)
5268                         len -= strlen(ptr);
5269                 if (len >= sizeof(mti->mti_svname))
5270                         GOTO(out, rc = -E2BIG);
5271
5272                 snprintf(mti->mti_svname, sizeof(mti->mti_svname), "%.*s",
5273                         (int)len, param);
5274
5275                 obdname2fsname(mti->mti_svname, mti->mti_fsname,
5276                                sizeof(mti->mti_fsname));
5277         } else {
5278                 snprintf(mti->mti_svname, sizeof(mti->mti_svname), "general");
5279         }
5280
5281         CDEBUG(D_MGS, "set_param2 fs='%s' device='%s' param='%s'\n",
5282                mti->mti_fsname, mti->mti_svname, mti->mti_params);
5283
5284         /* The return value should be the device type i.e LDD_F_SV_TYPE_XXX.
5285          * A returned error tells us we don't have a target obd device.
5286          */
5287         dev_type = server_name2index(mti->mti_svname, &mti->mti_stripe_index,
5288                                      NULL);
5289         if (dev_type < 0)
5290                 dev_type = 0;
5291
5292         /* the return value should be the device type i.e LDD_F_SV_TYPE_XXX.
5293          * Strip -osc or -mdc suffix from svname
5294          */
5295         if ((dev_type & LDD_F_SV_TYPE_OST || dev_type & LDD_F_SV_TYPE_MDT) &&
5296             server_make_name(dev_type, mti->mti_stripe_index,
5297                              mti->mti_fsname, mti->mti_svname,
5298                              sizeof(mti->mti_svname)))
5299                 GOTO(out, rc = -EINVAL);
5300
5301         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
5302         if (rc)
5303                 GOTO(out, rc);
5304         /*
5305          * Revoke lock so everyone updates.  Should be alright if
5306          * someone was already reading while we were updating the logs,
5307          * so we don't really need to hold the lock while we're
5308          * writing (above).
5309          */
5310         mti->mti_flags = dev_type | LDD_F_PARAM2;
5311         mutex_lock(&fsdb->fsdb_mutex);
5312         rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
5313         mutex_unlock(&fsdb->fsdb_mutex);
5314         mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
5315         mgs_put_fsdb(mgs, fsdb);
5316 out:
5317         RETURN(rc);
5318 }
5319
5320 /* Set a permanent (config log) param for a target or fs
5321  *
5322  * @lcfg buf0 may contain the device (testfs-MDT0000) name
5323  *       buf1 contains the single parameter
5324  */
5325 int mgs_set_param(const struct lu_env *env, struct mgs_device *mgs,
5326                   struct lustre_cfg *lcfg)
5327 {
5328         const char *param = lustre_cfg_string(lcfg, 1);
5329         struct mgs_target_info *mti;
5330         int rc;
5331
5332         /* Create a fake mti to hold everything */
5333         OBD_ALLOC_PTR(mti);
5334         if (!mti)
5335                 return -ENOMEM;
5336
5337         print_lustre_cfg(lcfg);
5338
5339         if (lcfg->lcfg_command == LCFG_PARAM) {
5340                 /* For the case of lctl conf_param devname can be
5341                  * lustre, lustre-mdtlov, lustre-client, lustre-MDT0000
5342                  */
5343                 const char *devname = lustre_cfg_string(lcfg, 0);
5344
5345                 rc = mgs_set_conf_param(env, mgs, mti, devname, param);
5346         } else {
5347                 /* In the case of lctl set_param -P lcfg[0] will always
5348                  * be 'general'. At least for now.
5349                  */
5350                 rc = mgs_set_param2(env, mgs, mti, param);
5351         }
5352
5353         OBD_FREE_PTR(mti);
5354
5355         return rc;
5356 }
5357
5358 static int mgs_write_log_pool(const struct lu_env *env,
5359                               struct mgs_device *mgs, char *logname,
5360                               struct fs_db *fsdb, char *tgtname,
5361                               enum lcfg_command_type cmd,
5362                               char *fsname, char *poolname,
5363                               char *ostname, char *comment)
5364 {
5365         struct llog_handle *llh = NULL;
5366         int rc;
5367
5368         rc = record_start_log(env, mgs, &llh, logname);
5369         if (rc)
5370                 return rc;
5371         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
5372         if (rc)
5373                 goto out;
5374         rc = record_base(env, llh, tgtname, 0, cmd,
5375                          fsname, poolname, ostname, NULL);
5376         if (rc)
5377                 goto out;
5378         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
5379 out:
5380         record_end_log(env, &llh);
5381         return rc;
5382 }
5383
5384 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
5385                     enum lcfg_command_type cmd, const char *nodemap_name,
5386                     char *param)
5387 {
5388         lnet_nid_t      nid[2];
5389         __u32           idmap[2];
5390         bool            bool_switch;
5391         __u32           int_id;
5392         int             rc = 0;
5393         ENTRY;
5394
5395         switch (cmd) {
5396         case LCFG_NODEMAP_ADD:
5397                 rc = nodemap_add(nodemap_name);
5398                 break;
5399         case LCFG_NODEMAP_DEL:
5400                 rc = nodemap_del(nodemap_name);
5401                 break;
5402         case LCFG_NODEMAP_ADD_RANGE:
5403                 rc = nodemap_parse_range(param, nid);
5404                 if (rc != 0)
5405                         break;
5406                 rc = nodemap_add_range(nodemap_name, nid);
5407                 break;
5408         case LCFG_NODEMAP_DEL_RANGE:
5409                 rc = nodemap_parse_range(param, nid);
5410                 if (rc != 0)
5411                         break;
5412                 rc = nodemap_del_range(nodemap_name, nid);
5413                 break;
5414         case LCFG_NODEMAP_ADMIN:
5415                 bool_switch = simple_strtoul(param, NULL, 10);
5416                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
5417                 break;
5418         case LCFG_NODEMAP_DENY_UNKNOWN:
5419                 bool_switch = simple_strtoul(param, NULL, 10);
5420                 rc = nodemap_set_deny_unknown(nodemap_name, bool_switch);
5421                 break;
5422         case LCFG_NODEMAP_AUDIT_MODE:
5423                 rc = kstrtoul(param, 10, (unsigned long *)&bool_switch);
5424                 if (rc == 0)
5425                         rc = nodemap_set_audit_mode(nodemap_name, bool_switch);
5426                 break;
5427         case LCFG_NODEMAP_MAP_MODE:
5428                 if (strcmp("both", param) == 0)
5429                         rc = nodemap_set_mapping_mode(nodemap_name,
5430                                                       NODEMAP_MAP_BOTH);
5431                 else if (strcmp("uid_only", param) == 0)
5432                         rc = nodemap_set_mapping_mode(nodemap_name,
5433                                                       NODEMAP_MAP_UID_ONLY);
5434                 else if (strcmp("gid_only", param) == 0)
5435                         rc = nodemap_set_mapping_mode(nodemap_name,
5436                                                       NODEMAP_MAP_GID_ONLY);
5437                 else
5438                         rc = -EINVAL;
5439                 break;
5440         case LCFG_NODEMAP_TRUSTED:
5441                 bool_switch = simple_strtoul(param, NULL, 10);
5442                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
5443                 break;
5444         case LCFG_NODEMAP_SQUASH_UID:
5445                 int_id = simple_strtoul(param, NULL, 10);
5446                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
5447                 break;
5448         case LCFG_NODEMAP_SQUASH_GID:
5449                 int_id = simple_strtoul(param, NULL, 10);
5450                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
5451                 break;
5452         case LCFG_NODEMAP_ADD_UIDMAP:
5453         case LCFG_NODEMAP_ADD_GIDMAP:
5454                 rc = nodemap_parse_idmap(param, idmap);
5455                 if (rc != 0)
5456                         break;
5457                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
5458                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
5459                                                idmap);
5460                 else
5461                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
5462                                                idmap);
5463                 break;
5464         case LCFG_NODEMAP_DEL_UIDMAP:
5465         case LCFG_NODEMAP_DEL_GIDMAP:
5466                 rc = nodemap_parse_idmap(param, idmap);
5467                 if (rc != 0)
5468                         break;
5469                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
5470                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
5471                                                idmap);
5472                 else
5473                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
5474                                                idmap);
5475                 break;
5476         case LCFG_NODEMAP_SET_FILESET:
5477                 rc = nodemap_set_fileset(nodemap_name, param);
5478                 break;
5479         case LCFG_NODEMAP_SET_SEPOL:
5480                 rc = nodemap_set_sepol(nodemap_name, param);
5481                 break;
5482         default:
5483                 rc = -EINVAL;
5484         }
5485
5486         RETURN(rc);
5487 }
5488
5489 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
5490                  enum lcfg_command_type cmd, char *fsname,
5491                  char *poolname, char *ostname)
5492 {
5493         struct fs_db *fsdb;
5494         char *lovname;
5495         char *logname;
5496         char *label = NULL, *canceled_label = NULL;
5497         int label_sz;
5498         struct mgs_target_info *mti = NULL;
5499         bool checked = false;
5500         bool locked = false;
5501         bool free = false;
5502         int rc, i;
5503         ENTRY;
5504
5505         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
5506         if (rc) {
5507                 CERROR("Can't get db for %s\n", fsname);
5508                 RETURN(rc);
5509         }
5510         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
5511                 CERROR("%s is not defined\n", fsname);
5512                 free = true;
5513                 GOTO(out_fsdb, rc = -EINVAL);
5514         }
5515
5516         label_sz = 10 + strlen(fsname) + strlen(poolname);
5517
5518         /* check if ostname match fsname */
5519         if (ostname != NULL) {
5520                 char *ptr;
5521
5522                 ptr = strrchr(ostname, '-');
5523                 if ((ptr == NULL) ||
5524                     (strncmp(fsname, ostname, ptr-ostname) != 0))
5525                         RETURN(-EINVAL);
5526                 label_sz += strlen(ostname);
5527         }
5528
5529         OBD_ALLOC(label, label_sz);
5530         if (!label)
5531                 GOTO(out_fsdb, rc = -ENOMEM);
5532
5533         switch(cmd) {
5534         case LCFG_POOL_NEW:
5535                 sprintf(label,
5536                         "new %s.%s", fsname, poolname);
5537                 break;
5538         case LCFG_POOL_ADD:
5539                 sprintf(label,
5540                         "add %s.%s.%s", fsname, poolname, ostname);
5541                 break;
5542         case LCFG_POOL_REM:
5543                 OBD_ALLOC(canceled_label, label_sz);
5544                 if (canceled_label == NULL)
5545                         GOTO(out_label, rc = -ENOMEM);
5546                 sprintf(label,
5547                         "rem %s.%s.%s", fsname, poolname, ostname);
5548                 sprintf(canceled_label,
5549                         "add %s.%s.%s", fsname, poolname, ostname);
5550                 break;
5551         case LCFG_POOL_DEL:
5552                 OBD_ALLOC(canceled_label, label_sz);
5553                 if (canceled_label == NULL)
5554                         GOTO(out_label, rc = -ENOMEM);
5555                 sprintf(label,
5556                         "del %s.%s", fsname, poolname);
5557                 sprintf(canceled_label,
5558                         "new %s.%s", fsname, poolname);
5559                 break;
5560         default:
5561                 break;
5562         }
5563
5564         OBD_ALLOC_PTR(mti);
5565         if (mti == NULL)
5566                 GOTO(out_cancel, rc = -ENOMEM);
5567         strncpy(mti->mti_svname, "lov pool", sizeof(mti->mti_svname));
5568
5569         mutex_lock(&fsdb->fsdb_mutex);
5570         locked = true;
5571         /* write pool def to all MDT logs */
5572         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
5573                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
5574                         rc = name_create_mdt_and_lov(&logname, &lovname,
5575                                                      fsdb, i);
5576                         if (rc)
5577                                 GOTO(out_mti, rc);
5578
5579                         if (!checked && (canceled_label == NULL)) {
5580                                 rc = mgs_check_marker(env, mgs, fsdb, mti,
5581                                                 logname, lovname, label);
5582                                 if (rc) {
5583                                         name_destroy(&logname);
5584                                         name_destroy(&lovname);
5585                                         GOTO(out_mti,
5586                                                 rc = (rc == LLOG_PROC_BREAK ?
5587                                                         -EEXIST : rc));
5588                                 }
5589                                 checked = true;
5590                         }
5591                         if (canceled_label != NULL)
5592                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
5593                                                 lovname, canceled_label,
5594                                                 CM_SKIP);
5595
5596                         if (rc >= 0)
5597                                 rc = mgs_write_log_pool(env, mgs, logname,
5598                                                         fsdb, lovname, cmd,
5599                                                         fsname, poolname,
5600                                                         ostname, label);
5601                         name_destroy(&logname);
5602                         name_destroy(&lovname);
5603                         if (rc)
5604                                 GOTO(out_mti, rc);
5605                 }
5606         }
5607
5608         rc = name_create(&logname, fsname, "-client");
5609         if (rc)
5610                 GOTO(out_mti, rc);
5611
5612         if (!checked && (canceled_label == NULL)) {
5613                 rc = mgs_check_marker(env, mgs, fsdb, mti, logname,
5614                                 fsdb->fsdb_clilov, label);
5615                 if (rc) {
5616                         name_destroy(&logname);
5617                         GOTO(out_mti, rc = (rc == LLOG_PROC_BREAK ?
5618                                 -EEXIST : rc));
5619                 }
5620         }
5621         if (canceled_label != NULL) {
5622                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
5623                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
5624                 if (rc < 0) {
5625                         name_destroy(&logname);
5626                         GOTO(out_mti, rc);
5627                 }
5628         }
5629
5630         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
5631                                 cmd, fsname, poolname, ostname, label);
5632         mutex_unlock(&fsdb->fsdb_mutex);
5633         locked = false;
5634         name_destroy(&logname);
5635         /* request for update */
5636         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
5637
5638         GOTO(out_mti, rc);
5639
5640 out_mti:
5641         if (locked)
5642                 mutex_unlock(&fsdb->fsdb_mutex);
5643         if (mti != NULL)
5644                 OBD_FREE_PTR(mti);
5645 out_cancel:
5646         if (canceled_label != NULL)
5647                 OBD_FREE(canceled_label, label_sz);
5648 out_label:
5649         OBD_FREE(label, label_sz);
5650 out_fsdb:
5651         if (free)
5652                 mgs_unlink_fsdb(mgs, fsdb);
5653         mgs_put_fsdb(mgs, fsdb);
5654
5655         return rc;
5656 }