Whamcloud - gitweb
0908249fb715c4d0f572d2db6ebb1b1bcbbd4fdb
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mgs/mgs_llog.c
33  *
34  * Lustre Management Server (mgs) config llog creation
35  *
36  * Author: Nathan Rutman <nathan@clusterfs.com>
37  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
38  * Author: Mikhail Pershin <tappro@whamcloud.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_MGS
42 #define D_MGS D_CONFIG
43
44 #include <obd.h>
45 #include <uapi/linux/lustre/lustre_ioctl.h>
46 #include <uapi/linux/lustre/lustre_param.h>
47 #include <lustre_sec.h>
48 #include <lustre_quota.h>
49 #include <lustre_sec.h>
50
51 #include "mgs_internal.h"
52
53 /********************** Class functions ********************/
54
55 /**
56  * Find all logs in CONFIG directory and link then into list.
57  *
58  * \param[in] env       pointer to the thread context
59  * \param[in] mgs       pointer to the mgs device
60  * \param[out] log_list the list to hold the found llog name entry
61  *
62  * \retval              0 for success
63  * \retval              negative error number on failure
64  **/
65 int class_dentry_readdir(const struct lu_env *env, struct mgs_device *mgs,
66                          struct list_head *log_list)
67 {
68         struct dt_object *dir = mgs->mgs_configs_dir;
69         const struct dt_it_ops *iops;
70         struct dt_it *it;
71         struct mgs_direntry *de;
72         char *key;
73         int rc, key_sz;
74         size_t suffix_len = sizeof(".bak") - 1;
75
76         INIT_LIST_HEAD(log_list);
77
78         LASSERT(dir);
79         LASSERT(dir->do_index_ops);
80
81         iops = &dir->do_index_ops->dio_it;
82         it = iops->init(env, dir, LUDA_64BITHASH);
83         if (IS_ERR(it))
84                 RETURN(PTR_ERR(it));
85
86         rc = iops->load(env, it, 0);
87         if (rc <= 0)
88                 GOTO(fini, rc = 0);
89
90         /* main cycle */
91         do {
92                 key = (void *)iops->key(env, it);
93                 if (IS_ERR(key)) {
94                         CERROR("%s: key failed when listing %s: rc = %d\n",
95                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
96                                (int) PTR_ERR(key));
97                         goto next;
98                 }
99                 key_sz = iops->key_size(env, it);
100                 LASSERT(key_sz > 0);
101
102                 /* filter out "." and ".." entries */
103                 if (key[0] == '.') {
104                         if (key_sz == 1)
105                                 goto next;
106                         if (key_sz == 2 && key[1] == '.')
107                                 goto next;
108                 }
109
110                 /* filter out ".bak" files */
111                 if (key_sz >= suffix_len &&
112                     !memcmp(".bak", key + key_sz - suffix_len, suffix_len)) {
113                         CDEBUG(D_MGS, "Skipping backup file %.*s\n",
114                                key_sz, key);
115                         goto next;
116                 }
117
118                 de = mgs_direntry_alloc(key_sz + 1);
119                 if (de == NULL) {
120                         rc = -ENOMEM;
121                         break;
122                 }
123
124                 memcpy(de->mde_name, key, key_sz);
125                 de->mde_name[key_sz] = 0;
126
127                 list_add(&de->mde_list, log_list);
128
129 next:
130                 rc = iops->next(env, it);
131         } while (rc == 0);
132         if (rc > 0)
133                 rc = 0;
134
135         iops->put(env, it);
136
137 fini:
138         iops->fini(env, it);
139         if (rc) {
140                 struct mgs_direntry *n;
141
142                 CERROR("%s: key failed when listing %s: rc = %d\n",
143                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
144
145                 list_for_each_entry_safe(de, n, log_list, mde_list) {
146                         list_del_init(&de->mde_list);
147                         mgs_direntry_free(de);
148                 }
149         }
150
151         RETURN(rc);
152 }
153
154 /******************** DB functions *********************/
155
156 static inline int name_create(char **newname, char *prefix, char *suffix)
157 {
158         LASSERT(newname);
159         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
160         if (!*newname)
161                 return -ENOMEM;
162         sprintf(*newname, "%s%s", prefix, suffix);
163         return 0;
164 }
165
166 static inline void name_destroy(char **name)
167 {
168         if (*name)
169                 OBD_FREE(*name, strlen(*name) + 1);
170         *name = NULL;
171 }
172
173 struct mgs_fsdb_handler_data
174 {
175         struct fs_db   *fsdb;
176         __u32           ver;
177 };
178
179 /* from the (client) config log, figure out:
180         1. which ost's/mdt's are configured (by index)
181         2. what the last config step is
182         3. COMPAT_18 osc name
183 */
184 /* It might be better to have a separate db file, instead of parsing the info
185    out of the client log.  This is slow and potentially error-prone. */
186 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
187                             struct llog_rec_hdr *rec, void *data)
188 {
189         struct mgs_fsdb_handler_data *d = data;
190         struct fs_db *fsdb = d->fsdb;
191         int cfg_len = rec->lrh_len;
192         char *cfg_buf = (char*) (rec + 1);
193         struct lustre_cfg *lcfg;
194         u32 index;
195         int rc = 0;
196         ENTRY;
197
198         if (rec->lrh_type != OBD_CFG_REC) {
199                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
200                 RETURN(-EINVAL);
201         }
202
203         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
204         if (rc) {
205                 CERROR("Insane cfg\n");
206                 RETURN(rc);
207         }
208
209         lcfg = (struct lustre_cfg *)cfg_buf;
210
211         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
212                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
213
214         /* Figure out ost indicies */
215         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
216         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
217             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
218                 rc = kstrtouint(lustre_cfg_string(lcfg, 2), 10, &index);
219                 if (rc)
220                         RETURN(rc);
221
222                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
223                        lustre_cfg_string(lcfg, 1), index,
224                        lustre_cfg_string(lcfg, 2));
225                 set_bit(index, fsdb->fsdb_ost_index_map);
226         }
227
228         /* Figure out mdt indicies */
229         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
230         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
231             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
232                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
233                                        &index, NULL);
234                 if (rc != LDD_F_SV_TYPE_MDT) {
235                         CWARN("Unparsable MDC name %s, assuming index 0\n",
236                               lustre_cfg_string(lcfg, 0));
237                         index = 0;
238                 }
239                 rc = 0;
240                 CDEBUG(D_MGS, "MDT index is %u\n", index);
241                 if (!test_bit(index, fsdb->fsdb_mdt_index_map)) {
242                         set_bit(index, fsdb->fsdb_mdt_index_map);
243                         fsdb->fsdb_mdt_count++;
244                 }
245         }
246
247         /**
248          * figure out the old config. fsdb_gen = 0 means old log
249          * It is obsoleted and not supported anymore
250          */
251         if (fsdb->fsdb_gen == 0) {
252                 CERROR("Old config format is not supported\n");
253                 RETURN(-EINVAL);
254         }
255
256         /*
257          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
258          */
259         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
260             lcfg->lcfg_command == LCFG_ATTACH &&
261             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
262                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
263                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
264                         CWARN("MDT using 1.8 OSC name scheme\n");
265                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
266                 }
267         }
268
269         if (lcfg->lcfg_command == LCFG_MARKER) {
270                 struct cfg_marker *marker;
271                 marker = lustre_cfg_buf(lcfg, 1);
272
273                 d->ver = marker->cm_vers;
274
275                 /* Keep track of the latest marker step */
276                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
277         }
278
279         RETURN(rc);
280 }
281
282 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
283 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
284                                   struct mgs_device *mgs,
285                                   struct fs_db *fsdb)
286 {
287         char *logname;
288         struct llog_handle *loghandle;
289         struct llog_ctxt *ctxt;
290         struct mgs_fsdb_handler_data d = {
291                 .fsdb = fsdb,
292         };
293         int rc;
294
295         ENTRY;
296
297         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
298         LASSERT(ctxt != NULL);
299         rc = name_create(&logname, fsdb->fsdb_name, "-client");
300         if (rc)
301                 GOTO(out_put, rc);
302         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
303         if (rc)
304                 GOTO(out_pop, rc);
305
306         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
307         if (rc)
308                 GOTO(out_close, rc);
309
310         if (llog_get_size(loghandle) <= 1)
311                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
312
313         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
314         CDEBUG(D_INFO, "get_db = %d\n", rc);
315 out_close:
316         llog_close(env, loghandle);
317 out_pop:
318         name_destroy(&logname);
319 out_put:
320         llog_ctxt_put(ctxt);
321
322         RETURN(rc);
323 }
324
325 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
326 {
327         struct mgs_tgt_srpc_conf *tgtconf;
328
329         /* free target-specific rules */
330         while (fsdb->fsdb_srpc_tgt) {
331                 tgtconf = fsdb->fsdb_srpc_tgt;
332                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
333
334                 LASSERT(tgtconf->mtsc_tgt);
335
336                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
337                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
338                 OBD_FREE_PTR(tgtconf);
339         }
340
341         /* free general rules */
342         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
343 }
344
345 static void mgs_unlink_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
346 {
347         mutex_lock(&mgs->mgs_mutex);
348         if (likely(!list_empty(&fsdb->fsdb_list))) {
349                 LASSERTF(atomic_read(&fsdb->fsdb_ref) >= 2,
350                          "Invalid ref %d on %s\n",
351                          atomic_read(&fsdb->fsdb_ref),
352                          fsdb->fsdb_name);
353
354                 list_del_init(&fsdb->fsdb_list);
355                 /* Drop the reference on the list.*/
356                 mgs_put_fsdb(mgs, fsdb);
357         }
358         mutex_unlock(&mgs->mgs_mutex);
359 }
360
361 /* The caller must hold mgs->mgs_mutex. */
362 static inline struct fs_db *
363 mgs_find_fsdb_noref(struct mgs_device *mgs, const char *fsname)
364 {
365         struct fs_db *fsdb;
366         struct list_head *tmp;
367
368         list_for_each(tmp, &mgs->mgs_fs_db_list) {
369                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
370                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
371                         return fsdb;
372         }
373
374         return NULL;
375 }
376
377 /* The caller must hold mgs->mgs_mutex. */
378 static void mgs_remove_fsdb_by_name(struct mgs_device *mgs, const char *name)
379 {
380         struct fs_db *fsdb;
381
382         fsdb = mgs_find_fsdb_noref(mgs, name);
383         if (fsdb) {
384                 list_del_init(&fsdb->fsdb_list);
385                 /* Drop the reference on the list.*/
386                 mgs_put_fsdb(mgs, fsdb);
387         }
388 }
389
390 /* The caller must hold mgs->mgs_mutex. */
391 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, const char *fsname)
392 {
393         struct fs_db *fsdb;
394
395         fsdb = mgs_find_fsdb_noref(mgs, fsname);
396         if (fsdb)
397                 atomic_inc(&fsdb->fsdb_ref);
398
399         return fsdb;
400 }
401
402 /* The caller must hold mgs->mgs_mutex. */
403 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
404                                   struct mgs_device *mgs, char *fsname)
405 {
406         struct fs_db *fsdb;
407         int rc;
408         ENTRY;
409
410         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
411                 CERROR("fsname %s is too long\n", fsname);
412
413                 RETURN(ERR_PTR(-EINVAL));
414         }
415
416         OBD_ALLOC_PTR(fsdb);
417         if (!fsdb)
418                 RETURN(ERR_PTR(-ENOMEM));
419
420         strncpy(fsdb->fsdb_name, fsname, sizeof(fsdb->fsdb_name));
421         mutex_init(&fsdb->fsdb_mutex);
422         INIT_LIST_HEAD(&fsdb->fsdb_list);
423         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
424         fsdb->fsdb_gen = 1;
425         INIT_LIST_HEAD(&fsdb->fsdb_clients);
426         atomic_set(&fsdb->fsdb_notify_phase, 0);
427         init_waitqueue_head(&fsdb->fsdb_notify_waitq);
428         init_completion(&fsdb->fsdb_notify_comp);
429
430         if (strcmp(fsname, MGSSELF_NAME) == 0) {
431                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
432                 fsdb->fsdb_mgs = mgs;
433                 if (logname_is_barrier(fsname))
434                         goto add;
435         } else {
436                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
437                 if (!fsdb->fsdb_mdt_index_map) {
438                         CERROR("No memory for MDT index maps\n");
439
440                         GOTO(err, rc = -ENOMEM);
441                 }
442
443                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
444                 if (!fsdb->fsdb_ost_index_map) {
445                         CERROR("No memory for OST index maps\n");
446
447                         GOTO(err, rc = -ENOMEM);
448                 }
449
450                 if (logname_is_barrier(fsname))
451                         goto add;
452
453                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
454                 if (rc)
455                         GOTO(err, rc);
456
457                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
458                 if (rc)
459                         GOTO(err, rc);
460
461                 /* initialise data for NID table */
462                 mgs_ir_init_fs(env, mgs, fsdb);
463                 lproc_mgs_add_live(mgs, fsdb);
464         }
465
466         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
467             strcmp(PARAMS_FILENAME, fsname) != 0) {
468                 /* populate the db from the client llog */
469                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
470                 if (rc) {
471                         CERROR("Can't get db from client log %d\n", rc);
472
473                         GOTO(err, rc);
474                 }
475         }
476
477         /* populate srpc rules from params llog */
478         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
479         if (rc) {
480                 CERROR("Can't get db from params log %d\n", rc);
481
482                 GOTO(err, rc);
483         }
484
485 add:
486         /* One ref is for the fsdb on the list.
487          * The other ref is for the caller. */
488         atomic_set(&fsdb->fsdb_ref, 2);
489         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
490
491         RETURN(fsdb);
492
493 err:
494         atomic_set(&fsdb->fsdb_ref, 1);
495         mgs_put_fsdb(mgs, fsdb);
496
497         RETURN(ERR_PTR(rc));
498 }
499
500 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
501 {
502         LASSERT(list_empty(&fsdb->fsdb_list));
503
504         lproc_mgs_del_live(mgs, fsdb);
505
506         /* deinitialize fsr */
507         if (fsdb->fsdb_mgs)
508                 mgs_ir_fini_fs(mgs, fsdb);
509
510         if (fsdb->fsdb_ost_index_map)
511                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
512         if (fsdb->fsdb_mdt_index_map)
513                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
514         name_destroy(&fsdb->fsdb_clilov);
515         name_destroy(&fsdb->fsdb_clilmv);
516         mgs_free_fsdb_srpc(fsdb);
517         OBD_FREE_PTR(fsdb);
518 }
519
520 void mgs_put_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
521 {
522         if (atomic_dec_and_test(&fsdb->fsdb_ref))
523                 mgs_free_fsdb(mgs, fsdb);
524 }
525
526 int mgs_init_fsdb_list(struct mgs_device *mgs)
527 {
528         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
529         return 0;
530 }
531
532 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
533 {
534         struct fs_db *fsdb;
535         struct list_head *tmp, *tmp2;
536
537         mutex_lock(&mgs->mgs_mutex);
538         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
539                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
540                 list_del_init(&fsdb->fsdb_list);
541                 mgs_put_fsdb(mgs, fsdb);
542         }
543         mutex_unlock(&mgs->mgs_mutex);
544         return 0;
545 }
546
547 /* The caller must hold mgs->mgs_mutex. */
548 int mgs_find_or_make_fsdb_nolock(const struct lu_env *env,
549                                 struct mgs_device *mgs,
550                                 char *name, struct fs_db **dbh)
551 {
552         struct fs_db *fsdb;
553         int rc = 0;
554         ENTRY;
555
556         fsdb = mgs_find_fsdb(mgs, name);
557         if (!fsdb) {
558                 fsdb = mgs_new_fsdb(env, mgs, name);
559                 if (IS_ERR(fsdb))
560                         rc = PTR_ERR(fsdb);
561
562                 CDEBUG(D_MGS, "Created new db: rc = %d\n", rc);
563         }
564
565         if (!rc)
566                 *dbh = fsdb;
567
568         RETURN(rc);
569 }
570
571 int mgs_find_or_make_fsdb(const struct lu_env *env, struct mgs_device *mgs,
572                           char *name, struct fs_db **dbh)
573 {
574         int rc;
575         ENTRY;
576
577         mutex_lock(&mgs->mgs_mutex);
578         rc = mgs_find_or_make_fsdb_nolock(env, mgs, name, dbh);
579         mutex_unlock(&mgs->mgs_mutex);
580
581         RETURN(rc);
582 }
583
584 /* 1 = index in use
585    0 = index unused
586    -1= empty client log */
587 int mgs_check_index(const struct lu_env *env,
588                     struct mgs_device *mgs,
589                     struct mgs_target_info *mti)
590 {
591         struct fs_db *fsdb;
592         void *imap;
593         int rc = 0;
594         ENTRY;
595
596         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
597
598         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
599         if (rc) {
600                 CERROR("Can't get db for %s\n", mti->mti_fsname);
601                 RETURN(rc);
602         }
603
604         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
605                 GOTO(out, rc = -1);
606
607         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
608                 imap = fsdb->fsdb_ost_index_map;
609         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
610                 imap = fsdb->fsdb_mdt_index_map;
611         else
612                 GOTO(out, rc = -EINVAL);
613
614         if (test_bit(mti->mti_stripe_index, imap))
615                 GOTO(out, rc = 1);
616
617         GOTO(out, rc = 0);
618
619 out:
620         mgs_put_fsdb(mgs, fsdb);
621         return rc;
622 }
623
624 static __inline__ int next_index(void *index_map, int map_len)
625 {
626         int i;
627         for (i = 0; i < map_len * 8; i++)
628                 if (!test_bit(i, index_map)) {
629                          return i;
630                  }
631         CERROR("max index %d exceeded.\n", i);
632         return -1;
633 }
634
635 /* Make the mdt/ost server obd name based on the filesystem name */
636 static bool server_make_name(u32 flags, u16 index, const char *fs,
637                              char *name_buf, size_t name_buf_size)
638 {
639         bool invalid_flag = false;
640
641         if (flags & (LDD_F_SV_TYPE_MDT | LDD_F_SV_TYPE_OST)) {
642                 if (!(flags & LDD_F_SV_ALL))
643                         snprintf(name_buf, name_buf_size, "%.8s%c%s%04x", fs,
644                                 (flags & LDD_F_VIRGIN) ? ':' :
645                                         ((flags & LDD_F_WRITECONF) ? '=' : '-'),
646                                 (flags & LDD_F_SV_TYPE_MDT) ? "MDT" : "OST",
647                                 index);
648         } else if (flags & LDD_F_SV_TYPE_MGS) {
649                 snprintf(name_buf, name_buf_size, "MGS");
650         } else {
651                 CERROR("unknown server type %#x\n", flags);
652                 invalid_flag = true;
653         }
654         return invalid_flag;
655 }
656
657 /* Return codes:
658         0  newly marked as in use
659         <0 err
660         +EALREADY for update of an old index */
661 static int mgs_set_index(const struct lu_env *env,
662                          struct mgs_device *mgs,
663                          struct mgs_target_info *mti)
664 {
665         struct fs_db *fsdb;
666         void *imap;
667         int rc = 0;
668         ENTRY;
669
670         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
671         if (rc) {
672                 CERROR("Can't get db for %s\n", mti->mti_fsname);
673                 RETURN(rc);
674         }
675
676         mutex_lock(&fsdb->fsdb_mutex);
677         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
678                 imap = fsdb->fsdb_ost_index_map;
679         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
680                 imap = fsdb->fsdb_mdt_index_map;
681         } else {
682                 GOTO(out_up, rc = -EINVAL);
683         }
684
685         if (mti->mti_flags & LDD_F_NEED_INDEX) {
686                 rc = next_index(imap, INDEX_MAP_SIZE);
687                 if (rc == -1)
688                         GOTO(out_up, rc = -ERANGE);
689                 mti->mti_stripe_index = rc;
690         }
691
692         /* the last index(0xffff) is reserved for default value. */
693         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
694                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
695                                    "but index must be less than %u.\n",
696                                    mti->mti_svname, mti->mti_stripe_index,
697                                    INDEX_MAP_SIZE * 8 - 1);
698                 GOTO(out_up, rc = -ERANGE);
699         }
700
701         if (test_bit(mti->mti_stripe_index, imap)) {
702                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
703                     !(mti->mti_flags & LDD_F_WRITECONF)) {
704                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
705                                            "%d, but that index is already in "
706                                            "use. Use --writeconf to force\n",
707                                            mti->mti_svname,
708                                            mti->mti_stripe_index);
709                         GOTO(out_up, rc = -EADDRINUSE);
710                 } else {
711                         CDEBUG(D_MGS, "Server %s updating index %d\n",
712                                mti->mti_svname, mti->mti_stripe_index);
713                         GOTO(out_up, rc = EALREADY);
714                 }
715         } else {
716                 set_bit(mti->mti_stripe_index, imap);
717                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
718                         fsdb->fsdb_mdt_count++;
719         }
720
721         set_bit(mti->mti_stripe_index, imap);
722         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
723         if (server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
724                              mti->mti_stripe_index, mti->mti_fsname,
725                              mti->mti_svname, sizeof(mti->mti_svname))) {
726                 CERROR("unknown server type %#x\n", mti->mti_flags);
727                 GOTO(out_up, rc = -EINVAL);
728         }
729
730         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
731                mti->mti_stripe_index);
732
733         GOTO(out_up, rc = 0);
734
735 out_up:
736         mutex_unlock(&fsdb->fsdb_mutex);
737         mgs_put_fsdb(mgs, fsdb);
738         return rc;
739 }
740
741 struct mgs_modify_lookup {
742         struct cfg_marker mml_marker;
743         int               mml_modified;
744 };
745
746 static int mgs_check_record_match(const struct lu_env *env,
747                                 struct llog_handle *llh,
748                                 struct llog_rec_hdr *rec, void *data)
749 {
750         struct cfg_marker *mc_marker = data;
751         struct cfg_marker *marker;
752         struct lustre_cfg *lcfg = REC_DATA(rec);
753         int cfg_len = REC_DATA_LEN(rec);
754         int rc;
755         ENTRY;
756
757
758         if (rec->lrh_type != OBD_CFG_REC) {
759                 CDEBUG(D_ERROR, "Unhandled lrh_type: %#x\n", rec->lrh_type);
760                 RETURN(-EINVAL);
761         }
762
763         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
764         if (rc) {
765                 CDEBUG(D_ERROR, "Insane cfg\n");
766                 RETURN(rc);
767         }
768
769         /* We only care about markers */
770         if (lcfg->lcfg_command != LCFG_MARKER)
771                 RETURN(0);
772
773         marker = lustre_cfg_buf(lcfg, 1);
774
775         if (marker->cm_flags & CM_SKIP)
776                 RETURN(0);
777
778         if ((strcmp(mc_marker->cm_comment, marker->cm_comment) == 0) &&
779                 (strcmp(mc_marker->cm_tgtname, marker->cm_tgtname) == 0)) {
780                 /* Found a non-skipped marker match */
781                 CDEBUG(D_MGS, "Matched rec %u marker %d flag %x %s %s\n",
782                         rec->lrh_index, marker->cm_step,
783                         marker->cm_flags, marker->cm_tgtname,
784                         marker->cm_comment);
785                 rc = LLOG_PROC_BREAK;
786         }
787
788         RETURN(rc);
789 }
790
791 /**
792  * Check an existing config log record with matching comment and device
793  * Return code:
794  * 0 - checked successfully,
795  * LLOG_PROC_BREAK - record matches
796  * negative - error
797  */
798 static int mgs_check_marker(const struct lu_env *env, struct mgs_device *mgs,
799                 struct fs_db *fsdb, struct mgs_target_info *mti,
800                 char *logname, char *devname, char *comment)
801 {
802         struct llog_handle *loghandle;
803         struct llog_ctxt *ctxt;
804         struct cfg_marker *mc_marker;
805         int rc;
806
807         ENTRY;
808
809         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
810         CDEBUG(D_MGS, "mgs check %s/%s/%s\n", logname, devname, comment);
811
812         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
813         LASSERT(ctxt != NULL);
814         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
815         if (rc < 0) {
816                 if (rc == -ENOENT)
817                         rc = 0;
818                 GOTO(out_pop, rc);
819         }
820
821         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
822         if (rc)
823                 GOTO(out_close, rc);
824
825         if (llog_get_size(loghandle) <= 1)
826                 GOTO(out_close, rc = 0);
827
828         OBD_ALLOC_PTR(mc_marker);
829         if (!mc_marker)
830                 GOTO(out_close, rc = -ENOMEM);
831         if (strlcpy(mc_marker->cm_comment, comment,
832                 sizeof(mc_marker->cm_comment)) >=
833                 sizeof(mc_marker->cm_comment))
834                 GOTO(out_free, rc = -E2BIG);
835         if (strlcpy(mc_marker->cm_tgtname, devname,
836                 sizeof(mc_marker->cm_tgtname)) >=
837                 sizeof(mc_marker->cm_tgtname))
838                 GOTO(out_free, rc = -E2BIG);
839
840         rc = llog_process(env, loghandle, mgs_check_record_match,
841                         (void *)mc_marker, NULL);
842
843 out_free:
844         OBD_FREE_PTR(mc_marker);
845
846 out_close:
847         llog_close(env, loghandle);
848 out_pop:
849         if (rc && rc != LLOG_PROC_BREAK)
850                 CDEBUG(D_ERROR, "%s: mgs check %s/%s failed: rc = %d\n",
851                         mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
852         llog_ctxt_put(ctxt);
853         RETURN(rc);
854 }
855
856 static int mgs_modify_handler(const struct lu_env *env,
857                               struct llog_handle *llh,
858                               struct llog_rec_hdr *rec, void *data)
859 {
860         struct mgs_modify_lookup *mml = data;
861         struct cfg_marker *marker;
862         struct lustre_cfg *lcfg = REC_DATA(rec);
863         int cfg_len = REC_DATA_LEN(rec);
864         int rc;
865         ENTRY;
866
867         if (rec->lrh_type != OBD_CFG_REC) {
868                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
869                 RETURN(-EINVAL);
870         }
871
872         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
873         if (rc) {
874                 CERROR("Insane cfg\n");
875                 RETURN(rc);
876         }
877
878         /* We only care about markers */
879         if (lcfg->lcfg_command != LCFG_MARKER)
880                 RETURN(0);
881
882         marker = lustre_cfg_buf(lcfg, 1);
883         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
884             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
885             !(marker->cm_flags & CM_SKIP)) {
886                 /* Found a non-skipped marker match */
887                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
888                        rec->lrh_index, marker->cm_step,
889                        marker->cm_flags, mml->mml_marker.cm_flags,
890                        marker->cm_tgtname, marker->cm_comment);
891                 /* Overwrite the old marker llog entry */
892                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
893                 marker->cm_flags |= mml->mml_marker.cm_flags;
894                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
895                 rc = llog_write(env, llh, rec, rec->lrh_index);
896                 if (!rc)
897                          mml->mml_modified++;
898         }
899
900         RETURN(rc);
901 }
902
903 /**
904  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
905  * Return code:
906  * 0 - modified successfully,
907  * 1 - no modification was done
908  * negative - error
909  */
910 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
911                       struct fs_db *fsdb, struct mgs_target_info *mti,
912                       char *logname, char *devname, char *comment, int flags)
913 {
914         struct llog_handle *loghandle;
915         struct llog_ctxt *ctxt;
916         struct mgs_modify_lookup *mml;
917         int rc;
918
919         ENTRY;
920
921         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
922         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
923                flags);
924
925         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
926         LASSERT(ctxt != NULL);
927         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
928         if (rc < 0) {
929                 if (rc == -ENOENT)
930                         rc = 0;
931                 GOTO(out_pop, rc);
932         }
933
934         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
935         if (rc)
936                 GOTO(out_close, rc);
937
938         if (llog_get_size(loghandle) <= 1)
939                 GOTO(out_close, rc = 0);
940
941         OBD_ALLOC_PTR(mml);
942         if (!mml)
943                 GOTO(out_close, rc = -ENOMEM);
944         if (strlcpy(mml->mml_marker.cm_comment, comment,
945                     sizeof(mml->mml_marker.cm_comment)) >=
946             sizeof(mml->mml_marker.cm_comment))
947                 GOTO(out_free, rc = -E2BIG);
948         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
949                     sizeof(mml->mml_marker.cm_tgtname)) >=
950             sizeof(mml->mml_marker.cm_tgtname))
951                 GOTO(out_free, rc = -E2BIG);
952         /* Modify mostly means cancel */
953         mml->mml_marker.cm_flags = flags;
954         mml->mml_marker.cm_canceltime = flags ? ktime_get_real_seconds() : 0;
955         mml->mml_modified = 0;
956         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
957                           NULL);
958         if (!rc && !mml->mml_modified)
959                 rc = 1;
960
961 out_free:
962         OBD_FREE_PTR(mml);
963
964 out_close:
965         llog_close(env, loghandle);
966 out_pop:
967         if (rc < 0)
968                 CERROR("%s: modify %s/%s failed: rc = %d\n",
969                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
970         llog_ctxt_put(ctxt);
971         RETURN(rc);
972 }
973
974 enum replace_state {
975         REPLACE_COPY = 0,
976         REPLACE_SKIP,
977         REPLACE_DONE,
978         REPLACE_UUID,
979         REPLACE_SETUP
980 };
981
982 /** This structure is passed to mgs_replace_handler */
983 struct mgs_replace_data {
984         /* Nids are replaced for this target device */
985         struct mgs_target_info target;
986         /* Temporary modified llog */
987         struct llog_handle *temp_llh;
988         enum replace_state state;
989         char *failover;
990         char *nodeuuid;
991 };
992
993 /**
994  * Check: a) if block should be skipped
995  * b) is it target block
996  *
997  * \param[in] lcfg
998  * \param[in] mrd
999  *
1000  * \retval 0 should not to be skipped
1001  * \retval 1 should to be skipped
1002  */
1003 static int check_markers(struct lustre_cfg *lcfg,
1004                          struct mgs_replace_data *mrd)
1005 {
1006          struct cfg_marker *marker;
1007
1008         /* Track markers. Find given device */
1009         if (lcfg->lcfg_command == LCFG_MARKER) {
1010                 marker = lustre_cfg_buf(lcfg, 1);
1011                 /* Clean llog from records marked as CM_SKIP.
1012                    CM_EXCLUDE records are used for "active" command
1013                    and can be restored if needed */
1014                 if ((marker->cm_flags & (CM_SKIP | CM_START)) ==
1015                     (CM_SKIP | CM_START)) {
1016                         mrd->state = REPLACE_SKIP;
1017                         return 1;
1018                 }
1019
1020                 if ((marker->cm_flags & (CM_SKIP | CM_END)) ==
1021                     (CM_SKIP | CM_END)) {
1022                         mrd->state = REPLACE_COPY;
1023                         return 1;
1024                 }
1025
1026                 if (strcmp(mrd->target.mti_svname, marker->cm_tgtname) == 0) {
1027                         LASSERT(!(marker->cm_flags & CM_START) ||
1028                                 !(marker->cm_flags & CM_END));
1029                         if (marker->cm_flags & CM_START) {
1030                                 mrd->state = REPLACE_UUID;
1031                                 mrd->failover = NULL;
1032                         } else if (marker->cm_flags & CM_END)
1033                                 mrd->state = REPLACE_COPY;
1034                 }
1035         }
1036
1037         return 0;
1038 }
1039
1040 static int record_base(const struct lu_env *env, struct llog_handle *llh,
1041                      char *cfgname, lnet_nid_t nid, int cmd,
1042                      char *s1, char *s2, char *s3, char *s4)
1043 {
1044         struct mgs_thread_info  *mgi = mgs_env_info(env);
1045         struct llog_cfg_rec     *lcr;
1046         int rc;
1047
1048         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
1049                cmd, s1, s2, s3, s4);
1050
1051         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
1052         if (s1)
1053                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
1054         if (s2)
1055                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
1056         if (s3)
1057                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
1058         if (s4)
1059                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
1060
1061         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
1062         if (lcr == NULL)
1063                 return -ENOMEM;
1064
1065         lcr->lcr_cfg.lcfg_nid = nid;
1066         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1067
1068         lustre_cfg_rec_free(lcr);
1069
1070         if (rc < 0)
1071                 CDEBUG(D_MGS,
1072                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
1073                        cfgname, cmd, s1, s2, s3, s4, rc);
1074         return rc;
1075 }
1076
1077 static inline int record_add_uuid(const struct lu_env *env,
1078                                   struct llog_handle *llh,
1079                                   uint64_t nid, char *uuid)
1080 {
1081         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
1082                            NULL, NULL, NULL);
1083 }
1084
1085 static inline int record_add_conn(const struct lu_env *env,
1086                                   struct llog_handle *llh,
1087                                   char *devname, char *uuid)
1088 {
1089         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
1090                            NULL, NULL, NULL);
1091 }
1092
1093 static inline int record_attach(const struct lu_env *env,
1094                                 struct llog_handle *llh, char *devname,
1095                                 char *type, char *uuid)
1096 {
1097         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
1098                            NULL, NULL);
1099 }
1100
1101 static inline int record_setup(const struct lu_env *env,
1102                                struct llog_handle *llh, char *devname,
1103                                char *s1, char *s2, char *s3, char *s4)
1104 {
1105         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
1106 }
1107
1108 /**
1109  * \retval <0 record processing error
1110  * \retval n record is processed. No need copy original one.
1111  * \retval 0 record is not processed.
1112  */
1113 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
1114                            struct mgs_replace_data *mrd)
1115 {
1116         int nids_added = 0;
1117         lnet_nid_t nid;
1118         char *ptr;
1119         int rc = 0;
1120
1121         if (mrd->state == REPLACE_UUID &&
1122             lcfg->lcfg_command == LCFG_ADD_UUID) {
1123                 /* LCFG_ADD_UUID command found. Let's skip original command
1124                    and add passed nids */
1125                 ptr = mrd->target.mti_params;
1126                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1127                         if (!mrd->nodeuuid) {
1128                                 rc = name_create(&mrd->nodeuuid,
1129                                                  libcfs_nid2str(nid), "");
1130                                 if (rc) {
1131                                         CERROR("Can't create uuid for "
1132                                                 "nid  %s, device %s\n",
1133                                                 libcfs_nid2str(nid),
1134                                                 mrd->target.mti_svname);
1135                                         return rc;
1136                                 }
1137                         }
1138                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
1139                                "device %s\n", libcfs_nid2str(nid),
1140                                 mrd->target.mti_params,
1141                                 mrd->nodeuuid);
1142                         rc = record_add_uuid(env,
1143                                              mrd->temp_llh, nid,
1144                                              mrd->nodeuuid);
1145                         if (!rc)
1146                                 nids_added++;
1147
1148                         if (*ptr == ':') {
1149                                 mrd->failover = ptr;
1150                                 break;
1151                         }
1152                 }
1153
1154                 if (nids_added == 0) {
1155                         CERROR("No new nids were added, nid %s with uuid %s, "
1156                                "device %s\n", libcfs_nid2str(nid),
1157                                mrd->nodeuuid ? mrd->nodeuuid : "NULL",
1158                                mrd->target.mti_svname);
1159                         name_destroy(&mrd->nodeuuid);
1160                         return -ENXIO;
1161                 } else {
1162                         mrd->state = REPLACE_SETUP;
1163                 }
1164
1165                 return nids_added;
1166         }
1167
1168         if (mrd->state == REPLACE_SETUP && lcfg->lcfg_command == LCFG_SETUP) {
1169                 /* LCFG_SETUP command found. UUID should be changed */
1170                 rc = record_setup(env,
1171                                   mrd->temp_llh,
1172                                   /* devname the same */
1173                                   lustre_cfg_string(lcfg, 0),
1174                                   /* s1 is not changed */
1175                                   lustre_cfg_string(lcfg, 1),
1176                                   mrd->nodeuuid,
1177                                   /* s3 is not changed */
1178                                   lustre_cfg_string(lcfg, 3),
1179                                   /* s4 is not changed */
1180                                   lustre_cfg_string(lcfg, 4));
1181
1182                 name_destroy(&mrd->nodeuuid);
1183                 if (rc)
1184                         return rc;
1185
1186                 if (mrd->failover) {
1187                         ptr = mrd->failover;
1188                         while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1189                                 if (mrd->nodeuuid == NULL) {
1190                                         rc =  name_create(&mrd->nodeuuid,
1191                                                           libcfs_nid2str(nid),
1192                                                           "");
1193                                         if (rc)
1194                                                 return rc;
1195                                 }
1196
1197                                 CDEBUG(D_MGS, "add nid %s for failover %s\n",
1198                                        libcfs_nid2str(nid), mrd->nodeuuid);
1199                                 rc = record_add_uuid(env, mrd->temp_llh, nid,
1200                                                      mrd->nodeuuid);
1201                                 if (rc) {
1202                                         name_destroy(&mrd->nodeuuid);
1203                                         return rc;
1204                                 }
1205                                 if (*ptr == ':') {
1206                                         rc = record_add_conn(env,
1207                                                 mrd->temp_llh,
1208                                                 lustre_cfg_string(lcfg, 0),
1209                                                 mrd->nodeuuid);
1210                                         name_destroy(&mrd->nodeuuid);
1211                                         if (rc)
1212                                                 return rc;
1213                                 }
1214                         }
1215                         if (mrd->nodeuuid) {
1216                                 rc = record_add_conn(env, mrd->temp_llh,
1217                                                      lustre_cfg_string(lcfg, 0),
1218                                                      mrd->nodeuuid);
1219                                 name_destroy(&mrd->nodeuuid);
1220                                 if (rc)
1221                                         return rc;
1222                         }
1223                 }
1224                 mrd->state = REPLACE_DONE;
1225                 return rc ? rc : 1;
1226         }
1227
1228         /* Another commands in target device block */
1229         return 0;
1230 }
1231
1232 /**
1233  * Handler that called for every record in llog.
1234  * Records are processed in order they placed in llog.
1235  *
1236  * \param[in] llh       log to be processed
1237  * \param[in] rec       current record
1238  * \param[in] data      mgs_replace_data structure
1239  *
1240  * \retval 0    success
1241  */
1242 static int mgs_replace_nids_handler(const struct lu_env *env,
1243                                     struct llog_handle *llh,
1244                                     struct llog_rec_hdr *rec,
1245                                     void *data)
1246 {
1247         struct mgs_replace_data *mrd;
1248         struct lustre_cfg *lcfg = REC_DATA(rec);
1249         int cfg_len = REC_DATA_LEN(rec);
1250         int rc;
1251         ENTRY;
1252
1253         mrd = (struct mgs_replace_data *)data;
1254
1255         if (rec->lrh_type != OBD_CFG_REC) {
1256                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
1257                        rec->lrh_type, lcfg->lcfg_command,
1258                        lustre_cfg_string(lcfg, 0),
1259                        lustre_cfg_string(lcfg, 1));
1260                 RETURN(-EINVAL);
1261         }
1262
1263         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1264         if (rc) {
1265                 /* Do not copy any invalidated records */
1266                 GOTO(skip_out, rc = 0);
1267         }
1268
1269         rc = check_markers(lcfg, mrd);
1270         if (rc || mrd->state == REPLACE_SKIP)
1271                 GOTO(skip_out, rc = 0);
1272
1273         /* Write to new log all commands outside target device block */
1274         if (mrd->state == REPLACE_COPY)
1275                 GOTO(copy_out, rc = 0);
1276
1277         if (mrd->state == REPLACE_DONE &&
1278             (lcfg->lcfg_command == LCFG_ADD_UUID ||
1279              lcfg->lcfg_command == LCFG_ADD_CONN)) {
1280                 if (!mrd->failover)
1281                         CWARN("Previous failover is deleted, but new one is "
1282                               "not set. This means you configure system "
1283                               "without failover or passed wrong replace_nids "
1284                               "command parameters. Device %s, passed nids %s\n",
1285                               mrd->target.mti_svname, mrd->target.mti_params);
1286                 GOTO(skip_out, rc = 0);
1287         }
1288
1289         rc = process_command(env, lcfg, mrd);
1290         if (rc < 0)
1291                 RETURN(rc);
1292
1293         if (rc)
1294                 RETURN(0);
1295 copy_out:
1296         /* Record is placed in temporary llog as is */
1297         rc = llog_write(env, mrd->temp_llh, rec, LLOG_NEXT_IDX);
1298
1299         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1300                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1301                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1302         RETURN(rc);
1303
1304 skip_out:
1305         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1306                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1307                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1308         RETURN(rc);
1309 }
1310
1311 static int mgs_log_is_empty(const struct lu_env *env,
1312                             struct mgs_device *mgs, char *name)
1313 {
1314         struct llog_ctxt        *ctxt;
1315         int                      rc;
1316
1317         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1318         LASSERT(ctxt != NULL);
1319
1320         rc = llog_is_empty(env, ctxt, name);
1321         llog_ctxt_put(ctxt);
1322         return rc;
1323 }
1324
1325 static int mgs_replace_log(const struct lu_env *env,
1326                            struct obd_device *mgs,
1327                            char *logname, char *devname,
1328                            llog_cb_t replace_handler, void *data)
1329 {
1330         struct llog_handle *orig_llh, *backup_llh;
1331         struct llog_ctxt *ctxt;
1332         struct mgs_replace_data *mrd;
1333         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1334         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1335         char *backup;
1336         int rc, rc2, buf_size;
1337         time64_t now;
1338         ENTRY;
1339
1340         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1341         LASSERT(ctxt != NULL);
1342
1343         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1344                 /* Log is empty. Nothing to replace */
1345                 GOTO(out_put, rc = 0);
1346         }
1347
1348         now = ktime_get_real_seconds();
1349
1350         /* max time64_t in decimal fits into 20 bytes long string */
1351         buf_size = strlen(logname) + 1 + 20 + 1 + strlen(".bak") + 1;
1352         OBD_ALLOC(backup, buf_size);
1353         if (backup == NULL)
1354                 GOTO(out_put, rc = -ENOMEM);
1355
1356         snprintf(backup, buf_size, "%s.%llu.bak", logname, now);
1357
1358         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1359         if (rc == 0) {
1360                 /* Now erase original log file. Connections are not allowed.
1361                    Backup is already saved */
1362                 rc = llog_erase(env, ctxt, NULL, logname);
1363                 if (rc < 0)
1364                         GOTO(out_free, rc);
1365         } else if (rc != -ENOENT) {
1366                 CERROR("%s: can't make backup for %s: rc = %d\n",
1367                        mgs->obd_name, logname, rc);
1368                 GOTO(out_free,rc);
1369         }
1370
1371         /* open local log */
1372         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1373         if (rc)
1374                 GOTO(out_restore, rc);
1375
1376         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1377         if (rc)
1378                 GOTO(out_closel, rc);
1379
1380         /* open backup llog */
1381         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1382                        LLOG_OPEN_EXISTS);
1383         if (rc)
1384                 GOTO(out_closel, rc);
1385
1386         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1387         if (rc)
1388                 GOTO(out_close, rc);
1389
1390         if (llog_get_size(backup_llh) <= 1)
1391                 GOTO(out_close, rc = 0);
1392
1393         OBD_ALLOC_PTR(mrd);
1394         if (!mrd)
1395                 GOTO(out_close, rc = -ENOMEM);
1396         /* devname is only needed information to replace UUID records */
1397         if (devname)
1398                 strlcpy(mrd->target.mti_svname, devname,
1399                         sizeof(mrd->target.mti_svname));
1400         /* data is parsed in llog callback */
1401         if (data)
1402                 strlcpy(mrd->target.mti_params, data,
1403                         sizeof(mrd->target.mti_params));
1404         /* Copy records to this temporary llog */
1405         mrd->temp_llh = orig_llh;
1406
1407         rc = llog_process(env, backup_llh, replace_handler,
1408                           (void *)mrd, NULL);
1409         OBD_FREE_PTR(mrd);
1410 out_close:
1411         rc2 = llog_close(NULL, backup_llh);
1412         if (!rc)
1413                 rc = rc2;
1414 out_closel:
1415         rc2 = llog_close(NULL, orig_llh);
1416         if (!rc)
1417                 rc = rc2;
1418
1419 out_restore:
1420         if (rc) {
1421                 CERROR("%s: llog should be restored: rc = %d\n",
1422                        mgs->obd_name, rc);
1423                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1424                                   logname);
1425                 if (rc2 < 0)
1426                         CERROR("%s: can't restore backup %s: rc = %d\n",
1427                                mgs->obd_name, logname, rc2);
1428         }
1429
1430 out_free:
1431         OBD_FREE(backup, buf_size);
1432
1433 out_put:
1434         llog_ctxt_put(ctxt);
1435
1436         if (rc)
1437                 CERROR("%s: failed to replace log %s: rc = %d\n",
1438                        mgs->obd_name, logname, rc);
1439
1440         RETURN(rc);
1441 }
1442
1443 static int mgs_replace_nids_log(const struct lu_env *env,
1444                                 struct obd_device *obd,
1445                                 char *logname, char *devname, char *nids)
1446 {
1447         CDEBUG(D_MGS, "Replace NIDs for %s in %s\n", devname, logname);
1448         return mgs_replace_log(env, obd, logname, devname,
1449                                mgs_replace_nids_handler, nids);
1450 }
1451
1452 /**
1453  * Parse device name and get file system name and/or device index
1454  *
1455  * @devname     device name (ex. lustre-MDT0000)
1456  * @fsname      file system name extracted from @devname and returned
1457  *              to the caller (optional)
1458  * @index       device index extracted from @devname and returned to
1459  *              the caller (optional)
1460  *
1461  * RETURN       0                       success if we are only interested in
1462  *                                      extracting fsname from devname.
1463  *                                      i.e index is NULL
1464  *
1465  *              LDD_F_SV_TYPE_*         Besides extracting the fsname the
1466  *                                      user also wants the index. Report to
1467  *                                      the user the type of obd device the
1468  *                                      returned index belongs too.
1469  *
1470  *              -EINVAL                 The obd device name is improper so
1471  *                                      fsname could not be extracted.
1472  *
1473  *              -ENXIO                  Failed to extract the index out of
1474  *                                      the obd device name. Most likely an
1475  *                                      invalid obd device name
1476  */
1477 static int mgs_parse_devname(char *devname, char *fsname, u32 *index)
1478 {
1479         int rc = 0;
1480         ENTRY;
1481
1482         /* Extract fsname */
1483         if (fsname) {
1484                 rc = server_name2fsname(devname, fsname, NULL);
1485                 if (rc < 0) {
1486                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1487                                devname);
1488                         RETURN(-EINVAL);
1489                 }
1490         }
1491
1492         if (index) {
1493                 rc = server_name2index(devname, index, NULL);
1494                 if (rc < 0) {
1495                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1496                                devname);
1497                         RETURN(-ENXIO);
1498                 }
1499         }
1500
1501         /* server_name2index can return LDD_F_SV_TYPE_* so always return rc */
1502         RETURN(rc);
1503 }
1504
1505 /* This is only called during replace_nids */
1506 static int only_mgs_is_running(struct obd_device *mgs_obd)
1507 {
1508         /* TDB: Is global variable with devices count exists? */
1509         int num_devices = get_devices_count();
1510         int num_exports = 0;
1511         struct obd_export *exp;
1512
1513         spin_lock(&mgs_obd->obd_dev_lock);
1514         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1515                 /* skip self export */
1516                 if (exp == mgs_obd->obd_self_export)
1517                         continue;
1518                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1519                         continue;
1520
1521                 ++num_exports;
1522
1523                 CERROR("%s: node %s still connected during replace_nids "
1524                        "connect_flags:%llx\n",
1525                        mgs_obd->obd_name,
1526                        libcfs_nid2str(exp->exp_nid_stats->nid),
1527                        exp_connect_flags(exp));
1528
1529         }
1530         spin_unlock(&mgs_obd->obd_dev_lock);
1531
1532         /* osd, MGS and MGC + self_export
1533            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1534         return (num_devices <= 3) && (num_exports == 0);
1535 }
1536
1537 static int name_create_mdt(char **logname, char *fsname, int i)
1538 {
1539         char mdt_index[9];
1540
1541         sprintf(mdt_index, "-MDT%04x", i);
1542         return name_create(logname, fsname, mdt_index);
1543 }
1544
1545 /**
1546  * Replace nids for \a device to \a nids values
1547  *
1548  * \param obd           MGS obd device
1549  * \param devname       nids need to be replaced for this device
1550  * (ex. lustre-OST0000)
1551  * \param nids          nids list (ex. nid1,nid2,nid3)
1552  *
1553  * \retval 0    success
1554  */
1555 int mgs_replace_nids(const struct lu_env *env,
1556                      struct mgs_device *mgs,
1557                      char *devname, char *nids)
1558 {
1559         /* Assume fsname is part of device name */
1560         char fsname[MTI_NAME_MAXLEN];
1561         int rc;
1562         __u32 index;
1563         char *logname;
1564         struct fs_db *fsdb = NULL;
1565         unsigned int i;
1566         int conn_state;
1567         struct obd_device *mgs_obd = mgs->mgs_obd;
1568         ENTRY;
1569
1570         /* We can only change NIDs if no other nodes are connected */
1571         spin_lock(&mgs_obd->obd_dev_lock);
1572         conn_state = mgs_obd->obd_no_conn;
1573         mgs_obd->obd_no_conn = 1;
1574         spin_unlock(&mgs_obd->obd_dev_lock);
1575
1576         /* We can not change nids if not only MGS is started */
1577         if (!only_mgs_is_running(mgs_obd)) {
1578                 CERROR("Only MGS is allowed to be started\n");
1579                 GOTO(out, rc = -EINPROGRESS);
1580         }
1581
1582         /* Get fsname and index */
1583         rc = mgs_parse_devname(devname, fsname, &index);
1584         if (rc < 0)
1585                 GOTO(out, rc);
1586
1587         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1588         if (rc) {
1589                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1590                 GOTO(out, rc);
1591         }
1592
1593         /* Process client llogs */
1594         name_create(&logname, fsname, "-client");
1595         rc = mgs_replace_nids_log(env, mgs_obd, logname, devname, nids);
1596         name_destroy(&logname);
1597         if (rc) {
1598                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1599                        fsname, devname, rc);
1600                 GOTO(out, rc);
1601         }
1602
1603         /* Process MDT llogs */
1604         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1605                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1606                         continue;
1607                 name_create_mdt(&logname, fsname, i);
1608                 rc = mgs_replace_nids_log(env, mgs_obd, logname, devname, nids);
1609                 name_destroy(&logname);
1610                 if (rc)
1611                         GOTO(out, rc);
1612         }
1613
1614 out:
1615         spin_lock(&mgs_obd->obd_dev_lock);
1616         mgs_obd->obd_no_conn = conn_state;
1617         spin_unlock(&mgs_obd->obd_dev_lock);
1618
1619         if (fsdb)
1620                 mgs_put_fsdb(mgs, fsdb);
1621
1622         RETURN(rc);
1623 }
1624
1625 /**
1626  * This is called for every record in llog. Some of records are
1627  * skipped, others are copied to new log as is.
1628  * Records to be skipped are
1629  *  marker records marked SKIP
1630  *  records enclosed between SKIP markers
1631  *
1632  * \param[in] llh       log to be processed
1633  * \param[in] rec       current record
1634  * \param[in] data      mgs_replace_data structure
1635  *
1636  * \retval 0    success
1637  **/
1638 static int mgs_clear_config_handler(const struct lu_env *env,
1639                                     struct llog_handle *llh,
1640                                     struct llog_rec_hdr *rec, void *data)
1641 {
1642         struct mgs_replace_data *mrd;
1643         struct lustre_cfg *lcfg = REC_DATA(rec);
1644         int cfg_len = REC_DATA_LEN(rec);
1645         int rc;
1646
1647         ENTRY;
1648
1649         mrd = (struct mgs_replace_data *)data;
1650
1651         if (rec->lrh_type != OBD_CFG_REC) {
1652                 CDEBUG(D_MGS, "Config llog Name=%s, Record Index=%u, "
1653                        "Unhandled Record Type=%#x\n", llh->lgh_name,
1654                        rec->lrh_index, rec->lrh_type);
1655                 RETURN(-EINVAL);
1656         }
1657
1658         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1659         if (rc) {
1660                 CDEBUG(D_MGS, "Config llog Name=%s, Invalid config file.",
1661                        llh->lgh_name);
1662                 RETURN(-EINVAL);
1663         }
1664
1665         if (lcfg->lcfg_command == LCFG_MARKER) {
1666                 struct cfg_marker *marker;
1667
1668                 marker = lustre_cfg_buf(lcfg, 1);
1669                 if (marker->cm_flags & CM_SKIP) {
1670                         if (marker->cm_flags & CM_START)
1671                                 mrd->state = REPLACE_SKIP;
1672                         if (marker->cm_flags & CM_END)
1673                                 mrd->state = REPLACE_COPY;
1674                         /* SKIP section started or finished */
1675                         CDEBUG(D_MGS, "Skip idx=%d, rc=%d, len=%d, "
1676                                "cmd %x %s %s\n", rec->lrh_index, rc,
1677                                rec->lrh_len, lcfg->lcfg_command,
1678                                lustre_cfg_string(lcfg, 0),
1679                                lustre_cfg_string(lcfg, 1));
1680                         RETURN(0);
1681                 }
1682         } else {
1683                 if (mrd->state == REPLACE_SKIP) {
1684                         /* record enclosed between SKIP markers, skip it */
1685                         CDEBUG(D_MGS, "Skip idx=%d, rc=%d, len=%d, "
1686                                "cmd %x %s %s\n", rec->lrh_index, rc,
1687                                rec->lrh_len, lcfg->lcfg_command,
1688                                lustre_cfg_string(lcfg, 0),
1689                                lustre_cfg_string(lcfg, 1));
1690                         RETURN(0);
1691                 }
1692         }
1693
1694         /* Record is placed in temporary llog as is */
1695         rc = llog_write(env, mrd->temp_llh, rec, LLOG_NEXT_IDX);
1696
1697         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1698                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1699                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1700         RETURN(rc);
1701 }
1702
1703 /*
1704  * Directory CONFIGS/ may contain files which are not config logs to
1705  * be cleared. Skip any llogs with a non-alphanumeric character after
1706  * the last '-'. For example, fsname-MDT0000.sav, fsname-MDT0000.bak,
1707  * fsname-MDT0000.orig, fsname-MDT0000~, fsname-MDT0000.20150516, etc.
1708  */
1709 static bool config_to_clear(const char *logname)
1710 {
1711         int i;
1712         char *str;
1713
1714         str = strrchr(logname, '-');
1715         if (!str)
1716                 return 0;
1717
1718         i = 0;
1719         while (isalnum(str[++i]));
1720         return str[i] == '\0';
1721 }
1722
1723 /**
1724  * Clear config logs for \a name
1725  *
1726  * \param env
1727  * \param mgs           MGS device
1728  * \param name          name of device or of filesystem
1729  *                      (ex. lustre-OST0000 or lustre) in later case all logs
1730  *                      will be cleared
1731  *
1732  * \retval 0            success
1733  */
1734 int mgs_clear_configs(const struct lu_env *env,
1735                      struct mgs_device *mgs, const char *name)
1736 {
1737         struct list_head dentry_list;
1738         struct mgs_direntry *dirent, *n;
1739         char *namedash;
1740         int conn_state;
1741         struct obd_device *mgs_obd = mgs->mgs_obd;
1742         int rc;
1743
1744         ENTRY;
1745
1746         /* Prevent clients and servers from connecting to mgs */
1747         spin_lock(&mgs_obd->obd_dev_lock);
1748         conn_state = mgs_obd->obd_no_conn;
1749         mgs_obd->obd_no_conn = 1;
1750         spin_unlock(&mgs_obd->obd_dev_lock);
1751
1752         /*
1753          * config logs cannot be cleaned if anything other than
1754          * MGS is started
1755          */
1756         if (!only_mgs_is_running(mgs_obd)) {
1757                 CERROR("Only MGS is allowed to be started\n");
1758                 GOTO(out, rc = -EBUSY);
1759         }
1760
1761         /* Find all the logs in the CONFIGS directory */
1762         rc = class_dentry_readdir(env, mgs, &dentry_list);
1763         if (rc) {
1764                 CERROR("%s: cannot read config directory '%s': rc = %d\n",
1765                        mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
1766                 GOTO(out, rc);
1767         }
1768
1769         if (list_empty(&dentry_list)) {
1770                 CERROR("%s: list empty reading config dir '%s': rc = %d\n",
1771                         mgs_obd->obd_name, MOUNT_CONFIGS_DIR, -ENOENT);
1772                 GOTO(out, rc = -ENOENT);
1773         }
1774
1775         OBD_ALLOC(namedash, strlen(name) + 2);
1776         if (namedash == NULL)
1777                 GOTO(out, rc = -ENOMEM);
1778         snprintf(namedash, strlen(name) + 2, "%s-", name);
1779
1780         list_for_each_entry(dirent, &dentry_list, mde_list) {
1781                 if (strcmp(name, dirent->mde_name) &&
1782                     strncmp(namedash, dirent->mde_name, strlen(namedash)))
1783                         continue;
1784                 if (!config_to_clear(dirent->mde_name))
1785                         continue;
1786                 CDEBUG(D_MGS, "%s: Clear config log %s\n",
1787                        mgs_obd->obd_name, dirent->mde_name);
1788                 rc = mgs_replace_log(env, mgs_obd, dirent->mde_name, NULL,
1789                                      mgs_clear_config_handler, NULL);
1790                 if (rc)
1791                         break;
1792         }
1793
1794         list_for_each_entry_safe(dirent, n, &dentry_list, mde_list) {
1795                 list_del_init(&dirent->mde_list);
1796                 mgs_direntry_free(dirent);
1797         }
1798         OBD_FREE(namedash, strlen(name) + 2);
1799 out:
1800         spin_lock(&mgs_obd->obd_dev_lock);
1801         mgs_obd->obd_no_conn = conn_state;
1802         spin_unlock(&mgs_obd->obd_dev_lock);
1803
1804         RETURN(rc);
1805 }
1806
1807 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1808                             char *devname, struct lov_desc *desc)
1809 {
1810         struct mgs_thread_info  *mgi = mgs_env_info(env);
1811         struct llog_cfg_rec     *lcr;
1812         int rc;
1813
1814         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1815         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1816         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1817         if (lcr == NULL)
1818                 return -ENOMEM;
1819
1820         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1821         lustre_cfg_rec_free(lcr);
1822         return rc;
1823 }
1824
1825 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1826                             char *devname, struct lmv_desc *desc)
1827 {
1828         struct mgs_thread_info  *mgi = mgs_env_info(env);
1829         struct llog_cfg_rec     *lcr;
1830         int rc;
1831
1832         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1833         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1834         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1835         if (lcr == NULL)
1836                 return -ENOMEM;
1837
1838         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1839         lustre_cfg_rec_free(lcr);
1840         return rc;
1841 }
1842
1843 static inline int record_mdc_add(const struct lu_env *env,
1844                                  struct llog_handle *llh,
1845                                  char *logname, char *mdcuuid,
1846                                  char *mdtuuid, char *index,
1847                                  char *gen)
1848 {
1849         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1850                            mdtuuid,index,gen,mdcuuid);
1851 }
1852
1853 static inline int record_lov_add(const struct lu_env *env,
1854                                  struct llog_handle *llh,
1855                                  char *lov_name, char *ost_uuid,
1856                                  char *index, char *gen)
1857 {
1858         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1859                            ost_uuid, index, gen, NULL);
1860 }
1861
1862 static inline int record_mount_opt(const struct lu_env *env,
1863                                    struct llog_handle *llh,
1864                                    char *profile, char *lov_name,
1865                                    char *mdc_name)
1866 {
1867         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1868                            profile, lov_name, mdc_name, NULL);
1869 }
1870
1871 static int record_marker(const struct lu_env *env,
1872                          struct llog_handle *llh,
1873                          struct fs_db *fsdb, __u32 flags,
1874                          char *tgtname, char *comment)
1875 {
1876         struct mgs_thread_info *mgi = mgs_env_info(env);
1877         struct llog_cfg_rec *lcr;
1878         int rc;
1879         int cplen = 0;
1880
1881         if (flags & CM_START)
1882                 fsdb->fsdb_gen++;
1883         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1884         mgi->mgi_marker.cm_flags = flags;
1885         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1886         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1887                         sizeof(mgi->mgi_marker.cm_tgtname));
1888         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1889                 return -E2BIG;
1890         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1891                         sizeof(mgi->mgi_marker.cm_comment));
1892         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1893                 return -E2BIG;
1894         mgi->mgi_marker.cm_createtime = ktime_get_real_seconds();
1895         mgi->mgi_marker.cm_canceltime = 0;
1896         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1897         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1898                             sizeof(mgi->mgi_marker));
1899         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1900         if (lcr == NULL)
1901                 return -ENOMEM;
1902
1903         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1904         lustre_cfg_rec_free(lcr);
1905         return rc;
1906 }
1907
1908 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1909                             struct llog_handle **llh, char *name)
1910 {
1911         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1912         struct llog_ctxt        *ctxt;
1913         int                      rc = 0;
1914         ENTRY;
1915
1916         if (*llh)
1917                 GOTO(out, rc = -EBUSY);
1918
1919         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1920         if (!ctxt)
1921                 GOTO(out, rc = -ENODEV);
1922         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1923
1924         rc = llog_open_create(env, ctxt, llh, NULL, name);
1925         if (rc)
1926                 GOTO(out_ctxt, rc);
1927         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1928         if (rc)
1929                 llog_close(env, *llh);
1930 out_ctxt:
1931         llog_ctxt_put(ctxt);
1932 out:
1933         if (rc) {
1934                 CERROR("%s: can't start log %s: rc = %d\n",
1935                        mgs->mgs_obd->obd_name, name, rc);
1936                 *llh = NULL;
1937         }
1938         RETURN(rc);
1939 }
1940
1941 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1942 {
1943         int rc;
1944
1945         rc = llog_close(env, *llh);
1946         *llh = NULL;
1947
1948         return rc;
1949 }
1950
1951 /******************** config "macros" *********************/
1952
1953 /* write an lcfg directly into a log (with markers) */
1954 static int mgs_write_log_direct(const struct lu_env *env,
1955                                 struct mgs_device *mgs, struct fs_db *fsdb,
1956                                 char *logname, struct llog_cfg_rec *lcr,
1957                                 char *devname, char *comment)
1958 {
1959         struct llog_handle *llh = NULL;
1960         int rc;
1961
1962         ENTRY;
1963
1964         rc = record_start_log(env, mgs, &llh, logname);
1965         if (rc)
1966                 RETURN(rc);
1967
1968         /* FIXME These should be a single journal transaction */
1969         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1970         if (rc)
1971                 GOTO(out_end, rc);
1972         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1973         if (rc)
1974                 GOTO(out_end, rc);
1975         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1976         if (rc)
1977                 GOTO(out_end, rc);
1978 out_end:
1979         record_end_log(env, &llh);
1980         RETURN(rc);
1981 }
1982
1983 /* write the lcfg in all logs for the given fs */
1984 static int mgs_write_log_direct_all(const struct lu_env *env,
1985                                     struct mgs_device *mgs,
1986                                     struct fs_db *fsdb,
1987                                     struct mgs_target_info *mti,
1988                                     struct llog_cfg_rec *lcr, char *devname,
1989                                     char *comment, int server_only)
1990 {
1991         struct list_head         log_list;
1992         struct mgs_direntry     *dirent, *n;
1993         char                    *fsname = mti->mti_fsname;
1994         int                      rc = 0, len = strlen(fsname);
1995
1996         ENTRY;
1997         /* Find all the logs in the CONFIGS directory */
1998         rc = class_dentry_readdir(env, mgs, &log_list);
1999         if (rc)
2000                 RETURN(rc);
2001
2002         /* Could use fsdb index maps instead of directory listing */
2003         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
2004                 list_del_init(&dirent->mde_list);
2005                 /* don't write to sptlrpc rule log */
2006                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
2007                         goto next;
2008
2009                 /* caller wants write server logs only */
2010                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
2011                         goto next;
2012
2013                 if (strlen(dirent->mde_name) <= len ||
2014                     strncmp(fsname, dirent->mde_name, len) != 0 ||
2015                     dirent->mde_name[len] != '-')
2016                         goto next;
2017
2018                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
2019                 /* Erase any old settings of this same parameter */
2020                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
2021                                 devname, comment, CM_SKIP);
2022                 if (rc < 0)
2023                         CERROR("%s: Can't modify llog %s: rc = %d\n",
2024                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
2025                 if (lcr == NULL)
2026                         goto next;
2027                 /* Write the new one */
2028                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
2029                                           lcr, devname, comment);
2030                 if (rc != 0)
2031                         CERROR("%s: writing log %s: rc = %d\n",
2032                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
2033 next:
2034                 mgs_direntry_free(dirent);
2035         }
2036
2037         RETURN(rc);
2038 }
2039
2040 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2041                                     struct mgs_device *mgs,
2042                                     struct fs_db *fsdb,
2043                                     struct mgs_target_info *mti,
2044                                     int index, char *logname);
2045 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2046                                     struct mgs_device *mgs,
2047                                     struct fs_db *fsdb,
2048                                     struct mgs_target_info *mti,
2049                                     char *logname, char *suffix, char *lovname,
2050                                     enum lustre_sec_part sec_part, int flags);
2051 static int name_create_mdt_and_lov(char **logname, char **lovname,
2052                                    struct fs_db *fsdb, int i);
2053
2054 static int add_param(char *params, char *key, char *val)
2055 {
2056         char *start = params + strlen(params);
2057         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
2058         int keylen = 0;
2059
2060         if (key != NULL)
2061                 keylen = strlen(key);
2062         if (start + 1 + keylen + strlen(val) >= end) {
2063                 CERROR("params are too long: %s %s%s\n",
2064                        params, key != NULL ? key : "", val);
2065                 return -EINVAL;
2066         }
2067
2068         sprintf(start, " %s%s", key != NULL ? key : "", val);
2069         return 0;
2070 }
2071
2072 /**
2073  * Walk through client config log record and convert the related records
2074  * into the target.
2075  **/
2076 static int mgs_steal_client_llog_handler(const struct lu_env *env,
2077                                          struct llog_handle *llh,
2078                                          struct llog_rec_hdr *rec, void *data)
2079 {
2080         struct mgs_device *mgs;
2081         struct obd_device *obd;
2082         struct mgs_target_info *mti, *tmti;
2083         struct fs_db *fsdb;
2084         int cfg_len = rec->lrh_len;
2085         char *cfg_buf = (char*) (rec + 1);
2086         struct lustre_cfg *lcfg;
2087         int rc = 0;
2088         struct llog_handle *mdt_llh = NULL;
2089         static int got_an_osc_or_mdc = 0;
2090         /* 0: not found any osc/mdc;
2091            1: found osc;
2092            2: found mdc;
2093         */
2094         static int last_step = -1;
2095         int cplen = 0;
2096
2097         ENTRY;
2098
2099         mti = ((struct temp_comp*)data)->comp_mti;
2100         tmti = ((struct temp_comp*)data)->comp_tmti;
2101         fsdb = ((struct temp_comp*)data)->comp_fsdb;
2102         obd = ((struct temp_comp *)data)->comp_obd;
2103         mgs = lu2mgs_dev(obd->obd_lu_dev);
2104         LASSERT(mgs);
2105
2106         if (rec->lrh_type != OBD_CFG_REC) {
2107                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
2108                 RETURN(-EINVAL);
2109         }
2110
2111         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
2112         if (rc) {
2113                 CERROR("Insane cfg\n");
2114                 RETURN(rc);
2115         }
2116
2117         lcfg = (struct lustre_cfg *)cfg_buf;
2118
2119         if (lcfg->lcfg_command == LCFG_MARKER) {
2120                 struct cfg_marker *marker;
2121                 marker = lustre_cfg_buf(lcfg, 1);
2122                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
2123                     (marker->cm_flags & CM_START) &&
2124                      !(marker->cm_flags & CM_SKIP)) {
2125                         got_an_osc_or_mdc = 1;
2126                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
2127                                         sizeof(tmti->mti_svname));
2128                         if (cplen >= sizeof(tmti->mti_svname))
2129                                 RETURN(-E2BIG);
2130                         rc = record_start_log(env, mgs, &mdt_llh,
2131                                               mti->mti_svname);
2132                         if (rc)
2133                                 RETURN(rc);
2134                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
2135                                            mti->mti_svname, "add osc(copied)");
2136                         record_end_log(env, &mdt_llh);
2137                         last_step = marker->cm_step;
2138                         RETURN(rc);
2139                 }
2140                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
2141                     (marker->cm_flags & CM_END) &&
2142                      !(marker->cm_flags & CM_SKIP)) {
2143                         LASSERT(last_step == marker->cm_step);
2144                         last_step = -1;
2145                         got_an_osc_or_mdc = 0;
2146                         memset(tmti, 0, sizeof(*tmti));
2147                         rc = record_start_log(env, mgs, &mdt_llh,
2148                                               mti->mti_svname);
2149                         if (rc)
2150                                 RETURN(rc);
2151                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
2152                                            mti->mti_svname, "add osc(copied)");
2153                         record_end_log(env, &mdt_llh);
2154                         RETURN(rc);
2155                 }
2156                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
2157                     (marker->cm_flags & CM_START) &&
2158                      !(marker->cm_flags & CM_SKIP)) {
2159                         got_an_osc_or_mdc = 2;
2160                         last_step = marker->cm_step;
2161                         memcpy(tmti->mti_svname, marker->cm_tgtname,
2162                                strlen(marker->cm_tgtname));
2163
2164                         RETURN(rc);
2165                 }
2166                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
2167                     (marker->cm_flags & CM_END) &&
2168                      !(marker->cm_flags & CM_SKIP)) {
2169                         LASSERT(last_step == marker->cm_step);
2170                         last_step = -1;
2171                         got_an_osc_or_mdc = 0;
2172                         memset(tmti, 0, sizeof(*tmti));
2173                         RETURN(rc);
2174                 }
2175         }
2176
2177         if (got_an_osc_or_mdc == 0 || last_step < 0)
2178                 RETURN(rc);
2179
2180         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
2181                 __u64 nodenid = lcfg->lcfg_nid;
2182
2183                 if (strlen(tmti->mti_uuid) == 0) {
2184                         /* target uuid not set, this config record is before
2185                          * LCFG_SETUP, this nid is one of target node nid.
2186                          */
2187                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
2188                         tmti->mti_nid_count++;
2189                 } else {
2190                         char nidstr[LNET_NIDSTR_SIZE];
2191
2192                         /* failover node nid */
2193                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
2194                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
2195                                         nidstr);
2196                 }
2197
2198                 RETURN(rc);
2199         }
2200
2201         if (lcfg->lcfg_command == LCFG_SETUP) {
2202                 char *target;
2203
2204                 target = lustre_cfg_string(lcfg, 1);
2205                 memcpy(tmti->mti_uuid, target, strlen(target));
2206                 RETURN(rc);
2207         }
2208
2209         /* ignore client side sptlrpc_conf_log */
2210         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
2211                 RETURN(rc);
2212
2213         if (lcfg->lcfg_command == LCFG_ADD_MDC &&
2214             strstr(lustre_cfg_string(lcfg, 0), "-clilmv") != NULL) {
2215                 int index;
2216
2217                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
2218                         RETURN (-EINVAL);
2219
2220                 memcpy(tmti->mti_fsname, mti->mti_fsname,
2221                        strlen(mti->mti_fsname));
2222                 tmti->mti_stripe_index = index;
2223
2224                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
2225                                               mti->mti_stripe_index,
2226                                               mti->mti_svname);
2227                 memset(tmti, 0, sizeof(*tmti));
2228                 RETURN(rc);
2229         }
2230
2231         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
2232                 int index;
2233                 char mdt_index[9];
2234                 char *logname, *lovname;
2235
2236                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2237                                              mti->mti_stripe_index);
2238                 if (rc)
2239                         RETURN(rc);
2240                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
2241
2242                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
2243                         name_destroy(&logname);
2244                         name_destroy(&lovname);
2245                         RETURN(-EINVAL);
2246                 }
2247
2248                 tmti->mti_stripe_index = index;
2249                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
2250                                          mdt_index, lovname,
2251                                          LUSTRE_SP_MDT, 0);
2252                 name_destroy(&logname);
2253                 name_destroy(&lovname);
2254                 RETURN(rc);
2255         }
2256         RETURN(rc);
2257 }
2258
2259 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
2260 /* stealed from mgs_get_fsdb_from_llog*/
2261 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
2262                                               struct mgs_device *mgs,
2263                                               char *client_name,
2264                                               struct temp_comp* comp)
2265 {
2266         struct llog_handle *loghandle;
2267         struct mgs_target_info *tmti;
2268         struct llog_ctxt *ctxt;
2269         int rc;
2270
2271         ENTRY;
2272
2273         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
2274         LASSERT(ctxt != NULL);
2275
2276         OBD_ALLOC_PTR(tmti);
2277         if (tmti == NULL)
2278                 GOTO(out_ctxt, rc = -ENOMEM);
2279
2280         comp->comp_tmti = tmti;
2281         comp->comp_obd = mgs->mgs_obd;
2282
2283         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
2284                        LLOG_OPEN_EXISTS);
2285         if (rc < 0) {
2286                 if (rc == -ENOENT)
2287                         rc = 0;
2288                 GOTO(out_pop, rc);
2289         }
2290
2291         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
2292         if (rc)
2293                 GOTO(out_close, rc);
2294
2295         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
2296                                   (void *)comp, NULL, false);
2297         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
2298 out_close:
2299         llog_close(env, loghandle);
2300 out_pop:
2301         OBD_FREE_PTR(tmti);
2302 out_ctxt:
2303         llog_ctxt_put(ctxt);
2304         RETURN(rc);
2305 }
2306
2307 /* lmv is the second thing for client logs */
2308 /* copied from mgs_write_log_lov. Please refer to that.  */
2309 static int mgs_write_log_lmv(const struct lu_env *env,
2310                              struct mgs_device *mgs,
2311                              struct fs_db *fsdb,
2312                              struct mgs_target_info *mti,
2313                              char *logname, char *lmvname)
2314 {
2315         struct llog_handle *llh = NULL;
2316         struct lmv_desc *lmvdesc;
2317         char *uuid;
2318         int rc = 0;
2319         ENTRY;
2320
2321         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
2322
2323         OBD_ALLOC_PTR(lmvdesc);
2324         if (lmvdesc == NULL)
2325                 RETURN(-ENOMEM);
2326         lmvdesc->ld_active_tgt_count = 0;
2327         lmvdesc->ld_tgt_count = 0;
2328         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
2329         uuid = (char *)lmvdesc->ld_uuid.uuid;
2330
2331         rc = record_start_log(env, mgs, &llh, logname);
2332         if (rc)
2333                 GOTO(out_free, rc);
2334         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
2335         if (rc)
2336                 GOTO(out_end, rc);
2337         rc = record_attach(env, llh, lmvname, "lmv", uuid);
2338         if (rc)
2339                 GOTO(out_end, rc);
2340         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
2341         if (rc)
2342                 GOTO(out_end, rc);
2343         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
2344         if (rc)
2345                 GOTO(out_end, rc);
2346 out_end:
2347         record_end_log(env, &llh);
2348 out_free:
2349         OBD_FREE_PTR(lmvdesc);
2350         RETURN(rc);
2351 }
2352
2353 /* lov is the first thing in the mdt and client logs */
2354 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
2355                              struct fs_db *fsdb, struct mgs_target_info *mti,
2356                              char *logname, char *lovname)
2357 {
2358         struct llog_handle *llh = NULL;
2359         struct lov_desc *lovdesc;
2360         char *uuid;
2361         int rc = 0;
2362         ENTRY;
2363
2364         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
2365
2366         /*
2367         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
2368         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
2369               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
2370         */
2371
2372         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
2373         OBD_ALLOC_PTR(lovdesc);
2374         if (lovdesc == NULL)
2375                 RETURN(-ENOMEM);
2376         lovdesc->ld_magic = LOV_DESC_MAGIC;
2377         lovdesc->ld_tgt_count = 0;
2378         /* Defaults.  Can be changed later by lcfg config_param */
2379         lovdesc->ld_default_stripe_count = 1;
2380         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
2381         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
2382         lovdesc->ld_default_stripe_offset = -1;
2383         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
2384         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
2385         /* can these be the same? */
2386         uuid = (char *)lovdesc->ld_uuid.uuid;
2387
2388         /* This should always be the first entry in a log.
2389         rc = mgs_clear_log(obd, logname); */
2390         rc = record_start_log(env, mgs, &llh, logname);
2391         if (rc)
2392                 GOTO(out_free, rc);
2393         /* FIXME these should be a single journal transaction */
2394         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
2395         if (rc)
2396                 GOTO(out_end, rc);
2397         rc = record_attach(env, llh, lovname, "lov", uuid);
2398         if (rc)
2399                 GOTO(out_end, rc);
2400         rc = record_lov_setup(env, llh, lovname, lovdesc);
2401         if (rc)
2402                 GOTO(out_end, rc);
2403         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
2404         if (rc)
2405                 GOTO(out_end, rc);
2406         EXIT;
2407 out_end:
2408         record_end_log(env, &llh);
2409 out_free:
2410         OBD_FREE_PTR(lovdesc);
2411         return rc;
2412 }
2413
2414 /* add failnids to open log */
2415 static int mgs_write_log_failnids(const struct lu_env *env,
2416                                   struct mgs_target_info *mti,
2417                                   struct llog_handle *llh,
2418                                   char *cliname)
2419 {
2420         char *failnodeuuid = NULL;
2421         char *ptr = mti->mti_params;
2422         lnet_nid_t nid;
2423         int rc = 0;
2424
2425         /*
2426         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
2427         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
2428         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
2429         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
2430         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
2431         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
2432         */
2433
2434         /*
2435          * Pull failnid info out of params string, which may contain something
2436          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
2437          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
2438          * etc.  However, convert_hostnames() should have caught those.
2439          */
2440         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
2441                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
2442                         char nidstr[LNET_NIDSTR_SIZE];
2443
2444                         if (failnodeuuid == NULL) {
2445                                 /* We don't know the failover node name,
2446                                  * so just use the first nid as the uuid */
2447                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
2448                                 rc = name_create(&failnodeuuid, nidstr, "");
2449                                 if (rc != 0)
2450                                         return rc;
2451                         }
2452                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
2453                                 "client %s\n",
2454                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
2455                                 failnodeuuid, cliname);
2456                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
2457                         /*
2458                          * If *ptr is ':', we have added all NIDs for
2459                          * failnodeuuid.
2460                          */
2461                         if (*ptr == ':') {
2462                                 rc = record_add_conn(env, llh, cliname,
2463                                                      failnodeuuid);
2464                                 name_destroy(&failnodeuuid);
2465                                 failnodeuuid = NULL;
2466                         }
2467                 }
2468                 if (failnodeuuid) {
2469                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2470                         name_destroy(&failnodeuuid);
2471                         failnodeuuid = NULL;
2472                 }
2473         }
2474
2475         return rc;
2476 }
2477
2478 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2479                                     struct mgs_device *mgs,
2480                                     struct fs_db *fsdb,
2481                                     struct mgs_target_info *mti,
2482                                     char *logname, char *lmvname)
2483 {
2484         struct llog_handle *llh = NULL;
2485         char *mdcname = NULL;
2486         char *nodeuuid = NULL;
2487         char *mdcuuid = NULL;
2488         char *lmvuuid = NULL;
2489         char index[6];
2490         char nidstr[LNET_NIDSTR_SIZE];
2491         int i, rc;
2492         ENTRY;
2493
2494         if (mgs_log_is_empty(env, mgs, logname)) {
2495                 CERROR("log is empty! Logical error\n");
2496                 RETURN(-EINVAL);
2497         }
2498
2499         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2500                mti->mti_svname, logname, lmvname);
2501
2502         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2503         rc = name_create(&nodeuuid, nidstr, "");
2504         if (rc)
2505                 RETURN(rc);
2506         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2507         if (rc)
2508                 GOTO(out_free, rc);
2509         rc = name_create(&mdcuuid, mdcname, "_UUID");
2510         if (rc)
2511                 GOTO(out_free, rc);
2512         rc = name_create(&lmvuuid, lmvname, "_UUID");
2513         if (rc)
2514                 GOTO(out_free, rc);
2515
2516         rc = record_start_log(env, mgs, &llh, logname);
2517         if (rc)
2518                 GOTO(out_free, rc);
2519         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2520                            "add mdc");
2521         if (rc)
2522                 GOTO(out_end, rc);
2523         for (i = 0; i < mti->mti_nid_count; i++) {
2524                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2525                         libcfs_nid2str_r(mti->mti_nids[i],
2526                                          nidstr, sizeof(nidstr)));
2527
2528                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2529                 if (rc)
2530                         GOTO(out_end, rc);
2531         }
2532
2533         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2534         if (rc)
2535                 GOTO(out_end, rc);
2536         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2537                           NULL, NULL);
2538         if (rc)
2539                 GOTO(out_end, rc);
2540         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2541         if (rc)
2542                 GOTO(out_end, rc);
2543         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2544         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2545                             index, "1");
2546         if (rc)
2547                 GOTO(out_end, rc);
2548         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2549                            "add mdc");
2550         if (rc)
2551                 GOTO(out_end, rc);
2552 out_end:
2553         record_end_log(env, &llh);
2554 out_free:
2555         name_destroy(&lmvuuid);
2556         name_destroy(&mdcuuid);
2557         name_destroy(&mdcname);
2558         name_destroy(&nodeuuid);
2559         RETURN(rc);
2560 }
2561
2562 static inline int name_create_lov(char **lovname, char *mdtname,
2563                                   struct fs_db *fsdb, int index)
2564 {
2565         /* COMPAT_180 */
2566         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2567                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2568         else
2569                 return name_create(lovname, mdtname, "-mdtlov");
2570 }
2571
2572 static int name_create_mdt_and_lov(char **logname, char **lovname,
2573                                    struct fs_db *fsdb, int i)
2574 {
2575         int rc;
2576
2577         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2578         if (rc)
2579                 return rc;
2580         /* COMPAT_180 */
2581         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2582                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2583         else
2584                 rc = name_create(lovname, *logname, "-mdtlov");
2585         if (rc) {
2586                 name_destroy(logname);
2587                 *logname = NULL;
2588         }
2589         return rc;
2590 }
2591
2592 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2593                                       struct fs_db *fsdb, int i)
2594 {
2595         char suffix[16];
2596
2597         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2598                 sprintf(suffix, "-osc");
2599         else
2600                 sprintf(suffix, "-osc-MDT%04x", i);
2601         return name_create(oscname, ostname, suffix);
2602 }
2603
2604 /* add new mdc to already existent MDS */
2605 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2606                                     struct mgs_device *mgs,
2607                                     struct fs_db *fsdb,
2608                                     struct mgs_target_info *mti,
2609                                     int mdt_index, char *logname)
2610 {
2611         struct llog_handle      *llh = NULL;
2612         char    *nodeuuid = NULL;
2613         char    *ospname = NULL;
2614         char    *lovuuid = NULL;
2615         char    *mdtuuid = NULL;
2616         char    *svname = NULL;
2617         char    *mdtname = NULL;
2618         char    *lovname = NULL;
2619         char    index_str[16];
2620         char    nidstr[LNET_NIDSTR_SIZE];
2621         int     i, rc;
2622
2623         ENTRY;
2624         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2625                 CERROR("log is empty! Logical error\n");
2626                 RETURN (-EINVAL);
2627         }
2628
2629         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2630                logname);
2631
2632         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2633         if (rc)
2634                 RETURN(rc);
2635
2636         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2637         rc = name_create(&nodeuuid, nidstr, "");
2638         if (rc)
2639                 GOTO(out_destory, rc);
2640
2641         rc = name_create(&svname, mdtname, "-osp");
2642         if (rc)
2643                 GOTO(out_destory, rc);
2644
2645         sprintf(index_str, "-MDT%04x", mdt_index);
2646         rc = name_create(&ospname, svname, index_str);
2647         if (rc)
2648                 GOTO(out_destory, rc);
2649
2650         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2651         if (rc)
2652                 GOTO(out_destory, rc);
2653
2654         rc = name_create(&lovuuid, lovname, "_UUID");
2655         if (rc)
2656                 GOTO(out_destory, rc);
2657
2658         rc = name_create(&mdtuuid, mdtname, "_UUID");
2659         if (rc)
2660                 GOTO(out_destory, rc);
2661
2662         rc = record_start_log(env, mgs, &llh, logname);
2663         if (rc)
2664                 GOTO(out_destory, rc);
2665
2666         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2667                            "add osp");
2668         if (rc)
2669                 GOTO(out_destory, rc);
2670
2671         for (i = 0; i < mti->mti_nid_count; i++) {
2672                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2673                         libcfs_nid2str_r(mti->mti_nids[i],
2674                                          nidstr, sizeof(nidstr)));
2675                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2676                 if (rc)
2677                         GOTO(out_end, rc);
2678         }
2679
2680         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2681         if (rc)
2682                 GOTO(out_end, rc);
2683
2684         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2685                           NULL, NULL);
2686         if (rc)
2687                 GOTO(out_end, rc);
2688
2689         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2690         if (rc)
2691                 GOTO(out_end, rc);
2692
2693         /* Add mdc(osp) to lod */
2694         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
2695         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2696                          index_str, "1", NULL);
2697         if (rc)
2698                 GOTO(out_end, rc);
2699
2700         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2701         if (rc)
2702                 GOTO(out_end, rc);
2703
2704 out_end:
2705         record_end_log(env, &llh);
2706
2707 out_destory:
2708         name_destroy(&mdtuuid);
2709         name_destroy(&lovuuid);
2710         name_destroy(&lovname);
2711         name_destroy(&ospname);
2712         name_destroy(&svname);
2713         name_destroy(&nodeuuid);
2714         name_destroy(&mdtname);
2715         RETURN(rc);
2716 }
2717
2718 static int mgs_write_log_mdt0(const struct lu_env *env,
2719                               struct mgs_device *mgs,
2720                               struct fs_db *fsdb,
2721                               struct mgs_target_info *mti)
2722 {
2723         char *log = mti->mti_svname;
2724         struct llog_handle *llh = NULL;
2725         char *uuid, *lovname;
2726         char mdt_index[6];
2727         char *ptr = mti->mti_params;
2728         int rc = 0, failout = 0;
2729         ENTRY;
2730
2731         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2732         if (uuid == NULL)
2733                 RETURN(-ENOMEM);
2734
2735         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2736                 failout = (strncmp(ptr, "failout", 7) == 0);
2737
2738         rc = name_create(&lovname, log, "-mdtlov");
2739         if (rc)
2740                 GOTO(out_free, rc);
2741         if (mgs_log_is_empty(env, mgs, log)) {
2742                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2743                 if (rc)
2744                         GOTO(out_lod, rc);
2745         }
2746
2747         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2748
2749         rc = record_start_log(env, mgs, &llh, log);
2750         if (rc)
2751                 GOTO(out_lod, rc);
2752
2753         /* add MDT itself */
2754
2755         /* FIXME this whole fn should be a single journal transaction */
2756         sprintf(uuid, "%s_UUID", log);
2757         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2758         if (rc)
2759                 GOTO(out_lod, rc);
2760         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2761         if (rc)
2762                 GOTO(out_end, rc);
2763         rc = record_mount_opt(env, llh, log, lovname, NULL);
2764         if (rc)
2765                 GOTO(out_end, rc);
2766         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2767                         failout ? "n" : "f");
2768         if (rc)
2769                 GOTO(out_end, rc);
2770         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2771         if (rc)
2772                 GOTO(out_end, rc);
2773 out_end:
2774         record_end_log(env, &llh);
2775 out_lod:
2776         name_destroy(&lovname);
2777 out_free:
2778         OBD_FREE(uuid, sizeof(struct obd_uuid));
2779         RETURN(rc);
2780 }
2781
2782 /* envelope method for all layers log */
2783 static int mgs_write_log_mdt(const struct lu_env *env,
2784                              struct mgs_device *mgs,
2785                              struct fs_db *fsdb,
2786                              struct mgs_target_info *mti)
2787 {
2788         struct mgs_thread_info *mgi = mgs_env_info(env);
2789         struct llog_handle *llh = NULL;
2790         char *cliname;
2791         int rc, i = 0;
2792         ENTRY;
2793
2794         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2795
2796         if (mti->mti_uuid[0] == '\0') {
2797                 /* Make up our own uuid */
2798                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2799                          "%s_UUID", mti->mti_svname);
2800         }
2801
2802         /* add mdt */
2803         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2804         if (rc)
2805                 RETURN(rc);
2806         /* Append the mdt info to the client log */
2807         rc = name_create(&cliname, mti->mti_fsname, "-client");
2808         if (rc)
2809                 RETURN(rc);
2810
2811         if (mgs_log_is_empty(env, mgs, cliname)) {
2812                 /* Start client log */
2813                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2814                                        fsdb->fsdb_clilov);
2815                 if (rc)
2816                         GOTO(out_free, rc);
2817                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2818                                        fsdb->fsdb_clilmv);
2819                 if (rc)
2820                         GOTO(out_free, rc);
2821         }
2822
2823         /*
2824         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2825         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2826         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2827         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2828         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2829         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2830         */
2831
2832         /* copy client info about lov/lmv */
2833         mgi->mgi_comp.comp_mti = mti;
2834         mgi->mgi_comp.comp_fsdb = fsdb;
2835
2836         rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2837                                                 &mgi->mgi_comp);
2838         if (rc)
2839                 GOTO(out_free, rc);
2840         rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2841                                       fsdb->fsdb_clilmv);
2842         if (rc)
2843                 GOTO(out_free, rc);
2844
2845         /* add mountopts */
2846         rc = record_start_log(env, mgs, &llh, cliname);
2847         if (rc)
2848                 GOTO(out_free, rc);
2849
2850         rc = record_marker(env, llh, fsdb, CM_START, cliname, "mount opts");
2851         if (rc)
2852                 GOTO(out_end, rc);
2853         rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2854                               fsdb->fsdb_clilmv);
2855         if (rc)
2856                 GOTO(out_end, rc);
2857         rc = record_marker(env, llh, fsdb, CM_END, cliname, "mount opts");
2858         if (rc)
2859                 GOTO(out_end, rc);
2860
2861         /* for_all_existing_mdt except current one */
2862         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2863                 if (i !=  mti->mti_stripe_index &&
2864                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2865                         char *logname;
2866
2867                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2868                         if (rc)
2869                                 GOTO(out_end, rc);
2870
2871                         /* NB: If the log for the MDT is empty, it means
2872                          * the MDT is only added to the index
2873                          * map, and not being process yet, i.e. this
2874                          * is an unregistered MDT, see mgs_write_log_target().
2875                          * so we should skip it. Otherwise
2876                          *
2877                          * 1. MGS get register request for MDT1 and MDT2.
2878                          *
2879                          * 2. Then both MDT1 and MDT2 are added into
2880                          * fsdb_mdt_index_map. (see mgs_set_index()).
2881                          *
2882                          * 3. Then MDT1 get the lock of fsdb_mutex, then
2883                          * generate the config log, here, it will regard MDT2
2884                          * as an existent MDT, and generate "add osp" for
2885                          * lustre-MDT0001-osp-MDT0002. Note: at the moment
2886                          * MDT0002 config log is still empty, so it will
2887                          * add "add osp" even before "lov setup", which
2888                          * will definitly cause trouble.
2889                          *
2890                          * 4. MDT1 registeration finished, fsdb_mutex is
2891                          * released, then MDT2 get in, then in above
2892                          * mgs_steal_llog_for_mdt_from_client(), it will
2893                          * add another osp log for lustre-MDT0001-osp-MDT0002,
2894                          * which will cause another trouble.*/
2895                         if (!mgs_log_is_empty(env, mgs, logname))
2896                                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb,
2897                                                               mti, i, logname);
2898
2899                         name_destroy(&logname);
2900                         if (rc)
2901                                 GOTO(out_end, rc);
2902                 }
2903         }
2904 out_end:
2905         record_end_log(env, &llh);
2906 out_free:
2907         name_destroy(&cliname);
2908         RETURN(rc);
2909 }
2910
2911 /* Add the ost info to the client/mdt lov */
2912 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2913                                     struct mgs_device *mgs, struct fs_db *fsdb,
2914                                     struct mgs_target_info *mti,
2915                                     char *logname, char *suffix, char *lovname,
2916                                     enum lustre_sec_part sec_part, int flags)
2917 {
2918         struct llog_handle *llh = NULL;
2919         char *nodeuuid = NULL;
2920         char *oscname = NULL;
2921         char *oscuuid = NULL;
2922         char *lovuuid = NULL;
2923         char *svname = NULL;
2924         char index[6];
2925         char nidstr[LNET_NIDSTR_SIZE];
2926         int i, rc;
2927         ENTRY;
2928
2929         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2930                 mti->mti_svname, logname);
2931
2932         if (mgs_log_is_empty(env, mgs, logname)) {
2933                 CERROR("log is empty! Logical error\n");
2934                 RETURN(-EINVAL);
2935         }
2936
2937         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2938         rc = name_create(&nodeuuid, nidstr, "");
2939         if (rc)
2940                 RETURN(rc);
2941         rc = name_create(&svname, mti->mti_svname, "-osc");
2942         if (rc)
2943                 GOTO(out_free, rc);
2944
2945         /* for the system upgraded from old 1.8, keep using the old osc naming
2946          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2947         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2948                 rc = name_create(&oscname, svname, "");
2949         else
2950                 rc = name_create(&oscname, svname, suffix);
2951         if (rc)
2952                 GOTO(out_free, rc);
2953
2954         rc = name_create(&oscuuid, oscname, "_UUID");
2955         if (rc)
2956                 GOTO(out_free, rc);
2957         rc = name_create(&lovuuid, lovname, "_UUID");
2958         if (rc)
2959                 GOTO(out_free, rc);
2960
2961
2962         /*
2963         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2964         multihomed (#4)
2965         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2966         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2967         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2968         failover (#6,7)
2969         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2970         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2971         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2972         */
2973
2974         rc = record_start_log(env, mgs, &llh, logname);
2975         if (rc)
2976                 GOTO(out_free, rc);
2977
2978         /* FIXME these should be a single journal transaction */
2979         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2980                            "add osc");
2981         if (rc)
2982                 GOTO(out_end, rc);
2983
2984         /* NB: don't change record order, because upon MDT steal OSC config
2985          * from client, it treats all nids before LCFG_SETUP as target nids
2986          * (multiple interfaces), while nids after as failover node nids.
2987          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2988          */
2989         for (i = 0; i < mti->mti_nid_count; i++) {
2990                 CDEBUG(D_MGS, "add nid %s\n",
2991                         libcfs_nid2str_r(mti->mti_nids[i],
2992                                          nidstr, sizeof(nidstr)));
2993                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2994                 if (rc)
2995                         GOTO(out_end, rc);
2996         }
2997         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2998         if (rc)
2999                 GOTO(out_end, rc);
3000         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
3001                           NULL, NULL);
3002         if (rc)
3003                 GOTO(out_end, rc);
3004         rc = mgs_write_log_failnids(env, mti, llh, oscname);
3005         if (rc)
3006                 GOTO(out_end, rc);
3007
3008         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
3009
3010         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
3011         if (rc)
3012                 GOTO(out_end, rc);
3013         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
3014                            "add osc");
3015         if (rc)
3016                 GOTO(out_end, rc);
3017 out_end:
3018         record_end_log(env, &llh);
3019 out_free:
3020         name_destroy(&lovuuid);
3021         name_destroy(&oscuuid);
3022         name_destroy(&oscname);
3023         name_destroy(&svname);
3024         name_destroy(&nodeuuid);
3025         RETURN(rc);
3026 }
3027
3028 static int mgs_write_log_ost(const struct lu_env *env,
3029                              struct mgs_device *mgs, struct fs_db *fsdb,
3030                              struct mgs_target_info *mti)
3031 {
3032         struct llog_handle *llh = NULL;
3033         char *logname, *lovname;
3034         char *ptr = mti->mti_params;
3035         int rc, flags = 0, failout = 0, i;
3036         ENTRY;
3037
3038         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
3039
3040         /* The ost startup log */
3041
3042         /* If the ost log already exists, that means that someone reformatted
3043            the ost and it called target_add again. */
3044         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
3045                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
3046                                    "exists, yet the server claims it never "
3047                                    "registered. It may have been reformatted, "
3048                                    "or the index changed. writeconf the MDT to "
3049                                    "regenerate all logs.\n", mti->mti_svname);
3050                 RETURN(-EALREADY);
3051         }
3052
3053         /*
3054         attach obdfilter ost1 ost1_UUID
3055         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
3056         */
3057         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
3058                 failout = (strncmp(ptr, "failout", 7) == 0);
3059         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
3060         if (rc)
3061                 RETURN(rc);
3062         /* FIXME these should be a single journal transaction */
3063         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
3064         if (rc)
3065                 GOTO(out_end, rc);
3066         if (*mti->mti_uuid == '\0')
3067                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
3068                          "%s_UUID", mti->mti_svname);
3069         rc = record_attach(env, llh, mti->mti_svname,
3070                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
3071         if (rc)
3072                 GOTO(out_end, rc);
3073         rc = record_setup(env, llh, mti->mti_svname,
3074                           "dev"/*ignored*/, "type"/*ignored*/,
3075                           failout ? "n" : "f", NULL/*options*/);
3076         if (rc)
3077                 GOTO(out_end, rc);
3078         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
3079         if (rc)
3080                 GOTO(out_end, rc);
3081 out_end:
3082         record_end_log(env, &llh);
3083         if (rc)
3084                 RETURN(rc);
3085         /* We also have to update the other logs where this osc is part of
3086            the lov */
3087
3088         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3089                 /* If we're upgrading, the old mdt log already has our
3090                    entry. Let's do a fake one for fun. */
3091                 /* Note that we can't add any new failnids, since we don't
3092                    know the old osc names. */
3093                 flags = CM_SKIP | CM_UPGRADE146;
3094
3095         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
3096                 /* If the update flag isn't set, don't update client/mdt
3097                    logs. */
3098                 flags |= CM_SKIP;
3099                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
3100                               "the MDT first to regenerate it.\n",
3101                               mti->mti_svname);
3102         }
3103
3104         /* Add ost to all MDT lov defs */
3105         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3106                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
3107                         char mdt_index[13];
3108
3109                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
3110                                                      i);
3111                         if (rc)
3112                                 RETURN(rc);
3113
3114                         snprintf(mdt_index, sizeof(mdt_index), "-MDT%04x", i);
3115                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
3116                                                       logname, mdt_index,
3117                                                       lovname, LUSTRE_SP_MDT,
3118                                                       flags);
3119                         name_destroy(&logname);
3120                         name_destroy(&lovname);
3121                         if (rc)
3122                                 RETURN(rc);
3123                 }
3124         }
3125
3126         /* Append ost info to the client log */
3127         rc = name_create(&logname, mti->mti_fsname, "-client");
3128         if (rc)
3129                 RETURN(rc);
3130         if (mgs_log_is_empty(env, mgs, logname)) {
3131                 /* Start client log */
3132                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
3133                                        fsdb->fsdb_clilov);
3134                 if (rc)
3135                         GOTO(out_free, rc);
3136                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
3137                                        fsdb->fsdb_clilmv);
3138                 if (rc)
3139                         GOTO(out_free, rc);
3140         }
3141         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
3142                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
3143 out_free:
3144         name_destroy(&logname);
3145         RETURN(rc);
3146 }
3147
3148 static __inline__ int mgs_param_empty(char *ptr)
3149 {
3150         char *tmp;
3151
3152         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
3153                 return 1;
3154         return 0;
3155 }
3156
3157 static int mgs_write_log_failnid_internal(const struct lu_env *env,
3158                                           struct mgs_device *mgs,
3159                                           struct fs_db *fsdb,
3160                                           struct mgs_target_info *mti,
3161                                           char *logname, char *cliname)
3162 {
3163         int rc;
3164         struct llog_handle *llh = NULL;
3165
3166         if (mgs_param_empty(mti->mti_params)) {
3167                 /* Remove _all_ failnids */
3168                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3169                                 mti->mti_svname, "add failnid", CM_SKIP);
3170                 return rc < 0 ? rc : 0;
3171         }
3172
3173         /* Otherwise failover nids are additive */
3174         rc = record_start_log(env, mgs, &llh, logname);
3175         if (rc)
3176                 return rc;
3177                 /* FIXME this should be a single journal transaction */
3178         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
3179                            "add failnid");
3180         if (rc)
3181                 goto out_end;
3182         rc = mgs_write_log_failnids(env, mti, llh, cliname);
3183         if (rc)
3184                 goto out_end;
3185         rc = record_marker(env, llh, fsdb, CM_END,
3186                            mti->mti_svname, "add failnid");
3187 out_end:
3188         record_end_log(env, &llh);
3189         return rc;
3190 }
3191
3192
3193 /* Add additional failnids to an existing log.
3194    The mdc/osc must have been added to logs first */
3195 /* tcp nids must be in dotted-quad ascii -
3196    we can't resolve hostnames from the kernel. */
3197 static int mgs_write_log_add_failnid(const struct lu_env *env,
3198                                      struct mgs_device *mgs,
3199                                      struct fs_db *fsdb,
3200                                      struct mgs_target_info *mti)
3201 {
3202         char *logname, *cliname;
3203         int rc;
3204         ENTRY;
3205
3206         /* FIXME we currently can't erase the failnids
3207          * given when a target first registers, since they aren't part of
3208          * an "add uuid" stanza
3209          */
3210
3211         /* Verify that we know about this target */
3212         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
3213                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
3214                                    "yet. It must be started before failnids "
3215                                    "can be added.\n", mti->mti_svname);
3216                 RETURN(-ENOENT);
3217         }
3218
3219         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
3220         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3221                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
3222         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3223                 rc = name_create(&cliname, mti->mti_svname, "-osc");
3224         } else {
3225                 RETURN(-EINVAL);
3226         }
3227         if (rc)
3228                 RETURN(rc);
3229
3230         /* Add failover nids to the client log */
3231         rc = name_create(&logname, mti->mti_fsname, "-client");
3232         if (rc) {
3233                 name_destroy(&cliname);
3234                 RETURN(rc);
3235         }
3236
3237         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
3238         name_destroy(&logname);
3239         name_destroy(&cliname);
3240         if (rc)
3241                 RETURN(rc);
3242
3243         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3244                 /* Add OST failover nids to the MDT logs as well */
3245                 int i;
3246
3247                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3248                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3249                                 continue;
3250                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3251                         if (rc)
3252                                 RETURN(rc);
3253                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
3254                                                  fsdb, i);
3255                         if (rc) {
3256                                 name_destroy(&logname);
3257                                 RETURN(rc);
3258                         }
3259                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
3260                                                             mti, logname,
3261                                                             cliname);
3262                         name_destroy(&cliname);
3263                         name_destroy(&logname);
3264                         if (rc)
3265                                 RETURN(rc);
3266                 }
3267         }
3268
3269         RETURN(rc);
3270 }
3271
3272 static int mgs_wlp_lcfg(const struct lu_env *env,
3273                         struct mgs_device *mgs, struct fs_db *fsdb,
3274                         struct mgs_target_info *mti,
3275                         char *logname, struct lustre_cfg_bufs *bufs,
3276                         char *tgtname, char *ptr)
3277 {
3278         char comment[MTI_NAME_MAXLEN];
3279         char *tmp;
3280         struct llog_cfg_rec *lcr;
3281         int rc, del;
3282
3283         /* Erase any old settings of this same parameter */
3284         memcpy(comment, ptr, MTI_NAME_MAXLEN);
3285         comment[MTI_NAME_MAXLEN - 1] = 0;
3286         /* But don't try to match the value. */
3287         tmp = strchr(comment, '=');
3288         if (tmp != NULL)
3289                 *tmp = 0;
3290         /* FIXME we should skip settings that are the same as old values */
3291         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
3292         if (rc < 0)
3293                 return rc;
3294         del = mgs_param_empty(ptr);
3295
3296         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
3297                       "Setting" : "Modifying", tgtname, comment, logname);
3298         if (del) {
3299                 /* mgs_modify() will return 1 if nothing had to be done */
3300                 if (rc == 1)
3301                         rc = 0;
3302                 return rc;
3303         }
3304
3305         lustre_cfg_bufs_reset(bufs, tgtname);
3306         lustre_cfg_bufs_set_string(bufs, 1, ptr);
3307         if (mti->mti_flags & LDD_F_PARAM2)
3308                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
3309
3310         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
3311                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
3312         if (lcr == NULL)
3313                 return -ENOMEM;
3314
3315         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
3316                                   comment);
3317         lustre_cfg_rec_free(lcr);
3318         return rc;
3319 }
3320
3321 /* write global variable settings into log */
3322 static int mgs_write_log_sys(const struct lu_env *env,
3323                              struct mgs_device *mgs, struct fs_db *fsdb,
3324                              struct mgs_target_info *mti, char *sys, char *ptr)
3325 {
3326         struct mgs_thread_info  *mgi = mgs_env_info(env);
3327         struct lustre_cfg       *lcfg;
3328         struct llog_cfg_rec     *lcr;
3329         char *tmp, sep;
3330         int rc, cmd, convert = 1;
3331
3332         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
3333                 cmd = LCFG_SET_TIMEOUT;
3334         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
3335                 cmd = LCFG_SET_LDLM_TIMEOUT;
3336         /* Check for known params here so we can return error to lctl */
3337         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
3338                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
3339                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
3340                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
3341                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
3342                 cmd = LCFG_PARAM;
3343         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
3344                 convert = 0; /* Don't convert string value to integer */
3345                 cmd = LCFG_PARAM;
3346         } else {
3347                 return -EINVAL;
3348         }
3349
3350         if (mgs_param_empty(ptr))
3351                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
3352         else
3353                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
3354
3355         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
3356         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
3357         if (!convert && *tmp != '\0')
3358                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
3359         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3360         if (lcr == NULL)
3361                 return -ENOMEM;
3362
3363         lcfg = &lcr->lcr_cfg;
3364         if (convert) {
3365                 rc = kstrtouint(tmp, 0, &lcfg->lcfg_num);
3366                 if (rc)
3367                         GOTO(out_rec_free, rc);
3368         } else {
3369                 lcfg->lcfg_num = 0;
3370         }
3371
3372         /* truncate the comment to the parameter name */
3373         ptr = tmp - 1;
3374         sep = *ptr;
3375         *ptr = '\0';
3376         /* modify all servers and clients */
3377         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3378                                       *tmp == '\0' ? NULL : lcr,
3379                                       mti->mti_fsname, sys, 0);
3380         if (rc == 0 && *tmp != '\0') {
3381                 switch (cmd) {
3382                 case LCFG_SET_TIMEOUT:
3383                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
3384                                 class_process_config(lcfg);
3385                         break;
3386                 case LCFG_SET_LDLM_TIMEOUT:
3387                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
3388                                 class_process_config(lcfg);
3389                         break;
3390                 default:
3391                         break;
3392                 }
3393         }
3394         *ptr = sep;
3395 out_rec_free:
3396         lustre_cfg_rec_free(lcr);
3397         return rc;
3398 }
3399
3400 /* write quota settings into log */
3401 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
3402                                struct fs_db *fsdb, struct mgs_target_info *mti,
3403                                char *quota, char *ptr)
3404 {
3405         struct mgs_thread_info  *mgi = mgs_env_info(env);
3406         struct llog_cfg_rec     *lcr;
3407         char                    *tmp;
3408         char                     sep;
3409         int                      rc, cmd = LCFG_PARAM;
3410
3411         /* support only 'meta' and 'data' pools so far */
3412         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
3413             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
3414                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
3415                        "& quota.ost are)\n", ptr);
3416                 return -EINVAL;
3417         }
3418
3419         if (*tmp == '\0') {
3420                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
3421         } else {
3422                 CDEBUG(D_MGS, "global '%s'\n", quota);
3423
3424                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
3425                     strchr(tmp, 'p') == NULL &&
3426                     strcmp(tmp, "none") != 0) {
3427                         CERROR("enable option(%s) isn't supported\n", tmp);
3428                         return -EINVAL;
3429                 }
3430         }
3431
3432         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
3433         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
3434         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3435         if (lcr == NULL)
3436                 return -ENOMEM;
3437
3438         /* truncate the comment to the parameter name */
3439         ptr = tmp - 1;
3440         sep = *ptr;
3441         *ptr = '\0';
3442
3443         /* XXX we duplicated quota enable information in all server
3444          *     config logs, it should be moved to a separate config
3445          *     log once we cleanup the config log for global param. */
3446         /* modify all servers */
3447         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3448                                       *tmp == '\0' ? NULL : lcr,
3449                                       mti->mti_fsname, quota, 1);
3450         *ptr = sep;
3451         lustre_cfg_rec_free(lcr);
3452         return rc < 0 ? rc : 0;
3453 }
3454
3455 static int mgs_srpc_set_param_disk(const struct lu_env *env,
3456                                    struct mgs_device *mgs,
3457                                    struct fs_db *fsdb,
3458                                    struct mgs_target_info *mti,
3459                                    char *param)
3460 {
3461         struct mgs_thread_info  *mgi = mgs_env_info(env);
3462         struct llog_cfg_rec     *lcr;
3463         struct llog_handle      *llh = NULL;
3464         char                    *logname;
3465         char                    *comment, *ptr;
3466         int                      rc, len;
3467
3468         ENTRY;
3469
3470         /* get comment */
3471         ptr = strchr(param, '=');
3472         LASSERT(ptr != NULL);
3473         len = ptr - param;
3474
3475         OBD_ALLOC(comment, len + 1);
3476         if (comment == NULL)
3477                 RETURN(-ENOMEM);
3478         strncpy(comment, param, len);
3479         comment[len] = '\0';
3480
3481         /* prepare lcfg */
3482         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
3483         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
3484         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
3485         if (lcr == NULL)
3486                 GOTO(out_comment, rc = -ENOMEM);
3487
3488         /* construct log name */
3489         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
3490         if (rc < 0)
3491                 GOTO(out_lcfg, rc);
3492
3493         if (mgs_log_is_empty(env, mgs, logname)) {
3494                 rc = record_start_log(env, mgs, &llh, logname);
3495                 if (rc < 0)
3496                         GOTO(out, rc);
3497                 record_end_log(env, &llh);
3498         }
3499
3500         /* obsolete old one */
3501         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
3502                         comment, CM_SKIP);
3503         if (rc < 0)
3504                 GOTO(out, rc);
3505         /* write the new one */
3506         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
3507                                   mti->mti_svname, comment);
3508         if (rc)
3509                 CERROR("%s: error writing log %s: rc = %d\n",
3510                        mgs->mgs_obd->obd_name, logname, rc);
3511 out:
3512         name_destroy(&logname);
3513 out_lcfg:
3514         lustre_cfg_rec_free(lcr);
3515 out_comment:
3516         OBD_FREE(comment, len + 1);
3517         RETURN(rc);
3518 }
3519
3520 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
3521                                         char *param)
3522 {
3523         char    *ptr;
3524
3525         /* disable the adjustable udesc parameter for now, i.e. use default
3526          * setting that client always ship udesc to MDT if possible. to enable
3527          * it simply remove the following line */
3528         goto error_out;
3529
3530         ptr = strchr(param, '=');
3531         if (ptr == NULL)
3532                 goto error_out;
3533         *ptr++ = '\0';
3534
3535         if (strcmp(param, PARAM_SRPC_UDESC))
3536                 goto error_out;
3537
3538         if (strcmp(ptr, "yes") == 0) {
3539                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3540                 CWARN("Enable user descriptor shipping from client to MDT\n");
3541         } else if (strcmp(ptr, "no") == 0) {
3542                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3543                 CWARN("Disable user descriptor shipping from client to MDT\n");
3544         } else {
3545                 *(ptr - 1) = '=';
3546                 goto error_out;
3547         }
3548         return 0;
3549
3550 error_out:
3551         CERROR("Invalid param: %s\n", param);
3552         return -EINVAL;
3553 }
3554
3555 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3556                                   const char *svname,
3557                                   char *param)
3558 {
3559         struct sptlrpc_rule      rule;
3560         struct sptlrpc_rule_set *rset;
3561         int                      rc;
3562         ENTRY;
3563
3564         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3565                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3566                 RETURN(-EINVAL);
3567         }
3568
3569         if (strncmp(param, PARAM_SRPC_UDESC,
3570                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3571                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3572         }
3573
3574         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3575                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3576                 RETURN(-EINVAL);
3577         }
3578
3579         param += sizeof(PARAM_SRPC_FLVR) - 1;
3580
3581         rc = sptlrpc_parse_rule(param, &rule);
3582         if (rc)
3583                 RETURN(rc);
3584
3585         /* mgs rules implies must be mgc->mgs */
3586         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3587                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3588                      rule.sr_from != LUSTRE_SP_ANY) ||
3589                     (rule.sr_to != LUSTRE_SP_MGS &&
3590                      rule.sr_to != LUSTRE_SP_ANY))
3591                         RETURN(-EINVAL);
3592         }
3593
3594         /* preapre room for this coming rule. svcname format should be:
3595          * - fsname: general rule
3596          * - fsname-tgtname: target-specific rule
3597          */
3598         if (strchr(svname, '-')) {
3599                 struct mgs_tgt_srpc_conf *tgtconf;
3600                 int                       found = 0;
3601
3602                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3603                      tgtconf = tgtconf->mtsc_next) {
3604                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3605                                 found = 1;
3606                                 break;
3607                         }
3608                 }
3609
3610                 if (!found) {
3611                         int name_len;
3612
3613                         OBD_ALLOC_PTR(tgtconf);
3614                         if (tgtconf == NULL)
3615                                 RETURN(-ENOMEM);
3616
3617                         name_len = strlen(svname);
3618
3619                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3620                         if (tgtconf->mtsc_tgt == NULL) {
3621                                 OBD_FREE_PTR(tgtconf);
3622                                 RETURN(-ENOMEM);
3623                         }
3624                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3625
3626                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3627                         fsdb->fsdb_srpc_tgt = tgtconf;
3628                 }
3629
3630                 rset = &tgtconf->mtsc_rset;
3631         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
3632                 /* put _mgs related srpc rule directly in mgs ruleset */
3633                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
3634         } else {
3635                 rset = &fsdb->fsdb_srpc_gen;
3636         }
3637
3638         rc = sptlrpc_rule_set_merge(rset, &rule);
3639
3640         RETURN(rc);
3641 }
3642
3643 static int mgs_srpc_set_param(const struct lu_env *env,
3644                               struct mgs_device *mgs,
3645                               struct fs_db *fsdb,
3646                               struct mgs_target_info *mti,
3647                               char *param)
3648 {
3649         char                   *copy;
3650         int                     rc, copy_size;
3651         ENTRY;
3652
3653 #ifndef HAVE_GSS
3654         RETURN(-EINVAL);
3655 #endif
3656         /* keep a copy of original param, which could be destroied
3657          * during parsing */
3658         copy_size = strlen(param) + 1;
3659         OBD_ALLOC(copy, copy_size);
3660         if (copy == NULL)
3661                 return -ENOMEM;
3662         memcpy(copy, param, copy_size);
3663
3664         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3665         if (rc)
3666                 goto out_free;
3667
3668         /* previous steps guaranteed the syntax is correct */
3669         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3670         if (rc)
3671                 goto out_free;
3672
3673         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3674                 /*
3675                  * for mgs rules, make them effective immediately.
3676                  */
3677                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3678                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3679                                                  &fsdb->fsdb_srpc_gen);
3680         }
3681
3682 out_free:
3683         OBD_FREE(copy, copy_size);
3684         RETURN(rc);
3685 }
3686
3687 struct mgs_srpc_read_data {
3688         struct fs_db   *msrd_fsdb;
3689         int             msrd_skip;
3690 };
3691
3692 static int mgs_srpc_read_handler(const struct lu_env *env,
3693                                  struct llog_handle *llh,
3694                                  struct llog_rec_hdr *rec, void *data)
3695 {
3696         struct mgs_srpc_read_data *msrd = data;
3697         struct cfg_marker         *marker;
3698         struct lustre_cfg         *lcfg = REC_DATA(rec);
3699         char                      *svname, *param;
3700         int                        cfg_len, rc;
3701         ENTRY;
3702
3703         if (rec->lrh_type != OBD_CFG_REC) {
3704                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3705                 RETURN(-EINVAL);
3706         }
3707
3708         cfg_len = REC_DATA_LEN(rec);
3709
3710         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3711         if (rc) {
3712                 CERROR("Insane cfg\n");
3713                 RETURN(rc);
3714         }
3715
3716         if (lcfg->lcfg_command == LCFG_MARKER) {
3717                 marker = lustre_cfg_buf(lcfg, 1);
3718
3719                 if (marker->cm_flags & CM_START &&
3720                     marker->cm_flags & CM_SKIP)
3721                         msrd->msrd_skip = 1;
3722                 if (marker->cm_flags & CM_END)
3723                         msrd->msrd_skip = 0;
3724
3725                 RETURN(0);
3726         }
3727
3728         if (msrd->msrd_skip)
3729                 RETURN(0);
3730
3731         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3732                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3733                 RETURN(0);
3734         }
3735
3736         svname = lustre_cfg_string(lcfg, 0);
3737         if (svname == NULL) {
3738                 CERROR("svname is empty\n");
3739                 RETURN(0);
3740         }
3741
3742         par