Whamcloud - gitweb
LU-13437 uapi: add OBD_CONNECT2_GETATTR_PFID
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mgs/mgs_llog.c
33  *
34  * Lustre Management Server (mgs) config llog creation
35  *
36  * Author: Nathan Rutman <nathan@clusterfs.com>
37  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
38  * Author: Mikhail Pershin <tappro@whamcloud.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_MGS
42 #define D_MGS D_CONFIG
43
44 #include <obd.h>
45 #include <uapi/linux/lustre/lustre_ioctl.h>
46 #include <uapi/linux/lustre/lustre_param.h>
47 #include <lustre_sec.h>
48 #include <lustre_quota.h>
49 #include <lustre_sec.h>
50
51 #include "mgs_internal.h"
52
53 /********************** Class functions ********************/
54
55 /**
56  * Find all logs in CONFIG directory and link then into list.
57  *
58  * \param[in] env       pointer to the thread context
59  * \param[in] mgs       pointer to the mgs device
60  * \param[out] log_list the list to hold the found llog name entry
61  *
62  * \retval              0 for success
63  * \retval              negative error number on failure
64  **/
65 int class_dentry_readdir(const struct lu_env *env, struct mgs_device *mgs,
66                          struct list_head *log_list)
67 {
68         struct dt_object *dir = mgs->mgs_configs_dir;
69         const struct dt_it_ops *iops;
70         struct dt_it *it;
71         struct mgs_direntry *de;
72         char *key;
73         int rc, key_sz;
74
75         INIT_LIST_HEAD(log_list);
76
77         LASSERT(dir);
78         LASSERT(dir->do_index_ops);
79
80         iops = &dir->do_index_ops->dio_it;
81         it = iops->init(env, dir, LUDA_64BITHASH);
82         if (IS_ERR(it))
83                 RETURN(PTR_ERR(it));
84
85         rc = iops->load(env, it, 0);
86         if (rc <= 0)
87                 GOTO(fini, rc = 0);
88
89         /* main cycle */
90         do {
91                 key = (void *)iops->key(env, it);
92                 if (IS_ERR(key)) {
93                         CERROR("%s: key failed when listing %s: rc = %d\n",
94                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
95                                (int) PTR_ERR(key));
96                         goto next;
97                 }
98                 key_sz = iops->key_size(env, it);
99                 LASSERT(key_sz > 0);
100
101                 /* filter out "." and ".." entries */
102                 if (key[0] == '.') {
103                         if (key_sz == 1)
104                                 goto next;
105                         if (key_sz == 2 && key[1] == '.')
106                                 goto next;
107                 }
108
109                 /* filter out ".bak" files */
110                 /* sizeof(".bak") - 1 == 3 */
111                 if (key_sz >= 3 &&
112                     !memcmp(".bak", key + key_sz - 3, 3)) {
113                         CDEBUG(D_MGS, "Skipping backup file %.*s\n",
114                                key_sz, key);
115                         goto next;
116                 }
117
118                 de = mgs_direntry_alloc(key_sz + 1);
119                 if (de == NULL) {
120                         rc = -ENOMEM;
121                         break;
122                 }
123
124                 memcpy(de->mde_name, key, key_sz);
125                 de->mde_name[key_sz] = 0;
126
127                 list_add(&de->mde_list, log_list);
128
129 next:
130                 rc = iops->next(env, it);
131         } while (rc == 0);
132         if (rc > 0)
133                 rc = 0;
134
135         iops->put(env, it);
136
137 fini:
138         iops->fini(env, it);
139         if (rc) {
140                 struct mgs_direntry *n;
141
142                 CERROR("%s: key failed when listing %s: rc = %d\n",
143                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
144
145                 list_for_each_entry_safe(de, n, log_list, mde_list) {
146                         list_del_init(&de->mde_list);
147                         mgs_direntry_free(de);
148                 }
149         }
150
151         RETURN(rc);
152 }
153
154 /******************** DB functions *********************/
155
156 static inline int name_create(char **newname, char *prefix, char *suffix)
157 {
158         LASSERT(newname);
159         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
160         if (!*newname)
161                 return -ENOMEM;
162         sprintf(*newname, "%s%s", prefix, suffix);
163         return 0;
164 }
165
166 static inline void name_destroy(char **name)
167 {
168         if (*name)
169                 OBD_FREE(*name, strlen(*name) + 1);
170         *name = NULL;
171 }
172
173 struct mgs_fsdb_handler_data
174 {
175         struct fs_db   *fsdb;
176         __u32           ver;
177 };
178
179 /* from the (client) config log, figure out:
180         1. which ost's/mdt's are configured (by index)
181         2. what the last config step is
182         3. COMPAT_18 osc name
183 */
184 /* It might be better to have a separate db file, instead of parsing the info
185    out of the client log.  This is slow and potentially error-prone. */
186 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
187                             struct llog_rec_hdr *rec, void *data)
188 {
189         struct mgs_fsdb_handler_data *d = data;
190         struct fs_db *fsdb = d->fsdb;
191         int cfg_len = rec->lrh_len;
192         char *cfg_buf = (char*) (rec + 1);
193         struct lustre_cfg *lcfg;
194         __u32 index;
195         int rc = 0;
196         ENTRY;
197
198         if (rec->lrh_type != OBD_CFG_REC) {
199                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
200                 RETURN(-EINVAL);
201         }
202
203         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
204         if (rc) {
205                 CERROR("Insane cfg\n");
206                 RETURN(rc);
207         }
208
209         lcfg = (struct lustre_cfg *)cfg_buf;
210
211         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
212                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
213
214         /* Figure out ost indicies */
215         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
216         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
217             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
218                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
219                                        NULL, 10);
220                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
221                        lustre_cfg_string(lcfg, 1), index,
222                        lustre_cfg_string(lcfg, 2));
223                 set_bit(index, fsdb->fsdb_ost_index_map);
224         }
225
226         /* Figure out mdt indicies */
227         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
228         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
229             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
230                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
231                                        &index, NULL);
232                 if (rc != LDD_F_SV_TYPE_MDT) {
233                         CWARN("Unparsable MDC name %s, assuming index 0\n",
234                               lustre_cfg_string(lcfg, 0));
235                         index = 0;
236                 }
237                 rc = 0;
238                 CDEBUG(D_MGS, "MDT index is %u\n", index);
239                 if (!test_bit(index, fsdb->fsdb_mdt_index_map)) {
240                         set_bit(index, fsdb->fsdb_mdt_index_map);
241                         fsdb->fsdb_mdt_count++;
242                 }
243         }
244
245         /**
246          * figure out the old config. fsdb_gen = 0 means old log
247          * It is obsoleted and not supported anymore
248          */
249         if (fsdb->fsdb_gen == 0) {
250                 CERROR("Old config format is not supported\n");
251                 RETURN(-EINVAL);
252         }
253
254         /*
255          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
256          */
257         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
258             lcfg->lcfg_command == LCFG_ATTACH &&
259             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
260                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
261                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
262                         CWARN("MDT using 1.8 OSC name scheme\n");
263                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
264                 }
265         }
266
267         if (lcfg->lcfg_command == LCFG_MARKER) {
268                 struct cfg_marker *marker;
269                 marker = lustre_cfg_buf(lcfg, 1);
270
271                 d->ver = marker->cm_vers;
272
273                 /* Keep track of the latest marker step */
274                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
275         }
276
277         RETURN(rc);
278 }
279
280 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
281 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
282                                   struct mgs_device *mgs,
283                                   struct fs_db *fsdb)
284 {
285         char *logname;
286         struct llog_handle *loghandle;
287         struct llog_ctxt *ctxt;
288         struct mgs_fsdb_handler_data d = {
289                 .fsdb = fsdb,
290         };
291         int rc;
292
293         ENTRY;
294
295         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
296         LASSERT(ctxt != NULL);
297         rc = name_create(&logname, fsdb->fsdb_name, "-client");
298         if (rc)
299                 GOTO(out_put, rc);
300         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
301         if (rc)
302                 GOTO(out_pop, rc);
303
304         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
305         if (rc)
306                 GOTO(out_close, rc);
307
308         if (llog_get_size(loghandle) <= 1)
309                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
310
311         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
312         CDEBUG(D_INFO, "get_db = %d\n", rc);
313 out_close:
314         llog_close(env, loghandle);
315 out_pop:
316         name_destroy(&logname);
317 out_put:
318         llog_ctxt_put(ctxt);
319
320         RETURN(rc);
321 }
322
323 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
324 {
325         struct mgs_tgt_srpc_conf *tgtconf;
326
327         /* free target-specific rules */
328         while (fsdb->fsdb_srpc_tgt) {
329                 tgtconf = fsdb->fsdb_srpc_tgt;
330                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
331
332                 LASSERT(tgtconf->mtsc_tgt);
333
334                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
335                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
336                 OBD_FREE_PTR(tgtconf);
337         }
338
339         /* free general rules */
340         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
341 }
342
343 static void mgs_unlink_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
344 {
345         mutex_lock(&mgs->mgs_mutex);
346         if (likely(!list_empty(&fsdb->fsdb_list))) {
347                 LASSERTF(atomic_read(&fsdb->fsdb_ref) >= 2,
348                          "Invalid ref %d on %s\n",
349                          atomic_read(&fsdb->fsdb_ref),
350                          fsdb->fsdb_name);
351
352                 list_del_init(&fsdb->fsdb_list);
353                 /* Drop the reference on the list.*/
354                 mgs_put_fsdb(mgs, fsdb);
355         }
356         mutex_unlock(&mgs->mgs_mutex);
357 }
358
359 /* The caller must hold mgs->mgs_mutex. */
360 static inline struct fs_db *
361 mgs_find_fsdb_noref(struct mgs_device *mgs, const char *fsname)
362 {
363         struct fs_db *fsdb;
364         struct list_head *tmp;
365
366         list_for_each(tmp, &mgs->mgs_fs_db_list) {
367                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
368                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
369                         return fsdb;
370         }
371
372         return NULL;
373 }
374
375 /* The caller must hold mgs->mgs_mutex. */
376 static void mgs_remove_fsdb_by_name(struct mgs_device *mgs, const char *name)
377 {
378         struct fs_db *fsdb;
379
380         fsdb = mgs_find_fsdb_noref(mgs, name);
381         if (fsdb) {
382                 list_del_init(&fsdb->fsdb_list);
383                 /* Drop the reference on the list.*/
384                 mgs_put_fsdb(mgs, fsdb);
385         }
386 }
387
388 /* The caller must hold mgs->mgs_mutex. */
389 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, const char *fsname)
390 {
391         struct fs_db *fsdb;
392
393         fsdb = mgs_find_fsdb_noref(mgs, fsname);
394         if (fsdb)
395                 atomic_inc(&fsdb->fsdb_ref);
396
397         return fsdb;
398 }
399
400 /* The caller must hold mgs->mgs_mutex. */
401 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
402                                   struct mgs_device *mgs, char *fsname)
403 {
404         struct fs_db *fsdb;
405         int rc;
406         ENTRY;
407
408         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
409                 CERROR("fsname %s is too long\n", fsname);
410
411                 RETURN(ERR_PTR(-EINVAL));
412         }
413
414         OBD_ALLOC_PTR(fsdb);
415         if (!fsdb)
416                 RETURN(ERR_PTR(-ENOMEM));
417
418         strncpy(fsdb->fsdb_name, fsname, sizeof(fsdb->fsdb_name));
419         mutex_init(&fsdb->fsdb_mutex);
420         INIT_LIST_HEAD(&fsdb->fsdb_list);
421         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
422         fsdb->fsdb_gen = 1;
423         INIT_LIST_HEAD(&fsdb->fsdb_clients);
424         atomic_set(&fsdb->fsdb_notify_phase, 0);
425         init_waitqueue_head(&fsdb->fsdb_notify_waitq);
426         init_completion(&fsdb->fsdb_notify_comp);
427
428         if (strcmp(fsname, MGSSELF_NAME) == 0) {
429                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
430                 fsdb->fsdb_mgs = mgs;
431                 if (logname_is_barrier(fsname))
432                         goto add;
433         } else {
434                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
435                 if (!fsdb->fsdb_mdt_index_map) {
436                         CERROR("No memory for MDT index maps\n");
437
438                         GOTO(err, rc = -ENOMEM);
439                 }
440
441                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
442                 if (!fsdb->fsdb_ost_index_map) {
443                         CERROR("No memory for OST index maps\n");
444
445                         GOTO(err, rc = -ENOMEM);
446                 }
447
448                 if (logname_is_barrier(fsname))
449                         goto add;
450
451                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
452                 if (rc)
453                         GOTO(err, rc);
454
455                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
456                 if (rc)
457                         GOTO(err, rc);
458
459                 /* initialise data for NID table */
460                 mgs_ir_init_fs(env, mgs, fsdb);
461                 lproc_mgs_add_live(mgs, fsdb);
462         }
463
464         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
465             strcmp(PARAMS_FILENAME, fsname) != 0) {
466                 /* populate the db from the client llog */
467                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
468                 if (rc) {
469                         CERROR("Can't get db from client log %d\n", rc);
470
471                         GOTO(err, rc);
472                 }
473         }
474
475         /* populate srpc rules from params llog */
476         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
477         if (rc) {
478                 CERROR("Can't get db from params log %d\n", rc);
479
480                 GOTO(err, rc);
481         }
482
483 add:
484         /* One ref is for the fsdb on the list.
485          * The other ref is for the caller. */
486         atomic_set(&fsdb->fsdb_ref, 2);
487         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
488
489         RETURN(fsdb);
490
491 err:
492         atomic_set(&fsdb->fsdb_ref, 1);
493         mgs_put_fsdb(mgs, fsdb);
494
495         RETURN(ERR_PTR(rc));
496 }
497
498 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
499 {
500         LASSERT(list_empty(&fsdb->fsdb_list));
501
502         lproc_mgs_del_live(mgs, fsdb);
503
504         /* deinitialize fsr */
505         if (fsdb->fsdb_mgs)
506                 mgs_ir_fini_fs(mgs, fsdb);
507
508         if (fsdb->fsdb_ost_index_map)
509                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
510         if (fsdb->fsdb_mdt_index_map)
511                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
512         name_destroy(&fsdb->fsdb_clilov);
513         name_destroy(&fsdb->fsdb_clilmv);
514         mgs_free_fsdb_srpc(fsdb);
515         OBD_FREE_PTR(fsdb);
516 }
517
518 void mgs_put_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
519 {
520         if (atomic_dec_and_test(&fsdb->fsdb_ref))
521                 mgs_free_fsdb(mgs, fsdb);
522 }
523
524 int mgs_init_fsdb_list(struct mgs_device *mgs)
525 {
526         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
527         return 0;
528 }
529
530 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
531 {
532         struct fs_db *fsdb;
533         struct list_head *tmp, *tmp2;
534
535         mutex_lock(&mgs->mgs_mutex);
536         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
537                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
538                 list_del_init(&fsdb->fsdb_list);
539                 mgs_put_fsdb(mgs, fsdb);
540         }
541         mutex_unlock(&mgs->mgs_mutex);
542         return 0;
543 }
544
545 /* The caller must hold mgs->mgs_mutex. */
546 int mgs_find_or_make_fsdb_nolock(const struct lu_env *env,
547                                 struct mgs_device *mgs,
548                                 char *name, struct fs_db **dbh)
549 {
550         struct fs_db *fsdb;
551         int rc = 0;
552         ENTRY;
553
554         fsdb = mgs_find_fsdb(mgs, name);
555         if (!fsdb) {
556                 fsdb = mgs_new_fsdb(env, mgs, name);
557                 if (IS_ERR(fsdb))
558                         rc = PTR_ERR(fsdb);
559
560                 CDEBUG(D_MGS, "Created new db: rc = %d\n", rc);
561         }
562
563         if (!rc)
564                 *dbh = fsdb;
565
566         RETURN(rc);
567 }
568
569 int mgs_find_or_make_fsdb(const struct lu_env *env, struct mgs_device *mgs,
570                           char *name, struct fs_db **dbh)
571 {
572         int rc;
573         ENTRY;
574
575         mutex_lock(&mgs->mgs_mutex);
576         rc = mgs_find_or_make_fsdb_nolock(env, mgs, name, dbh);
577         mutex_unlock(&mgs->mgs_mutex);
578
579         RETURN(rc);
580 }
581
582 /* 1 = index in use
583    0 = index unused
584    -1= empty client log */
585 int mgs_check_index(const struct lu_env *env,
586                     struct mgs_device *mgs,
587                     struct mgs_target_info *mti)
588 {
589         struct fs_db *fsdb;
590         void *imap;
591         int rc = 0;
592         ENTRY;
593
594         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
595
596         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
597         if (rc) {
598                 CERROR("Can't get db for %s\n", mti->mti_fsname);
599                 RETURN(rc);
600         }
601
602         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
603                 GOTO(out, rc = -1);
604
605         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
606                 imap = fsdb->fsdb_ost_index_map;
607         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
608                 imap = fsdb->fsdb_mdt_index_map;
609         else
610                 GOTO(out, rc = -EINVAL);
611
612         if (test_bit(mti->mti_stripe_index, imap))
613                 GOTO(out, rc = 1);
614
615         GOTO(out, rc = 0);
616
617 out:
618         mgs_put_fsdb(mgs, fsdb);
619         return rc;
620 }
621
622 static __inline__ int next_index(void *index_map, int map_len)
623 {
624         int i;
625         for (i = 0; i < map_len * 8; i++)
626                 if (!test_bit(i, index_map)) {
627                          return i;
628                  }
629         CERROR("max index %d exceeded.\n", i);
630         return -1;
631 }
632
633 /* Make the mdt/ost server obd name based on the filesystem name */
634 static bool server_make_name(u32 flags, u16 index, const char *fs,
635                              char *name_buf, size_t name_buf_size)
636 {
637         bool invalid_flag = false;
638
639         if (flags & (LDD_F_SV_TYPE_MDT | LDD_F_SV_TYPE_OST)) {
640                 if (!(flags & LDD_F_SV_ALL))
641                         snprintf(name_buf, name_buf_size, "%.8s%c%s%04x", fs,
642                                 (flags & LDD_F_VIRGIN) ? ':' :
643                                         ((flags & LDD_F_WRITECONF) ? '=' : '-'),
644                                 (flags & LDD_F_SV_TYPE_MDT) ? "MDT" : "OST",
645                                 index);
646         } else if (flags & LDD_F_SV_TYPE_MGS) {
647                 snprintf(name_buf, name_buf_size, "MGS");
648         } else {
649                 CERROR("unknown server type %#x\n", flags);
650                 invalid_flag = true;
651         }
652         return invalid_flag;
653 }
654
655 /* Return codes:
656         0  newly marked as in use
657         <0 err
658         +EALREADY for update of an old index */
659 static int mgs_set_index(const struct lu_env *env,
660                          struct mgs_device *mgs,
661                          struct mgs_target_info *mti)
662 {
663         struct fs_db *fsdb;
664         void *imap;
665         int rc = 0;
666         ENTRY;
667
668         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
669         if (rc) {
670                 CERROR("Can't get db for %s\n", mti->mti_fsname);
671                 RETURN(rc);
672         }
673
674         mutex_lock(&fsdb->fsdb_mutex);
675         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
676                 imap = fsdb->fsdb_ost_index_map;
677         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
678                 imap = fsdb->fsdb_mdt_index_map;
679         } else {
680                 GOTO(out_up, rc = -EINVAL);
681         }
682
683         if (mti->mti_flags & LDD_F_NEED_INDEX) {
684                 rc = next_index(imap, INDEX_MAP_SIZE);
685                 if (rc == -1)
686                         GOTO(out_up, rc = -ERANGE);
687                 mti->mti_stripe_index = rc;
688         }
689
690         /* the last index(0xffff) is reserved for default value. */
691         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
692                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
693                                    "but index must be less than %u.\n",
694                                    mti->mti_svname, mti->mti_stripe_index,
695                                    INDEX_MAP_SIZE * 8 - 1);
696                 GOTO(out_up, rc = -ERANGE);
697         }
698
699         if (test_bit(mti->mti_stripe_index, imap)) {
700                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
701                     !(mti->mti_flags & LDD_F_WRITECONF)) {
702                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
703                                            "%d, but that index is already in "
704                                            "use. Use --writeconf to force\n",
705                                            mti->mti_svname,
706                                            mti->mti_stripe_index);
707                         GOTO(out_up, rc = -EADDRINUSE);
708                 } else {
709                         CDEBUG(D_MGS, "Server %s updating index %d\n",
710                                mti->mti_svname, mti->mti_stripe_index);
711                         GOTO(out_up, rc = EALREADY);
712                 }
713         } else {
714                 set_bit(mti->mti_stripe_index, imap);
715                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
716                         fsdb->fsdb_mdt_count++;
717         }
718
719         set_bit(mti->mti_stripe_index, imap);
720         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
721         if (server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
722                              mti->mti_stripe_index, mti->mti_fsname,
723                              mti->mti_svname, sizeof(mti->mti_svname))) {
724                 CERROR("unknown server type %#x\n", mti->mti_flags);
725                 GOTO(out_up, rc = -EINVAL);
726         }
727
728         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
729                mti->mti_stripe_index);
730
731         GOTO(out_up, rc = 0);
732
733 out_up:
734         mutex_unlock(&fsdb->fsdb_mutex);
735         mgs_put_fsdb(mgs, fsdb);
736         return rc;
737 }
738
739 struct mgs_modify_lookup {
740         struct cfg_marker mml_marker;
741         int               mml_modified;
742 };
743
744 static int mgs_check_record_match(const struct lu_env *env,
745                                 struct llog_handle *llh,
746                                 struct llog_rec_hdr *rec, void *data)
747 {
748         struct cfg_marker *mc_marker = data;
749         struct cfg_marker *marker;
750         struct lustre_cfg *lcfg = REC_DATA(rec);
751         int cfg_len = REC_DATA_LEN(rec);
752         int rc;
753         ENTRY;
754
755
756         if (rec->lrh_type != OBD_CFG_REC) {
757                 CDEBUG(D_ERROR, "Unhandled lrh_type: %#x\n", rec->lrh_type);
758                 RETURN(-EINVAL);
759         }
760
761         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
762         if (rc) {
763                 CDEBUG(D_ERROR, "Insane cfg\n");
764                 RETURN(rc);
765         }
766
767         /* We only care about markers */
768         if (lcfg->lcfg_command != LCFG_MARKER)
769                 RETURN(0);
770
771         marker = lustre_cfg_buf(lcfg, 1);
772
773         if (marker->cm_flags & CM_SKIP)
774                 RETURN(0);
775
776         if ((strcmp(mc_marker->cm_comment, marker->cm_comment) == 0) &&
777                 (strcmp(mc_marker->cm_tgtname, marker->cm_tgtname) == 0)) {
778                 /* Found a non-skipped marker match */
779                 CDEBUG(D_MGS, "Matched rec %u marker %d flag %x %s %s\n",
780                         rec->lrh_index, marker->cm_step,
781                         marker->cm_flags, marker->cm_tgtname,
782                         marker->cm_comment);
783                 rc = LLOG_PROC_BREAK;
784         }
785
786         RETURN(rc);
787 }
788
789 /**
790  * Check an existing config log record with matching comment and device
791  * Return code:
792  * 0 - checked successfully,
793  * LLOG_PROC_BREAK - record matches
794  * negative - error
795  */
796 static int mgs_check_marker(const struct lu_env *env, struct mgs_device *mgs,
797                 struct fs_db *fsdb, struct mgs_target_info *mti,
798                 char *logname, char *devname, char *comment)
799 {
800         struct llog_handle *loghandle;
801         struct llog_ctxt *ctxt;
802         struct cfg_marker *mc_marker;
803         int rc;
804
805         ENTRY;
806
807         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
808         CDEBUG(D_MGS, "mgs check %s/%s/%s\n", logname, devname, comment);
809
810         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
811         LASSERT(ctxt != NULL);
812         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
813         if (rc < 0) {
814                 if (rc == -ENOENT)
815                         rc = 0;
816                 GOTO(out_pop, rc);
817         }
818
819         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
820         if (rc)
821                 GOTO(out_close, rc);
822
823         if (llog_get_size(loghandle) <= 1)
824                 GOTO(out_close, rc = 0);
825
826         OBD_ALLOC_PTR(mc_marker);
827         if (!mc_marker)
828                 GOTO(out_close, rc = -ENOMEM);
829         if (strlcpy(mc_marker->cm_comment, comment,
830                 sizeof(mc_marker->cm_comment)) >=
831                 sizeof(mc_marker->cm_comment))
832                 GOTO(out_free, rc = -E2BIG);
833         if (strlcpy(mc_marker->cm_tgtname, devname,
834                 sizeof(mc_marker->cm_tgtname)) >=
835                 sizeof(mc_marker->cm_tgtname))
836                 GOTO(out_free, rc = -E2BIG);
837
838         rc = llog_process(env, loghandle, mgs_check_record_match,
839                         (void *)mc_marker, NULL);
840
841 out_free:
842         OBD_FREE_PTR(mc_marker);
843
844 out_close:
845         llog_close(env, loghandle);
846 out_pop:
847         if (rc && rc != LLOG_PROC_BREAK)
848                 CDEBUG(D_ERROR, "%s: mgs check %s/%s failed: rc = %d\n",
849                         mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
850         llog_ctxt_put(ctxt);
851         RETURN(rc);
852 }
853
854 static int mgs_modify_handler(const struct lu_env *env,
855                               struct llog_handle *llh,
856                               struct llog_rec_hdr *rec, void *data)
857 {
858         struct mgs_modify_lookup *mml = data;
859         struct cfg_marker *marker;
860         struct lustre_cfg *lcfg = REC_DATA(rec);
861         int cfg_len = REC_DATA_LEN(rec);
862         int rc;
863         ENTRY;
864
865         if (rec->lrh_type != OBD_CFG_REC) {
866                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
867                 RETURN(-EINVAL);
868         }
869
870         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
871         if (rc) {
872                 CERROR("Insane cfg\n");
873                 RETURN(rc);
874         }
875
876         /* We only care about markers */
877         if (lcfg->lcfg_command != LCFG_MARKER)
878                 RETURN(0);
879
880         marker = lustre_cfg_buf(lcfg, 1);
881         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
882             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
883             !(marker->cm_flags & CM_SKIP)) {
884                 /* Found a non-skipped marker match */
885                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
886                        rec->lrh_index, marker->cm_step,
887                        marker->cm_flags, mml->mml_marker.cm_flags,
888                        marker->cm_tgtname, marker->cm_comment);
889                 /* Overwrite the old marker llog entry */
890                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
891                 marker->cm_flags |= mml->mml_marker.cm_flags;
892                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
893                 rc = llog_write(env, llh, rec, rec->lrh_index);
894                 if (!rc)
895                          mml->mml_modified++;
896         }
897
898         RETURN(rc);
899 }
900
901 /**
902  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
903  * Return code:
904  * 0 - modified successfully,
905  * 1 - no modification was done
906  * negative - error
907  */
908 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
909                       struct fs_db *fsdb, struct mgs_target_info *mti,
910                       char *logname, char *devname, char *comment, int flags)
911 {
912         struct llog_handle *loghandle;
913         struct llog_ctxt *ctxt;
914         struct mgs_modify_lookup *mml;
915         int rc;
916
917         ENTRY;
918
919         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
920         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
921                flags);
922
923         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
924         LASSERT(ctxt != NULL);
925         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
926         if (rc < 0) {
927                 if (rc == -ENOENT)
928                         rc = 0;
929                 GOTO(out_pop, rc);
930         }
931
932         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
933         if (rc)
934                 GOTO(out_close, rc);
935
936         if (llog_get_size(loghandle) <= 1)
937                 GOTO(out_close, rc = 0);
938
939         OBD_ALLOC_PTR(mml);
940         if (!mml)
941                 GOTO(out_close, rc = -ENOMEM);
942         if (strlcpy(mml->mml_marker.cm_comment, comment,
943                     sizeof(mml->mml_marker.cm_comment)) >=
944             sizeof(mml->mml_marker.cm_comment))
945                 GOTO(out_free, rc = -E2BIG);
946         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
947                     sizeof(mml->mml_marker.cm_tgtname)) >=
948             sizeof(mml->mml_marker.cm_tgtname))
949                 GOTO(out_free, rc = -E2BIG);
950         /* Modify mostly means cancel */
951         mml->mml_marker.cm_flags = flags;
952         mml->mml_marker.cm_canceltime = flags ? ktime_get_real_seconds() : 0;
953         mml->mml_modified = 0;
954         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
955                           NULL);
956         if (!rc && !mml->mml_modified)
957                 rc = 1;
958
959 out_free:
960         OBD_FREE_PTR(mml);
961
962 out_close:
963         llog_close(env, loghandle);
964 out_pop:
965         if (rc < 0)
966                 CERROR("%s: modify %s/%s failed: rc = %d\n",
967                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
968         llog_ctxt_put(ctxt);
969         RETURN(rc);
970 }
971
972 enum replace_state {
973         REPLACE_COPY = 0,
974         REPLACE_SKIP,
975         REPLACE_DONE,
976         REPLACE_UUID,
977         REPLACE_SETUP
978 };
979
980 /** This structure is passed to mgs_replace_handler */
981 struct mgs_replace_data {
982         /* Nids are replaced for this target device */
983         struct mgs_target_info target;
984         /* Temporary modified llog */
985         struct llog_handle *temp_llh;
986         enum replace_state state;
987         char *failover;
988         char *nodeuuid;
989 };
990
991 /**
992  * Check: a) if block should be skipped
993  * b) is it target block
994  *
995  * \param[in] lcfg
996  * \param[in] mrd
997  *
998  * \retval 0 should not to be skipped
999  * \retval 1 should to be skipped
1000  */
1001 static int check_markers(struct lustre_cfg *lcfg,
1002                          struct mgs_replace_data *mrd)
1003 {
1004          struct cfg_marker *marker;
1005
1006         /* Track markers. Find given device */
1007         if (lcfg->lcfg_command == LCFG_MARKER) {
1008                 marker = lustre_cfg_buf(lcfg, 1);
1009                 /* Clean llog from records marked as CM_SKIP.
1010                    CM_EXCLUDE records are used for "active" command
1011                    and can be restored if needed */
1012                 if ((marker->cm_flags & (CM_SKIP | CM_START)) ==
1013                     (CM_SKIP | CM_START)) {
1014                         mrd->state = REPLACE_SKIP;
1015                         return 1;
1016                 }
1017
1018                 if ((marker->cm_flags & (CM_SKIP | CM_END)) ==
1019                     (CM_SKIP | CM_END)) {
1020                         mrd->state = REPLACE_COPY;
1021                         return 1;
1022                 }
1023
1024                 if (strcmp(mrd->target.mti_svname, marker->cm_tgtname) == 0) {
1025                         LASSERT(!(marker->cm_flags & CM_START) ||
1026                                 !(marker->cm_flags & CM_END));
1027                         if (marker->cm_flags & CM_START) {
1028                                 mrd->state = REPLACE_UUID;
1029                                 mrd->failover = NULL;
1030                         } else if (marker->cm_flags & CM_END)
1031                                 mrd->state = REPLACE_COPY;
1032                 }
1033         }
1034
1035         return 0;
1036 }
1037
1038 static int record_base(const struct lu_env *env, struct llog_handle *llh,
1039                      char *cfgname, lnet_nid_t nid, int cmd,
1040                      char *s1, char *s2, char *s3, char *s4)
1041 {
1042         struct mgs_thread_info  *mgi = mgs_env_info(env);
1043         struct llog_cfg_rec     *lcr;
1044         int rc;
1045
1046         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
1047                cmd, s1, s2, s3, s4);
1048
1049         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
1050         if (s1)
1051                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
1052         if (s2)
1053                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
1054         if (s3)
1055                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
1056         if (s4)
1057                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
1058
1059         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
1060         if (lcr == NULL)
1061                 return -ENOMEM;
1062
1063         lcr->lcr_cfg.lcfg_nid = nid;
1064         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1065
1066         lustre_cfg_rec_free(lcr);
1067
1068         if (rc < 0)
1069                 CDEBUG(D_MGS,
1070                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
1071                        cfgname, cmd, s1, s2, s3, s4, rc);
1072         return rc;
1073 }
1074
1075 static inline int record_add_uuid(const struct lu_env *env,
1076                                   struct llog_handle *llh,
1077                                   uint64_t nid, char *uuid)
1078 {
1079         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
1080                            NULL, NULL, NULL);
1081 }
1082
1083 static inline int record_add_conn(const struct lu_env *env,
1084                                   struct llog_handle *llh,
1085                                   char *devname, char *uuid)
1086 {
1087         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
1088                            NULL, NULL, NULL);
1089 }
1090
1091 static inline int record_attach(const struct lu_env *env,
1092                                 struct llog_handle *llh, char *devname,
1093                                 char *type, char *uuid)
1094 {
1095         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
1096                            NULL, NULL);
1097 }
1098
1099 static inline int record_setup(const struct lu_env *env,
1100                                struct llog_handle *llh, char *devname,
1101                                char *s1, char *s2, char *s3, char *s4)
1102 {
1103         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
1104 }
1105
1106 /**
1107  * \retval <0 record processing error
1108  * \retval n record is processed. No need copy original one.
1109  * \retval 0 record is not processed.
1110  */
1111 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
1112                            struct mgs_replace_data *mrd)
1113 {
1114         int nids_added = 0;
1115         lnet_nid_t nid;
1116         char *ptr;
1117         int rc = 0;
1118
1119         if (mrd->state == REPLACE_UUID &&
1120             lcfg->lcfg_command == LCFG_ADD_UUID) {
1121                 /* LCFG_ADD_UUID command found. Let's skip original command
1122                    and add passed nids */
1123                 ptr = mrd->target.mti_params;
1124                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1125                         if (!mrd->nodeuuid) {
1126                                 rc = name_create(&mrd->nodeuuid,
1127                                                  libcfs_nid2str(nid), "");
1128                                 if (rc) {
1129                                         CERROR("Can't create uuid for "
1130                                                 "nid  %s, device %s\n",
1131                                                 libcfs_nid2str(nid),
1132                                                 mrd->target.mti_svname);
1133                                         return rc;
1134                                 }
1135                         }
1136                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
1137                                "device %s\n", libcfs_nid2str(nid),
1138                                 mrd->target.mti_params,
1139                                 mrd->nodeuuid);
1140                         rc = record_add_uuid(env,
1141                                              mrd->temp_llh, nid,
1142                                              mrd->nodeuuid);
1143                         if (!rc)
1144                                 nids_added++;
1145
1146                         if (*ptr == ':') {
1147                                 mrd->failover = ptr;
1148                                 break;
1149                         }
1150                 }
1151
1152                 if (nids_added == 0) {
1153                         CERROR("No new nids were added, nid %s with uuid %s, "
1154                                "device %s\n", libcfs_nid2str(nid),
1155                                mrd->nodeuuid ? mrd->nodeuuid : "NULL",
1156                                mrd->target.mti_svname);
1157                         name_destroy(&mrd->nodeuuid);
1158                         return -ENXIO;
1159                 } else {
1160                         mrd->state = REPLACE_SETUP;
1161                 }
1162
1163                 return nids_added;
1164         }
1165
1166         if (mrd->state == REPLACE_SETUP && lcfg->lcfg_command == LCFG_SETUP) {
1167                 /* LCFG_SETUP command found. UUID should be changed */
1168                 rc = record_setup(env,
1169                                   mrd->temp_llh,
1170                                   /* devname the same */
1171                                   lustre_cfg_string(lcfg, 0),
1172                                   /* s1 is not changed */
1173                                   lustre_cfg_string(lcfg, 1),
1174                                   mrd->nodeuuid,
1175                                   /* s3 is not changed */
1176                                   lustre_cfg_string(lcfg, 3),
1177                                   /* s4 is not changed */
1178                                   lustre_cfg_string(lcfg, 4));
1179
1180                 name_destroy(&mrd->nodeuuid);
1181                 if (rc)
1182                         return rc;
1183
1184                 if (mrd->failover) {
1185                         ptr = mrd->failover;
1186                         while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1187                                 if (mrd->nodeuuid == NULL) {
1188                                         rc =  name_create(&mrd->nodeuuid,
1189                                                           libcfs_nid2str(nid),
1190                                                           "");
1191                                         if (rc)
1192                                                 return rc;
1193                                 }
1194
1195                                 CDEBUG(D_MGS, "add nid %s for failover %s\n",
1196                                        libcfs_nid2str(nid), mrd->nodeuuid);
1197                                 rc = record_add_uuid(env, mrd->temp_llh, nid,
1198                                                      mrd->nodeuuid);
1199                                 if (rc) {
1200                                         name_destroy(&mrd->nodeuuid);
1201                                         return rc;
1202                                 }
1203                                 if (*ptr == ':') {
1204                                         rc = record_add_conn(env,
1205                                                 mrd->temp_llh,
1206                                                 lustre_cfg_string(lcfg, 0),
1207                                                 mrd->nodeuuid);
1208                                         name_destroy(&mrd->nodeuuid);
1209                                         if (rc)
1210                                                 return rc;
1211                                 }
1212                         }
1213                         if (mrd->nodeuuid) {
1214                                 rc = record_add_conn(env, mrd->temp_llh,
1215                                                      lustre_cfg_string(lcfg, 0),
1216                                                      mrd->nodeuuid);
1217                                 name_destroy(&mrd->nodeuuid);
1218                                 if (rc)
1219                                         return rc;
1220                         }
1221                 }
1222                 mrd->state = REPLACE_DONE;
1223                 return rc ? rc : 1;
1224         }
1225
1226         /* Another commands in target device block */
1227         return 0;
1228 }
1229
1230 /**
1231  * Handler that called for every record in llog.
1232  * Records are processed in order they placed in llog.
1233  *
1234  * \param[in] llh       log to be processed
1235  * \param[in] rec       current record
1236  * \param[in] data      mgs_replace_data structure
1237  *
1238  * \retval 0    success
1239  */
1240 static int mgs_replace_nids_handler(const struct lu_env *env,
1241                                     struct llog_handle *llh,
1242                                     struct llog_rec_hdr *rec,
1243                                     void *data)
1244 {
1245         struct mgs_replace_data *mrd;
1246         struct lustre_cfg *lcfg = REC_DATA(rec);
1247         int cfg_len = REC_DATA_LEN(rec);
1248         int rc;
1249         ENTRY;
1250
1251         mrd = (struct mgs_replace_data *)data;
1252
1253         if (rec->lrh_type != OBD_CFG_REC) {
1254                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
1255                        rec->lrh_type, lcfg->lcfg_command,
1256                        lustre_cfg_string(lcfg, 0),
1257                        lustre_cfg_string(lcfg, 1));
1258                 RETURN(-EINVAL);
1259         }
1260
1261         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1262         if (rc) {
1263                 /* Do not copy any invalidated records */
1264                 GOTO(skip_out, rc = 0);
1265         }
1266
1267         rc = check_markers(lcfg, mrd);
1268         if (rc || mrd->state == REPLACE_SKIP)
1269                 GOTO(skip_out, rc = 0);
1270
1271         /* Write to new log all commands outside target device block */
1272         if (mrd->state == REPLACE_COPY)
1273                 GOTO(copy_out, rc = 0);
1274
1275         if (mrd->state == REPLACE_DONE &&
1276             (lcfg->lcfg_command == LCFG_ADD_UUID ||
1277              lcfg->lcfg_command == LCFG_ADD_CONN)) {
1278                 if (!mrd->failover)
1279                         CWARN("Previous failover is deleted, but new one is "
1280                               "not set. This means you configure system "
1281                               "without failover or passed wrong replace_nids "
1282                               "command parameters. Device %s, passed nids %s\n",
1283                               mrd->target.mti_svname, mrd->target.mti_params);
1284                 GOTO(skip_out, rc = 0);
1285         }
1286
1287         rc = process_command(env, lcfg, mrd);
1288         if (rc < 0)
1289                 RETURN(rc);
1290
1291         if (rc)
1292                 RETURN(0);
1293 copy_out:
1294         /* Record is placed in temporary llog as is */
1295         rc = llog_write(env, mrd->temp_llh, rec, LLOG_NEXT_IDX);
1296
1297         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1298                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1299                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1300         RETURN(rc);
1301
1302 skip_out:
1303         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1304                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1305                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1306         RETURN(rc);
1307 }
1308
1309 static int mgs_log_is_empty(const struct lu_env *env,
1310                             struct mgs_device *mgs, char *name)
1311 {
1312         struct llog_ctxt        *ctxt;
1313         int                      rc;
1314
1315         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1316         LASSERT(ctxt != NULL);
1317
1318         rc = llog_is_empty(env, ctxt, name);
1319         llog_ctxt_put(ctxt);
1320         return rc;
1321 }
1322
1323 static int mgs_replace_log(const struct lu_env *env,
1324                            struct obd_device *mgs,
1325                            char *logname, char *devname,
1326                            llog_cb_t replace_handler, void *data)
1327 {
1328         struct llog_handle *orig_llh, *backup_llh;
1329         struct llog_ctxt *ctxt;
1330         struct mgs_replace_data *mrd;
1331         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1332         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1333         char *backup;
1334         int rc, rc2, buf_size;
1335         time64_t now;
1336         ENTRY;
1337
1338         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1339         LASSERT(ctxt != NULL);
1340
1341         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1342                 /* Log is empty. Nothing to replace */
1343                 GOTO(out_put, rc = 0);
1344         }
1345
1346         now = ktime_get_real_seconds();
1347
1348         /* max time64_t in decimal fits into 20 bytes long string */
1349         buf_size = strlen(logname) + 1 + 20 + 1 + strlen(".bak") + 1;
1350         OBD_ALLOC(backup, buf_size);
1351         if (backup == NULL)
1352                 GOTO(out_put, rc = -ENOMEM);
1353
1354         snprintf(backup, buf_size, "%s.%llu.bak", logname, now);
1355
1356         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1357         if (rc == 0) {
1358                 /* Now erase original log file. Connections are not allowed.
1359                    Backup is already saved */
1360                 rc = llog_erase(env, ctxt, NULL, logname);
1361                 if (rc < 0)
1362                         GOTO(out_free, rc);
1363         } else if (rc != -ENOENT) {
1364                 CERROR("%s: can't make backup for %s: rc = %d\n",
1365                        mgs->obd_name, logname, rc);
1366                 GOTO(out_free,rc);
1367         }
1368
1369         /* open local log */
1370         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1371         if (rc)
1372                 GOTO(out_restore, rc);
1373
1374         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1375         if (rc)
1376                 GOTO(out_closel, rc);
1377
1378         /* open backup llog */
1379         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1380                        LLOG_OPEN_EXISTS);
1381         if (rc)
1382                 GOTO(out_closel, rc);
1383
1384         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1385         if (rc)
1386                 GOTO(out_close, rc);
1387
1388         if (llog_get_size(backup_llh) <= 1)
1389                 GOTO(out_close, rc = 0);
1390
1391         OBD_ALLOC_PTR(mrd);
1392         if (!mrd)
1393                 GOTO(out_close, rc = -ENOMEM);
1394         /* devname is only needed information to replace UUID records */
1395         if (devname)
1396                 strlcpy(mrd->target.mti_svname, devname,
1397                         sizeof(mrd->target.mti_svname));
1398         /* data is parsed in llog callback */
1399         if (data)
1400                 strlcpy(mrd->target.mti_params, data,
1401                         sizeof(mrd->target.mti_params));
1402         /* Copy records to this temporary llog */
1403         mrd->temp_llh = orig_llh;
1404
1405         rc = llog_process(env, backup_llh, replace_handler,
1406                           (void *)mrd, NULL);
1407         OBD_FREE_PTR(mrd);
1408 out_close:
1409         rc2 = llog_close(NULL, backup_llh);
1410         if (!rc)
1411                 rc = rc2;
1412 out_closel:
1413         rc2 = llog_close(NULL, orig_llh);
1414         if (!rc)
1415                 rc = rc2;
1416
1417 out_restore:
1418         if (rc) {
1419                 CERROR("%s: llog should be restored: rc = %d\n",
1420                        mgs->obd_name, rc);
1421                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1422                                   logname);
1423                 if (rc2 < 0)
1424                         CERROR("%s: can't restore backup %s: rc = %d\n",
1425                                mgs->obd_name, logname, rc2);
1426         }
1427
1428 out_free:
1429         OBD_FREE(backup, buf_size);
1430
1431 out_put:
1432         llog_ctxt_put(ctxt);
1433
1434         if (rc)
1435                 CERROR("%s: failed to replace log %s: rc = %d\n",
1436                        mgs->obd_name, logname, rc);
1437
1438         RETURN(rc);
1439 }
1440
1441 static int mgs_replace_nids_log(const struct lu_env *env,
1442                                 struct obd_device *obd,
1443                                 char *logname, char *devname, char *nids)
1444 {
1445         CDEBUG(D_MGS, "Replace NIDs for %s in %s\n", devname, logname);
1446         return mgs_replace_log(env, obd, logname, devname,
1447                                mgs_replace_nids_handler, nids);
1448 }
1449
1450 /**
1451  * Parse device name and get file system name and/or device index
1452  *
1453  * @devname     device name (ex. lustre-MDT0000)
1454  * @fsname      file system name extracted from @devname and returned
1455  *              to the caller (optional)
1456  * @index       device index extracted from @devname and returned to
1457  *              the caller (optional)
1458  *
1459  * RETURN       0                       success if we are only interested in
1460  *                                      extracting fsname from devname.
1461  *                                      i.e index is NULL
1462  *
1463  *              LDD_F_SV_TYPE_*         Besides extracting the fsname the
1464  *                                      user also wants the index. Report to
1465  *                                      the user the type of obd device the
1466  *                                      returned index belongs too.
1467  *
1468  *              -EINVAL                 The obd device name is improper so
1469  *                                      fsname could not be extracted.
1470  *
1471  *              -ENXIO                  Failed to extract the index out of
1472  *                                      the obd device name. Most likely an
1473  *                                      invalid obd device name
1474  */
1475 static int mgs_parse_devname(char *devname, char *fsname, u32 *index)
1476 {
1477         int rc = 0;
1478         ENTRY;
1479
1480         /* Extract fsname */
1481         if (fsname) {
1482                 rc = server_name2fsname(devname, fsname, NULL);
1483                 if (rc < 0) {
1484                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1485                                devname);
1486                         RETURN(-EINVAL);
1487                 }
1488         }
1489
1490         if (index) {
1491                 rc = server_name2index(devname, index, NULL);
1492                 if (rc < 0) {
1493                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1494                                devname);
1495                         RETURN(-ENXIO);
1496                 }
1497         }
1498
1499         /* server_name2index can return LDD_F_SV_TYPE_* so always return rc */
1500         RETURN(rc);
1501 }
1502
1503 /* This is only called during replace_nids */
1504 static int only_mgs_is_running(struct obd_device *mgs_obd)
1505 {
1506         /* TDB: Is global variable with devices count exists? */
1507         int num_devices = get_devices_count();
1508         int num_exports = 0;
1509         struct obd_export *exp;
1510
1511         spin_lock(&mgs_obd->obd_dev_lock);
1512         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1513                 /* skip self export */
1514                 if (exp == mgs_obd->obd_self_export)
1515                         continue;
1516                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1517                         continue;
1518
1519                 ++num_exports;
1520
1521                 CERROR("%s: node %s still connected during replace_nids "
1522                        "connect_flags:%llx\n",
1523                        mgs_obd->obd_name,
1524                        libcfs_nid2str(exp->exp_nid_stats->nid),
1525                        exp_connect_flags(exp));
1526
1527         }
1528         spin_unlock(&mgs_obd->obd_dev_lock);
1529
1530         /* osd, MGS and MGC + self_export
1531            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1532         return (num_devices <= 3) && (num_exports == 0);
1533 }
1534
1535 static int name_create_mdt(char **logname, char *fsname, int i)
1536 {
1537         char mdt_index[9];
1538
1539         sprintf(mdt_index, "-MDT%04x", i);
1540         return name_create(logname, fsname, mdt_index);
1541 }
1542
1543 /**
1544  * Replace nids for \a device to \a nids values
1545  *
1546  * \param obd           MGS obd device
1547  * \param devname       nids need to be replaced for this device
1548  * (ex. lustre-OST0000)
1549  * \param nids          nids list (ex. nid1,nid2,nid3)
1550  *
1551  * \retval 0    success
1552  */
1553 int mgs_replace_nids(const struct lu_env *env,
1554                      struct mgs_device *mgs,
1555                      char *devname, char *nids)
1556 {
1557         /* Assume fsname is part of device name */
1558         char fsname[MTI_NAME_MAXLEN];
1559         int rc;
1560         __u32 index;
1561         char *logname;
1562         struct fs_db *fsdb = NULL;
1563         unsigned int i;
1564         int conn_state;
1565         struct obd_device *mgs_obd = mgs->mgs_obd;
1566         ENTRY;
1567
1568         /* We can only change NIDs if no other nodes are connected */
1569         spin_lock(&mgs_obd->obd_dev_lock);
1570         conn_state = mgs_obd->obd_no_conn;
1571         mgs_obd->obd_no_conn = 1;
1572         spin_unlock(&mgs_obd->obd_dev_lock);
1573
1574         /* We can not change nids if not only MGS is started */
1575         if (!only_mgs_is_running(mgs_obd)) {
1576                 CERROR("Only MGS is allowed to be started\n");
1577                 GOTO(out, rc = -EINPROGRESS);
1578         }
1579
1580         /* Get fsname and index */
1581         rc = mgs_parse_devname(devname, fsname, &index);
1582         if (rc < 0)
1583                 GOTO(out, rc);
1584
1585         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1586         if (rc) {
1587                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1588                 GOTO(out, rc);
1589         }
1590
1591         /* Process client llogs */
1592         name_create(&logname, fsname, "-client");
1593         rc = mgs_replace_nids_log(env, mgs_obd, logname, devname, nids);
1594         name_destroy(&logname);
1595         if (rc) {
1596                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1597                        fsname, devname, rc);
1598                 GOTO(out, rc);
1599         }
1600
1601         /* Process MDT llogs */
1602         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1603                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1604                         continue;
1605                 name_create_mdt(&logname, fsname, i);
1606                 rc = mgs_replace_nids_log(env, mgs_obd, logname, devname, nids);
1607                 name_destroy(&logname);
1608                 if (rc)
1609                         GOTO(out, rc);
1610         }
1611
1612 out:
1613         spin_lock(&mgs_obd->obd_dev_lock);
1614         mgs_obd->obd_no_conn = conn_state;
1615         spin_unlock(&mgs_obd->obd_dev_lock);
1616
1617         if (fsdb)
1618                 mgs_put_fsdb(mgs, fsdb);
1619
1620         RETURN(rc);
1621 }
1622
1623 /**
1624  * This is called for every record in llog. Some of records are
1625  * skipped, others are copied to new log as is.
1626  * Records to be skipped are
1627  *  marker records marked SKIP
1628  *  records enclosed between SKIP markers
1629  *
1630  * \param[in] llh       log to be processed
1631  * \param[in] rec       current record
1632  * \param[in] data      mgs_replace_data structure
1633  *
1634  * \retval 0    success
1635  **/
1636 static int mgs_clear_config_handler(const struct lu_env *env,
1637                                     struct llog_handle *llh,
1638                                     struct llog_rec_hdr *rec, void *data)
1639 {
1640         struct mgs_replace_data *mrd;
1641         struct lustre_cfg *lcfg = REC_DATA(rec);
1642         int cfg_len = REC_DATA_LEN(rec);
1643         int rc;
1644
1645         ENTRY;
1646
1647         mrd = (struct mgs_replace_data *)data;
1648
1649         if (rec->lrh_type != OBD_CFG_REC) {
1650                 CDEBUG(D_MGS, "Config llog Name=%s, Record Index=%u, "
1651                        "Unhandled Record Type=%#x\n", llh->lgh_name,
1652                        rec->lrh_index, rec->lrh_type);
1653                 RETURN(-EINVAL);
1654         }
1655
1656         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1657         if (rc) {
1658                 CDEBUG(D_MGS, "Config llog Name=%s, Invalid config file.",
1659                        llh->lgh_name);
1660                 RETURN(-EINVAL);
1661         }
1662
1663         if (lcfg->lcfg_command == LCFG_MARKER) {
1664                 struct cfg_marker *marker;
1665
1666                 marker = lustre_cfg_buf(lcfg, 1);
1667                 if (marker->cm_flags & CM_SKIP) {
1668                         if (marker->cm_flags & CM_START)
1669                                 mrd->state = REPLACE_SKIP;
1670                         if (marker->cm_flags & CM_END)
1671                                 mrd->state = REPLACE_COPY;
1672                         /* SKIP section started or finished */
1673                         CDEBUG(D_MGS, "Skip idx=%d, rc=%d, len=%d, "
1674                                "cmd %x %s %s\n", rec->lrh_index, rc,
1675                                rec->lrh_len, lcfg->lcfg_command,
1676                                lustre_cfg_string(lcfg, 0),
1677                                lustre_cfg_string(lcfg, 1));
1678                         RETURN(0);
1679                 }
1680         } else {
1681                 if (mrd->state == REPLACE_SKIP) {
1682                         /* record enclosed between SKIP markers, skip it */
1683                         CDEBUG(D_MGS, "Skip idx=%d, rc=%d, len=%d, "
1684                                "cmd %x %s %s\n", rec->lrh_index, rc,
1685                                rec->lrh_len, lcfg->lcfg_command,
1686                                lustre_cfg_string(lcfg, 0),
1687                                lustre_cfg_string(lcfg, 1));
1688                         RETURN(0);
1689                 }
1690         }
1691
1692         /* Record is placed in temporary llog as is */
1693         rc = llog_write(env, mrd->temp_llh, rec, LLOG_NEXT_IDX);
1694
1695         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1696                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1697                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1698         RETURN(rc);
1699 }
1700
1701 /*
1702  * Directory CONFIGS/ may contain files which are not config logs to
1703  * be cleared. Skip any llogs with a non-alphanumeric character after
1704  * the last '-'. For example, fsname-MDT0000.sav, fsname-MDT0000.bak,
1705  * fsname-MDT0000.orig, fsname-MDT0000~, fsname-MDT0000.20150516, etc.
1706  */
1707 static bool config_to_clear(const char *logname)
1708 {
1709         int i;
1710         char *str;
1711
1712         str = strrchr(logname, '-');
1713         if (!str)
1714                 return 0;
1715
1716         i = 0;
1717         while (isalnum(str[++i]));
1718         return str[i] == '\0';
1719 }
1720
1721 /**
1722  * Clear config logs for \a name
1723  *
1724  * \param env
1725  * \param mgs           MGS device
1726  * \param name          name of device or of filesystem
1727  *                      (ex. lustre-OST0000 or lustre) in later case all logs
1728  *                      will be cleared
1729  *
1730  * \retval 0            success
1731  */
1732 int mgs_clear_configs(const struct lu_env *env,
1733                      struct mgs_device *mgs, const char *name)
1734 {
1735         struct list_head dentry_list;
1736         struct mgs_direntry *dirent, *n;
1737         char *namedash;
1738         int conn_state;
1739         struct obd_device *mgs_obd = mgs->mgs_obd;
1740         int rc;
1741
1742         ENTRY;
1743
1744         /* Prevent clients and servers from connecting to mgs */
1745         spin_lock(&mgs_obd->obd_dev_lock);
1746         conn_state = mgs_obd->obd_no_conn;
1747         mgs_obd->obd_no_conn = 1;
1748         spin_unlock(&mgs_obd->obd_dev_lock);
1749
1750         /*
1751          * config logs cannot be cleaned if anything other than
1752          * MGS is started
1753          */
1754         if (!only_mgs_is_running(mgs_obd)) {
1755                 CERROR("Only MGS is allowed to be started\n");
1756                 GOTO(out, rc = -EBUSY);
1757         }
1758
1759         /* Find all the logs in the CONFIGS directory */
1760         rc = class_dentry_readdir(env, mgs, &dentry_list);
1761         if (rc) {
1762                 CERROR("%s: cannot read config directory '%s': rc = %d\n",
1763                        mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
1764                 GOTO(out, rc);
1765         }
1766
1767         if (list_empty(&dentry_list)) {
1768                 CERROR("%s: list empty reading config dir '%s': rc = %d\n",
1769                         mgs_obd->obd_name, MOUNT_CONFIGS_DIR, -ENOENT);
1770                 GOTO(out, rc = -ENOENT);
1771         }
1772
1773         OBD_ALLOC(namedash, strlen(name) + 2);
1774         if (namedash == NULL)
1775                 GOTO(out, rc = -ENOMEM);
1776         snprintf(namedash, strlen(name) + 2, "%s-", name);
1777
1778         list_for_each_entry(dirent, &dentry_list, mde_list) {
1779                 if (strcmp(name, dirent->mde_name) &&
1780                     strncmp(namedash, dirent->mde_name, strlen(namedash)))
1781                         continue;
1782                 if (!config_to_clear(dirent->mde_name))
1783                         continue;
1784                 CDEBUG(D_MGS, "%s: Clear config log %s\n",
1785                        mgs_obd->obd_name, dirent->mde_name);
1786                 rc = mgs_replace_log(env, mgs_obd, dirent->mde_name, NULL,
1787                                      mgs_clear_config_handler, NULL);
1788                 if (rc)
1789                         break;
1790         }
1791
1792         list_for_each_entry_safe(dirent, n, &dentry_list, mde_list) {
1793                 list_del_init(&dirent->mde_list);
1794                 mgs_direntry_free(dirent);
1795         }
1796         OBD_FREE(namedash, strlen(name) + 2);
1797 out:
1798         spin_lock(&mgs_obd->obd_dev_lock);
1799         mgs_obd->obd_no_conn = conn_state;
1800         spin_unlock(&mgs_obd->obd_dev_lock);
1801
1802         RETURN(rc);
1803 }
1804
1805 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1806                             char *devname, struct lov_desc *desc)
1807 {
1808         struct mgs_thread_info  *mgi = mgs_env_info(env);
1809         struct llog_cfg_rec     *lcr;
1810         int rc;
1811
1812         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1813         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1814         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1815         if (lcr == NULL)
1816                 return -ENOMEM;
1817
1818         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1819         lustre_cfg_rec_free(lcr);
1820         return rc;
1821 }
1822
1823 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1824                             char *devname, struct lmv_desc *desc)
1825 {
1826         struct mgs_thread_info  *mgi = mgs_env_info(env);
1827         struct llog_cfg_rec     *lcr;
1828         int rc;
1829
1830         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1831         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1832         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1833         if (lcr == NULL)
1834                 return -ENOMEM;
1835
1836         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1837         lustre_cfg_rec_free(lcr);
1838         return rc;
1839 }
1840
1841 static inline int record_mdc_add(const struct lu_env *env,
1842                                  struct llog_handle *llh,
1843                                  char *logname, char *mdcuuid,
1844                                  char *mdtuuid, char *index,
1845                                  char *gen)
1846 {
1847         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1848                            mdtuuid,index,gen,mdcuuid);
1849 }
1850
1851 static inline int record_lov_add(const struct lu_env *env,
1852                                  struct llog_handle *llh,
1853                                  char *lov_name, char *ost_uuid,
1854                                  char *index, char *gen)
1855 {
1856         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1857                            ost_uuid, index, gen, NULL);
1858 }
1859
1860 static inline int record_mount_opt(const struct lu_env *env,
1861                                    struct llog_handle *llh,
1862                                    char *profile, char *lov_name,
1863                                    char *mdc_name)
1864 {
1865         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1866                            profile, lov_name, mdc_name, NULL);
1867 }
1868
1869 static int record_marker(const struct lu_env *env,
1870                          struct llog_handle *llh,
1871                          struct fs_db *fsdb, __u32 flags,
1872                          char *tgtname, char *comment)
1873 {
1874         struct mgs_thread_info *mgi = mgs_env_info(env);
1875         struct llog_cfg_rec *lcr;
1876         int rc;
1877         int cplen = 0;
1878
1879         if (flags & CM_START)
1880                 fsdb->fsdb_gen++;
1881         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1882         mgi->mgi_marker.cm_flags = flags;
1883         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1884         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1885                         sizeof(mgi->mgi_marker.cm_tgtname));
1886         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1887                 return -E2BIG;
1888         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1889                         sizeof(mgi->mgi_marker.cm_comment));
1890         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1891                 return -E2BIG;
1892         mgi->mgi_marker.cm_createtime = ktime_get_real_seconds();
1893         mgi->mgi_marker.cm_canceltime = 0;
1894         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1895         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1896                             sizeof(mgi->mgi_marker));
1897         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1898         if (lcr == NULL)
1899                 return -ENOMEM;
1900
1901         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1902         lustre_cfg_rec_free(lcr);
1903         return rc;
1904 }
1905
1906 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1907                             struct llog_handle **llh, char *name)
1908 {
1909         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1910         struct llog_ctxt        *ctxt;
1911         int                      rc = 0;
1912         ENTRY;
1913
1914         if (*llh)
1915                 GOTO(out, rc = -EBUSY);
1916
1917         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1918         if (!ctxt)
1919                 GOTO(out, rc = -ENODEV);
1920         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1921
1922         rc = llog_open_create(env, ctxt, llh, NULL, name);
1923         if (rc)
1924                 GOTO(out_ctxt, rc);
1925         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1926         if (rc)
1927                 llog_close(env, *llh);
1928 out_ctxt:
1929         llog_ctxt_put(ctxt);
1930 out:
1931         if (rc) {
1932                 CERROR("%s: can't start log %s: rc = %d\n",
1933                        mgs->mgs_obd->obd_name, name, rc);
1934                 *llh = NULL;
1935         }
1936         RETURN(rc);
1937 }
1938
1939 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1940 {
1941         int rc;
1942
1943         rc = llog_close(env, *llh);
1944         *llh = NULL;
1945
1946         return rc;
1947 }
1948
1949 /******************** config "macros" *********************/
1950
1951 /* write an lcfg directly into a log (with markers) */
1952 static int mgs_write_log_direct(const struct lu_env *env,
1953                                 struct mgs_device *mgs, struct fs_db *fsdb,
1954                                 char *logname, struct llog_cfg_rec *lcr,
1955                                 char *devname, char *comment)
1956 {
1957         struct llog_handle *llh = NULL;
1958         int rc;
1959
1960         ENTRY;
1961
1962         rc = record_start_log(env, mgs, &llh, logname);
1963         if (rc)
1964                 RETURN(rc);
1965
1966         /* FIXME These should be a single journal transaction */
1967         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1968         if (rc)
1969                 GOTO(out_end, rc);
1970         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1971         if (rc)
1972                 GOTO(out_end, rc);
1973         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1974         if (rc)
1975                 GOTO(out_end, rc);
1976 out_end:
1977         record_end_log(env, &llh);
1978         RETURN(rc);
1979 }
1980
1981 /* write the lcfg in all logs for the given fs */
1982 static int mgs_write_log_direct_all(const struct lu_env *env,
1983                                     struct mgs_device *mgs,
1984                                     struct fs_db *fsdb,
1985                                     struct mgs_target_info *mti,
1986                                     struct llog_cfg_rec *lcr, char *devname,
1987                                     char *comment, int server_only)
1988 {
1989         struct list_head         log_list;
1990         struct mgs_direntry     *dirent, *n;
1991         char                    *fsname = mti->mti_fsname;
1992         int                      rc = 0, len = strlen(fsname);
1993
1994         ENTRY;
1995         /* Find all the logs in the CONFIGS directory */
1996         rc = class_dentry_readdir(env, mgs, &log_list);
1997         if (rc)
1998                 RETURN(rc);
1999
2000         /* Could use fsdb index maps instead of directory listing */
2001         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
2002                 list_del_init(&dirent->mde_list);
2003                 /* don't write to sptlrpc rule log */
2004                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
2005                         goto next;
2006
2007                 /* caller wants write server logs only */
2008                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
2009                         goto next;
2010
2011                 if (strlen(dirent->mde_name) <= len ||
2012                     strncmp(fsname, dirent->mde_name, len) != 0 ||
2013                     dirent->mde_name[len] != '-')
2014                         goto next;
2015
2016                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
2017                 /* Erase any old settings of this same parameter */
2018                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
2019                                 devname, comment, CM_SKIP);
2020                 if (rc < 0)
2021                         CERROR("%s: Can't modify llog %s: rc = %d\n",
2022                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
2023                 if (lcr == NULL)
2024                         goto next;
2025                 /* Write the new one */
2026                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
2027                                           lcr, devname, comment);
2028                 if (rc != 0)
2029                         CERROR("%s: writing log %s: rc = %d\n",
2030                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
2031 next:
2032                 mgs_direntry_free(dirent);
2033         }
2034
2035         RETURN(rc);
2036 }
2037
2038 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2039                                     struct mgs_device *mgs,
2040                                     struct fs_db *fsdb,
2041                                     struct mgs_target_info *mti,
2042                                     int index, char *logname);
2043 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2044                                     struct mgs_device *mgs,
2045                                     struct fs_db *fsdb,
2046                                     struct mgs_target_info *mti,
2047                                     char *logname, char *suffix, char *lovname,
2048                                     enum lustre_sec_part sec_part, int flags);
2049 static int name_create_mdt_and_lov(char **logname, char **lovname,
2050                                    struct fs_db *fsdb, int i);
2051
2052 static int add_param(char *params, char *key, char *val)
2053 {
2054         char *start = params + strlen(params);
2055         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
2056         int keylen = 0;
2057
2058         if (key != NULL)
2059                 keylen = strlen(key);
2060         if (start + 1 + keylen + strlen(val) >= end) {
2061                 CERROR("params are too long: %s %s%s\n",
2062                        params, key != NULL ? key : "", val);
2063                 return -EINVAL;
2064         }
2065
2066         sprintf(start, " %s%s", key != NULL ? key : "", val);
2067         return 0;
2068 }
2069
2070 /**
2071  * Walk through client config log record and convert the related records
2072  * into the target.
2073  **/
2074 static int mgs_steal_client_llog_handler(const struct lu_env *env,
2075                                          struct llog_handle *llh,
2076                                          struct llog_rec_hdr *rec, void *data)
2077 {
2078         struct mgs_device *mgs;
2079         struct obd_device *obd;
2080         struct mgs_target_info *mti, *tmti;
2081         struct fs_db *fsdb;
2082         int cfg_len = rec->lrh_len;
2083         char *cfg_buf = (char*) (rec + 1);
2084         struct lustre_cfg *lcfg;
2085         int rc = 0;
2086         struct llog_handle *mdt_llh = NULL;
2087         static int got_an_osc_or_mdc = 0;
2088         /* 0: not found any osc/mdc;
2089            1: found osc;
2090            2: found mdc;
2091         */
2092         static int last_step = -1;
2093         int cplen = 0;
2094
2095         ENTRY;
2096
2097         mti = ((struct temp_comp*)data)->comp_mti;
2098         tmti = ((struct temp_comp*)data)->comp_tmti;
2099         fsdb = ((struct temp_comp*)data)->comp_fsdb;
2100         obd = ((struct temp_comp *)data)->comp_obd;
2101         mgs = lu2mgs_dev(obd->obd_lu_dev);
2102         LASSERT(mgs);
2103
2104         if (rec->lrh_type != OBD_CFG_REC) {
2105                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
2106                 RETURN(-EINVAL);
2107         }
2108
2109         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
2110         if (rc) {
2111                 CERROR("Insane cfg\n");
2112                 RETURN(rc);
2113         }
2114
2115         lcfg = (struct lustre_cfg *)cfg_buf;
2116
2117         if (lcfg->lcfg_command == LCFG_MARKER) {
2118                 struct cfg_marker *marker;
2119                 marker = lustre_cfg_buf(lcfg, 1);
2120                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
2121                     (marker->cm_flags & CM_START) &&
2122                      !(marker->cm_flags & CM_SKIP)) {
2123                         got_an_osc_or_mdc = 1;
2124                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
2125                                         sizeof(tmti->mti_svname));
2126                         if (cplen >= sizeof(tmti->mti_svname))
2127                                 RETURN(-E2BIG);
2128                         rc = record_start_log(env, mgs, &mdt_llh,
2129                                               mti->mti_svname);
2130                         if (rc)
2131                                 RETURN(rc);
2132                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
2133                                            mti->mti_svname, "add osc(copied)");
2134                         record_end_log(env, &mdt_llh);
2135                         last_step = marker->cm_step;
2136                         RETURN(rc);
2137                 }
2138                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
2139                     (marker->cm_flags & CM_END) &&
2140                      !(marker->cm_flags & CM_SKIP)) {
2141                         LASSERT(last_step == marker->cm_step);
2142                         last_step = -1;
2143                         got_an_osc_or_mdc = 0;
2144                         memset(tmti, 0, sizeof(*tmti));
2145                         rc = record_start_log(env, mgs, &mdt_llh,
2146                                               mti->mti_svname);
2147                         if (rc)
2148                                 RETURN(rc);
2149                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
2150                                            mti->mti_svname, "add osc(copied)");
2151                         record_end_log(env, &mdt_llh);
2152                         RETURN(rc);
2153                 }
2154                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
2155                     (marker->cm_flags & CM_START) &&
2156                      !(marker->cm_flags & CM_SKIP)) {
2157                         got_an_osc_or_mdc = 2;
2158                         last_step = marker->cm_step;
2159                         memcpy(tmti->mti_svname, marker->cm_tgtname,
2160                                strlen(marker->cm_tgtname));
2161
2162                         RETURN(rc);
2163                 }
2164                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
2165                     (marker->cm_flags & CM_END) &&
2166                      !(marker->cm_flags & CM_SKIP)) {
2167                         LASSERT(last_step == marker->cm_step);
2168                         last_step = -1;
2169                         got_an_osc_or_mdc = 0;
2170                         memset(tmti, 0, sizeof(*tmti));
2171                         RETURN(rc);
2172                 }
2173         }
2174
2175         if (got_an_osc_or_mdc == 0 || last_step < 0)
2176                 RETURN(rc);
2177
2178         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
2179                 __u64 nodenid = lcfg->lcfg_nid;
2180
2181                 if (strlen(tmti->mti_uuid) == 0) {
2182                         /* target uuid not set, this config record is before
2183                          * LCFG_SETUP, this nid is one of target node nid.
2184                          */
2185                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
2186                         tmti->mti_nid_count++;
2187                 } else {
2188                         char nidstr[LNET_NIDSTR_SIZE];
2189
2190                         /* failover node nid */
2191                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
2192                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
2193                                         nidstr);
2194                 }
2195
2196                 RETURN(rc);
2197         }
2198
2199         if (lcfg->lcfg_command == LCFG_SETUP) {
2200                 char *target;
2201
2202                 target = lustre_cfg_string(lcfg, 1);
2203                 memcpy(tmti->mti_uuid, target, strlen(target));
2204                 RETURN(rc);
2205         }
2206
2207         /* ignore client side sptlrpc_conf_log */
2208         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
2209                 RETURN(rc);
2210
2211         if (lcfg->lcfg_command == LCFG_ADD_MDC &&
2212             strstr(lustre_cfg_string(lcfg, 0), "-clilmv") != NULL) {
2213                 int index;
2214
2215                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
2216                         RETURN (-EINVAL);
2217
2218                 memcpy(tmti->mti_fsname, mti->mti_fsname,
2219                        strlen(mti->mti_fsname));
2220                 tmti->mti_stripe_index = index;
2221
2222                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
2223                                               mti->mti_stripe_index,
2224                                               mti->mti_svname);
2225                 memset(tmti, 0, sizeof(*tmti));
2226                 RETURN(rc);
2227         }
2228
2229         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
2230                 int index;
2231                 char mdt_index[9];
2232                 char *logname, *lovname;
2233
2234                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2235                                              mti->mti_stripe_index);
2236                 if (rc)
2237                         RETURN(rc);
2238                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
2239
2240                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
2241                         name_destroy(&logname);
2242                         name_destroy(&lovname);
2243                         RETURN(-EINVAL);
2244                 }
2245
2246                 tmti->mti_stripe_index = index;
2247                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
2248                                          mdt_index, lovname,
2249                                          LUSTRE_SP_MDT, 0);
2250                 name_destroy(&logname);
2251                 name_destroy(&lovname);
2252                 RETURN(rc);
2253         }
2254         RETURN(rc);
2255 }
2256
2257 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
2258 /* stealed from mgs_get_fsdb_from_llog*/
2259 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
2260                                               struct mgs_device *mgs,
2261                                               char *client_name,
2262                                               struct temp_comp* comp)
2263 {
2264         struct llog_handle *loghandle;
2265         struct mgs_target_info *tmti;
2266         struct llog_ctxt *ctxt;
2267         int rc;
2268
2269         ENTRY;
2270
2271         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
2272         LASSERT(ctxt != NULL);
2273
2274         OBD_ALLOC_PTR(tmti);
2275         if (tmti == NULL)
2276                 GOTO(out_ctxt, rc = -ENOMEM);
2277
2278         comp->comp_tmti = tmti;
2279         comp->comp_obd = mgs->mgs_obd;
2280
2281         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
2282                        LLOG_OPEN_EXISTS);
2283         if (rc < 0) {
2284                 if (rc == -ENOENT)
2285                         rc = 0;
2286                 GOTO(out_pop, rc);
2287         }
2288
2289         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
2290         if (rc)
2291                 GOTO(out_close, rc);
2292
2293         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
2294                                   (void *)comp, NULL, false);
2295         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
2296 out_close:
2297         llog_close(env, loghandle);
2298 out_pop:
2299         OBD_FREE_PTR(tmti);
2300 out_ctxt:
2301         llog_ctxt_put(ctxt);
2302         RETURN(rc);
2303 }
2304
2305 /* lmv is the second thing for client logs */
2306 /* copied from mgs_write_log_lov. Please refer to that.  */
2307 static int mgs_write_log_lmv(const struct lu_env *env,
2308                              struct mgs_device *mgs,
2309                              struct fs_db *fsdb,
2310                              struct mgs_target_info *mti,
2311                              char *logname, char *lmvname)
2312 {
2313         struct llog_handle *llh = NULL;
2314         struct lmv_desc *lmvdesc;
2315         char *uuid;
2316         int rc = 0;
2317         ENTRY;
2318
2319         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
2320
2321         OBD_ALLOC_PTR(lmvdesc);
2322         if (lmvdesc == NULL)
2323                 RETURN(-ENOMEM);
2324         lmvdesc->ld_active_tgt_count = 0;
2325         lmvdesc->ld_tgt_count = 0;
2326         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
2327         uuid = (char *)lmvdesc->ld_uuid.uuid;
2328
2329         rc = record_start_log(env, mgs, &llh, logname);
2330         if (rc)
2331                 GOTO(out_free, rc);
2332         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
2333         if (rc)
2334                 GOTO(out_end, rc);
2335         rc = record_attach(env, llh, lmvname, "lmv", uuid);
2336         if (rc)
2337                 GOTO(out_end, rc);
2338         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
2339         if (rc)
2340                 GOTO(out_end, rc);
2341         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
2342         if (rc)
2343                 GOTO(out_end, rc);
2344 out_end:
2345         record_end_log(env, &llh);
2346 out_free:
2347         OBD_FREE_PTR(lmvdesc);
2348         RETURN(rc);
2349 }
2350
2351 /* lov is the first thing in the mdt and client logs */
2352 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
2353                              struct fs_db *fsdb, struct mgs_target_info *mti,
2354                              char *logname, char *lovname)
2355 {
2356         struct llog_handle *llh = NULL;
2357         struct lov_desc *lovdesc;
2358         char *uuid;
2359         int rc = 0;
2360         ENTRY;
2361
2362         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
2363
2364         /*
2365         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
2366         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
2367               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
2368         */
2369
2370         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
2371         OBD_ALLOC_PTR(lovdesc);
2372         if (lovdesc == NULL)
2373                 RETURN(-ENOMEM);
2374         lovdesc->ld_magic = LOV_DESC_MAGIC;
2375         lovdesc->ld_tgt_count = 0;
2376         /* Defaults.  Can be changed later by lcfg config_param */
2377         lovdesc->ld_default_stripe_count = 1;
2378         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
2379         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
2380         lovdesc->ld_default_stripe_offset = -1;
2381         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
2382         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
2383         /* can these be the same? */
2384         uuid = (char *)lovdesc->ld_uuid.uuid;
2385
2386         /* This should always be the first entry in a log.
2387         rc = mgs_clear_log(obd, logname); */
2388         rc = record_start_log(env, mgs, &llh, logname);
2389         if (rc)
2390                 GOTO(out_free, rc);
2391         /* FIXME these should be a single journal transaction */
2392         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
2393         if (rc)
2394                 GOTO(out_end, rc);
2395         rc = record_attach(env, llh, lovname, "lov", uuid);
2396         if (rc)
2397                 GOTO(out_end, rc);
2398         rc = record_lov_setup(env, llh, lovname, lovdesc);
2399         if (rc)
2400                 GOTO(out_end, rc);
2401         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
2402         if (rc)
2403                 GOTO(out_end, rc);
2404         EXIT;
2405 out_end:
2406         record_end_log(env, &llh);
2407 out_free:
2408         OBD_FREE_PTR(lovdesc);
2409         return rc;
2410 }
2411
2412 /* add failnids to open log */
2413 static int mgs_write_log_failnids(const struct lu_env *env,
2414                                   struct mgs_target_info *mti,
2415                                   struct llog_handle *llh,
2416                                   char *cliname)
2417 {
2418         char *failnodeuuid = NULL;
2419         char *ptr = mti->mti_params;
2420         lnet_nid_t nid;
2421         int rc = 0;
2422
2423         /*
2424         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
2425         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
2426         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
2427         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
2428         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
2429         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
2430         */
2431
2432         /*
2433          * Pull failnid info out of params string, which may contain something
2434          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
2435          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
2436          * etc.  However, convert_hostnames() should have caught those.
2437          */
2438         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
2439                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
2440                         char nidstr[LNET_NIDSTR_SIZE];
2441
2442                         if (failnodeuuid == NULL) {
2443                                 /* We don't know the failover node name,
2444                                  * so just use the first nid as the uuid */
2445                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
2446                                 rc = name_create(&failnodeuuid, nidstr, "");
2447                                 if (rc != 0)
2448                                         return rc;
2449                         }
2450                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
2451                                 "client %s\n",
2452                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
2453                                 failnodeuuid, cliname);
2454                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
2455                         /*
2456                          * If *ptr is ':', we have added all NIDs for
2457                          * failnodeuuid.
2458                          */
2459                         if (*ptr == ':') {
2460                                 rc = record_add_conn(env, llh, cliname,
2461                                                      failnodeuuid);
2462                                 name_destroy(&failnodeuuid);
2463                                 failnodeuuid = NULL;
2464                         }
2465                 }
2466                 if (failnodeuuid) {
2467                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2468                         name_destroy(&failnodeuuid);
2469                         failnodeuuid = NULL;
2470                 }
2471         }
2472
2473         return rc;
2474 }
2475
2476 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2477                                     struct mgs_device *mgs,
2478                                     struct fs_db *fsdb,
2479                                     struct mgs_target_info *mti,
2480                                     char *logname, char *lmvname)
2481 {
2482         struct llog_handle *llh = NULL;
2483         char *mdcname = NULL;
2484         char *nodeuuid = NULL;
2485         char *mdcuuid = NULL;
2486         char *lmvuuid = NULL;
2487         char index[6];
2488         char nidstr[LNET_NIDSTR_SIZE];
2489         int i, rc;
2490         ENTRY;
2491
2492         if (mgs_log_is_empty(env, mgs, logname)) {
2493                 CERROR("log is empty! Logical error\n");
2494                 RETURN(-EINVAL);
2495         }
2496
2497         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2498                mti->mti_svname, logname, lmvname);
2499
2500         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2501         rc = name_create(&nodeuuid, nidstr, "");
2502         if (rc)
2503                 RETURN(rc);
2504         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2505         if (rc)
2506                 GOTO(out_free, rc);
2507         rc = name_create(&mdcuuid, mdcname, "_UUID");
2508         if (rc)
2509                 GOTO(out_free, rc);
2510         rc = name_create(&lmvuuid, lmvname, "_UUID");
2511         if (rc)
2512                 GOTO(out_free, rc);
2513
2514         rc = record_start_log(env, mgs, &llh, logname);
2515         if (rc)
2516                 GOTO(out_free, rc);
2517         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2518                            "add mdc");
2519         if (rc)
2520                 GOTO(out_end, rc);
2521         for (i = 0; i < mti->mti_nid_count; i++) {
2522                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2523                         libcfs_nid2str_r(mti->mti_nids[i],
2524                                          nidstr, sizeof(nidstr)));
2525
2526                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2527                 if (rc)
2528                         GOTO(out_end, rc);
2529         }
2530
2531         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2532         if (rc)
2533                 GOTO(out_end, rc);
2534         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2535                           NULL, NULL);
2536         if (rc)
2537                 GOTO(out_end, rc);
2538         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2539         if (rc)
2540                 GOTO(out_end, rc);
2541         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2542         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2543                             index, "1");
2544         if (rc)
2545                 GOTO(out_end, rc);
2546         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2547                            "add mdc");
2548         if (rc)
2549                 GOTO(out_end, rc);
2550 out_end:
2551         record_end_log(env, &llh);
2552 out_free:
2553         name_destroy(&lmvuuid);
2554         name_destroy(&mdcuuid);
2555         name_destroy(&mdcname);
2556         name_destroy(&nodeuuid);
2557         RETURN(rc);
2558 }
2559
2560 static inline int name_create_lov(char **lovname, char *mdtname,
2561                                   struct fs_db *fsdb, int index)
2562 {
2563         /* COMPAT_180 */
2564         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2565                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2566         else
2567                 return name_create(lovname, mdtname, "-mdtlov");
2568 }
2569
2570 static int name_create_mdt_and_lov(char **logname, char **lovname,
2571                                    struct fs_db *fsdb, int i)
2572 {
2573         int rc;
2574
2575         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2576         if (rc)
2577                 return rc;
2578         /* COMPAT_180 */
2579         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2580                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2581         else
2582                 rc = name_create(lovname, *logname, "-mdtlov");
2583         if (rc) {
2584                 name_destroy(logname);
2585                 *logname = NULL;
2586         }
2587         return rc;
2588 }
2589
2590 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2591                                       struct fs_db *fsdb, int i)
2592 {
2593         char suffix[16];
2594
2595         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2596                 sprintf(suffix, "-osc");
2597         else
2598                 sprintf(suffix, "-osc-MDT%04x", i);
2599         return name_create(oscname, ostname, suffix);
2600 }
2601
2602 /* add new mdc to already existent MDS */
2603 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2604                                     struct mgs_device *mgs,
2605                                     struct fs_db *fsdb,
2606                                     struct mgs_target_info *mti,
2607                                     int mdt_index, char *logname)
2608 {
2609         struct llog_handle      *llh = NULL;
2610         char    *nodeuuid = NULL;
2611         char    *ospname = NULL;
2612         char    *lovuuid = NULL;
2613         char    *mdtuuid = NULL;
2614         char    *svname = NULL;
2615         char    *mdtname = NULL;
2616         char    *lovname = NULL;
2617         char    index_str[16];
2618         char    nidstr[LNET_NIDSTR_SIZE];
2619         int     i, rc;
2620
2621         ENTRY;
2622         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2623                 CERROR("log is empty! Logical error\n");
2624                 RETURN (-EINVAL);
2625         }
2626
2627         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2628                logname);
2629
2630         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2631         if (rc)
2632                 RETURN(rc);
2633
2634         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2635         rc = name_create(&nodeuuid, nidstr, "");
2636         if (rc)
2637                 GOTO(out_destory, rc);
2638
2639         rc = name_create(&svname, mdtname, "-osp");
2640         if (rc)
2641                 GOTO(out_destory, rc);
2642
2643         sprintf(index_str, "-MDT%04x", mdt_index);
2644         rc = name_create(&ospname, svname, index_str);
2645         if (rc)
2646                 GOTO(out_destory, rc);
2647
2648         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2649         if (rc)
2650                 GOTO(out_destory, rc);
2651
2652         rc = name_create(&lovuuid, lovname, "_UUID");
2653         if (rc)
2654                 GOTO(out_destory, rc);
2655
2656         rc = name_create(&mdtuuid, mdtname, "_UUID");
2657         if (rc)
2658                 GOTO(out_destory, rc);
2659
2660         rc = record_start_log(env, mgs, &llh, logname);
2661         if (rc)
2662                 GOTO(out_destory, rc);
2663
2664         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2665                            "add osp");
2666         if (rc)
2667                 GOTO(out_destory, rc);
2668
2669         for (i = 0; i < mti->mti_nid_count; i++) {
2670                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2671                         libcfs_nid2str_r(mti->mti_nids[i],
2672                                          nidstr, sizeof(nidstr)));
2673                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2674                 if (rc)
2675                         GOTO(out_end, rc);
2676         }
2677
2678         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2679         if (rc)
2680                 GOTO(out_end, rc);
2681
2682         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2683                           NULL, NULL);
2684         if (rc)
2685                 GOTO(out_end, rc);
2686
2687         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2688         if (rc)
2689                 GOTO(out_end, rc);
2690
2691         /* Add mdc(osp) to lod */
2692         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
2693         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2694                          index_str, "1", NULL);
2695         if (rc)
2696                 GOTO(out_end, rc);
2697
2698         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2699         if (rc)
2700                 GOTO(out_end, rc);
2701
2702 out_end:
2703         record_end_log(env, &llh);
2704
2705 out_destory:
2706         name_destroy(&mdtuuid);
2707         name_destroy(&lovuuid);
2708         name_destroy(&lovname);
2709         name_destroy(&ospname);
2710         name_destroy(&svname);
2711         name_destroy(&nodeuuid);
2712         name_destroy(&mdtname);
2713         RETURN(rc);
2714 }
2715
2716 static int mgs_write_log_mdt0(const struct lu_env *env,
2717                               struct mgs_device *mgs,
2718                               struct fs_db *fsdb,
2719                               struct mgs_target_info *mti)
2720 {
2721         char *log = mti->mti_svname;
2722         struct llog_handle *llh = NULL;
2723         char *uuid, *lovname;
2724         char mdt_index[6];
2725         char *ptr = mti->mti_params;
2726         int rc = 0, failout = 0;
2727         ENTRY;
2728
2729         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2730         if (uuid == NULL)
2731                 RETURN(-ENOMEM);
2732
2733         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2734                 failout = (strncmp(ptr, "failout", 7) == 0);
2735
2736         rc = name_create(&lovname, log, "-mdtlov");
2737         if (rc)
2738                 GOTO(out_free, rc);
2739         if (mgs_log_is_empty(env, mgs, log)) {
2740                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2741                 if (rc)
2742                         GOTO(out_lod, rc);
2743         }
2744
2745         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2746
2747         rc = record_start_log(env, mgs, &llh, log);
2748         if (rc)
2749                 GOTO(out_lod, rc);
2750
2751         /* add MDT itself */
2752
2753         /* FIXME this whole fn should be a single journal transaction */
2754         sprintf(uuid, "%s_UUID", log);
2755         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2756         if (rc)
2757                 GOTO(out_lod, rc);
2758         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2759         if (rc)
2760                 GOTO(out_end, rc);
2761         rc = record_mount_opt(env, llh, log, lovname, NULL);
2762         if (rc)
2763                 GOTO(out_end, rc);
2764         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2765                         failout ? "n" : "f");
2766         if (rc)
2767                 GOTO(out_end, rc);
2768         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2769         if (rc)
2770                 GOTO(out_end, rc);
2771 out_end:
2772         record_end_log(env, &llh);
2773 out_lod:
2774         name_destroy(&lovname);
2775 out_free:
2776         OBD_FREE(uuid, sizeof(struct obd_uuid));
2777         RETURN(rc);
2778 }
2779
2780 /* envelope method for all layers log */
2781 static int mgs_write_log_mdt(const struct lu_env *env,
2782                              struct mgs_device *mgs,
2783                              struct fs_db *fsdb,
2784                              struct mgs_target_info *mti)
2785 {
2786         struct mgs_thread_info *mgi = mgs_env_info(env);
2787         struct llog_handle *llh = NULL;
2788         char *cliname;
2789         int rc, i = 0;
2790         ENTRY;
2791
2792         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2793
2794         if (mti->mti_uuid[0] == '\0') {
2795                 /* Make up our own uuid */
2796                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2797                          "%s_UUID", mti->mti_svname);
2798         }
2799
2800         /* add mdt */
2801         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2802         if (rc)
2803                 RETURN(rc);
2804         /* Append the mdt info to the client log */
2805         rc = name_create(&cliname, mti->mti_fsname, "-client");
2806         if (rc)
2807                 RETURN(rc);
2808
2809         if (mgs_log_is_empty(env, mgs, cliname)) {
2810                 /* Start client log */
2811                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2812                                        fsdb->fsdb_clilov);
2813                 if (rc)
2814                         GOTO(out_free, rc);
2815                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2816                                        fsdb->fsdb_clilmv);
2817                 if (rc)
2818                         GOTO(out_free, rc);
2819         }
2820
2821         /*
2822         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2823         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2824         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2825         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2826         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2827         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2828         */
2829
2830         /* copy client info about lov/lmv */
2831         mgi->mgi_comp.comp_mti = mti;
2832         mgi->mgi_comp.comp_fsdb = fsdb;
2833
2834         rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2835                                                 &mgi->mgi_comp);
2836         if (rc)
2837                 GOTO(out_free, rc);
2838         rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2839                                       fsdb->fsdb_clilmv);
2840         if (rc)
2841                 GOTO(out_free, rc);
2842
2843         /* add mountopts */
2844         rc = record_start_log(env, mgs, &llh, cliname);
2845         if (rc)
2846                 GOTO(out_free, rc);
2847
2848         rc = record_marker(env, llh, fsdb, CM_START, cliname, "mount opts");
2849         if (rc)
2850                 GOTO(out_end, rc);
2851         rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2852                               fsdb->fsdb_clilmv);
2853         if (rc)
2854                 GOTO(out_end, rc);
2855         rc = record_marker(env, llh, fsdb, CM_END, cliname, "mount opts");
2856         if (rc)
2857                 GOTO(out_end, rc);
2858
2859         /* for_all_existing_mdt except current one */
2860         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2861                 if (i !=  mti->mti_stripe_index &&
2862                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2863                         char *logname;
2864
2865                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2866                         if (rc)
2867                                 GOTO(out_end, rc);
2868
2869                         /* NB: If the log for the MDT is empty, it means
2870                          * the MDT is only added to the index
2871                          * map, and not being process yet, i.e. this
2872                          * is an unregistered MDT, see mgs_write_log_target().
2873                          * so we should skip it. Otherwise
2874                          *
2875                          * 1. MGS get register request for MDT1 and MDT2.
2876                          *
2877                          * 2. Then both MDT1 and MDT2 are added into
2878                          * fsdb_mdt_index_map. (see mgs_set_index()).
2879                          *
2880                          * 3. Then MDT1 get the lock of fsdb_mutex, then
2881                          * generate the config log, here, it will regard MDT2
2882                          * as an existent MDT, and generate "add osp" for
2883                          * lustre-MDT0001-osp-MDT0002. Note: at the moment
2884                          * MDT0002 config log is still empty, so it will
2885                          * add "add osp" even before "lov setup", which
2886                          * will definitly cause trouble.
2887                          *
2888                          * 4. MDT1 registeration finished, fsdb_mutex is
2889                          * released, then MDT2 get in, then in above
2890                          * mgs_steal_llog_for_mdt_from_client(), it will
2891                          * add another osp log for lustre-MDT0001-osp-MDT0002,
2892                          * which will cause another trouble.*/
2893                         if (!mgs_log_is_empty(env, mgs, logname))
2894                                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb,
2895                                                               mti, i, logname);
2896
2897                         name_destroy(&logname);
2898                         if (rc)
2899                                 GOTO(out_end, rc);
2900                 }
2901         }
2902 out_end:
2903         record_end_log(env, &llh);
2904 out_free:
2905         name_destroy(&cliname);
2906         RETURN(rc);
2907 }
2908
2909 /* Add the ost info to the client/mdt lov */
2910 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2911                                     struct mgs_device *mgs, struct fs_db *fsdb,
2912                                     struct mgs_target_info *mti,
2913                                     char *logname, char *suffix, char *lovname,
2914                                     enum lustre_sec_part sec_part, int flags)
2915 {
2916         struct llog_handle *llh = NULL;
2917         char *nodeuuid = NULL;
2918         char *oscname = NULL;
2919         char *oscuuid = NULL;
2920         char *lovuuid = NULL;
2921         char *svname = NULL;
2922         char index[6];
2923         char nidstr[LNET_NIDSTR_SIZE];
2924         int i, rc;
2925         ENTRY;
2926
2927         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2928                 mti->mti_svname, logname);
2929
2930         if (mgs_log_is_empty(env, mgs, logname)) {
2931                 CERROR("log is empty! Logical error\n");
2932                 RETURN(-EINVAL);
2933         }
2934
2935         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2936         rc = name_create(&nodeuuid, nidstr, "");
2937         if (rc)
2938                 RETURN(rc);
2939         rc = name_create(&svname, mti->mti_svname, "-osc");
2940         if (rc)
2941                 GOTO(out_free, rc);
2942
2943         /* for the system upgraded from old 1.8, keep using the old osc naming
2944          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2945         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2946                 rc = name_create(&oscname, svname, "");
2947         else
2948                 rc = name_create(&oscname, svname, suffix);
2949         if (rc)
2950                 GOTO(out_free, rc);
2951
2952         rc = name_create(&oscuuid, oscname, "_UUID");
2953         if (rc)
2954                 GOTO(out_free, rc);
2955         rc = name_create(&lovuuid, lovname, "_UUID");
2956         if (rc)
2957                 GOTO(out_free, rc);
2958
2959
2960         /*
2961         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2962         multihomed (#4)
2963         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2964         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2965         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2966         failover (#6,7)
2967         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2968         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2969         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2970         */
2971
2972         rc = record_start_log(env, mgs, &llh, logname);
2973         if (rc)
2974                 GOTO(out_free, rc);
2975
2976         /* FIXME these should be a single journal transaction */
2977         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2978                            "add osc");
2979         if (rc)
2980                 GOTO(out_end, rc);
2981
2982         /* NB: don't change record order, because upon MDT steal OSC config
2983          * from client, it treats all nids before LCFG_SETUP as target nids
2984          * (multiple interfaces), while nids after as failover node nids.
2985          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2986          */
2987         for (i = 0; i < mti->mti_nid_count; i++) {
2988                 CDEBUG(D_MGS, "add nid %s\n",
2989                         libcfs_nid2str_r(mti->mti_nids[i],
2990                                          nidstr, sizeof(nidstr)));
2991                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2992                 if (rc)
2993                         GOTO(out_end, rc);
2994         }
2995         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2996         if (rc)
2997                 GOTO(out_end, rc);
2998         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
2999                           NULL, NULL);
3000         if (rc)
3001                 GOTO(out_end, rc);
3002         rc = mgs_write_log_failnids(env, mti, llh, oscname);
3003         if (rc)
3004                 GOTO(out_end, rc);
3005
3006         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
3007
3008         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
3009         if (rc)
3010                 GOTO(out_end, rc);
3011         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
3012                            "add osc");
3013         if (rc)
3014                 GOTO(out_end, rc);
3015 out_end:
3016         record_end_log(env, &llh);
3017 out_free:
3018         name_destroy(&lovuuid);
3019         name_destroy(&oscuuid);
3020         name_destroy(&oscname);
3021         name_destroy(&svname);
3022         name_destroy(&nodeuuid);
3023         RETURN(rc);
3024 }
3025
3026 static int mgs_write_log_ost(const struct lu_env *env,
3027                              struct mgs_device *mgs, struct fs_db *fsdb,
3028                              struct mgs_target_info *mti)
3029 {
3030         struct llog_handle *llh = NULL;
3031         char *logname, *lovname;
3032         char *ptr = mti->mti_params;
3033         int rc, flags = 0, failout = 0, i;
3034         ENTRY;
3035
3036         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
3037
3038         /* The ost startup log */
3039
3040         /* If the ost log already exists, that means that someone reformatted
3041            the ost and it called target_add again. */
3042         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
3043                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
3044                                    "exists, yet the server claims it never "
3045                                    "registered. It may have been reformatted, "
3046                                    "or the index changed. writeconf the MDT to "
3047                                    "regenerate all logs.\n", mti->mti_svname);
3048                 RETURN(-EALREADY);
3049         }
3050
3051         /*
3052         attach obdfilter ost1 ost1_UUID
3053         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
3054         */
3055         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
3056                 failout = (strncmp(ptr, "failout", 7) == 0);
3057         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
3058         if (rc)
3059                 RETURN(rc);
3060         /* FIXME these should be a single journal transaction */
3061         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
3062         if (rc)
3063                 GOTO(out_end, rc);
3064         if (*mti->mti_uuid == '\0')
3065                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
3066                          "%s_UUID", mti->mti_svname);
3067         rc = record_attach(env, llh, mti->mti_svname,
3068                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
3069         if (rc)
3070                 GOTO(out_end, rc);
3071         rc = record_setup(env, llh, mti->mti_svname,
3072                           "dev"/*ignored*/, "type"/*ignored*/,
3073                           failout ? "n" : "f", NULL/*options*/);
3074         if (rc)
3075                 GOTO(out_end, rc);
3076         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
3077         if (rc)
3078                 GOTO(out_end, rc);
3079 out_end:
3080         record_end_log(env, &llh);
3081         if (rc)
3082                 RETURN(rc);
3083         /* We also have to update the other logs where this osc is part of
3084            the lov */
3085
3086         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3087                 /* If we're upgrading, the old mdt log already has our
3088                    entry. Let's do a fake one for fun. */
3089                 /* Note that we can't add any new failnids, since we don't
3090                    know the old osc names. */
3091                 flags = CM_SKIP | CM_UPGRADE146;
3092
3093         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
3094                 /* If the update flag isn't set, don't update client/mdt
3095                    logs. */
3096                 flags |= CM_SKIP;
3097                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
3098                               "the MDT first to regenerate it.\n",
3099                               mti->mti_svname);
3100         }
3101
3102         /* Add ost to all MDT lov defs */
3103         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3104                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
3105                         char mdt_index[13];
3106
3107                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
3108                                                      i);
3109                         if (rc)
3110                                 RETURN(rc);
3111
3112                         snprintf(mdt_index, sizeof(mdt_index), "-MDT%04x", i);
3113                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
3114                                                       logname, mdt_index,
3115                                                       lovname, LUSTRE_SP_MDT,
3116                                                       flags);
3117                         name_destroy(&logname);
3118                         name_destroy(&lovname);
3119                         if (rc)
3120                                 RETURN(rc);
3121                 }
3122         }
3123
3124         /* Append ost info to the client log */
3125         rc = name_create(&logname, mti->mti_fsname, "-client");
3126         if (rc)
3127                 RETURN(rc);
3128         if (mgs_log_is_empty(env, mgs, logname)) {
3129                 /* Start client log */
3130                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
3131                                        fsdb->fsdb_clilov);
3132                 if (rc)
3133                         GOTO(out_free, rc);
3134                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
3135                                        fsdb->fsdb_clilmv);
3136                 if (rc)
3137                         GOTO(out_free, rc);
3138         }
3139         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
3140                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
3141 out_free:
3142         name_destroy(&logname);
3143         RETURN(rc);
3144 }
3145
3146 static __inline__ int mgs_param_empty(char *ptr)
3147 {
3148         char *tmp;
3149
3150         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
3151                 return 1;
3152         return 0;
3153 }
3154
3155 static int mgs_write_log_failnid_internal(const struct lu_env *env,
3156                                           struct mgs_device *mgs,
3157                                           struct fs_db *fsdb,
3158                                           struct mgs_target_info *mti,
3159                                           char *logname, char *cliname)
3160 {
3161         int rc;
3162         struct llog_handle *llh = NULL;
3163
3164         if (mgs_param_empty(mti->mti_params)) {
3165                 /* Remove _all_ failnids */
3166                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3167                                 mti->mti_svname, "add failnid", CM_SKIP);
3168                 return rc < 0 ? rc : 0;
3169         }
3170
3171         /* Otherwise failover nids are additive */
3172         rc = record_start_log(env, mgs, &llh, logname);
3173         if (rc)
3174                 return rc;
3175                 /* FIXME this should be a single journal transaction */
3176         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
3177                            "add failnid");
3178         if (rc)
3179                 goto out_end;
3180         rc = mgs_write_log_failnids(env, mti, llh, cliname);
3181         if (rc)
3182                 goto out_end;
3183         rc = record_marker(env, llh, fsdb, CM_END,
3184                            mti->mti_svname, "add failnid");
3185 out_end:
3186         record_end_log(env, &llh);
3187         return rc;
3188 }
3189
3190
3191 /* Add additional failnids to an existing log.
3192    The mdc/osc must have been added to logs first */
3193 /* tcp nids must be in dotted-quad ascii -
3194    we can't resolve hostnames from the kernel. */
3195 static int mgs_write_log_add_failnid(const struct lu_env *env,
3196                                      struct mgs_device *mgs,
3197                                      struct fs_db *fsdb,
3198                                      struct mgs_target_info *mti)
3199 {
3200         char *logname, *cliname;
3201         int rc;
3202         ENTRY;
3203
3204         /* FIXME we currently can't erase the failnids
3205          * given when a target first registers, since they aren't part of
3206          * an "add uuid" stanza
3207          */
3208
3209         /* Verify that we know about this target */
3210         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
3211                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
3212                                    "yet. It must be started before failnids "
3213                                    "can be added.\n", mti->mti_svname);
3214                 RETURN(-ENOENT);
3215         }
3216
3217         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
3218         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3219                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
3220         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3221                 rc = name_create(&cliname, mti->mti_svname, "-osc");
3222         } else {
3223                 RETURN(-EINVAL);
3224         }
3225         if (rc)
3226                 RETURN(rc);
3227
3228         /* Add failover nids to the client log */
3229         rc = name_create(&logname, mti->mti_fsname, "-client");
3230         if (rc) {
3231                 name_destroy(&cliname);
3232                 RETURN(rc);
3233         }
3234
3235         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
3236         name_destroy(&logname);
3237         name_destroy(&cliname);
3238         if (rc)
3239                 RETURN(rc);
3240
3241         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3242                 /* Add OST failover nids to the MDT logs as well */
3243                 int i;
3244
3245                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3246                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3247                                 continue;
3248                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3249                         if (rc)
3250                                 RETURN(rc);
3251                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
3252                                                  fsdb, i);
3253                         if (rc) {
3254                                 name_destroy(&logname);
3255                                 RETURN(rc);
3256                         }
3257                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
3258                                                             mti, logname,
3259                                                             cliname);
3260                         name_destroy(&cliname);
3261                         name_destroy(&logname);
3262                         if (rc)
3263                                 RETURN(rc);
3264                 }
3265         }
3266
3267         RETURN(rc);
3268 }
3269
3270 static int mgs_wlp_lcfg(const struct lu_env *env,
3271                         struct mgs_device *mgs, struct fs_db *fsdb,
3272                         struct mgs_target_info *mti,
3273                         char *logname, struct lustre_cfg_bufs *bufs,
3274                         char *tgtname, char *ptr)
3275 {
3276         char comment[MTI_NAME_MAXLEN];
3277         char *tmp;
3278         struct llog_cfg_rec *lcr;
3279         int rc, del;
3280
3281         /* Erase any old settings of this same parameter */
3282         memcpy(comment, ptr, MTI_NAME_MAXLEN);
3283         comment[MTI_NAME_MAXLEN - 1] = 0;
3284         /* But don't try to match the value. */
3285         tmp = strchr(comment, '=');
3286         if (tmp != NULL)
3287                 *tmp = 0;
3288         /* FIXME we should skip settings that are the same as old values */
3289         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
3290         if (rc < 0)
3291                 return rc;
3292         del = mgs_param_empty(ptr);
3293
3294         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
3295                       "Setting" : "Modifying", tgtname, comment, logname);
3296         if (del) {
3297                 /* mgs_modify() will return 1 if nothing had to be done */
3298                 if (rc == 1)
3299                         rc = 0;
3300                 return rc;
3301         }
3302
3303         lustre_cfg_bufs_reset(bufs, tgtname);
3304         lustre_cfg_bufs_set_string(bufs, 1, ptr);
3305         if (mti->mti_flags & LDD_F_PARAM2)
3306                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
3307
3308         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
3309                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
3310         if (lcr == NULL)
3311                 return -ENOMEM;
3312
3313         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
3314                                   comment);
3315         lustre_cfg_rec_free(lcr);
3316         return rc;
3317 }
3318
3319 /* write global variable settings into log */
3320 static int mgs_write_log_sys(const struct lu_env *env,
3321                              struct mgs_device *mgs, struct fs_db *fsdb,
3322                              struct mgs_target_info *mti, char *sys, char *ptr)
3323 {
3324         struct mgs_thread_info  *mgi = mgs_env_info(env);
3325         struct lustre_cfg       *lcfg;
3326         struct llog_cfg_rec     *lcr;
3327         char *tmp, sep;
3328         int rc, cmd, convert = 1;
3329
3330         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
3331                 cmd = LCFG_SET_TIMEOUT;
3332         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
3333                 cmd = LCFG_SET_LDLM_TIMEOUT;
3334         /* Check for known params here so we can return error to lctl */
3335         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
3336                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
3337                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
3338                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
3339                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
3340                 cmd = LCFG_PARAM;
3341         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
3342                 convert = 0; /* Don't convert string value to integer */
3343                 cmd = LCFG_PARAM;
3344         } else {
3345                 return -EINVAL;
3346         }
3347
3348         if (mgs_param_empty(ptr))
3349                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
3350         else
3351                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
3352
3353         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
3354         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
3355         if (!convert && *tmp != '\0')
3356                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
3357         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3358         if (lcr == NULL)
3359                 return -ENOMEM;
3360
3361         lcfg = &lcr->lcr_cfg;
3362         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
3363         /* truncate the comment to the parameter name */
3364         ptr = tmp - 1;
3365         sep = *ptr;
3366         *ptr = '\0';
3367         /* modify all servers and clients */
3368         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3369                                       *tmp == '\0' ? NULL : lcr,
3370                                       mti->mti_fsname, sys, 0);
3371         if (rc == 0 && *tmp != '\0') {
3372                 switch (cmd) {
3373                 case LCFG_SET_TIMEOUT:
3374                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
3375                                 class_process_config(lcfg);
3376                         break;
3377                 case LCFG_SET_LDLM_TIMEOUT:
3378                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
3379                                 class_process_config(lcfg);
3380                         break;
3381                 default:
3382                         break;
3383                 }
3384         }
3385         *ptr = sep;
3386         lustre_cfg_rec_free(lcr);
3387         return rc;
3388 }
3389
3390 /* write quota settings into log */
3391 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
3392                                struct fs_db *fsdb, struct mgs_target_info *mti,
3393                                char *quota, char *ptr)
3394 {
3395         struct mgs_thread_info  *mgi = mgs_env_info(env);
3396         struct llog_cfg_rec     *lcr;
3397         char                    *tmp;
3398         char                     sep;
3399         int                      rc, cmd = LCFG_PARAM;
3400
3401         /* support only 'meta' and 'data' pools so far */
3402         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
3403             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
3404                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
3405                        "& quota.ost are)\n", ptr);
3406                 return -EINVAL;
3407         }
3408
3409         if (*tmp == '\0') {
3410                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
3411         } else {
3412                 CDEBUG(D_MGS, "global '%s'\n", quota);
3413
3414                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
3415                     strchr(tmp, 'p') == NULL &&
3416                     strcmp(tmp, "none") != 0) {
3417                         CERROR("enable option(%s) isn't supported\n", tmp);
3418                         return -EINVAL;
3419                 }
3420         }
3421
3422         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
3423         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
3424         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3425         if (lcr == NULL)
3426                 return -ENOMEM;
3427
3428         /* truncate the comment to the parameter name */
3429         ptr = tmp - 1;
3430         sep = *ptr;
3431         *ptr = '\0';
3432
3433         /* XXX we duplicated quota enable information in all server
3434          *     config logs, it should be moved to a separate config
3435          *     log once we cleanup the config log for global param. */
3436         /* modify all servers */
3437         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3438                                       *tmp == '\0' ? NULL : lcr,
3439                                       mti->mti_fsname, quota, 1);
3440         *ptr = sep;
3441         lustre_cfg_rec_free(lcr);
3442         return rc < 0 ? rc : 0;
3443 }
3444
3445 static int mgs_srpc_set_param_disk(const struct lu_env *env,
3446                                    struct mgs_device *mgs,
3447                                    struct fs_db *fsdb,
3448                                    struct mgs_target_info *mti,
3449                                    char *param)
3450 {
3451         struct mgs_thread_info  *mgi = mgs_env_info(env);
3452         struct llog_cfg_rec     *lcr;
3453         struct llog_handle      *llh = NULL;
3454         char                    *logname;
3455         char                    *comment, *ptr;
3456         int                      rc, len;
3457
3458         ENTRY;
3459
3460         /* get comment */
3461         ptr = strchr(param, '=');
3462         LASSERT(ptr != NULL);
3463         len = ptr - param;
3464
3465         OBD_ALLOC(comment, len + 1);
3466         if (comment == NULL)
3467                 RETURN(-ENOMEM);
3468         strncpy(comment, param, len);
3469         comment[len] = '\0';
3470
3471         /* prepare lcfg */
3472         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
3473         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
3474         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
3475         if (lcr == NULL)
3476                 GOTO(out_comment, rc = -ENOMEM);
3477
3478         /* construct log name */
3479         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
3480         if (rc < 0)
3481                 GOTO(out_lcfg, rc);
3482
3483         if (mgs_log_is_empty(env, mgs, logname)) {
3484                 rc = record_start_log(env, mgs, &llh, logname);
3485                 if (rc < 0)
3486                         GOTO(out, rc);
3487                 record_end_log(env, &llh);
3488         }
3489
3490         /* obsolete old one */
3491         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
3492                         comment, CM_SKIP);
3493         if (rc < 0)
3494                 GOTO(out, rc);
3495         /* write the new one */
3496         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
3497                                   mti->mti_svname, comment);
3498         if (rc)
3499                 CERROR("%s: error writing log %s: rc = %d\n",
3500                        mgs->mgs_obd->obd_name, logname, rc);
3501 out:
3502         name_destroy(&logname);
3503 out_lcfg:
3504         lustre_cfg_rec_free(lcr);
3505 out_comment:
3506         OBD_FREE(comment, len + 1);
3507         RETURN(rc);
3508 }
3509
3510 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
3511                                         char *param)
3512 {
3513         char    *ptr;
3514
3515         /* disable the adjustable udesc parameter for now, i.e. use default
3516          * setting that client always ship udesc to MDT if possible. to enable
3517          * it simply remove the following line */
3518         goto error_out;
3519
3520         ptr = strchr(param, '=');
3521         if (ptr == NULL)
3522                 goto error_out;
3523         *ptr++ = '\0';
3524
3525         if (strcmp(param, PARAM_SRPC_UDESC))
3526                 goto error_out;
3527
3528         if (strcmp(ptr, "yes") == 0) {
3529                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3530                 CWARN("Enable user descriptor shipping from client to MDT\n");
3531         } else if (strcmp(ptr, "no") == 0) {
3532                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3533                 CWARN("Disable user descriptor shipping from client to MDT\n");
3534         } else {
3535                 *(ptr - 1) = '=';
3536                 goto error_out;
3537         }
3538         return 0;
3539
3540 error_out:
3541         CERROR("Invalid param: %s\n", param);
3542         return -EINVAL;
3543 }
3544
3545 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3546                                   const char *svname,
3547                                   char *param)
3548 {
3549         struct sptlrpc_rule      rule;
3550         struct sptlrpc_rule_set *rset;
3551         int                      rc;
3552         ENTRY;
3553
3554         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3555                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3556                 RETURN(-EINVAL);
3557         }
3558
3559         if (strncmp(param, PARAM_SRPC_UDESC,
3560                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3561                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3562         }
3563
3564         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3565                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3566                 RETURN(-EINVAL);
3567         }
3568
3569         param += sizeof(PARAM_SRPC_FLVR) - 1;
3570
3571         rc = sptlrpc_parse_rule(param, &rule);
3572         if (rc)
3573                 RETURN(rc);
3574
3575         /* mgs rules implies must be mgc->mgs */
3576         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3577                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3578                      rule.sr_from != LUSTRE_SP_ANY) ||
3579                     (rule.sr_to != LUSTRE_SP_MGS &&
3580                      rule.sr_to != LUSTRE_SP_ANY))
3581                         RETURN(-EINVAL);
3582         }
3583
3584         /* preapre room for this coming rule. svcname format should be:
3585          * - fsname: general rule
3586          * - fsname-tgtname: target-specific rule
3587          */
3588         if (strchr(svname, '-')) {
3589                 struct mgs_tgt_srpc_conf *tgtconf;
3590                 int                       found = 0;
3591
3592                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3593                      tgtconf = tgtconf->mtsc_next) {
3594                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3595                                 found = 1;
3596                                 break;
3597                         }
3598                 }
3599
3600                 if (!found) {
3601                         int name_len;
3602
3603                         OBD_ALLOC_PTR(tgtconf);
3604                         if (tgtconf == NULL)
3605                                 RETURN(-ENOMEM);
3606
3607                         name_len = strlen(svname);
3608
3609                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3610                         if (tgtconf->mtsc_tgt == NULL) {
3611                                 OBD_FREE_PTR(tgtconf);
3612                                 RETURN(-ENOMEM);
3613                         }
3614                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3615
3616                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3617                         fsdb->fsdb_srpc_tgt = tgtconf;
3618                 }
3619
3620                 rset = &tgtconf->mtsc_rset;
3621         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
3622                 /* put _mgs related srpc rule directly in mgs ruleset */
3623                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
3624         } else {
3625                 rset = &fsdb->fsdb_srpc_gen;
3626         }
3627
3628         rc = sptlrpc_rule_set_merge(rset, &rule);
3629
3630         RETURN(rc);
3631 }
3632
3633 static int mgs_srpc_set_param(const struct lu_env *env,
3634                               struct mgs_device *mgs,
3635                               struct fs_db *fsdb,
3636                               struct mgs_target_info *mti,
3637                               char *param)
3638 {
3639         char                   *copy;
3640         int                     rc, copy_size;
3641         ENTRY;
3642
3643 #ifndef HAVE_GSS
3644         RETURN(-EINVAL);
3645 #endif
3646         /* keep a copy of original param, which could be destroied
3647          * during parsing */
3648         copy_size = strlen(param) + 1;
3649         OBD_ALLOC(copy, copy_size);
3650         if (copy == NULL)
3651                 return -ENOMEM;
3652         memcpy(copy, param, copy_size);
3653
3654         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3655         if (rc)
3656                 goto out_free;
3657
3658         /* previous steps guaranteed the syntax is correct */
3659         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3660         if (rc)
3661                 goto out_free;
3662
3663         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3664                 /*
3665                  * for mgs rules, make them effective immediately.
3666                  */
3667                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3668                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3669                                                  &fsdb->fsdb_srpc_gen);
3670         }
3671
3672 out_free:
3673         OBD_FREE(copy, copy_size);
3674         RETURN(rc);
3675 }
3676
3677 struct mgs_srpc_read_data {
3678         struct fs_db   *msrd_fsdb;
3679         int             msrd_skip;
3680 };
3681
3682 static int mgs_srpc_read_handler(const struct lu_env *env,
3683                                  struct llog_handle *llh,
3684                                  struct llog_rec_hdr *rec, void *data)
3685 {
3686         struct mgs_srpc_read_data *msrd = data;
3687         struct cfg_marker         *marker;
3688         struct lustre_cfg         *lcfg = REC_DATA(rec);
3689         char                      *svname, *param;
3690         int                        cfg_len, rc;
3691         ENTRY;
3692
3693         if (rec->lrh_type != OBD_CFG_REC) {
3694                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3695                 RETURN(-EINVAL);
3696         }
3697
3698         cfg_len = REC_DATA_LEN(rec);
3699
3700         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3701         if (rc) {
3702                 CERROR("Insane cfg\n");
3703                 RETURN(rc);
3704         }
3705
3706         if (lcfg->lcfg_command == LCFG_MARKER) {
3707                 marker = lustre_cfg_buf(lcfg, 1);
3708
3709                 if (marker->cm_flags & CM_START &&
3710                     marker->cm_flags & CM_SKIP)
3711                         msrd->msrd_skip = 1;
3712                 if (marker->cm_flags & CM_END)
3713                         msrd->msrd_skip = 0;
3714
3715                 RETURN(0);
3716         }
3717
3718         if (msrd->msrd_skip)
3719                 RETURN(0);
3720
3721         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3722                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3723                 RETURN(0);
3724         }
3725
3726         svname = lustre_cfg_string(lcfg, 0);
3727         if (svname == NULL) {
3728                 CERROR("svname is empty\n");
3729                 RETURN(0);
3730         }
3731
3732         param = lustre_cfg_string(lcfg, 1);
3733         if (param == NULL) {
3734                 CERROR("param is empty\n");
3735                 RETURN(0);
3736         }
3737
3738         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3739         if (rc)
3740                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3741
3742         RETURN(0);
3743 }
3744
3745 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3746                                 struct mgs_device *mgs,
3747                                 struct fs_db *fsdb)
3748 {
3749         struct llog_handle        *llh = NULL;
3750         struct llog_ctxt          *ctxt;
3751         char                      *logname;
3752         struct mgs_srpc_read_data  msrd;
3753         int                        rc;
3754         ENTRY;
3755
3756         /* construct log name */
3757         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3758         if (rc)
3759                 RETURN(rc);
3760
3761         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3762         LASSERT(ctxt != NULL);
3763
3764         if (mgs_log_is_empty(env, mgs, logname))
3765                 GOTO(out, rc = 0);
3766
3767         rc = llog_open(env, ctxt, &llh, NULL, logname,
3768                        LLOG_OPEN_EXISTS);
3769         if (rc < 0) {
3770                 if (rc == -ENOENT)
3771                         rc = 0;
3772                 GOTO(out, rc);
3773         }
3774
3775         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3776         if (rc)
3777                 GOTO(out_close, rc);
3778
3779         if (llog_get_size(llh) <= 1)
3780                 GOTO(out_close, rc = 0);
3781
3782         msrd.msrd_fsdb = fsdb;
3783         msrd.msrd_skip = 0;
3784
3785         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3786                           NULL);
3787
3788 out_close:
3789         llog_close(env, llh);
3790 out:
3791         llog_ctxt_put(ctxt);
3792         name_destroy(&logname);
3793
3794         if (rc)
3795                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3796         RETURN(rc);
3797 }
3798
3799 static int mgs_write_log_param2(const struct lu_env *env,
3800                                 struct mgs_device *mgs,
3801                                 struct fs_db *fsdb,
3802                                 struct mgs_target_info *mti, char *ptr)
3803 {
3804         struct lustre_cfg_bufs bufs;
3805         int rc;
3806
3807         ENTRY;
3808         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3809
3810         /* PARAM_MGSNODE and PARAM_NETWORK are set only when formating
3811          * or during the inital mount. It can never change after that.
3812          */
3813         if (!class_match_param(ptr, PARAM_MGSNODE, NULL) ||
3814             !class_match_param(ptr, PARAM_NETWORK, NULL)) {
3815                 rc = 0;
3816                 goto end;
3817         }
3818
3819         /* Processed in mgs_write_log_ost. Another value that can't
3820          * be changed by lctl set_param -P.
3821          */
3822         if (!class_match_param(ptr, PARAM_FAILMODE, NULL)) {
3823                 LCONSOLE_ERROR_MSG(0x169,
3824                                    "%s can only be changed with tunefs.lustre and --writeconf\n",
3825                                    ptr);
3826                 rc = -EPERM;
3827                 goto end;
3828         }
3829
3830         /* FIXME !!! Support for sptlrpc is incomplete. Currently the change
3831          * doesn't transmit to the client. See LU-7183.
3832          */
3833         if (!class_match_param(ptr, PARAM_SRPC, NULL)) {
3834                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3835                 goto end;
3836         }
3837
3838         /* Can't use class_match_param since ptr doesn't start with
3839          * PARAM_FAILNODE. So we look for PARAM_FAILNODE contained in ptr.
3840          */
3841         if (strstr(ptr, PARAM_FAILNODE)) {
3842                 /* Add a failover nidlist. We already processed failovers
3843                  * params for new targets in mgs_write_log_target.
3844                  */
3845                 const char *param;
3846
3847                 /* can't use wildcards with failover.node */
3848                 if (strchr(ptr, '*')) {
3849                         rc = -ENODEV;
3850                         goto end;
3851                 }
3852
3853                 param = strstr(ptr, PARAM_FAILNODE);
3854                 if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
3855                     sizeof(mti->mti_params)) {
3856                         rc = -E2BIG;
3857                         goto end;
3858                 }
3859
3860                 CDEBUG(D_MGS, "Adding failnode with param %s\n",
3861                        mti->mti_params);
3862                 rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3863                 goto end;
3864         }
3865
3866         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
3867                           mti->mti_svname, ptr);
3868 end:
3869         RETURN(rc);
3870 }
3871
3872 /* Permanent settings of all parameters by writing into the appropriate
3873  * configuration logs.
3874  * A parameter with null value ("<param>='\0'") means to erase it out of
3875  * the logs.
3876  */
3877 static int mgs_write_log_param(const struct lu_env *env,
3878                                struct mgs_device *mgs, struct fs_db *fsdb,
3879                                struct mgs_target_info *mti, char *ptr)
3880 {
3881         struct mgs_thread_info *mgi = mgs_env_info(env);
3882         char *logname;
3883         char *tmp;
3884         int rc = 0;
3885         ENTRY;
3886
3887         /* For various parameter settings, we have to figure out which logs
3888            care about them (e.g. both mdt and client for lov settings) */
3889         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3890
3891         /* The params are stored in MOUNT_DATA_FILE and modified via
3892            tunefs.lustre, or set using lctl conf_param */
3893
3894         /* Processed in lustre_start_mgc */
3895         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3896                 GOTO(end, rc);
3897
3898         /* Processed in ost/mdt */
3899         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3900                 GOTO(end, rc);
3901
3902         /* Processed in mgs_write_log_ost */
3903         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3904                 if (mti->mti_flags & LDD_F_PARAM) {
3905                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3906                                            "changed with tunefs.lustre"
3907                                            "and --writeconf\n", ptr);
3908                         rc = -EPERM;
3909                 }
3910                 GOTO(end, rc);
3911         }
3912
3913         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3914                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3915                 GOTO(end, rc);
3916         }
3917
3918         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3919                 /* Add a failover nidlist */
3920                 rc = 0;
3921                 /* We already processed failovers params for new
3922                    targets in mgs_write_log_target */
3923                 if (mti->mti_flags & LDD_F_PARAM) {
3924                         CDEBUG(D_MGS, "Adding failnode\n");
3925                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3926                 }
3927                 GOTO(end, rc);
3928         }
3929
3930         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3931                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3932                 GOTO(end, rc);
3933         }
3934
3935         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3936                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3937                 GOTO(end, rc);
3938         }
3939
3940         if (class_match_param(ptr, PARAM_OSC PARAM_ACTIVE, &tmp) == 0 ||
3941             class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0) {
3942                 /* active=0 means off, anything else means on */
3943                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3944                 bool deactive_osc = memcmp(ptr, PARAM_OSC PARAM_ACTIVE,
3945                                           strlen(PARAM_OSC PARAM_ACTIVE)) == 0;
3946                 int i;
3947
3948                 if (!deactive_osc) {
3949                         __u32   index;
3950
3951                         rc = server_name2index(mti->mti_svname, &index, NULL);
3952                         if (rc < 0)
3953                                 GOTO(end, rc);
3954
3955                         if (index == 0) {
3956                                 LCONSOLE_ERROR_MSG(0x144, "%s: MDC0 can not be"
3957                                                    " (de)activated.\n",
3958                                                    mti->mti_svname);
3959                                 GOTO(end, rc = -EPERM);
3960                         }
3961                 }
3962
3963                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3964                               flag ? "de" : "re", mti->mti_svname);
3965                 /* Modify clilov */
3966                 rc = name_create(&logname, mti->mti_fsname, "-client");
3967                 if (rc < 0)
3968                         GOTO(end, rc);
3969                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3970                                 mti->mti_svname,
3971                                 deactive_osc ? "add osc" : "add mdc", flag);
3972                 name_destroy(&logname);
3973                 if (rc < 0)
3974                         goto active_err;
3975
3976                 /* Modify mdtlov */
3977                 /* Add to all MDT logs for DNE */
3978                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3979                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3980                                 continue;
3981                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3982                         if (rc < 0)
3983                                 GOTO(end, rc);
3984                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3985                                         mti->mti_svname,
3986                                         deactive_osc ? "add osc" : "add osp",
3987                                         flag);
3988                         name_destroy(&logname);
3989                         if (rc < 0)
3990                                 goto active_err;
3991                 }
3992 active_err:
3993                 if (rc < 0) {
3994                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3995                                            "log (%d). No permanent "
3996                                            "changes were made to the "
3997                                            "config log.\n",
3998                                            mti->mti_svname, rc);
3999                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
4000                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
4001                                                    " because the log"
4002                                                    "is in the old 1.4"
4003                                                    "style. Consider "
4004                                                    " --writeconf to "
4005                                                    "update the logs.\n");
4006                         GOTO(end, rc);
4007                 }
4008                 /* Fall through to osc/mdc proc for deactivating live
4009                    OSC/OSP on running MDT / clients. */
4010         }
4011         /* Below here, let obd's XXX_process_config methods handle it */
4012
4013         /* All lov. in proc */
4014         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
4015                 char *mdtlovname;
4016
4017                 CDEBUG(D_MGS, "lov param %s\n", ptr);
4018                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
4019                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
4020                                            "set on the MDT, not %s. "
4021                                            "Ignoring.\n",
4022                                            mti->mti_svname);
4023                         GOTO(end, rc = 0);
4024                 }
4025
4026                 /* Modify mdtlov */
4027                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
4028                         GOTO(end, rc = -ENODEV);
4029
4030                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
4031                                              mti->mti_stripe_index);
4032                 if (rc)
4033                         GOTO(end, rc);
4034                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
4035                                   &mgi->mgi_bufs, mdtlovname, ptr);
4036                 name_destroy(&logname);
4037                 name_destroy(&mdtlovname);
4038                 if (rc)
4039                         GOTO(end, rc);
4040
4041                 /* Modify clilov */
4042                 rc = name_create(&logname, mti->mti_fsname, "-client");
4043                 if (rc)
4044                         GOTO(end, rc);
4045                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
4046                                   fsdb->fsdb_clilov, ptr);
4047                 name_destroy(&logname);
4048                 GOTO(end, rc);
4049         }
4050
4051         /* All osc., mdc., llite. params in proc */
4052         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
4053             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
4054             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
4055                 char *cname;
4056
4057                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
4058                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
4059                                            " cannot be modified. Consider"
4060                                            " updating the configuration with"
4061                                            " --writeconf\n",
4062                                            mti->mti_svname);
4063                         GOTO(end, rc = -EINVAL);
4064                 }
4065                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
4066                         rc = name_create(&cname, mti->mti_fsname, "-client");
4067                         /* Add the client type to match the obdname in
4068                            class_config_llog_handler */
4069                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
4070                         rc = name_create(&cname, mti->mti_svname, "-mdc");
4071                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
4072                         rc = name_create(&cname, mti->mti_svname, "-osc");
4073                 } else {
4074                         GOTO(end, rc = -EINVAL);
4075                 }
4076                 if (rc)
4077                         GOTO(end, rc);
4078
4079                 /* Forbid direct update of llite root squash parameters.
4080                  * These parameters are indirectly set via the MDT settings.
4081                  * See (LU-1778) */
4082                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
4083                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
4084                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
4085                         LCONSOLE_ERROR("%s: root squash parameters can only "
4086                                 "be updated through MDT component\n",
4087                                 mti->mti_fsname);
4088                         name_destroy(&cname);
4089                         GOTO(end, rc = -EINVAL);
4090                 }
4091
4092                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4093
4094                 /* Modify client */
4095                 rc = name_create(&logname, mti->mti_fsname, "-client");
4096                 if (rc) {
4097                         name_destroy(&cname);
4098                         GOTO(end, rc);
4099                 }
4100                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
4101                                   cname, ptr);
4102
4103                 /* osc params affect the MDT as well */
4104                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
4105                         int i;
4106
4107                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
4108                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
4109                                         continue;
4110                                 name_destroy(&cname);
4111                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
4112                                                          fsdb, i);
4113                                 name_destroy(&logname);
4114                                 if (rc)
4115                                         break;
4116                                 rc = name_create_mdt(&logname,
4117                                                      mti->mti_fsname, i);
4118                                 if (rc)
4119                                         break;
4120                                 if (!mgs_log_is_empty(env, mgs, logname)) {
4121                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
4122                                                           mti, logname,
4123                                                           &mgi->mgi_bufs,
4124                                                           cname, ptr);
4125                                         if (rc)
4126                                                 break;
4127                                 }
4128                         }
4129                 }
4130
4131                 /* For mdc activate/deactivate, it affects OSP on MDT as well */
4132                 if (class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0 &&
4133                     rc == 0) {
4134                         char suffix[16];
4135                         char *lodname = NULL;
4136                         char *param_str = NULL;
4137                         int i;
4138                         int index;
4139
4140                         /* replace mdc with osp */
4141                         memcpy(ptr, PARAM_OSP, strlen(PARAM_OSP));
4142                         rc = server_name2index(mti->mti_svname, &index, NULL);
4143                         if (rc < 0) {
4144                                 memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
4145                                 GOTO(end, rc);
4146                         }
4147
4148                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4149                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
4150                                         continue;
4151
4152                                 if (i == index)
4153                                         continue;
4154
4155                                 name_destroy(&logname);
4156                                 rc = name_create_mdt(&logname, mti->mti_fsname,
4157                                                      i);
4158                                 if (rc < 0)
4159                                         break;
4160
4161                                 if (mgs_log_is_empty(env, mgs, logname))
4162                                         continue;
4163
4164                                 snprintf(suffix, sizeof(suffix), "-osp-MDT%04x",
4165                                          i);
4166                                 name_destroy(&cname);
4167                                 rc = name_create(&cname, mti->mti_svname,
4168                                                  suffix);
4169                                 if (rc < 0)
4170                                         break;
4171
4172                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4173                                                   &mgi->mgi_bufs, cname, ptr);
4174                                 if (rc < 0)
4175                                         break;
4176
4177                                 /* Add configuration log for noitfying LOD
4178                                  * to active/deactive the OSP. */
4179                                 name_destroy(&param_str);
4180                                 rc = name_create(&param_str, cname,
4181                                                  (*tmp == '0') ?  ".active=0" :
4182                                                  ".active=1");
4183                                 if (rc < 0)
4184                                         break;
4185
4186                                 name_destroy(&lodname);
4187                                 rc = name_create(&lodname, logname, "-mdtlov");
4188                                 if (rc < 0)
4189                                         break;
4190
4191                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4192                                                   &mgi->mgi_bufs, lodname,
4193                                                   param_str);
4194                                 if (rc < 0)
4195                                         break;
4196                         }
4197                         memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
4198                         name_destroy(&lodname);
4199                         name_destroy(&param_str);
4200                 }
4201
4202                 name_destroy(&logname);
4203                 name_destroy(&cname);
4204                 GOTO(end, rc);
4205         }
4206
4207         /* All mdt. params in proc */
4208         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
4209                 int i;
4210                 __u32 idx;
4211
4212                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4213                 if (strncmp(mti->mti_svname, mti->mti_fsname,
4214                             MTI_NAME_MAXLEN) == 0)
4215                         /* device is unspecified completely? */
4216                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
4217                 else
4218                         rc = server_name2index(mti->mti_svname, &idx, NULL);
4219                 if (rc < 0)
4220                         goto active_err;
4221                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
4222                         goto active_err;
4223                 if (rc & LDD_F_SV_ALL) {
4224                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4225                                 if (!test_bit(i,
4226                                                   fsdb->fsdb_mdt_index_map))
4227                                         continue;
4228                                 rc = name_create_mdt(&logname,
4229                                                 mti->mti_fsname, i);
4230                                 if (rc)
4231                                         goto active_err;
4232                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
4233                                                   logname, &mgi->mgi_bufs,
4234                                                   logname, ptr);
4235                                 name_destroy(&logname);
4236                                 if (rc)
4237                                         goto active_err;
4238                         }
4239                 } else {
4240                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
4241                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
4242                                 LCONSOLE_ERROR("%s: root squash parameters "
4243                                         "cannot be applied to a single MDT\n",
4244                                         mti->mti_fsname);
4245                                 GOTO(end, rc = -EINVAL);
4246                         }
4247                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
4248                                           mti->mti_svname, &mgi->mgi_bufs,
4249                                           mti->mti_svname, ptr);
4250                         if (rc)
4251                                 goto active_err;
4252                 }
4253
4254                 /* root squash settings are also applied to llite
4255                  * config log (see LU-1778) */
4256                 if (rc == 0 &&
4257                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
4258                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
4259                         char *cname;
4260                         char *ptr2;
4261
4262                         rc = name_create(&cname, mti->mti_fsname, "-client");
4263                         if (rc)
4264                                 GOTO(end, rc);
4265                         rc = name_create(&logname, mti->mti_fsname, "-client");
4266                         if (rc) {
4267                                 name_destroy(&cname);
4268                                 GOTO(end, rc);
4269                         }
4270                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
4271                         if (rc) {
4272                                 name_destroy(&cname);
4273                                 name_destroy(&logname);
4274                                 GOTO(end, rc);
4275                         }
4276                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
4277                                           &mgi->mgi_bufs, cname, ptr2);
4278                         name_destroy(&ptr2);
4279                         name_destroy(&logname);
4280                         name_destroy(&cname);
4281                 }
4282                 GOTO(end, rc);
4283         }
4284
4285         /* All mdd., ost. and osd. params in proc */
4286         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
4287             (class_match_param(ptr, PARAM_LOD, NULL) == 0) ||
4288             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
4289             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
4290                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4291                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
4292                         GOTO(end, rc = -ENODEV);
4293
4294                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
4295                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
4296                 GOTO(end, rc);
4297         }
4298
4299         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
4300
4301 end:
4302         if (rc)
4303                 CERROR("err %d on param '%s'\n", rc, ptr);
4304
4305         RETURN(rc);
4306 }
4307
4308 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
4309                          struct mgs_target_info *mti, struct fs_db *fsdb)
4310 {
4311         char    *buf, *params;
4312         int      rc = -EINVAL;
4313
4314         ENTRY;
4315
4316         /* set/check the new target index */
4317         rc = mgs_set_index(env, mgs, mti);
4318         if (rc < 0)
4319                 RETURN(rc);
4320
4321         if (rc == EALREADY) {
4322                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
4323                               mti->mti_stripe_index, mti->mti_svname);
4324                 /* We would like to mark old log sections as invalid
4325                    and add new log sections in the client and mdt logs.
4326                    But if we add new sections, then live clients will
4327                    get repeat setup instructions for already running
4328                    osc's. So don't update the client/mdt logs. */
4329                 mti->mti_flags &= ~LDD_F_UPDATE;
4330                 rc = 0;
4331         }
4332
4333         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_WRITE_TARGET_DELAY, cfs_fail_val > 0 ?
4334                          cfs_fail_val : 10);
4335
4336         mutex_lock(&fsdb->fsdb_mutex);
4337
4338         if (mti->mti_flags & (LDD_F_VIRGIN | LDD_F_WRITECONF)) {
4339                 /* Generate a log from scratch */
4340                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
4341                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
4342                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
4343                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
4344                 } else {
4345                         CERROR("Unknown target type %#x, can't create log for %s\n",
4346                                mti->mti_flags, mti->mti_svname);
4347                 }
4348                 if (rc) {
4349                         CERROR("Can't write logs for %s (%d)\n",
4350                                mti->mti_svname, rc);
4351                         GOTO(out_up, rc);
4352                 }
4353         } else {
4354                 /* Just update the params from tunefs in mgs_write_log_params */
4355                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
4356                 mti->mti_flags |= LDD_F_PARAM;
4357         }
4358
4359         /* allocate temporary buffer, where class_get_next_param will
4360            make copy of a current  parameter */
4361         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
4362         if (buf == NULL)
4363                 GOTO(out_up, rc = -ENOMEM);
4364         params = mti->mti_params;
4365         while (params != NULL) {
4366                 rc = class_get_next_param(&params, buf);
4367                 if (rc) {
4368                         if (rc == 1)
4369                                 /* there is no next parameter, that is
4370                                    not an error */
4371                                 rc = 0;
4372                         break;
4373                 }
4374                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
4375                        params, buf);
4376                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
4377                 if (rc)
4378                         break;
4379         }
4380
4381         OBD_FREE(buf, strlen(mti->mti_params) + 1);
4382
4383 out_up:
4384         mutex_unlock(&fsdb->fsdb_mutex);
4385         RETURN(rc);
4386 }
4387
4388 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
4389 {
4390         struct llog_ctxt        *ctxt;
4391         int                      rc = 0;
4392
4393         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
4394         if (ctxt == NULL) {
4395                 CERROR("%s: MGS config context doesn't exist\n",
4396                        mgs->mgs_obd->obd_name);
4397                 rc = -ENODEV;
4398         } else {
4399                 rc = llog_erase(env, ctxt, NULL, name);
4400                 /* llog may not exist */
4401                 if (rc == -ENOENT)
4402                         rc = 0;
4403                 llog_ctxt_put(ctxt);
4404         }
4405
4406         if (rc)
4407                 CERROR("%s: failed to clear log %s: %d\n",
4408                        mgs->mgs_obd->obd_name, name, rc);
4409
4410         return rc;
4411 }
4412
4413 /* erase all logs for the given fs */
4414 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs,
4415                    const char *fsname)
4416 {
4417         struct list_head log_list;
4418         struct mgs_direntry *dirent, *n;
4419         char barrier_name[20] = {};
4420         char *suffix;
4421         int count = 0;
4422         int rc, len = strlen(fsname);
4423         ENTRY;
4424
4425         mutex_lock(&mgs->mgs_mutex);
4426
4427         /* Find all the logs in the CONFIGS directory */
4428         rc = class_dentry_readdir(env, mgs, &log_list);
4429         if (rc) {
4430                 mutex_unlock(&mgs->mgs_mutex);
4431                 RETURN(rc);
4432         }
4433
4434         if (list_empty(&log_list)) {
4435                 mutex_unlock(&mgs->mgs_mutex);
4436                 RETURN(-ENOENT);
4437         }
4438
4439         snprintf(barrier_name, sizeof(barrier_name) - 1, "%s-%s",
4440                  fsname, BARRIER_FILENAME);
4441         /* Delete the barrier fsdb */
4442         mgs_remove_fsdb_by_name(mgs, barrier_name);
4443         /* Delete the fs db */
4444         mgs_remove_fsdb_by_name(mgs, fsname);
4445         mutex_unlock(&mgs->mgs_mutex);
4446
4447         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4448                 list_del_init(&dirent->mde_list);
4449                 suffix = strrchr(dirent->mde_name, '-');
4450                 if (suffix != NULL) {
4451                         if ((len == suffix - dirent->mde_name) &&
4452                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
4453                                 CDEBUG(D_MGS, "Removing log %s\n",
4454                                        dirent->mde_name);
4455                                 mgs_erase_log(env, mgs, dirent->mde_name);
4456                                 count++;
4457                         }
4458                 }
4459                 mgs_direntry_free(dirent);
4460         }
4461
4462         if (count == 0)
4463                 rc = -ENOENT;
4464
4465         RETURN(rc);
4466 }
4467
4468 /* list all logs for the given fs */
4469 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
4470                   struct obd_ioctl_data *data)
4471 {
4472         struct list_head         log_list;
4473         struct mgs_direntry     *dirent, *n;
4474         char                    *out, *suffix, prefix[] = "config_log: ";
4475         int                      prefix_len = strlen(prefix);
4476         int                      l, remains, start = 0, rc;
4477
4478         ENTRY;
4479
4480         /* Find all the logs in the CONFIGS directory */
4481         rc = class_dentry_readdir(env, mgs, &log_list);
4482         if (rc)
4483                 RETURN(rc);
4484
4485         out = data->ioc_bulk;
4486         remains = data->ioc_inllen1;
4487         /* OBD_FAIL: fetch the config_log records from the specified one */
4488         if (OBD_FAIL_CHECK(OBD_FAIL_CATLIST))
4489                 data->ioc_count = cfs_fail_val;
4490
4491         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4492                 list_del_init(&dirent->mde_list);
4493                 suffix = strrchr(dirent->mde_name, '-');
4494                 if (suffix != NULL) {
4495                         l = prefix_len + dirent->mde_len + 1;
4496                         if (remains - 1 < 0) {
4497                                 /* No enough space for this record */
4498                                 mgs_direntry_free(dirent);
4499                                 goto out;
4500                         }
4501                         start++;
4502                         if (start < data->ioc_count) {
4503                                 mgs_direntry_free(dirent);
4504                                 continue;
4505                         }
4506                         l = scnprintf(out, remains, "%s%s\n", prefix,
4507                                      dirent->mde_name);
4508                         out += l;
4509                         remains -= l;
4510                 }
4511                 mgs_direntry_free(dirent);
4512                 if (remains == 0)
4513                         /* Full */
4514                         goto out;
4515         }
4516         /* Finished */
4517         start = 0;
4518 out:
4519         data->ioc_count = start;
4520         RETURN(rc);
4521 }
4522
4523 struct mgs_lcfg_fork_data {
4524         struct lustre_cfg_bufs   mlfd_bufs;
4525         struct mgs_device       *mlfd_mgs;
4526         struct llog_handle      *mlfd_llh;
4527         const char              *mlfd_oldname;
4528         const char              *mlfd_newname;
4529         char                     mlfd_data[0];
4530 };
4531
4532 static bool contain_valid_fsname(char *buf, const char *fsname,
4533                                  int buflen, int namelen)
4534 {
4535         if (buflen < namelen)
4536                 return false;
4537
4538         if (memcmp(buf, fsname, namelen) != 0)
4539                 return false;
4540
4541         if (buf[namelen] != '\0' && buf[namelen] != '-')
4542                 return false;
4543
4544         return true;
4545 }
4546
4547 static int mgs_lcfg_fork_handler(const struct lu_env *env,
4548                                  struct llog_handle *o_llh,
4549                                  struct llog_rec_hdr *o_rec, void *data)
4550 {
4551         struct mgs_lcfg_fork_data *mlfd = data;
4552         struct lustre_cfg_bufs *n_bufs = &mlfd->mlfd_bufs;
4553         struct lustre_cfg *o_lcfg = (struct lustre_cfg *)(o_rec + 1);
4554         struct llog_cfg_rec *lcr;
4555         char *o_buf;
4556         char *n_buf = mlfd->mlfd_data;
4557         int o_buflen;
4558         int o_namelen = strlen(mlfd->mlfd_oldname);
4559         int n_namelen = strlen(mlfd->mlfd_newname);
4560         int diff = n_namelen - o_namelen;
4561         __u32 cmd = o_lcfg->lcfg_command;
4562         __u32 cnt = o_lcfg->lcfg_bufcount;
4563         int rc;
4564         int i;
4565         ENTRY;
4566
4567         /* buf[0] */
4568         o_buf = lustre_cfg_buf(o_lcfg, 0);
4569         o_buflen = o_lcfg->lcfg_buflens[0];
4570         if (contain_valid_fsname(o_buf, mlfd->mlfd_oldname, o_buflen,
4571                                  o_namelen)) {
4572                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4573                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4574                        o_buflen - o_namelen);
4575                 lustre_cfg_bufs_reset(n_bufs, n_buf);
4576                 n_buf += cfs_size_round(o_buflen + diff);
4577         } else {
4578                 lustre_cfg_bufs_reset(n_bufs, o_buflen != 0 ? o_buf : NULL);
4579         }
4580
4581         switch (cmd) {
4582         case LCFG_MARKER: {
4583                 struct cfg_marker *o_marker;
4584                 struct cfg_marker *n_marker;
4585                 int tgt_namelen;
4586
4587                 if (cnt != 2) {
4588                         CDEBUG(D_MGS, "Unknown cfg marker entry with %d "
4589                                "buffers\n", cnt);
4590                         RETURN(-EINVAL);
4591                 }
4592
4593                 /* buf[1] is marker */
4594                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4595                 o_buflen = o_lcfg->lcfg_buflens[1];
4596                 o_marker = (struct cfg_marker *)o_buf;
4597                 if (!contain_valid_fsname(o_marker->cm_tgtname,
4598                                           mlfd->mlfd_oldname,
4599                                           sizeof(o_marker->cm_tgtname),
4600                                           o_namelen)) {
4601                         lustre_cfg_bufs_set(n_bufs, 1, o_marker,
4602                                             sizeof(*o_marker));
4603                         break;
4604                 }
4605
4606                 n_marker = (struct cfg_marker *)n_buf;
4607                 *n_marker = *o_marker;
4608                 memcpy(n_marker->cm_tgtname, mlfd->mlfd_newname, n_namelen);
4609                 tgt_namelen = strlen(o_marker->cm_tgtname);
4610                 if (tgt_namelen > o_namelen)
4611                         memcpy(n_marker->cm_tgtname + n_namelen,
4612                                o_marker->cm_tgtname + o_namelen,
4613                                tgt_namelen - o_namelen);
4614                 n_marker->cm_tgtname[tgt_namelen + diff] = '\0';
4615                 lustre_cfg_bufs_set(n_bufs, 1, n_marker, sizeof(*n_marker));
4616                 break;
4617         }
4618         case LCFG_PARAM:
4619         case LCFG_SET_PARAM: {
4620                 for (i = 1; i < cnt; i++)
4621                         /* buf[i] is the param value, reuse it directly */
4622                         lustre_cfg_bufs_set(n_bufs, i,
4623                                             lustre_cfg_buf(o_lcfg, i),
4624                                             o_lcfg->lcfg_buflens[i]);
4625                 break;
4626         }
4627         case LCFG_POOL_NEW:
4628         case LCFG_POOL_ADD:
4629         case LCFG_POOL_REM:
4630         case LCFG_POOL_DEL: {
4631                 if (cnt < 3 || cnt > 4) {
4632                         CDEBUG(D_MGS, "Unknown cfg pool (%x) entry with %d "
4633                                "buffers\n", cmd, cnt);
4634                         RETURN(-EINVAL);
4635                 }
4636
4637                 /* buf[1] is fsname */
4638                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4639                 o_buflen = o_lcfg->lcfg_buflens[1];
4640                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4641                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4642                        o_buflen - o_namelen);
4643                 lustre_cfg_bufs_set(n_bufs, 1, n_buf, o_buflen + diff);
4644                 n_buf += cfs_size_round(o_buflen + diff);
4645
4646                 /* buf[2] is the pool name, reuse it directly */
4647                 lustre_cfg_bufs_set(n_bufs, 2, lustre_cfg_buf(o_lcfg, 2),
4648                                     o_lcfg->lcfg_buflens[2]);
4649
4650                 if (cnt == 3)
4651                         break;
4652
4653                 /* buf[3] is ostname */
4654                 o_buf = lustre_cfg_buf(o_lcfg, 3);
4655                 o_buflen = o_lcfg->lcfg_buflens[3];
4656                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4657                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4658                        o_buflen - o_namelen);
4659                 lustre_cfg_bufs_set(n_bufs, 3, n_buf, o_buflen + diff);
4660                 break;
4661         }
4662         case LCFG_SETUP: {
4663                 if (cnt == 2) {
4664                         o_buflen = o_lcfg->lcfg_buflens[1];
4665                         if (o_buflen == sizeof(struct lov_desc) ||
4666                             o_buflen == sizeof(struct lmv_desc)) {
4667                                 char *o_uuid;
4668                                 char *n_uuid;
4669                                 int uuid_len;
4670
4671                                 /* buf[1] */
4672                                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4673                                 if (o_buflen == sizeof(struct lov_desc)) {
4674                                         struct lov_desc *o_desc =
4675                                                 (struct lov_desc *)o_buf;
4676                                         struct lov_desc *n_desc =
4677                                                 (struct lov_desc *)n_buf;
4678
4679                                         *n_desc = *o_desc;
4680                                         o_uuid = o_desc->ld_uuid.uuid;
4681                                         n_uuid = n_desc->ld_uuid.uuid;
4682                                         uuid_len = sizeof(o_desc->ld_uuid.uuid);
4683                                 } else {
4684                                         struct lmv_desc *o_desc =
4685                                                 (struct lmv_desc *)o_buf;
4686                                         struct lmv_desc *n_desc =
4687                                                 (struct lmv_desc *)n_buf;
4688
4689                                         *n_desc = *o_desc;
4690                                         o_uuid = o_desc->ld_uuid.uuid;
4691                                         n_uuid = n_desc->ld_uuid.uuid;
4692                                         uuid_len = sizeof(o_desc->ld_uuid.uuid);
4693                                 }
4694
4695                                 if (unlikely(!contain_valid_fsname(o_uuid,
4696                                                 mlfd->mlfd_oldname, uuid_len,
4697                                                 o_namelen))) {
4698                                         lustre_cfg_bufs_set(n_bufs, 1, o_buf,
4699                                                             o_buflen);
4700                                         break;
4701                                 }
4702
4703                                 memcpy(n_uuid, mlfd->mlfd_newname, n_namelen);
4704                                 uuid_len = strlen(o_uuid);
4705                                 if (uuid_len > o_namelen)
4706                                         memcpy(n_uuid + n_namelen,
4707                                                o_uuid + o_namelen,
4708                                                uuid_len - o_namelen);
4709                                 n_uuid[uuid_len + diff] = '\0';
4710                                 lustre_cfg_bufs_set(n_bufs, 1, n_buf, o_buflen);
4711                                 break;
4712                         } /* else case fall through */
4713                 } /* else case fall through */
4714         }
4715         default: {
4716                 for (i = 1; i < cnt; i++) {
4717                         o_buflen = o_lcfg->lcfg_buflens[i];
4718                         if (o_buflen == 0)
4719                                 continue;
4720
4721                         o_buf = lustre_cfg_buf(o_lcfg, i);
4722                         if (!contain_valid_fsname(o_buf, mlfd->mlfd_oldname,
4723                                                   o_buflen, o_namelen)) {
4724                                 lustre_cfg_bufs_set(n_bufs, i, o_buf, o_buflen);
4725                                 continue;
4726                         }
4727
4728                         memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4729                         if (o_buflen == o_namelen) {
4730                                 lustre_cfg_bufs_set(n_bufs, i, n_buf,
4731                                                     n_namelen);
4732                                 n_buf += cfs_size_round(n_namelen);
4733                                 continue;
4734                         }
4735
4736                         memcpy(n_buf + n_namelen, o_buf + o_namelen,
4737                                o_buflen - o_namelen);
4738                         lustre_cfg_bufs_set(n_bufs, i, n_buf, o_buflen + diff);
4739                         n_buf += cfs_size_round(o_buflen + diff);
4740                 }
4741                 break;
4742         }
4743         }
4744
4745         lcr = lustre_cfg_rec_new(cmd, n_bufs);
4746         if (!lcr)
4747                 RETURN(-ENOMEM);
4748
4749         lcr->lcr_cfg = *o_lcfg;
4750         rc = llog_write(env, mlfd->mlfd_llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
4751         lustre_cfg_rec_free(lcr);
4752
4753         RETURN(rc);
4754 }
4755
4756 static int mgs_lcfg_fork_one(const struct lu_env *env, struct mgs_device *mgs,
4757                              struct mgs_direntry *mde, const char *oldname,
4758                              const char *newname)
4759 {
4760         struct llog_handle *old_llh = NULL;
4761         struct llog_handle *new_llh = NULL;
4762         struct llog_ctxt *ctxt = NULL;
4763         struct mgs_lcfg_fork_data *mlfd = NULL;
4764         char *name_buf = NULL;
4765         int name_buflen;
4766         int old_namelen = strlen(oldname);
4767         int new_namelen = strlen(newname);
4768         int rc;
4769         ENTRY;
4770
4771         name_buflen = mde->mde_len + new_namelen - old_namelen;
4772         OBD_ALLOC(name_buf, name_buflen);
4773         if (!name_buf)
4774                 RETURN(-ENOMEM);
4775
4776         memcpy(name_buf, newname, new_namelen);
4777         memcpy(name_buf + new_namelen, mde->mde_name + old_namelen,
4778                mde->mde_len - old_namelen);
4779
4780         CDEBUG(D_MGS, "Fork the config-log from %s to %s\n",
4781                mde->mde_name, name_buf);
4782
4783         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
4784         LASSERT(ctxt);
4785
4786         rc = llog_open_create(env, ctxt, &new_llh, NULL, name_buf);
4787         if (rc)
4788                 GOTO(out, rc);
4789
4790         rc = llog_init_handle(env, new_llh, LLOG_F_IS_PLAIN, NULL);
4791         if (rc)
4792                 GOTO(out, rc);
4793
4794         if (unlikely(mgs_log_is_empty(env, mgs, mde->mde_name)))
4795                 GOTO(out, rc = 0);
4796
4797         rc = llog_open(env, ctxt, &old_llh, NULL, mde->mde_name,
4798                        LLOG_OPEN_EXISTS);
4799         if (rc)
4800                 GOTO(out, rc);
4801
4802         rc = llog_init_handle(env, old_llh, LLOG_F_IS_PLAIN, NULL);
4803         if (rc)
4804                 GOTO(out, rc);
4805
4806         new_llh->lgh_hdr->llh_tgtuuid = old_llh->lgh_hdr->llh_tgtuuid;
4807
4808         OBD_ALLOC(mlfd, LLOG_MIN_CHUNK_SIZE);
4809         if (!mlfd)
4810                 GOTO(out, rc = -ENOMEM);
4811
4812         mlfd->mlfd_mgs = mgs;
4813         mlfd->mlfd_llh = new_llh;
4814         mlfd->mlfd_oldname = oldname;
4815         mlfd->mlfd_newname = newname;
4816
4817         rc = llog_process(env, old_llh, mgs_lcfg_fork_handler, mlfd, NULL);
4818         OBD_FREE(mlfd, LLOG_MIN_CHUNK_SIZE);
4819
4820         GOTO(out, rc);
4821
4822 out:
4823         if (old_llh)
4824                 llog_close(env, old_llh);
4825         if (new_llh)
4826                 llog_close(env, new_llh);
4827         if (name_buf)
4828                 OBD_FREE(name_buf, name_buflen);
4829         if (ctxt)
4830                 llog_ctxt_put(ctxt);
4831
4832         return rc;
4833 }
4834
4835 int mgs_lcfg_fork(const struct lu_env *env, struct mgs_device *mgs,
4836                   const char *oldname, const char *newname)
4837 {
4838         struct list_head log_list;
4839         struct mgs_direntry *dirent, *n;
4840         int olen = strlen(oldname);
4841         int nlen = strlen(newname);
4842         int count = 0;
4843         int rc = 0;
4844         ENTRY;
4845
4846         if (unlikely(!oldname || oldname[0] == '\0' ||
4847                      !newname || newname[0] == '\0'))
4848                 RETURN(-EINVAL);
4849
4850         if (strcmp(oldname, newname) == 0)
4851                 RETURN(-EINVAL);
4852
4853         /* lock it to prevent fork/erase/register in parallel. */
4854         mutex_lock(&mgs->mgs_mutex);
4855
4856         rc = class_dentry_readdir(env, mgs, &log_list);
4857         if (rc) {
4858                 mutex_unlock(&mgs->mgs_mutex);
4859                 RETURN(rc);
4860         }
4861
4862         if (list_empty(&log_list)) {
4863                 mutex_unlock(&mgs->mgs_mutex);
4864                 RETURN(-ENOENT);
4865         }
4866
4867         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4868                 char *ptr;
4869
4870                 ptr = strrchr(dirent->mde_name, '-');
4871                 if (ptr) {
4872                         int tlen = ptr - dirent->mde_name;
4873
4874                         if (tlen == nlen &&
4875                             strncmp(newname, dirent->mde_name, tlen) == 0)
4876                                 GOTO(out, rc = -EEXIST);
4877
4878                         if (tlen == olen &&
4879                             strncmp(oldname, dirent->mde_name, tlen) == 0)
4880                                 continue;
4881                 }
4882
4883                 list_del_init(&dirent->mde_list);
4884                 mgs_direntry_free(dirent);
4885         }
4886
4887         if (list_empty(&log_list)) {
4888                 mutex_unlock(&mgs->mgs_mutex);
4889                 RETURN(-ENOENT);
4890         }
4891
4892         list_for_each_entry(dirent, &log_list, mde_list) {
4893                 rc = mgs_lcfg_fork_one(env, mgs, dirent, oldname, newname);
4894                 if (rc)
4895                         break;
4896
4897                 count++;
4898         }
4899
4900 out:
4901         mutex_unlock(&mgs->mgs_mutex);
4902
4903         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4904                 list_del_init(&dirent->mde_list);
4905                 mgs_direntry_free(dirent);
4906         }
4907
4908         if (rc && count > 0)
4909                 mgs_erase_logs(env, mgs, newname);
4910
4911         RETURN(rc);
4912 }
4913
4914 int mgs_lcfg_erase(const struct lu_env *env, struct mgs_device *mgs,
4915                    const char *fsname)
4916 {
4917         int rc;
4918         ENTRY;
4919
4920         if (unlikely(!fsname || fsname[0] == '\0'))
4921                 RETURN(-EINVAL);
4922
4923         rc = mgs_erase_logs(env, mgs, fsname);
4924
4925         RETURN(rc);
4926 }
4927
4928 static int mgs_xattr_del(const struct lu_env *env, struct dt_object *obj)
4929 {
4930         struct dt_device *dev;
4931         struct thandle *th = NULL;
4932         int rc = 0;
4933
4934         ENTRY;
4935
4936         dev = container_of0(obj->do_lu.lo_dev, struct dt_device, dd_lu_dev);
4937         th = dt_trans_create(env, dev);
4938         if (IS_ERR(th))
4939                 RETURN(PTR_ERR(th));
4940
4941         rc = dt_declare_xattr_del(env, obj, XATTR_TARGET_RENAME, th);
4942         if (rc)
4943                 GOTO(stop, rc);
4944
4945         rc = dt_trans_start_local(env, dev, th);
4946         if (rc)
4947                 GOTO(stop, rc);
4948
4949         dt_write_lock(env, obj, 0);
4950         rc = dt_xattr_del(env, obj, XATTR_TARGET_RENAME, th);
4951
4952         GOTO(unlock, rc);
4953
4954 unlock:
4955         dt_write_unlock(env, obj);
4956
4957 stop:
4958         dt_trans_stop(env, dev, th);
4959
4960         return rc;
4961 }
4962
4963 int mgs_lcfg_rename(const struct lu_env *env, struct mgs_device *mgs)
4964 {
4965         struct list_head log_list;
4966         struct mgs_direntry *dirent, *n;
4967         char fsname[16];
4968         struct lu_buf buf = {
4969                 .lb_buf = fsname,
4970                 .lb_len = sizeof(fsname)
4971         };
4972         int rc = 0;
4973
4974         ENTRY;
4975
4976         rc = class_dentry_readdir(env, mgs, &log_list);
4977         if (rc)
4978                 RETURN(rc);
4979
4980         if (list_empty(&log_list))
4981                 RETURN(0);
4982
4983         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4984                 struct dt_object *o = NULL;
4985                 char oldname[16];
4986                 char *ptr;
4987                 int len;
4988
4989                 list_del_init(&dirent->mde_list);
4990                 ptr = strrchr(dirent->mde_name, '-');
4991                 if (!ptr)
4992                         goto next;
4993
4994                 len = ptr - dirent->mde_name;
4995                 if (unlikely(len >= sizeof(oldname))) {
4996                         CDEBUG(D_MGS, "Skip invalid configuration file %s\n",
4997                                dirent->mde_name);
4998                         goto next;
4999                 }
5000
5001                 o = local_file_find(env, mgs->mgs_los, mgs->mgs_configs_dir,
5002                                     dirent->mde_name);
5003                 if (IS_ERR(o)) {
5004                         rc = PTR_ERR(o);
5005                         CDEBUG(D_MGS, "Fail to locate file %s: rc = %d\n",
5006                                dirent->mde_name, rc);
5007                         goto next;
5008                 }
5009
5010                 rc = dt_xattr_get(env, o, &buf, XATTR_TARGET_RENAME);
5011                 if (rc < 0) {
5012                         if (rc == -ENODATA)
5013                                 rc = 0;
5014                         else
5015                                 CDEBUG(D_MGS,
5016                                        "Fail to get EA for %s: rc = %d\n",
5017                                        dirent->mde_name, rc);
5018                         goto next;
5019                 }
5020
5021                 if (unlikely(rc == len &&
5022                              memcmp(fsname, dirent->mde_name, len) == 0)) {
5023                         /* The new fsname is the same as the old one. */
5024                         rc = mgs_xattr_del(env, o);
5025                         goto next;
5026                 }
5027
5028                 memcpy(oldname, dirent->mde_name, len);
5029                 oldname[len] = '\0';
5030                 fsname[rc] = '\0';
5031                 rc = mgs_lcfg_fork_one(env, mgs, dirent, oldname, fsname);
5032                 if (rc && rc != -EEXIST) {
5033                         CDEBUG(D_MGS, "Fail to fork %s: rc = %d\n",
5034                                dirent->mde_name, rc);
5035                         goto next;
5036                 }
5037
5038                 rc = mgs_erase_log(env, mgs, dirent->mde_name);
5039                 if (rc) {
5040                         CDEBUG(D_MGS, "Fail to erase old %s: rc = %d\n",
5041                                dirent->mde_name, rc);
5042                         /* keep it there if failed to remove it. */
5043                         rc = 0;
5044                 }
5045
5046 next:
5047                 if (o && !IS_ERR(o))
5048                         lu_object_put(env, &o->do_lu);
5049
5050                 mgs_direntry_free(dirent);
5051                 if (rc)
5052                         break;
5053         }
5054
5055         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
5056                 list_del_init(&dirent->mde_list);
5057                 mgs_direntry_free(dirent);
5058         }
5059
5060         RETURN(rc);
5061 }
5062
5063 /* Setup _mgs fsdb and log
5064  */
5065 int mgs__mgs_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
5066 {
5067         struct fs_db *fsdb = NULL;
5068         int rc;
5069         ENTRY;
5070
5071         rc = mgs_find_or_make_fsdb(env, mgs, MGSSELF_NAME, &fsdb);
5072         if (!rc)
5073                 mgs_put_fsdb(mgs, fsdb);
5074
5075         RETURN(rc);
5076 }
5077
5078 /* Setup params fsdb and log
5079  */
5080 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
5081 {
5082         struct fs_db *fsdb = NULL;
5083         struct llog_handle *params_llh = NULL;
5084         int rc;
5085         ENTRY;
5086
5087         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
5088         if (!rc) {
5089                 mutex_lock(&fsdb->fsdb_mutex);
5090                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
5091                 if (!rc)
5092                         rc = record_end_log(env, &params_llh);
5093                 mutex_unlock(&fsdb->fsdb_mutex);
5094                 mgs_put_fsdb(mgs, fsdb);
5095         }
5096
5097         RETURN(rc);
5098 }
5099
5100 /* Cleanup params fsdb and log
5101  */
5102 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
5103 {
5104         int rc;
5105
5106         rc = mgs_erase_logs(env, mgs, PARAMS_FILENAME);
5107         return rc == -ENOENT ? 0 : rc;
5108 }
5109
5110 /**
5111  * Fill in the mgs_target_info based on data devname and param provide.
5112  *
5113  * @env         thread context
5114  * @mgs         mgs device
5115  * @mti         mgs target info. We want to set this based other paramters
5116  *              passed to this function. Once setup we write it to the config
5117  *              logs.
5118  * @devname     optional OBD device name
5119  * @param       string that contains both what tunable to set and the value to
5120  *              set it to.
5121  *
5122  * RETURN       0 for success
5123  *              negative error number on failure
5124  **/
5125 static int mgs_set_conf_param(const struct lu_env *env, struct mgs_device *mgs,
5126                               struct mgs_target_info *mti, const char *devname,
5127                               const char *param)
5128 {
5129         struct fs_db *fsdb = NULL;
5130         int dev_type;
5131         int rc = 0;
5132
5133         ENTRY;
5134         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
5135         if (!devname) {
5136                 size_t len;
5137
5138                 /* We have two possible cases here:
5139                  *
5140                  * 1) the device name embedded in the param:
5141                  *    lustre-OST0000.osc.max_dirty_mb=32
5142                  *
5143                  * 2) the file system name is embedded in
5144                  *    the param: lustre.sys.at.min=0
5145                  */
5146                 len = strcspn(param, ".=");
5147                 if (!len || param[len] == '=')
5148                         RETURN(-EINVAL);
5149
5150                 if (len >= sizeof(mti->mti_svname))
5151                         RETURN(-E2BIG);
5152
5153                 snprintf(mti->mti_svname, sizeof(mti->mti_svname),
5154                          "%.*s", (int)len, param);
5155                 param += len + 1;
5156         } else {
5157                 if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname)) >=
5158                     sizeof(mti->mti_svname))
5159                         RETURN(-E2BIG);
5160         }
5161
5162         if (!strlen(mti->mti_svname)) {
5163                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
5164                 RETURN(-ENOSYS);
5165         }
5166
5167         dev_type = mgs_parse_devname(mti->mti_svname, mti->mti_fsname,
5168                                      &mti->mti_stripe_index);
5169         switch (dev_type) {
5170         /* For this case we have an invalid obd device name */
5171         case -ENXIO:
5172                 CDEBUG(D_MGS, "%s don't contain an index\n", mti->mti_svname);
5173                 strlcpy(mti->mti_fsname, mti->mti_svname, MTI_NAME_MAXLEN);
5174                 dev_type = 0;
5175                 break;
5176         /* Not an obd device, assume devname is the fsname.
5177          * User might of only provided fsname and not obd device
5178          */
5179         case -EINVAL:
5180                 CDEBUG(D_MGS, "%s is seen as a file system name\n", mti->mti_svname);
5181                 strlcpy(mti->mti_fsname, mti->mti_svname, MTI_NAME_MAXLEN);
5182                 dev_type = 0;
5183                 break;
5184         default:
5185                 if (dev_type < 0)
5186                         GOTO(out, rc = dev_type);
5187
5188                 /* param related to llite isn't allowed to set by OST or MDT */
5189                 if (dev_type & LDD_F_SV_TYPE_OST ||
5190                     dev_type & LDD_F_SV_TYPE_MDT) {
5191                         /* param related to llite isn't allowed to set by OST
5192                          * or MDT
5193                          */
5194                         if (!strncmp(param, PARAM_LLITE,
5195                                      sizeof(PARAM_LLITE) - 1))
5196                                 GOTO(out, rc = -EINVAL);
5197
5198                         /* Strip -osc or -mdc suffix from svname */
5199                         if (server_make_name(dev_type, mti->mti_stripe_index,
5200                                              mti->mti_fsname, mti->mti_svname,
5201                                              sizeof(mti->mti_svname)))
5202                                 GOTO(out, rc = -EINVAL);
5203                 }
5204                 break;
5205         }
5206
5207         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
5208             sizeof(mti->mti_params))
5209                 GOTO(out, rc = -E2BIG);
5210
5211         CDEBUG(D_MGS, "set_conf_param fs='%s' device='%s' param='%s'\n",
5212                mti->mti_fsname, mti->mti_svname, mti->mti_params);
5213
5214         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
5215         if (rc)
5216                 GOTO(out, rc);
5217
5218         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
5219             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
5220                 CERROR("No filesystem targets for %s. cfg_device from lctl "
5221                        "is '%s'\n", mti->mti_fsname, mti->mti_svname);
5222                 mgs_unlink_fsdb(mgs, fsdb);
5223                 GOTO(out, rc = -EINVAL);
5224         }
5225
5226         /*
5227          * Revoke lock so everyone updates.  Should be alright if
5228          * someone was already reading while we were updating the logs,
5229          * so we don't really need to hold the lock while we're
5230          * writing (above).
5231          */
5232         mti->mti_flags = dev_type | LDD_F_PARAM;
5233         mutex_lock(&fsdb->fsdb_mutex);
5234         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
5235         mutex_unlock(&fsdb->fsdb_mutex);
5236         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
5237
5238 out:
5239         if (fsdb)
5240                 mgs_put_fsdb(mgs, fsdb);
5241
5242         RETURN(rc);
5243 }
5244
5245 static int mgs_set_param2(const struct lu_env *env, struct mgs_device *mgs,
5246                           struct mgs_target_info *mti, const char *param)
5247 {
5248         struct fs_db *fsdb = NULL;
5249         int dev_type;
5250         size_t len;
5251         int rc;
5252
5253         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
5254             sizeof(mti->mti_params))
5255                 GOTO(out, rc = -E2BIG);
5256
5257         /* obdname2fsname reports devname as an obd device */
5258         len = strcspn(param, ".=");
5259         if (len && param[len] != '=') {
5260                 char *ptr;
5261
5262                 param += len + 1;
5263                 ptr = strchr(param, '.');
5264
5265                 len = strlen(param);
5266                 if (ptr)
5267                         len -= strlen(ptr);
5268                 if (len >= sizeof(mti->mti_svname))
5269                         GOTO(out, rc = -E2BIG);
5270
5271                 snprintf(mti->mti_svname, sizeof(mti->mti_svname), "%.*s",
5272                         (int)len, param);
5273
5274                 obdname2fsname(mti->mti_svname, mti->mti_fsname,
5275                                sizeof(mti->mti_fsname));
5276         } else {
5277                 snprintf(mti->mti_svname, sizeof(mti->mti_svname), "general");
5278         }
5279
5280         CDEBUG(D_MGS, "set_param2 fs='%s' device='%s' param='%s'\n",
5281                mti->mti_fsname, mti->mti_svname, mti->mti_params);
5282
5283         /* The return value should be the device type i.e LDD_F_SV_TYPE_XXX.
5284          * A returned error tells us we don't have a target obd device.
5285          */
5286         dev_type = server_name2index(mti->mti_svname, &mti->mti_stripe_index,
5287                                      NULL);
5288         if (dev_type < 0)
5289                 dev_type = 0;
5290
5291         /* the return value should be the device type i.e LDD_F_SV_TYPE_XXX.
5292          * Strip -osc or -mdc suffix from svname
5293          */
5294         if ((dev_type & LDD_F_SV_TYPE_OST || dev_type & LDD_F_SV_TYPE_MDT) &&
5295             server_make_name(dev_type, mti->mti_stripe_index,
5296                              mti->mti_fsname, mti->mti_svname,
5297                              sizeof(mti->mti_svname)))
5298                 GOTO(out, rc = -EINVAL);
5299
5300         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
5301         if (rc)
5302                 GOTO(out, rc);
5303         /*
5304          * Revoke lock so everyone updates.  Should be alright if
5305          * someone was already reading while we were updating the logs,
5306          * so we don't really need to hold the lock while we're
5307          * writing (above).
5308          */
5309         mti->mti_flags = dev_type | LDD_F_PARAM2;
5310         mutex_lock(&fsdb->fsdb_mutex);
5311         rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
5312         mutex_unlock(&fsdb->fsdb_mutex);
5313         mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
5314         mgs_put_fsdb(mgs, fsdb);
5315 out:
5316         RETURN(rc);
5317 }
5318
5319 /* Set a permanent (config log) param for a target or fs
5320  *
5321  * @lcfg buf0 may contain the device (testfs-MDT0000) name
5322  *       buf1 contains the single parameter
5323  */
5324 int mgs_set_param(const struct lu_env *env, struct mgs_device *mgs,
5325                   struct lustre_cfg *lcfg)
5326 {
5327         const char *param = lustre_cfg_string(lcfg, 1);
5328         struct mgs_target_info *mti;
5329         int rc;
5330
5331         /* Create a fake mti to hold everything */
5332         OBD_ALLOC_PTR(mti);
5333         if (!mti)
5334                 return -ENOMEM;
5335
5336         print_lustre_cfg(lcfg);
5337
5338         if (lcfg->lcfg_command == LCFG_PARAM) {
5339                 /* For the case of lctl conf_param devname can be
5340                  * lustre, lustre-mdtlov, lustre-client, lustre-MDT0000
5341                  */
5342                 const char *devname = lustre_cfg_string(lcfg, 0);
5343
5344                 rc = mgs_set_conf_param(env, mgs, mti, devname, param);
5345         } else {
5346                 /* In the case of lctl set_param -P lcfg[0] will always
5347                  * be 'general'. At least for now.
5348                  */
5349                 rc = mgs_set_param2(env, mgs, mti, param);
5350         }
5351
5352         OBD_FREE_PTR(mti);
5353
5354         return rc;
5355 }
5356
5357 static int mgs_write_log_pool(const struct lu_env *env,
5358                               struct mgs_device *mgs, char *logname,
5359                               struct fs_db *fsdb, char *tgtname,
5360                               enum lcfg_command_type cmd,
5361                               char *fsname, char *poolname,
5362                               char *ostname, char *comment)
5363 {
5364         struct llog_handle *llh = NULL;
5365         int rc;
5366
5367         rc = record_start_log(env, mgs, &llh, logname);
5368         if (rc)
5369                 return rc;
5370         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
5371         if (rc)
5372                 goto out;
5373         rc = record_base(env, llh, tgtname, 0, cmd,
5374                          fsname, poolname, ostname, NULL);
5375         if (rc)
5376                 goto out;
5377         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
5378 out:
5379         record_end_log(env, &llh);
5380         return rc;
5381 }
5382
5383 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
5384                     enum lcfg_command_type cmd, const char *nodemap_name,
5385                     char *param)
5386 {
5387         lnet_nid_t      nid[2];
5388         __u32           idmap[2];
5389         bool            bool_switch;
5390         __u32           int_id;
5391         int             rc = 0;
5392         ENTRY;
5393
5394         switch (cmd) {
5395         case LCFG_NODEMAP_ADD:
5396                 rc = nodemap_add(nodemap_name);
5397                 break;
5398         case LCFG_NODEMAP_DEL:
5399                 rc = nodemap_del(nodemap_name);
5400                 break;
5401         case LCFG_NODEMAP_ADD_RANGE:
5402                 rc = nodemap_parse_range(param, nid);
5403                 if (rc != 0)
5404                         break;
5405                 rc = nodemap_add_range(nodemap_name, nid);
5406                 break;
5407         case LCFG_NODEMAP_DEL_RANGE:
5408                 rc = nodemap_parse_range(param, nid);
5409                 if (rc != 0)
5410                         break;
5411                 rc = nodemap_del_range(nodemap_name, nid);
5412                 break;
5413         case LCFG_NODEMAP_ADMIN:
5414                 bool_switch = simple_strtoul(param, NULL, 10);
5415                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
5416                 break;
5417         case LCFG_NODEMAP_DENY_UNKNOWN:
5418                 bool_switch = simple_strtoul(param, NULL, 10);
5419                 rc = nodemap_set_deny_unknown(nodemap_name, bool_switch);
5420                 break;
5421         case LCFG_NODEMAP_AUDIT_MODE:
5422                 rc = kstrtoul(param, 10, (unsigned long *)&bool_switch);
5423                 if (rc == 0)
5424                         rc = nodemap_set_audit_mode(nodemap_name, bool_switch);
5425                 break;
5426         case LCFG_NODEMAP_MAP_MODE:
5427                 if (strcmp("both", param) == 0)
5428                         rc = nodemap_set_mapping_mode(nodemap_name,
5429                                                       NODEMAP_MAP_BOTH);
5430                 else if (strcmp("uid_only", param) == 0)
5431                         rc = nodemap_set_mapping_mode(nodemap_name,
5432                                                       NODEMAP_MAP_UID_ONLY);
5433                 else if (strcmp("gid_only", param) == 0)
5434                         rc = nodemap_set_mapping_mode(nodemap_name,
5435                                                       NODEMAP_MAP_GID_ONLY);
5436                 else
5437                         rc = -EINVAL;
5438                 break;
5439         case LCFG_NODEMAP_TRUSTED:
5440                 bool_switch = simple_strtoul(param, NULL, 10);
5441                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
5442                 break;
5443         case LCFG_NODEMAP_SQUASH_UID:
5444                 int_id = simple_strtoul(param, NULL, 10);
5445                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
5446                 break;
5447         case LCFG_NODEMAP_SQUASH_GID:
5448                 int_id = simple_strtoul(param, NULL, 10);
5449                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
5450                 break;
5451         case LCFG_NODEMAP_ADD_UIDMAP:
5452         case LCFG_NODEMAP_ADD_GIDMAP:
5453                 rc = nodemap_parse_idmap(param, idmap);
5454                 if (rc != 0)
5455                         break;
5456                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
5457                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
5458                                                idmap);
5459                 else
5460                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
5461                                                idmap);
5462                 break;
5463         case LCFG_NODEMAP_DEL_UIDMAP:
5464         case LCFG_NODEMAP_DEL_GIDMAP:
5465                 rc = nodemap_parse_idmap(param, idmap);
5466                 if (rc != 0)
5467                         break;
5468                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
5469                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
5470                                                idmap);
5471                 else
5472                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
5473                                                idmap);
5474                 break;
5475         case LCFG_NODEMAP_SET_FILESET:
5476                 rc = nodemap_set_fileset(nodemap_name, param);
5477                 break;
5478         case LCFG_NODEMAP_SET_SEPOL:
5479                 rc = nodemap_set_sepol(nodemap_name, param);
5480                 break;
5481         default:
5482                 rc = -EINVAL;
5483         }
5484
5485         RETURN(rc);
5486 }
5487
5488 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
5489                  enum lcfg_command_type cmd, char *fsname,
5490                  char *poolname, char *ostname)
5491 {
5492         struct fs_db *fsdb;
5493         char *lovname;
5494         char *logname;
5495         char *label = NULL, *canceled_label = NULL;
5496         int label_sz;
5497         struct mgs_target_info *mti = NULL;
5498         bool checked = false;
5499         bool locked = false;
5500         bool free = false;
5501         int rc, i;
5502         ENTRY;
5503
5504         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
5505         if (rc) {
5506                 CERROR("Can't get db for %s\n", fsname);
5507                 RETURN(rc);
5508         }
5509         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
5510                 CERROR("%s is not defined\n", fsname);
5511                 free = true;
5512                 GOTO(out_fsdb, rc = -EINVAL);
5513         }
5514
5515         label_sz = 10 + strlen(fsname) + strlen(poolname);
5516
5517         /* check if ostname match fsname */
5518         if (ostname != NULL) {
5519                 char *ptr;
5520
5521                 ptr = strrchr(ostname, '-');
5522                 if ((ptr == NULL) ||
5523                     (strncmp(fsname, ostname, ptr-ostname) != 0))
5524                         RETURN(-EINVAL);
5525                 label_sz += strlen(ostname);
5526         }
5527
5528         OBD_ALLOC(label, label_sz);
5529         if (!label)
5530                 GOTO(out_fsdb, rc = -ENOMEM);
5531
5532         switch(cmd) {
5533         case LCFG_POOL_NEW:
5534                 sprintf(label,
5535                         "new %s.%s", fsname, poolname);
5536                 break;
5537         case LCFG_POOL_ADD:
5538                 sprintf(label,
5539                         "add %s.%s.%s", fsname, poolname, ostname);
5540                 break;
5541         case LCFG_POOL_REM:
5542                 OBD_ALLOC(canceled_label, label_sz);
5543                 if (canceled_label == NULL)
5544                         GOTO(out_label, rc = -ENOMEM);
5545                 sprintf(label,
5546                         "rem %s.%s.%s", fsname, poolname, ostname);
5547                 sprintf(canceled_label,
5548                         "add %s.%s.%s", fsname, poolname, ostname);
5549                 break;
5550         case LCFG_POOL_DEL:
5551                 OBD_ALLOC(canceled_label, label_sz);
5552                 if (canceled_label == NULL)
5553                         GOTO(out_label, rc = -ENOMEM);
5554                 sprintf(label,
5555                         "del %s.%s", fsname, poolname);
5556                 sprintf(canceled_label,
5557                         "new %s.%s", fsname, poolname);
5558                 break;
5559         default:
5560                 break;
5561         }
5562
5563         OBD_ALLOC_PTR(mti);
5564         if (mti == NULL)
5565                 GOTO(out_cancel, rc = -ENOMEM);
5566         strncpy(mti->mti_svname, "lov pool", sizeof(mti->mti_svname));
5567
5568         mutex_lock(&fsdb->fsdb_mutex);
5569         locked = true;
5570         /* write pool def to all MDT logs */
5571         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
5572                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
5573                         rc = name_create_mdt_and_lov(&logname, &lovname,
5574                                                      fsdb, i);
5575                         if (rc)
5576                                 GOTO(out_mti, rc);
5577
5578                         if (!checked && (canceled_label == NULL)) {
5579                                 rc = mgs_check_marker(env, mgs, fsdb, mti,
5580                                                 logname, lovname, label);
5581                                 if (rc) {
5582                                         name_destroy(&logname);
5583                                         name_destroy(&lovname);
5584                                         GOTO(out_mti,
5585                                                 rc = (rc == LLOG_PROC_BREAK ?
5586                                                         -EEXIST : rc));
5587                                 }
5588                                 checked = true;
5589                         }
5590                         if (canceled_label != NULL)
5591                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
5592                                                 lovname, canceled_label,
5593                                                 CM_SKIP);
5594
5595                         if (rc >= 0)
5596                                 rc = mgs_write_log_pool(env, mgs, logname,
5597                                                         fsdb, lovname, cmd,
5598                                                         fsname, poolname,
5599                                                         ostname, label);
5600                         name_destroy(&logname);
5601                         name_destroy(&lovname);
5602                         if (rc)
5603                                 GOTO(out_mti, rc);
5604                 }
5605         }
5606
5607         rc = name_create(&logname, fsname, "-client");
5608         if (rc)
5609                 GOTO(out_mti, rc);
5610
5611         if (!checked && (canceled_label == NULL)) {
5612                 rc = mgs_check_marker(env, mgs, fsdb, mti, logname,
5613                                 fsdb->fsdb_clilov, label);
5614                 if (rc) {
5615                         name_destroy(&logname);
5616                         GOTO(out_mti, rc = (rc == LLOG_PROC_BREAK ?
5617                                 -EEXIST : rc));
5618                 }
5619         }
5620         if (canceled_label != NULL) {
5621                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
5622                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
5623                 if (rc < 0) {
5624                         name_destroy(&logname);
5625                         GOTO(out_mti, rc);
5626                 }
5627         }
5628
5629         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
5630                                 cmd, fsname, poolname, ostname, label);
5631         mutex_unlock(&fsdb->fsdb_mutex);
5632         locked = false;
5633         name_destroy(&logname);
5634         /* request for update */
5635         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
5636
5637         GOTO(out_mti, rc);
5638
5639 out_mti:
5640         if (locked)
5641                 mutex_unlock(&fsdb->fsdb_mutex);
5642         if (mti != NULL)
5643                 OBD_FREE_PTR(mti);
5644 out_cancel:
5645         if (canceled_label != NULL)
5646                 OBD_FREE(canceled_label, label_sz);
5647 out_label:
5648         OBD_FREE(label, label_sz);
5649 out_fsdb:
5650         if (free)
5651                 mgs_unlink_fsdb(mgs, fsdb);
5652         mgs_put_fsdb(mgs, fsdb);
5653
5654         return rc;
5655 }