Whamcloud - gitweb
LU-6491 llog: fix wrong return value for too long fsname
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 /* Find all logs in CONFIG directory and link then into list */
59 int class_dentry_readdir(const struct lu_env *env,
60                          struct mgs_device *mgs, struct list_head *log_list)
61 {
62         struct dt_object    *dir = mgs->mgs_configs_dir;
63         const struct dt_it_ops *iops;
64         struct dt_it        *it;
65         struct mgs_direntry *de;
66         char                *key;
67         int                  rc, key_sz;
68
69         INIT_LIST_HEAD(log_list);
70
71         LASSERT(dir);
72         LASSERT(dir->do_index_ops);
73
74         iops = &dir->do_index_ops->dio_it;
75         it = iops->init(env, dir, LUDA_64BITHASH);
76         if (IS_ERR(it))
77                 RETURN(PTR_ERR(it));
78
79         rc = iops->load(env, it, 0);
80         if (rc <= 0)
81                 GOTO(fini, rc = 0);
82
83         /* main cycle */
84         do {
85                 key = (void *)iops->key(env, it);
86                 if (IS_ERR(key)) {
87                         CERROR("%s: key failed when listing %s: rc = %d\n",
88                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
89                                (int) PTR_ERR(key));
90                         goto next;
91                 }
92                 key_sz = iops->key_size(env, it);
93                 LASSERT(key_sz > 0);
94
95                 /* filter out "." and ".." entries */
96                 if (key[0] == '.') {
97                         if (key_sz == 1)
98                                 goto next;
99                         if (key_sz == 2 && key[1] == '.')
100                                 goto next;
101                 }
102
103                 de = mgs_direntry_alloc(key_sz + 1);
104                 if (de == NULL) {
105                         rc = -ENOMEM;
106                         break;
107                 }
108
109                 memcpy(de->mde_name, key, key_sz);
110                 de->mde_name[key_sz] = 0;
111
112                 list_add(&de->mde_list, log_list);
113
114 next:
115                 rc = iops->next(env, it);
116         } while (rc == 0);
117         rc = 0;
118
119         iops->put(env, it);
120
121 fini:
122         iops->fini(env, it);
123         if (rc)
124                 CERROR("%s: key failed when listing %s: rc = %d\n",
125                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
126         RETURN(rc);
127 }
128
129 /******************** DB functions *********************/
130
131 static inline int name_create(char **newname, char *prefix, char *suffix)
132 {
133         LASSERT(newname);
134         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
135         if (!*newname)
136                 return -ENOMEM;
137         sprintf(*newname, "%s%s", prefix, suffix);
138         return 0;
139 }
140
141 static inline void name_destroy(char **name)
142 {
143         if (*name)
144                 OBD_FREE(*name, strlen(*name) + 1);
145         *name = NULL;
146 }
147
148 struct mgs_fsdb_handler_data
149 {
150         struct fs_db   *fsdb;
151         __u32           ver;
152 };
153
154 /* from the (client) config log, figure out:
155         1. which ost's/mdt's are configured (by index)
156         2. what the last config step is
157         3. COMPAT_18 osc name
158 */
159 /* It might be better to have a separate db file, instead of parsing the info
160    out of the client log.  This is slow and potentially error-prone. */
161 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
162                             struct llog_rec_hdr *rec, void *data)
163 {
164         struct mgs_fsdb_handler_data *d = data;
165         struct fs_db *fsdb = d->fsdb;
166         int cfg_len = rec->lrh_len;
167         char *cfg_buf = (char*) (rec + 1);
168         struct lustre_cfg *lcfg;
169         __u32 index;
170         int rc = 0;
171         ENTRY;
172
173         if (rec->lrh_type != OBD_CFG_REC) {
174                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
175                 RETURN(-EINVAL);
176         }
177
178         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
179         if (rc) {
180                 CERROR("Insane cfg\n");
181                 RETURN(rc);
182         }
183
184         lcfg = (struct lustre_cfg *)cfg_buf;
185
186         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
187                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
188
189         /* Figure out ost indicies */
190         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
191         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
192             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
193                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
194                                        NULL, 10);
195                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
196                        lustre_cfg_string(lcfg, 1), index,
197                        lustre_cfg_string(lcfg, 2));
198                 set_bit(index, fsdb->fsdb_ost_index_map);
199         }
200
201         /* Figure out mdt indicies */
202         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
203         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
204             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
205                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
206                                        &index, NULL);
207                 if (rc != LDD_F_SV_TYPE_MDT) {
208                         CWARN("Unparsable MDC name %s, assuming index 0\n",
209                               lustre_cfg_string(lcfg, 0));
210                         index = 0;
211                 }
212                 rc = 0;
213                 CDEBUG(D_MGS, "MDT index is %u\n", index);
214                 set_bit(index, fsdb->fsdb_mdt_index_map);
215                 fsdb->fsdb_mdt_count ++;
216         }
217
218         /**
219          * figure out the old config. fsdb_gen = 0 means old log
220          * It is obsoleted and not supported anymore
221          */
222         if (fsdb->fsdb_gen == 0) {
223                 CERROR("Old config format is not supported\n");
224                 RETURN(-EINVAL);
225         }
226
227         /*
228          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
229          */
230         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
231             lcfg->lcfg_command == LCFG_ATTACH &&
232             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
233                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
234                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
235                         CWARN("MDT using 1.8 OSC name scheme\n");
236                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
237                 }
238         }
239
240         if (lcfg->lcfg_command == LCFG_MARKER) {
241                 struct cfg_marker *marker;
242                 marker = lustre_cfg_buf(lcfg, 1);
243
244                 d->ver = marker->cm_vers;
245
246                 /* Keep track of the latest marker step */
247                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
248         }
249
250         RETURN(rc);
251 }
252
253 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
254 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
255                                   struct mgs_device *mgs,
256                                   struct fs_db *fsdb)
257 {
258         char                            *logname;
259         struct llog_handle              *loghandle;
260         struct llog_ctxt                *ctxt;
261         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
262         int rc;
263
264         ENTRY;
265
266         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
267         LASSERT(ctxt != NULL);
268         rc = name_create(&logname, fsdb->fsdb_name, "-client");
269         if (rc)
270                 GOTO(out_put, rc);
271         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
272         if (rc)
273                 GOTO(out_pop, rc);
274
275         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
276         if (rc)
277                 GOTO(out_close, rc);
278
279         if (llog_get_size(loghandle) <= 1)
280                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
281
282         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
283         CDEBUG(D_INFO, "get_db = %d\n", rc);
284 out_close:
285         llog_close(env, loghandle);
286 out_pop:
287         name_destroy(&logname);
288 out_put:
289         llog_ctxt_put(ctxt);
290
291         RETURN(rc);
292 }
293
294 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
295 {
296         struct mgs_tgt_srpc_conf *tgtconf;
297
298         /* free target-specific rules */
299         while (fsdb->fsdb_srpc_tgt) {
300                 tgtconf = fsdb->fsdb_srpc_tgt;
301                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
302
303                 LASSERT(tgtconf->mtsc_tgt);
304
305                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
306                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
307                 OBD_FREE_PTR(tgtconf);
308         }
309
310         /* free general rules */
311         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
312 }
313
314 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
315 {
316         struct fs_db *fsdb;
317         struct list_head *tmp;
318
319         list_for_each(tmp, &mgs->mgs_fs_db_list) {
320                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
321                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
322                         return fsdb;
323         }
324         return NULL;
325 }
326
327 /* caller must hold the mgs->mgs_fs_db_lock */
328 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
329                                   struct mgs_device *mgs, char *fsname)
330 {
331         struct fs_db *fsdb;
332         int rc;
333         ENTRY;
334
335         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
336                 CERROR("fsname %s is too long\n", fsname);
337                 RETURN(ERR_PTR(-EINVAL));
338         }
339
340         OBD_ALLOC_PTR(fsdb);
341         if (!fsdb)
342                 RETURN(ERR_PTR(-ENOMEM));
343
344         strcpy(fsdb->fsdb_name, fsname);
345         mutex_init(&fsdb->fsdb_mutex);
346         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
347         fsdb->fsdb_gen = 1;
348
349         if (strcmp(fsname, MGSSELF_NAME) == 0) {
350                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
351         } else {
352                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
353                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
354                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
355                         CERROR("No memory for index maps\n");
356                         GOTO(err, rc = -ENOMEM);
357                 }
358
359                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
360                 if (rc)
361                         GOTO(err, rc);
362                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
363                 if (rc)
364                         GOTO(err, rc);
365
366                 /* initialise data for NID table */
367                 mgs_ir_init_fs(env, mgs, fsdb);
368
369                 lproc_mgs_add_live(mgs, fsdb);
370         }
371
372         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
373
374         RETURN(fsdb);
375 err:
376         if (fsdb->fsdb_ost_index_map)
377                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
378         if (fsdb->fsdb_mdt_index_map)
379                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
380         name_destroy(&fsdb->fsdb_clilov);
381         name_destroy(&fsdb->fsdb_clilmv);
382         OBD_FREE_PTR(fsdb);
383         RETURN(ERR_PTR(rc));
384 }
385
386 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
387 {
388         /* wait for anyone with the sem */
389         mutex_lock(&fsdb->fsdb_mutex);
390         lproc_mgs_del_live(mgs, fsdb);
391         list_del(&fsdb->fsdb_list);
392
393         /* deinitialize fsr */
394         mgs_ir_fini_fs(mgs, fsdb);
395
396         if (fsdb->fsdb_ost_index_map)
397                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
398         if (fsdb->fsdb_mdt_index_map)
399                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
400         name_destroy(&fsdb->fsdb_clilov);
401         name_destroy(&fsdb->fsdb_clilmv);
402         mgs_free_fsdb_srpc(fsdb);
403         mutex_unlock(&fsdb->fsdb_mutex);
404         OBD_FREE_PTR(fsdb);
405 }
406
407 int mgs_init_fsdb_list(struct mgs_device *mgs)
408 {
409         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
410         return 0;
411 }
412
413 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
414 {
415         struct fs_db *fsdb;
416         struct list_head *tmp, *tmp2;
417
418         mutex_lock(&mgs->mgs_mutex);
419         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
420                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
421                 mgs_free_fsdb(mgs, fsdb);
422         }
423         mutex_unlock(&mgs->mgs_mutex);
424         return 0;
425 }
426
427 int mgs_find_or_make_fsdb(const struct lu_env *env,
428                           struct mgs_device *mgs, char *name,
429                           struct fs_db **dbh)
430 {
431         struct fs_db *fsdb;
432         int rc = 0;
433
434         ENTRY;
435         mutex_lock(&mgs->mgs_mutex);
436         fsdb = mgs_find_fsdb(mgs, name);
437         if (fsdb) {
438                 mutex_unlock(&mgs->mgs_mutex);
439                 *dbh = fsdb;
440                 RETURN(0);
441         }
442
443         CDEBUG(D_MGS, "Creating new db\n");
444         fsdb = mgs_new_fsdb(env, mgs, name);
445         /* lock fsdb_mutex until the db is loaded from llogs */
446         if (!IS_ERR(fsdb))
447                 mutex_lock(&fsdb->fsdb_mutex);
448         mutex_unlock(&mgs->mgs_mutex);
449         if (IS_ERR(fsdb))
450                 RETURN(PTR_ERR(fsdb));
451
452         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
453                 /* populate the db from the client llog */
454                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
455                 if (rc) {
456                         CERROR("Can't get db from client log %d\n", rc);
457                         GOTO(out_free, rc);
458                 }
459         }
460
461         /* populate srpc rules from params llog */
462         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
463         if (rc) {
464                 CERROR("Can't get db from params log %d\n", rc);
465                 GOTO(out_free, rc);
466         }
467
468         mutex_unlock(&fsdb->fsdb_mutex);
469         *dbh = fsdb;
470
471         RETURN(0);
472
473 out_free:
474         mutex_unlock(&fsdb->fsdb_mutex);
475         mgs_free_fsdb(mgs, fsdb);
476         return rc;
477 }
478
479 /* 1 = index in use
480    0 = index unused
481    -1= empty client log */
482 int mgs_check_index(const struct lu_env *env,
483                     struct mgs_device *mgs,
484                     struct mgs_target_info *mti)
485 {
486         struct fs_db *fsdb;
487         void *imap;
488         int rc = 0;
489         ENTRY;
490
491         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
492
493         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
494         if (rc) {
495                 CERROR("Can't get db for %s\n", mti->mti_fsname);
496                 RETURN(rc);
497         }
498
499         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
500                 RETURN(-1);
501
502         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
503                 imap = fsdb->fsdb_ost_index_map;
504         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
505                 imap = fsdb->fsdb_mdt_index_map;
506         else
507                 RETURN(-EINVAL);
508
509         if (test_bit(mti->mti_stripe_index, imap))
510                 RETURN(1);
511         RETURN(0);
512 }
513
514 static __inline__ int next_index(void *index_map, int map_len)
515 {
516         int i;
517         for (i = 0; i < map_len * 8; i++)
518                 if (!test_bit(i, index_map)) {
519                          return i;
520                  }
521         CERROR("max index %d exceeded.\n", i);
522         return -1;
523 }
524
525 /* Return codes:
526         0  newly marked as in use
527         <0 err
528         +EALREADY for update of an old index */
529 static int mgs_set_index(const struct lu_env *env,
530                          struct mgs_device *mgs,
531                          struct mgs_target_info *mti)
532 {
533         struct fs_db *fsdb;
534         void *imap;
535         int rc = 0;
536         ENTRY;
537
538         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
539         if (rc) {
540                 CERROR("Can't get db for %s\n", mti->mti_fsname);
541                 RETURN(rc);
542         }
543
544         mutex_lock(&fsdb->fsdb_mutex);
545         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
546                 imap = fsdb->fsdb_ost_index_map;
547         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
548                 imap = fsdb->fsdb_mdt_index_map;
549         } else {
550                 GOTO(out_up, rc = -EINVAL);
551         }
552
553         if (mti->mti_flags & LDD_F_NEED_INDEX) {
554                 rc = next_index(imap, INDEX_MAP_SIZE);
555                 if (rc == -1)
556                         GOTO(out_up, rc = -ERANGE);
557                 mti->mti_stripe_index = rc;
558                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
559                         fsdb->fsdb_mdt_count ++;
560         }
561
562         /* the last index(0xffff) is reserved for default value. */
563         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
564                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
565                                    "but index must be less than %u.\n",
566                                    mti->mti_svname, mti->mti_stripe_index,
567                                    INDEX_MAP_SIZE * 8 - 1);
568                 GOTO(out_up, rc = -ERANGE);
569         }
570
571         if (test_bit(mti->mti_stripe_index, imap)) {
572                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
573                     !(mti->mti_flags & LDD_F_WRITECONF)) {
574                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
575                                            "%d, but that index is already in "
576                                            "use. Use --writeconf to force\n",
577                                            mti->mti_svname,
578                                            mti->mti_stripe_index);
579                         GOTO(out_up, rc = -EADDRINUSE);
580                 } else {
581                         CDEBUG(D_MGS, "Server %s updating index %d\n",
582                                mti->mti_svname, mti->mti_stripe_index);
583                         GOTO(out_up, rc = EALREADY);
584                 }
585         }
586
587         set_bit(mti->mti_stripe_index, imap);
588         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
589         mutex_unlock(&fsdb->fsdb_mutex);
590         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
591                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
592
593         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
594                mti->mti_stripe_index);
595
596         RETURN(0);
597 out_up:
598         mutex_unlock(&fsdb->fsdb_mutex);
599         return rc;
600 }
601
602 struct mgs_modify_lookup {
603         struct cfg_marker mml_marker;
604         int               mml_modified;
605 };
606
607 static int mgs_modify_handler(const struct lu_env *env,
608                               struct llog_handle *llh,
609                               struct llog_rec_hdr *rec, void *data)
610 {
611         struct mgs_modify_lookup *mml = data;
612         struct cfg_marker *marker;
613         struct lustre_cfg *lcfg = REC_DATA(rec);
614         int cfg_len = REC_DATA_LEN(rec);
615         int rc;
616         ENTRY;
617
618         if (rec->lrh_type != OBD_CFG_REC) {
619                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
620                 RETURN(-EINVAL);
621         }
622
623         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
624         if (rc) {
625                 CERROR("Insane cfg\n");
626                 RETURN(rc);
627         }
628
629         /* We only care about markers */
630         if (lcfg->lcfg_command != LCFG_MARKER)
631                 RETURN(0);
632
633         marker = lustre_cfg_buf(lcfg, 1);
634         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
635             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
636             !(marker->cm_flags & CM_SKIP)) {
637                 /* Found a non-skipped marker match */
638                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
639                        rec->lrh_index, marker->cm_step,
640                        marker->cm_flags, mml->mml_marker.cm_flags,
641                        marker->cm_tgtname, marker->cm_comment);
642                 /* Overwrite the old marker llog entry */
643                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
644                 marker->cm_flags |= mml->mml_marker.cm_flags;
645                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
646                 rc = llog_write(env, llh, rec, rec->lrh_index);
647                 if (!rc)
648                          mml->mml_modified++;
649         }
650
651         RETURN(rc);
652 }
653
654 /**
655  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
656  * Return code:
657  * 0 - modified successfully,
658  * 1 - no modification was done
659  * negative - error
660  */
661 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
662                       struct fs_db *fsdb, struct mgs_target_info *mti,
663                       char *logname, char *devname, char *comment, int flags)
664 {
665         struct llog_handle *loghandle;
666         struct llog_ctxt *ctxt;
667         struct mgs_modify_lookup *mml;
668         int rc;
669
670         ENTRY;
671
672         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
673         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
674                flags);
675
676         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
677         LASSERT(ctxt != NULL);
678         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
679         if (rc < 0) {
680                 if (rc == -ENOENT)
681                         rc = 0;
682                 GOTO(out_pop, rc);
683         }
684
685         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
686         if (rc)
687                 GOTO(out_close, rc);
688
689         if (llog_get_size(loghandle) <= 1)
690                 GOTO(out_close, rc = 0);
691
692         OBD_ALLOC_PTR(mml);
693         if (!mml)
694                 GOTO(out_close, rc = -ENOMEM);
695         if (strlcpy(mml->mml_marker.cm_comment, comment,
696                     sizeof(mml->mml_marker.cm_comment)) >=
697             sizeof(mml->mml_marker.cm_comment))
698                 GOTO(out_free, rc = -E2BIG);
699         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
700                     sizeof(mml->mml_marker.cm_tgtname)) >=
701             sizeof(mml->mml_marker.cm_tgtname))
702                 GOTO(out_free, rc = -E2BIG);
703         /* Modify mostly means cancel */
704         mml->mml_marker.cm_flags = flags;
705         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
706         mml->mml_modified = 0;
707         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
708                           NULL);
709         if (!rc && !mml->mml_modified)
710                 rc = 1;
711
712 out_free:
713         OBD_FREE_PTR(mml);
714
715 out_close:
716         llog_close(env, loghandle);
717 out_pop:
718         if (rc < 0)
719                 CERROR("%s: modify %s/%s failed: rc = %d\n",
720                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
721         llog_ctxt_put(ctxt);
722         RETURN(rc);
723 }
724
725 /** This structure is passed to mgs_replace_handler */
726 struct mgs_replace_uuid_lookup {
727         /* Nids are replaced for this target device */
728         struct mgs_target_info target;
729         /* Temporary modified llog */
730         struct llog_handle *temp_llh;
731         /* Flag is set if in target block*/
732         int in_target_device;
733         /* Nids already added. Just skip (multiple nids) */
734         int device_nids_added;
735         /* Flag is set if this block should not be copied */
736         int skip_it;
737 };
738
739 /**
740  * Check: a) if block should be skipped
741  * b) is it target block
742  *
743  * \param[in] lcfg
744  * \param[in] mrul
745  *
746  * \retval 0 should not to be skipped
747  * \retval 1 should to be skipped
748  */
749 static int check_markers(struct lustre_cfg *lcfg,
750                          struct mgs_replace_uuid_lookup *mrul)
751 {
752          struct cfg_marker *marker;
753
754         /* Track markers. Find given device */
755         if (lcfg->lcfg_command == LCFG_MARKER) {
756                 marker = lustre_cfg_buf(lcfg, 1);
757                 /* Clean llog from records marked as CM_EXCLUDE.
758                    CM_SKIP records are used for "active" command
759                    and can be restored if needed */
760                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
761                     (CM_EXCLUDE | CM_START)) {
762                         mrul->skip_it = 1;
763                         return 1;
764                 }
765
766                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
767                     (CM_EXCLUDE | CM_END)) {
768                         mrul->skip_it = 0;
769                         return 1;
770                 }
771
772                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
773                         LASSERT(!(marker->cm_flags & CM_START) ||
774                                 !(marker->cm_flags & CM_END));
775                         if (marker->cm_flags & CM_START) {
776                                 mrul->in_target_device = 1;
777                                 mrul->device_nids_added = 0;
778                         } else if (marker->cm_flags & CM_END)
779                                 mrul->in_target_device = 0;
780                 }
781         }
782
783         return 0;
784 }
785
786 static int record_base(const struct lu_env *env, struct llog_handle *llh,
787                      char *cfgname, lnet_nid_t nid, int cmd,
788                      char *s1, char *s2, char *s3, char *s4)
789 {
790         struct mgs_thread_info  *mgi = mgs_env_info(env);
791         struct llog_cfg_rec     *lcr;
792         int rc;
793
794         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
795                cmd, s1, s2, s3, s4);
796
797         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
798         if (s1)
799                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
800         if (s2)
801                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
802         if (s3)
803                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
804         if (s4)
805                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
806
807         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
808         if (lcr == NULL)
809                 return -ENOMEM;
810
811         lcr->lcr_cfg.lcfg_nid = nid;
812         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
813
814         lustre_cfg_rec_free(lcr);
815
816         if (rc < 0)
817                 CDEBUG(D_MGS,
818                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
819                        cfgname, cmd, s1, s2, s3, s4, rc);
820         return rc;
821 }
822
823 static inline int record_add_uuid(const struct lu_env *env,
824                                   struct llog_handle *llh,
825                                   uint64_t nid, char *uuid)
826 {
827         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
828                            NULL, NULL, NULL);
829 }
830
831 static inline int record_add_conn(const struct lu_env *env,
832                                   struct llog_handle *llh,
833                                   char *devname, char *uuid)
834 {
835         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
836                            NULL, NULL, NULL);
837 }
838
839 static inline int record_attach(const struct lu_env *env,
840                                 struct llog_handle *llh, char *devname,
841                                 char *type, char *uuid)
842 {
843         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
844                            NULL, NULL);
845 }
846
847 static inline int record_setup(const struct lu_env *env,
848                                struct llog_handle *llh, char *devname,
849                                char *s1, char *s2, char *s3, char *s4)
850 {
851         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
852 }
853
854 /**
855  * \retval <0 record processing error
856  * \retval n record is processed. No need copy original one.
857  * \retval 0 record is not processed.
858  */
859 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
860                            struct mgs_replace_uuid_lookup *mrul)
861 {
862         int nids_added = 0;
863         lnet_nid_t nid;
864         char *ptr;
865         int rc;
866
867         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
868                 /* LCFG_ADD_UUID command found. Let's skip original command
869                    and add passed nids */
870                 ptr = mrul->target.mti_params;
871                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
872                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
873                                "device %s\n", libcfs_nid2str(nid),
874                                 mrul->target.mti_params,
875                                 mrul->target.mti_svname);
876                         rc = record_add_uuid(env,
877                                              mrul->temp_llh, nid,
878                                              mrul->target.mti_params);
879                         if (!rc)
880                                 nids_added++;
881                 }
882
883                 if (nids_added == 0) {
884                         CERROR("No new nids were added, nid %s with uuid %s, "
885                                "device %s\n", libcfs_nid2str(nid),
886                                mrul->target.mti_params,
887                                mrul->target.mti_svname);
888                         RETURN(-ENXIO);
889                 } else {
890                         mrul->device_nids_added = 1;
891                 }
892
893                 return nids_added;
894         }
895
896         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
897                 /* LCFG_SETUP command found. UUID should be changed */
898                 rc = record_setup(env,
899                                   mrul->temp_llh,
900                                   /* devname the same */
901                                   lustre_cfg_string(lcfg, 0),
902                                   /* s1 is not changed */
903                                   lustre_cfg_string(lcfg, 1),
904                                   /* new uuid should be
905                                   the full nidlist */
906                                   mrul->target.mti_params,
907                                   /* s3 is not changed */
908                                   lustre_cfg_string(lcfg, 3),
909                                   /* s4 is not changed */
910                                   lustre_cfg_string(lcfg, 4));
911                 return rc ? rc : 1;
912         }
913
914         /* Another commands in target device block */
915         return 0;
916 }
917
918 /**
919  * Handler that called for every record in llog.
920  * Records are processed in order they placed in llog.
921  *
922  * \param[in] llh       log to be processed
923  * \param[in] rec       current record
924  * \param[in] data      mgs_replace_uuid_lookup structure
925  *
926  * \retval 0    success
927  */
928 static int mgs_replace_handler(const struct lu_env *env,
929                                struct llog_handle *llh,
930                                struct llog_rec_hdr *rec,
931                                void *data)
932 {
933         struct mgs_replace_uuid_lookup *mrul;
934         struct lustre_cfg *lcfg = REC_DATA(rec);
935         int cfg_len = REC_DATA_LEN(rec);
936         int rc;
937         ENTRY;
938
939         mrul = (struct mgs_replace_uuid_lookup *)data;
940
941         if (rec->lrh_type != OBD_CFG_REC) {
942                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
943                        rec->lrh_type, lcfg->lcfg_command,
944                        lustre_cfg_string(lcfg, 0),
945                        lustre_cfg_string(lcfg, 1));
946                 RETURN(-EINVAL);
947         }
948
949         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
950         if (rc) {
951                 /* Do not copy any invalidated records */
952                 GOTO(skip_out, rc = 0);
953         }
954
955         rc = check_markers(lcfg, mrul);
956         if (rc || mrul->skip_it)
957                 GOTO(skip_out, rc = 0);
958
959         /* Write to new log all commands outside target device block */
960         if (!mrul->in_target_device)
961                 GOTO(copy_out, rc = 0);
962
963         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
964            (failover nids) for this target, assuming that if then
965            primary is changing then so is the failover */
966         if (mrul->device_nids_added &&
967             (lcfg->lcfg_command == LCFG_ADD_UUID ||
968              lcfg->lcfg_command == LCFG_ADD_CONN))
969                 GOTO(skip_out, rc = 0);
970
971         rc = process_command(env, lcfg, mrul);
972         if (rc < 0)
973                 RETURN(rc);
974
975         if (rc)
976                 RETURN(0);
977 copy_out:
978         /* Record is placed in temporary llog as is */
979         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
980
981         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
982                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
983                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
984         RETURN(rc);
985
986 skip_out:
987         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
988                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
989                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
990         RETURN(rc);
991 }
992
993 static int mgs_log_is_empty(const struct lu_env *env,
994                             struct mgs_device *mgs, char *name)
995 {
996         struct llog_ctxt        *ctxt;
997         int                      rc;
998
999         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1000         LASSERT(ctxt != NULL);
1001
1002         rc = llog_is_empty(env, ctxt, name);
1003         llog_ctxt_put(ctxt);
1004         return rc;
1005 }
1006
1007 static int mgs_replace_nids_log(const struct lu_env *env,
1008                                 struct obd_device *mgs, struct fs_db *fsdb,
1009                                 char *logname, char *devname, char *nids)
1010 {
1011         struct llog_handle *orig_llh, *backup_llh;
1012         struct llog_ctxt *ctxt;
1013         struct mgs_replace_uuid_lookup *mrul;
1014         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1015         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1016         char *backup;
1017         int rc, rc2;
1018         ENTRY;
1019
1020         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1021
1022         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1023         LASSERT(ctxt != NULL);
1024
1025         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1026                 /* Log is empty. Nothing to replace */
1027                 GOTO(out_put, rc = 0);
1028         }
1029
1030         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1031         if (backup == NULL)
1032                 GOTO(out_put, rc = -ENOMEM);
1033
1034         sprintf(backup, "%s.bak", logname);
1035
1036         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1037         if (rc == 0) {
1038                 /* Now erase original log file. Connections are not allowed.
1039                    Backup is already saved */
1040                 rc = llog_erase(env, ctxt, NULL, logname);
1041                 if (rc < 0)
1042                         GOTO(out_free, rc);
1043         } else if (rc != -ENOENT) {
1044                 CERROR("%s: can't make backup for %s: rc = %d\n",
1045                        mgs->obd_name, logname, rc);
1046                 GOTO(out_free,rc);
1047         }
1048
1049         /* open local log */
1050         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1051         if (rc)
1052                 GOTO(out_restore, rc);
1053
1054         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1055         if (rc)
1056                 GOTO(out_closel, rc);
1057
1058         /* open backup llog */
1059         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1060                        LLOG_OPEN_EXISTS);
1061         if (rc)
1062                 GOTO(out_closel, rc);
1063
1064         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1065         if (rc)
1066                 GOTO(out_close, rc);
1067
1068         if (llog_get_size(backup_llh) <= 1)
1069                 GOTO(out_close, rc = 0);
1070
1071         OBD_ALLOC_PTR(mrul);
1072         if (!mrul)
1073                 GOTO(out_close, rc = -ENOMEM);
1074         /* devname is only needed information to replace UUID records */
1075         strlcpy(mrul->target.mti_svname, devname,
1076                 sizeof(mrul->target.mti_svname));
1077         /* parse nids later */
1078         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1079         /* Copy records to this temporary llog */
1080         mrul->temp_llh = orig_llh;
1081
1082         rc = llog_process(env, backup_llh, mgs_replace_handler,
1083                           (void *)mrul, NULL);
1084         OBD_FREE_PTR(mrul);
1085 out_close:
1086         rc2 = llog_close(NULL, backup_llh);
1087         if (!rc)
1088                 rc = rc2;
1089 out_closel:
1090         rc2 = llog_close(NULL, orig_llh);
1091         if (!rc)
1092                 rc = rc2;
1093
1094 out_restore:
1095         if (rc) {
1096                 CERROR("%s: llog should be restored: rc = %d\n",
1097                        mgs->obd_name, rc);
1098                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1099                                   logname);
1100                 if (rc2 < 0)
1101                         CERROR("%s: can't restore backup %s: rc = %d\n",
1102                                mgs->obd_name, logname, rc2);
1103         }
1104
1105 out_free:
1106         OBD_FREE(backup, strlen(backup) + 1);
1107
1108 out_put:
1109         llog_ctxt_put(ctxt);
1110
1111         if (rc)
1112                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1113                        mgs->obd_name, logname, rc);
1114
1115         RETURN(rc);
1116 }
1117
1118 /**
1119  * Parse device name and get file system name and/or device index
1120  *
1121  * \param[in]   devname device name (ex. lustre-MDT0000)
1122  * \param[out]  fsname  file system name(optional)
1123  * \param[out]  index   device index(optional)
1124  *
1125  * \retval 0    success
1126  */
1127 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1128 {
1129         int rc;
1130         ENTRY;
1131
1132         /* Extract fsname */
1133         if (fsname) {
1134                 rc = server_name2fsname(devname, fsname, NULL);
1135                 if (rc < 0) {
1136                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1137                                devname);
1138                         RETURN(-EINVAL);
1139                 }
1140         }
1141
1142         if (index) {
1143                 rc = server_name2index(devname, index, NULL);
1144                 if (rc < 0) {
1145                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1146                                devname);
1147                         RETURN(-EINVAL);
1148                 }
1149         }
1150
1151         RETURN(0);
1152 }
1153
1154 /* This is only called during replace_nids */
1155 static int only_mgs_is_running(struct obd_device *mgs_obd)
1156 {
1157         /* TDB: Is global variable with devices count exists? */
1158         int num_devices = get_devices_count();
1159         int num_exports = 0;
1160         struct obd_export *exp;
1161
1162         spin_lock(&mgs_obd->obd_dev_lock);
1163         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1164                 /* skip self export */
1165                 if (exp == mgs_obd->obd_self_export)
1166                         continue;
1167                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1168                         continue;
1169
1170                 ++num_exports;
1171
1172                 CERROR("%s: node %s still connected during replace_nids "
1173                        "connect_flags:%llx\n",
1174                        mgs_obd->obd_name,
1175                        libcfs_nid2str(exp->exp_nid_stats->nid),
1176                        exp_connect_flags(exp));
1177
1178         }
1179         spin_unlock(&mgs_obd->obd_dev_lock);
1180
1181         /* osd, MGS and MGC + self_export
1182            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1183         return (num_devices <= 3) && (num_exports == 0);
1184 }
1185
1186 static int name_create_mdt(char **logname, char *fsname, int i)
1187 {
1188         char mdt_index[9];
1189
1190         sprintf(mdt_index, "-MDT%04x", i);
1191         return name_create(logname, fsname, mdt_index);
1192 }
1193
1194 /**
1195  * Replace nids for \a device to \a nids values
1196  *
1197  * \param obd           MGS obd device
1198  * \param devname       nids need to be replaced for this device
1199  * (ex. lustre-OST0000)
1200  * \param nids          nids list (ex. nid1,nid2,nid3)
1201  *
1202  * \retval 0    success
1203  */
1204 int mgs_replace_nids(const struct lu_env *env,
1205                      struct mgs_device *mgs,
1206                      char *devname, char *nids)
1207 {
1208         /* Assume fsname is part of device name */
1209         char fsname[MTI_NAME_MAXLEN];
1210         int rc;
1211         __u32 index;
1212         char *logname;
1213         struct fs_db *fsdb;
1214         unsigned int i;
1215         int conn_state;
1216         struct obd_device *mgs_obd = mgs->mgs_obd;
1217         ENTRY;
1218
1219         /* We can only change NIDs if no other nodes are connected */
1220         spin_lock(&mgs_obd->obd_dev_lock);
1221         conn_state = mgs_obd->obd_no_conn;
1222         mgs_obd->obd_no_conn = 1;
1223         spin_unlock(&mgs_obd->obd_dev_lock);
1224
1225         /* We can not change nids if not only MGS is started */
1226         if (!only_mgs_is_running(mgs_obd)) {
1227                 CERROR("Only MGS is allowed to be started\n");
1228                 GOTO(out, rc = -EINPROGRESS);
1229         }
1230
1231         /* Get fsname and index*/
1232         rc = mgs_parse_devname(devname, fsname, &index);
1233         if (rc)
1234                 GOTO(out, rc);
1235
1236         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1237         if (rc) {
1238                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1239                 GOTO(out, rc);
1240         }
1241
1242         /* Process client llogs */
1243         name_create(&logname, fsname, "-client");
1244         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1245         name_destroy(&logname);
1246         if (rc) {
1247                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1248                        fsname, devname, rc);
1249                 GOTO(out, rc);
1250         }
1251
1252         /* Process MDT llogs */
1253         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1254                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1255                         continue;
1256                 name_create_mdt(&logname, fsname, i);
1257                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1258                 name_destroy(&logname);
1259                 if (rc)
1260                         GOTO(out, rc);
1261         }
1262
1263 out:
1264         spin_lock(&mgs_obd->obd_dev_lock);
1265         mgs_obd->obd_no_conn = conn_state;
1266         spin_unlock(&mgs_obd->obd_dev_lock);
1267
1268         RETURN(rc);
1269 }
1270
1271 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1272                             char *devname, struct lov_desc *desc)
1273 {
1274         struct mgs_thread_info  *mgi = mgs_env_info(env);
1275         struct llog_cfg_rec     *lcr;
1276         int rc;
1277
1278         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1279         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1280         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1281         if (lcr == NULL)
1282                 return -ENOMEM;
1283
1284         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1285         lustre_cfg_rec_free(lcr);
1286         return rc;
1287 }
1288
1289 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1290                             char *devname, struct lmv_desc *desc)
1291 {
1292         struct mgs_thread_info  *mgi = mgs_env_info(env);
1293         struct llog_cfg_rec     *lcr;
1294         int rc;
1295
1296         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1297         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1298         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1299         if (lcr == NULL)
1300                 return -ENOMEM;
1301
1302         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1303         lustre_cfg_rec_free(lcr);
1304         return rc;
1305 }
1306
1307 static inline int record_mdc_add(const struct lu_env *env,
1308                                  struct llog_handle *llh,
1309                                  char *logname, char *mdcuuid,
1310                                  char *mdtuuid, char *index,
1311                                  char *gen)
1312 {
1313         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1314                            mdtuuid,index,gen,mdcuuid);
1315 }
1316
1317 static inline int record_lov_add(const struct lu_env *env,
1318                                  struct llog_handle *llh,
1319                                  char *lov_name, char *ost_uuid,
1320                                  char *index, char *gen)
1321 {
1322         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1323                            ost_uuid, index, gen, NULL);
1324 }
1325
1326 static inline int record_mount_opt(const struct lu_env *env,
1327                                    struct llog_handle *llh,
1328                                    char *profile, char *lov_name,
1329                                    char *mdc_name)
1330 {
1331         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1332                            profile, lov_name, mdc_name, NULL);
1333 }
1334
1335 static int record_marker(const struct lu_env *env,
1336                          struct llog_handle *llh,
1337                          struct fs_db *fsdb, __u32 flags,
1338                          char *tgtname, char *comment)
1339 {
1340         struct mgs_thread_info *mgi = mgs_env_info(env);
1341         struct llog_cfg_rec *lcr;
1342         int rc;
1343         int cplen = 0;
1344
1345         if (flags & CM_START)
1346                 fsdb->fsdb_gen++;
1347         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1348         mgi->mgi_marker.cm_flags = flags;
1349         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1350         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1351                         sizeof(mgi->mgi_marker.cm_tgtname));
1352         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1353                 return -E2BIG;
1354         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1355                         sizeof(mgi->mgi_marker.cm_comment));
1356         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1357                 return -E2BIG;
1358         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1359         mgi->mgi_marker.cm_canceltime = 0;
1360         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1361         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1362                             sizeof(mgi->mgi_marker));
1363         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1364         if (lcr == NULL)
1365                 return -ENOMEM;
1366
1367         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1368         lustre_cfg_rec_free(lcr);
1369         return rc;
1370 }
1371
1372 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1373                             struct llog_handle **llh, char *name)
1374 {
1375         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1376         struct llog_ctxt        *ctxt;
1377         int                      rc = 0;
1378         ENTRY;
1379
1380         if (*llh)
1381                 GOTO(out, rc = -EBUSY);
1382
1383         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1384         if (!ctxt)
1385                 GOTO(out, rc = -ENODEV);
1386         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1387
1388         rc = llog_open_create(env, ctxt, llh, NULL, name);
1389         if (rc)
1390                 GOTO(out_ctxt, rc);
1391         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1392         if (rc)
1393                 llog_close(env, *llh);
1394 out_ctxt:
1395         llog_ctxt_put(ctxt);
1396 out:
1397         if (rc) {
1398                 CERROR("%s: can't start log %s: rc = %d\n",
1399                        mgs->mgs_obd->obd_name, name, rc);
1400                 *llh = NULL;
1401         }
1402         RETURN(rc);
1403 }
1404
1405 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1406 {
1407         int rc;
1408
1409         rc = llog_close(env, *llh);
1410         *llh = NULL;
1411
1412         return rc;
1413 }
1414
1415 /******************** config "macros" *********************/
1416
1417 /* write an lcfg directly into a log (with markers) */
1418 static int mgs_write_log_direct(const struct lu_env *env,
1419                                 struct mgs_device *mgs, struct fs_db *fsdb,
1420                                 char *logname, struct llog_cfg_rec *lcr,
1421                                 char *devname, char *comment)
1422 {
1423         struct llog_handle *llh = NULL;
1424         int rc;
1425
1426         ENTRY;
1427
1428         rc = record_start_log(env, mgs, &llh, logname);
1429         if (rc)
1430                 RETURN(rc);
1431
1432         /* FIXME These should be a single journal transaction */
1433         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1434         if (rc)
1435                 GOTO(out_end, rc);
1436         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1437         if (rc)
1438                 GOTO(out_end, rc);
1439         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1440         if (rc)
1441                 GOTO(out_end, rc);
1442 out_end:
1443         record_end_log(env, &llh);
1444         RETURN(rc);
1445 }
1446
1447 /* write the lcfg in all logs for the given fs */
1448 static int mgs_write_log_direct_all(const struct lu_env *env,
1449                                     struct mgs_device *mgs,
1450                                     struct fs_db *fsdb,
1451                                     struct mgs_target_info *mti,
1452                                     struct llog_cfg_rec *lcr, char *devname,
1453                                     char *comment, int server_only)
1454 {
1455         struct list_head         log_list;
1456         struct mgs_direntry     *dirent, *n;
1457         char                    *fsname = mti->mti_fsname;
1458         int                      rc = 0, len = strlen(fsname);
1459
1460         ENTRY;
1461         /* Find all the logs in the CONFIGS directory */
1462         rc = class_dentry_readdir(env, mgs, &log_list);
1463         if (rc)
1464                 RETURN(rc);
1465
1466         /* Could use fsdb index maps instead of directory listing */
1467         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1468                 list_del_init(&dirent->mde_list);
1469                 /* don't write to sptlrpc rule log */
1470                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1471                         goto next;
1472
1473                 /* caller wants write server logs only */
1474                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1475                         goto next;
1476
1477                 if (strlen(dirent->mde_name) <= len ||
1478                     strncmp(fsname, dirent->mde_name, len) != 0 ||
1479                     dirent->mde_name[len] != '-')
1480                         goto next;
1481
1482                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1483                 /* Erase any old settings of this same parameter */
1484                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1485                                 devname, comment, CM_SKIP);
1486                 if (rc < 0)
1487                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1488                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1489                 if (lcr == NULL)
1490                         goto next;
1491                 /* Write the new one */
1492                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1493                                           lcr, devname, comment);
1494                 if (rc != 0)
1495                         CERROR("%s: writing log %s: rc = %d\n",
1496                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1497 next:
1498                 mgs_direntry_free(dirent);
1499         }
1500
1501         RETURN(rc);
1502 }
1503
1504 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1505                                     struct mgs_device *mgs,
1506                                     struct fs_db *fsdb,
1507                                     struct mgs_target_info *mti,
1508                                     int index, char *logname);
1509 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1510                                     struct mgs_device *mgs,
1511                                     struct fs_db *fsdb,
1512                                     struct mgs_target_info *mti,
1513                                     char *logname, char *suffix, char *lovname,
1514                                     enum lustre_sec_part sec_part, int flags);
1515 static int name_create_mdt_and_lov(char **logname, char **lovname,
1516                                    struct fs_db *fsdb, int i);
1517
1518 static int add_param(char *params, char *key, char *val)
1519 {
1520         char *start = params + strlen(params);
1521         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1522         int keylen = 0;
1523
1524         if (key != NULL)
1525                 keylen = strlen(key);
1526         if (start + 1 + keylen + strlen(val) >= end) {
1527                 CERROR("params are too long: %s %s%s\n",
1528                        params, key != NULL ? key : "", val);
1529                 return -EINVAL;
1530         }
1531
1532         sprintf(start, " %s%s", key != NULL ? key : "", val);
1533         return 0;
1534 }
1535
1536 /**
1537  * Walk through client config log record and convert the related records
1538  * into the target.
1539  **/
1540 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1541                                          struct llog_handle *llh,
1542                                          struct llog_rec_hdr *rec, void *data)
1543 {
1544         struct mgs_device *mgs;
1545         struct obd_device *obd;
1546         struct mgs_target_info *mti, *tmti;
1547         struct fs_db *fsdb;
1548         int cfg_len = rec->lrh_len;
1549         char *cfg_buf = (char*) (rec + 1);
1550         struct lustre_cfg *lcfg;
1551         int rc = 0;
1552         struct llog_handle *mdt_llh = NULL;
1553         static int got_an_osc_or_mdc = 0;
1554         /* 0: not found any osc/mdc;
1555            1: found osc;
1556            2: found mdc;
1557         */
1558         static int last_step = -1;
1559         int cplen = 0;
1560
1561         ENTRY;
1562
1563         mti = ((struct temp_comp*)data)->comp_mti;
1564         tmti = ((struct temp_comp*)data)->comp_tmti;
1565         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1566         obd = ((struct temp_comp *)data)->comp_obd;
1567         mgs = lu2mgs_dev(obd->obd_lu_dev);
1568         LASSERT(mgs);
1569
1570         if (rec->lrh_type != OBD_CFG_REC) {
1571                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1572                 RETURN(-EINVAL);
1573         }
1574
1575         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1576         if (rc) {
1577                 CERROR("Insane cfg\n");
1578                 RETURN(rc);
1579         }
1580
1581         lcfg = (struct lustre_cfg *)cfg_buf;
1582
1583         if (lcfg->lcfg_command == LCFG_MARKER) {
1584                 struct cfg_marker *marker;
1585                 marker = lustre_cfg_buf(lcfg, 1);
1586                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1587                     (marker->cm_flags & CM_START) &&
1588                      !(marker->cm_flags & CM_SKIP)) {
1589                         got_an_osc_or_mdc = 1;
1590                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1591                                         sizeof(tmti->mti_svname));
1592                         if (cplen >= sizeof(tmti->mti_svname))
1593                                 RETURN(-E2BIG);
1594                         rc = record_start_log(env, mgs, &mdt_llh,
1595                                               mti->mti_svname);
1596                         if (rc)
1597                                 RETURN(rc);
1598                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1599                                            mti->mti_svname, "add osc(copied)");
1600                         record_end_log(env, &mdt_llh);
1601                         last_step = marker->cm_step;
1602                         RETURN(rc);
1603                 }
1604                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1605                     (marker->cm_flags & CM_END) &&
1606                      !(marker->cm_flags & CM_SKIP)) {
1607                         LASSERT(last_step == marker->cm_step);
1608                         last_step = -1;
1609                         got_an_osc_or_mdc = 0;
1610                         memset(tmti, 0, sizeof(*tmti));
1611                         rc = record_start_log(env, mgs, &mdt_llh,
1612                                               mti->mti_svname);
1613                         if (rc)
1614                                 RETURN(rc);
1615                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1616                                            mti->mti_svname, "add osc(copied)");
1617                         record_end_log(env, &mdt_llh);
1618                         RETURN(rc);
1619                 }
1620                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1621                     (marker->cm_flags & CM_START) &&
1622                      !(marker->cm_flags & CM_SKIP)) {
1623                         got_an_osc_or_mdc = 2;
1624                         last_step = marker->cm_step;
1625                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1626                                strlen(marker->cm_tgtname));
1627
1628                         RETURN(rc);
1629                 }
1630                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1631                     (marker->cm_flags & CM_END) &&
1632                      !(marker->cm_flags & CM_SKIP)) {
1633                         LASSERT(last_step == marker->cm_step);
1634                         last_step = -1;
1635                         got_an_osc_or_mdc = 0;
1636                         memset(tmti, 0, sizeof(*tmti));
1637                         RETURN(rc);
1638                 }
1639         }
1640
1641         if (got_an_osc_or_mdc == 0 || last_step < 0)
1642                 RETURN(rc);
1643
1644         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1645                 __u64 nodenid = lcfg->lcfg_nid;
1646
1647                 if (strlen(tmti->mti_uuid) == 0) {
1648                         /* target uuid not set, this config record is before
1649                          * LCFG_SETUP, this nid is one of target node nid.
1650                          */
1651                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1652                         tmti->mti_nid_count++;
1653                 } else {
1654                         char nidstr[LNET_NIDSTR_SIZE];
1655
1656                         /* failover node nid */
1657                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
1658                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1659                                         nidstr);
1660                 }
1661
1662                 RETURN(rc);
1663         }
1664
1665         if (lcfg->lcfg_command == LCFG_SETUP) {
1666                 char *target;
1667
1668                 target = lustre_cfg_string(lcfg, 1);
1669                 memcpy(tmti->mti_uuid, target, strlen(target));
1670                 RETURN(rc);
1671         }
1672
1673         /* ignore client side sptlrpc_conf_log */
1674         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1675                 RETURN(rc);
1676
1677         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1678                 int index;
1679
1680                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1681                         RETURN (-EINVAL);
1682
1683                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1684                        strlen(mti->mti_fsname));
1685                 tmti->mti_stripe_index = index;
1686
1687                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1688                                               mti->mti_stripe_index,
1689                                               mti->mti_svname);
1690                 memset(tmti, 0, sizeof(*tmti));
1691                 RETURN(rc);
1692         }
1693
1694         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1695                 int index;
1696                 char mdt_index[9];
1697                 char *logname, *lovname;
1698
1699                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1700                                              mti->mti_stripe_index);
1701                 if (rc)
1702                         RETURN(rc);
1703                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1704
1705                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1706                         name_destroy(&logname);
1707                         name_destroy(&lovname);
1708                         RETURN(-EINVAL);
1709                 }
1710
1711                 tmti->mti_stripe_index = index;
1712                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1713                                          mdt_index, lovname,
1714                                          LUSTRE_SP_MDT, 0);
1715                 name_destroy(&logname);
1716                 name_destroy(&lovname);
1717                 RETURN(rc);
1718         }
1719         RETURN(rc);
1720 }
1721
1722 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1723 /* stealed from mgs_get_fsdb_from_llog*/
1724 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1725                                               struct mgs_device *mgs,
1726                                               char *client_name,
1727                                               struct temp_comp* comp)
1728 {
1729         struct llog_handle *loghandle;
1730         struct mgs_target_info *tmti;
1731         struct llog_ctxt *ctxt;
1732         int rc;
1733
1734         ENTRY;
1735
1736         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1737         LASSERT(ctxt != NULL);
1738
1739         OBD_ALLOC_PTR(tmti);
1740         if (tmti == NULL)
1741                 GOTO(out_ctxt, rc = -ENOMEM);
1742
1743         comp->comp_tmti = tmti;
1744         comp->comp_obd = mgs->mgs_obd;
1745
1746         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1747                        LLOG_OPEN_EXISTS);
1748         if (rc < 0) {
1749                 if (rc == -ENOENT)
1750                         rc = 0;
1751                 GOTO(out_pop, rc);
1752         }
1753
1754         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1755         if (rc)
1756                 GOTO(out_close, rc);
1757
1758         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1759                                   (void *)comp, NULL, false);
1760         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1761 out_close:
1762         llog_close(env, loghandle);
1763 out_pop:
1764         OBD_FREE_PTR(tmti);
1765 out_ctxt:
1766         llog_ctxt_put(ctxt);
1767         RETURN(rc);
1768 }
1769
1770 /* lmv is the second thing for client logs */
1771 /* copied from mgs_write_log_lov. Please refer to that.  */
1772 static int mgs_write_log_lmv(const struct lu_env *env,
1773                              struct mgs_device *mgs,
1774                              struct fs_db *fsdb,
1775                              struct mgs_target_info *mti,
1776                              char *logname, char *lmvname)
1777 {
1778         struct llog_handle *llh = NULL;
1779         struct lmv_desc *lmvdesc;
1780         char *uuid;
1781         int rc = 0;
1782         ENTRY;
1783
1784         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1785
1786         OBD_ALLOC_PTR(lmvdesc);
1787         if (lmvdesc == NULL)
1788                 RETURN(-ENOMEM);
1789         lmvdesc->ld_active_tgt_count = 0;
1790         lmvdesc->ld_tgt_count = 0;
1791         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1792         uuid = (char *)lmvdesc->ld_uuid.uuid;
1793
1794         rc = record_start_log(env, mgs, &llh, logname);
1795         if (rc)
1796                 GOTO(out_free, rc);
1797         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1798         if (rc)
1799                 GOTO(out_end, rc);
1800         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1801         if (rc)
1802                 GOTO(out_end, rc);
1803         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1804         if (rc)
1805                 GOTO(out_end, rc);
1806         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1807         if (rc)
1808                 GOTO(out_end, rc);
1809 out_end:
1810         record_end_log(env, &llh);
1811 out_free:
1812         OBD_FREE_PTR(lmvdesc);
1813         RETURN(rc);
1814 }
1815
1816 /* lov is the first thing in the mdt and client logs */
1817 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1818                              struct fs_db *fsdb, struct mgs_target_info *mti,
1819                              char *logname, char *lovname)
1820 {
1821         struct llog_handle *llh = NULL;
1822         struct lov_desc *lovdesc;
1823         char *uuid;
1824         int rc = 0;
1825         ENTRY;
1826
1827         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1828
1829         /*
1830         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1831         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1832               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1833         */
1834
1835         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1836         OBD_ALLOC_PTR(lovdesc);
1837         if (lovdesc == NULL)
1838                 RETURN(-ENOMEM);
1839         lovdesc->ld_magic = LOV_DESC_MAGIC;
1840         lovdesc->ld_tgt_count = 0;
1841         /* Defaults.  Can be changed later by lcfg config_param */
1842         lovdesc->ld_default_stripe_count = 1;
1843         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1844         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1845         lovdesc->ld_default_stripe_offset = -1;
1846         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1847         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1848         /* can these be the same? */
1849         uuid = (char *)lovdesc->ld_uuid.uuid;
1850
1851         /* This should always be the first entry in a log.
1852         rc = mgs_clear_log(obd, logname); */
1853         rc = record_start_log(env, mgs, &llh, logname);
1854         if (rc)
1855                 GOTO(out_free, rc);
1856         /* FIXME these should be a single journal transaction */
1857         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1858         if (rc)
1859                 GOTO(out_end, rc);
1860         rc = record_attach(env, llh, lovname, "lov", uuid);
1861         if (rc)
1862                 GOTO(out_end, rc);
1863         rc = record_lov_setup(env, llh, lovname, lovdesc);
1864         if (rc)
1865                 GOTO(out_end, rc);
1866         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1867         if (rc)
1868                 GOTO(out_end, rc);
1869         EXIT;
1870 out_end:
1871         record_end_log(env, &llh);
1872 out_free:
1873         OBD_FREE_PTR(lovdesc);
1874         return rc;
1875 }
1876
1877 /* add failnids to open log */
1878 static int mgs_write_log_failnids(const struct lu_env *env,
1879                                   struct mgs_target_info *mti,
1880                                   struct llog_handle *llh,
1881                                   char *cliname)
1882 {
1883         char *failnodeuuid = NULL;
1884         char *ptr = mti->mti_params;
1885         lnet_nid_t nid;
1886         int rc = 0;
1887
1888         /*
1889         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1890         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1891         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1892         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1893         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1894         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1895         */
1896
1897         /*
1898          * Pull failnid info out of params string, which may contain something
1899          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
1900          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
1901          * etc.  However, convert_hostnames() should have caught those.
1902          */
1903         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1904                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1905                         char nidstr[LNET_NIDSTR_SIZE];
1906
1907                         if (failnodeuuid == NULL) {
1908                                 /* We don't know the failover node name,
1909                                  * so just use the first nid as the uuid */
1910                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
1911                                 rc = name_create(&failnodeuuid, nidstr, "");
1912                                 if (rc != 0)
1913                                         return rc;
1914                         }
1915                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1916                                 "client %s\n",
1917                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
1918                                 failnodeuuid, cliname);
1919                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1920                         /*
1921                          * If *ptr is ':', we have added all NIDs for
1922                          * failnodeuuid.
1923                          */
1924                         if (*ptr == ':') {
1925                                 rc = record_add_conn(env, llh, cliname,
1926                                                      failnodeuuid);
1927                                 name_destroy(&failnodeuuid);
1928                                 failnodeuuid = NULL;
1929                         }
1930                 }
1931                 if (failnodeuuid) {
1932                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1933                         name_destroy(&failnodeuuid);
1934                         failnodeuuid = NULL;
1935                 }
1936         }
1937
1938         return rc;
1939 }
1940
1941 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1942                                     struct mgs_device *mgs,
1943                                     struct fs_db *fsdb,
1944                                     struct mgs_target_info *mti,
1945                                     char *logname, char *lmvname)
1946 {
1947         struct llog_handle *llh = NULL;
1948         char *mdcname = NULL;
1949         char *nodeuuid = NULL;
1950         char *mdcuuid = NULL;
1951         char *lmvuuid = NULL;
1952         char index[6];
1953         char nidstr[LNET_NIDSTR_SIZE];
1954         int i, rc;
1955         ENTRY;
1956
1957         if (mgs_log_is_empty(env, mgs, logname)) {
1958                 CERROR("log is empty! Logical error\n");
1959                 RETURN(-EINVAL);
1960         }
1961
1962         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1963                mti->mti_svname, logname, lmvname);
1964
1965         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
1966         rc = name_create(&nodeuuid, nidstr, "");
1967         if (rc)
1968                 RETURN(rc);
1969         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1970         if (rc)
1971                 GOTO(out_free, rc);
1972         rc = name_create(&mdcuuid, mdcname, "_UUID");
1973         if (rc)
1974                 GOTO(out_free, rc);
1975         rc = name_create(&lmvuuid, lmvname, "_UUID");
1976         if (rc)
1977                 GOTO(out_free, rc);
1978
1979         rc = record_start_log(env, mgs, &llh, logname);
1980         if (rc)
1981                 GOTO(out_free, rc);
1982         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1983                            "add mdc");
1984         if (rc)
1985                 GOTO(out_end, rc);
1986         for (i = 0; i < mti->mti_nid_count; i++) {
1987                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1988                         libcfs_nid2str_r(mti->mti_nids[i],
1989                                          nidstr, sizeof(nidstr)));
1990
1991                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1992                 if (rc)
1993                         GOTO(out_end, rc);
1994         }
1995
1996         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1997         if (rc)
1998                 GOTO(out_end, rc);
1999         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2000                           NULL, NULL);
2001         if (rc)
2002                 GOTO(out_end, rc);
2003         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2004         if (rc)
2005                 GOTO(out_end, rc);
2006         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2007         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2008                             index, "1");
2009         if (rc)
2010                 GOTO(out_end, rc);
2011         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2012                            "add mdc");
2013         if (rc)
2014                 GOTO(out_end, rc);
2015 out_end:
2016         record_end_log(env, &llh);
2017 out_free:
2018         name_destroy(&lmvuuid);
2019         name_destroy(&mdcuuid);
2020         name_destroy(&mdcname);
2021         name_destroy(&nodeuuid);
2022         RETURN(rc);
2023 }
2024
2025 static inline int name_create_lov(char **lovname, char *mdtname,
2026                                   struct fs_db *fsdb, int index)
2027 {
2028         /* COMPAT_180 */
2029         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2030                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2031         else
2032                 return name_create(lovname, mdtname, "-mdtlov");
2033 }
2034
2035 static int name_create_mdt_and_lov(char **logname, char **lovname,
2036                                    struct fs_db *fsdb, int i)
2037 {
2038         int rc;
2039
2040         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2041         if (rc)
2042                 return rc;
2043         /* COMPAT_180 */
2044         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2045                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2046         else
2047                 rc = name_create(lovname, *logname, "-mdtlov");
2048         if (rc) {
2049                 name_destroy(logname);
2050                 *logname = NULL;
2051         }
2052         return rc;
2053 }
2054
2055 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2056                                       struct fs_db *fsdb, int i)
2057 {
2058         char suffix[16];
2059
2060         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2061                 sprintf(suffix, "-osc");
2062         else
2063                 sprintf(suffix, "-osc-MDT%04x", i);
2064         return name_create(oscname, ostname, suffix);
2065 }
2066
2067 /* add new mdc to already existent MDS */
2068 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2069                                     struct mgs_device *mgs,
2070                                     struct fs_db *fsdb,
2071                                     struct mgs_target_info *mti,
2072                                     int mdt_index, char *logname)
2073 {
2074         struct llog_handle      *llh = NULL;
2075         char    *nodeuuid = NULL;
2076         char    *ospname = NULL;
2077         char    *lovuuid = NULL;
2078         char    *mdtuuid = NULL;
2079         char    *svname = NULL;
2080         char    *mdtname = NULL;
2081         char    *lovname = NULL;
2082         char    index_str[16];
2083         char    nidstr[LNET_NIDSTR_SIZE];
2084         int     i, rc;
2085
2086         ENTRY;
2087         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2088                 CERROR("log is empty! Logical error\n");
2089                 RETURN (-EINVAL);
2090         }
2091
2092         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2093                logname);
2094
2095         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2096         if (rc)
2097                 RETURN(rc);
2098
2099         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2100         rc = name_create(&nodeuuid, nidstr, "");
2101         if (rc)
2102                 GOTO(out_destory, rc);
2103
2104         rc = name_create(&svname, mdtname, "-osp");
2105         if (rc)
2106                 GOTO(out_destory, rc);
2107
2108         sprintf(index_str, "-MDT%04x", mdt_index);
2109         rc = name_create(&ospname, svname, index_str);
2110         if (rc)
2111                 GOTO(out_destory, rc);
2112
2113         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2114         if (rc)
2115                 GOTO(out_destory, rc);
2116
2117         rc = name_create(&lovuuid, lovname, "_UUID");
2118         if (rc)
2119                 GOTO(out_destory, rc);
2120
2121         rc = name_create(&mdtuuid, mdtname, "_UUID");
2122         if (rc)
2123                 GOTO(out_destory, rc);
2124
2125         rc = record_start_log(env, mgs, &llh, logname);
2126         if (rc)
2127                 GOTO(out_destory, rc);
2128
2129         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2130                            "add osp");
2131         if (rc)
2132                 GOTO(out_destory, rc);
2133
2134         for (i = 0; i < mti->mti_nid_count; i++) {
2135                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2136                         libcfs_nid2str_r(mti->mti_nids[i],
2137                                          nidstr, sizeof(nidstr)));
2138                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2139                 if (rc)
2140                         GOTO(out_end, rc);
2141         }
2142
2143         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2144         if (rc)
2145                 GOTO(out_end, rc);
2146
2147         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2148                           NULL, NULL);
2149         if (rc)
2150                 GOTO(out_end, rc);
2151
2152         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2153         if (rc)
2154                 GOTO(out_end, rc);
2155
2156         /* Add mdc(osp) to lod */
2157         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2158                  mti->mti_stripe_index);
2159         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2160                          index_str, "1", NULL);
2161         if (rc)
2162                 GOTO(out_end, rc);
2163
2164         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2165         if (rc)
2166                 GOTO(out_end, rc);
2167
2168 out_end:
2169         record_end_log(env, &llh);
2170
2171 out_destory:
2172         name_destroy(&mdtuuid);
2173         name_destroy(&lovuuid);
2174         name_destroy(&lovname);
2175         name_destroy(&ospname);
2176         name_destroy(&svname);
2177         name_destroy(&nodeuuid);
2178         name_destroy(&mdtname);
2179         RETURN(rc);
2180 }
2181
2182 static int mgs_write_log_mdt0(const struct lu_env *env,
2183                               struct mgs_device *mgs,
2184                               struct fs_db *fsdb,
2185                               struct mgs_target_info *mti)
2186 {
2187         char *log = mti->mti_svname;
2188         struct llog_handle *llh = NULL;
2189         char *uuid, *lovname;
2190         char mdt_index[6];
2191         char *ptr = mti->mti_params;
2192         int rc = 0, failout = 0;
2193         ENTRY;
2194
2195         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2196         if (uuid == NULL)
2197                 RETURN(-ENOMEM);
2198
2199         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2200                 failout = (strncmp(ptr, "failout", 7) == 0);
2201
2202         rc = name_create(&lovname, log, "-mdtlov");
2203         if (rc)
2204                 GOTO(out_free, rc);
2205         if (mgs_log_is_empty(env, mgs, log)) {
2206                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2207                 if (rc)
2208                         GOTO(out_lod, rc);
2209         }
2210
2211         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2212
2213         rc = record_start_log(env, mgs, &llh, log);
2214         if (rc)
2215                 GOTO(out_lod, rc);
2216
2217         /* add MDT itself */
2218
2219         /* FIXME this whole fn should be a single journal transaction */
2220         sprintf(uuid, "%s_UUID", log);
2221         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2222         if (rc)
2223                 GOTO(out_lod, rc);
2224         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2225         if (rc)
2226                 GOTO(out_end, rc);
2227         rc = record_mount_opt(env, llh, log, lovname, NULL);
2228         if (rc)
2229                 GOTO(out_end, rc);
2230         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2231                         failout ? "n" : "f");
2232         if (rc)
2233                 GOTO(out_end, rc);
2234         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2235         if (rc)
2236                 GOTO(out_end, rc);
2237 out_end:
2238         record_end_log(env, &llh);
2239 out_lod:
2240         name_destroy(&lovname);
2241 out_free:
2242         OBD_FREE(uuid, sizeof(struct obd_uuid));
2243         RETURN(rc);
2244 }
2245
2246 /* envelope method for all layers log */
2247 static int mgs_write_log_mdt(const struct lu_env *env,
2248                              struct mgs_device *mgs,
2249                              struct fs_db *fsdb,
2250                              struct mgs_target_info *mti)
2251 {
2252         struct mgs_thread_info *mgi = mgs_env_info(env);
2253         struct llog_handle *llh = NULL;
2254         char *cliname;
2255         int rc, i = 0;
2256         ENTRY;
2257
2258         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2259
2260         if (mti->mti_uuid[0] == '\0') {
2261                 /* Make up our own uuid */
2262                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2263                          "%s_UUID", mti->mti_svname);
2264         }
2265
2266         /* add mdt */
2267         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2268         if (rc)
2269                 RETURN(rc);
2270         /* Append the mdt info to the client log */
2271         rc = name_create(&cliname, mti->mti_fsname, "-client");
2272         if (rc)
2273                 RETURN(rc);
2274
2275         if (mgs_log_is_empty(env, mgs, cliname)) {
2276                 /* Start client log */
2277                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2278                                        fsdb->fsdb_clilov);
2279                 if (rc)
2280                         GOTO(out_free, rc);
2281                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2282                                        fsdb->fsdb_clilmv);
2283                 if (rc)
2284                         GOTO(out_free, rc);
2285         }
2286
2287         /*
2288         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2289         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2290         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2291         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2292         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2293         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2294         */
2295
2296                 /* copy client info about lov/lmv */
2297                 mgi->mgi_comp.comp_mti = mti;
2298                 mgi->mgi_comp.comp_fsdb = fsdb;
2299
2300                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2301                                                         &mgi->mgi_comp);
2302                 if (rc)
2303                         GOTO(out_free, rc);
2304                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2305                                               fsdb->fsdb_clilmv);
2306                 if (rc)
2307                         GOTO(out_free, rc);
2308
2309                 /* add mountopts */
2310                 rc = record_start_log(env, mgs, &llh, cliname);
2311                 if (rc)
2312                         GOTO(out_free, rc);
2313
2314                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2315                                    "mount opts");
2316                 if (rc)
2317                         GOTO(out_end, rc);
2318                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2319                                       fsdb->fsdb_clilmv);
2320                 if (rc)
2321                         GOTO(out_end, rc);
2322                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2323                                    "mount opts");
2324
2325         if (rc)
2326                 GOTO(out_end, rc);
2327
2328         /* for_all_existing_mdt except current one */
2329         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2330                 if (i !=  mti->mti_stripe_index &&
2331                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2332                         char *logname;
2333
2334                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2335                         if (rc)
2336                                 GOTO(out_end, rc);
2337
2338                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2339                                                       i, logname);
2340                         name_destroy(&logname);
2341                         if (rc)
2342                                 GOTO(out_end, rc);
2343                 }
2344         }
2345 out_end:
2346         record_end_log(env, &llh);
2347 out_free:
2348         name_destroy(&cliname);
2349         RETURN(rc);
2350 }
2351
2352 /* Add the ost info to the client/mdt lov */
2353 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2354                                     struct mgs_device *mgs, struct fs_db *fsdb,
2355                                     struct mgs_target_info *mti,
2356                                     char *logname, char *suffix, char *lovname,
2357                                     enum lustre_sec_part sec_part, int flags)
2358 {
2359         struct llog_handle *llh = NULL;
2360         char *nodeuuid = NULL;
2361         char *oscname = NULL;
2362         char *oscuuid = NULL;
2363         char *lovuuid = NULL;
2364         char *svname = NULL;
2365         char index[6];
2366         char nidstr[LNET_NIDSTR_SIZE];
2367         int i, rc;
2368         ENTRY;
2369
2370         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2371                 mti->mti_svname, logname);
2372
2373         if (mgs_log_is_empty(env, mgs, logname)) {
2374                 CERROR("log is empty! Logical error\n");
2375                 RETURN(-EINVAL);
2376         }
2377
2378         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2379         rc = name_create(&nodeuuid, nidstr, "");
2380         if (rc)
2381                 RETURN(rc);
2382         rc = name_create(&svname, mti->mti_svname, "-osc");
2383         if (rc)
2384                 GOTO(out_free, rc);
2385
2386         /* for the system upgraded from old 1.8, keep using the old osc naming
2387          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2388         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2389                 rc = name_create(&oscname, svname, "");
2390         else
2391                 rc = name_create(&oscname, svname, suffix);
2392         if (rc)
2393                 GOTO(out_free, rc);
2394
2395         rc = name_create(&oscuuid, oscname, "_UUID");
2396         if (rc)
2397                 GOTO(out_free, rc);
2398         rc = name_create(&lovuuid, lovname, "_UUID");
2399         if (rc)
2400                 GOTO(out_free, rc);
2401
2402
2403         /*
2404         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2405         multihomed (#4)
2406         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2407         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2408         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2409         failover (#6,7)
2410         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2411         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2412         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2413         */
2414
2415         rc = record_start_log(env, mgs, &llh, logname);
2416         if (rc)
2417                 GOTO(out_free, rc);
2418
2419         /* FIXME these should be a single journal transaction */
2420         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2421                            "add osc");
2422         if (rc)
2423                 GOTO(out_end, rc);
2424
2425         /* NB: don't change record order, because upon MDT steal OSC config
2426          * from client, it treats all nids before LCFG_SETUP as target nids
2427          * (multiple interfaces), while nids after as failover node nids.
2428          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2429          */
2430         for (i = 0; i < mti->mti_nid_count; i++) {
2431                 CDEBUG(D_MGS, "add nid %s\n",
2432                         libcfs_nid2str_r(mti->mti_nids[i],
2433                                          nidstr, sizeof(nidstr)));
2434                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2435                 if (rc)
2436                         GOTO(out_end, rc);
2437         }
2438         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2439         if (rc)
2440                 GOTO(out_end, rc);
2441         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
2442                           NULL, NULL);
2443         if (rc)
2444                 GOTO(out_end, rc);
2445         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2446         if (rc)
2447                 GOTO(out_end, rc);
2448
2449         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2450
2451         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2452         if (rc)
2453                 GOTO(out_end, rc);
2454         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2455                            "add osc");
2456         if (rc)
2457                 GOTO(out_end, rc);
2458 out_end:
2459         record_end_log(env, &llh);
2460 out_free:
2461         name_destroy(&lovuuid);
2462         name_destroy(&oscuuid);
2463         name_destroy(&oscname);
2464         name_destroy(&svname);
2465         name_destroy(&nodeuuid);
2466         RETURN(rc);
2467 }
2468
2469 static int mgs_write_log_ost(const struct lu_env *env,
2470                              struct mgs_device *mgs, struct fs_db *fsdb,
2471                              struct mgs_target_info *mti)
2472 {
2473         struct llog_handle *llh = NULL;
2474         char *logname, *lovname;
2475         char *ptr = mti->mti_params;
2476         int rc, flags = 0, failout = 0, i;
2477         ENTRY;
2478
2479         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2480
2481         /* The ost startup log */
2482
2483         /* If the ost log already exists, that means that someone reformatted
2484            the ost and it called target_add again. */
2485         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2486                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2487                                    "exists, yet the server claims it never "
2488                                    "registered. It may have been reformatted, "
2489                                    "or the index changed. writeconf the MDT to "
2490                                    "regenerate all logs.\n", mti->mti_svname);
2491                 RETURN(-EALREADY);
2492         }
2493
2494         /*
2495         attach obdfilter ost1 ost1_UUID
2496         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2497         */
2498         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2499                 failout = (strncmp(ptr, "failout", 7) == 0);
2500         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2501         if (rc)
2502                 RETURN(rc);
2503         /* FIXME these should be a single journal transaction */
2504         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2505         if (rc)
2506                 GOTO(out_end, rc);
2507         if (*mti->mti_uuid == '\0')
2508                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2509                          "%s_UUID", mti->mti_svname);
2510         rc = record_attach(env, llh, mti->mti_svname,
2511                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2512         if (rc)
2513                 GOTO(out_end, rc);
2514         rc = record_setup(env, llh, mti->mti_svname,
2515                           "dev"/*ignored*/, "type"/*ignored*/,
2516                           failout ? "n" : "f", NULL/*options*/);
2517         if (rc)
2518                 GOTO(out_end, rc);
2519         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2520         if (rc)
2521                 GOTO(out_end, rc);
2522 out_end:
2523         record_end_log(env, &llh);
2524         if (rc)
2525                 RETURN(rc);
2526         /* We also have to update the other logs where this osc is part of
2527            the lov */
2528
2529         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2530                 /* If we're upgrading, the old mdt log already has our
2531                    entry. Let's do a fake one for fun. */
2532                 /* Note that we can't add any new failnids, since we don't
2533                    know the old osc names. */
2534                 flags = CM_SKIP | CM_UPGRADE146;
2535
2536         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2537                 /* If the update flag isn't set, don't update client/mdt
2538                    logs. */
2539                 flags |= CM_SKIP;
2540                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2541                               "the MDT first to regenerate it.\n",
2542                               mti->mti_svname);
2543         }
2544
2545         /* Add ost to all MDT lov defs */
2546         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2547                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2548                         char mdt_index[9];
2549
2550                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2551                                                      i);
2552                         if (rc)
2553                                 RETURN(rc);
2554                         sprintf(mdt_index, "-MDT%04x", i);
2555                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2556                                                       logname, mdt_index,
2557                                                       lovname, LUSTRE_SP_MDT,
2558                                                       flags);
2559                         name_destroy(&logname);
2560                         name_destroy(&lovname);
2561                         if (rc)
2562                                 RETURN(rc);
2563                 }
2564         }
2565
2566         /* Append ost info to the client log */
2567         rc = name_create(&logname, mti->mti_fsname, "-client");
2568         if (rc)
2569                 RETURN(rc);
2570         if (mgs_log_is_empty(env, mgs, logname)) {
2571                 /* Start client log */
2572                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2573                                        fsdb->fsdb_clilov);
2574                 if (rc)
2575                         GOTO(out_free, rc);
2576                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2577                                        fsdb->fsdb_clilmv);
2578                 if (rc)
2579                         GOTO(out_free, rc);
2580         }
2581         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2582                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2583 out_free:
2584         name_destroy(&logname);
2585         RETURN(rc);
2586 }
2587
2588 static __inline__ int mgs_param_empty(char *ptr)
2589 {
2590         char *tmp;
2591
2592         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2593                 return 1;
2594         return 0;
2595 }
2596
2597 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2598                                           struct mgs_device *mgs,
2599                                           struct fs_db *fsdb,
2600                                           struct mgs_target_info *mti,
2601                                           char *logname, char *cliname)
2602 {
2603         int rc;
2604         struct llog_handle *llh = NULL;
2605
2606         if (mgs_param_empty(mti->mti_params)) {
2607                 /* Remove _all_ failnids */
2608                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2609                                 mti->mti_svname, "add failnid", CM_SKIP);
2610                 return rc < 0 ? rc : 0;
2611         }
2612
2613         /* Otherwise failover nids are additive */
2614         rc = record_start_log(env, mgs, &llh, logname);
2615         if (rc)
2616                 return rc;
2617                 /* FIXME this should be a single journal transaction */
2618         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2619                            "add failnid");
2620         if (rc)
2621                 goto out_end;
2622         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2623         if (rc)
2624                 goto out_end;
2625         rc = record_marker(env, llh, fsdb, CM_END,
2626                            mti->mti_svname, "add failnid");
2627 out_end:
2628         record_end_log(env, &llh);
2629         return rc;
2630 }
2631
2632
2633 /* Add additional failnids to an existing log.
2634    The mdc/osc must have been added to logs first */
2635 /* tcp nids must be in dotted-quad ascii -
2636    we can't resolve hostnames from the kernel. */
2637 static int mgs_write_log_add_failnid(const struct lu_env *env,
2638                                      struct mgs_device *mgs,
2639                                      struct fs_db *fsdb,
2640                                      struct mgs_target_info *mti)
2641 {
2642         char *logname, *cliname;
2643         int rc;
2644         ENTRY;
2645
2646         /* FIXME we currently can't erase the failnids
2647          * given when a target first registers, since they aren't part of
2648          * an "add uuid" stanza */
2649
2650         /* Verify that we know about this target */
2651         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2652                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2653                                    "yet. It must be started before failnids "
2654                                    "can be added.\n", mti->mti_svname);
2655                 RETURN(-ENOENT);
2656         }
2657
2658         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2659         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2660                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2661         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2662                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2663         } else {
2664                 RETURN(-EINVAL);
2665         }
2666         if (rc)
2667                 RETURN(rc);
2668         /* Add failover nids to the client log */
2669         rc = name_create(&logname, mti->mti_fsname, "-client");
2670         if (rc) {
2671                 name_destroy(&cliname);
2672                 RETURN(rc);
2673         }
2674         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2675         name_destroy(&logname);
2676         name_destroy(&cliname);
2677         if (rc)
2678                 RETURN(rc);
2679
2680         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2681                 /* Add OST failover nids to the MDT logs as well */
2682                 int i;
2683
2684                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2685                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2686                                 continue;
2687                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2688                         if (rc)
2689                                 RETURN(rc);
2690                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2691                                                  fsdb, i);
2692                         if (rc) {
2693                                 name_destroy(&logname);
2694                                 RETURN(rc);
2695                         }
2696                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2697                                                             mti, logname,
2698                                                             cliname);
2699                         name_destroy(&cliname);
2700                         name_destroy(&logname);
2701                         if (rc)
2702                                 RETURN(rc);
2703                 }
2704         }
2705
2706         RETURN(rc);
2707 }
2708
2709 static int mgs_wlp_lcfg(const struct lu_env *env,
2710                         struct mgs_device *mgs, struct fs_db *fsdb,
2711                         struct mgs_target_info *mti,
2712                         char *logname, struct lustre_cfg_bufs *bufs,
2713                         char *tgtname, char *ptr)
2714 {
2715         char comment[MTI_NAME_MAXLEN];
2716         char *tmp;
2717         struct llog_cfg_rec *lcr;
2718         int rc, del;
2719
2720         /* Erase any old settings of this same parameter */
2721         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2722         comment[MTI_NAME_MAXLEN - 1] = 0;
2723         /* But don't try to match the value. */
2724         tmp = strchr(comment, '=');
2725         if (tmp != NULL)
2726                 *tmp = 0;
2727         /* FIXME we should skip settings that are the same as old values */
2728         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2729         if (rc < 0)
2730                 return rc;
2731         del = mgs_param_empty(ptr);
2732
2733         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2734                       "Setting" : "Modifying", tgtname, comment, logname);
2735         if (del) {
2736                 /* mgs_modify() will return 1 if nothing had to be done */
2737                 if (rc == 1)
2738                         rc = 0;
2739                 return rc;
2740         }
2741
2742         lustre_cfg_bufs_reset(bufs, tgtname);
2743         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2744         if (mti->mti_flags & LDD_F_PARAM2)
2745                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2746
2747         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2748                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2749         if (lcr == NULL)
2750                 return -ENOMEM;
2751
2752         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2753                                   comment);
2754         lustre_cfg_rec_free(lcr);
2755         return rc;
2756 }
2757
2758 static int mgs_write_log_param2(const struct lu_env *env,
2759                                 struct mgs_device *mgs,
2760                                 struct fs_db *fsdb,
2761                                 struct mgs_target_info *mti, char *ptr)
2762 {
2763         struct lustre_cfg_bufs  bufs;
2764         int                     rc = 0;
2765         ENTRY;
2766
2767         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2768         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2769                           mti->mti_svname, ptr);
2770
2771         RETURN(rc);
2772 }
2773
2774 /* write global variable settings into log */
2775 static int mgs_write_log_sys(const struct lu_env *env,
2776                              struct mgs_device *mgs, struct fs_db *fsdb,
2777                              struct mgs_target_info *mti, char *sys, char *ptr)
2778 {
2779         struct mgs_thread_info  *mgi = mgs_env_info(env);
2780         struct lustre_cfg       *lcfg;
2781         struct llog_cfg_rec     *lcr;
2782         char *tmp, sep;
2783         int rc, cmd, convert = 1;
2784
2785         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2786                 cmd = LCFG_SET_TIMEOUT;
2787         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2788                 cmd = LCFG_SET_LDLM_TIMEOUT;
2789         /* Check for known params here so we can return error to lctl */
2790         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2791                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2792                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2793                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2794                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2795                 cmd = LCFG_PARAM;
2796         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2797                 convert = 0; /* Don't convert string value to integer */
2798                 cmd = LCFG_PARAM;
2799         } else {
2800                 return -EINVAL;
2801         }
2802
2803         if (mgs_param_empty(ptr))
2804                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2805         else
2806                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2807
2808         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2809         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2810         if (!convert && *tmp != '\0')
2811                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2812         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2813         if (lcr == NULL)
2814                 return -ENOMEM;
2815
2816         lcfg = &lcr->lcr_cfg;
2817         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2818         /* truncate the comment to the parameter name */
2819         ptr = tmp - 1;
2820         sep = *ptr;
2821         *ptr = '\0';
2822         /* modify all servers and clients */
2823         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2824                                       *tmp == '\0' ? NULL : lcr,
2825                                       mti->mti_fsname, sys, 0);
2826         if (rc == 0 && *tmp != '\0') {
2827                 switch (cmd) {
2828                 case LCFG_SET_TIMEOUT:
2829                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2830                                 class_process_config(lcfg);
2831                         break;
2832                 case LCFG_SET_LDLM_TIMEOUT:
2833                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2834                                 class_process_config(lcfg);
2835                         break;
2836                 default:
2837                         break;
2838                 }
2839         }
2840         *ptr = sep;
2841         lustre_cfg_rec_free(lcr);
2842         return rc;
2843 }
2844
2845 /* write quota settings into log */
2846 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2847                                struct fs_db *fsdb, struct mgs_target_info *mti,
2848                                char *quota, char *ptr)
2849 {
2850         struct mgs_thread_info  *mgi = mgs_env_info(env);
2851         struct llog_cfg_rec     *lcr;
2852         char                    *tmp;
2853         char                     sep;
2854         int                      rc, cmd = LCFG_PARAM;
2855
2856         /* support only 'meta' and 'data' pools so far */
2857         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2858             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2859                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2860                        "& quota.ost are)\n", ptr);
2861                 return -EINVAL;
2862         }
2863
2864         if (*tmp == '\0') {
2865                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2866         } else {
2867                 CDEBUG(D_MGS, "global '%s'\n", quota);
2868
2869                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2870                     strcmp(tmp, "none") != 0) {
2871                         CERROR("enable option(%s) isn't supported\n", tmp);
2872                         return -EINVAL;
2873                 }
2874         }
2875
2876         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2877         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2878         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2879         if (lcr == NULL)
2880                 return -ENOMEM;
2881
2882         /* truncate the comment to the parameter name */
2883         ptr = tmp - 1;
2884         sep = *ptr;
2885         *ptr = '\0';
2886
2887         /* XXX we duplicated quota enable information in all server
2888          *     config logs, it should be moved to a separate config
2889          *     log once we cleanup the config log for global param. */
2890         /* modify all servers */
2891         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2892                                       *tmp == '\0' ? NULL : lcr,
2893                                       mti->mti_fsname, quota, 1);
2894         *ptr = sep;
2895         lustre_cfg_rec_free(lcr);
2896         return rc < 0 ? rc : 0;
2897 }
2898
2899 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2900                                    struct mgs_device *mgs,
2901                                    struct fs_db *fsdb,
2902                                    struct mgs_target_info *mti,
2903                                    char *param)
2904 {
2905         struct mgs_thread_info  *mgi = mgs_env_info(env);
2906         struct llog_cfg_rec     *lcr;
2907         struct llog_handle      *llh = NULL;
2908         char                    *logname;
2909         char                    *comment, *ptr;
2910         int                      rc, len;
2911
2912         ENTRY;
2913
2914         /* get comment */
2915         ptr = strchr(param, '=');
2916         LASSERT(ptr != NULL);
2917         len = ptr - param;
2918
2919         OBD_ALLOC(comment, len + 1);
2920         if (comment == NULL)
2921                 RETURN(-ENOMEM);
2922         strncpy(comment, param, len);
2923         comment[len] = '\0';
2924
2925         /* prepare lcfg */
2926         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2927         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2928         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2929         if (lcr == NULL)
2930                 GOTO(out_comment, rc = -ENOMEM);
2931
2932         /* construct log name */
2933         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2934         if (rc < 0)
2935                 GOTO(out_lcfg, rc);
2936
2937         if (mgs_log_is_empty(env, mgs, logname)) {
2938                 rc = record_start_log(env, mgs, &llh, logname);
2939                 if (rc < 0)
2940                         GOTO(out, rc);
2941                 record_end_log(env, &llh);
2942         }
2943
2944         /* obsolete old one */
2945         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2946                         comment, CM_SKIP);
2947         if (rc < 0)
2948                 GOTO(out, rc);
2949         /* write the new one */
2950         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2951                                   mti->mti_svname, comment);
2952         if (rc)
2953                 CERROR("%s: error writing log %s: rc = %d\n",
2954                        mgs->mgs_obd->obd_name, logname, rc);
2955 out:
2956         name_destroy(&logname);
2957 out_lcfg:
2958         lustre_cfg_rec_free(lcr);
2959 out_comment:
2960         OBD_FREE(comment, len + 1);
2961         RETURN(rc);
2962 }
2963
2964 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2965                                         char *param)
2966 {
2967         char    *ptr;
2968
2969         /* disable the adjustable udesc parameter for now, i.e. use default
2970          * setting that client always ship udesc to MDT if possible. to enable
2971          * it simply remove the following line */
2972         goto error_out;
2973
2974         ptr = strchr(param, '=');
2975         if (ptr == NULL)
2976                 goto error_out;
2977         *ptr++ = '\0';
2978
2979         if (strcmp(param, PARAM_SRPC_UDESC))
2980                 goto error_out;
2981
2982         if (strcmp(ptr, "yes") == 0) {
2983                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2984                 CWARN("Enable user descriptor shipping from client to MDT\n");
2985         } else if (strcmp(ptr, "no") == 0) {
2986                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2987                 CWARN("Disable user descriptor shipping from client to MDT\n");
2988         } else {
2989                 *(ptr - 1) = '=';
2990                 goto error_out;
2991         }
2992         return 0;
2993
2994 error_out:
2995         CERROR("Invalid param: %s\n", param);
2996         return -EINVAL;
2997 }
2998
2999 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3000                                   const char *svname,
3001                                   char *param)
3002 {
3003         struct sptlrpc_rule      rule;
3004         struct sptlrpc_rule_set *rset;
3005         int                      rc;
3006         ENTRY;
3007
3008         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3009                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3010                 RETURN(-EINVAL);
3011         }
3012
3013         if (strncmp(param, PARAM_SRPC_UDESC,
3014                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3015                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3016         }
3017
3018         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3019                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3020                 RETURN(-EINVAL);
3021         }
3022
3023         param += sizeof(PARAM_SRPC_FLVR) - 1;
3024
3025         rc = sptlrpc_parse_rule(param, &rule);
3026         if (rc)
3027                 RETURN(rc);
3028
3029         /* mgs rules implies must be mgc->mgs */
3030         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3031                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3032                      rule.sr_from != LUSTRE_SP_ANY) ||
3033                     (rule.sr_to != LUSTRE_SP_MGS &&
3034                      rule.sr_to != LUSTRE_SP_ANY))
3035                         RETURN(-EINVAL);
3036         }
3037
3038         /* preapre room for this coming rule. svcname format should be:
3039          * - fsname: general rule
3040          * - fsname-tgtname: target-specific rule
3041          */
3042         if (strchr(svname, '-')) {
3043                 struct mgs_tgt_srpc_conf *tgtconf;
3044                 int                       found = 0;
3045
3046                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3047                      tgtconf = tgtconf->mtsc_next) {
3048                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3049                                 found = 1;
3050                                 break;
3051                         }
3052                 }
3053
3054                 if (!found) {
3055                         int name_len;
3056
3057                         OBD_ALLOC_PTR(tgtconf);
3058                         if (tgtconf == NULL)
3059                                 RETURN(-ENOMEM);
3060
3061                         name_len = strlen(svname);
3062
3063                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3064                         if (tgtconf->mtsc_tgt == NULL) {
3065                                 OBD_FREE_PTR(tgtconf);
3066                                 RETURN(-ENOMEM);
3067                         }
3068                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3069
3070                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3071                         fsdb->fsdb_srpc_tgt = tgtconf;
3072                 }
3073
3074                 rset = &tgtconf->mtsc_rset;
3075         } else {
3076                 rset = &fsdb->fsdb_srpc_gen;
3077         }
3078
3079         rc = sptlrpc_rule_set_merge(rset, &rule);
3080
3081         RETURN(rc);
3082 }
3083
3084 static int mgs_srpc_set_param(const struct lu_env *env,
3085                               struct mgs_device *mgs,
3086                               struct fs_db *fsdb,
3087                               struct mgs_target_info *mti,
3088                               char *param)
3089 {
3090         char                   *copy;
3091         int                     rc, copy_size;
3092         ENTRY;
3093
3094 #ifndef HAVE_GSS
3095         RETURN(-EINVAL);
3096 #endif
3097         /* keep a copy of original param, which could be destroied
3098          * during parsing */
3099         copy_size = strlen(param) + 1;
3100         OBD_ALLOC(copy, copy_size);
3101         if (copy == NULL)
3102                 return -ENOMEM;
3103         memcpy(copy, param, copy_size);
3104
3105         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3106         if (rc)
3107                 goto out_free;
3108
3109         /* previous steps guaranteed the syntax is correct */
3110         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3111         if (rc)
3112                 goto out_free;
3113
3114         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3115                 /*
3116                  * for mgs rules, make them effective immediately.
3117                  */
3118                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3119                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3120                                                  &fsdb->fsdb_srpc_gen);
3121         }
3122
3123 out_free:
3124         OBD_FREE(copy, copy_size);
3125         RETURN(rc);
3126 }
3127
3128 struct mgs_srpc_read_data {
3129         struct fs_db   *msrd_fsdb;
3130         int             msrd_skip;
3131 };
3132
3133 static int mgs_srpc_read_handler(const struct lu_env *env,
3134                                  struct llog_handle *llh,
3135                                  struct llog_rec_hdr *rec, void *data)
3136 {
3137         struct mgs_srpc_read_data *msrd = data;
3138         struct cfg_marker         *marker;
3139         struct lustre_cfg         *lcfg = REC_DATA(rec);
3140         char                      *svname, *param;
3141         int                        cfg_len, rc;
3142         ENTRY;
3143
3144         if (rec->lrh_type != OBD_CFG_REC) {
3145                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3146                 RETURN(-EINVAL);
3147         }
3148
3149         cfg_len = REC_DATA_LEN(rec);
3150
3151         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3152         if (rc) {
3153                 CERROR("Insane cfg\n");
3154                 RETURN(rc);
3155         }
3156
3157         if (lcfg->lcfg_command == LCFG_MARKER) {
3158                 marker = lustre_cfg_buf(lcfg, 1);
3159
3160                 if (marker->cm_flags & CM_START &&
3161                     marker->cm_flags & CM_SKIP)
3162                         msrd->msrd_skip = 1;
3163                 if (marker->cm_flags & CM_END)
3164                         msrd->msrd_skip = 0;
3165
3166                 RETURN(0);
3167         }
3168
3169         if (msrd->msrd_skip)
3170                 RETURN(0);
3171
3172         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3173                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3174                 RETURN(0);
3175         }
3176
3177         svname = lustre_cfg_string(lcfg, 0);
3178         if (svname == NULL) {
3179                 CERROR("svname is empty\n");
3180                 RETURN(0);
3181         }
3182
3183         param = lustre_cfg_string(lcfg, 1);
3184         if (param == NULL) {
3185                 CERROR("param is empty\n");
3186                 RETURN(0);
3187         }
3188
3189         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3190         if (rc)
3191                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3192
3193         RETURN(0);
3194 }
3195
3196 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3197                                 struct mgs_device *mgs,
3198                                 struct fs_db *fsdb)
3199 {
3200         struct llog_handle        *llh = NULL;
3201         struct llog_ctxt          *ctxt;
3202         char                      *logname;
3203         struct mgs_srpc_read_data  msrd;
3204         int                        rc;
3205         ENTRY;
3206
3207         /* construct log name */
3208         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3209         if (rc)
3210                 RETURN(rc);
3211
3212         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3213         LASSERT(ctxt != NULL);
3214
3215         if (mgs_log_is_empty(env, mgs, logname))
3216                 GOTO(out, rc = 0);
3217
3218         rc = llog_open(env, ctxt, &llh, NULL, logname,
3219                        LLOG_OPEN_EXISTS);
3220         if (rc < 0) {
3221                 if (rc == -ENOENT)
3222                         rc = 0;
3223                 GOTO(out, rc);
3224         }
3225
3226         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3227         if (rc)
3228                 GOTO(out_close, rc);
3229
3230         if (llog_get_size(llh) <= 1)
3231                 GOTO(out_close, rc = 0);
3232
3233         msrd.msrd_fsdb = fsdb;
3234         msrd.msrd_skip = 0;
3235
3236         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3237                           NULL);
3238
3239 out_close:
3240         llog_close(env, llh);
3241 out:
3242         llog_ctxt_put(ctxt);
3243         name_destroy(&logname);
3244
3245         if (rc)
3246                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3247         RETURN(rc);
3248 }
3249
3250 /* Permanent settings of all parameters by writing into the appropriate
3251  * configuration logs.
3252  * A parameter with null value ("<param>='\0'") means to erase it out of
3253  * the logs.
3254  */
3255 static int mgs_write_log_param(const struct lu_env *env,
3256                                struct mgs_device *mgs, struct fs_db *fsdb,
3257                                struct mgs_target_info *mti, char *ptr)
3258 {
3259         struct mgs_thread_info *mgi = mgs_env_info(env);
3260         char *logname;
3261         char *tmp;
3262         int rc = 0, rc2 = 0;
3263         ENTRY;
3264
3265         /* For various parameter settings, we have to figure out which logs
3266            care about them (e.g. both mdt and client for lov settings) */
3267         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3268
3269         /* The params are stored in MOUNT_DATA_FILE and modified via
3270            tunefs.lustre, or set using lctl conf_param */
3271
3272         /* Processed in lustre_start_mgc */
3273         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3274                 GOTO(end, rc);
3275
3276         /* Processed in ost/mdt */
3277         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3278                 GOTO(end, rc);
3279
3280         /* Processed in mgs_write_log_ost */
3281         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3282                 if (mti->mti_flags & LDD_F_PARAM) {
3283                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3284                                            "changed with tunefs.lustre"
3285                                            "and --writeconf\n", ptr);
3286                         rc = -EPERM;
3287                 }
3288                 GOTO(end, rc);
3289         }
3290
3291         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3292                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3293                 GOTO(end, rc);
3294         }
3295
3296         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3297                 /* Add a failover nidlist */
3298                 rc = 0;
3299                 /* We already processed failovers params for new
3300                    targets in mgs_write_log_target */
3301                 if (mti->mti_flags & LDD_F_PARAM) {
3302                         CDEBUG(D_MGS, "Adding failnode\n");
3303                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3304                 }
3305                 GOTO(end, rc);
3306         }
3307
3308         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3309                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3310                 GOTO(end, rc);
3311         }
3312
3313         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3314                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3315                 GOTO(end, rc);
3316         }
3317
3318         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3319                 /* active=0 means off, anything else means on */
3320                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3321                 int i;
3322
3323                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3324                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3325                                            "be (de)activated.\n",
3326                                            mti->mti_svname);
3327                         GOTO(end, rc = -EINVAL);
3328                 }
3329                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3330                               flag ? "de": "re", mti->mti_svname);
3331                 /* Modify clilov */
3332                 rc = name_create(&logname, mti->mti_fsname, "-client");
3333                 if (rc)
3334                         GOTO(end, rc);
3335                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3336                                 mti->mti_svname, "add osc", flag);
3337                 name_destroy(&logname);
3338                 if (rc)
3339                         goto active_err;
3340                 /* Modify mdtlov */
3341                 /* Add to all MDT logs for CMD */
3342                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3343                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3344                                 continue;
3345                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3346                         if (rc)
3347                                 GOTO(end, rc);
3348                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3349                                         mti->mti_svname, "add osc", flag);
3350                         name_destroy(&logname);
3351                         if (rc)
3352                                 goto active_err;
3353                 }
3354         active_err:
3355                 if (rc) {
3356                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3357                                            "log (%d). No permanent "
3358                                            "changes were made to the "
3359                                            "config log.\n",
3360                                            mti->mti_svname, rc);
3361                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3362                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3363                                                    " because the log"
3364                                                    "is in the old 1.4"
3365                                                    "style. Consider "
3366                                                    " --writeconf to "
3367                                                    "update the logs.\n");
3368                         GOTO(end, rc);
3369                 }
3370                 /* Fall through to osc proc for deactivating live OSC
3371                    on running MDT / clients. */
3372         }
3373         /* Below here, let obd's XXX_process_config methods handle it */
3374
3375         /* All lov. in proc */
3376         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3377                 char *mdtlovname;
3378
3379                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3380                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3381                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3382                                            "set on the MDT, not %s. "
3383                                            "Ignoring.\n",
3384                                            mti->mti_svname);
3385                         GOTO(end, rc = 0);
3386                 }
3387
3388                 /* Modify mdtlov */
3389                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3390                         GOTO(end, rc = -ENODEV);
3391
3392                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3393                                              mti->mti_stripe_index);
3394                 if (rc)
3395                         GOTO(end, rc);
3396                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3397                                   &mgi->mgi_bufs, mdtlovname, ptr);
3398                 name_destroy(&logname);
3399                 name_destroy(&mdtlovname);
3400                 if (rc)
3401                         GOTO(end, rc);
3402
3403                 /* Modify clilov */
3404                 rc = name_create(&logname, mti->mti_fsname, "-client");
3405                 if (rc)
3406                         GOTO(end, rc);
3407                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3408                                   fsdb->fsdb_clilov, ptr);
3409                 name_destroy(&logname);
3410                 GOTO(end, rc);
3411         }
3412
3413         /* All osc., mdc., llite. params in proc */
3414         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3415             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3416             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3417                 char *cname;
3418
3419                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3420                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3421                                            " cannot be modified. Consider"
3422                                            " updating the configuration with"
3423                                            " --writeconf\n",
3424                                            mti->mti_svname);
3425                         GOTO(end, rc = -EINVAL);
3426                 }
3427                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3428                         rc = name_create(&cname, mti->mti_fsname, "-client");
3429                         /* Add the client type to match the obdname in
3430                            class_config_llog_handler */
3431                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3432                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3433                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3434                         rc = name_create(&cname, mti->mti_svname, "-osc");
3435                 } else {
3436                         GOTO(end, rc = -EINVAL);
3437                 }
3438                 if (rc)
3439                         GOTO(end, rc);
3440
3441                 /* Forbid direct update of llite root squash parameters.
3442                  * These parameters are indirectly set via the MDT settings.
3443                  * See (LU-1778) */
3444                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
3445                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3446                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3447                         LCONSOLE_ERROR("%s: root squash parameters can only "
3448                                 "be updated through MDT component\n",
3449                                 mti->mti_fsname);
3450                         name_destroy(&cname);
3451                         GOTO(end, rc = -EINVAL);
3452                 }
3453
3454                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3455
3456                 /* Modify client */
3457                 rc = name_create(&logname, mti->mti_fsname, "-client");
3458                 if (rc) {
3459                         name_destroy(&cname);
3460                         GOTO(end, rc);
3461                 }
3462                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3463                                   cname, ptr);
3464
3465                 /* osc params affect the MDT as well */
3466                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3467                         int i;
3468
3469                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3470                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3471                                         continue;
3472                                 name_destroy(&cname);
3473                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3474                                                          fsdb, i);
3475                                 name_destroy(&logname);
3476                                 if (rc)
3477                                         break;
3478                                 rc = name_create_mdt(&logname,
3479                                                      mti->mti_fsname, i);
3480                                 if (rc)
3481                                         break;
3482                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3483                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3484                                                           mti, logname,
3485                                                           &mgi->mgi_bufs,
3486                                                           cname, ptr);
3487                                         if (rc)
3488                                                 break;
3489                                 }
3490                         }
3491                 }
3492                 name_destroy(&logname);
3493                 name_destroy(&cname);
3494                 GOTO(end, rc);
3495         }
3496
3497         /* All mdt. params in proc */
3498         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
3499                 int i;
3500                 __u32 idx;
3501
3502                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3503                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3504                             MTI_NAME_MAXLEN) == 0)
3505                         /* device is unspecified completely? */
3506                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3507                 else
3508                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3509                 if (rc < 0)
3510                         goto active_err;
3511                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3512                         goto active_err;
3513                 if (rc & LDD_F_SV_ALL) {
3514                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3515                                 if (!test_bit(i,
3516                                                   fsdb->fsdb_mdt_index_map))
3517                                         continue;
3518                                 rc = name_create_mdt(&logname,
3519                                                 mti->mti_fsname, i);
3520                                 if (rc)
3521                                         goto active_err;
3522                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3523                                                   logname, &mgi->mgi_bufs,
3524                                                   logname, ptr);
3525                                 name_destroy(&logname);
3526                                 if (rc)
3527                                         goto active_err;
3528                         }
3529                 } else {
3530                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
3531                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
3532                                 LCONSOLE_ERROR("%s: root squash parameters "
3533                                         "cannot be applied to a single MDT\n",
3534                                         mti->mti_fsname);
3535                                 GOTO(end, rc = -EINVAL);
3536                         }
3537                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3538                                           mti->mti_svname, &mgi->mgi_bufs,
3539                                           mti->mti_svname, ptr);
3540                         if (rc)
3541                                 goto active_err;
3542                 }
3543
3544                 /* root squash settings are also applied to llite
3545                  * config log (see LU-1778) */
3546                 if (rc == 0 &&
3547                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3548                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3549                         char *cname;
3550                         char *ptr2;
3551
3552                         rc = name_create(&cname, mti->mti_fsname, "-client");
3553                         if (rc)
3554                                 GOTO(end, rc);
3555                         rc = name_create(&logname, mti->mti_fsname, "-client");
3556                         if (rc) {
3557                                 name_destroy(&cname);
3558                                 GOTO(end, rc);
3559                         }
3560                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
3561                         if (rc) {
3562                                 name_destroy(&cname);
3563                                 name_destroy(&logname);
3564                                 GOTO(end, rc);
3565                         }
3566                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3567                                           &mgi->mgi_bufs, cname, ptr2);
3568                         name_destroy(&ptr2);
3569                         name_destroy(&logname);
3570                         name_destroy(&cname);
3571                 }
3572                 GOTO(end, rc);
3573         }
3574
3575         /* All mdd., ost. and osd. params in proc */
3576         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3577             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3578             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3579                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3580                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3581                         GOTO(end, rc = -ENODEV);
3582
3583                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3584                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3585                 GOTO(end, rc);
3586         }
3587
3588         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3589         rc2 = -ENOSYS;
3590
3591 end:
3592         if (rc)
3593                 CERROR("err %d on param '%s'\n", rc, ptr);
3594
3595         RETURN(rc ?: rc2);
3596 }
3597
3598 /* Not implementing automatic failover nid addition at this time. */
3599 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3600                       struct mgs_target_info *mti)
3601 {
3602 #if 0
3603         struct fs_db *fsdb;
3604         int rc;
3605         ENTRY;
3606
3607         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3608         if (rc)
3609                 RETURN(rc);
3610
3611         if (mgs_log_is_empty(obd, mti->mti_svname))
3612                 /* should never happen */
3613                 RETURN(-ENOENT);
3614
3615         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3616
3617         /* FIXME We can just check mti->params to see if we're already in
3618            the failover list.  Modify mti->params for rewriting back at
3619            server_register_target(). */
3620
3621         mutex_lock(&fsdb->fsdb_mutex);
3622         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3623         mutex_unlock(&fsdb->fsdb_mutex);
3624         char    *buf, *params;
3625         int      rc = -EINVAL;
3626
3627         RETURN(rc);
3628 #endif
3629         return 0;
3630 }
3631
3632 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
3633                          struct mgs_target_info *mti, struct fs_db *fsdb)
3634 {
3635         char    *buf, *params;
3636         int      rc = -EINVAL;
3637
3638         ENTRY;
3639
3640         /* set/check the new target index */
3641         rc = mgs_set_index(env, mgs, mti);
3642         if (rc < 0)
3643                 RETURN(rc);
3644
3645         if (rc == EALREADY) {
3646                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3647                               mti->mti_stripe_index, mti->mti_svname);
3648                 /* We would like to mark old log sections as invalid
3649                    and add new log sections in the client and mdt logs.
3650                    But if we add new sections, then live clients will
3651                    get repeat setup instructions for already running
3652                    osc's. So don't update the client/mdt logs. */
3653                 mti->mti_flags &= ~LDD_F_UPDATE;
3654                 rc = 0;
3655         }
3656
3657         mutex_lock(&fsdb->fsdb_mutex);
3658
3659         if (mti->mti_flags &
3660             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3661                 /* Generate a log from scratch */
3662                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3663                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3664                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3665                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3666                 } else {
3667                         CERROR("Unknown target type %#x, can't create log for "
3668                                "%s\n", mti->mti_flags, mti->mti_svname);
3669                 }
3670                 if (rc) {
3671                         CERROR("Can't write logs for %s (%d)\n",
3672                                mti->mti_svname, rc);
3673                         GOTO(out_up, rc);
3674                 }
3675         } else {
3676                 /* Just update the params from tunefs in mgs_write_log_params */
3677                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3678                 mti->mti_flags |= LDD_F_PARAM;
3679         }
3680
3681         /* allocate temporary buffer, where class_get_next_param will
3682            make copy of a current  parameter */
3683         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3684         if (buf == NULL)
3685                 GOTO(out_up, rc = -ENOMEM);
3686         params = mti->mti_params;
3687         while (params != NULL) {
3688                 rc = class_get_next_param(&params, buf);
3689                 if (rc) {
3690                         if (rc == 1)
3691                                 /* there is no next parameter, that is
3692                                    not an error */
3693                                 rc = 0;
3694                         break;
3695                 }
3696                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3697                        params, buf);
3698                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3699                 if (rc)
3700                         break;
3701         }
3702
3703         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3704
3705 out_up:
3706         mutex_unlock(&fsdb->fsdb_mutex);
3707         RETURN(rc);
3708 }
3709
3710 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3711 {
3712         struct llog_ctxt        *ctxt;
3713         int                      rc = 0;
3714
3715         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3716         if (ctxt == NULL) {
3717                 CERROR("%s: MGS config context doesn't exist\n",
3718                        mgs->mgs_obd->obd_name);
3719                 rc = -ENODEV;
3720         } else {
3721                 rc = llog_erase(env, ctxt, NULL, name);
3722                 /* llog may not exist */
3723                 if (rc == -ENOENT)
3724                         rc = 0;
3725                 llog_ctxt_put(ctxt);
3726         }
3727
3728         if (rc)
3729                 CERROR("%s: failed to clear log %s: %d\n",
3730                        mgs->mgs_obd->obd_name, name, rc);
3731
3732         return rc;
3733 }
3734
3735 /* erase all logs for the given fs */
3736 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3737 {
3738         struct fs_db *fsdb;
3739         struct list_head log_list;
3740         struct mgs_direntry *dirent, *n;
3741         int rc, len = strlen(fsname);
3742         char *suffix;
3743         ENTRY;
3744
3745         /* Find all the logs in the CONFIGS directory */
3746         rc = class_dentry_readdir(env, mgs, &log_list);
3747         if (rc)
3748                 RETURN(rc);
3749
3750         mutex_lock(&mgs->mgs_mutex);
3751
3752         /* Delete the fs db */
3753         fsdb = mgs_find_fsdb(mgs, fsname);
3754         if (fsdb)
3755                 mgs_free_fsdb(mgs, fsdb);
3756
3757         mutex_unlock(&mgs->mgs_mutex);
3758
3759         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3760                 list_del_init(&dirent->mde_list);
3761                 suffix = strrchr(dirent->mde_name, '-');
3762                 if (suffix != NULL) {
3763                         if ((len == suffix - dirent->mde_name) &&
3764                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
3765                                 CDEBUG(D_MGS, "Removing log %s\n",
3766                                        dirent->mde_name);
3767                                 mgs_erase_log(env, mgs, dirent->mde_name);
3768                         }
3769                 }
3770                 mgs_direntry_free(dirent);
3771         }
3772
3773         RETURN(rc);
3774 }
3775
3776 /* list all logs for the given fs */
3777 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
3778                   struct obd_ioctl_data *data)
3779 {
3780         struct list_head         log_list;
3781         struct mgs_direntry     *dirent, *n;
3782         char                    *out, *suffix;
3783         int                      l, remains, rc;
3784
3785         ENTRY;
3786
3787         /* Find all the logs in the CONFIGS directory */
3788         rc = class_dentry_readdir(env, mgs, &log_list);
3789         if (rc)
3790                 RETURN(rc);
3791
3792         out = data->ioc_bulk;
3793         remains = data->ioc_inllen1;
3794         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3795                 list_del_init(&dirent->mde_list);
3796                 suffix = strrchr(dirent->mde_name, '-');
3797                 if (suffix != NULL) {
3798                         l = snprintf(out, remains, "config log: $%s\n",
3799                                      dirent->mde_name);
3800                         out += l;
3801                         remains -= l;
3802                 }
3803                 mgs_direntry_free(dirent);
3804                 if (remains <= 0)
3805                         break;
3806         }
3807         RETURN(rc);
3808 }
3809
3810 /* from llog_swab */
3811 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3812 {
3813         int i;
3814         ENTRY;
3815
3816         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3817         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3818
3819         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3820         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3821         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3822         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3823
3824         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3825         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3826                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3827                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3828                                i, lcfg->lcfg_buflens[i],
3829                                lustre_cfg_string(lcfg, i));
3830                 }
3831         EXIT;
3832 }
3833
3834 /* Setup params fsdb and log
3835  */
3836 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3837                           struct fs_db *fsdb)
3838 {
3839         struct llog_handle      *params_llh = NULL;
3840         int                     rc;
3841         ENTRY;
3842
3843         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
3844         if (fsdb != NULL) {
3845                 mutex_lock(&fsdb->fsdb_mutex);
3846                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
3847                 if (rc == 0)
3848                         rc = record_end_log(env, &params_llh);
3849                 mutex_unlock(&fsdb->fsdb_mutex);
3850         }
3851
3852         RETURN(rc);
3853 }
3854
3855 /* Cleanup params fsdb and log
3856  */
3857 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
3858 {
3859         return mgs_erase_logs(env, mgs, PARAMS_FILENAME);
3860 }
3861
3862 /* Set a permanent (config log) param for a target or fs
3863  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3864  *             buf1 contains the single parameter
3865  */
3866 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3867                  struct lustre_cfg *lcfg, char *fsname)
3868 {
3869         struct fs_db *fsdb;
3870         struct mgs_target_info *mti;
3871         char *devname, *param;
3872         char *ptr;
3873         const char *tmp;
3874         __u32 index;
3875         int rc = 0;
3876         ENTRY;
3877
3878         print_lustre_cfg(lcfg);
3879
3880         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3881         devname = lustre_cfg_string(lcfg, 0);
3882         param = lustre_cfg_string(lcfg, 1);
3883         if (!devname) {
3884                 /* Assume device name embedded in param:
3885                    lustre-OST0000.osc.max_dirty_mb=32 */
3886                 ptr = strchr(param, '.');
3887                 if (ptr) {
3888                         devname = param;
3889                         *ptr = 0;
3890                         param = ptr + 1;
3891                 }
3892         }
3893         if (!devname) {
3894                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3895                 RETURN(-ENOSYS);
3896         }
3897
3898         rc = mgs_parse_devname(devname, fsname, NULL);
3899         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3900                 /* param related to llite isn't allowed to set by OST or MDT */
3901                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3902                                        sizeof(PARAM_LLITE) - 1) == 0)
3903                         RETURN(-EINVAL);
3904         } else {
3905                 /* assume devname is the fsname */
3906                 strlcpy(fsname, devname, MTI_NAME_MAXLEN);
3907         }
3908         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3909
3910         rc = mgs_find_or_make_fsdb(env, mgs,
3911                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
3912                                    PARAMS_FILENAME : fsname, &fsdb);
3913         if (rc)
3914                 RETURN(rc);
3915
3916         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
3917             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3918             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3919                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3920                        "is '%s'\n", fsname, devname);
3921                 mgs_free_fsdb(mgs, fsdb);
3922                 RETURN(-EINVAL);
3923         }
3924
3925         /* Create a fake mti to hold everything */
3926         OBD_ALLOC_PTR(mti);
3927         if (!mti)
3928                 GOTO(out, rc = -ENOMEM);
3929         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3930             >= sizeof(mti->mti_fsname))
3931                 GOTO(out, rc = -E2BIG);
3932         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3933             >= sizeof(mti->mti_svname))
3934                 GOTO(out, rc = -E2BIG);
3935         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3936             >= sizeof(mti->mti_params))
3937                 GOTO(out, rc = -E2BIG);
3938         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3939         if (rc < 0)
3940                 /* Not a valid server; may be only fsname */
3941                 rc = 0;
3942         else
3943                 /* Strip -osc or -mdc suffix from svname */
3944                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3945                                      mti->mti_svname))
3946                         GOTO(out, rc = -EINVAL);
3947         /*
3948          * Revoke lock so everyone updates.  Should be alright if
3949          * someone was already reading while we were updating the logs,
3950          * so we don't really need to hold the lock while we're
3951          * writing (above).
3952          */
3953         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
3954                 mti->mti_flags = rc | LDD_F_PARAM2;
3955                 mutex_lock(&fsdb->fsdb_mutex);
3956                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
3957                 mutex_unlock(&fsdb->fsdb_mutex);
3958                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
3959         } else {
3960                 mti->mti_flags = rc | LDD_F_PARAM;
3961                 mutex_lock(&fsdb->fsdb_mutex);
3962                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3963                 mutex_unlock(&fsdb->fsdb_mutex);
3964                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3965         }
3966
3967 out:
3968         OBD_FREE_PTR(mti);
3969         RETURN(rc);
3970 }
3971
3972 static int mgs_write_log_pool(const struct lu_env *env,
3973                               struct mgs_device *mgs, char *logname,
3974                               struct fs_db *fsdb, char *tgtname,
3975                               enum lcfg_command_type cmd,
3976                               char *fsname, char *poolname,
3977                               char *ostname, char *comment)
3978 {
3979         struct llog_handle *llh = NULL;
3980         int rc;
3981
3982         rc = record_start_log(env, mgs, &llh, logname);
3983         if (rc)
3984                 return rc;
3985         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
3986         if (rc)
3987                 goto out;
3988         rc = record_base(env, llh, tgtname, 0, cmd,
3989                          fsname, poolname, ostname, NULL);
3990         if (rc)
3991                 goto out;
3992         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
3993 out:
3994         record_end_log(env, &llh);
3995         return rc;
3996 }
3997
3998 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
3999                     enum lcfg_command_type cmd, const char *nodemap_name,
4000                     char *param)
4001 {
4002         lnet_nid_t      nid[2];
4003         __u32           idmap[2];
4004         bool            bool_switch;
4005         __u32           int_id;
4006         int             rc = 0;
4007         ENTRY;
4008
4009         switch (cmd) {
4010         case LCFG_NODEMAP_ADD:
4011                 rc = nodemap_add(nodemap_name);
4012                 break;
4013         case LCFG_NODEMAP_DEL:
4014                 rc = nodemap_del(nodemap_name);
4015                 break;
4016         case LCFG_NODEMAP_ADD_RANGE:
4017                 rc = nodemap_parse_range(param, nid);
4018                 if (rc != 0)
4019                         break;
4020                 rc = nodemap_add_range(nodemap_name, nid);
4021                 break;
4022         case LCFG_NODEMAP_DEL_RANGE:
4023                 rc = nodemap_parse_range(param, nid);
4024                 if (rc != 0)
4025                         break;
4026                 rc = nodemap_del_range(nodemap_name, nid);
4027                 break;
4028         case LCFG_NODEMAP_ADMIN:
4029                 bool_switch = simple_strtoul(param, NULL, 10);
4030                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
4031                 break;
4032         case LCFG_NODEMAP_TRUSTED:
4033                 bool_switch = simple_strtoul(param, NULL, 10);
4034                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
4035                 break;
4036         case LCFG_NODEMAP_SQUASH_UID:
4037                 int_id = simple_strtoul(param, NULL, 10);
4038                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
4039                 break;
4040         case LCFG_NODEMAP_SQUASH_GID:
4041                 int_id = simple_strtoul(param, NULL, 10);
4042                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
4043                 break;
4044         case LCFG_NODEMAP_ADD_UIDMAP:
4045         case LCFG_NODEMAP_ADD_GIDMAP:
4046                 rc = nodemap_parse_idmap(param, idmap);
4047                 if (rc != 0)
4048                         break;
4049                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
4050                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
4051                                                idmap);
4052                 else
4053                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
4054                                                idmap);
4055                 break;
4056         case LCFG_NODEMAP_DEL_UIDMAP:
4057         case LCFG_NODEMAP_DEL_GIDMAP:
4058                 rc = nodemap_parse_idmap(param, idmap);
4059                 if (rc != 0)
4060                         break;
4061                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
4062                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
4063                                                idmap);
4064                 else
4065                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
4066                                                idmap);
4067                 break;
4068         default:
4069                 rc = -EINVAL;
4070         }
4071
4072         RETURN(rc);
4073 }
4074
4075 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
4076                  enum lcfg_command_type cmd, char *fsname,
4077                  char *poolname, char *ostname)
4078 {
4079         struct fs_db *fsdb;
4080         char *lovname;
4081         char *logname;
4082         char *label = NULL, *canceled_label = NULL;
4083         int label_sz;
4084         struct mgs_target_info *mti = NULL;
4085         int rc, i;
4086         ENTRY;
4087
4088         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
4089         if (rc) {
4090                 CERROR("Can't get db for %s\n", fsname);
4091                 RETURN(rc);
4092         }
4093         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4094                 CERROR("%s is not defined\n", fsname);
4095                 mgs_free_fsdb(mgs, fsdb);
4096                 RETURN(-EINVAL);
4097         }
4098
4099         label_sz = 10 + strlen(fsname) + strlen(poolname);
4100
4101         /* check if ostname match fsname */
4102         if (ostname != NULL) {
4103                 char *ptr;
4104
4105                 ptr = strrchr(ostname, '-');
4106                 if ((ptr == NULL) ||
4107                     (strncmp(fsname, ostname, ptr-ostname) != 0))
4108                         RETURN(-EINVAL);
4109                 label_sz += strlen(ostname);
4110         }
4111
4112         OBD_ALLOC(label, label_sz);
4113         if (label == NULL)
4114                 RETURN(-ENOMEM);
4115
4116         switch(cmd) {
4117         case LCFG_POOL_NEW:
4118                 sprintf(label,
4119                         "new %s.%s", fsname, poolname);
4120                 break;
4121         case LCFG_POOL_ADD:
4122                 sprintf(label,
4123                         "add %s.%s.%s", fsname, poolname, ostname);
4124                 break;
4125         case LCFG_POOL_REM:
4126                 OBD_ALLOC(canceled_label, label_sz);
4127                 if (canceled_label == NULL)
4128                         GOTO(out_label, rc = -ENOMEM);
4129                 sprintf(label,
4130                         "rem %s.%s.%s", fsname, poolname, ostname);
4131                 sprintf(canceled_label,
4132                         "add %s.%s.%s", fsname, poolname, ostname);
4133                 break;
4134         case LCFG_POOL_DEL:
4135                 OBD_ALLOC(canceled_label, label_sz);
4136                 if (canceled_label == NULL)
4137                         GOTO(out_label, rc = -ENOMEM);
4138                 sprintf(label,
4139                         "del %s.%s", fsname, poolname);
4140                 sprintf(canceled_label,
4141                         "new %s.%s", fsname, poolname);
4142                 break;
4143         default:
4144                 break;
4145         }
4146
4147         if (canceled_label != NULL) {
4148                 OBD_ALLOC_PTR(mti);
4149                 if (mti == NULL)
4150                         GOTO(out_cancel, rc = -ENOMEM);
4151         }
4152
4153         mutex_lock(&fsdb->fsdb_mutex);
4154         /* write pool def to all MDT logs */
4155         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4156                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4157                         rc = name_create_mdt_and_lov(&logname, &lovname,
4158                                                      fsdb, i);
4159                         if (rc) {
4160                                 mutex_unlock(&fsdb->fsdb_mutex);
4161                                 GOTO(out_mti, rc);
4162                         }
4163                         if (canceled_label != NULL) {
4164                                 strcpy(mti->mti_svname, "lov pool");
4165                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4166                                                 lovname, canceled_label,
4167                                                 CM_SKIP);
4168                         }
4169
4170                         if (rc >= 0)
4171                                 rc = mgs_write_log_pool(env, mgs, logname,
4172                                                         fsdb, lovname, cmd,
4173                                                         fsname, poolname,
4174                                                         ostname, label);
4175                         name_destroy(&logname);
4176                         name_destroy(&lovname);
4177                         if (rc) {
4178                                 mutex_unlock(&fsdb->fsdb_mutex);
4179                                 GOTO(out_mti, rc);
4180                         }
4181                 }
4182         }
4183
4184         rc = name_create(&logname, fsname, "-client");
4185         if (rc) {
4186                 mutex_unlock(&fsdb->fsdb_mutex);
4187                 GOTO(out_mti, rc);
4188         }
4189         if (canceled_label != NULL) {
4190                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4191                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4192                 if (rc < 0) {
4193                         mutex_unlock(&fsdb->fsdb_mutex);
4194                         name_destroy(&logname);
4195                         GOTO(out_mti, rc);
4196                 }
4197         }
4198
4199         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4200                                 cmd, fsname, poolname, ostname, label);
4201         mutex_unlock(&fsdb->fsdb_mutex);
4202         name_destroy(&logname);
4203         /* request for update */
4204         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4205
4206         EXIT;
4207 out_mti:
4208         if (mti != NULL)
4209                 OBD_FREE_PTR(mti);
4210 out_cancel:
4211         if (canceled_label != NULL)
4212                 OBD_FREE(canceled_label, label_sz);
4213 out_label:
4214         OBD_FREE(label, label_sz);
4215         return rc;
4216 }