Whamcloud - gitweb
LU-8900 snapshot: fork/erase configuration
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mgs/mgs_llog.c
33  *
34  * Lustre Management Server (mgs) config llog creation
35  *
36  * Author: Nathan Rutman <nathan@clusterfs.com>
37  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
38  * Author: Mikhail Pershin <tappro@whamcloud.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_MGS
42 #define D_MGS D_CONFIG
43
44 #include <obd.h>
45 #include <lustre_ioctl.h>
46 #include <lustre_param.h>
47 #include <lustre_sec.h>
48 #include <lustre_quota.h>
49
50 #include "mgs_internal.h"
51
52 /********************** Class functions ********************/
53
54 /**
55  * Find all logs in CONFIG directory and link then into list.
56  *
57  * \param[in] env       pointer to the thread context
58  * \param[in] mgs       pointer to the mgs device
59  * \param[out] log_list the list to hold the found llog name entry
60  *
61  * \retval              0 for success
62  * \retval              negative error number on failure
63  **/
64 int class_dentry_readdir(const struct lu_env *env, struct mgs_device *mgs,
65                          struct list_head *log_list)
66 {
67         struct dt_object *dir = mgs->mgs_configs_dir;
68         const struct dt_it_ops *iops;
69         struct dt_it *it;
70         struct mgs_direntry *de;
71         char *key;
72         int rc, key_sz;
73
74         INIT_LIST_HEAD(log_list);
75
76         LASSERT(dir);
77         LASSERT(dir->do_index_ops);
78
79         iops = &dir->do_index_ops->dio_it;
80         it = iops->init(env, dir, LUDA_64BITHASH);
81         if (IS_ERR(it))
82                 RETURN(PTR_ERR(it));
83
84         rc = iops->load(env, it, 0);
85         if (rc <= 0)
86                 GOTO(fini, rc = 0);
87
88         /* main cycle */
89         do {
90                 key = (void *)iops->key(env, it);
91                 if (IS_ERR(key)) {
92                         CERROR("%s: key failed when listing %s: rc = %d\n",
93                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
94                                (int) PTR_ERR(key));
95                         goto next;
96                 }
97                 key_sz = iops->key_size(env, it);
98                 LASSERT(key_sz > 0);
99
100                 /* filter out "." and ".." entries */
101                 if (key[0] == '.') {
102                         if (key_sz == 1)
103                                 goto next;
104                         if (key_sz == 2 && key[1] == '.')
105                                 goto next;
106                 }
107
108                 /* filter out ".bak" files */
109                 /* sizeof(".bak") - 1 == 3 */
110                 if (key_sz >= 3 &&
111                     !memcmp(".bak", key + key_sz - 3, 3)) {
112                         CDEBUG(D_MGS, "Skipping backup file %.*s\n",
113                                key_sz, key);
114                         goto next;
115                 }
116
117                 de = mgs_direntry_alloc(key_sz + 1);
118                 if (de == NULL) {
119                         rc = -ENOMEM;
120                         break;
121                 }
122
123                 memcpy(de->mde_name, key, key_sz);
124                 de->mde_name[key_sz] = 0;
125
126                 list_add(&de->mde_list, log_list);
127
128 next:
129                 rc = iops->next(env, it);
130         } while (rc == 0);
131         if (rc > 0)
132                 rc = 0;
133
134         iops->put(env, it);
135
136 fini:
137         iops->fini(env, it);
138         if (rc) {
139                 struct mgs_direntry *n;
140
141                 CERROR("%s: key failed when listing %s: rc = %d\n",
142                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
143
144                 list_for_each_entry_safe(de, n, log_list, mde_list) {
145                         list_del_init(&de->mde_list);
146                         mgs_direntry_free(de);
147                 }
148         }
149
150         RETURN(rc);
151 }
152
153 /******************** DB functions *********************/
154
155 static inline int name_create(char **newname, char *prefix, char *suffix)
156 {
157         LASSERT(newname);
158         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
159         if (!*newname)
160                 return -ENOMEM;
161         sprintf(*newname, "%s%s", prefix, suffix);
162         return 0;
163 }
164
165 static inline void name_destroy(char **name)
166 {
167         if (*name)
168                 OBD_FREE(*name, strlen(*name) + 1);
169         *name = NULL;
170 }
171
172 struct mgs_fsdb_handler_data
173 {
174         struct fs_db   *fsdb;
175         __u32           ver;
176 };
177
178 /* from the (client) config log, figure out:
179         1. which ost's/mdt's are configured (by index)
180         2. what the last config step is
181         3. COMPAT_18 osc name
182 */
183 /* It might be better to have a separate db file, instead of parsing the info
184    out of the client log.  This is slow and potentially error-prone. */
185 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
186                             struct llog_rec_hdr *rec, void *data)
187 {
188         struct mgs_fsdb_handler_data *d = data;
189         struct fs_db *fsdb = d->fsdb;
190         int cfg_len = rec->lrh_len;
191         char *cfg_buf = (char*) (rec + 1);
192         struct lustre_cfg *lcfg;
193         __u32 index;
194         int rc = 0;
195         ENTRY;
196
197         if (rec->lrh_type != OBD_CFG_REC) {
198                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
199                 RETURN(-EINVAL);
200         }
201
202         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
203         if (rc) {
204                 CERROR("Insane cfg\n");
205                 RETURN(rc);
206         }
207
208         lcfg = (struct lustre_cfg *)cfg_buf;
209
210         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
211                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
212
213         /* Figure out ost indicies */
214         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
215         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
216             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
217                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
218                                        NULL, 10);
219                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
220                        lustre_cfg_string(lcfg, 1), index,
221                        lustre_cfg_string(lcfg, 2));
222                 set_bit(index, fsdb->fsdb_ost_index_map);
223         }
224
225         /* Figure out mdt indicies */
226         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
227         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
228             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
229                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
230                                        &index, NULL);
231                 if (rc != LDD_F_SV_TYPE_MDT) {
232                         CWARN("Unparsable MDC name %s, assuming index 0\n",
233                               lustre_cfg_string(lcfg, 0));
234                         index = 0;
235                 }
236                 rc = 0;
237                 CDEBUG(D_MGS, "MDT index is %u\n", index);
238                 if (!test_bit(index, fsdb->fsdb_mdt_index_map)) {
239                         set_bit(index, fsdb->fsdb_mdt_index_map);
240                         fsdb->fsdb_mdt_count++;
241                 }
242         }
243
244         /**
245          * figure out the old config. fsdb_gen = 0 means old log
246          * It is obsoleted and not supported anymore
247          */
248         if (fsdb->fsdb_gen == 0) {
249                 CERROR("Old config format is not supported\n");
250                 RETURN(-EINVAL);
251         }
252
253         /*
254          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
255          */
256         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
257             lcfg->lcfg_command == LCFG_ATTACH &&
258             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
259                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
260                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
261                         CWARN("MDT using 1.8 OSC name scheme\n");
262                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
263                 }
264         }
265
266         if (lcfg->lcfg_command == LCFG_MARKER) {
267                 struct cfg_marker *marker;
268                 marker = lustre_cfg_buf(lcfg, 1);
269
270                 d->ver = marker->cm_vers;
271
272                 /* Keep track of the latest marker step */
273                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
274         }
275
276         RETURN(rc);
277 }
278
279 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
280 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
281                                   struct mgs_device *mgs,
282                                   struct fs_db *fsdb)
283 {
284         char *logname;
285         struct llog_handle *loghandle;
286         struct llog_ctxt *ctxt;
287         struct mgs_fsdb_handler_data d = {
288                 .fsdb = fsdb,
289         };
290         int rc;
291
292         ENTRY;
293
294         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
295         LASSERT(ctxt != NULL);
296         rc = name_create(&logname, fsdb->fsdb_name, "-client");
297         if (rc)
298                 GOTO(out_put, rc);
299         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
300         if (rc)
301                 GOTO(out_pop, rc);
302
303         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
304         if (rc)
305                 GOTO(out_close, rc);
306
307         if (llog_get_size(loghandle) <= 1)
308                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
309
310         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
311         CDEBUG(D_INFO, "get_db = %d\n", rc);
312 out_close:
313         llog_close(env, loghandle);
314 out_pop:
315         name_destroy(&logname);
316 out_put:
317         llog_ctxt_put(ctxt);
318
319         RETURN(rc);
320 }
321
322 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
323 {
324         struct mgs_tgt_srpc_conf *tgtconf;
325
326         /* free target-specific rules */
327         while (fsdb->fsdb_srpc_tgt) {
328                 tgtconf = fsdb->fsdb_srpc_tgt;
329                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
330
331                 LASSERT(tgtconf->mtsc_tgt);
332
333                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
334                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
335                 OBD_FREE_PTR(tgtconf);
336         }
337
338         /* free general rules */
339         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
340 }
341
342 static void mgs_unlink_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
343 {
344         mutex_lock(&mgs->mgs_mutex);
345         if (likely(!list_empty(&fsdb->fsdb_list))) {
346                 LASSERTF(atomic_read(&fsdb->fsdb_ref) >= 2,
347                          "Invalid ref %d on %s\n",
348                          atomic_read(&fsdb->fsdb_ref),
349                          fsdb->fsdb_name);
350
351                 list_del_init(&fsdb->fsdb_list);
352                 /* Drop the reference on the list.*/
353                 mgs_put_fsdb(mgs, fsdb);
354         }
355         mutex_unlock(&mgs->mgs_mutex);
356 }
357
358 /* The caller must hold mgs->mgs_mutex. */
359 static inline struct fs_db *
360 mgs_find_fsdb_noref(struct mgs_device *mgs, const char *fsname)
361 {
362         struct fs_db *fsdb;
363         struct list_head *tmp;
364
365         list_for_each(tmp, &mgs->mgs_fs_db_list) {
366                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
367                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
368                         return fsdb;
369         }
370
371         return NULL;
372 }
373
374 /* The caller must hold mgs->mgs_mutex. */
375 static void mgs_remove_fsdb_by_name(struct mgs_device *mgs, const char *name)
376 {
377         struct fs_db *fsdb;
378
379         fsdb = mgs_find_fsdb_noref(mgs, name);
380         if (fsdb) {
381                 list_del_init(&fsdb->fsdb_list);
382                 /* Drop the reference on the list.*/
383                 mgs_put_fsdb(mgs, fsdb);
384         }
385 }
386
387 /* The caller must hold mgs->mgs_mutex. */
388 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, const char *fsname)
389 {
390         struct fs_db *fsdb;
391
392         fsdb = mgs_find_fsdb_noref(mgs, fsname);
393         if (fsdb)
394                 atomic_inc(&fsdb->fsdb_ref);
395
396         return fsdb;
397 }
398
399 /* The caller must hold mgs->mgs_mutex. */
400 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
401                                   struct mgs_device *mgs, char *fsname)
402 {
403         struct fs_db *fsdb;
404         int rc;
405         ENTRY;
406
407         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
408                 CERROR("fsname %s is too long\n", fsname);
409
410                 RETURN(ERR_PTR(-EINVAL));
411         }
412
413         OBD_ALLOC_PTR(fsdb);
414         if (!fsdb)
415                 RETURN(ERR_PTR(-ENOMEM));
416
417         strncpy(fsdb->fsdb_name, fsname, sizeof(fsdb->fsdb_name));
418         mutex_init(&fsdb->fsdb_mutex);
419         INIT_LIST_HEAD(&fsdb->fsdb_list);
420         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
421         fsdb->fsdb_gen = 1;
422         INIT_LIST_HEAD(&fsdb->fsdb_clients);
423         atomic_set(&fsdb->fsdb_notify_phase, 0);
424         init_waitqueue_head(&fsdb->fsdb_notify_waitq);
425         init_completion(&fsdb->fsdb_notify_comp);
426
427         if (strcmp(fsname, MGSSELF_NAME) == 0) {
428                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
429                 fsdb->fsdb_mgs = mgs;
430                 if (logname_is_barrier(fsname))
431                         goto add;
432         } else {
433                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
434                 if (!fsdb->fsdb_mdt_index_map) {
435                         CERROR("No memory for MDT index maps\n");
436
437                         GOTO(err, rc = -ENOMEM);
438                 }
439
440                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
441                 if (!fsdb->fsdb_ost_index_map) {
442                         CERROR("No memory for OST index maps\n");
443
444                         GOTO(err, rc = -ENOMEM);
445                 }
446
447                 if (logname_is_barrier(fsname))
448                         goto add;
449
450                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
451                 if (rc)
452                         GOTO(err, rc);
453
454                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
455                 if (rc)
456                         GOTO(err, rc);
457
458                 /* initialise data for NID table */
459                 mgs_ir_init_fs(env, mgs, fsdb);
460                 lproc_mgs_add_live(mgs, fsdb);
461         }
462
463         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
464                 /* populate the db from the client llog */
465                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
466                 if (rc) {
467                         CERROR("Can't get db from client log %d\n", rc);
468
469                         GOTO(err, rc);
470                 }
471         }
472
473         /* populate srpc rules from params llog */
474         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
475         if (rc) {
476                 CERROR("Can't get db from params log %d\n", rc);
477
478                 GOTO(err, rc);
479         }
480
481 add:
482         /* One ref is for the fsdb on the list.
483          * The other ref is for the caller. */
484         atomic_set(&fsdb->fsdb_ref, 2);
485         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
486
487         RETURN(fsdb);
488
489 err:
490         atomic_set(&fsdb->fsdb_ref, 1);
491         mgs_put_fsdb(mgs, fsdb);
492
493         RETURN(ERR_PTR(rc));
494 }
495
496 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
497 {
498         LASSERT(list_empty(&fsdb->fsdb_list));
499
500         lproc_mgs_del_live(mgs, fsdb);
501
502         /* deinitialize fsr */
503         if (fsdb->fsdb_mgs)
504                 mgs_ir_fini_fs(mgs, fsdb);
505
506         if (fsdb->fsdb_ost_index_map)
507                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
508         if (fsdb->fsdb_mdt_index_map)
509                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
510         name_destroy(&fsdb->fsdb_clilov);
511         name_destroy(&fsdb->fsdb_clilmv);
512         mgs_free_fsdb_srpc(fsdb);
513         OBD_FREE_PTR(fsdb);
514 }
515
516 void mgs_put_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
517 {
518         if (atomic_dec_and_test(&fsdb->fsdb_ref))
519                 mgs_free_fsdb(mgs, fsdb);
520 }
521
522 int mgs_init_fsdb_list(struct mgs_device *mgs)
523 {
524         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
525         return 0;
526 }
527
528 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
529 {
530         struct fs_db *fsdb;
531         struct list_head *tmp, *tmp2;
532
533         mutex_lock(&mgs->mgs_mutex);
534         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
535                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
536                 list_del_init(&fsdb->fsdb_list);
537                 mgs_put_fsdb(mgs, fsdb);
538         }
539         mutex_unlock(&mgs->mgs_mutex);
540         return 0;
541 }
542
543 int mgs_find_or_make_fsdb(const struct lu_env *env, struct mgs_device *mgs,
544                           char *name, struct fs_db **dbh)
545 {
546         struct fs_db *fsdb;
547         int rc = 0;
548         ENTRY;
549
550         mutex_lock(&mgs->mgs_mutex);
551         fsdb = mgs_find_fsdb(mgs, name);
552         if (!fsdb) {
553                 fsdb = mgs_new_fsdb(env, mgs, name);
554                 if (IS_ERR(fsdb))
555                         rc = PTR_ERR(fsdb);
556
557                 CDEBUG(D_MGS, "Created new db: rc = %d\n", rc);
558         }
559         mutex_unlock(&mgs->mgs_mutex);
560
561         if (!rc)
562                 *dbh = fsdb;
563
564         RETURN(rc);
565 }
566
567 /* 1 = index in use
568    0 = index unused
569    -1= empty client log */
570 int mgs_check_index(const struct lu_env *env,
571                     struct mgs_device *mgs,
572                     struct mgs_target_info *mti)
573 {
574         struct fs_db *fsdb;
575         void *imap;
576         int rc = 0;
577         ENTRY;
578
579         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
580
581         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
582         if (rc) {
583                 CERROR("Can't get db for %s\n", mti->mti_fsname);
584                 RETURN(rc);
585         }
586
587         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
588                 GOTO(out, rc = -1);
589
590         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
591                 imap = fsdb->fsdb_ost_index_map;
592         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
593                 imap = fsdb->fsdb_mdt_index_map;
594         else
595                 GOTO(out, rc = -EINVAL);
596
597         if (test_bit(mti->mti_stripe_index, imap))
598                 GOTO(out, rc = 1);
599
600         GOTO(out, rc = 0);
601
602 out:
603         mgs_put_fsdb(mgs, fsdb);
604         return rc;
605 }
606
607 static __inline__ int next_index(void *index_map, int map_len)
608 {
609         int i;
610         for (i = 0; i < map_len * 8; i++)
611                 if (!test_bit(i, index_map)) {
612                          return i;
613                  }
614         CERROR("max index %d exceeded.\n", i);
615         return -1;
616 }
617
618 /* Return codes:
619         0  newly marked as in use
620         <0 err
621         +EALREADY for update of an old index */
622 static int mgs_set_index(const struct lu_env *env,
623                          struct mgs_device *mgs,
624                          struct mgs_target_info *mti)
625 {
626         struct fs_db *fsdb;
627         void *imap;
628         int rc = 0;
629         ENTRY;
630
631         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
632         if (rc) {
633                 CERROR("Can't get db for %s\n", mti->mti_fsname);
634                 RETURN(rc);
635         }
636
637         mutex_lock(&fsdb->fsdb_mutex);
638         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
639                 imap = fsdb->fsdb_ost_index_map;
640         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
641                 imap = fsdb->fsdb_mdt_index_map;
642         } else {
643                 GOTO(out_up, rc = -EINVAL);
644         }
645
646         if (mti->mti_flags & LDD_F_NEED_INDEX) {
647                 rc = next_index(imap, INDEX_MAP_SIZE);
648                 if (rc == -1)
649                         GOTO(out_up, rc = -ERANGE);
650                 mti->mti_stripe_index = rc;
651         }
652
653         /* the last index(0xffff) is reserved for default value. */
654         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
655                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
656                                    "but index must be less than %u.\n",
657                                    mti->mti_svname, mti->mti_stripe_index,
658                                    INDEX_MAP_SIZE * 8 - 1);
659                 GOTO(out_up, rc = -ERANGE);
660         }
661
662         if (test_bit(mti->mti_stripe_index, imap)) {
663                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
664                     !(mti->mti_flags & LDD_F_WRITECONF)) {
665                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
666                                            "%d, but that index is already in "
667                                            "use. Use --writeconf to force\n",
668                                            mti->mti_svname,
669                                            mti->mti_stripe_index);
670                         GOTO(out_up, rc = -EADDRINUSE);
671                 } else {
672                         CDEBUG(D_MGS, "Server %s updating index %d\n",
673                                mti->mti_svname, mti->mti_stripe_index);
674                         GOTO(out_up, rc = EALREADY);
675                 }
676         } else {
677                 set_bit(mti->mti_stripe_index, imap);
678                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
679                         fsdb->fsdb_mdt_count++;
680         }
681
682         set_bit(mti->mti_stripe_index, imap);
683         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
684         if (server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
685                              mti->mti_stripe_index, mti->mti_fsname,
686                              mti->mti_svname)) {
687                 CERROR("unknown server type %#x\n", mti->mti_flags);
688                 GOTO(out_up, rc = -EINVAL);
689         }
690
691         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
692                mti->mti_stripe_index);
693
694         GOTO(out_up, rc = 0);
695
696 out_up:
697         mutex_unlock(&fsdb->fsdb_mutex);
698         mgs_put_fsdb(mgs, fsdb);
699         return rc;
700 }
701
702 struct mgs_modify_lookup {
703         struct cfg_marker mml_marker;
704         int               mml_modified;
705 };
706
707 static int mgs_check_record_match(const struct lu_env *env,
708                                 struct llog_handle *llh,
709                                 struct llog_rec_hdr *rec, void *data)
710 {
711         struct cfg_marker *mc_marker = data;
712         struct cfg_marker *marker;
713         struct lustre_cfg *lcfg = REC_DATA(rec);
714         int cfg_len = REC_DATA_LEN(rec);
715         int rc;
716         ENTRY;
717
718
719         if (rec->lrh_type != OBD_CFG_REC) {
720                 CDEBUG(D_ERROR, "Unhandled lrh_type: %#x\n", rec->lrh_type);
721                 RETURN(-EINVAL);
722         }
723
724         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
725         if (rc) {
726                 CDEBUG(D_ERROR, "Insane cfg\n");
727                 RETURN(rc);
728         }
729
730         /* We only care about markers */
731         if (lcfg->lcfg_command != LCFG_MARKER)
732                 RETURN(0);
733
734         marker = lustre_cfg_buf(lcfg, 1);
735
736         if (marker->cm_flags & CM_SKIP)
737                 RETURN(0);
738
739         if ((strcmp(mc_marker->cm_comment, marker->cm_comment) == 0) &&
740                 (strcmp(mc_marker->cm_tgtname, marker->cm_tgtname) == 0)) {
741                 /* Found a non-skipped marker match */
742                 CDEBUG(D_MGS, "Matched rec %u marker %d flag %x %s %s\n",
743                         rec->lrh_index, marker->cm_step,
744                         marker->cm_flags, marker->cm_tgtname,
745                         marker->cm_comment);
746                 rc = LLOG_PROC_BREAK;
747         }
748
749         RETURN(rc);
750 }
751
752 /**
753  * Check an existing config log record with matching comment and device
754  * Return code:
755  * 0 - checked successfully,
756  * LLOG_PROC_BREAK - record matches
757  * negative - error
758  */
759 static int mgs_check_marker(const struct lu_env *env, struct mgs_device *mgs,
760                 struct fs_db *fsdb, struct mgs_target_info *mti,
761                 char *logname, char *devname, char *comment)
762 {
763         struct llog_handle *loghandle;
764         struct llog_ctxt *ctxt;
765         struct cfg_marker *mc_marker;
766         int rc;
767
768         ENTRY;
769
770         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
771         CDEBUG(D_MGS, "mgs check %s/%s/%s\n", logname, devname, comment);
772
773         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
774         LASSERT(ctxt != NULL);
775         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
776         if (rc < 0) {
777                 if (rc == -ENOENT)
778                         rc = 0;
779                 GOTO(out_pop, rc);
780         }
781
782         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
783         if (rc)
784                 GOTO(out_close, rc);
785
786         if (llog_get_size(loghandle) <= 1)
787                 GOTO(out_close, rc = 0);
788
789         OBD_ALLOC_PTR(mc_marker);
790         if (!mc_marker)
791                 GOTO(out_close, rc = -ENOMEM);
792         if (strlcpy(mc_marker->cm_comment, comment,
793                 sizeof(mc_marker->cm_comment)) >=
794                 sizeof(mc_marker->cm_comment))
795                 GOTO(out_free, rc = -E2BIG);
796         if (strlcpy(mc_marker->cm_tgtname, devname,
797                 sizeof(mc_marker->cm_tgtname)) >=
798                 sizeof(mc_marker->cm_tgtname))
799                 GOTO(out_free, rc = -E2BIG);
800
801         rc = llog_process(env, loghandle, mgs_check_record_match,
802                         (void *)mc_marker, NULL);
803
804 out_free:
805         OBD_FREE_PTR(mc_marker);
806
807 out_close:
808         llog_close(env, loghandle);
809 out_pop:
810         if (rc && rc != LLOG_PROC_BREAK)
811                 CDEBUG(D_ERROR, "%s: mgs check %s/%s failed: rc = %d\n",
812                         mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
813         llog_ctxt_put(ctxt);
814         RETURN(rc);
815 }
816
817 static int mgs_modify_handler(const struct lu_env *env,
818                               struct llog_handle *llh,
819                               struct llog_rec_hdr *rec, void *data)
820 {
821         struct mgs_modify_lookup *mml = data;
822         struct cfg_marker *marker;
823         struct lustre_cfg *lcfg = REC_DATA(rec);
824         int cfg_len = REC_DATA_LEN(rec);
825         int rc;
826         ENTRY;
827
828         if (rec->lrh_type != OBD_CFG_REC) {
829                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
830                 RETURN(-EINVAL);
831         }
832
833         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
834         if (rc) {
835                 CERROR("Insane cfg\n");
836                 RETURN(rc);
837         }
838
839         /* We only care about markers */
840         if (lcfg->lcfg_command != LCFG_MARKER)
841                 RETURN(0);
842
843         marker = lustre_cfg_buf(lcfg, 1);
844         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
845             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
846             !(marker->cm_flags & CM_SKIP)) {
847                 /* Found a non-skipped marker match */
848                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
849                        rec->lrh_index, marker->cm_step,
850                        marker->cm_flags, mml->mml_marker.cm_flags,
851                        marker->cm_tgtname, marker->cm_comment);
852                 /* Overwrite the old marker llog entry */
853                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
854                 marker->cm_flags |= mml->mml_marker.cm_flags;
855                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
856                 rc = llog_write(env, llh, rec, rec->lrh_index);
857                 if (!rc)
858                          mml->mml_modified++;
859         }
860
861         RETURN(rc);
862 }
863
864 /**
865  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
866  * Return code:
867  * 0 - modified successfully,
868  * 1 - no modification was done
869  * negative - error
870  */
871 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
872                       struct fs_db *fsdb, struct mgs_target_info *mti,
873                       char *logname, char *devname, char *comment, int flags)
874 {
875         struct llog_handle *loghandle;
876         struct llog_ctxt *ctxt;
877         struct mgs_modify_lookup *mml;
878         int rc;
879
880         ENTRY;
881
882         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
883         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
884                flags);
885
886         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
887         LASSERT(ctxt != NULL);
888         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
889         if (rc < 0) {
890                 if (rc == -ENOENT)
891                         rc = 0;
892                 GOTO(out_pop, rc);
893         }
894
895         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
896         if (rc)
897                 GOTO(out_close, rc);
898
899         if (llog_get_size(loghandle) <= 1)
900                 GOTO(out_close, rc = 0);
901
902         OBD_ALLOC_PTR(mml);
903         if (!mml)
904                 GOTO(out_close, rc = -ENOMEM);
905         if (strlcpy(mml->mml_marker.cm_comment, comment,
906                     sizeof(mml->mml_marker.cm_comment)) >=
907             sizeof(mml->mml_marker.cm_comment))
908                 GOTO(out_free, rc = -E2BIG);
909         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
910                     sizeof(mml->mml_marker.cm_tgtname)) >=
911             sizeof(mml->mml_marker.cm_tgtname))
912                 GOTO(out_free, rc = -E2BIG);
913         /* Modify mostly means cancel */
914         mml->mml_marker.cm_flags = flags;
915         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
916         mml->mml_modified = 0;
917         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
918                           NULL);
919         if (!rc && !mml->mml_modified)
920                 rc = 1;
921
922 out_free:
923         OBD_FREE_PTR(mml);
924
925 out_close:
926         llog_close(env, loghandle);
927 out_pop:
928         if (rc < 0)
929                 CERROR("%s: modify %s/%s failed: rc = %d\n",
930                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
931         llog_ctxt_put(ctxt);
932         RETURN(rc);
933 }
934
935 /** This structure is passed to mgs_replace_handler */
936 struct mgs_replace_uuid_lookup {
937         /* Nids are replaced for this target device */
938         struct mgs_target_info target;
939         /* Temporary modified llog */
940         struct llog_handle *temp_llh;
941         /* Flag is set if in target block*/
942         int in_target_device;
943         /* Nids already added. Just skip (multiple nids) */
944         int device_nids_added;
945         /* Flag is set if this block should not be copied */
946         int skip_it;
947 };
948
949 /**
950  * Check: a) if block should be skipped
951  * b) is it target block
952  *
953  * \param[in] lcfg
954  * \param[in] mrul
955  *
956  * \retval 0 should not to be skipped
957  * \retval 1 should to be skipped
958  */
959 static int check_markers(struct lustre_cfg *lcfg,
960                          struct mgs_replace_uuid_lookup *mrul)
961 {
962          struct cfg_marker *marker;
963
964         /* Track markers. Find given device */
965         if (lcfg->lcfg_command == LCFG_MARKER) {
966                 marker = lustre_cfg_buf(lcfg, 1);
967                 /* Clean llog from records marked as CM_EXCLUDE.
968                    CM_SKIP records are used for "active" command
969                    and can be restored if needed */
970                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
971                     (CM_EXCLUDE | CM_START)) {
972                         mrul->skip_it = 1;
973                         return 1;
974                 }
975
976                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
977                     (CM_EXCLUDE | CM_END)) {
978                         mrul->skip_it = 0;
979                         return 1;
980                 }
981
982                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
983                         LASSERT(!(marker->cm_flags & CM_START) ||
984                                 !(marker->cm_flags & CM_END));
985                         if (marker->cm_flags & CM_START) {
986                                 mrul->in_target_device = 1;
987                                 mrul->device_nids_added = 0;
988                         } else if (marker->cm_flags & CM_END)
989                                 mrul->in_target_device = 0;
990                 }
991         }
992
993         return 0;
994 }
995
996 static int record_base(const struct lu_env *env, struct llog_handle *llh,
997                      char *cfgname, lnet_nid_t nid, int cmd,
998                      char *s1, char *s2, char *s3, char *s4)
999 {
1000         struct mgs_thread_info  *mgi = mgs_env_info(env);
1001         struct llog_cfg_rec     *lcr;
1002         int rc;
1003
1004         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
1005                cmd, s1, s2, s3, s4);
1006
1007         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
1008         if (s1)
1009                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
1010         if (s2)
1011                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
1012         if (s3)
1013                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
1014         if (s4)
1015                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
1016
1017         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
1018         if (lcr == NULL)
1019                 return -ENOMEM;
1020
1021         lcr->lcr_cfg.lcfg_nid = nid;
1022         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1023
1024         lustre_cfg_rec_free(lcr);
1025
1026         if (rc < 0)
1027                 CDEBUG(D_MGS,
1028                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
1029                        cfgname, cmd, s1, s2, s3, s4, rc);
1030         return rc;
1031 }
1032
1033 static inline int record_add_uuid(const struct lu_env *env,
1034                                   struct llog_handle *llh,
1035                                   uint64_t nid, char *uuid)
1036 {
1037         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
1038                            NULL, NULL, NULL);
1039 }
1040
1041 static inline int record_add_conn(const struct lu_env *env,
1042                                   struct llog_handle *llh,
1043                                   char *devname, char *uuid)
1044 {
1045         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
1046                            NULL, NULL, NULL);
1047 }
1048
1049 static inline int record_attach(const struct lu_env *env,
1050                                 struct llog_handle *llh, char *devname,
1051                                 char *type, char *uuid)
1052 {
1053         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
1054                            NULL, NULL);
1055 }
1056
1057 static inline int record_setup(const struct lu_env *env,
1058                                struct llog_handle *llh, char *devname,
1059                                char *s1, char *s2, char *s3, char *s4)
1060 {
1061         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
1062 }
1063
1064 /**
1065  * \retval <0 record processing error
1066  * \retval n record is processed. No need copy original one.
1067  * \retval 0 record is not processed.
1068  */
1069 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
1070                            struct mgs_replace_uuid_lookup *mrul)
1071 {
1072         int nids_added = 0;
1073         lnet_nid_t nid;
1074         char *ptr;
1075         int rc;
1076
1077         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1078                 /* LCFG_ADD_UUID command found. Let's skip original command
1079                    and add passed nids */
1080                 ptr = mrul->target.mti_params;
1081                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1082                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
1083                                "device %s\n", libcfs_nid2str(nid),
1084                                 mrul->target.mti_params,
1085                                 mrul->target.mti_svname);
1086                         rc = record_add_uuid(env,
1087                                              mrul->temp_llh, nid,
1088                                              mrul->target.mti_params);
1089                         if (!rc)
1090                                 nids_added++;
1091                 }
1092
1093                 if (nids_added == 0) {
1094                         CERROR("No new nids were added, nid %s with uuid %s, "
1095                                "device %s\n", libcfs_nid2str(nid),
1096                                mrul->target.mti_params,
1097                                mrul->target.mti_svname);
1098                         RETURN(-ENXIO);
1099                 } else {
1100                         mrul->device_nids_added = 1;
1101                 }
1102
1103                 return nids_added;
1104         }
1105
1106         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
1107                 /* LCFG_SETUP command found. UUID should be changed */
1108                 rc = record_setup(env,
1109                                   mrul->temp_llh,
1110                                   /* devname the same */
1111                                   lustre_cfg_string(lcfg, 0),
1112                                   /* s1 is not changed */
1113                                   lustre_cfg_string(lcfg, 1),
1114                                   /* new uuid should be
1115                                   the full nidlist */
1116                                   mrul->target.mti_params,
1117                                   /* s3 is not changed */
1118                                   lustre_cfg_string(lcfg, 3),
1119                                   /* s4 is not changed */
1120                                   lustre_cfg_string(lcfg, 4));
1121                 return rc ? rc : 1;
1122         }
1123
1124         /* Another commands in target device block */
1125         return 0;
1126 }
1127
1128 /**
1129  * Handler that called for every record in llog.
1130  * Records are processed in order they placed in llog.
1131  *
1132  * \param[in] llh       log to be processed
1133  * \param[in] rec       current record
1134  * \param[in] data      mgs_replace_uuid_lookup structure
1135  *
1136  * \retval 0    success
1137  */
1138 static int mgs_replace_handler(const struct lu_env *env,
1139                                struct llog_handle *llh,
1140                                struct llog_rec_hdr *rec,
1141                                void *data)
1142 {
1143         struct mgs_replace_uuid_lookup *mrul;
1144         struct lustre_cfg *lcfg = REC_DATA(rec);
1145         int cfg_len = REC_DATA_LEN(rec);
1146         int rc;
1147         ENTRY;
1148
1149         mrul = (struct mgs_replace_uuid_lookup *)data;
1150
1151         if (rec->lrh_type != OBD_CFG_REC) {
1152                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
1153                        rec->lrh_type, lcfg->lcfg_command,
1154                        lustre_cfg_string(lcfg, 0),
1155                        lustre_cfg_string(lcfg, 1));
1156                 RETURN(-EINVAL);
1157         }
1158
1159         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1160         if (rc) {
1161                 /* Do not copy any invalidated records */
1162                 GOTO(skip_out, rc = 0);
1163         }
1164
1165         rc = check_markers(lcfg, mrul);
1166         if (rc || mrul->skip_it)
1167                 GOTO(skip_out, rc = 0);
1168
1169         /* Write to new log all commands outside target device block */
1170         if (!mrul->in_target_device)
1171                 GOTO(copy_out, rc = 0);
1172
1173         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
1174            (failover nids) for this target, assuming that if then
1175            primary is changing then so is the failover */
1176         if (mrul->device_nids_added &&
1177             (lcfg->lcfg_command == LCFG_ADD_UUID ||
1178              lcfg->lcfg_command == LCFG_ADD_CONN))
1179                 GOTO(skip_out, rc = 0);
1180
1181         rc = process_command(env, lcfg, mrul);
1182         if (rc < 0)
1183                 RETURN(rc);
1184
1185         if (rc)
1186                 RETURN(0);
1187 copy_out:
1188         /* Record is placed in temporary llog as is */
1189         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
1190
1191         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1192                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1193                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1194         RETURN(rc);
1195
1196 skip_out:
1197         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1198                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1199                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1200         RETURN(rc);
1201 }
1202
1203 static int mgs_log_is_empty(const struct lu_env *env,
1204                             struct mgs_device *mgs, char *name)
1205 {
1206         struct llog_ctxt        *ctxt;
1207         int                      rc;
1208
1209         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1210         LASSERT(ctxt != NULL);
1211
1212         rc = llog_is_empty(env, ctxt, name);
1213         llog_ctxt_put(ctxt);
1214         return rc;
1215 }
1216
1217 static int mgs_replace_nids_log(const struct lu_env *env,
1218                                 struct obd_device *mgs, struct fs_db *fsdb,
1219                                 char *logname, char *devname, char *nids)
1220 {
1221         struct llog_handle *orig_llh, *backup_llh;
1222         struct llog_ctxt *ctxt;
1223         struct mgs_replace_uuid_lookup *mrul;
1224         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1225         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1226         char *backup;
1227         int rc, rc2;
1228         ENTRY;
1229
1230         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1231
1232         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1233         LASSERT(ctxt != NULL);
1234
1235         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1236                 /* Log is empty. Nothing to replace */
1237                 GOTO(out_put, rc = 0);
1238         }
1239
1240         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1241         if (backup == NULL)
1242                 GOTO(out_put, rc = -ENOMEM);
1243
1244         sprintf(backup, "%s.bak", logname);
1245
1246         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1247         if (rc == 0) {
1248                 /* Now erase original log file. Connections are not allowed.
1249                    Backup is already saved */
1250                 rc = llog_erase(env, ctxt, NULL, logname);
1251                 if (rc < 0)
1252                         GOTO(out_free, rc);
1253         } else if (rc != -ENOENT) {
1254                 CERROR("%s: can't make backup for %s: rc = %d\n",
1255                        mgs->obd_name, logname, rc);
1256                 GOTO(out_free,rc);
1257         }
1258
1259         /* open local log */
1260         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1261         if (rc)
1262                 GOTO(out_restore, rc);
1263
1264         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1265         if (rc)
1266                 GOTO(out_closel, rc);
1267
1268         /* open backup llog */
1269         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1270                        LLOG_OPEN_EXISTS);
1271         if (rc)
1272                 GOTO(out_closel, rc);
1273
1274         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1275         if (rc)
1276                 GOTO(out_close, rc);
1277
1278         if (llog_get_size(backup_llh) <= 1)
1279                 GOTO(out_close, rc = 0);
1280
1281         OBD_ALLOC_PTR(mrul);
1282         if (!mrul)
1283                 GOTO(out_close, rc = -ENOMEM);
1284         /* devname is only needed information to replace UUID records */
1285         strlcpy(mrul->target.mti_svname, devname,
1286                 sizeof(mrul->target.mti_svname));
1287         /* parse nids later */
1288         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1289         /* Copy records to this temporary llog */
1290         mrul->temp_llh = orig_llh;
1291
1292         rc = llog_process(env, backup_llh, mgs_replace_handler,
1293                           (void *)mrul, NULL);
1294         OBD_FREE_PTR(mrul);
1295 out_close:
1296         rc2 = llog_close(NULL, backup_llh);
1297         if (!rc)
1298                 rc = rc2;
1299 out_closel:
1300         rc2 = llog_close(NULL, orig_llh);
1301         if (!rc)
1302                 rc = rc2;
1303
1304 out_restore:
1305         if (rc) {
1306                 CERROR("%s: llog should be restored: rc = %d\n",
1307                        mgs->obd_name, rc);
1308                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1309                                   logname);
1310                 if (rc2 < 0)
1311                         CERROR("%s: can't restore backup %s: rc = %d\n",
1312                                mgs->obd_name, logname, rc2);
1313         }
1314
1315 out_free:
1316         OBD_FREE(backup, strlen(backup) + 1);
1317
1318 out_put:
1319         llog_ctxt_put(ctxt);
1320
1321         if (rc)
1322                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1323                        mgs->obd_name, logname, rc);
1324
1325         RETURN(rc);
1326 }
1327
1328 /**
1329  * Parse device name and get file system name and/or device index
1330  *
1331  * \param[in]   devname device name (ex. lustre-MDT0000)
1332  * \param[out]  fsname  file system name(optional)
1333  * \param[out]  index   device index(optional)
1334  *
1335  * \retval 0    success
1336  */
1337 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1338 {
1339         int rc;
1340         ENTRY;
1341
1342         /* Extract fsname */
1343         if (fsname) {
1344                 rc = server_name2fsname(devname, fsname, NULL);
1345                 if (rc < 0) {
1346                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1347                                devname);
1348                         RETURN(-EINVAL);
1349                 }
1350         }
1351
1352         if (index) {
1353                 rc = server_name2index(devname, index, NULL);
1354                 if (rc < 0) {
1355                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1356                                devname);
1357                         RETURN(-EINVAL);
1358                 }
1359         }
1360
1361         RETURN(0);
1362 }
1363
1364 /* This is only called during replace_nids */
1365 static int only_mgs_is_running(struct obd_device *mgs_obd)
1366 {
1367         /* TDB: Is global variable with devices count exists? */
1368         int num_devices = get_devices_count();
1369         int num_exports = 0;
1370         struct obd_export *exp;
1371
1372         spin_lock(&mgs_obd->obd_dev_lock);
1373         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1374                 /* skip self export */
1375                 if (exp == mgs_obd->obd_self_export)
1376                         continue;
1377                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1378                         continue;
1379
1380                 ++num_exports;
1381
1382                 CERROR("%s: node %s still connected during replace_nids "
1383                        "connect_flags:%llx\n",
1384                        mgs_obd->obd_name,
1385                        libcfs_nid2str(exp->exp_nid_stats->nid),
1386                        exp_connect_flags(exp));
1387
1388         }
1389         spin_unlock(&mgs_obd->obd_dev_lock);
1390
1391         /* osd, MGS and MGC + self_export
1392            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1393         return (num_devices <= 3) && (num_exports == 0);
1394 }
1395
1396 static int name_create_mdt(char **logname, char *fsname, int i)
1397 {
1398         char mdt_index[9];
1399
1400         sprintf(mdt_index, "-MDT%04x", i);
1401         return name_create(logname, fsname, mdt_index);
1402 }
1403
1404 /**
1405  * Replace nids for \a device to \a nids values
1406  *
1407  * \param obd           MGS obd device
1408  * \param devname       nids need to be replaced for this device
1409  * (ex. lustre-OST0000)
1410  * \param nids          nids list (ex. nid1,nid2,nid3)
1411  *
1412  * \retval 0    success
1413  */
1414 int mgs_replace_nids(const struct lu_env *env,
1415                      struct mgs_device *mgs,
1416                      char *devname, char *nids)
1417 {
1418         /* Assume fsname is part of device name */
1419         char fsname[MTI_NAME_MAXLEN];
1420         int rc;
1421         __u32 index;
1422         char *logname;
1423         struct fs_db *fsdb = NULL;
1424         unsigned int i;
1425         int conn_state;
1426         struct obd_device *mgs_obd = mgs->mgs_obd;
1427         ENTRY;
1428
1429         /* We can only change NIDs if no other nodes are connected */
1430         spin_lock(&mgs_obd->obd_dev_lock);
1431         conn_state = mgs_obd->obd_no_conn;
1432         mgs_obd->obd_no_conn = 1;
1433         spin_unlock(&mgs_obd->obd_dev_lock);
1434
1435         /* We can not change nids if not only MGS is started */
1436         if (!only_mgs_is_running(mgs_obd)) {
1437                 CERROR("Only MGS is allowed to be started\n");
1438                 GOTO(out, rc = -EINPROGRESS);
1439         }
1440
1441         /* Get fsname and index*/
1442         rc = mgs_parse_devname(devname, fsname, &index);
1443         if (rc)
1444                 GOTO(out, rc);
1445
1446         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1447         if (rc) {
1448                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1449                 GOTO(out, rc);
1450         }
1451
1452         /* Process client llogs */
1453         name_create(&logname, fsname, "-client");
1454         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1455         name_destroy(&logname);
1456         if (rc) {
1457                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1458                        fsname, devname, rc);
1459                 GOTO(out, rc);
1460         }
1461
1462         /* Process MDT llogs */
1463         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1464                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1465                         continue;
1466                 name_create_mdt(&logname, fsname, i);
1467                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1468                 name_destroy(&logname);
1469                 if (rc)
1470                         GOTO(out, rc);
1471         }
1472
1473 out:
1474         spin_lock(&mgs_obd->obd_dev_lock);
1475         mgs_obd->obd_no_conn = conn_state;
1476         spin_unlock(&mgs_obd->obd_dev_lock);
1477
1478         if (fsdb)
1479                 mgs_put_fsdb(mgs, fsdb);
1480
1481         RETURN(rc);
1482 }
1483
1484 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1485                             char *devname, struct lov_desc *desc)
1486 {
1487         struct mgs_thread_info  *mgi = mgs_env_info(env);
1488         struct llog_cfg_rec     *lcr;
1489         int rc;
1490
1491         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1492         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1493         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1494         if (lcr == NULL)
1495                 return -ENOMEM;
1496
1497         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1498         lustre_cfg_rec_free(lcr);
1499         return rc;
1500 }
1501
1502 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1503                             char *devname, struct lmv_desc *desc)
1504 {
1505         struct mgs_thread_info  *mgi = mgs_env_info(env);
1506         struct llog_cfg_rec     *lcr;
1507         int rc;
1508
1509         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1510         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1511         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1512         if (lcr == NULL)
1513                 return -ENOMEM;
1514
1515         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1516         lustre_cfg_rec_free(lcr);
1517         return rc;
1518 }
1519
1520 static inline int record_mdc_add(const struct lu_env *env,
1521                                  struct llog_handle *llh,
1522                                  char *logname, char *mdcuuid,
1523                                  char *mdtuuid, char *index,
1524                                  char *gen)
1525 {
1526         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1527                            mdtuuid,index,gen,mdcuuid);
1528 }
1529
1530 static inline int record_lov_add(const struct lu_env *env,
1531                                  struct llog_handle *llh,
1532                                  char *lov_name, char *ost_uuid,
1533                                  char *index, char *gen)
1534 {
1535         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1536                            ost_uuid, index, gen, NULL);
1537 }
1538
1539 static inline int record_mount_opt(const struct lu_env *env,
1540                                    struct llog_handle *llh,
1541                                    char *profile, char *lov_name,
1542                                    char *mdc_name)
1543 {
1544         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1545                            profile, lov_name, mdc_name, NULL);
1546 }
1547
1548 static int record_marker(const struct lu_env *env,
1549                          struct llog_handle *llh,
1550                          struct fs_db *fsdb, __u32 flags,
1551                          char *tgtname, char *comment)
1552 {
1553         struct mgs_thread_info *mgi = mgs_env_info(env);
1554         struct llog_cfg_rec *lcr;
1555         int rc;
1556         int cplen = 0;
1557
1558         if (flags & CM_START)
1559                 fsdb->fsdb_gen++;
1560         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1561         mgi->mgi_marker.cm_flags = flags;
1562         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1563         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1564                         sizeof(mgi->mgi_marker.cm_tgtname));
1565         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1566                 return -E2BIG;
1567         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1568                         sizeof(mgi->mgi_marker.cm_comment));
1569         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1570                 return -E2BIG;
1571         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1572         mgi->mgi_marker.cm_canceltime = 0;
1573         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1574         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1575                             sizeof(mgi->mgi_marker));
1576         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1577         if (lcr == NULL)
1578                 return -ENOMEM;
1579
1580         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1581         lustre_cfg_rec_free(lcr);
1582         return rc;
1583 }
1584
1585 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1586                             struct llog_handle **llh, char *name)
1587 {
1588         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1589         struct llog_ctxt        *ctxt;
1590         int                      rc = 0;
1591         ENTRY;
1592
1593         if (*llh)
1594                 GOTO(out, rc = -EBUSY);
1595
1596         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1597         if (!ctxt)
1598                 GOTO(out, rc = -ENODEV);
1599         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1600
1601         rc = llog_open_create(env, ctxt, llh, NULL, name);
1602         if (rc)
1603                 GOTO(out_ctxt, rc);
1604         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1605         if (rc)
1606                 llog_close(env, *llh);
1607 out_ctxt:
1608         llog_ctxt_put(ctxt);
1609 out:
1610         if (rc) {
1611                 CERROR("%s: can't start log %s: rc = %d\n",
1612                        mgs->mgs_obd->obd_name, name, rc);
1613                 *llh = NULL;
1614         }
1615         RETURN(rc);
1616 }
1617
1618 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1619 {
1620         int rc;
1621
1622         rc = llog_close(env, *llh);
1623         *llh = NULL;
1624
1625         return rc;
1626 }
1627
1628 /******************** config "macros" *********************/
1629
1630 /* write an lcfg directly into a log (with markers) */
1631 static int mgs_write_log_direct(const struct lu_env *env,
1632                                 struct mgs_device *mgs, struct fs_db *fsdb,
1633                                 char *logname, struct llog_cfg_rec *lcr,
1634                                 char *devname, char *comment)
1635 {
1636         struct llog_handle *llh = NULL;
1637         int rc;
1638
1639         ENTRY;
1640
1641         rc = record_start_log(env, mgs, &llh, logname);
1642         if (rc)
1643                 RETURN(rc);
1644
1645         /* FIXME These should be a single journal transaction */
1646         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1647         if (rc)
1648                 GOTO(out_end, rc);
1649         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1650         if (rc)
1651                 GOTO(out_end, rc);
1652         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1653         if (rc)
1654                 GOTO(out_end, rc);
1655 out_end:
1656         record_end_log(env, &llh);
1657         RETURN(rc);
1658 }
1659
1660 /* write the lcfg in all logs for the given fs */
1661 static int mgs_write_log_direct_all(const struct lu_env *env,
1662                                     struct mgs_device *mgs,
1663                                     struct fs_db *fsdb,
1664                                     struct mgs_target_info *mti,
1665                                     struct llog_cfg_rec *lcr, char *devname,
1666                                     char *comment, int server_only)
1667 {
1668         struct list_head         log_list;
1669         struct mgs_direntry     *dirent, *n;
1670         char                    *fsname = mti->mti_fsname;
1671         int                      rc = 0, len = strlen(fsname);
1672
1673         ENTRY;
1674         /* Find all the logs in the CONFIGS directory */
1675         rc = class_dentry_readdir(env, mgs, &log_list);
1676         if (rc)
1677                 RETURN(rc);
1678
1679         /* Could use fsdb index maps instead of directory listing */
1680         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1681                 list_del_init(&dirent->mde_list);
1682                 /* don't write to sptlrpc rule log */
1683                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1684                         goto next;
1685
1686                 /* caller wants write server logs only */
1687                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1688                         goto next;
1689
1690                 if (strlen(dirent->mde_name) <= len ||
1691                     strncmp(fsname, dirent->mde_name, len) != 0 ||
1692                     dirent->mde_name[len] != '-')
1693                         goto next;
1694
1695                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1696                 /* Erase any old settings of this same parameter */
1697                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1698                                 devname, comment, CM_SKIP);
1699                 if (rc < 0)
1700                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1701                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1702                 if (lcr == NULL)
1703                         goto next;
1704                 /* Write the new one */
1705                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1706                                           lcr, devname, comment);
1707                 if (rc != 0)
1708                         CERROR("%s: writing log %s: rc = %d\n",
1709                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1710 next:
1711                 mgs_direntry_free(dirent);
1712         }
1713
1714         RETURN(rc);
1715 }
1716
1717 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1718                                     struct mgs_device *mgs,
1719                                     struct fs_db *fsdb,
1720                                     struct mgs_target_info *mti,
1721                                     int index, char *logname);
1722 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1723                                     struct mgs_device *mgs,
1724                                     struct fs_db *fsdb,
1725                                     struct mgs_target_info *mti,
1726                                     char *logname, char *suffix, char *lovname,
1727                                     enum lustre_sec_part sec_part, int flags);
1728 static int name_create_mdt_and_lov(char **logname, char **lovname,
1729                                    struct fs_db *fsdb, int i);
1730
1731 static int add_param(char *params, char *key, char *val)
1732 {
1733         char *start = params + strlen(params);
1734         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1735         int keylen = 0;
1736
1737         if (key != NULL)
1738                 keylen = strlen(key);
1739         if (start + 1 + keylen + strlen(val) >= end) {
1740                 CERROR("params are too long: %s %s%s\n",
1741                        params, key != NULL ? key : "", val);
1742                 return -EINVAL;
1743         }
1744
1745         sprintf(start, " %s%s", key != NULL ? key : "", val);
1746         return 0;
1747 }
1748
1749 /**
1750  * Walk through client config log record and convert the related records
1751  * into the target.
1752  **/
1753 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1754                                          struct llog_handle *llh,
1755                                          struct llog_rec_hdr *rec, void *data)
1756 {
1757         struct mgs_device *mgs;
1758         struct obd_device *obd;
1759         struct mgs_target_info *mti, *tmti;
1760         struct fs_db *fsdb;
1761         int cfg_len = rec->lrh_len;
1762         char *cfg_buf = (char*) (rec + 1);
1763         struct lustre_cfg *lcfg;
1764         int rc = 0;
1765         struct llog_handle *mdt_llh = NULL;
1766         static int got_an_osc_or_mdc = 0;
1767         /* 0: not found any osc/mdc;
1768            1: found osc;
1769            2: found mdc;
1770         */
1771         static int last_step = -1;
1772         int cplen = 0;
1773
1774         ENTRY;
1775
1776         mti = ((struct temp_comp*)data)->comp_mti;
1777         tmti = ((struct temp_comp*)data)->comp_tmti;
1778         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1779         obd = ((struct temp_comp *)data)->comp_obd;
1780         mgs = lu2mgs_dev(obd->obd_lu_dev);
1781         LASSERT(mgs);
1782
1783         if (rec->lrh_type != OBD_CFG_REC) {
1784                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1785                 RETURN(-EINVAL);
1786         }
1787
1788         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1789         if (rc) {
1790                 CERROR("Insane cfg\n");
1791                 RETURN(rc);
1792         }
1793
1794         lcfg = (struct lustre_cfg *)cfg_buf;
1795
1796         if (lcfg->lcfg_command == LCFG_MARKER) {
1797                 struct cfg_marker *marker;
1798                 marker = lustre_cfg_buf(lcfg, 1);
1799                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1800                     (marker->cm_flags & CM_START) &&
1801                      !(marker->cm_flags & CM_SKIP)) {
1802                         got_an_osc_or_mdc = 1;
1803                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1804                                         sizeof(tmti->mti_svname));
1805                         if (cplen >= sizeof(tmti->mti_svname))
1806                                 RETURN(-E2BIG);
1807                         rc = record_start_log(env, mgs, &mdt_llh,
1808                                               mti->mti_svname);
1809                         if (rc)
1810                                 RETURN(rc);
1811                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1812                                            mti->mti_svname, "add osc(copied)");
1813                         record_end_log(env, &mdt_llh);
1814                         last_step = marker->cm_step;
1815                         RETURN(rc);
1816                 }
1817                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1818                     (marker->cm_flags & CM_END) &&
1819                      !(marker->cm_flags & CM_SKIP)) {
1820                         LASSERT(last_step == marker->cm_step);
1821                         last_step = -1;
1822                         got_an_osc_or_mdc = 0;
1823                         memset(tmti, 0, sizeof(*tmti));
1824                         rc = record_start_log(env, mgs, &mdt_llh,
1825                                               mti->mti_svname);
1826                         if (rc)
1827                                 RETURN(rc);
1828                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1829                                            mti->mti_svname, "add osc(copied)");
1830                         record_end_log(env, &mdt_llh);
1831                         RETURN(rc);
1832                 }
1833                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1834                     (marker->cm_flags & CM_START) &&
1835                      !(marker->cm_flags & CM_SKIP)) {
1836                         got_an_osc_or_mdc = 2;
1837                         last_step = marker->cm_step;
1838                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1839                                strlen(marker->cm_tgtname));
1840
1841                         RETURN(rc);
1842                 }
1843                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1844                     (marker->cm_flags & CM_END) &&
1845                      !(marker->cm_flags & CM_SKIP)) {
1846                         LASSERT(last_step == marker->cm_step);
1847                         last_step = -1;
1848                         got_an_osc_or_mdc = 0;
1849                         memset(tmti, 0, sizeof(*tmti));
1850                         RETURN(rc);
1851                 }
1852         }
1853
1854         if (got_an_osc_or_mdc == 0 || last_step < 0)
1855                 RETURN(rc);
1856
1857         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1858                 __u64 nodenid = lcfg->lcfg_nid;
1859
1860                 if (strlen(tmti->mti_uuid) == 0) {
1861                         /* target uuid not set, this config record is before
1862                          * LCFG_SETUP, this nid is one of target node nid.
1863                          */
1864                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1865                         tmti->mti_nid_count++;
1866                 } else {
1867                         char nidstr[LNET_NIDSTR_SIZE];
1868
1869                         /* failover node nid */
1870                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
1871                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1872                                         nidstr);
1873                 }
1874
1875                 RETURN(rc);
1876         }
1877
1878         if (lcfg->lcfg_command == LCFG_SETUP) {
1879                 char *target;
1880
1881                 target = lustre_cfg_string(lcfg, 1);
1882                 memcpy(tmti->mti_uuid, target, strlen(target));
1883                 RETURN(rc);
1884         }
1885
1886         /* ignore client side sptlrpc_conf_log */
1887         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1888                 RETURN(rc);
1889
1890         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1891                 int index;
1892
1893                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1894                         RETURN (-EINVAL);
1895
1896                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1897                        strlen(mti->mti_fsname));
1898                 tmti->mti_stripe_index = index;
1899
1900                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1901                                               mti->mti_stripe_index,
1902                                               mti->mti_svname);
1903                 memset(tmti, 0, sizeof(*tmti));
1904                 RETURN(rc);
1905         }
1906
1907         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1908                 int index;
1909                 char mdt_index[9];
1910                 char *logname, *lovname;
1911
1912                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1913                                              mti->mti_stripe_index);
1914                 if (rc)
1915                         RETURN(rc);
1916                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1917
1918                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1919                         name_destroy(&logname);
1920                         name_destroy(&lovname);
1921                         RETURN(-EINVAL);
1922                 }
1923
1924                 tmti->mti_stripe_index = index;
1925                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1926                                          mdt_index, lovname,
1927                                          LUSTRE_SP_MDT, 0);
1928                 name_destroy(&logname);
1929                 name_destroy(&lovname);
1930                 RETURN(rc);
1931         }
1932         RETURN(rc);
1933 }
1934
1935 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1936 /* stealed from mgs_get_fsdb_from_llog*/
1937 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1938                                               struct mgs_device *mgs,
1939                                               char *client_name,
1940                                               struct temp_comp* comp)
1941 {
1942         struct llog_handle *loghandle;
1943         struct mgs_target_info *tmti;
1944         struct llog_ctxt *ctxt;
1945         int rc;
1946
1947         ENTRY;
1948
1949         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1950         LASSERT(ctxt != NULL);
1951
1952         OBD_ALLOC_PTR(tmti);
1953         if (tmti == NULL)
1954                 GOTO(out_ctxt, rc = -ENOMEM);
1955
1956         comp->comp_tmti = tmti;
1957         comp->comp_obd = mgs->mgs_obd;
1958
1959         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1960                        LLOG_OPEN_EXISTS);
1961         if (rc < 0) {
1962                 if (rc == -ENOENT)
1963                         rc = 0;
1964                 GOTO(out_pop, rc);
1965         }
1966
1967         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1968         if (rc)
1969                 GOTO(out_close, rc);
1970
1971         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1972                                   (void *)comp, NULL, false);
1973         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1974 out_close:
1975         llog_close(env, loghandle);
1976 out_pop:
1977         OBD_FREE_PTR(tmti);
1978 out_ctxt:
1979         llog_ctxt_put(ctxt);
1980         RETURN(rc);
1981 }
1982
1983 /* lmv is the second thing for client logs */
1984 /* copied from mgs_write_log_lov. Please refer to that.  */
1985 static int mgs_write_log_lmv(const struct lu_env *env,
1986                              struct mgs_device *mgs,
1987                              struct fs_db *fsdb,
1988                              struct mgs_target_info *mti,
1989                              char *logname, char *lmvname)
1990 {
1991         struct llog_handle *llh = NULL;
1992         struct lmv_desc *lmvdesc;
1993         char *uuid;
1994         int rc = 0;
1995         ENTRY;
1996
1997         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1998
1999         OBD_ALLOC_PTR(lmvdesc);
2000         if (lmvdesc == NULL)
2001                 RETURN(-ENOMEM);
2002         lmvdesc->ld_active_tgt_count = 0;
2003         lmvdesc->ld_tgt_count = 0;
2004         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
2005         uuid = (char *)lmvdesc->ld_uuid.uuid;
2006
2007         rc = record_start_log(env, mgs, &llh, logname);
2008         if (rc)
2009                 GOTO(out_free, rc);
2010         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
2011         if (rc)
2012                 GOTO(out_end, rc);
2013         rc = record_attach(env, llh, lmvname, "lmv", uuid);
2014         if (rc)
2015                 GOTO(out_end, rc);
2016         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
2017         if (rc)
2018                 GOTO(out_end, rc);
2019         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
2020         if (rc)
2021                 GOTO(out_end, rc);
2022 out_end:
2023         record_end_log(env, &llh);
2024 out_free:
2025         OBD_FREE_PTR(lmvdesc);
2026         RETURN(rc);
2027 }
2028
2029 /* lov is the first thing in the mdt and client logs */
2030 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
2031                              struct fs_db *fsdb, struct mgs_target_info *mti,
2032                              char *logname, char *lovname)
2033 {
2034         struct llog_handle *llh = NULL;
2035         struct lov_desc *lovdesc;
2036         char *uuid;
2037         int rc = 0;
2038         ENTRY;
2039
2040         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
2041
2042         /*
2043         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
2044         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
2045               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
2046         */
2047
2048         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
2049         OBD_ALLOC_PTR(lovdesc);
2050         if (lovdesc == NULL)
2051                 RETURN(-ENOMEM);
2052         lovdesc->ld_magic = LOV_DESC_MAGIC;
2053         lovdesc->ld_tgt_count = 0;
2054         /* Defaults.  Can be changed later by lcfg config_param */
2055         lovdesc->ld_default_stripe_count = 1;
2056         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
2057         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
2058         lovdesc->ld_default_stripe_offset = -1;
2059         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
2060         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
2061         /* can these be the same? */
2062         uuid = (char *)lovdesc->ld_uuid.uuid;
2063
2064         /* This should always be the first entry in a log.
2065         rc = mgs_clear_log(obd, logname); */
2066         rc = record_start_log(env, mgs, &llh, logname);
2067         if (rc)
2068                 GOTO(out_free, rc);
2069         /* FIXME these should be a single journal transaction */
2070         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
2071         if (rc)
2072                 GOTO(out_end, rc);
2073         rc = record_attach(env, llh, lovname, "lov", uuid);
2074         if (rc)
2075                 GOTO(out_end, rc);
2076         rc = record_lov_setup(env, llh, lovname, lovdesc);
2077         if (rc)
2078                 GOTO(out_end, rc);
2079         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
2080         if (rc)
2081                 GOTO(out_end, rc);
2082         EXIT;
2083 out_end:
2084         record_end_log(env, &llh);
2085 out_free:
2086         OBD_FREE_PTR(lovdesc);
2087         return rc;
2088 }
2089
2090 /* add failnids to open log */
2091 static int mgs_write_log_failnids(const struct lu_env *env,
2092                                   struct mgs_target_info *mti,
2093                                   struct llog_handle *llh,
2094                                   char *cliname)
2095 {
2096         char *failnodeuuid = NULL;
2097         char *ptr = mti->mti_params;
2098         lnet_nid_t nid;
2099         int rc = 0;
2100
2101         /*
2102         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
2103         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
2104         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
2105         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
2106         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
2107         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
2108         */
2109
2110         /*
2111          * Pull failnid info out of params string, which may contain something
2112          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
2113          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
2114          * etc.  However, convert_hostnames() should have caught those.
2115          */
2116         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
2117                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
2118                         char nidstr[LNET_NIDSTR_SIZE];
2119
2120                         if (failnodeuuid == NULL) {
2121                                 /* We don't know the failover node name,
2122                                  * so just use the first nid as the uuid */
2123                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
2124                                 rc = name_create(&failnodeuuid, nidstr, "");
2125                                 if (rc != 0)
2126                                         return rc;
2127                         }
2128                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
2129                                 "client %s\n",
2130                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
2131                                 failnodeuuid, cliname);
2132                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
2133                         /*
2134                          * If *ptr is ':', we have added all NIDs for
2135                          * failnodeuuid.
2136                          */
2137                         if (*ptr == ':') {
2138                                 rc = record_add_conn(env, llh, cliname,
2139                                                      failnodeuuid);
2140                                 name_destroy(&failnodeuuid);
2141                                 failnodeuuid = NULL;
2142                         }
2143                 }
2144                 if (failnodeuuid) {
2145                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2146                         name_destroy(&failnodeuuid);
2147                         failnodeuuid = NULL;
2148                 }
2149         }
2150
2151         return rc;
2152 }
2153
2154 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2155                                     struct mgs_device *mgs,
2156                                     struct fs_db *fsdb,
2157                                     struct mgs_target_info *mti,
2158                                     char *logname, char *lmvname)
2159 {
2160         struct llog_handle *llh = NULL;
2161         char *mdcname = NULL;
2162         char *nodeuuid = NULL;
2163         char *mdcuuid = NULL;
2164         char *lmvuuid = NULL;
2165         char index[6];
2166         char nidstr[LNET_NIDSTR_SIZE];
2167         int i, rc;
2168         ENTRY;
2169
2170         if (mgs_log_is_empty(env, mgs, logname)) {
2171                 CERROR("log is empty! Logical error\n");
2172                 RETURN(-EINVAL);
2173         }
2174
2175         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2176                mti->mti_svname, logname, lmvname);
2177
2178         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2179         rc = name_create(&nodeuuid, nidstr, "");
2180         if (rc)
2181                 RETURN(rc);
2182         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2183         if (rc)
2184                 GOTO(out_free, rc);
2185         rc = name_create(&mdcuuid, mdcname, "_UUID");
2186         if (rc)
2187                 GOTO(out_free, rc);
2188         rc = name_create(&lmvuuid, lmvname, "_UUID");
2189         if (rc)
2190                 GOTO(out_free, rc);
2191
2192         rc = record_start_log(env, mgs, &llh, logname);
2193         if (rc)
2194                 GOTO(out_free, rc);
2195         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2196                            "add mdc");
2197         if (rc)
2198                 GOTO(out_end, rc);
2199         for (i = 0; i < mti->mti_nid_count; i++) {
2200                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2201                         libcfs_nid2str_r(mti->mti_nids[i],
2202                                          nidstr, sizeof(nidstr)));
2203
2204                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2205                 if (rc)
2206                         GOTO(out_end, rc);
2207         }
2208
2209         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2210         if (rc)
2211                 GOTO(out_end, rc);
2212         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2213                           NULL, NULL);
2214         if (rc)
2215                 GOTO(out_end, rc);
2216         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2217         if (rc)
2218                 GOTO(out_end, rc);
2219         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2220         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2221                             index, "1");
2222         if (rc)
2223                 GOTO(out_end, rc);
2224         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2225                            "add mdc");
2226         if (rc)
2227                 GOTO(out_end, rc);
2228 out_end:
2229         record_end_log(env, &llh);
2230 out_free:
2231         name_destroy(&lmvuuid);
2232         name_destroy(&mdcuuid);
2233         name_destroy(&mdcname);
2234         name_destroy(&nodeuuid);
2235         RETURN(rc);
2236 }
2237
2238 static inline int name_create_lov(char **lovname, char *mdtname,
2239                                   struct fs_db *fsdb, int index)
2240 {
2241         /* COMPAT_180 */
2242         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2243                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2244         else
2245                 return name_create(lovname, mdtname, "-mdtlov");
2246 }
2247
2248 static int name_create_mdt_and_lov(char **logname, char **lovname,
2249                                    struct fs_db *fsdb, int i)
2250 {
2251         int rc;
2252
2253         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2254         if (rc)
2255                 return rc;
2256         /* COMPAT_180 */
2257         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2258                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2259         else
2260                 rc = name_create(lovname, *logname, "-mdtlov");
2261         if (rc) {
2262                 name_destroy(logname);
2263                 *logname = NULL;
2264         }
2265         return rc;
2266 }
2267
2268 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2269                                       struct fs_db *fsdb, int i)
2270 {
2271         char suffix[16];
2272
2273         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2274                 sprintf(suffix, "-osc");
2275         else
2276                 sprintf(suffix, "-osc-MDT%04x", i);
2277         return name_create(oscname, ostname, suffix);
2278 }
2279
2280 /* add new mdc to already existent MDS */
2281 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2282                                     struct mgs_device *mgs,
2283                                     struct fs_db *fsdb,
2284                                     struct mgs_target_info *mti,
2285                                     int mdt_index, char *logname)
2286 {
2287         struct llog_handle      *llh = NULL;
2288         char    *nodeuuid = NULL;
2289         char    *ospname = NULL;
2290         char    *lovuuid = NULL;
2291         char    *mdtuuid = NULL;
2292         char    *svname = NULL;
2293         char    *mdtname = NULL;
2294         char    *lovname = NULL;
2295         char    index_str[16];
2296         char    nidstr[LNET_NIDSTR_SIZE];
2297         int     i, rc;
2298
2299         ENTRY;
2300         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2301                 CERROR("log is empty! Logical error\n");
2302                 RETURN (-EINVAL);
2303         }
2304
2305         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2306                logname);
2307
2308         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2309         if (rc)
2310                 RETURN(rc);
2311
2312         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2313         rc = name_create(&nodeuuid, nidstr, "");
2314         if (rc)
2315                 GOTO(out_destory, rc);
2316
2317         rc = name_create(&svname, mdtname, "-osp");
2318         if (rc)
2319                 GOTO(out_destory, rc);
2320
2321         sprintf(index_str, "-MDT%04x", mdt_index);
2322         rc = name_create(&ospname, svname, index_str);
2323         if (rc)
2324                 GOTO(out_destory, rc);
2325
2326         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2327         if (rc)
2328                 GOTO(out_destory, rc);
2329
2330         rc = name_create(&lovuuid, lovname, "_UUID");
2331         if (rc)
2332                 GOTO(out_destory, rc);
2333
2334         rc = name_create(&mdtuuid, mdtname, "_UUID");
2335         if (rc)
2336                 GOTO(out_destory, rc);
2337
2338         rc = record_start_log(env, mgs, &llh, logname);
2339         if (rc)
2340                 GOTO(out_destory, rc);
2341
2342         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2343                            "add osp");
2344         if (rc)
2345                 GOTO(out_destory, rc);
2346
2347         for (i = 0; i < mti->mti_nid_count; i++) {
2348                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2349                         libcfs_nid2str_r(mti->mti_nids[i],
2350                                          nidstr, sizeof(nidstr)));
2351                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2352                 if (rc)
2353                         GOTO(out_end, rc);
2354         }
2355
2356         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2357         if (rc)
2358                 GOTO(out_end, rc);
2359
2360         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2361                           NULL, NULL);
2362         if (rc)
2363                 GOTO(out_end, rc);
2364
2365         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2366         if (rc)
2367                 GOTO(out_end, rc);
2368
2369         /* Add mdc(osp) to lod */
2370         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
2371         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2372                          index_str, "1", NULL);
2373         if (rc)
2374                 GOTO(out_end, rc);
2375
2376         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2377         if (rc)
2378                 GOTO(out_end, rc);
2379
2380 out_end:
2381         record_end_log(env, &llh);
2382
2383 out_destory:
2384         name_destroy(&mdtuuid);
2385         name_destroy(&lovuuid);
2386         name_destroy(&lovname);
2387         name_destroy(&ospname);
2388         name_destroy(&svname);
2389         name_destroy(&nodeuuid);
2390         name_destroy(&mdtname);
2391         RETURN(rc);
2392 }
2393
2394 static int mgs_write_log_mdt0(const struct lu_env *env,
2395                               struct mgs_device *mgs,
2396                               struct fs_db *fsdb,
2397                               struct mgs_target_info *mti)
2398 {
2399         char *log = mti->mti_svname;
2400         struct llog_handle *llh = NULL;
2401         char *uuid, *lovname;
2402         char mdt_index[6];
2403         char *ptr = mti->mti_params;
2404         int rc = 0, failout = 0;
2405         ENTRY;
2406
2407         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2408         if (uuid == NULL)
2409                 RETURN(-ENOMEM);
2410
2411         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2412                 failout = (strncmp(ptr, "failout", 7) == 0);
2413
2414         rc = name_create(&lovname, log, "-mdtlov");
2415         if (rc)
2416                 GOTO(out_free, rc);
2417         if (mgs_log_is_empty(env, mgs, log)) {
2418                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2419                 if (rc)
2420                         GOTO(out_lod, rc);
2421         }
2422
2423         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2424
2425         rc = record_start_log(env, mgs, &llh, log);
2426         if (rc)
2427                 GOTO(out_lod, rc);
2428
2429         /* add MDT itself */
2430
2431         /* FIXME this whole fn should be a single journal transaction */
2432         sprintf(uuid, "%s_UUID", log);
2433         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2434         if (rc)
2435                 GOTO(out_lod, rc);
2436         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2437         if (rc)
2438                 GOTO(out_end, rc);
2439         rc = record_mount_opt(env, llh, log, lovname, NULL);
2440         if (rc)
2441                 GOTO(out_end, rc);
2442         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2443                         failout ? "n" : "f");
2444         if (rc)
2445                 GOTO(out_end, rc);
2446         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2447         if (rc)
2448                 GOTO(out_end, rc);
2449 out_end:
2450         record_end_log(env, &llh);
2451 out_lod:
2452         name_destroy(&lovname);
2453 out_free:
2454         OBD_FREE(uuid, sizeof(struct obd_uuid));
2455         RETURN(rc);
2456 }
2457
2458 /* envelope method for all layers log */
2459 static int mgs_write_log_mdt(const struct lu_env *env,
2460                              struct mgs_device *mgs,
2461                              struct fs_db *fsdb,
2462                              struct mgs_target_info *mti)
2463 {
2464         struct mgs_thread_info *mgi = mgs_env_info(env);
2465         struct llog_handle *llh = NULL;
2466         char *cliname;
2467         int rc, i = 0;
2468         ENTRY;
2469
2470         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2471
2472         if (mti->mti_uuid[0] == '\0') {
2473                 /* Make up our own uuid */
2474                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2475                          "%s_UUID", mti->mti_svname);
2476         }
2477
2478         /* add mdt */
2479         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2480         if (rc)
2481                 RETURN(rc);
2482         /* Append the mdt info to the client log */
2483         rc = name_create(&cliname, mti->mti_fsname, "-client");
2484         if (rc)
2485                 RETURN(rc);
2486
2487         if (mgs_log_is_empty(env, mgs, cliname)) {
2488                 /* Start client log */
2489                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2490                                        fsdb->fsdb_clilov);
2491                 if (rc)
2492                         GOTO(out_free, rc);
2493                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2494                                        fsdb->fsdb_clilmv);
2495                 if (rc)
2496                         GOTO(out_free, rc);
2497         }
2498
2499         /*
2500         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2501         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2502         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2503         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2504         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2505         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2506         */
2507
2508         /* copy client info about lov/lmv */
2509         mgi->mgi_comp.comp_mti = mti;
2510         mgi->mgi_comp.comp_fsdb = fsdb;
2511
2512         rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2513                                                 &mgi->mgi_comp);
2514         if (rc)
2515                 GOTO(out_free, rc);
2516         rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2517                                       fsdb->fsdb_clilmv);
2518         if (rc)
2519                 GOTO(out_free, rc);
2520
2521         /* add mountopts */
2522         rc = record_start_log(env, mgs, &llh, cliname);
2523         if (rc)
2524                 GOTO(out_free, rc);
2525
2526         rc = record_marker(env, llh, fsdb, CM_START, cliname,
2527                            "mount opts");
2528         if (rc)
2529                 GOTO(out_end, rc);
2530         rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2531                               fsdb->fsdb_clilmv);
2532         if (rc)
2533                 GOTO(out_end, rc);
2534         rc = record_marker(env, llh, fsdb, CM_END, cliname,
2535                            "mount opts");
2536
2537         if (rc)
2538                 GOTO(out_end, rc);
2539
2540         /* for_all_existing_mdt except current one */
2541         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2542                 if (i !=  mti->mti_stripe_index &&
2543                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2544                         char *logname;
2545
2546                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2547                         if (rc)
2548                                 GOTO(out_end, rc);
2549
2550                         /* NB: If the log for the MDT is empty, it means
2551                          * the MDT is only added to the index
2552                          * map, and not being process yet, i.e. this
2553                          * is an unregistered MDT, see mgs_write_log_target().
2554                          * so we should skip it. Otherwise
2555                          *
2556                          * 1. MGS get register request for MDT1 and MDT2.
2557                          *
2558                          * 2. Then both MDT1 and MDT2 are added into
2559                          * fsdb_mdt_index_map. (see mgs_set_index()).
2560                          *
2561                          * 3. Then MDT1 get the lock of fsdb_mutex, then
2562                          * generate the config log, here, it will regard MDT2
2563                          * as an existent MDT, and generate "add osp" for
2564                          * lustre-MDT0001-osp-MDT0002. Note: at the moment
2565                          * MDT0002 config log is still empty, so it will
2566                          * add "add osp" even before "lov setup", which
2567                          * will definitly cause trouble.
2568                          *
2569                          * 4. MDT1 registeration finished, fsdb_mutex is
2570                          * released, then MDT2 get in, then in above
2571                          * mgs_steal_llog_for_mdt_from_client(), it will
2572                          * add another osp log for lustre-MDT0001-osp-MDT0002,
2573                          * which will cause another trouble.*/
2574                         if (!mgs_log_is_empty(env, mgs, logname))
2575                                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb,
2576                                                               mti, i, logname);
2577
2578                         name_destroy(&logname);
2579                         if (rc)
2580                                 GOTO(out_end, rc);
2581                 }
2582         }
2583 out_end:
2584         record_end_log(env, &llh);
2585 out_free:
2586         name_destroy(&cliname);
2587         RETURN(rc);
2588 }
2589
2590 /* Add the ost info to the client/mdt lov */
2591 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2592                                     struct mgs_device *mgs, struct fs_db *fsdb,
2593                                     struct mgs_target_info *mti,
2594                                     char *logname, char *suffix, char *lovname,
2595                                     enum lustre_sec_part sec_part, int flags)
2596 {
2597         struct llog_handle *llh = NULL;
2598         char *nodeuuid = NULL;
2599         char *oscname = NULL;
2600         char *oscuuid = NULL;
2601         char *lovuuid = NULL;
2602         char *svname = NULL;
2603         char index[6];
2604         char nidstr[LNET_NIDSTR_SIZE];
2605         int i, rc;
2606         ENTRY;
2607
2608         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2609                 mti->mti_svname, logname);
2610
2611         if (mgs_log_is_empty(env, mgs, logname)) {
2612                 CERROR("log is empty! Logical error\n");
2613                 RETURN(-EINVAL);
2614         }
2615
2616         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2617         rc = name_create(&nodeuuid, nidstr, "");
2618         if (rc)
2619                 RETURN(rc);
2620         rc = name_create(&svname, mti->mti_svname, "-osc");
2621         if (rc)
2622                 GOTO(out_free, rc);
2623
2624         /* for the system upgraded from old 1.8, keep using the old osc naming
2625          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2626         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2627                 rc = name_create(&oscname, svname, "");
2628         else
2629                 rc = name_create(&oscname, svname, suffix);
2630         if (rc)
2631                 GOTO(out_free, rc);
2632
2633         rc = name_create(&oscuuid, oscname, "_UUID");
2634         if (rc)
2635                 GOTO(out_free, rc);
2636         rc = name_create(&lovuuid, lovname, "_UUID");
2637         if (rc)
2638                 GOTO(out_free, rc);
2639
2640
2641         /*
2642         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2643         multihomed (#4)
2644         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2645         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2646         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2647         failover (#6,7)
2648         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2649         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2650         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2651         */
2652
2653         rc = record_start_log(env, mgs, &llh, logname);
2654         if (rc)
2655                 GOTO(out_free, rc);
2656
2657         /* FIXME these should be a single journal transaction */
2658         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2659                            "add osc");
2660         if (rc)
2661                 GOTO(out_end, rc);
2662
2663         /* NB: don't change record order, because upon MDT steal OSC config
2664          * from client, it treats all nids before LCFG_SETUP as target nids
2665          * (multiple interfaces), while nids after as failover node nids.
2666          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2667          */
2668         for (i = 0; i < mti->mti_nid_count; i++) {
2669                 CDEBUG(D_MGS, "add nid %s\n",
2670                         libcfs_nid2str_r(mti->mti_nids[i],
2671                                          nidstr, sizeof(nidstr)));
2672                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2673                 if (rc)
2674                         GOTO(out_end, rc);
2675         }
2676         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2677         if (rc)
2678                 GOTO(out_end, rc);
2679         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
2680                           NULL, NULL);
2681         if (rc)
2682                 GOTO(out_end, rc);
2683         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2684         if (rc)
2685                 GOTO(out_end, rc);
2686
2687         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2688
2689         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2690         if (rc)
2691                 GOTO(out_end, rc);
2692         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2693                            "add osc");
2694         if (rc)
2695                 GOTO(out_end, rc);
2696 out_end:
2697         record_end_log(env, &llh);
2698 out_free:
2699         name_destroy(&lovuuid);
2700         name_destroy(&oscuuid);
2701         name_destroy(&oscname);
2702         name_destroy(&svname);
2703         name_destroy(&nodeuuid);
2704         RETURN(rc);
2705 }
2706
2707 static int mgs_write_log_ost(const struct lu_env *env,
2708                              struct mgs_device *mgs, struct fs_db *fsdb,
2709                              struct mgs_target_info *mti)
2710 {
2711         struct llog_handle *llh = NULL;
2712         char *logname, *lovname;
2713         char *ptr = mti->mti_params;
2714         int rc, flags = 0, failout = 0, i;
2715         ENTRY;
2716
2717         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2718
2719         /* The ost startup log */
2720
2721         /* If the ost log already exists, that means that someone reformatted
2722            the ost and it called target_add again. */
2723         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2724                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2725                                    "exists, yet the server claims it never "
2726                                    "registered. It may have been reformatted, "
2727                                    "or the index changed. writeconf the MDT to "
2728                                    "regenerate all logs.\n", mti->mti_svname);
2729                 RETURN(-EALREADY);
2730         }
2731
2732         /*
2733         attach obdfilter ost1 ost1_UUID
2734         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2735         */
2736         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2737                 failout = (strncmp(ptr, "failout", 7) == 0);
2738         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2739         if (rc)
2740                 RETURN(rc);
2741         /* FIXME these should be a single journal transaction */
2742         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2743         if (rc)
2744                 GOTO(out_end, rc);
2745         if (*mti->mti_uuid == '\0')
2746                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2747                          "%s_UUID", mti->mti_svname);
2748         rc = record_attach(env, llh, mti->mti_svname,
2749                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2750         if (rc)
2751                 GOTO(out_end, rc);
2752         rc = record_setup(env, llh, mti->mti_svname,
2753                           "dev"/*ignored*/, "type"/*ignored*/,
2754                           failout ? "n" : "f", NULL/*options*/);
2755         if (rc)
2756                 GOTO(out_end, rc);
2757         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2758         if (rc)
2759                 GOTO(out_end, rc);
2760 out_end:
2761         record_end_log(env, &llh);
2762         if (rc)
2763                 RETURN(rc);
2764         /* We also have to update the other logs where this osc is part of
2765            the lov */
2766
2767         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2768                 /* If we're upgrading, the old mdt log already has our
2769                    entry. Let's do a fake one for fun. */
2770                 /* Note that we can't add any new failnids, since we don't
2771                    know the old osc names. */
2772                 flags = CM_SKIP | CM_UPGRADE146;
2773
2774         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2775                 /* If the update flag isn't set, don't update client/mdt
2776                    logs. */
2777                 flags |= CM_SKIP;
2778                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2779                               "the MDT first to regenerate it.\n",
2780                               mti->mti_svname);
2781         }
2782
2783         /* Add ost to all MDT lov defs */
2784         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2785                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2786                         char mdt_index[9];
2787
2788                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2789                                                      i);
2790                         if (rc)
2791                                 RETURN(rc);
2792                         sprintf(mdt_index, "-MDT%04x", i);
2793                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2794                                                       logname, mdt_index,
2795                                                       lovname, LUSTRE_SP_MDT,
2796                                                       flags);
2797                         name_destroy(&logname);
2798                         name_destroy(&lovname);
2799                         if (rc)
2800                                 RETURN(rc);
2801                 }
2802         }
2803
2804         /* Append ost info to the client log */
2805         rc = name_create(&logname, mti->mti_fsname, "-client");
2806         if (rc)
2807                 RETURN(rc);
2808         if (mgs_log_is_empty(env, mgs, logname)) {
2809                 /* Start client log */
2810                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2811                                        fsdb->fsdb_clilov);
2812                 if (rc)
2813                         GOTO(out_free, rc);
2814                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2815                                        fsdb->fsdb_clilmv);
2816                 if (rc)
2817                         GOTO(out_free, rc);
2818         }
2819         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2820                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
2821 out_free:
2822         name_destroy(&logname);
2823         RETURN(rc);
2824 }
2825
2826 static __inline__ int mgs_param_empty(char *ptr)
2827 {
2828         char *tmp;
2829
2830         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2831                 return 1;
2832         return 0;
2833 }
2834
2835 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2836                                           struct mgs_device *mgs,
2837                                           struct fs_db *fsdb,
2838                                           struct mgs_target_info *mti,
2839                                           char *logname, char *cliname)
2840 {
2841         int rc;
2842         struct llog_handle *llh = NULL;
2843
2844         if (mgs_param_empty(mti->mti_params)) {
2845                 /* Remove _all_ failnids */
2846                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2847                                 mti->mti_svname, "add failnid", CM_SKIP);
2848                 return rc < 0 ? rc : 0;
2849         }
2850
2851         /* Otherwise failover nids are additive */
2852         rc = record_start_log(env, mgs, &llh, logname);
2853         if (rc)
2854                 return rc;
2855                 /* FIXME this should be a single journal transaction */
2856         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2857                            "add failnid");
2858         if (rc)
2859                 goto out_end;
2860         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2861         if (rc)
2862                 goto out_end;
2863         rc = record_marker(env, llh, fsdb, CM_END,
2864                            mti->mti_svname, "add failnid");
2865 out_end:
2866         record_end_log(env, &llh);
2867         return rc;
2868 }
2869
2870
2871 /* Add additional failnids to an existing log.
2872    The mdc/osc must have been added to logs first */
2873 /* tcp nids must be in dotted-quad ascii -
2874    we can't resolve hostnames from the kernel. */
2875 static int mgs_write_log_add_failnid(const struct lu_env *env,
2876                                      struct mgs_device *mgs,
2877                                      struct fs_db *fsdb,
2878                                      struct mgs_target_info *mti)
2879 {
2880         char *logname, *cliname;
2881         int rc;
2882         ENTRY;
2883
2884         /* FIXME we currently can't erase the failnids
2885          * given when a target first registers, since they aren't part of
2886          * an "add uuid" stanza */
2887
2888         /* Verify that we know about this target */
2889         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2890                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2891                                    "yet. It must be started before failnids "
2892                                    "can be added.\n", mti->mti_svname);
2893                 RETURN(-ENOENT);
2894         }
2895
2896         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2897         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2898                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2899         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2900                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2901         } else {
2902                 RETURN(-EINVAL);
2903         }
2904         if (rc)
2905                 RETURN(rc);
2906         /* Add failover nids to the client log */
2907         rc = name_create(&logname, mti->mti_fsname, "-client");
2908         if (rc) {
2909                 name_destroy(&cliname);
2910                 RETURN(rc);
2911         }
2912         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2913         name_destroy(&logname);
2914         name_destroy(&cliname);
2915         if (rc)
2916                 RETURN(rc);
2917
2918         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2919                 /* Add OST failover nids to the MDT logs as well */
2920                 int i;
2921
2922                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2923                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2924                                 continue;
2925                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2926                         if (rc)
2927                                 RETURN(rc);
2928                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2929                                                  fsdb, i);
2930                         if (rc) {
2931                                 name_destroy(&logname);
2932                                 RETURN(rc);
2933                         }
2934                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2935                                                             mti, logname,
2936                                                             cliname);
2937                         name_destroy(&cliname);
2938                         name_destroy(&logname);
2939                         if (rc)
2940                                 RETURN(rc);
2941                 }
2942         }
2943
2944         RETURN(rc);
2945 }
2946
2947 static int mgs_wlp_lcfg(const struct lu_env *env,
2948                         struct mgs_device *mgs, struct fs_db *fsdb,
2949                         struct mgs_target_info *mti,
2950                         char *logname, struct lustre_cfg_bufs *bufs,
2951                         char *tgtname, char *ptr)
2952 {
2953         char comment[MTI_NAME_MAXLEN];
2954         char *tmp;
2955         struct llog_cfg_rec *lcr;
2956         int rc, del;
2957
2958         /* Erase any old settings of this same parameter */
2959         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2960         comment[MTI_NAME_MAXLEN - 1] = 0;
2961         /* But don't try to match the value. */
2962         tmp = strchr(comment, '=');
2963         if (tmp != NULL)
2964                 *tmp = 0;
2965         /* FIXME we should skip settings that are the same as old values */
2966         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2967         if (rc < 0)
2968                 return rc;
2969         del = mgs_param_empty(ptr);
2970
2971         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2972                       "Setting" : "Modifying", tgtname, comment, logname);
2973         if (del) {
2974                 /* mgs_modify() will return 1 if nothing had to be done */
2975                 if (rc == 1)
2976                         rc = 0;
2977                 return rc;
2978         }
2979
2980         lustre_cfg_bufs_reset(bufs, tgtname);
2981         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2982         if (mti->mti_flags & LDD_F_PARAM2)
2983                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2984
2985         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2986                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2987         if (lcr == NULL)
2988                 return -ENOMEM;
2989
2990         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2991                                   comment);
2992         lustre_cfg_rec_free(lcr);
2993         return rc;
2994 }
2995
2996 static int mgs_write_log_param2(const struct lu_env *env,
2997                                 struct mgs_device *mgs,
2998                                 struct fs_db *fsdb,
2999                                 struct mgs_target_info *mti, char *ptr)
3000 {
3001         struct lustre_cfg_bufs  bufs;
3002         int                     rc = 0;
3003         ENTRY;
3004
3005         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3006         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
3007                           mti->mti_svname, ptr);
3008
3009         RETURN(rc);
3010 }
3011
3012 /* write global variable settings into log */
3013 static int mgs_write_log_sys(const struct lu_env *env,
3014                              struct mgs_device *mgs, struct fs_db *fsdb,
3015                              struct mgs_target_info *mti, char *sys, char *ptr)
3016 {
3017         struct mgs_thread_info  *mgi = mgs_env_info(env);
3018         struct lustre_cfg       *lcfg;
3019         struct llog_cfg_rec     *lcr;
3020         char *tmp, sep;
3021         int rc, cmd, convert = 1;
3022
3023         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
3024                 cmd = LCFG_SET_TIMEOUT;
3025         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
3026                 cmd = LCFG_SET_LDLM_TIMEOUT;
3027         /* Check for known params here so we can return error to lctl */
3028         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
3029                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
3030                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
3031                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
3032                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
3033                 cmd = LCFG_PARAM;
3034         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
3035                 convert = 0; /* Don't convert string value to integer */
3036                 cmd = LCFG_PARAM;
3037         } else {
3038                 return -EINVAL;
3039         }
3040
3041         if (mgs_param_empty(ptr))
3042                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
3043         else
3044                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
3045
3046         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
3047         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
3048         if (!convert && *tmp != '\0')
3049                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
3050         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3051         if (lcr == NULL)
3052                 return -ENOMEM;
3053
3054         lcfg = &lcr->lcr_cfg;
3055         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
3056         /* truncate the comment to the parameter name */
3057         ptr = tmp - 1;
3058         sep = *ptr;
3059         *ptr = '\0';
3060         /* modify all servers and clients */
3061         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3062                                       *tmp == '\0' ? NULL : lcr,
3063                                       mti->mti_fsname, sys, 0);
3064         if (rc == 0 && *tmp != '\0') {
3065                 switch (cmd) {
3066                 case LCFG_SET_TIMEOUT:
3067                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
3068                                 class_process_config(lcfg);
3069                         break;
3070                 case LCFG_SET_LDLM_TIMEOUT:
3071                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
3072                                 class_process_config(lcfg);
3073                         break;
3074                 default:
3075                         break;
3076                 }
3077         }
3078         *ptr = sep;
3079         lustre_cfg_rec_free(lcr);
3080         return rc;
3081 }
3082
3083 /* write quota settings into log */
3084 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
3085                                struct fs_db *fsdb, struct mgs_target_info *mti,
3086                                char *quota, char *ptr)
3087 {
3088         struct mgs_thread_info  *mgi = mgs_env_info(env);
3089         struct llog_cfg_rec     *lcr;
3090         char                    *tmp;
3091         char                     sep;
3092         int                      rc, cmd = LCFG_PARAM;
3093
3094         /* support only 'meta' and 'data' pools so far */
3095         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
3096             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
3097                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
3098                        "& quota.ost are)\n", ptr);
3099                 return -EINVAL;
3100         }
3101
3102         if (*tmp == '\0') {
3103                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
3104         } else {
3105                 CDEBUG(D_MGS, "global '%s'\n", quota);
3106
3107                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
3108                     strcmp(tmp, "none") != 0) {
3109                         CERROR("enable option(%s) isn't supported\n", tmp);
3110                         return -EINVAL;
3111                 }
3112         }
3113
3114         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
3115         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
3116         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3117         if (lcr == NULL)
3118                 return -ENOMEM;
3119
3120         /* truncate the comment to the parameter name */
3121         ptr = tmp - 1;
3122         sep = *ptr;
3123         *ptr = '\0';
3124
3125         /* XXX we duplicated quota enable information in all server
3126          *     config logs, it should be moved to a separate config
3127          *     log once we cleanup the config log for global param. */
3128         /* modify all servers */
3129         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3130                                       *tmp == '\0' ? NULL : lcr,
3131                                       mti->mti_fsname, quota, 1);
3132         *ptr = sep;
3133         lustre_cfg_rec_free(lcr);
3134         return rc < 0 ? rc : 0;
3135 }
3136
3137 static int mgs_srpc_set_param_disk(const struct lu_env *env,
3138                                    struct mgs_device *mgs,
3139                                    struct fs_db *fsdb,
3140                                    struct mgs_target_info *mti,
3141                                    char *param)
3142 {
3143         struct mgs_thread_info  *mgi = mgs_env_info(env);
3144         struct llog_cfg_rec     *lcr;
3145         struct llog_handle      *llh = NULL;
3146         char                    *logname;
3147         char                    *comment, *ptr;
3148         int                      rc, len;
3149
3150         ENTRY;
3151
3152         /* get comment */
3153         ptr = strchr(param, '=');
3154         LASSERT(ptr != NULL);
3155         len = ptr - param;
3156
3157         OBD_ALLOC(comment, len + 1);
3158         if (comment == NULL)
3159                 RETURN(-ENOMEM);
3160         strncpy(comment, param, len);
3161         comment[len] = '\0';
3162
3163         /* prepare lcfg */
3164         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
3165         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
3166         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
3167         if (lcr == NULL)
3168                 GOTO(out_comment, rc = -ENOMEM);
3169
3170         /* construct log name */
3171         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
3172         if (rc < 0)
3173                 GOTO(out_lcfg, rc);
3174
3175         if (mgs_log_is_empty(env, mgs, logname)) {
3176                 rc = record_start_log(env, mgs, &llh, logname);
3177                 if (rc < 0)
3178                         GOTO(out, rc);
3179                 record_end_log(env, &llh);
3180         }
3181
3182         /* obsolete old one */
3183         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
3184                         comment, CM_SKIP);
3185         if (rc < 0)
3186                 GOTO(out, rc);
3187         /* write the new one */
3188         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
3189                                   mti->mti_svname, comment);
3190         if (rc)
3191                 CERROR("%s: error writing log %s: rc = %d\n",
3192                        mgs->mgs_obd->obd_name, logname, rc);
3193 out:
3194         name_destroy(&logname);
3195 out_lcfg:
3196         lustre_cfg_rec_free(lcr);
3197 out_comment:
3198         OBD_FREE(comment, len + 1);
3199         RETURN(rc);
3200 }
3201
3202 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
3203                                         char *param)
3204 {
3205         char    *ptr;
3206
3207         /* disable the adjustable udesc parameter for now, i.e. use default
3208          * setting that client always ship udesc to MDT if possible. to enable
3209          * it simply remove the following line */
3210         goto error_out;
3211
3212         ptr = strchr(param, '=');
3213         if (ptr == NULL)
3214                 goto error_out;
3215         *ptr++ = '\0';
3216
3217         if (strcmp(param, PARAM_SRPC_UDESC))
3218                 goto error_out;
3219
3220         if (strcmp(ptr, "yes") == 0) {
3221                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3222                 CWARN("Enable user descriptor shipping from client to MDT\n");
3223         } else if (strcmp(ptr, "no") == 0) {
3224                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3225                 CWARN("Disable user descriptor shipping from client to MDT\n");
3226         } else {
3227                 *(ptr - 1) = '=';
3228                 goto error_out;
3229         }
3230         return 0;
3231
3232 error_out:
3233         CERROR("Invalid param: %s\n", param);
3234         return -EINVAL;
3235 }
3236
3237 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3238                                   const char *svname,
3239                                   char *param)
3240 {
3241         struct sptlrpc_rule      rule;
3242         struct sptlrpc_rule_set *rset;
3243         int                      rc;
3244         ENTRY;
3245
3246         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3247                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3248                 RETURN(-EINVAL);
3249         }
3250
3251         if (strncmp(param, PARAM_SRPC_UDESC,
3252                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3253                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3254         }
3255
3256         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3257                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3258                 RETURN(-EINVAL);
3259         }
3260
3261         param += sizeof(PARAM_SRPC_FLVR) - 1;
3262
3263         rc = sptlrpc_parse_rule(param, &rule);
3264         if (rc)
3265                 RETURN(rc);
3266
3267         /* mgs rules implies must be mgc->mgs */
3268         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3269                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3270                      rule.sr_from != LUSTRE_SP_ANY) ||
3271                     (rule.sr_to != LUSTRE_SP_MGS &&
3272                      rule.sr_to != LUSTRE_SP_ANY))
3273                         RETURN(-EINVAL);
3274         }
3275
3276         /* preapre room for this coming rule. svcname format should be:
3277          * - fsname: general rule
3278          * - fsname-tgtname: target-specific rule
3279          */
3280         if (strchr(svname, '-')) {
3281                 struct mgs_tgt_srpc_conf *tgtconf;
3282                 int                       found = 0;
3283
3284                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3285                      tgtconf = tgtconf->mtsc_next) {
3286                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3287                                 found = 1;
3288                                 break;
3289                         }
3290                 }
3291
3292                 if (!found) {
3293                         int name_len;
3294
3295                         OBD_ALLOC_PTR(tgtconf);
3296                         if (tgtconf == NULL)
3297                                 RETURN(-ENOMEM);
3298
3299                         name_len = strlen(svname);
3300
3301                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3302                         if (tgtconf->mtsc_tgt == NULL) {
3303                                 OBD_FREE_PTR(tgtconf);
3304                                 RETURN(-ENOMEM);
3305                         }
3306                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3307
3308                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3309                         fsdb->fsdb_srpc_tgt = tgtconf;
3310                 }
3311
3312                 rset = &tgtconf->mtsc_rset;
3313         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
3314                 /* put _mgs related srpc rule directly in mgs ruleset */
3315                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
3316         } else {
3317                 rset = &fsdb->fsdb_srpc_gen;
3318         }
3319
3320         rc = sptlrpc_rule_set_merge(rset, &rule);
3321
3322         RETURN(rc);
3323 }
3324
3325 static int mgs_srpc_set_param(const struct lu_env *env,
3326                               struct mgs_device *mgs,
3327                               struct fs_db *fsdb,
3328                               struct mgs_target_info *mti,
3329                               char *param)
3330 {
3331         char                   *copy;
3332         int                     rc, copy_size;
3333         ENTRY;
3334
3335 #ifndef HAVE_GSS
3336         RETURN(-EINVAL);
3337 #endif
3338         /* keep a copy of original param, which could be destroied
3339          * during parsing */
3340         copy_size = strlen(param) + 1;
3341         OBD_ALLOC(copy, copy_size);
3342         if (copy == NULL)
3343                 return -ENOMEM;
3344         memcpy(copy, param, copy_size);
3345
3346         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3347         if (rc)
3348                 goto out_free;
3349
3350         /* previous steps guaranteed the syntax is correct */
3351         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3352         if (rc)
3353                 goto out_free;
3354
3355         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3356                 /*
3357                  * for mgs rules, make them effective immediately.
3358                  */
3359                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3360                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3361                                                  &fsdb->fsdb_srpc_gen);
3362         }
3363
3364 out_free:
3365         OBD_FREE(copy, copy_size);
3366         RETURN(rc);
3367 }
3368
3369 struct mgs_srpc_read_data {
3370         struct fs_db   *msrd_fsdb;
3371         int             msrd_skip;
3372 };
3373
3374 static int mgs_srpc_read_handler(const struct lu_env *env,
3375                                  struct llog_handle *llh,
3376                                  struct llog_rec_hdr *rec, void *data)
3377 {
3378         struct mgs_srpc_read_data *msrd = data;
3379         struct cfg_marker         *marker;
3380         struct lustre_cfg         *lcfg = REC_DATA(rec);
3381         char                      *svname, *param;
3382         int                        cfg_len, rc;
3383         ENTRY;
3384
3385         if (rec->lrh_type != OBD_CFG_REC) {
3386                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3387                 RETURN(-EINVAL);
3388         }
3389
3390         cfg_len = REC_DATA_LEN(rec);
3391
3392         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3393         if (rc) {
3394                 CERROR("Insane cfg\n");
3395                 RETURN(rc);
3396         }
3397
3398         if (lcfg->lcfg_command == LCFG_MARKER) {
3399                 marker = lustre_cfg_buf(lcfg, 1);
3400
3401                 if (marker->cm_flags & CM_START &&
3402                     marker->cm_flags & CM_SKIP)
3403                         msrd->msrd_skip = 1;
3404                 if (marker->cm_flags & CM_END)
3405                         msrd->msrd_skip = 0;
3406
3407                 RETURN(0);
3408         }
3409
3410         if (msrd->msrd_skip)
3411                 RETURN(0);
3412
3413         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3414                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3415                 RETURN(0);
3416         }
3417
3418         svname = lustre_cfg_string(lcfg, 0);
3419         if (svname == NULL) {
3420                 CERROR("svname is empty\n");
3421                 RETURN(0);
3422         }
3423
3424         param = lustre_cfg_string(lcfg, 1);
3425         if (param == NULL) {
3426                 CERROR("param is empty\n");
3427                 RETURN(0);
3428         }
3429
3430         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3431         if (rc)
3432                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3433
3434         RETURN(0);
3435 }
3436
3437 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3438                                 struct mgs_device *mgs,
3439                                 struct fs_db *fsdb)
3440 {
3441         struct llog_handle        *llh = NULL;
3442         struct llog_ctxt          *ctxt;
3443         char                      *logname;
3444         struct mgs_srpc_read_data  msrd;
3445         int                        rc;
3446         ENTRY;
3447
3448         /* construct log name */
3449         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3450         if (rc)
3451                 RETURN(rc);
3452
3453         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3454         LASSERT(ctxt != NULL);
3455
3456         if (mgs_log_is_empty(env, mgs, logname))
3457                 GOTO(out, rc = 0);
3458
3459         rc = llog_open(env, ctxt, &llh, NULL, logname,
3460                        LLOG_OPEN_EXISTS);
3461         if (rc < 0) {
3462                 if (rc == -ENOENT)
3463                         rc = 0;
3464                 GOTO(out, rc);
3465         }
3466
3467         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3468         if (rc)
3469                 GOTO(out_close, rc);
3470
3471         if (llog_get_size(llh) <= 1)
3472                 GOTO(out_close, rc = 0);
3473
3474         msrd.msrd_fsdb = fsdb;
3475         msrd.msrd_skip = 0;
3476
3477         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3478                           NULL);
3479
3480 out_close:
3481         llog_close(env, llh);
3482 out:
3483         llog_ctxt_put(ctxt);
3484         name_destroy(&logname);
3485
3486         if (rc)
3487                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3488         RETURN(rc);
3489 }
3490
3491 /* Permanent settings of all parameters by writing into the appropriate
3492  * configuration logs.
3493  * A parameter with null value ("<param>='\0'") means to erase it out of
3494  * the logs.
3495  */
3496 static int mgs_write_log_param(const struct lu_env *env,
3497                                struct mgs_device *mgs, struct fs_db *fsdb,
3498                                struct mgs_target_info *mti, char *ptr)
3499 {
3500         struct mgs_thread_info *mgi = mgs_env_info(env);
3501         char *logname;
3502         char *tmp;
3503         int rc = 0;
3504         ENTRY;
3505
3506         /* For various parameter settings, we have to figure out which logs
3507            care about them (e.g. both mdt and client for lov settings) */
3508         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3509
3510         /* The params are stored in MOUNT_DATA_FILE and modified via
3511            tunefs.lustre, or set using lctl conf_param */
3512
3513         /* Processed in lustre_start_mgc */
3514         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3515                 GOTO(end, rc);
3516
3517         /* Processed in ost/mdt */
3518         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3519                 GOTO(end, rc);
3520
3521         /* Processed in mgs_write_log_ost */
3522         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3523                 if (mti->mti_flags & LDD_F_PARAM) {
3524                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3525                                            "changed with tunefs.lustre"
3526                                            "and --writeconf\n", ptr);
3527                         rc = -EPERM;
3528                 }
3529                 GOTO(end, rc);
3530         }
3531
3532         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3533                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3534                 GOTO(end, rc);
3535         }
3536
3537         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3538                 /* Add a failover nidlist */
3539                 rc = 0;
3540                 /* We already processed failovers params for new
3541                    targets in mgs_write_log_target */
3542                 if (mti->mti_flags & LDD_F_PARAM) {
3543                         CDEBUG(D_MGS, "Adding failnode\n");
3544                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3545                 }
3546                 GOTO(end, rc);
3547         }
3548
3549         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3550                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3551                 GOTO(end, rc);
3552         }
3553
3554         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3555                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3556                 GOTO(end, rc);
3557         }
3558
3559         if (class_match_param(ptr, PARAM_OSC PARAM_ACTIVE, &tmp) == 0 ||
3560             class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0) {
3561                 /* active=0 means off, anything else means on */
3562                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3563                 bool deactive_osc = memcmp(ptr, PARAM_OSC PARAM_ACTIVE,
3564                                           strlen(PARAM_OSC PARAM_ACTIVE)) == 0;
3565                 int i;
3566
3567                 if (!deactive_osc) {
3568                         __u32   index;
3569
3570                         rc = server_name2index(mti->mti_svname, &index, NULL);
3571                         if (rc < 0)
3572                                 GOTO(end, rc);
3573
3574                         if (index == 0) {
3575                                 LCONSOLE_ERROR_MSG(0x144, "%s: MDC0 can not be"
3576                                                    " (de)activated.\n",
3577                                                    mti->mti_svname);
3578                                 GOTO(end, rc = -EINVAL);
3579                         }
3580                 }
3581
3582                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3583                               flag ? "de" : "re", mti->mti_svname);
3584                 /* Modify clilov */
3585                 rc = name_create(&logname, mti->mti_fsname, "-client");
3586                 if (rc < 0)
3587                         GOTO(end, rc);
3588                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3589                                 mti->mti_svname,
3590                                 deactive_osc ? "add osc" : "add mdc", flag);
3591                 name_destroy(&logname);
3592                 if (rc < 0)
3593                         goto active_err;
3594
3595                 /* Modify mdtlov */
3596                 /* Add to all MDT logs for DNE */
3597                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3598                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3599                                 continue;
3600                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3601                         if (rc < 0)
3602                                 GOTO(end, rc);
3603                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3604                                         mti->mti_svname,
3605                                         deactive_osc ? "add osc" : "add osp",
3606                                         flag);
3607                         name_destroy(&logname);
3608                         if (rc < 0)
3609                                 goto active_err;
3610                 }
3611 active_err:
3612                 if (rc < 0) {
3613                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3614                                            "log (%d). No permanent "
3615                                            "changes were made to the "
3616                                            "config log.\n",
3617                                            mti->mti_svname, rc);
3618                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3619                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3620                                                    " because the log"
3621                                                    "is in the old 1.4"
3622                                                    "style. Consider "
3623                                                    " --writeconf to "
3624                                                    "update the logs.\n");
3625                         GOTO(end, rc);
3626                 }
3627                 /* Fall through to osc/mdc proc for deactivating live
3628                    OSC/OSP on running MDT / clients. */
3629         }
3630         /* Below here, let obd's XXX_process_config methods handle it */
3631
3632         /* All lov. in proc */
3633         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3634                 char *mdtlovname;
3635
3636                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3637                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3638                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3639                                            "set on the MDT, not %s. "
3640                                            "Ignoring.\n",
3641                                            mti->mti_svname);
3642                         GOTO(end, rc = 0);
3643                 }
3644
3645                 /* Modify mdtlov */
3646                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3647                         GOTO(end, rc = -ENODEV);
3648
3649                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3650                                              mti->mti_stripe_index);
3651                 if (rc)
3652                         GOTO(end, rc);
3653                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3654                                   &mgi->mgi_bufs, mdtlovname, ptr);
3655                 name_destroy(&logname);
3656                 name_destroy(&mdtlovname);
3657                 if (rc)
3658                         GOTO(end, rc);
3659
3660                 /* Modify clilov */
3661                 rc = name_create(&logname, mti->mti_fsname, "-client");
3662                 if (rc)
3663                         GOTO(end, rc);
3664                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3665                                   fsdb->fsdb_clilov, ptr);
3666                 name_destroy(&logname);
3667                 GOTO(end, rc);
3668         }
3669
3670         /* All osc., mdc., llite. params in proc */
3671         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3672             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3673             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3674                 char *cname;
3675
3676                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3677                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3678                                            " cannot be modified. Consider"
3679                                            " updating the configuration with"
3680                                            " --writeconf\n",
3681                                            mti->mti_svname);
3682                         GOTO(end, rc = -EINVAL);
3683                 }
3684                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3685                         rc = name_create(&cname, mti->mti_fsname, "-client");
3686                         /* Add the client type to match the obdname in
3687                            class_config_llog_handler */
3688                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3689                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3690                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3691                         rc = name_create(&cname, mti->mti_svname, "-osc");
3692                 } else {
3693                         GOTO(end, rc = -EINVAL);
3694                 }
3695                 if (rc)
3696                         GOTO(end, rc);
3697
3698                 /* Forbid direct update of llite root squash parameters.
3699                  * These parameters are indirectly set via the MDT settings.
3700                  * See (LU-1778) */
3701                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
3702                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3703                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3704                         LCONSOLE_ERROR("%s: root squash parameters can only "
3705                                 "be updated through MDT component\n",
3706                                 mti->mti_fsname);
3707                         name_destroy(&cname);
3708                         GOTO(end, rc = -EINVAL);
3709                 }
3710
3711                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3712
3713                 /* Modify client */
3714                 rc = name_create(&logname, mti->mti_fsname, "-client");
3715                 if (rc) {
3716                         name_destroy(&cname);
3717                         GOTO(end, rc);
3718                 }
3719                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3720                                   cname, ptr);
3721
3722                 /* osc params affect the MDT as well */
3723                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3724                         int i;
3725
3726                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3727                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3728                                         continue;
3729                                 name_destroy(&cname);
3730                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3731                                                          fsdb, i);
3732                                 name_destroy(&logname);
3733                                 if (rc)
3734                                         break;
3735                                 rc = name_create_mdt(&logname,
3736                                                      mti->mti_fsname, i);
3737                                 if (rc)
3738                                         break;
3739                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3740                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3741                                                           mti, logname,
3742                                                           &mgi->mgi_bufs,
3743                                                           cname, ptr);
3744                                         if (rc)
3745                                                 break;
3746                                 }
3747                         }
3748                 }
3749
3750                 /* For mdc activate/deactivate, it affects OSP on MDT as well */
3751                 if (class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0 &&
3752                     rc == 0) {
3753                         char suffix[16];
3754                         char *lodname = NULL;
3755                         char *param_str = NULL;
3756                         int i;
3757                         int index;
3758
3759                         /* replace mdc with osp */
3760                         memcpy(ptr, PARAM_OSP, strlen(PARAM_OSP));
3761                         rc = server_name2index(mti->mti_svname, &index, NULL);
3762                         if (rc < 0) {
3763                                 memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
3764                                 GOTO(end, rc);
3765                         }
3766
3767                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3768                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3769                                         continue;
3770
3771                                 if (i == index)
3772                                         continue;
3773
3774                                 name_destroy(&logname);
3775                                 rc = name_create_mdt(&logname, mti->mti_fsname,
3776                                                      i);
3777                                 if (rc < 0)
3778                                         break;
3779
3780                                 if (mgs_log_is_empty(env, mgs, logname))
3781                                         continue;
3782
3783                                 snprintf(suffix, sizeof(suffix), "-osp-MDT%04x",
3784                                          i);
3785                                 name_destroy(&cname);
3786                                 rc = name_create(&cname, mti->mti_svname,
3787                                                  suffix);
3788                                 if (rc < 0)
3789                                         break;
3790
3791                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3792                                                   &mgi->mgi_bufs, cname, ptr);
3793                                 if (rc < 0)
3794                                         break;
3795
3796                                 /* Add configuration log for noitfying LOD
3797                                  * to active/deactive the OSP. */
3798                                 name_destroy(&param_str);
3799                                 rc = name_create(&param_str, cname,
3800                                                  (*tmp == '0') ?  ".active=0" :
3801                                                  ".active=1");
3802                                 if (rc < 0)
3803                                         break;
3804
3805                                 name_destroy(&lodname);
3806                                 rc = name_create(&lodname, logname, "-mdtlov");
3807                                 if (rc < 0)
3808                                         break;
3809
3810                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3811                                                   &mgi->mgi_bufs, lodname,
3812                                                   param_str);
3813                                 if (rc < 0)
3814                                         break;
3815                         }
3816                         memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
3817                         name_destroy(&lodname);
3818                         name_destroy(&param_str);
3819                 }
3820
3821                 name_destroy(&logname);
3822                 name_destroy(&cname);
3823                 GOTO(end, rc);
3824         }
3825
3826         /* All mdt. params in proc */
3827         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
3828                 int i;
3829                 __u32 idx;
3830
3831                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3832                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3833                             MTI_NAME_MAXLEN) == 0)
3834                         /* device is unspecified completely? */
3835                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3836                 else
3837                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3838                 if (rc < 0)
3839                         goto active_err;
3840                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3841                         goto active_err;
3842                 if (rc & LDD_F_SV_ALL) {
3843                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3844                                 if (!test_bit(i,
3845                                                   fsdb->fsdb_mdt_index_map))
3846                                         continue;
3847                                 rc = name_create_mdt(&logname,
3848                                                 mti->mti_fsname, i);
3849                                 if (rc)
3850                                         goto active_err;
3851                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3852                                                   logname, &mgi->mgi_bufs,
3853                                                   logname, ptr);
3854                                 name_destroy(&logname);
3855                                 if (rc)
3856                                         goto active_err;
3857                         }
3858                 } else {
3859                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
3860                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
3861                                 LCONSOLE_ERROR("%s: root squash parameters "
3862                                         "cannot be applied to a single MDT\n",
3863                                         mti->mti_fsname);
3864                                 GOTO(end, rc = -EINVAL);
3865                         }
3866                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3867                                           mti->mti_svname, &mgi->mgi_bufs,
3868                                           mti->mti_svname, ptr);
3869                         if (rc)
3870                                 goto active_err;
3871                 }
3872
3873                 /* root squash settings are also applied to llite
3874                  * config log (see LU-1778) */
3875                 if (rc == 0 &&
3876                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3877                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3878                         char *cname;
3879                         char *ptr2;
3880
3881                         rc = name_create(&cname, mti->mti_fsname, "-client");
3882                         if (rc)
3883                                 GOTO(end, rc);
3884                         rc = name_create(&logname, mti->mti_fsname, "-client");
3885                         if (rc) {
3886                                 name_destroy(&cname);
3887                                 GOTO(end, rc);
3888                         }
3889                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
3890                         if (rc) {
3891                                 name_destroy(&cname);
3892                                 name_destroy(&logname);
3893                                 GOTO(end, rc);
3894                         }
3895                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3896                                           &mgi->mgi_bufs, cname, ptr2);
3897                         name_destroy(&ptr2);
3898                         name_destroy(&logname);
3899                         name_destroy(&cname);
3900                 }
3901                 GOTO(end, rc);
3902         }
3903
3904         /* All mdd., ost. and osd. params in proc */
3905         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3906             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3907             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3908                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3909                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3910                         GOTO(end, rc = -ENODEV);
3911
3912                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3913                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3914                 GOTO(end, rc);
3915         }
3916
3917         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3918
3919 end:
3920         if (rc)
3921                 CERROR("err %d on param '%s'\n", rc, ptr);
3922
3923         RETURN(rc);
3924 }
3925
3926 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
3927                          struct mgs_target_info *mti, struct fs_db *fsdb)
3928 {
3929         char    *buf, *params;
3930         int      rc = -EINVAL;
3931
3932         ENTRY;
3933
3934         /* set/check the new target index */
3935         rc = mgs_set_index(env, mgs, mti);
3936         if (rc < 0)
3937                 RETURN(rc);
3938
3939         if (rc == EALREADY) {
3940                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3941                               mti->mti_stripe_index, mti->mti_svname);
3942                 /* We would like to mark old log sections as invalid
3943                    and add new log sections in the client and mdt logs.
3944                    But if we add new sections, then live clients will
3945                    get repeat setup instructions for already running
3946                    osc's. So don't update the client/mdt logs. */
3947                 mti->mti_flags &= ~LDD_F_UPDATE;
3948                 rc = 0;
3949         }
3950
3951         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_WRITE_TARGET_DELAY, cfs_fail_val > 0 ?
3952                          cfs_fail_val : 10);
3953
3954         mutex_lock(&fsdb->fsdb_mutex);
3955
3956         if (mti->mti_flags &
3957             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3958                 /* Generate a log from scratch */
3959                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3960                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3961                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3962                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3963                 } else {
3964                         CERROR("Unknown target type %#x, can't create log for "
3965                                "%s\n", mti->mti_flags, mti->mti_svname);
3966                 }
3967                 if (rc) {
3968                         CERROR("Can't write logs for %s (%d)\n",
3969                                mti->mti_svname, rc);
3970                         GOTO(out_up, rc);
3971                 }
3972         } else {
3973                 /* Just update the params from tunefs in mgs_write_log_params */
3974                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3975                 mti->mti_flags |= LDD_F_PARAM;
3976         }
3977
3978         /* allocate temporary buffer, where class_get_next_param will
3979            make copy of a current  parameter */
3980         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3981         if (buf == NULL)
3982                 GOTO(out_up, rc = -ENOMEM);
3983         params = mti->mti_params;
3984         while (params != NULL) {
3985                 rc = class_get_next_param(&params, buf);
3986                 if (rc) {
3987                         if (rc == 1)
3988                                 /* there is no next parameter, that is
3989                                    not an error */
3990                                 rc = 0;
3991                         break;
3992                 }
3993                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3994                        params, buf);
3995                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3996                 if (rc)
3997                         break;
3998         }
3999
4000         OBD_FREE(buf, strlen(mti->mti_params) + 1);
4001
4002 out_up:
4003         mutex_unlock(&fsdb->fsdb_mutex);
4004         RETURN(rc);
4005 }
4006
4007 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
4008 {
4009         struct llog_ctxt        *ctxt;
4010         int                      rc = 0;
4011
4012         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
4013         if (ctxt == NULL) {
4014                 CERROR("%s: MGS config context doesn't exist\n",
4015                        mgs->mgs_obd->obd_name);
4016                 rc = -ENODEV;
4017         } else {
4018                 rc = llog_erase(env, ctxt, NULL, name);
4019                 /* llog may not exist */
4020                 if (rc == -ENOENT)
4021                         rc = 0;
4022                 llog_ctxt_put(ctxt);
4023         }
4024
4025         if (rc)
4026                 CERROR("%s: failed to clear log %s: %d\n",
4027                        mgs->mgs_obd->obd_name, name, rc);
4028
4029         return rc;
4030 }
4031
4032 /* erase all logs for the given fs */
4033 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs,
4034                    const char *fsname)
4035 {
4036         struct list_head log_list;
4037         struct mgs_direntry *dirent, *n;
4038         char barrier_name[20] = {};
4039         char *suffix;
4040         int count = 0;
4041         int rc, len = strlen(fsname);
4042         ENTRY;
4043
4044         mutex_lock(&mgs->mgs_mutex);
4045
4046         /* Find all the logs in the CONFIGS directory */
4047         rc = class_dentry_readdir(env, mgs, &log_list);
4048         if (rc) {
4049                 mutex_unlock(&mgs->mgs_mutex);
4050                 RETURN(rc);
4051         }
4052
4053         if (list_empty(&log_list)) {
4054                 mutex_unlock(&mgs->mgs_mutex);
4055                 RETURN(-ENOENT);
4056         }
4057
4058         snprintf(barrier_name, sizeof(barrier_name) - 1, "%s-%s",
4059                  fsname, BARRIER_FILENAME);
4060         /* Delete the barrier fsdb */
4061         mgs_remove_fsdb_by_name(mgs, barrier_name);
4062         /* Delete the fs db */
4063         mgs_remove_fsdb_by_name(mgs, fsname);
4064         mutex_unlock(&mgs->mgs_mutex);
4065
4066         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4067                 list_del_init(&dirent->mde_list);
4068                 suffix = strrchr(dirent->mde_name, '-');
4069                 if (suffix != NULL) {
4070                         if ((len == suffix - dirent->mde_name) &&
4071                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
4072                                 CDEBUG(D_MGS, "Removing log %s\n",
4073                                        dirent->mde_name);
4074                                 mgs_erase_log(env, mgs, dirent->mde_name);
4075                                 count++;
4076                         }
4077                 }
4078                 mgs_direntry_free(dirent);
4079         }
4080
4081         if (count == 0)
4082                 rc = -ENOENT;
4083
4084         RETURN(rc);
4085 }
4086
4087 /* list all logs for the given fs */
4088 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
4089                   struct obd_ioctl_data *data)
4090 {
4091         struct list_head         log_list;
4092         struct mgs_direntry     *dirent, *n;
4093         char                    *out, *suffix;
4094         int                      l, remains, rc;
4095
4096         ENTRY;
4097
4098         /* Find all the logs in the CONFIGS directory */
4099         rc = class_dentry_readdir(env, mgs, &log_list);
4100         if (rc)
4101                 RETURN(rc);
4102
4103         out = data->ioc_bulk;
4104         remains = data->ioc_inllen1;
4105         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4106                 list_del_init(&dirent->mde_list);
4107                 suffix = strrchr(dirent->mde_name, '-');
4108                 if (suffix != NULL) {
4109                         l = snprintf(out, remains, "config log: $%s\n",
4110                                      dirent->mde_name);
4111                         out += l;
4112                         remains -= l;
4113                 }
4114                 mgs_direntry_free(dirent);
4115                 if (remains <= 0)
4116                         break;
4117         }
4118         RETURN(rc);
4119 }
4120
4121 struct mgs_lcfg_fork_data {
4122         struct lustre_cfg_bufs   mlfd_bufs;
4123         struct mgs_device       *mlfd_mgs;
4124         struct llog_handle      *mlfd_llh;
4125         const char              *mlfd_oldname;
4126         const char              *mlfd_newname;
4127         char                     mlfd_data[0];
4128 };
4129
4130 static bool contain_valid_fsname(char *buf, const char *fsname,
4131                                  int buflen, int namelen)
4132 {
4133         if (buflen < namelen)
4134                 return false;
4135
4136         if (memcmp(buf, fsname, namelen) != 0)
4137                 return false;
4138
4139         if (buf[namelen] != '\0' && buf[namelen] != '-')
4140                 return false;
4141
4142         return true;
4143 }
4144
4145 static int mgs_lcfg_fork_handler(const struct lu_env *env,
4146                                  struct llog_handle *o_llh,
4147                                  struct llog_rec_hdr *o_rec, void *data)
4148 {
4149         struct mgs_lcfg_fork_data *mlfd = data;
4150         struct lustre_cfg_bufs *n_bufs = &mlfd->mlfd_bufs;
4151         struct lustre_cfg *o_lcfg = (struct lustre_cfg *)(o_rec + 1);
4152         struct llog_cfg_rec *lcr;
4153         char *o_buf;
4154         char *n_buf = mlfd->mlfd_data;
4155         int o_buflen;
4156         int o_namelen = strlen(mlfd->mlfd_oldname);
4157         int n_namelen = strlen(mlfd->mlfd_newname);
4158         int diff = n_namelen - o_namelen;
4159         __u32 cmd = o_lcfg->lcfg_command;
4160         __u32 cnt = o_lcfg->lcfg_bufcount;
4161         int rc;
4162         int i;
4163         ENTRY;
4164
4165         /* buf[0] */
4166         o_buf = lustre_cfg_buf(o_lcfg, 0);
4167         o_buflen = o_lcfg->lcfg_buflens[0];
4168         if (contain_valid_fsname(o_buf, mlfd->mlfd_oldname, o_buflen,
4169                                  o_namelen)) {
4170                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4171                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4172                        o_buflen - o_namelen);
4173                 lustre_cfg_bufs_reset(n_bufs, n_buf);
4174                 n_buf += cfs_size_round(o_buflen + diff);
4175         } else {
4176                 lustre_cfg_bufs_reset(n_bufs, o_buflen != 0 ? o_buf : NULL);
4177         }
4178
4179         switch (cmd) {
4180         case LCFG_MARKER: {
4181                 struct cfg_marker *o_marker;
4182                 struct cfg_marker *n_marker;
4183                 int tgt_namelen;
4184
4185                 if (cnt != 2) {
4186                         CDEBUG(D_MGS, "Unknown cfg marker entry with %d "
4187                                "buffers\n", cnt);
4188                         RETURN(-EINVAL);
4189                 }
4190
4191                 /* buf[1] is marker */
4192                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4193                 o_buflen = o_lcfg->lcfg_buflens[1];
4194                 o_marker = (struct cfg_marker *)o_buf;
4195                 if (!contain_valid_fsname(o_marker->cm_tgtname,
4196                                           mlfd->mlfd_oldname,
4197                                           sizeof(o_marker->cm_tgtname),
4198                                           o_namelen)) {
4199                         lustre_cfg_bufs_set(n_bufs, 1, o_marker,
4200                                             sizeof(*o_marker));
4201                         break;
4202                 }
4203
4204                 n_marker = (struct cfg_marker *)n_buf;
4205                 *n_marker = *o_marker;
4206                 memcpy(n_marker->cm_tgtname, mlfd->mlfd_newname, n_namelen);
4207                 tgt_namelen = strlen(o_marker->cm_tgtname);
4208                 if (tgt_namelen > o_namelen)
4209                         memcpy(n_marker->cm_tgtname + n_namelen,
4210                                o_marker->cm_tgtname + o_namelen,
4211                                tgt_namelen - o_namelen);
4212                 n_marker->cm_tgtname[tgt_namelen + diff] = '\0';
4213                 lustre_cfg_bufs_set(n_bufs, 1, n_marker, sizeof(*n_marker));
4214                 break;
4215         }
4216         case LCFG_PARAM:
4217         case LCFG_SET_PARAM: {
4218                 for (i = 1; i < cnt; i++)
4219                         /* buf[i] is the param value, reuse it directly */
4220                         lustre_cfg_bufs_set(n_bufs, i,
4221                                             lustre_cfg_buf(o_lcfg, i),
4222                                             o_lcfg->lcfg_buflens[i]);
4223                 break;
4224         }
4225         case LCFG_POOL_NEW:
4226         case LCFG_POOL_ADD:
4227         case LCFG_POOL_REM:
4228         case LCFG_POOL_DEL: {
4229                 if (cnt < 3 || cnt > 4) {
4230                         CDEBUG(D_MGS, "Unknown cfg pool (%x) entry with %d "
4231                                "buffers\n", cmd, cnt);
4232                         RETURN(-EINVAL);
4233                 }
4234
4235                 /* buf[1] is fsname */
4236                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4237                 o_buflen = o_lcfg->lcfg_buflens[1];
4238                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4239                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4240                        o_buflen - o_namelen);
4241                 lustre_cfg_bufs_set(n_bufs, 1, n_buf, o_buflen + diff);
4242                 n_buf += cfs_size_round(o_buflen + diff);
4243
4244                 /* buf[2] is the pool name, reuse it directly */
4245                 lustre_cfg_bufs_set(n_bufs, 2, lustre_cfg_buf(o_lcfg, 2),
4246                                     o_lcfg->lcfg_buflens[2]);
4247
4248                 if (cnt == 3)
4249                         break;
4250
4251                 /* buf[3] is ostname */
4252                 o_buf = lustre_cfg_buf(o_lcfg, 3);
4253                 o_buflen = o_lcfg->lcfg_buflens[3];
4254                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4255                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4256                        o_buflen - o_namelen);
4257                 lustre_cfg_bufs_set(n_bufs, 3, n_buf, o_buflen + diff);
4258                 break;
4259         }
4260         case LCFG_SETUP: {
4261                 if (cnt == 2) {
4262                         o_buflen = o_lcfg->lcfg_buflens[1];
4263                         if (o_buflen == sizeof(struct lov_desc) ||
4264                             o_buflen == sizeof(struct lmv_desc)) {
4265                                 char *o_uuid;
4266                                 char *n_uuid;
4267                                 int uuid_len;
4268
4269                                 /* buf[1] */
4270                                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4271                                 if (o_buflen == sizeof(struct lov_desc)) {
4272                                         struct lov_desc *o_desc =
4273                                                 (struct lov_desc *)o_buf;
4274                                         struct lov_desc *n_desc =
4275                                                 (struct lov_desc *)n_buf;
4276
4277                                         *n_desc = *o_desc;
4278                                         o_uuid = o_desc->ld_uuid.uuid;
4279                                         n_uuid = n_desc->ld_uuid.uuid;
4280                                         uuid_len = sizeof(o_desc->ld_uuid.uuid);
4281                                 } else {
4282                                         struct lmv_desc *o_desc =
4283                                                 (struct lmv_desc *)o_buf;
4284                                         struct lmv_desc *n_desc =
4285                                                 (struct lmv_desc *)n_buf;
4286
4287                                         *n_desc = *o_desc;
4288                                         o_uuid = o_desc->ld_uuid.uuid;
4289                                         n_uuid = n_desc->ld_uuid.uuid;
4290                                         uuid_len = sizeof(o_desc->ld_uuid.uuid);
4291                                 }
4292
4293                                 if (unlikely(!contain_valid_fsname(o_uuid,
4294                                                 mlfd->mlfd_oldname, uuid_len,
4295                                                 o_namelen))) {
4296                                         lustre_cfg_bufs_set(n_bufs, 1, o_buf,
4297                                                             o_buflen);
4298                                         break;
4299                                 }
4300
4301                                 memcpy(n_uuid, mlfd->mlfd_newname, n_namelen);
4302                                 uuid_len = strlen(o_uuid);
4303                                 if (uuid_len > o_namelen)
4304                                         memcpy(n_uuid + n_namelen,
4305                                                o_uuid + o_namelen,
4306                                                uuid_len - o_namelen);
4307                                 n_uuid[uuid_len + diff] = '\0';
4308                                 lustre_cfg_bufs_set(n_bufs, 1, n_buf, o_buflen);
4309                                 break;
4310                         } /* else case fall through */
4311                 } /* else case fall through */
4312         }
4313         default: {
4314                 for (i = 1; i < cnt; i++) {
4315                         o_buflen = o_lcfg->lcfg_buflens[i];
4316                         if (o_buflen == 0)
4317                                 continue;
4318
4319                         o_buf = lustre_cfg_buf(o_lcfg, i);
4320                         if (!contain_valid_fsname(o_buf, mlfd->mlfd_oldname,
4321                                                   o_buflen, o_namelen)) {
4322                                 lustre_cfg_bufs_set(n_bufs, i, o_buf, o_buflen);
4323                                 continue;
4324                         }
4325
4326                         memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4327                         if (o_buflen == o_namelen) {
4328                                 lustre_cfg_bufs_set(n_bufs, i, n_buf,
4329                                                     n_namelen);
4330                                 n_buf += cfs_size_round(n_namelen);
4331                                 continue;
4332                         }
4333
4334                         memcpy(n_buf + n_namelen, o_buf + o_namelen,
4335                                o_buflen - o_namelen);
4336                         lustre_cfg_bufs_set(n_bufs, i, n_buf, o_buflen + diff);
4337                         n_buf += cfs_size_round(o_buflen + diff);
4338                 }
4339                 break;
4340         }
4341         }
4342
4343         lcr = lustre_cfg_rec_new(cmd, n_bufs);
4344         if (!lcr)
4345                 RETURN(-ENOMEM);
4346
4347         lcr->lcr_cfg = *o_lcfg;
4348         rc = llog_write(env, mlfd->mlfd_llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
4349         lustre_cfg_rec_free(lcr);
4350
4351         RETURN(rc);
4352 }
4353
4354 static int mgs_lcfg_fork_one(const struct lu_env *env, struct mgs_device *mgs,
4355                              struct mgs_direntry *mde, const char *oldname,
4356                              const char *newname)
4357 {
4358         struct llog_handle *old_llh = NULL;
4359         struct llog_handle *new_llh = NULL;
4360         struct llog_ctxt *ctxt = NULL;
4361         struct mgs_lcfg_fork_data *mlfd = NULL;
4362         char *name_buf = NULL;
4363         int name_buflen;
4364         int old_namelen = strlen(oldname);
4365         int new_namelen = strlen(newname);
4366         int rc;
4367         ENTRY;
4368
4369         name_buflen = mde->mde_len + new_namelen - old_namelen;
4370         OBD_ALLOC(name_buf, name_buflen);
4371         if (!name_buf)
4372                 RETURN(-ENOMEM);
4373
4374         memcpy(name_buf, newname, new_namelen);
4375         memcpy(name_buf + new_namelen, mde->mde_name + old_namelen,
4376                mde->mde_len - old_namelen);
4377
4378         CDEBUG(D_MGS, "Fork the config-log from %s to %s\n",
4379                mde->mde_name, name_buf);
4380
4381         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
4382         LASSERT(ctxt);
4383
4384         rc = llog_open_create(env, ctxt, &new_llh, NULL, name_buf);
4385         if (rc)
4386                 GOTO(out, rc);
4387
4388         rc = llog_init_handle(env, new_llh, LLOG_F_IS_PLAIN, NULL);
4389         if (rc)
4390                 GOTO(out, rc);
4391
4392         if (unlikely(mgs_log_is_empty(env, mgs, mde->mde_name)))
4393                 GOTO(out, rc = 0);
4394
4395         rc = llog_open(env, ctxt, &old_llh, NULL, mde->mde_name,
4396                        LLOG_OPEN_EXISTS);
4397         if (rc)
4398                 GOTO(out, rc);
4399
4400         rc = llog_init_handle(env, old_llh, LLOG_F_IS_PLAIN, NULL);
4401         if (rc)
4402                 GOTO(out, rc);
4403
4404         new_llh->lgh_hdr->llh_tgtuuid = old_llh->lgh_hdr->llh_tgtuuid;
4405
4406         OBD_ALLOC(mlfd, LLOG_MIN_CHUNK_SIZE);
4407         if (!mlfd)
4408                 GOTO(out, rc = -ENOMEM);
4409
4410         mlfd->mlfd_mgs = mgs;
4411         mlfd->mlfd_llh = new_llh;
4412         mlfd->mlfd_oldname = oldname;
4413         mlfd->mlfd_newname = newname;
4414
4415         rc = llog_process(env, old_llh, mgs_lcfg_fork_handler, mlfd, NULL);
4416         OBD_FREE(mlfd, LLOG_MIN_CHUNK_SIZE);
4417
4418         GOTO(out, rc);
4419
4420 out:
4421         if (old_llh)
4422                 llog_close(env, old_llh);
4423         if (new_llh)
4424                 llog_close(env, new_llh);
4425         if (name_buf)
4426                 OBD_FREE(name_buf, name_buflen);
4427         if (ctxt)
4428                 llog_ctxt_put(ctxt);
4429
4430         return rc;
4431 }
4432
4433 int mgs_lcfg_fork(const struct lu_env *env, struct mgs_device *mgs,
4434                   const char *oldname, const char *newname)
4435 {
4436         struct list_head log_list;
4437         struct mgs_direntry *dirent, *n;
4438         int olen = strlen(oldname);
4439         int nlen = strlen(newname);
4440         int count = 0;
4441         int rc = 0;
4442         ENTRY;
4443
4444         if (strcmp(oldname, newname) == 0)
4445                 RETURN(-EINVAL);
4446
4447         /* lock it to prevent fork/erase/register in parallel. */
4448         mutex_lock(&mgs->mgs_mutex);
4449
4450         rc = class_dentry_readdir(env, mgs, &log_list);
4451         if (rc) {
4452                 mutex_unlock(&mgs->mgs_mutex);
4453                 RETURN(rc);
4454         }
4455
4456         if (list_empty(&log_list)) {
4457                 mutex_unlock(&mgs->mgs_mutex);
4458                 RETURN(-ENOENT);
4459         }
4460
4461         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4462                 char *ptr;
4463
4464                 ptr = strrchr(dirent->mde_name, '-');
4465                 if (ptr) {
4466                         int tlen = ptr - dirent->mde_name;
4467
4468                         if (tlen == nlen &&
4469                             strncmp(newname, dirent->mde_name, tlen) == 0)
4470                                 GOTO(out, rc = -EEXIST);
4471
4472                         if (tlen == olen &&
4473                             strncmp(oldname, dirent->mde_name, tlen) == 0)
4474                                 continue;
4475                 }
4476
4477                 list_del_init(&dirent->mde_list);
4478                 mgs_direntry_free(dirent);
4479         }
4480
4481         if (list_empty(&log_list)) {
4482                 mutex_unlock(&mgs->mgs_mutex);
4483                 RETURN(-ENOENT);
4484         }
4485
4486         list_for_each_entry(dirent, &log_list, mde_list) {
4487                 rc = mgs_lcfg_fork_one(env, mgs, dirent, oldname, newname);
4488                 if (rc)
4489                         break;
4490
4491                 count++;
4492         }
4493
4494 out:
4495         mutex_unlock(&mgs->mgs_mutex);
4496
4497         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4498                 list_del_init(&dirent->mde_list);
4499                 mgs_direntry_free(dirent);
4500         }
4501
4502         if (rc && count > 0)
4503                 mgs_erase_logs(env, mgs, newname);
4504
4505         RETURN(rc);
4506 }
4507
4508 int mgs_lcfg_erase(const struct lu_env *env, struct mgs_device *mgs,
4509                    const char *fsname)
4510 {
4511         int rc;
4512         ENTRY;
4513
4514         rc = mgs_erase_logs(env, mgs, fsname);
4515
4516         RETURN(rc);
4517 }
4518
4519 /* from llog_swab */
4520 static void print_lustre_cfg(struct lustre_cfg *lcfg)
4521 {
4522         int i;
4523         ENTRY;
4524
4525         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
4526         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
4527
4528         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
4529         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
4530         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
4531         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
4532
4533         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
4534         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
4535                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
4536                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
4537                                i, lcfg->lcfg_buflens[i],
4538                                lustre_cfg_string(lcfg, i));
4539                 }
4540         EXIT;
4541 }
4542
4543 /* Setup _mgs fsdb and log
4544  */
4545 int mgs__mgs_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
4546 {
4547         struct fs_db *fsdb = NULL;
4548         int rc;
4549         ENTRY;
4550
4551         rc = mgs_find_or_make_fsdb(env, mgs, MGSSELF_NAME, &fsdb);
4552         if (!rc)
4553                 mgs_put_fsdb(mgs, fsdb);
4554
4555         RETURN(rc);
4556 }
4557
4558 /* Setup params fsdb and log
4559  */
4560 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
4561 {
4562         struct fs_db *fsdb = NULL;
4563         struct llog_handle *params_llh = NULL;
4564         int rc;
4565         ENTRY;
4566
4567         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
4568         if (!rc) {
4569                 mutex_lock(&fsdb->fsdb_mutex);
4570                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
4571                 if (!rc)
4572                         rc = record_end_log(env, &params_llh);
4573                 mutex_unlock(&fsdb->fsdb_mutex);
4574                 mgs_put_fsdb(mgs, fsdb);
4575         }
4576
4577         RETURN(rc);
4578 }
4579
4580 /* Cleanup params fsdb and log
4581  */
4582 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
4583 {
4584         int rc;
4585
4586         rc = mgs_erase_logs(env, mgs, PARAMS_FILENAME);
4587         return rc == -ENOENT ? 0 : rc;
4588 }
4589
4590 /* Set a permanent (config log) param for a target or fs
4591  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
4592  *             buf1 contains the single parameter
4593  */
4594 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
4595                  struct lustre_cfg *lcfg, char *fsname)
4596 {
4597         struct fs_db *fsdb = NULL;
4598         struct mgs_target_info *mti = NULL;
4599         char *devname, *param;
4600         char *ptr;
4601         const char *tmp;
4602         __u32 index;
4603         int rc = 0;
4604         bool free = false;
4605         ENTRY;
4606
4607         print_lustre_cfg(lcfg);
4608
4609         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
4610         devname = lustre_cfg_string(lcfg, 0);
4611         param = lustre_cfg_string(lcfg, 1);
4612         if (!devname) {
4613                 /* Assume device name embedded in param:
4614                    lustre-OST0000.osc.max_dirty_mb=32 */
4615                 ptr = strchr(param, '.');
4616                 if (ptr) {
4617                         devname = param;
4618                         *ptr = 0;
4619                         param = ptr + 1;
4620                 }
4621         }
4622         if (!devname) {
4623                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
4624                 RETURN(-ENOSYS);
4625         }
4626
4627         rc = mgs_parse_devname(devname, fsname, NULL);
4628         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
4629                 /* param related to llite isn't allowed to set by OST or MDT */
4630                 if (rc == 0 && strncmp(param, PARAM_LLITE,
4631                                        sizeof(PARAM_LLITE) - 1) == 0)
4632                         RETURN(-EINVAL);
4633         } else {
4634                 /* assume devname is the fsname */
4635                 strlcpy(fsname, devname, MTI_NAME_MAXLEN);
4636         }
4637         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
4638
4639         rc = mgs_find_or_make_fsdb(env, mgs,
4640                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
4641                                    PARAMS_FILENAME : fsname, &fsdb);
4642         if (rc)
4643                 RETURN(rc);
4644
4645         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
4646             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
4647             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4648                 CERROR("No filesystem targets for %s. cfg_device from lctl "
4649                        "is '%s'\n", fsname, devname);
4650                 free = true;
4651                 GOTO(out, rc = -EINVAL);
4652         }
4653
4654         /* Create a fake mti to hold everything */
4655         OBD_ALLOC_PTR(mti);
4656         if (!mti)
4657                 GOTO(out, rc = -ENOMEM);
4658         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
4659             >= sizeof(mti->mti_fsname))
4660                 GOTO(out, rc = -E2BIG);
4661         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
4662             >= sizeof(mti->mti_svname))
4663                 GOTO(out, rc = -E2BIG);
4664         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
4665             >= sizeof(mti->mti_params))
4666                 GOTO(out, rc = -E2BIG);
4667         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
4668         if (rc < 0)
4669                 /* Not a valid server; may be only fsname */
4670                 rc = 0;
4671         else
4672                 /* Strip -osc or -mdc suffix from svname */
4673                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
4674                                      mti->mti_svname))
4675                         GOTO(out, rc = -EINVAL);
4676         /*
4677          * Revoke lock so everyone updates.  Should be alright if
4678          * someone was already reading while we were updating the logs,
4679          * so we don't really need to hold the lock while we're
4680          * writing (above).
4681          */
4682         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
4683                 mti->mti_flags = rc | LDD_F_PARAM2;
4684                 mutex_lock(&fsdb->fsdb_mutex);
4685                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
4686                 mutex_unlock(&fsdb->fsdb_mutex);
4687                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
4688         } else {
4689                 mti->mti_flags = rc | LDD_F_PARAM;
4690                 mutex_lock(&fsdb->fsdb_mutex);
4691                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
4692                 mutex_unlock(&fsdb->fsdb_mutex);
4693                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4694         }
4695
4696 out:
4697         if (mti)
4698                 OBD_FREE_PTR(mti);
4699
4700         if (fsdb) {
4701                 if (free)
4702                         mgs_unlink_fsdb(mgs, fsdb);
4703                 mgs_put_fsdb(mgs, fsdb);
4704         }
4705
4706         RETURN(rc);
4707 }
4708
4709 static int mgs_write_log_pool(const struct lu_env *env,
4710                               struct mgs_device *mgs, char *logname,
4711                               struct fs_db *fsdb, char *tgtname,
4712                               enum lcfg_command_type cmd,
4713                               char *fsname, char *poolname,
4714                               char *ostname, char *comment)
4715 {
4716         struct llog_handle *llh = NULL;
4717         int rc;
4718
4719         rc = record_start_log(env, mgs, &llh, logname);
4720         if (rc)
4721                 return rc;
4722         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
4723         if (rc)
4724                 goto out;
4725         rc = record_base(env, llh, tgtname, 0, cmd,
4726                          fsname, poolname, ostname, NULL);
4727         if (rc)
4728                 goto out;
4729         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
4730 out:
4731         record_end_log(env, &llh);
4732         return rc;
4733 }
4734
4735 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
4736                     enum lcfg_command_type cmd, const char *nodemap_name,
4737                     char *param)
4738 {
4739         lnet_nid_t      nid[2];
4740         __u32           idmap[2];
4741         bool            bool_switch;
4742         __u32           int_id;
4743         int             rc = 0;
4744         ENTRY;
4745
4746         switch (cmd) {
4747         case LCFG_NODEMAP_ADD:
4748                 rc = nodemap_add(nodemap_name);
4749                 break;
4750         case LCFG_NODEMAP_DEL:
4751                 rc = nodemap_del(nodemap_name);
4752                 break;
4753         case LCFG_NODEMAP_ADD_RANGE:
4754                 rc = nodemap_parse_range(param, nid);
4755                 if (rc != 0)
4756                         break;
4757                 rc = nodemap_add_range(nodemap_name, nid);
4758                 break;
4759         case LCFG_NODEMAP_DEL_RANGE:
4760                 rc = nodemap_parse_range(param, nid);
4761                 if (rc != 0)
4762                         break;
4763                 rc = nodemap_del_range(nodemap_name, nid);
4764                 break;
4765         case LCFG_NODEMAP_ADMIN:
4766                 bool_switch = simple_strtoul(param, NULL, 10);
4767                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
4768                 break;
4769         case LCFG_NODEMAP_DENY_UNKNOWN:
4770                 bool_switch = simple_strtoul(param, NULL, 10);
4771                 rc = nodemap_set_deny_unknown(nodemap_name, bool_switch);
4772                 break;
4773         case LCFG_NODEMAP_TRUSTED:
4774                 bool_switch = simple_strtoul(param, NULL, 10);
4775                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
4776                 break;
4777         case LCFG_NODEMAP_SQUASH_UID:
4778                 int_id = simple_strtoul(param, NULL, 10);
4779                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
4780                 break;
4781         case LCFG_NODEMAP_SQUASH_GID:
4782                 int_id = simple_strtoul(param, NULL, 10);
4783                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
4784                 break;
4785         case LCFG_NODEMAP_ADD_UIDMAP:
4786         case LCFG_NODEMAP_ADD_GIDMAP:
4787                 rc = nodemap_parse_idmap(param, idmap);
4788                 if (rc != 0)
4789                         break;
4790                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
4791                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
4792                                                idmap);
4793                 else
4794                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
4795                                                idmap);
4796                 break;
4797         case LCFG_NODEMAP_DEL_UIDMAP:
4798         case LCFG_NODEMAP_DEL_GIDMAP:
4799                 rc = nodemap_parse_idmap(param, idmap);
4800                 if (rc != 0)
4801                         break;
4802                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
4803                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
4804                                                idmap);
4805                 else
4806                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
4807                                                idmap);
4808                 break;
4809         case LCFG_NODEMAP_SET_FILESET:
4810                 rc = nodemap_set_fileset(nodemap_name, param);
4811                 break;
4812         default:
4813                 rc = -EINVAL;
4814         }
4815
4816         RETURN(rc);
4817 }
4818
4819 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
4820                  enum lcfg_command_type cmd, char *fsname,
4821                  char *poolname, char *ostname)
4822 {
4823         struct fs_db *fsdb;
4824         char *lovname;
4825         char *logname;
4826         char *label = NULL, *canceled_label = NULL;
4827         int label_sz;
4828         struct mgs_target_info *mti = NULL;
4829         bool checked = false;
4830         bool locked = false;
4831         bool free = false;
4832         int rc, i;
4833         ENTRY;
4834
4835         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
4836         if (rc) {
4837                 CERROR("Can't get db for %s\n", fsname);
4838                 RETURN(rc);
4839         }
4840         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4841                 CERROR("%s is not defined\n", fsname);
4842                 free = true;
4843                 GOTO(out_fsdb, rc = -EINVAL);
4844         }
4845
4846         label_sz = 10 + strlen(fsname) + strlen(poolname);
4847
4848         /* check if ostname match fsname */
4849         if (ostname != NULL) {
4850                 char *ptr;
4851
4852                 ptr = strrchr(ostname, '-');
4853                 if ((ptr == NULL) ||
4854                     (strncmp(fsname, ostname, ptr-ostname) != 0))
4855                         RETURN(-EINVAL);
4856                 label_sz += strlen(ostname);
4857         }
4858
4859         OBD_ALLOC(label, label_sz);
4860         if (!label)
4861                 GOTO(out_fsdb, rc = -ENOMEM);
4862
4863         switch(cmd) {
4864         case LCFG_POOL_NEW:
4865                 sprintf(label,
4866                         "new %s.%s", fsname, poolname);
4867                 break;
4868         case LCFG_POOL_ADD:
4869                 sprintf(label,
4870                         "add %s.%s.%s", fsname, poolname, ostname);
4871                 break;
4872         case LCFG_POOL_REM:
4873                 OBD_ALLOC(canceled_label, label_sz);
4874                 if (canceled_label == NULL)
4875                         GOTO(out_label, rc = -ENOMEM);
4876                 sprintf(label,
4877                         "rem %s.%s.%s", fsname, poolname, ostname);
4878                 sprintf(canceled_label,
4879                         "add %s.%s.%s", fsname, poolname, ostname);
4880                 break;
4881         case LCFG_POOL_DEL:
4882                 OBD_ALLOC(canceled_label, label_sz);
4883                 if (canceled_label == NULL)
4884                         GOTO(out_label, rc = -ENOMEM);
4885                 sprintf(label,
4886                         "del %s.%s", fsname, poolname);
4887                 sprintf(canceled_label,
4888                         "new %s.%s", fsname, poolname);
4889                 break;
4890         default:
4891                 break;
4892         }
4893
4894         OBD_ALLOC_PTR(mti);
4895         if (mti == NULL)
4896                 GOTO(out_cancel, rc = -ENOMEM);
4897         strncpy(mti->mti_svname, "lov pool", sizeof(mti->mti_svname));
4898
4899         mutex_lock(&fsdb->fsdb_mutex);
4900         locked = true;
4901         /* write pool def to all MDT logs */
4902         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4903                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4904                         rc = name_create_mdt_and_lov(&logname, &lovname,
4905                                                      fsdb, i);
4906                         if (rc)
4907                                 GOTO(out_mti, rc);
4908
4909                         if (!checked && (canceled_label == NULL)) {
4910                                 rc = mgs_check_marker(env, mgs, fsdb, mti,
4911                                                 logname, lovname, label);
4912                                 if (rc) {
4913                                         name_destroy(&logname);
4914                                         name_destroy(&lovname);
4915                                         GOTO(out_mti,
4916                                                 rc = (rc == LLOG_PROC_BREAK ?
4917                                                         -EEXIST : rc));
4918                                 }
4919                                 checked = true;
4920                         }
4921                         if (canceled_label != NULL)
4922                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4923                                                 lovname, canceled_label,
4924                                                 CM_SKIP);
4925
4926                         if (rc >= 0)
4927                                 rc = mgs_write_log_pool(env, mgs, logname,
4928                                                         fsdb, lovname, cmd,
4929                                                         fsname, poolname,
4930                                                         ostname, label);
4931                         name_destroy(&logname);
4932                         name_destroy(&lovname);
4933                         if (rc)
4934                                 GOTO(out_mti, rc);
4935                 }
4936         }
4937
4938         rc = name_create(&logname, fsname, "-client");
4939         if (rc)
4940                 GOTO(out_mti, rc);
4941
4942         if (!checked && (canceled_label == NULL)) {
4943                 rc = mgs_check_marker(env, mgs, fsdb, mti, logname,
4944                                 fsdb->fsdb_clilov, label);
4945                 if (rc) {
4946                         name_destroy(&logname);
4947                         GOTO(out_mti, rc = (rc == LLOG_PROC_BREAK ?
4948                                 -EEXIST : rc));
4949                 }
4950         }
4951         if (canceled_label != NULL) {
4952                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4953                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4954                 if (rc < 0) {
4955                         name_destroy(&logname);
4956                         GOTO(out_mti, rc);
4957                 }
4958         }
4959
4960         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4961                                 cmd, fsname, poolname, ostname, label);
4962         mutex_unlock(&fsdb->fsdb_mutex);
4963         locked = false;
4964         name_destroy(&logname);
4965         /* request for update */
4966         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4967
4968         GOTO(out_mti, rc);
4969
4970 out_mti:
4971         if (locked)
4972                 mutex_unlock(&fsdb->fsdb_mutex);
4973         if (mti != NULL)
4974                 OBD_FREE_PTR(mti);
4975 out_cancel:
4976         if (canceled_label != NULL)
4977                 OBD_FREE(canceled_label, label_sz);
4978 out_label:
4979         OBD_FREE(label, label_sz);
4980 out_fsdb:
4981         if (free)
4982                 mgs_unlink_fsdb(mgs, fsdb);
4983         mgs_put_fsdb(mgs, fsdb);
4984
4985         return rc;
4986 }