Whamcloud - gitweb
LU-3572 mgs: build failure seen in SLES SP2 server build
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         if (!dt_try_as_dir(env, dir))
71                 GOTO(out, rc = -ENOTDIR);
72
73         LASSERT(dir);
74         LASSERT(dir->do_index_ops);
75
76         iops = &dir->do_index_ops->dio_it;
77         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
78         if (IS_ERR(it))
79                 RETURN(PTR_ERR(it));
80
81         rc = iops->load(env, it, 0);
82         if (rc <= 0)
83                 GOTO(fini, rc = 0);
84
85         /* main cycle */
86         do {
87                 key = (void *)iops->key(env, it);
88                 if (IS_ERR(key)) {
89                         CERROR("%s: key failed when listing %s: rc = %d\n",
90                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
91                                (int) PTR_ERR(key));
92                         goto next;
93                 }
94                 key_sz = iops->key_size(env, it);
95                 LASSERT(key_sz > 0);
96
97                 /* filter out "." and ".." entries */
98                 if (key[0] == '.') {
99                         if (key_sz == 1)
100                                 goto next;
101                         if (key_sz == 2 && key[1] == '.')
102                                 goto next;
103                 }
104
105                 de = mgs_direntry_alloc(key_sz + 1);
106                 if (de == NULL) {
107                         rc = -ENOMEM;
108                         break;
109                 }
110
111                 memcpy(de->name, key, key_sz);
112                 de->name[key_sz] = 0;
113
114                 cfs_list_add(&de->list, list);
115
116 next:
117                 rc = iops->next(env, it);
118         } while (rc == 0);
119         rc = 0;
120
121         iops->put(env, it);
122
123 fini:
124         iops->fini(env, it);
125 out:
126         if (rc)
127                 CERROR("%s: key failed when listing %s: rc = %d\n",
128                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
129         RETURN(rc);
130 }
131
132 /******************** DB functions *********************/
133
134 static inline int name_create(char **newname, char *prefix, char *suffix)
135 {
136         LASSERT(newname);
137         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
138         if (!*newname)
139                 return -ENOMEM;
140         sprintf(*newname, "%s%s", prefix, suffix);
141         return 0;
142 }
143
144 static inline void name_destroy(char **name)
145 {
146         if (*name)
147                 OBD_FREE(*name, strlen(*name) + 1);
148         *name = NULL;
149 }
150
151 struct mgs_fsdb_handler_data
152 {
153         struct fs_db   *fsdb;
154         __u32           ver;
155 };
156
157 /* from the (client) config log, figure out:
158         1. which ost's/mdt's are configured (by index)
159         2. what the last config step is
160         3. COMPAT_18 osc name
161 */
162 /* It might be better to have a separate db file, instead of parsing the info
163    out of the client log.  This is slow and potentially error-prone. */
164 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
165                             struct llog_rec_hdr *rec, void *data)
166 {
167         struct mgs_fsdb_handler_data *d = data;
168         struct fs_db *fsdb = d->fsdb;
169         int cfg_len = rec->lrh_len;
170         char *cfg_buf = (char*) (rec + 1);
171         struct lustre_cfg *lcfg;
172         __u32 index;
173         int rc = 0;
174         ENTRY;
175
176         if (rec->lrh_type != OBD_CFG_REC) {
177                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
178                 RETURN(-EINVAL);
179         }
180
181         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
182         if (rc) {
183                 CERROR("Insane cfg\n");
184                 RETURN(rc);
185         }
186
187         lcfg = (struct lustre_cfg *)cfg_buf;
188
189         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
190                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
191
192         /* Figure out ost indicies */
193         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
194         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
195             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
196                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
197                                        NULL, 10);
198                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
199                        lustre_cfg_string(lcfg, 1), index,
200                        lustre_cfg_string(lcfg, 2));
201                 set_bit(index, fsdb->fsdb_ost_index_map);
202         }
203
204         /* Figure out mdt indicies */
205         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
206         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
207             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
208                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
209                                        &index, NULL);
210                 if (rc != LDD_F_SV_TYPE_MDT) {
211                         CWARN("Unparsable MDC name %s, assuming index 0\n",
212                               lustre_cfg_string(lcfg, 0));
213                         index = 0;
214                 }
215                 rc = 0;
216                 CDEBUG(D_MGS, "MDT index is %u\n", index);
217                 set_bit(index, fsdb->fsdb_mdt_index_map);
218                 fsdb->fsdb_mdt_count ++;
219         }
220
221         /**
222          * figure out the old config. fsdb_gen = 0 means old log
223          * It is obsoleted and not supported anymore
224          */
225         if (fsdb->fsdb_gen == 0) {
226                 CERROR("Old config format is not supported\n");
227                 RETURN(-EINVAL);
228         }
229
230         /*
231          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
232          */
233         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
234             lcfg->lcfg_command == LCFG_ATTACH &&
235             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
236                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
237                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
238                         CWARN("MDT using 1.8 OSC name scheme\n");
239                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
240                 }
241         }
242
243         if (lcfg->lcfg_command == LCFG_MARKER) {
244                 struct cfg_marker *marker;
245                 marker = lustre_cfg_buf(lcfg, 1);
246
247                 d->ver = marker->cm_vers;
248
249                 /* Keep track of the latest marker step */
250                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
251         }
252
253         RETURN(rc);
254 }
255
256 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
257 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
258                                   struct mgs_device *mgs,
259                                   struct fs_db *fsdb)
260 {
261         char                            *logname;
262         struct llog_handle              *loghandle;
263         struct llog_ctxt                *ctxt;
264         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
265         int rc;
266
267         ENTRY;
268
269         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
270         LASSERT(ctxt != NULL);
271         rc = name_create(&logname, fsdb->fsdb_name, "-client");
272         if (rc)
273                 GOTO(out_put, rc);
274         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
275         if (rc)
276                 GOTO(out_pop, rc);
277
278         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
279         if (rc)
280                 GOTO(out_close, rc);
281
282         if (llog_get_size(loghandle) <= 1)
283                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
284
285         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
286         CDEBUG(D_INFO, "get_db = %d\n", rc);
287 out_close:
288         llog_close(env, loghandle);
289 out_pop:
290         name_destroy(&logname);
291 out_put:
292         llog_ctxt_put(ctxt);
293
294         RETURN(rc);
295 }
296
297 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
298 {
299         struct mgs_tgt_srpc_conf *tgtconf;
300
301         /* free target-specific rules */
302         while (fsdb->fsdb_srpc_tgt) {
303                 tgtconf = fsdb->fsdb_srpc_tgt;
304                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
305
306                 LASSERT(tgtconf->mtsc_tgt);
307
308                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
309                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
310                 OBD_FREE_PTR(tgtconf);
311         }
312
313         /* free general rules */
314         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
315 }
316
317 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
318 {
319         struct fs_db *fsdb;
320         cfs_list_t *tmp;
321
322         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
323                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
324                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
325                         return fsdb;
326         }
327         return NULL;
328 }
329
330 /* caller must hold the mgs->mgs_fs_db_lock */
331 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
332                                   struct mgs_device *mgs, char *fsname)
333 {
334         struct fs_db *fsdb;
335         int rc;
336         ENTRY;
337
338         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
339                 CERROR("fsname %s is too long\n", fsname);
340                 RETURN(NULL);
341         }
342
343         OBD_ALLOC_PTR(fsdb);
344         if (!fsdb)
345                 RETURN(NULL);
346
347         strcpy(fsdb->fsdb_name, fsname);
348         mutex_init(&fsdb->fsdb_mutex);
349         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
350         fsdb->fsdb_gen = 1;
351
352         if (strcmp(fsname, MGSSELF_NAME) == 0) {
353                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
354         } else {
355                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
356                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
357                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
358                         CERROR("No memory for index maps\n");
359                         GOTO(err, rc = -ENOMEM);
360                 }
361
362                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
363                 if (rc)
364                         GOTO(err, rc);
365                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
366                 if (rc)
367                         GOTO(err, rc);
368
369                 /* initialise data for NID table */
370                 mgs_ir_init_fs(env, mgs, fsdb);
371
372                 lproc_mgs_add_live(mgs, fsdb);
373         }
374
375         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
376
377         RETURN(fsdb);
378 err:
379         if (fsdb->fsdb_ost_index_map)
380                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
381         if (fsdb->fsdb_mdt_index_map)
382                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
383         name_destroy(&fsdb->fsdb_clilov);
384         name_destroy(&fsdb->fsdb_clilmv);
385         OBD_FREE_PTR(fsdb);
386         RETURN(NULL);
387 }
388
389 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
390 {
391         /* wait for anyone with the sem */
392         mutex_lock(&fsdb->fsdb_mutex);
393         lproc_mgs_del_live(mgs, fsdb);
394         cfs_list_del(&fsdb->fsdb_list);
395
396         /* deinitialize fsr */
397         mgs_ir_fini_fs(mgs, fsdb);
398
399         if (fsdb->fsdb_ost_index_map)
400                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
401         if (fsdb->fsdb_mdt_index_map)
402                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
403         name_destroy(&fsdb->fsdb_clilov);
404         name_destroy(&fsdb->fsdb_clilmv);
405         mgs_free_fsdb_srpc(fsdb);
406         mutex_unlock(&fsdb->fsdb_mutex);
407         OBD_FREE_PTR(fsdb);
408 }
409
410 int mgs_init_fsdb_list(struct mgs_device *mgs)
411 {
412         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
413         return 0;
414 }
415
416 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
417 {
418         struct fs_db *fsdb;
419         cfs_list_t *tmp, *tmp2;
420         mutex_lock(&mgs->mgs_mutex);
421         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
422                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
423                 mgs_free_fsdb(mgs, fsdb);
424         }
425         mutex_unlock(&mgs->mgs_mutex);
426         return 0;
427 }
428
429 int mgs_find_or_make_fsdb(const struct lu_env *env,
430                           struct mgs_device *mgs, char *name,
431                           struct fs_db **dbh)
432 {
433         struct fs_db *fsdb;
434         int rc = 0;
435
436         ENTRY;
437         mutex_lock(&mgs->mgs_mutex);
438         fsdb = mgs_find_fsdb(mgs, name);
439         if (fsdb) {
440                 mutex_unlock(&mgs->mgs_mutex);
441                 *dbh = fsdb;
442                 RETURN(0);
443         }
444
445         CDEBUG(D_MGS, "Creating new db\n");
446         fsdb = mgs_new_fsdb(env, mgs, name);
447         /* lock fsdb_mutex until the db is loaded from llogs */
448         if (fsdb)
449                 mutex_lock(&fsdb->fsdb_mutex);
450         mutex_unlock(&mgs->mgs_mutex);
451         if (!fsdb)
452                 RETURN(-ENOMEM);
453
454         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
455                 /* populate the db from the client llog */
456                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
457                 if (rc) {
458                         CERROR("Can't get db from client log %d\n", rc);
459                         GOTO(out_free, rc);
460                 }
461         }
462
463         /* populate srpc rules from params llog */
464         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
465         if (rc) {
466                 CERROR("Can't get db from params log %d\n", rc);
467                 GOTO(out_free, rc);
468         }
469
470         mutex_unlock(&fsdb->fsdb_mutex);
471         *dbh = fsdb;
472
473         RETURN(0);
474
475 out_free:
476         mutex_unlock(&fsdb->fsdb_mutex);
477         mgs_free_fsdb(mgs, fsdb);
478         return rc;
479 }
480
481 /* 1 = index in use
482    0 = index unused
483    -1= empty client log */
484 int mgs_check_index(const struct lu_env *env,
485                     struct mgs_device *mgs,
486                     struct mgs_target_info *mti)
487 {
488         struct fs_db *fsdb;
489         void *imap;
490         int rc = 0;
491         ENTRY;
492
493         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
494
495         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
496         if (rc) {
497                 CERROR("Can't get db for %s\n", mti->mti_fsname);
498                 RETURN(rc);
499         }
500
501         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
502                 RETURN(-1);
503
504         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
505                 imap = fsdb->fsdb_ost_index_map;
506         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
507                 imap = fsdb->fsdb_mdt_index_map;
508         else
509                 RETURN(-EINVAL);
510
511         if (test_bit(mti->mti_stripe_index, imap))
512                 RETURN(1);
513         RETURN(0);
514 }
515
516 static __inline__ int next_index(void *index_map, int map_len)
517 {
518         int i;
519         for (i = 0; i < map_len * 8; i++)
520                 if (!test_bit(i, index_map)) {
521                          return i;
522                  }
523         CERROR("max index %d exceeded.\n", i);
524         return -1;
525 }
526
527 /* Return codes:
528         0  newly marked as in use
529         <0 err
530         +EALREADY for update of an old index */
531 static int mgs_set_index(const struct lu_env *env,
532                          struct mgs_device *mgs,
533                          struct mgs_target_info *mti)
534 {
535         struct fs_db *fsdb;
536         void *imap;
537         int rc = 0;
538         ENTRY;
539
540         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
541         if (rc) {
542                 CERROR("Can't get db for %s\n", mti->mti_fsname);
543                 RETURN(rc);
544         }
545
546         mutex_lock(&fsdb->fsdb_mutex);
547         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
548                 imap = fsdb->fsdb_ost_index_map;
549         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
550                 imap = fsdb->fsdb_mdt_index_map;
551         } else {
552                 GOTO(out_up, rc = -EINVAL);
553         }
554
555         if (mti->mti_flags & LDD_F_NEED_INDEX) {
556                 rc = next_index(imap, INDEX_MAP_SIZE);
557                 if (rc == -1)
558                         GOTO(out_up, rc = -ERANGE);
559                 mti->mti_stripe_index = rc;
560                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
561                         fsdb->fsdb_mdt_count ++;
562         }
563
564         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
565                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
566                                    "but the max index is %d.\n",
567                                    mti->mti_svname, mti->mti_stripe_index,
568                                    INDEX_MAP_SIZE * 8);
569                 GOTO(out_up, rc = -ERANGE);
570         }
571
572         if (test_bit(mti->mti_stripe_index, imap)) {
573                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
574                     !(mti->mti_flags & LDD_F_WRITECONF)) {
575                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
576                                            "%d, but that index is already in "
577                                            "use. Use --writeconf to force\n",
578                                            mti->mti_svname,
579                                            mti->mti_stripe_index);
580                         GOTO(out_up, rc = -EADDRINUSE);
581                 } else {
582                         CDEBUG(D_MGS, "Server %s updating index %d\n",
583                                mti->mti_svname, mti->mti_stripe_index);
584                         GOTO(out_up, rc = EALREADY);
585                 }
586         }
587
588         set_bit(mti->mti_stripe_index, imap);
589         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
590         mutex_unlock(&fsdb->fsdb_mutex);
591         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
592                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
593
594         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
595                mti->mti_stripe_index);
596
597         RETURN(0);
598 out_up:
599         mutex_unlock(&fsdb->fsdb_mutex);
600         return rc;
601 }
602
603 struct mgs_modify_lookup {
604         struct cfg_marker mml_marker;
605         int               mml_modified;
606 };
607
608 static int mgs_modify_handler(const struct lu_env *env,
609                               struct llog_handle *llh,
610                               struct llog_rec_hdr *rec, void *data)
611 {
612         struct mgs_modify_lookup *mml = data;
613         struct cfg_marker *marker;
614         struct lustre_cfg *lcfg = REC_DATA(rec);
615         int cfg_len = REC_DATA_LEN(rec);
616         int rc;
617         ENTRY;
618
619         if (rec->lrh_type != OBD_CFG_REC) {
620                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
621                 RETURN(-EINVAL);
622         }
623
624         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
625         if (rc) {
626                 CERROR("Insane cfg\n");
627                 RETURN(rc);
628         }
629
630         /* We only care about markers */
631         if (lcfg->lcfg_command != LCFG_MARKER)
632                 RETURN(0);
633
634         marker = lustre_cfg_buf(lcfg, 1);
635         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
636             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
637             !(marker->cm_flags & CM_SKIP)) {
638                 /* Found a non-skipped marker match */
639                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
640                        rec->lrh_index, marker->cm_step,
641                        marker->cm_flags, mml->mml_marker.cm_flags,
642                        marker->cm_tgtname, marker->cm_comment);
643                 /* Overwrite the old marker llog entry */
644                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
645                 marker->cm_flags |= mml->mml_marker.cm_flags;
646                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
647                 /* Header and tail are added back to lrh_len in
648                    llog_lvfs_write_rec */
649                 rec->lrh_len = cfg_len;
650                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
651                                 rec->lrh_index);
652                 if (!rc)
653                          mml->mml_modified++;
654         }
655
656         RETURN(rc);
657 }
658
659 /**
660  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
661  * Return code:
662  * 0 - modified successfully,
663  * 1 - no modification was done
664  * negative - error
665  */
666 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
667                       struct fs_db *fsdb, struct mgs_target_info *mti,
668                       char *logname, char *devname, char *comment, int flags)
669 {
670         struct llog_handle *loghandle;
671         struct llog_ctxt *ctxt;
672         struct mgs_modify_lookup *mml;
673         int rc;
674
675         ENTRY;
676
677         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
678         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
679                flags);
680
681         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
682         LASSERT(ctxt != NULL);
683         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
684         if (rc < 0) {
685                 if (rc == -ENOENT)
686                         rc = 0;
687                 GOTO(out_pop, rc);
688         }
689
690         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
691         if (rc)
692                 GOTO(out_close, rc);
693
694         if (llog_get_size(loghandle) <= 1)
695                 GOTO(out_close, rc = 0);
696
697         OBD_ALLOC_PTR(mml);
698         if (!mml)
699                 GOTO(out_close, rc = -ENOMEM);
700         strcpy(mml->mml_marker.cm_comment, comment);
701         strcpy(mml->mml_marker.cm_tgtname, devname);
702         /* Modify mostly means cancel */
703         mml->mml_marker.cm_flags = flags;
704         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
705         mml->mml_modified = 0;
706         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
707                           NULL);
708         if (!rc && !mml->mml_modified)
709                 rc = 1;
710         OBD_FREE_PTR(mml);
711
712 out_close:
713         llog_close(env, loghandle);
714 out_pop:
715         if (rc < 0)
716                 CERROR("%s: modify %s/%s failed: rc = %d\n",
717                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
718         llog_ctxt_put(ctxt);
719         RETURN(rc);
720 }
721
722 /** This structure is passed to mgs_replace_handler */
723 struct mgs_replace_uuid_lookup {
724         /* Nids are replaced for this target device */
725         struct mgs_target_info target;
726         /* Temporary modified llog */
727         struct llog_handle *temp_llh;
728         /* Flag is set if in target block*/
729         int in_target_device;
730         /* Nids already added. Just skip (multiple nids) */
731         int device_nids_added;
732         /* Flag is set if this block should not be copied */
733         int skip_it;
734 };
735
736 /**
737  * Check: a) if block should be skipped
738  * b) is it target block
739  *
740  * \param[in] lcfg
741  * \param[in] mrul
742  *
743  * \retval 0 should not to be skipped
744  * \retval 1 should to be skipped
745  */
746 static int check_markers(struct lustre_cfg *lcfg,
747                          struct mgs_replace_uuid_lookup *mrul)
748 {
749          struct cfg_marker *marker;
750
751         /* Track markers. Find given device */
752         if (lcfg->lcfg_command == LCFG_MARKER) {
753                 marker = lustre_cfg_buf(lcfg, 1);
754                 /* Clean llog from records marked as CM_EXCLUDE.
755                    CM_SKIP records are used for "active" command
756                    and can be restored if needed */
757                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
758                     (CM_EXCLUDE | CM_START)) {
759                         mrul->skip_it = 1;
760                         return 1;
761                 }
762
763                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
764                     (CM_EXCLUDE | CM_END)) {
765                         mrul->skip_it = 0;
766                         return 1;
767                 }
768
769                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
770                         LASSERT(!(marker->cm_flags & CM_START) ||
771                                 !(marker->cm_flags & CM_END));
772                         if (marker->cm_flags & CM_START) {
773                                 mrul->in_target_device = 1;
774                                 mrul->device_nids_added = 0;
775                         } else if (marker->cm_flags & CM_END)
776                                 mrul->in_target_device = 0;
777                 }
778         }
779
780         return 0;
781 }
782
783 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
784                        struct lustre_cfg *lcfg)
785 {
786         struct llog_rec_hdr      rec;
787         int                      buflen, rc;
788
789         if (!lcfg || !llh)
790                 return -ENOMEM;
791
792         LASSERT(llh->lgh_ctxt);
793
794         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
795                                 lcfg->lcfg_buflens);
796         rec.lrh_len = llog_data_len(buflen);
797         rec.lrh_type = OBD_CFG_REC;
798
799         /* idx = -1 means append */
800         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
801         if (rc)
802                 CERROR("failed %d\n", rc);
803         return rc;
804 }
805
806 static int record_base(const struct lu_env *env, struct llog_handle *llh,
807                      char *cfgname, lnet_nid_t nid, int cmd,
808                      char *s1, char *s2, char *s3, char *s4)
809 {
810         struct mgs_thread_info *mgi = mgs_env_info(env);
811         struct lustre_cfg     *lcfg;
812         int rc;
813
814         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
815                cmd, s1, s2, s3, s4);
816
817         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
818         if (s1)
819                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
820         if (s2)
821                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
822         if (s3)
823                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
824         if (s4)
825                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
826
827         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
828         if (!lcfg)
829                 return -ENOMEM;
830         lcfg->lcfg_nid = nid;
831
832         rc = record_lcfg(env, llh, lcfg);
833
834         lustre_cfg_free(lcfg);
835
836         if (rc) {
837                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
838                        cmd, s1, s2, s3, s4);
839         }
840         return rc;
841 }
842
843 static inline int record_add_uuid(const struct lu_env *env,
844                                   struct llog_handle *llh,
845                                   uint64_t nid, char *uuid)
846 {
847         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
848 }
849
850 static inline int record_add_conn(const struct lu_env *env,
851                                   struct llog_handle *llh,
852                                   char *devname, char *uuid)
853 {
854         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
855 }
856
857 static inline int record_attach(const struct lu_env *env,
858                                 struct llog_handle *llh, char *devname,
859                                 char *type, char *uuid)
860 {
861         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
862 }
863
864 static inline int record_setup(const struct lu_env *env,
865                                struct llog_handle *llh, char *devname,
866                                char *s1, char *s2, char *s3, char *s4)
867 {
868         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
869 }
870
871 /**
872  * \retval <0 record processing error
873  * \retval n record is processed. No need copy original one.
874  * \retval 0 record is not processed.
875  */
876 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
877                            struct mgs_replace_uuid_lookup *mrul)
878 {
879         int nids_added = 0;
880         lnet_nid_t nid;
881         char *ptr;
882         int rc;
883
884         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
885                 /* LCFG_ADD_UUID command found. Let's skip original command
886                    and add passed nids */
887                 ptr = mrul->target.mti_params;
888                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
889                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
890                                "device %s\n", libcfs_nid2str(nid),
891                                 mrul->target.mti_params,
892                                 mrul->target.mti_svname);
893                         rc = record_add_uuid(env,
894                                              mrul->temp_llh, nid,
895                                              mrul->target.mti_params);
896                         if (!rc)
897                                 nids_added++;
898                 }
899
900                 if (nids_added == 0) {
901                         CERROR("No new nids were added, nid %s with uuid %s, "
902                                "device %s\n", libcfs_nid2str(nid),
903                                mrul->target.mti_params,
904                                mrul->target.mti_svname);
905                         RETURN(-ENXIO);
906                 } else {
907                         mrul->device_nids_added = 1;
908                 }
909
910                 return nids_added;
911         }
912
913         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
914                 /* LCFG_SETUP command found. UUID should be changed */
915                 rc = record_setup(env,
916                                   mrul->temp_llh,
917                                   /* devname the same */
918                                   lustre_cfg_string(lcfg, 0),
919                                   /* s1 is not changed */
920                                   lustre_cfg_string(lcfg, 1),
921                                   /* new uuid should be
922                                   the full nidlist */
923                                   mrul->target.mti_params,
924                                   /* s3 is not changed */
925                                   lustre_cfg_string(lcfg, 3),
926                                   /* s4 is not changed */
927                                   lustre_cfg_string(lcfg, 4));
928                 return rc ? rc : 1;
929         }
930
931         /* Another commands in target device block */
932         return 0;
933 }
934
935 /**
936  * Handler that called for every record in llog.
937  * Records are processed in order they placed in llog.
938  *
939  * \param[in] llh       log to be processed
940  * \param[in] rec       current record
941  * \param[in] data      mgs_replace_uuid_lookup structure
942  *
943  * \retval 0    success
944  */
945 static int mgs_replace_handler(const struct lu_env *env,
946                                struct llog_handle *llh,
947                                struct llog_rec_hdr *rec,
948                                void *data)
949 {
950         struct llog_rec_hdr local_rec = *rec;
951         struct mgs_replace_uuid_lookup *mrul;
952         struct lustre_cfg *lcfg = REC_DATA(rec);
953         int cfg_len = REC_DATA_LEN(rec);
954         int rc;
955         ENTRY;
956
957         mrul = (struct mgs_replace_uuid_lookup *)data;
958
959         if (rec->lrh_type != OBD_CFG_REC) {
960                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
961                        rec->lrh_type, lcfg->lcfg_command,
962                        lustre_cfg_string(lcfg, 0),
963                        lustre_cfg_string(lcfg, 1));
964                 RETURN(-EINVAL);
965         }
966
967         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
968         if (rc) {
969                 /* Do not copy any invalidated records */
970                 GOTO(skip_out, rc = 0);
971         }
972
973         rc = check_markers(lcfg, mrul);
974         if (rc || mrul->skip_it)
975                 GOTO(skip_out, rc = 0);
976
977         /* Write to new log all commands outside target device block */
978         if (!mrul->in_target_device)
979                 GOTO(copy_out, rc = 0);
980
981         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
982            (failover nids) for this target, assuming that if then
983            primary is changing then so is the failover */
984         if (mrul->device_nids_added &&
985             (lcfg->lcfg_command == LCFG_ADD_UUID ||
986              lcfg->lcfg_command == LCFG_ADD_CONN))
987                 GOTO(skip_out, rc = 0);
988
989         rc = process_command(env, lcfg, mrul);
990         if (rc < 0)
991                 RETURN(rc);
992
993         if (rc)
994                 RETURN(0);
995 copy_out:
996         /* Record is placed in temporary llog as is */
997         local_rec.lrh_len -= sizeof(*rec) + sizeof(struct llog_rec_tail);
998         rc = llog_write(env, mrul->temp_llh, &local_rec, NULL, 0,
999                         (void *)lcfg, -1);
1000
1001         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1002                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1003                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1004         RETURN(rc);
1005
1006 skip_out:
1007         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1008                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1009                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1010         RETURN(rc);
1011 }
1012
1013 static int mgs_backup_llog(const struct lu_env *env,
1014                            struct obd_device *mgs,
1015                            char *fsname, char *backup)
1016 {
1017         struct obd_uuid *uuid;
1018         struct llog_handle *orig_llh, *bak_llh;
1019         struct llog_ctxt *lctxt;
1020         int rc, rc2;
1021         ENTRY;
1022
1023         lctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1024         if (!lctxt) {
1025                 CERROR("%s: missing llog context\n", mgs->obd_name);
1026                 GOTO(out, rc = -EINVAL);
1027         }
1028
1029         /* Make sure there's no old backup log */
1030         rc = llog_erase(env, lctxt, NULL, backup);
1031         if (rc < 0 && rc != -ENOENT)
1032                 GOTO(out_put, rc);
1033
1034         /* open backup log */
1035         rc = llog_open_create(env, lctxt, &bak_llh, NULL, backup);
1036         if (rc) {
1037                 CERROR("%s: backup logfile open %s: rc = %d\n",
1038                        mgs->obd_name, backup, rc);
1039                 GOTO(out_put, rc);
1040         }
1041
1042         /* set the log header uuid */
1043         OBD_ALLOC_PTR(uuid);
1044         if (uuid == NULL)
1045                 GOTO(out_put, rc = -ENOMEM);
1046         obd_str2uuid(uuid, backup);
1047         rc = llog_init_handle(env, bak_llh, LLOG_F_IS_PLAIN, uuid);
1048         OBD_FREE_PTR(uuid);
1049         if (rc)
1050                 GOTO(out_close1, rc);
1051
1052         /* open original log */
1053         rc = llog_open(env, lctxt, &orig_llh, NULL, fsname,
1054                        LLOG_OPEN_EXISTS);
1055         if (rc < 0) {
1056                 if (rc == -ENOENT)
1057                         rc = 0;
1058                 GOTO(out_close1, rc);
1059         }
1060
1061         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, NULL);
1062         if (rc)
1063                 GOTO(out_close2, rc);
1064
1065         /* Copy remote log */
1066         rc = llog_process(env, orig_llh, llog_copy_handler,
1067                           (void *)bak_llh, NULL);
1068
1069 out_close2:
1070         rc2 = llog_close(env, orig_llh);
1071         if (!rc)
1072                 rc = rc2;
1073 out_close1:
1074         rc2 = llog_close(env, bak_llh);
1075         if (!rc)
1076                 rc = rc2;
1077 out_put:
1078         if (lctxt)
1079                 llog_ctxt_put(lctxt);
1080 out:
1081         if (rc)
1082                 CERROR("%s: Failed to backup log %s: rc = %d\n",
1083                        mgs->obd_name, fsname, rc);
1084         RETURN(rc);
1085 }
1086
1087 static int mgs_log_is_empty(const struct lu_env *env, struct mgs_device *mgs,
1088                             char *name);
1089
1090 static int mgs_replace_nids_log(const struct lu_env *env,
1091                                 struct obd_device *mgs, struct fs_db *fsdb,
1092                                 char *logname, char *devname, char *nids)
1093 {
1094         struct llog_handle *orig_llh, *backup_llh;
1095         struct llog_ctxt *ctxt;
1096         struct mgs_replace_uuid_lookup *mrul;
1097         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1098         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1099         char *backup;
1100         int rc, rc2;
1101         ENTRY;
1102
1103         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1104
1105         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1106         LASSERT(ctxt != NULL);
1107
1108         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1109                 /* Log is empty. Nothing to replace */
1110                 GOTO(out_put, rc = 0);
1111         }
1112
1113         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1114         if (backup == NULL)
1115                 GOTO(out_put, rc = -ENOMEM);
1116
1117         sprintf(backup, "%s.bak", logname);
1118
1119         rc = mgs_backup_llog(env, mgs, logname, backup);
1120         if (rc < 0) {
1121                 CERROR("%s: can't make backup for %s: rc = %d\n",
1122                        mgs->obd_name, logname, rc);
1123                 GOTO(out_free,rc);
1124         }
1125
1126         /* Now erase original log file. Connections are not allowed.
1127            Backup is already saved */
1128         rc = llog_erase(env, ctxt, NULL, logname);
1129         if (rc < 0 && rc != -ENOENT)
1130                 GOTO(out_free, rc);
1131
1132         /* open local log */
1133         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1134         if (rc)
1135                 GOTO(out_restore, rc);
1136
1137         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1138         if (rc)
1139                 GOTO(out_closel, rc);
1140
1141         /* open backup llog */
1142         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1143                        LLOG_OPEN_EXISTS);
1144         if (rc)
1145                 GOTO(out_closel, rc);
1146
1147         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1148         if (rc)
1149                 GOTO(out_close, rc);
1150
1151         if (llog_get_size(backup_llh) <= 1)
1152                 GOTO(out_close, rc = 0);
1153
1154         OBD_ALLOC_PTR(mrul);
1155         if (!mrul)
1156                 GOTO(out_close, rc = -ENOMEM);
1157         /* devname is only needed information to replace UUID records */
1158         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1159         /* parse nids later */
1160         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1161         /* Copy records to this temporary llog */
1162         mrul->temp_llh = orig_llh;
1163
1164         rc = llog_process(env, backup_llh, mgs_replace_handler,
1165                           (void *)mrul, NULL);
1166         OBD_FREE_PTR(mrul);
1167 out_close:
1168         rc2 = llog_close(NULL, backup_llh);
1169         if (!rc)
1170                 rc = rc2;
1171 out_closel:
1172         rc2 = llog_close(NULL, orig_llh);
1173         if (!rc)
1174                 rc = rc2;
1175
1176 out_restore:
1177         if (rc) {
1178                 CERROR("%s: llog should be restored: rc = %d\n",
1179                        mgs->obd_name, rc);
1180                 rc2 = mgs_backup_llog(env, mgs, backup, logname);
1181                 if (rc2 < 0)
1182                         CERROR("%s: can't restore backup %s: rc = %d\n",
1183                                mgs->obd_name, logname, rc2);
1184         }
1185
1186 out_free:
1187         OBD_FREE(backup, strlen(backup) + 1);
1188
1189 out_put:
1190         llog_ctxt_put(ctxt);
1191
1192         if (rc)
1193                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1194                        mgs->obd_name, logname, rc);
1195
1196         RETURN(rc);
1197 }
1198
1199 /**
1200  * Parse device name and get file system name and/or device index
1201  *
1202  * \param[in]   devname device name (ex. lustre-MDT0000)
1203  * \param[out]  fsname  file system name(optional)
1204  * \param[out]  index   device index(optional)
1205  *
1206  * \retval 0    success
1207  */
1208 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1209 {
1210         int rc;
1211         ENTRY;
1212
1213         /* Extract fsname */
1214         if (fsname) {
1215                 rc = server_name2fsname(devname, fsname, NULL);
1216                 if (rc < 0) {
1217                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1218                                devname);
1219                         RETURN(-EINVAL);
1220                 }
1221         }
1222
1223         if (index) {
1224                 rc = server_name2index(devname, index, NULL);
1225                 if (rc < 0) {
1226                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1227                                devname);
1228                         RETURN(-EINVAL);
1229                 }
1230         }
1231
1232         RETURN(0);
1233 }
1234
1235 static int only_mgs_is_running(struct obd_device *mgs_obd)
1236 {
1237         /* TDB: Is global variable with devices count exists? */
1238         int num_devices = get_devices_count();
1239         /* osd, MGS and MGC + self_export
1240            (wc -l /proc/fs/lustre/devices <= 2) && (num_exports <= 2) */
1241         return (num_devices <= 3) && (mgs_obd->obd_num_exports <= 2);
1242 }
1243
1244 static int name_create_mdt(char **logname, char *fsname, int i)
1245 {
1246         char mdt_index[9];
1247
1248         sprintf(mdt_index, "-MDT%04x", i);
1249         return name_create(logname, fsname, mdt_index);
1250 }
1251
1252 /**
1253  * Replace nids for \a device to \a nids values
1254  *
1255  * \param obd           MGS obd device
1256  * \param devname       nids need to be replaced for this device
1257  * (ex. lustre-OST0000)
1258  * \param nids          nids list (ex. nid1,nid2,nid3)
1259  *
1260  * \retval 0    success
1261  */
1262 int mgs_replace_nids(const struct lu_env *env,
1263                      struct mgs_device *mgs,
1264                      char *devname, char *nids)
1265 {
1266         /* Assume fsname is part of device name */
1267         char fsname[MTI_NAME_MAXLEN];
1268         int rc;
1269         __u32 index;
1270         char *logname;
1271         struct fs_db *fsdb;
1272         unsigned int i;
1273         int conn_state;
1274         struct obd_device *mgs_obd = mgs->mgs_obd;
1275         ENTRY;
1276
1277         /* We can only change NIDs if no other nodes are connected */
1278         spin_lock(&mgs_obd->obd_dev_lock);
1279         conn_state = mgs_obd->obd_no_conn;
1280         mgs_obd->obd_no_conn = 1;
1281         spin_unlock(&mgs_obd->obd_dev_lock);
1282
1283         /* We can not change nids if not only MGS is started */
1284         if (!only_mgs_is_running(mgs_obd)) {
1285                 CERROR("Only MGS is allowed to be started\n");
1286                 GOTO(out, rc = -EINPROGRESS);
1287         }
1288
1289         /* Get fsname and index*/
1290         rc = mgs_parse_devname(devname, fsname, &index);
1291         if (rc)
1292                 GOTO(out, rc);
1293
1294         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1295         if (rc) {
1296                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1297                 GOTO(out, rc);
1298         }
1299
1300         /* Process client llogs */
1301         name_create(&logname, fsname, "-client");
1302         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1303         name_destroy(&logname);
1304         if (rc) {
1305                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1306                        fsname, devname, rc);
1307                 GOTO(out, rc);
1308         }
1309
1310         /* Process MDT llogs */
1311         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1312                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1313                         continue;
1314                 name_create_mdt(&logname, fsname, i);
1315                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1316                 name_destroy(&logname);
1317                 if (rc)
1318                         GOTO(out, rc);
1319         }
1320
1321 out:
1322         spin_lock(&mgs_obd->obd_dev_lock);
1323         mgs_obd->obd_no_conn = conn_state;
1324         spin_unlock(&mgs_obd->obd_dev_lock);
1325
1326         RETURN(rc);
1327 }
1328
1329 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1330                             char *devname, struct lov_desc *desc)
1331 {
1332         struct mgs_thread_info *mgi = mgs_env_info(env);
1333         struct lustre_cfg *lcfg;
1334         int rc;
1335
1336         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1337         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1338         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1339         if (!lcfg)
1340                 return -ENOMEM;
1341         rc = record_lcfg(env, llh, lcfg);
1342
1343         lustre_cfg_free(lcfg);
1344         return rc;
1345 }
1346
1347 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1348                             char *devname, struct lmv_desc *desc)
1349 {
1350         struct mgs_thread_info *mgi = mgs_env_info(env);
1351         struct lustre_cfg *lcfg;
1352         int rc;
1353
1354         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1355         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1356         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1357
1358         rc = record_lcfg(env, llh, lcfg);
1359
1360         lustre_cfg_free(lcfg);
1361         return rc;
1362 }
1363
1364 static inline int record_mdc_add(const struct lu_env *env,
1365                                  struct llog_handle *llh,
1366                                  char *logname, char *mdcuuid,
1367                                  char *mdtuuid, char *index,
1368                                  char *gen)
1369 {
1370         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1371                            mdtuuid,index,gen,mdcuuid);
1372 }
1373
1374 static inline int record_lov_add(const struct lu_env *env,
1375                                  struct llog_handle *llh,
1376                                  char *lov_name, char *ost_uuid,
1377                                  char *index, char *gen)
1378 {
1379         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1380                            ost_uuid, index, gen, 0);
1381 }
1382
1383 static inline int record_mount_opt(const struct lu_env *env,
1384                                    struct llog_handle *llh,
1385                                    char *profile, char *lov_name,
1386                                    char *mdc_name)
1387 {
1388         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1389                            profile,lov_name,mdc_name,0);
1390 }
1391
1392 static int record_marker(const struct lu_env *env,
1393                          struct llog_handle *llh,
1394                          struct fs_db *fsdb, __u32 flags,
1395                          char *tgtname, char *comment)
1396 {
1397         struct mgs_thread_info *mgi = mgs_env_info(env);
1398         struct lustre_cfg *lcfg;
1399         int rc;
1400         int cplen = 0;
1401
1402         if (flags & CM_START)
1403                 fsdb->fsdb_gen++;
1404         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1405         mgi->mgi_marker.cm_flags = flags;
1406         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1407         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1408                         sizeof(mgi->mgi_marker.cm_tgtname));
1409         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1410                 return -E2BIG;
1411         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1412                         sizeof(mgi->mgi_marker.cm_comment));
1413         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1414                 return -E2BIG;
1415         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1416         mgi->mgi_marker.cm_canceltime = 0;
1417         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1418         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1419                             sizeof(mgi->mgi_marker));
1420         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1421         if (!lcfg)
1422                 return -ENOMEM;
1423         rc = record_lcfg(env, llh, lcfg);
1424
1425         lustre_cfg_free(lcfg);
1426         return rc;
1427 }
1428
1429 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1430                             struct llog_handle **llh, char *name)
1431 {
1432         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1433         struct llog_ctxt        *ctxt;
1434         int                      rc = 0;
1435
1436         if (*llh)
1437                 GOTO(out, rc = -EBUSY);
1438
1439         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1440         if (!ctxt)
1441                 GOTO(out, rc = -ENODEV);
1442         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1443
1444         rc = llog_open_create(env, ctxt, llh, NULL, name);
1445         if (rc)
1446                 GOTO(out_ctxt, rc);
1447         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1448         if (rc)
1449                 llog_close(env, *llh);
1450 out_ctxt:
1451         llog_ctxt_put(ctxt);
1452 out:
1453         if (rc) {
1454                 CERROR("%s: can't start log %s: rc = %d\n",
1455                        mgs->mgs_obd->obd_name, name, rc);
1456                 *llh = NULL;
1457         }
1458         RETURN(rc);
1459 }
1460
1461 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1462 {
1463         int rc;
1464
1465         rc = llog_close(env, *llh);
1466         *llh = NULL;
1467
1468         return rc;
1469 }
1470
1471 static int mgs_log_is_empty(const struct lu_env *env,
1472                             struct mgs_device *mgs, char *name)
1473 {
1474         struct llog_handle *llh;
1475         struct llog_ctxt *ctxt;
1476         int rc = 0;
1477
1478         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1479         LASSERT(ctxt != NULL);
1480         rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
1481         if (rc < 0) {
1482                 if (rc == -ENOENT)
1483                         rc = 0;
1484                 GOTO(out_ctxt, rc);
1485         }
1486
1487         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
1488         if (rc)
1489                 GOTO(out_close, rc);
1490         rc = llog_get_size(llh);
1491
1492 out_close:
1493         llog_close(env, llh);
1494 out_ctxt:
1495         llog_ctxt_put(ctxt);
1496         /* header is record 1 */
1497         return (rc <= 1);
1498 }
1499
1500 /******************** config "macros" *********************/
1501
1502 /* write an lcfg directly into a log (with markers) */
1503 static int mgs_write_log_direct(const struct lu_env *env,
1504                                 struct mgs_device *mgs, struct fs_db *fsdb,
1505                                 char *logname, struct lustre_cfg *lcfg,
1506                                 char *devname, char *comment)
1507 {
1508         struct llog_handle *llh = NULL;
1509         int rc;
1510         ENTRY;
1511
1512         if (!lcfg)
1513                 RETURN(-ENOMEM);
1514
1515         rc = record_start_log(env, mgs, &llh, logname);
1516         if (rc)
1517                 RETURN(rc);
1518
1519         /* FIXME These should be a single journal transaction */
1520         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1521         if (rc)
1522                 GOTO(out_end, rc);
1523         rc = record_lcfg(env, llh, lcfg);
1524         if (rc)
1525                 GOTO(out_end, rc);
1526         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1527         if (rc)
1528                 GOTO(out_end, rc);
1529 out_end:
1530         record_end_log(env, &llh);
1531         RETURN(rc);
1532 }
1533
1534 /* write the lcfg in all logs for the given fs */
1535 int mgs_write_log_direct_all(const struct lu_env *env,
1536                              struct mgs_device *mgs,
1537                              struct fs_db *fsdb,
1538                              struct mgs_target_info *mti,
1539                              struct lustre_cfg *lcfg,
1540                              char *devname, char *comment,
1541                              int server_only)
1542 {
1543         cfs_list_t list;
1544         struct mgs_direntry *dirent, *n;
1545         char *fsname = mti->mti_fsname;
1546         char *logname;
1547         int rc = 0, len = strlen(fsname);
1548         ENTRY;
1549
1550         /* We need to set params for any future logs
1551            as well. FIXME Append this file to every new log.
1552            Actually, we should store as params (text), not llogs.  Or
1553            in a database. */
1554         rc = name_create(&logname, fsname, "-params");
1555         if (rc)
1556                 RETURN(rc);
1557         if (mgs_log_is_empty(env, mgs, logname)) {
1558                 struct llog_handle *llh = NULL;
1559                 rc = record_start_log(env, mgs, &llh, logname);
1560                 record_end_log(env, &llh);
1561         }
1562         name_destroy(&logname);
1563         if (rc)
1564                 RETURN(rc);
1565
1566         /* Find all the logs in the CONFIGS directory */
1567         rc = class_dentry_readdir(env, mgs, &list);
1568         if (rc)
1569                 RETURN(rc);
1570
1571         /* Could use fsdb index maps instead of directory listing */
1572         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1573                 cfs_list_del(&dirent->list);
1574                 /* don't write to sptlrpc rule log */
1575                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1576                         goto next;
1577
1578                 /* caller wants write server logs only */
1579                 if (server_only && strstr(dirent->name, "-client") != NULL)
1580                         goto next;
1581
1582                 if (strncmp(fsname, dirent->name, len) == 0) {
1583                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1584                         /* Erase any old settings of this same parameter */
1585                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1586                                         devname, comment, CM_SKIP);
1587                         if (rc < 0)
1588                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1589                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1590                         /* Write the new one */
1591                         if (lcfg) {
1592                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1593                                                           dirent->name,
1594                                                           lcfg, devname,
1595                                                           comment);
1596                                 if (rc)
1597                                         CERROR("%s: writing log %s: rc = %d\n",
1598                                                mgs->mgs_obd->obd_name,
1599                                                dirent->name, rc);
1600                         }
1601                 }
1602 next:
1603                 mgs_direntry_free(dirent);
1604         }
1605
1606         RETURN(rc);
1607 }
1608
1609 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1610                                     struct mgs_device *mgs,
1611                                     struct fs_db *fsdb,
1612                                     struct mgs_target_info *mti,
1613                                     int index, char *logname);
1614 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1615                                     struct mgs_device *mgs,
1616                                     struct fs_db *fsdb,
1617                                     struct mgs_target_info *mti,
1618                                     char *logname, char *suffix, char *lovname,
1619                                     enum lustre_sec_part sec_part, int flags);
1620 static int name_create_mdt_and_lov(char **logname, char **lovname,
1621                                    struct fs_db *fsdb, int i);
1622
1623 static int add_param(char *params, char *key, char *val)
1624 {
1625         char *start = params + strlen(params);
1626         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1627         int keylen = 0;
1628
1629         if (key != NULL)
1630                 keylen = strlen(key);
1631         if (start + 1 + keylen + strlen(val) >= end) {
1632                 CERROR("params are too long: %s %s%s\n",
1633                        params, key != NULL ? key : "", val);
1634                 return -EINVAL;
1635         }
1636
1637         sprintf(start, " %s%s", key != NULL ? key : "", val);
1638         return 0;
1639 }
1640
1641 /**
1642  * Walk through client config log record and convert the related records
1643  * into the target.
1644  **/
1645 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1646                                          struct llog_handle *llh,
1647                                          struct llog_rec_hdr *rec, void *data)
1648 {
1649         struct mgs_device *mgs;
1650         struct obd_device *obd;
1651         struct mgs_target_info *mti, *tmti;
1652         struct fs_db *fsdb;
1653         int cfg_len = rec->lrh_len;
1654         char *cfg_buf = (char*) (rec + 1);
1655         struct lustre_cfg *lcfg;
1656         int rc = 0;
1657         struct llog_handle *mdt_llh = NULL;
1658         static int got_an_osc_or_mdc = 0;
1659         /* 0: not found any osc/mdc;
1660            1: found osc;
1661            2: found mdc;
1662         */
1663         static int last_step = -1;
1664         int cplen = 0;
1665
1666         ENTRY;
1667
1668         mti = ((struct temp_comp*)data)->comp_mti;
1669         tmti = ((struct temp_comp*)data)->comp_tmti;
1670         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1671         obd = ((struct temp_comp *)data)->comp_obd;
1672         mgs = lu2mgs_dev(obd->obd_lu_dev);
1673         LASSERT(mgs);
1674
1675         if (rec->lrh_type != OBD_CFG_REC) {
1676                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1677                 RETURN(-EINVAL);
1678         }
1679
1680         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1681         if (rc) {
1682                 CERROR("Insane cfg\n");
1683                 RETURN(rc);
1684         }
1685
1686         lcfg = (struct lustre_cfg *)cfg_buf;
1687
1688         if (lcfg->lcfg_command == LCFG_MARKER) {
1689                 struct cfg_marker *marker;
1690                 marker = lustre_cfg_buf(lcfg, 1);
1691                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1692                     (marker->cm_flags & CM_START) &&
1693                      !(marker->cm_flags & CM_SKIP)) {
1694                         got_an_osc_or_mdc = 1;
1695                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1696                                         sizeof(tmti->mti_svname));
1697                         if (cplen >= sizeof(tmti->mti_svname))
1698                                 RETURN(-E2BIG);
1699                         rc = record_start_log(env, mgs, &mdt_llh,
1700                                               mti->mti_svname);
1701                         if (rc)
1702                                 RETURN(rc);
1703                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1704                                            mti->mti_svname, "add osc(copied)");
1705                         record_end_log(env, &mdt_llh);
1706                         last_step = marker->cm_step;
1707                         RETURN(rc);
1708                 }
1709                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1710                     (marker->cm_flags & CM_END) &&
1711                      !(marker->cm_flags & CM_SKIP)) {
1712                         LASSERT(last_step == marker->cm_step);
1713                         last_step = -1;
1714                         got_an_osc_or_mdc = 0;
1715                         memset(tmti, 0, sizeof(*tmti));
1716                         rc = record_start_log(env, mgs, &mdt_llh,
1717                                               mti->mti_svname);
1718                         if (rc)
1719                                 RETURN(rc);
1720                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1721                                            mti->mti_svname, "add osc(copied)");
1722                         record_end_log(env, &mdt_llh);
1723                         RETURN(rc);
1724                 }
1725                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1726                     (marker->cm_flags & CM_START) &&
1727                      !(marker->cm_flags & CM_SKIP)) {
1728                         got_an_osc_or_mdc = 2;
1729                         last_step = marker->cm_step;
1730                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1731                                strlen(marker->cm_tgtname));
1732
1733                         RETURN(rc);
1734                 }
1735                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1736                     (marker->cm_flags & CM_END) &&
1737                      !(marker->cm_flags & CM_SKIP)) {
1738                         LASSERT(last_step == marker->cm_step);
1739                         last_step = -1;
1740                         got_an_osc_or_mdc = 0;
1741                         memset(tmti, 0, sizeof(*tmti));
1742                         RETURN(rc);
1743                 }
1744         }
1745
1746         if (got_an_osc_or_mdc == 0 || last_step < 0)
1747                 RETURN(rc);
1748
1749         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1750                 uint64_t nodenid = lcfg->lcfg_nid;
1751
1752                 if (strlen(tmti->mti_uuid) == 0) {
1753                         /* target uuid not set, this config record is before
1754                          * LCFG_SETUP, this nid is one of target node nid.
1755                          */
1756                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1757                         tmti->mti_nid_count++;
1758                 } else {
1759                         /* failover node nid */
1760                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1761                                        libcfs_nid2str(nodenid));
1762                 }
1763
1764                 RETURN(rc);
1765         }
1766
1767         if (lcfg->lcfg_command == LCFG_SETUP) {
1768                 char *target;
1769
1770                 target = lustre_cfg_string(lcfg, 1);
1771                 memcpy(tmti->mti_uuid, target, strlen(target));
1772                 RETURN(rc);
1773         }
1774
1775         /* ignore client side sptlrpc_conf_log */
1776         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1777                 RETURN(rc);
1778
1779         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1780                 int index;
1781
1782                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1783                         RETURN (-EINVAL);
1784
1785                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1786                        strlen(mti->mti_fsname));
1787                 tmti->mti_stripe_index = index;
1788
1789                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1790                                               mti->mti_stripe_index,
1791                                               mti->mti_svname);
1792                 memset(tmti, 0, sizeof(*tmti));
1793                 RETURN(rc);
1794         }
1795
1796         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1797                 int index;
1798                 char mdt_index[9];
1799                 char *logname, *lovname;
1800
1801                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1802                                              mti->mti_stripe_index);
1803                 if (rc)
1804                         RETURN(rc);
1805                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1806
1807                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1808                         name_destroy(&logname);
1809                         name_destroy(&lovname);
1810                         RETURN(-EINVAL);
1811                 }
1812
1813                 tmti->mti_stripe_index = index;
1814                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1815                                          mdt_index, lovname,
1816                                          LUSTRE_SP_MDT, 0);
1817                 name_destroy(&logname);
1818                 name_destroy(&lovname);
1819                 RETURN(rc);
1820         }
1821         RETURN(rc);
1822 }
1823
1824 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1825 /* stealed from mgs_get_fsdb_from_llog*/
1826 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1827                                               struct mgs_device *mgs,
1828                                               char *client_name,
1829                                               struct temp_comp* comp)
1830 {
1831         struct llog_handle *loghandle;
1832         struct mgs_target_info *tmti;
1833         struct llog_ctxt *ctxt;
1834         int rc;
1835
1836         ENTRY;
1837
1838         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1839         LASSERT(ctxt != NULL);
1840
1841         OBD_ALLOC_PTR(tmti);
1842         if (tmti == NULL)
1843                 GOTO(out_ctxt, rc = -ENOMEM);
1844
1845         comp->comp_tmti = tmti;
1846         comp->comp_obd = mgs->mgs_obd;
1847
1848         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1849                        LLOG_OPEN_EXISTS);
1850         if (rc < 0) {
1851                 if (rc == -ENOENT)
1852                         rc = 0;
1853                 GOTO(out_pop, rc);
1854         }
1855
1856         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1857         if (rc)
1858                 GOTO(out_close, rc);
1859
1860         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1861                                   (void *)comp, NULL, false);
1862         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1863 out_close:
1864         llog_close(env, loghandle);
1865 out_pop:
1866         OBD_FREE_PTR(tmti);
1867 out_ctxt:
1868         llog_ctxt_put(ctxt);
1869         RETURN(rc);
1870 }
1871
1872 /* lmv is the second thing for client logs */
1873 /* copied from mgs_write_log_lov. Please refer to that.  */
1874 static int mgs_write_log_lmv(const struct lu_env *env,
1875                              struct mgs_device *mgs,
1876                              struct fs_db *fsdb,
1877                              struct mgs_target_info *mti,
1878                              char *logname, char *lmvname)
1879 {
1880         struct llog_handle *llh = NULL;
1881         struct lmv_desc *lmvdesc;
1882         char *uuid;
1883         int rc = 0;
1884         ENTRY;
1885
1886         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1887
1888         OBD_ALLOC_PTR(lmvdesc);
1889         if (lmvdesc == NULL)
1890                 RETURN(-ENOMEM);
1891         lmvdesc->ld_active_tgt_count = 0;
1892         lmvdesc->ld_tgt_count = 0;
1893         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1894         uuid = (char *)lmvdesc->ld_uuid.uuid;
1895
1896         rc = record_start_log(env, mgs, &llh, logname);
1897         if (rc)
1898                 GOTO(out_free, rc);
1899         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1900         if (rc)
1901                 GOTO(out_end, rc);
1902         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1903         if (rc)
1904                 GOTO(out_end, rc);
1905         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1906         if (rc)
1907                 GOTO(out_end, rc);
1908         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1909         if (rc)
1910                 GOTO(out_end, rc);
1911 out_end:
1912         record_end_log(env, &llh);
1913 out_free:
1914         OBD_FREE_PTR(lmvdesc);
1915         RETURN(rc);
1916 }
1917
1918 /* lov is the first thing in the mdt and client logs */
1919 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1920                              struct fs_db *fsdb, struct mgs_target_info *mti,
1921                              char *logname, char *lovname)
1922 {
1923         struct llog_handle *llh = NULL;
1924         struct lov_desc *lovdesc;
1925         char *uuid;
1926         int rc = 0;
1927         ENTRY;
1928
1929         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1930
1931         /*
1932         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1933         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1934               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1935         */
1936
1937         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1938         OBD_ALLOC_PTR(lovdesc);
1939         if (lovdesc == NULL)
1940                 RETURN(-ENOMEM);
1941         lovdesc->ld_magic = LOV_DESC_MAGIC;
1942         lovdesc->ld_tgt_count = 0;
1943         /* Defaults.  Can be changed later by lcfg config_param */
1944         lovdesc->ld_default_stripe_count = 1;
1945         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1946         lovdesc->ld_default_stripe_size = 1024 * 1024;
1947         lovdesc->ld_default_stripe_offset = -1;
1948         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1949         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1950         /* can these be the same? */
1951         uuid = (char *)lovdesc->ld_uuid.uuid;
1952
1953         /* This should always be the first entry in a log.
1954         rc = mgs_clear_log(obd, logname); */
1955         rc = record_start_log(env, mgs, &llh, logname);
1956         if (rc)
1957                 GOTO(out_free, rc);
1958         /* FIXME these should be a single journal transaction */
1959         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1960         if (rc)
1961                 GOTO(out_end, rc);
1962         rc = record_attach(env, llh, lovname, "lov", uuid);
1963         if (rc)
1964                 GOTO(out_end, rc);
1965         rc = record_lov_setup(env, llh, lovname, lovdesc);
1966         if (rc)
1967                 GOTO(out_end, rc);
1968         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1969         if (rc)
1970                 GOTO(out_end, rc);
1971         EXIT;
1972 out_end:
1973         record_end_log(env, &llh);
1974 out_free:
1975         OBD_FREE_PTR(lovdesc);
1976         return rc;
1977 }
1978
1979 /* add failnids to open log */
1980 static int mgs_write_log_failnids(const struct lu_env *env,
1981                                   struct mgs_target_info *mti,
1982                                   struct llog_handle *llh,
1983                                   char *cliname)
1984 {
1985         char *failnodeuuid = NULL;
1986         char *ptr = mti->mti_params;
1987         lnet_nid_t nid;
1988         int rc = 0;
1989
1990         /*
1991         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1992         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1993         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1994         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1995         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1996         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1997         */
1998
1999         /* Pull failnid info out of params string */
2000         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
2001                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
2002                         if (failnodeuuid == NULL) {
2003                                 /* We don't know the failover node name,
2004                                    so just use the first nid as the uuid */
2005                                 rc = name_create(&failnodeuuid,
2006                                                  libcfs_nid2str(nid), "");
2007                                 if (rc)
2008                                         return rc;
2009                         }
2010                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
2011                                "client %s\n", libcfs_nid2str(nid),
2012                                failnodeuuid, cliname);
2013                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
2014                 }
2015                 if (failnodeuuid)
2016                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2017         }
2018
2019         name_destroy(&failnodeuuid);
2020         return rc;
2021 }
2022
2023 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2024                                     struct mgs_device *mgs,
2025                                     struct fs_db *fsdb,
2026                                     struct mgs_target_info *mti,
2027                                     char *logname, char *lmvname)
2028 {
2029         struct llog_handle *llh = NULL;
2030         char *mdcname = NULL;
2031         char *nodeuuid = NULL;
2032         char *mdcuuid = NULL;
2033         char *lmvuuid = NULL;
2034         char index[6];
2035         int i, rc;
2036         ENTRY;
2037
2038         if (mgs_log_is_empty(env, mgs, logname)) {
2039                 CERROR("log is empty! Logical error\n");
2040                 RETURN(-EINVAL);
2041         }
2042
2043         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2044                mti->mti_svname, logname, lmvname);
2045
2046         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2047         if (rc)
2048                 RETURN(rc);
2049         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2050         if (rc)
2051                 GOTO(out_free, rc);
2052         rc = name_create(&mdcuuid, mdcname, "_UUID");
2053         if (rc)
2054                 GOTO(out_free, rc);
2055         rc = name_create(&lmvuuid, lmvname, "_UUID");
2056         if (rc)
2057                 GOTO(out_free, rc);
2058
2059         rc = record_start_log(env, mgs, &llh, logname);
2060         if (rc)
2061                 GOTO(out_free, rc);
2062         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2063                            "add mdc");
2064         if (rc)
2065                 GOTO(out_end, rc);
2066         for (i = 0; i < mti->mti_nid_count; i++) {
2067                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2068                        libcfs_nid2str(mti->mti_nids[i]));
2069
2070                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2071                 if (rc)
2072                         GOTO(out_end, rc);
2073         }
2074
2075         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2076         if (rc)
2077                 GOTO(out_end, rc);
2078         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
2079         if (rc)
2080                 GOTO(out_end, rc);
2081         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2082         if (rc)
2083                 GOTO(out_end, rc);
2084         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2085         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2086                             index, "1");
2087         if (rc)
2088                 GOTO(out_end, rc);
2089         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2090                            "add mdc");
2091         if (rc)
2092                 GOTO(out_end, rc);
2093 out_end:
2094         record_end_log(env, &llh);
2095 out_free:
2096         name_destroy(&lmvuuid);
2097         name_destroy(&mdcuuid);
2098         name_destroy(&mdcname);
2099         name_destroy(&nodeuuid);
2100         RETURN(rc);
2101 }
2102
2103 static inline int name_create_lov(char **lovname, char *mdtname,
2104                                   struct fs_db *fsdb, int index)
2105 {
2106         /* COMPAT_180 */
2107         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2108                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2109         else
2110                 return name_create(lovname, mdtname, "-mdtlov");
2111 }
2112
2113 static int name_create_mdt_and_lov(char **logname, char **lovname,
2114                                    struct fs_db *fsdb, int i)
2115 {
2116         int rc;
2117
2118         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2119         if (rc)
2120                 return rc;
2121         /* COMPAT_180 */
2122         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2123                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2124         else
2125                 rc = name_create(lovname, *logname, "-mdtlov");
2126         if (rc) {
2127                 name_destroy(logname);
2128                 *logname = NULL;
2129         }
2130         return rc;
2131 }
2132
2133 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2134                                       struct fs_db *fsdb, int i)
2135 {
2136         char suffix[16];
2137
2138         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2139                 sprintf(suffix, "-osc");
2140         else
2141                 sprintf(suffix, "-osc-MDT%04x", i);
2142         return name_create(oscname, ostname, suffix);
2143 }
2144
2145 /* add new mdc to already existent MDS */
2146 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2147                                     struct mgs_device *mgs,
2148                                     struct fs_db *fsdb,
2149                                     struct mgs_target_info *mti,
2150                                     int mdt_index, char *logname)
2151 {
2152         struct llog_handle      *llh = NULL;
2153         char    *nodeuuid = NULL;
2154         char    *ospname = NULL;
2155         char    *lovuuid = NULL;
2156         char    *mdtuuid = NULL;
2157         char    *svname = NULL;
2158         char    *mdtname = NULL;
2159         char    *lovname = NULL;
2160         char    index_str[16];
2161         int     i, rc;
2162
2163         ENTRY;
2164         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2165                 CERROR("log is empty! Logical error\n");
2166                 RETURN (-EINVAL);
2167         }
2168
2169         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2170                logname);
2171
2172         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2173         if (rc)
2174                 RETURN(rc);
2175
2176         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2177         if (rc)
2178                 GOTO(out_destory, rc);
2179
2180         rc = name_create(&svname, mdtname, "-osp");
2181         if (rc)
2182                 GOTO(out_destory, rc);
2183
2184         sprintf(index_str, "-MDT%04x", mdt_index);
2185         rc = name_create(&ospname, svname, index_str);
2186         if (rc)
2187                 GOTO(out_destory, rc);
2188
2189         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2190         if (rc)
2191                 GOTO(out_destory, rc);
2192
2193         rc = name_create(&lovuuid, lovname, "_UUID");
2194         if (rc)
2195                 GOTO(out_destory, rc);
2196
2197         rc = name_create(&mdtuuid, mdtname, "_UUID");
2198         if (rc)
2199                 GOTO(out_destory, rc);
2200
2201         rc = record_start_log(env, mgs, &llh, logname);
2202         if (rc)
2203                 GOTO(out_destory, rc);
2204
2205         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2206                            "add osp");
2207         if (rc)
2208                 GOTO(out_destory, rc);
2209
2210         for (i = 0; i < mti->mti_nid_count; i++) {
2211                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2212                        libcfs_nid2str(mti->mti_nids[i]));
2213                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2214                 if (rc)
2215                         GOTO(out_end, rc);
2216         }
2217
2218         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2219         if (rc)
2220                 GOTO(out_end, rc);
2221
2222         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2223                           NULL, NULL);
2224         if (rc)
2225                 GOTO(out_end, rc);
2226
2227         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2228         if (rc)
2229                 GOTO(out_end, rc);
2230
2231         /* Add mdc(osp) to lod */
2232         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2233                  mti->mti_stripe_index);
2234         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2235                          index_str, "1", NULL);
2236         if (rc)
2237                 GOTO(out_end, rc);
2238
2239         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2240         if (rc)
2241                 GOTO(out_end, rc);
2242
2243 out_end:
2244         record_end_log(env, &llh);
2245
2246 out_destory:
2247         name_destroy(&mdtuuid);
2248         name_destroy(&lovuuid);
2249         name_destroy(&lovname);
2250         name_destroy(&ospname);
2251         name_destroy(&svname);
2252         name_destroy(&nodeuuid);
2253         name_destroy(&mdtname);
2254         RETURN(rc);
2255 }
2256
2257 static int mgs_write_log_mdt0(const struct lu_env *env,
2258                               struct mgs_device *mgs,
2259                               struct fs_db *fsdb,
2260                               struct mgs_target_info *mti)
2261 {
2262         char *log = mti->mti_svname;
2263         struct llog_handle *llh = NULL;
2264         char *uuid, *lovname;
2265         char mdt_index[6];
2266         char *ptr = mti->mti_params;
2267         int rc = 0, failout = 0;
2268         ENTRY;
2269
2270         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2271         if (uuid == NULL)
2272                 RETURN(-ENOMEM);
2273
2274         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2275                 failout = (strncmp(ptr, "failout", 7) == 0);
2276
2277         rc = name_create(&lovname, log, "-mdtlov");
2278         if (rc)
2279                 GOTO(out_free, rc);
2280         if (mgs_log_is_empty(env, mgs, log)) {
2281                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2282                 if (rc)
2283                         GOTO(out_lod, rc);
2284         }
2285
2286         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2287
2288         rc = record_start_log(env, mgs, &llh, log);
2289         if (rc)
2290                 GOTO(out_lod, rc);
2291
2292         /* add MDT itself */
2293
2294         /* FIXME this whole fn should be a single journal transaction */
2295         sprintf(uuid, "%s_UUID", log);
2296         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2297         if (rc)
2298                 GOTO(out_lod, rc);
2299         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2300         if (rc)
2301                 GOTO(out_end, rc);
2302         rc = record_mount_opt(env, llh, log, lovname, NULL);
2303         if (rc)
2304                 GOTO(out_end, rc);
2305         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2306                         failout ? "n" : "f");
2307         if (rc)
2308                 GOTO(out_end, rc);
2309         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2310         if (rc)
2311                 GOTO(out_end, rc);
2312 out_end:
2313         record_end_log(env, &llh);
2314 out_lod:
2315         name_destroy(&lovname);
2316 out_free:
2317         OBD_FREE(uuid, sizeof(struct obd_uuid));
2318         RETURN(rc);
2319 }
2320
2321 /* envelope method for all layers log */
2322 static int mgs_write_log_mdt(const struct lu_env *env,
2323                              struct mgs_device *mgs,
2324                              struct fs_db *fsdb,
2325                              struct mgs_target_info *mti)
2326 {
2327         struct mgs_thread_info *mgi = mgs_env_info(env);
2328         struct llog_handle *llh = NULL;
2329         char *cliname;
2330         int rc, i = 0;
2331         ENTRY;
2332
2333         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2334
2335         if (mti->mti_uuid[0] == '\0') {
2336                 /* Make up our own uuid */
2337                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2338                          "%s_UUID", mti->mti_svname);
2339         }
2340
2341         /* add mdt */
2342         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2343         if (rc)
2344                 RETURN(rc);
2345         /* Append the mdt info to the client log */
2346         rc = name_create(&cliname, mti->mti_fsname, "-client");
2347         if (rc)
2348                 RETURN(rc);
2349
2350         if (mgs_log_is_empty(env, mgs, cliname)) {
2351                 /* Start client log */
2352                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2353                                        fsdb->fsdb_clilov);
2354                 if (rc)
2355                         GOTO(out_free, rc);
2356                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2357                                        fsdb->fsdb_clilmv);
2358                 if (rc)
2359                         GOTO(out_free, rc);
2360         }
2361
2362         /*
2363         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2364         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2365         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2366         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2367         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2368         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2369         */
2370
2371                 /* copy client info about lov/lmv */
2372                 mgi->mgi_comp.comp_mti = mti;
2373                 mgi->mgi_comp.comp_fsdb = fsdb;
2374
2375                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2376                                                         &mgi->mgi_comp);
2377                 if (rc)
2378                         GOTO(out_free, rc);
2379                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2380                                               fsdb->fsdb_clilmv);
2381                 if (rc)
2382                         GOTO(out_free, rc);
2383
2384                 /* add mountopts */
2385                 rc = record_start_log(env, mgs, &llh, cliname);
2386                 if (rc)
2387                         GOTO(out_free, rc);
2388
2389                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2390                                    "mount opts");
2391                 if (rc)
2392                         GOTO(out_end, rc);
2393                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2394                                       fsdb->fsdb_clilmv);
2395                 if (rc)
2396                         GOTO(out_end, rc);
2397                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2398                                    "mount opts");
2399
2400         if (rc)
2401                 GOTO(out_end, rc);
2402
2403         /* for_all_existing_mdt except current one */
2404         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2405                 if (i !=  mti->mti_stripe_index &&
2406                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2407                         char *logname;
2408
2409                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2410                         if (rc)
2411                                 GOTO(out_end, rc);
2412
2413                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2414                                                       i, logname);
2415                         name_destroy(&logname);
2416                         if (rc)
2417                                 GOTO(out_end, rc);
2418                 }
2419         }
2420 out_end:
2421         record_end_log(env, &llh);
2422 out_free:
2423         name_destroy(&cliname);
2424         RETURN(rc);
2425 }
2426
2427 /* Add the ost info to the client/mdt lov */
2428 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2429                                     struct mgs_device *mgs, struct fs_db *fsdb,
2430                                     struct mgs_target_info *mti,
2431                                     char *logname, char *suffix, char *lovname,
2432                                     enum lustre_sec_part sec_part, int flags)
2433 {
2434         struct llog_handle *llh = NULL;
2435         char *nodeuuid = NULL;
2436         char *oscname = NULL;
2437         char *oscuuid = NULL;
2438         char *lovuuid = NULL;
2439         char *svname = NULL;
2440         char index[6];
2441         int i, rc;
2442
2443         ENTRY;
2444         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2445                mti->mti_svname, logname);
2446
2447         if (mgs_log_is_empty(env, mgs, logname)) {
2448                 CERROR("log is empty! Logical error\n");
2449                 RETURN (-EINVAL);
2450         }
2451
2452         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2453         if (rc)
2454                 RETURN(rc);
2455         rc = name_create(&svname, mti->mti_svname, "-osc");
2456         if (rc)
2457                 GOTO(out_free, rc);
2458
2459         /* for the system upgraded from old 1.8, keep using the old osc naming
2460          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2461         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2462                 rc = name_create(&oscname, svname, "");
2463         else
2464                 rc = name_create(&oscname, svname, suffix);
2465         if (rc)
2466                 GOTO(out_free, rc);
2467
2468         rc = name_create(&oscuuid, oscname, "_UUID");
2469         if (rc)
2470                 GOTO(out_free, rc);
2471         rc = name_create(&lovuuid, lovname, "_UUID");
2472         if (rc)
2473                 GOTO(out_free, rc);
2474
2475
2476         /*
2477         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2478         multihomed (#4)
2479         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2480         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2481         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2482         failover (#6,7)
2483         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2484         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2485         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2486         */
2487
2488         rc = record_start_log(env, mgs, &llh, logname);
2489         if (rc)
2490                 GOTO(out_free, rc);
2491
2492         /* FIXME these should be a single journal transaction */
2493         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2494                            "add osc");
2495         if (rc)
2496                 GOTO(out_end, rc);
2497
2498         /* NB: don't change record order, because upon MDT steal OSC config
2499          * from client, it treats all nids before LCFG_SETUP as target nids
2500          * (multiple interfaces), while nids after as failover node nids.
2501          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2502          */
2503         for (i = 0; i < mti->mti_nid_count; i++) {
2504                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2505                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2506                 if (rc)
2507                         GOTO(out_end, rc);
2508         }
2509         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2510         if (rc)
2511                 GOTO(out_end, rc);
2512         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2513         if (rc)
2514                 GOTO(out_end, rc);
2515         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2516         if (rc)
2517                 GOTO(out_end, rc);
2518
2519         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2520
2521         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2522         if (rc)
2523                 GOTO(out_end, rc);
2524         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2525                            "add osc");
2526         if (rc)
2527                 GOTO(out_end, rc);
2528 out_end:
2529         record_end_log(env, &llh);
2530 out_free:
2531         name_destroy(&lovuuid);
2532         name_destroy(&oscuuid);
2533         name_destroy(&oscname);
2534         name_destroy(&svname);
2535         name_destroy(&nodeuuid);
2536         RETURN(rc);
2537 }
2538
2539 static int mgs_write_log_ost(const struct lu_env *env,
2540                              struct mgs_device *mgs, struct fs_db *fsdb,
2541                              struct mgs_target_info *mti)
2542 {
2543         struct llog_handle *llh = NULL;
2544         char *logname, *lovname;
2545         char *ptr = mti->mti_params;
2546         int rc, flags = 0, failout = 0, i;
2547         ENTRY;
2548
2549         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2550
2551         /* The ost startup log */
2552
2553         /* If the ost log already exists, that means that someone reformatted
2554            the ost and it called target_add again. */
2555         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2556                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2557                                    "exists, yet the server claims it never "
2558                                    "registered. It may have been reformatted, "
2559                                    "or the index changed. writeconf the MDT to "
2560                                    "regenerate all logs.\n", mti->mti_svname);
2561                 RETURN(-EALREADY);
2562         }
2563
2564         /*
2565         attach obdfilter ost1 ost1_UUID
2566         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2567         */
2568         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2569                 failout = (strncmp(ptr, "failout", 7) == 0);
2570         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2571         if (rc)
2572                 RETURN(rc);
2573         /* FIXME these should be a single journal transaction */
2574         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2575         if (rc)
2576                 GOTO(out_end, rc);
2577         if (*mti->mti_uuid == '\0')
2578                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2579                          "%s_UUID", mti->mti_svname);
2580         rc = record_attach(env, llh, mti->mti_svname,
2581                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2582         if (rc)
2583                 GOTO(out_end, rc);
2584         rc = record_setup(env, llh, mti->mti_svname,
2585                           "dev"/*ignored*/, "type"/*ignored*/,
2586                           failout ? "n" : "f", 0/*options*/);
2587         if (rc)
2588                 GOTO(out_end, rc);
2589         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2590         if (rc)
2591                 GOTO(out_end, rc);
2592 out_end:
2593         record_end_log(env, &llh);
2594         if (rc)
2595                 RETURN(rc);
2596         /* We also have to update the other logs where this osc is part of
2597            the lov */
2598
2599         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2600                 /* If we're upgrading, the old mdt log already has our
2601                    entry. Let's do a fake one for fun. */
2602                 /* Note that we can't add any new failnids, since we don't
2603                    know the old osc names. */
2604                 flags = CM_SKIP | CM_UPGRADE146;
2605
2606         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2607                 /* If the update flag isn't set, don't update client/mdt
2608                    logs. */
2609                 flags |= CM_SKIP;
2610                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2611                               "the MDT first to regenerate it.\n",
2612                               mti->mti_svname);
2613         }
2614
2615         /* Add ost to all MDT lov defs */
2616         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2617                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2618                         char mdt_index[9];
2619
2620                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2621                                                      i);
2622                         if (rc)
2623                                 RETURN(rc);
2624                         sprintf(mdt_index, "-MDT%04x", i);
2625                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2626                                                       logname, mdt_index,
2627                                                       lovname, LUSTRE_SP_MDT,
2628                                                       flags);
2629                         name_destroy(&logname);
2630                         name_destroy(&lovname);
2631                         if (rc)
2632                                 RETURN(rc);
2633                 }
2634         }
2635
2636         /* Append ost info to the client log */
2637         rc = name_create(&logname, mti->mti_fsname, "-client");
2638         if (rc)
2639                 RETURN(rc);
2640         if (mgs_log_is_empty(env, mgs, logname)) {
2641                 /* Start client log */
2642                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2643                                        fsdb->fsdb_clilov);
2644                 if (rc)
2645                         GOTO(out_free, rc);
2646                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2647                                        fsdb->fsdb_clilmv);
2648                 if (rc)
2649                         GOTO(out_free, rc);
2650         }
2651         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2652                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2653 out_free:
2654         name_destroy(&logname);
2655         RETURN(rc);
2656 }
2657
2658 static __inline__ int mgs_param_empty(char *ptr)
2659 {
2660         char *tmp;
2661
2662         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2663                 return 1;
2664         return 0;
2665 }
2666
2667 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2668                                           struct mgs_device *mgs,
2669                                           struct fs_db *fsdb,
2670                                           struct mgs_target_info *mti,
2671                                           char *logname, char *cliname)
2672 {
2673         int rc;
2674         struct llog_handle *llh = NULL;
2675
2676         if (mgs_param_empty(mti->mti_params)) {
2677                 /* Remove _all_ failnids */
2678                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2679                                 mti->mti_svname, "add failnid", CM_SKIP);
2680                 return rc < 0 ? rc : 0;
2681         }
2682
2683         /* Otherwise failover nids are additive */
2684         rc = record_start_log(env, mgs, &llh, logname);
2685         if (rc)
2686                 return rc;
2687                 /* FIXME this should be a single journal transaction */
2688         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2689                            "add failnid");
2690         if (rc)
2691                 goto out_end;
2692         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2693         if (rc)
2694                 goto out_end;
2695         rc = record_marker(env, llh, fsdb, CM_END,
2696                            mti->mti_svname, "add failnid");
2697 out_end:
2698         record_end_log(env, &llh);
2699         return rc;
2700 }
2701
2702
2703 /* Add additional failnids to an existing log.
2704    The mdc/osc must have been added to logs first */
2705 /* tcp nids must be in dotted-quad ascii -
2706    we can't resolve hostnames from the kernel. */
2707 static int mgs_write_log_add_failnid(const struct lu_env *env,
2708                                      struct mgs_device *mgs,
2709                                      struct fs_db *fsdb,
2710                                      struct mgs_target_info *mti)
2711 {
2712         char *logname, *cliname;
2713         int rc;
2714         ENTRY;
2715
2716         /* FIXME we currently can't erase the failnids
2717          * given when a target first registers, since they aren't part of
2718          * an "add uuid" stanza */
2719
2720         /* Verify that we know about this target */
2721         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2722                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2723                                    "yet. It must be started before failnids "
2724                                    "can be added.\n", mti->mti_svname);
2725                 RETURN(-ENOENT);
2726         }
2727
2728         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2729         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2730                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2731         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2732                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2733         } else {
2734                 RETURN(-EINVAL);
2735         }
2736         if (rc)
2737                 RETURN(rc);
2738         /* Add failover nids to the client log */
2739         rc = name_create(&logname, mti->mti_fsname, "-client");
2740         if (rc) {
2741                 name_destroy(&cliname);
2742                 RETURN(rc);
2743         }
2744         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2745         name_destroy(&logname);
2746         name_destroy(&cliname);
2747         if (rc)
2748                 RETURN(rc);
2749
2750         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2751                 /* Add OST failover nids to the MDT logs as well */
2752                 int i;
2753
2754                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2755                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2756                                 continue;
2757                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2758                         if (rc)
2759                                 RETURN(rc);
2760                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2761                                                  fsdb, i);
2762                         if (rc) {
2763                                 name_destroy(&logname);
2764                                 RETURN(rc);
2765                         }
2766                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2767                                                             mti, logname,
2768                                                             cliname);
2769                         name_destroy(&cliname);
2770                         name_destroy(&logname);
2771                         if (rc)
2772                                 RETURN(rc);
2773                 }
2774         }
2775
2776         RETURN(rc);
2777 }
2778
2779 static int mgs_wlp_lcfg(const struct lu_env *env,
2780                         struct mgs_device *mgs, struct fs_db *fsdb,
2781                         struct mgs_target_info *mti,
2782                         char *logname, struct lustre_cfg_bufs *bufs,
2783                         char *tgtname, char *ptr)
2784 {
2785         char comment[MTI_NAME_MAXLEN];
2786         char *tmp;
2787         struct lustre_cfg *lcfg;
2788         int rc, del;
2789
2790         /* Erase any old settings of this same parameter */
2791         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2792         comment[MTI_NAME_MAXLEN - 1] = 0;
2793         /* But don't try to match the value. */
2794         if ((tmp = strchr(comment, '=')))
2795             *tmp = 0;
2796         /* FIXME we should skip settings that are the same as old values */
2797         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2798         if (rc < 0)
2799                 return rc;
2800         del = mgs_param_empty(ptr);
2801
2802         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2803                       "Sett" : "Modify", tgtname, comment, logname);
2804         if (del)
2805                 return rc;
2806
2807         lustre_cfg_bufs_reset(bufs, tgtname);
2808         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2809         lcfg = lustre_cfg_new(LCFG_PARAM, bufs);
2810         if (!lcfg)
2811                 return -ENOMEM;
2812         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2813         lustre_cfg_free(lcfg);
2814         return rc;
2815 }
2816
2817 /* write global variable settings into log */
2818 static int mgs_write_log_sys(const struct lu_env *env,
2819                              struct mgs_device *mgs, struct fs_db *fsdb,
2820                              struct mgs_target_info *mti, char *sys, char *ptr)
2821 {
2822         struct mgs_thread_info *mgi = mgs_env_info(env);
2823         struct lustre_cfg *lcfg;
2824         char *tmp, sep;
2825         int rc, cmd, convert = 1;
2826
2827         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2828                 cmd = LCFG_SET_TIMEOUT;
2829         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2830                 cmd = LCFG_SET_LDLM_TIMEOUT;
2831         /* Check for known params here so we can return error to lctl */
2832         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2833                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2834                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2835                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2836                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2837                 cmd = LCFG_PARAM;
2838         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2839                 convert = 0; /* Don't convert string value to integer */
2840                 cmd = LCFG_PARAM;
2841         } else {
2842                 return -EINVAL;
2843         }
2844
2845         if (mgs_param_empty(ptr))
2846                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2847         else
2848                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2849
2850         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2851         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2852         if (!convert && *tmp != '\0')
2853                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2854         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2855         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2856         /* truncate the comment to the parameter name */
2857         ptr = tmp - 1;
2858         sep = *ptr;
2859         *ptr = '\0';
2860         /* modify all servers and clients */
2861         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2862                                       *tmp == '\0' ? NULL : lcfg,
2863                                       mti->mti_fsname, sys, 0);
2864         if (rc == 0 && *tmp != '\0') {
2865                 switch (cmd) {
2866                 case LCFG_SET_TIMEOUT:
2867                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2868                                 class_process_config(lcfg);
2869                         break;
2870                 case LCFG_SET_LDLM_TIMEOUT:
2871                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2872                                 class_process_config(lcfg);
2873                         break;
2874                 default:
2875                         break;
2876                 }
2877         }
2878         *ptr = sep;
2879         lustre_cfg_free(lcfg);
2880         return rc;
2881 }
2882
2883 /* write quota settings into log */
2884 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2885                                struct fs_db *fsdb, struct mgs_target_info *mti,
2886                                char *quota, char *ptr)
2887 {
2888         struct mgs_thread_info  *mgi = mgs_env_info(env);
2889         struct lustre_cfg       *lcfg;
2890         char                    *tmp;
2891         char                     sep;
2892         int                      rc, cmd = LCFG_PARAM;
2893
2894         /* support only 'meta' and 'data' pools so far */
2895         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2896             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2897                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2898                        "& quota.ost are)\n", ptr);
2899                 return -EINVAL;
2900         }
2901
2902         if (*tmp == '\0') {
2903                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2904         } else {
2905                 CDEBUG(D_MGS, "global '%s'\n", quota);
2906
2907                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2908                     strcmp(tmp, "none") != 0) {
2909                         CERROR("enable option(%s) isn't supported\n", tmp);
2910                         return -EINVAL;
2911                 }
2912         }
2913
2914         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2915         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2916         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2917         /* truncate the comment to the parameter name */
2918         ptr = tmp - 1;
2919         sep = *ptr;
2920         *ptr = '\0';
2921
2922         /* XXX we duplicated quota enable information in all server
2923          *     config logs, it should be moved to a separate config
2924          *     log once we cleanup the config log for global param. */
2925         /* modify all servers */
2926         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2927                                       *tmp == '\0' ? NULL : lcfg,
2928                                       mti->mti_fsname, quota, 1);
2929         *ptr = sep;
2930         lustre_cfg_free(lcfg);
2931         return rc < 0 ? rc : 0;
2932 }
2933
2934 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2935                                    struct mgs_device *mgs,
2936                                    struct fs_db *fsdb,
2937                                    struct mgs_target_info *mti,
2938                                    char *param)
2939 {
2940         struct mgs_thread_info *mgi = mgs_env_info(env);
2941         struct llog_handle     *llh = NULL;
2942         char                   *logname;
2943         char                   *comment, *ptr;
2944         struct lustre_cfg      *lcfg;
2945         int                     rc, len;
2946         ENTRY;
2947
2948         /* get comment */
2949         ptr = strchr(param, '=');
2950         LASSERT(ptr);
2951         len = ptr - param;
2952
2953         OBD_ALLOC(comment, len + 1);
2954         if (comment == NULL)
2955                 RETURN(-ENOMEM);
2956         strncpy(comment, param, len);
2957         comment[len] = '\0';
2958
2959         /* prepare lcfg */
2960         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2961         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2962         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2963         if (lcfg == NULL)
2964                 GOTO(out_comment, rc = -ENOMEM);
2965
2966         /* construct log name */
2967         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2968         if (rc)
2969                 GOTO(out_lcfg, rc);
2970
2971         if (mgs_log_is_empty(env, mgs, logname)) {
2972                 rc = record_start_log(env, mgs, &llh, logname);
2973                 if (rc)
2974                         GOTO(out, rc);
2975                 record_end_log(env, &llh);
2976         }
2977
2978         /* obsolete old one */
2979         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2980                         comment, CM_SKIP);
2981         if (rc < 0)
2982                 GOTO(out, rc);
2983         /* write the new one */
2984         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2985                                   mti->mti_svname, comment);
2986         if (rc)
2987                 CERROR("err %d writing log %s\n", rc, logname);
2988 out:
2989         name_destroy(&logname);
2990 out_lcfg:
2991         lustre_cfg_free(lcfg);
2992 out_comment:
2993         OBD_FREE(comment, len + 1);
2994         RETURN(rc);
2995 }
2996
2997 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2998                                         char *param)
2999 {
3000         char    *ptr;
3001
3002         /* disable the adjustable udesc parameter for now, i.e. use default
3003          * setting that client always ship udesc to MDT if possible. to enable
3004          * it simply remove the following line */
3005         goto error_out;
3006
3007         ptr = strchr(param, '=');
3008         if (ptr == NULL)
3009                 goto error_out;
3010         *ptr++ = '\0';
3011
3012         if (strcmp(param, PARAM_SRPC_UDESC))
3013                 goto error_out;
3014
3015         if (strcmp(ptr, "yes") == 0) {
3016                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3017                 CWARN("Enable user descriptor shipping from client to MDT\n");
3018         } else if (strcmp(ptr, "no") == 0) {
3019                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3020                 CWARN("Disable user descriptor shipping from client to MDT\n");
3021         } else {
3022                 *(ptr - 1) = '=';
3023                 goto error_out;
3024         }
3025         return 0;
3026
3027 error_out:
3028         CERROR("Invalid param: %s\n", param);
3029         return -EINVAL;
3030 }
3031
3032 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3033                                   const char *svname,
3034                                   char *param)
3035 {
3036         struct sptlrpc_rule      rule;
3037         struct sptlrpc_rule_set *rset;
3038         int                      rc;
3039         ENTRY;
3040
3041         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3042                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3043                 RETURN(-EINVAL);
3044         }
3045
3046         if (strncmp(param, PARAM_SRPC_UDESC,
3047                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3048                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3049         }
3050
3051         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3052                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3053                 RETURN(-EINVAL);
3054         }
3055
3056         param += sizeof(PARAM_SRPC_FLVR) - 1;
3057
3058         rc = sptlrpc_parse_rule(param, &rule);
3059         if (rc)
3060                 RETURN(rc);
3061
3062         /* mgs rules implies must be mgc->mgs */
3063         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3064                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3065                      rule.sr_from != LUSTRE_SP_ANY) ||
3066                     (rule.sr_to != LUSTRE_SP_MGS &&
3067                      rule.sr_to != LUSTRE_SP_ANY))
3068                         RETURN(-EINVAL);
3069         }
3070
3071         /* preapre room for this coming rule. svcname format should be:
3072          * - fsname: general rule
3073          * - fsname-tgtname: target-specific rule
3074          */
3075         if (strchr(svname, '-')) {
3076                 struct mgs_tgt_srpc_conf *tgtconf;
3077                 int                       found = 0;
3078
3079                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3080                      tgtconf = tgtconf->mtsc_next) {
3081                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3082                                 found = 1;
3083                                 break;
3084                         }
3085                 }
3086
3087                 if (!found) {
3088                         int name_len;
3089
3090                         OBD_ALLOC_PTR(tgtconf);
3091                         if (tgtconf == NULL)
3092                                 RETURN(-ENOMEM);
3093
3094                         name_len = strlen(svname);
3095
3096                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3097                         if (tgtconf->mtsc_tgt == NULL) {
3098                                 OBD_FREE_PTR(tgtconf);
3099                                 RETURN(-ENOMEM);
3100                         }
3101                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3102
3103                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3104                         fsdb->fsdb_srpc_tgt = tgtconf;
3105                 }
3106
3107                 rset = &tgtconf->mtsc_rset;
3108         } else {
3109                 rset = &fsdb->fsdb_srpc_gen;
3110         }
3111
3112         rc = sptlrpc_rule_set_merge(rset, &rule);
3113
3114         RETURN(rc);
3115 }
3116
3117 static int mgs_srpc_set_param(const struct lu_env *env,
3118                               struct mgs_device *mgs,
3119                               struct fs_db *fsdb,
3120                               struct mgs_target_info *mti,
3121                               char *param)
3122 {
3123         char                   *copy;
3124         int                     rc, copy_size;
3125         ENTRY;
3126
3127 #ifndef HAVE_GSS
3128         RETURN(-EINVAL);
3129 #endif
3130         /* keep a copy of original param, which could be destroied
3131          * during parsing */
3132         copy_size = strlen(param) + 1;
3133         OBD_ALLOC(copy, copy_size);
3134         if (copy == NULL)
3135                 return -ENOMEM;
3136         memcpy(copy, param, copy_size);
3137
3138         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3139         if (rc)
3140                 goto out_free;
3141
3142         /* previous steps guaranteed the syntax is correct */
3143         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3144         if (rc)
3145                 goto out_free;
3146
3147         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3148                 /*
3149                  * for mgs rules, make them effective immediately.
3150                  */
3151                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3152                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3153                                                  &fsdb->fsdb_srpc_gen);
3154         }
3155
3156 out_free:
3157         OBD_FREE(copy, copy_size);
3158         RETURN(rc);
3159 }
3160
3161 struct mgs_srpc_read_data {
3162         struct fs_db   *msrd_fsdb;
3163         int             msrd_skip;
3164 };
3165
3166 static int mgs_srpc_read_handler(const struct lu_env *env,
3167                                  struct llog_handle *llh,
3168                                  struct llog_rec_hdr *rec, void *data)
3169 {
3170         struct mgs_srpc_read_data *msrd = data;
3171         struct cfg_marker         *marker;
3172         struct lustre_cfg         *lcfg = REC_DATA(rec);
3173         char                      *svname, *param;
3174         int                        cfg_len, rc;
3175         ENTRY;
3176
3177         if (rec->lrh_type != OBD_CFG_REC) {
3178                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3179                 RETURN(-EINVAL);
3180         }
3181
3182         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3183                   sizeof(struct llog_rec_tail);
3184
3185         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3186         if (rc) {
3187                 CERROR("Insane cfg\n");
3188                 RETURN(rc);
3189         }
3190
3191         if (lcfg->lcfg_command == LCFG_MARKER) {
3192                 marker = lustre_cfg_buf(lcfg, 1);
3193
3194                 if (marker->cm_flags & CM_START &&
3195                     marker->cm_flags & CM_SKIP)
3196                         msrd->msrd_skip = 1;
3197                 if (marker->cm_flags & CM_END)
3198                         msrd->msrd_skip = 0;
3199
3200                 RETURN(0);
3201         }
3202
3203         if (msrd->msrd_skip)
3204                 RETURN(0);
3205
3206         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3207                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3208                 RETURN(0);
3209         }
3210
3211         svname = lustre_cfg_string(lcfg, 0);
3212         if (svname == NULL) {
3213                 CERROR("svname is empty\n");
3214                 RETURN(0);
3215         }
3216
3217         param = lustre_cfg_string(lcfg, 1);
3218         if (param == NULL) {
3219                 CERROR("param is empty\n");
3220                 RETURN(0);
3221         }
3222
3223         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3224         if (rc)
3225                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3226
3227         RETURN(0);
3228 }
3229
3230 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3231                                 struct mgs_device *mgs,
3232                                 struct fs_db *fsdb)
3233 {
3234         struct llog_handle        *llh = NULL;
3235         struct llog_ctxt          *ctxt;
3236         char                      *logname;
3237         struct mgs_srpc_read_data  msrd;
3238         int                        rc;
3239         ENTRY;
3240
3241         /* construct log name */
3242         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3243         if (rc)
3244                 RETURN(rc);
3245
3246         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3247         LASSERT(ctxt != NULL);
3248
3249         if (mgs_log_is_empty(env, mgs, logname))
3250                 GOTO(out, rc = 0);
3251
3252         rc = llog_open(env, ctxt, &llh, NULL, logname,
3253                        LLOG_OPEN_EXISTS);
3254         if (rc < 0) {
3255                 if (rc == -ENOENT)
3256                         rc = 0;
3257                 GOTO(out, rc);
3258         }
3259
3260         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3261         if (rc)
3262                 GOTO(out_close, rc);
3263
3264         if (llog_get_size(llh) <= 1)
3265                 GOTO(out_close, rc = 0);
3266
3267         msrd.msrd_fsdb = fsdb;
3268         msrd.msrd_skip = 0;
3269
3270         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3271                           NULL);
3272
3273 out_close:
3274         llog_close(env, llh);
3275 out:
3276         llog_ctxt_put(ctxt);
3277         name_destroy(&logname);
3278
3279         if (rc)
3280                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3281         RETURN(rc);
3282 }
3283
3284 /* Permanent settings of all parameters by writing into the appropriate
3285  * configuration logs.
3286  * A parameter with null value ("<param>='\0'") means to erase it out of
3287  * the logs.
3288  */
3289 static int mgs_write_log_param(const struct lu_env *env,
3290                                struct mgs_device *mgs, struct fs_db *fsdb,
3291                                struct mgs_target_info *mti, char *ptr)
3292 {
3293         struct mgs_thread_info *mgi = mgs_env_info(env);
3294         char *logname;
3295         char *tmp;
3296         int rc = 0, rc2 = 0;
3297         ENTRY;
3298
3299         /* For various parameter settings, we have to figure out which logs
3300            care about them (e.g. both mdt and client for lov settings) */
3301         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3302
3303         /* The params are stored in MOUNT_DATA_FILE and modified via
3304            tunefs.lustre, or set using lctl conf_param */
3305
3306         /* Processed in lustre_start_mgc */
3307         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3308                 GOTO(end, rc);
3309
3310         /* Processed in ost/mdt */
3311         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3312                 GOTO(end, rc);
3313
3314         /* Processed in mgs_write_log_ost */
3315         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3316                 if (mti->mti_flags & LDD_F_PARAM) {
3317                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3318                                            "changed with tunefs.lustre"
3319                                            "and --writeconf\n", ptr);
3320                         rc = -EPERM;
3321                 }
3322                 GOTO(end, rc);
3323         }
3324
3325         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3326                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3327                 GOTO(end, rc);
3328         }
3329
3330         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3331                 /* Add a failover nidlist */
3332                 rc = 0;
3333                 /* We already processed failovers params for new
3334                    targets in mgs_write_log_target */
3335                 if (mti->mti_flags & LDD_F_PARAM) {
3336                         CDEBUG(D_MGS, "Adding failnode\n");
3337                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3338                 }
3339                 GOTO(end, rc);
3340         }
3341
3342         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3343                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3344                 GOTO(end, rc);
3345         }
3346
3347         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3348                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3349                 GOTO(end, rc);
3350         }
3351
3352         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3353                 /* active=0 means off, anything else means on */
3354                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3355                 int i;
3356
3357                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3358                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3359                                            "be (de)activated.\n",
3360                                            mti->mti_svname);
3361                         GOTO(end, rc = -EINVAL);
3362                 }
3363                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3364                               flag ? "de": "re", mti->mti_svname);
3365                 /* Modify clilov */
3366                 rc = name_create(&logname, mti->mti_fsname, "-client");
3367                 if (rc)
3368                         GOTO(end, rc);
3369                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3370                                 mti->mti_svname, "add osc", flag);
3371                 name_destroy(&logname);
3372                 if (rc)
3373                         goto active_err;
3374                 /* Modify mdtlov */
3375                 /* Add to all MDT logs for CMD */
3376                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3377                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3378                                 continue;
3379                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3380                         if (rc)
3381                                 GOTO(end, rc);
3382                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3383                                         mti->mti_svname, "add osc", flag);
3384                         name_destroy(&logname);
3385                         if (rc)
3386                                 goto active_err;
3387                 }
3388         active_err:
3389                 if (rc) {
3390                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3391                                            "log (%d). No permanent "
3392                                            "changes were made to the "
3393                                            "config log.\n",
3394                                            mti->mti_svname, rc);
3395                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3396                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3397                                                    " because the log"
3398                                                    "is in the old 1.4"
3399                                                    "style. Consider "
3400                                                    " --writeconf to "
3401                                                    "update the logs.\n");
3402                         GOTO(end, rc);
3403                 }
3404                 /* Fall through to osc proc for deactivating live OSC
3405                    on running MDT / clients. */
3406         }
3407         /* Below here, let obd's XXX_process_config methods handle it */
3408
3409         /* All lov. in proc */
3410         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3411                 char *mdtlovname;
3412
3413                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3414                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3415                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3416                                            "set on the MDT, not %s. "
3417                                            "Ignoring.\n",
3418                                            mti->mti_svname);
3419                         GOTO(end, rc = 0);
3420                 }
3421
3422                 /* Modify mdtlov */
3423                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3424                         GOTO(end, rc = -ENODEV);
3425
3426                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3427                                              mti->mti_stripe_index);
3428                 if (rc)
3429                         GOTO(end, rc);
3430                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3431                                   &mgi->mgi_bufs, mdtlovname, ptr);
3432                 name_destroy(&logname);
3433                 name_destroy(&mdtlovname);
3434                 if (rc)
3435                         GOTO(end, rc);
3436
3437                 /* Modify clilov */
3438                 rc = name_create(&logname, mti->mti_fsname, "-client");
3439                 if (rc)
3440                         GOTO(end, rc);
3441                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3442                                   fsdb->fsdb_clilov, ptr);
3443                 name_destroy(&logname);
3444                 GOTO(end, rc);
3445         }
3446
3447         /* All osc., mdc., llite. params in proc */
3448         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3449             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3450             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3451                 char *cname;
3452
3453                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3454                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3455                                            " cannot be modified. Consider"
3456                                            " updating the configuration with"
3457                                            " --writeconf\n",
3458                                            mti->mti_svname);
3459                         GOTO(end, rc = -EINVAL);
3460                 }
3461                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3462                         rc = name_create(&cname, mti->mti_fsname, "-client");
3463                         /* Add the client type to match the obdname in
3464                            class_config_llog_handler */
3465                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3466                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3467                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3468                         rc = name_create(&cname, mti->mti_svname, "-osc");
3469                 } else {
3470                         GOTO(end, rc = -EINVAL);
3471                 }
3472                 if (rc)
3473                         GOTO(end, rc);
3474
3475                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3476
3477                 /* Modify client */
3478                 rc = name_create(&logname, mti->mti_fsname, "-client");
3479                 if (rc) {
3480                         name_destroy(&cname);
3481                         GOTO(end, rc);
3482                 }
3483                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3484                                   cname, ptr);
3485
3486                 /* osc params affect the MDT as well */
3487                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3488                         int i;
3489
3490                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3491                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3492                                         continue;
3493                                 name_destroy(&cname);
3494                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3495                                                          fsdb, i);
3496                                 name_destroy(&logname);
3497                                 if (rc)
3498                                         break;
3499                                 rc = name_create_mdt(&logname,
3500                                                      mti->mti_fsname, i);
3501                                 if (rc)
3502                                         break;
3503                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3504                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3505                                                           mti, logname,
3506                                                           &mgi->mgi_bufs,
3507                                                           cname, ptr);
3508                                         if (rc)
3509                                                 break;
3510                                 }
3511                         }
3512                 }
3513                 name_destroy(&logname);
3514                 name_destroy(&cname);
3515                 GOTO(end, rc);
3516         }
3517
3518         /* All mdt. params in proc */
3519         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
3520                 int i;
3521                 __u32 idx;
3522
3523                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3524                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3525                             MTI_NAME_MAXLEN) == 0)
3526                         /* device is unspecified completely? */
3527                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3528                 else
3529                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3530                 if (rc < 0)
3531                         goto active_err;
3532                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3533                         goto active_err;
3534                 if (rc & LDD_F_SV_ALL) {
3535                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3536                                 if (!test_bit(i,
3537                                                   fsdb->fsdb_mdt_index_map))
3538                                         continue;
3539                                 rc = name_create_mdt(&logname,
3540                                                 mti->mti_fsname, i);
3541                                 if (rc)
3542                                         goto active_err;
3543                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3544                                                   logname, &mgi->mgi_bufs,
3545                                                   logname, ptr);
3546                                 name_destroy(&logname);
3547                                 if (rc)
3548                                         goto active_err;
3549                         }
3550                 } else {
3551                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3552                                           mti->mti_svname, &mgi->mgi_bufs,
3553                                           mti->mti_svname, ptr);
3554                         if (rc)
3555                                 goto active_err;
3556                 }
3557                 GOTO(end, rc);
3558         }
3559
3560         /* All mdd., ost. params in proc */
3561         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3562             (class_match_param(ptr, PARAM_OST, NULL) == 0)) {
3563                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3564                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3565                         GOTO(end, rc = -ENODEV);
3566
3567                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3568                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3569                 GOTO(end, rc);
3570         }
3571
3572         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3573         rc2 = -ENOSYS;
3574
3575 end:
3576         if (rc)
3577                 CERROR("err %d on param '%s'\n", rc, ptr);
3578
3579         RETURN(rc ?: rc2);
3580 }
3581
3582 /* Not implementing automatic failover nid addition at this time. */
3583 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3584                       struct mgs_target_info *mti)
3585 {
3586 #if 0
3587         struct fs_db *fsdb;
3588         int rc;
3589         ENTRY;
3590
3591         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3592         if (rc)
3593                 RETURN(rc);
3594
3595         if (mgs_log_is_empty(obd, mti->mti_svname))
3596                 /* should never happen */
3597                 RETURN(-ENOENT);
3598
3599         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3600
3601         /* FIXME We can just check mti->params to see if we're already in
3602            the failover list.  Modify mti->params for rewriting back at
3603            server_register_target(). */
3604
3605         mutex_lock(&fsdb->fsdb_mutex);
3606         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3607         mutex_unlock(&fsdb->fsdb_mutex);
3608
3609         RETURN(rc);
3610 #endif
3611         return 0;
3612 }
3613
3614 int mgs_write_log_target(const struct lu_env *env,
3615                          struct mgs_device *mgs,
3616                          struct mgs_target_info *mti,
3617                          struct fs_db *fsdb)
3618 {
3619         int rc = -EINVAL;
3620         char *buf, *params;
3621         ENTRY;
3622
3623         /* set/check the new target index */
3624         rc = mgs_set_index(env, mgs, mti);
3625         if (rc < 0) {
3626                 CERROR("Can't get index (%d)\n", rc);
3627                 RETURN(rc);
3628         }
3629
3630         if (rc == EALREADY) {
3631                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3632                               mti->mti_stripe_index, mti->mti_svname);
3633                 /* We would like to mark old log sections as invalid
3634                    and add new log sections in the client and mdt logs.
3635                    But if we add new sections, then live clients will
3636                    get repeat setup instructions for already running
3637                    osc's. So don't update the client/mdt logs. */
3638                 mti->mti_flags &= ~LDD_F_UPDATE;
3639                 rc = 0;
3640         }
3641
3642         mutex_lock(&fsdb->fsdb_mutex);
3643
3644         if (mti->mti_flags &
3645             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3646                 /* Generate a log from scratch */
3647                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3648                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3649                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3650                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3651                 } else {
3652                         CERROR("Unknown target type %#x, can't create log for "
3653                                "%s\n", mti->mti_flags, mti->mti_svname);
3654                 }
3655                 if (rc) {
3656                         CERROR("Can't write logs for %s (%d)\n",
3657                                mti->mti_svname, rc);
3658                         GOTO(out_up, rc);
3659                 }
3660         } else {
3661                 /* Just update the params from tunefs in mgs_write_log_params */
3662                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3663                 mti->mti_flags |= LDD_F_PARAM;
3664         }
3665
3666         /* allocate temporary buffer, where class_get_next_param will
3667            make copy of a current  parameter */
3668         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3669         if (buf == NULL)
3670                 GOTO(out_up, rc = -ENOMEM);
3671         params = mti->mti_params;
3672         while (params != NULL) {
3673                 rc = class_get_next_param(&params, buf);
3674                 if (rc) {
3675                         if (rc == 1)
3676                                 /* there is no next parameter, that is
3677                                    not an error */
3678                                 rc = 0;
3679                         break;
3680                 }
3681                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3682                        params, buf);
3683                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3684                 if (rc)
3685                         break;
3686         }
3687
3688         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3689
3690 out_up:
3691         mutex_unlock(&fsdb->fsdb_mutex);
3692         RETURN(rc);
3693 }
3694
3695 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3696 {
3697         struct llog_ctxt        *ctxt;
3698         int                      rc = 0;
3699
3700         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3701         if (ctxt == NULL) {
3702                 CERROR("%s: MGS config context doesn't exist\n",
3703                        mgs->mgs_obd->obd_name);
3704                 rc = -ENODEV;
3705         } else {
3706                 rc = llog_erase(env, ctxt, NULL, name);
3707                 /* llog may not exist */
3708                 if (rc == -ENOENT)
3709                         rc = 0;
3710                 llog_ctxt_put(ctxt);
3711         }
3712
3713         if (rc)
3714                 CERROR("%s: failed to clear log %s: %d\n",
3715                        mgs->mgs_obd->obd_name, name, rc);
3716
3717         return rc;
3718 }
3719
3720 /* erase all logs for the given fs */
3721 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3722 {
3723         struct fs_db *fsdb;
3724         cfs_list_t list;
3725         struct mgs_direntry *dirent, *n;
3726         int rc, len = strlen(fsname);
3727         char *suffix;
3728         ENTRY;
3729
3730         /* Find all the logs in the CONFIGS directory */
3731         rc = class_dentry_readdir(env, mgs, &list);
3732         if (rc)
3733                 RETURN(rc);
3734
3735         mutex_lock(&mgs->mgs_mutex);
3736
3737         /* Delete the fs db */
3738         fsdb = mgs_find_fsdb(mgs, fsname);
3739         if (fsdb)
3740                 mgs_free_fsdb(mgs, fsdb);
3741
3742         mutex_unlock(&mgs->mgs_mutex);
3743
3744         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3745                 cfs_list_del(&dirent->list);
3746                 suffix = strrchr(dirent->name, '-');
3747                 if (suffix != NULL) {
3748                         if ((len == suffix - dirent->name) &&
3749                             (strncmp(fsname, dirent->name, len) == 0)) {
3750                                 CDEBUG(D_MGS, "Removing log %s\n",
3751                                        dirent->name);
3752                                 mgs_erase_log(env, mgs, dirent->name);
3753                         }
3754                 }
3755                 mgs_direntry_free(dirent);
3756         }
3757
3758         RETURN(rc);
3759 }
3760
3761 /* from llog_swab */
3762 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3763 {
3764         int i;
3765         ENTRY;
3766
3767         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3768         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3769
3770         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3771         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3772         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3773         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3774
3775         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3776         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3777                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3778                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3779                                i, lcfg->lcfg_buflens[i],
3780                                lustre_cfg_string(lcfg, i));
3781                 }
3782         EXIT;
3783 }
3784
3785 /* Set a permanent (config log) param for a target or fs
3786  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3787  *             buf1 contains the single parameter
3788  */
3789 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3790                  struct lustre_cfg *lcfg, char *fsname)
3791 {
3792         struct fs_db *fsdb;
3793         struct mgs_target_info *mti;
3794         char *devname, *param;
3795         char *ptr;
3796         const char *tmp;
3797         __u32 index;
3798         int rc = 0;
3799         ENTRY;
3800
3801         print_lustre_cfg(lcfg);
3802
3803         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3804         devname = lustre_cfg_string(lcfg, 0);
3805         param = lustre_cfg_string(lcfg, 1);
3806         if (!devname) {
3807                 /* Assume device name embedded in param:
3808                    lustre-OST0000.osc.max_dirty_mb=32 */
3809                 ptr = strchr(param, '.');
3810                 if (ptr) {
3811                         devname = param;
3812                         *ptr = 0;
3813                         param = ptr + 1;
3814                 }
3815         }
3816         if (!devname) {
3817                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3818                 RETURN(-ENOSYS);
3819         }
3820
3821         rc = mgs_parse_devname(devname, fsname, NULL);
3822         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3823                 /* param related to llite isn't allowed to set by OST or MDT */
3824                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3825                                    sizeof(PARAM_LLITE)) == 0)
3826                         RETURN(-EINVAL);
3827         } else {
3828                 /* assume devname is the fsname */
3829                 memset(fsname, 0, MTI_NAME_MAXLEN);
3830                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3831                 fsname[MTI_NAME_MAXLEN - 1] = 0;
3832         }
3833         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3834
3835         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3836         if (rc)
3837                 RETURN(rc);
3838         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3839             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3840                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3841                        "is '%s'\n", fsname, devname);
3842                 mgs_free_fsdb(mgs, fsdb);
3843                 RETURN(-EINVAL);
3844         }
3845
3846         /* Create a fake mti to hold everything */
3847         OBD_ALLOC_PTR(mti);
3848         if (!mti)
3849                 GOTO(out, rc = -ENOMEM);
3850         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3851             >= sizeof(mti->mti_fsname))
3852                 GOTO(out, rc = -E2BIG);
3853         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3854             >= sizeof(mti->mti_svname))
3855                 GOTO(out, rc = -E2BIG);
3856         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3857             >= sizeof(mti->mti_params))
3858                 GOTO(out, rc = -E2BIG);
3859         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3860         if (rc < 0)
3861                 /* Not a valid server; may be only fsname */
3862                 rc = 0;
3863         else
3864                 /* Strip -osc or -mdc suffix from svname */
3865                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3866                                      mti->mti_svname))
3867                         GOTO(out, rc = -EINVAL);
3868
3869         mti->mti_flags = rc | LDD_F_PARAM;
3870
3871         mutex_lock(&fsdb->fsdb_mutex);
3872         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3873         mutex_unlock(&fsdb->fsdb_mutex);
3874
3875         /*
3876          * Revoke lock so everyone updates.  Should be alright if
3877          * someone was already reading while we were updating the logs,
3878          * so we don't really need to hold the lock while we're
3879          * writing (above).
3880          */
3881         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3882 out:
3883         OBD_FREE_PTR(mti);
3884         RETURN(rc);
3885 }
3886
3887 static int mgs_write_log_pool(const struct lu_env *env,
3888                               struct mgs_device *mgs, char *logname,
3889                               struct fs_db *fsdb, char *lovname,
3890                               enum lcfg_command_type cmd,
3891                               char *poolname, char *fsname,
3892                               char *ostname, char *comment)
3893 {
3894         struct llog_handle *llh = NULL;
3895         int rc;
3896
3897         rc = record_start_log(env, mgs, &llh, logname);
3898         if (rc)
3899                 return rc;
3900         rc = record_marker(env, llh, fsdb, CM_START, lovname, comment);
3901         if (rc)
3902                 goto out;
3903         rc = record_base(env, llh, lovname, 0, cmd, poolname, fsname, ostname, 0);
3904         if (rc)
3905                 goto out;
3906         rc = record_marker(env, llh, fsdb, CM_END, lovname, comment);
3907 out:
3908         record_end_log(env, &llh);
3909         return rc;
3910 }
3911
3912 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3913                  enum lcfg_command_type cmd, char *fsname,
3914                  char *poolname, char *ostname)
3915 {
3916         struct fs_db *fsdb;
3917         char *lovname;
3918         char *logname;
3919         char *label = NULL, *canceled_label = NULL;
3920         int label_sz;
3921         struct mgs_target_info *mti = NULL;
3922         int rc, i;
3923         ENTRY;
3924
3925         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3926         if (rc) {
3927                 CERROR("Can't get db for %s\n", fsname);
3928                 RETURN(rc);
3929         }
3930         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3931                 CERROR("%s is not defined\n", fsname);
3932                 mgs_free_fsdb(mgs, fsdb);
3933                 RETURN(-EINVAL);
3934         }
3935
3936         label_sz = 10 + strlen(fsname) + strlen(poolname);
3937
3938         /* check if ostname match fsname */
3939         if (ostname != NULL) {
3940                 char *ptr;
3941
3942                 ptr = strrchr(ostname, '-');
3943                 if ((ptr == NULL) ||
3944                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3945                         RETURN(-EINVAL);
3946                 label_sz += strlen(ostname);
3947         }
3948
3949         OBD_ALLOC(label, label_sz);
3950         if (label == NULL)
3951                 RETURN(-ENOMEM);
3952
3953         switch(cmd) {
3954         case LCFG_POOL_NEW:
3955                 sprintf(label,
3956                         "new %s.%s", fsname, poolname);
3957                 break;
3958         case LCFG_POOL_ADD:
3959                 sprintf(label,
3960                         "add %s.%s.%s", fsname, poolname, ostname);
3961                 break;
3962         case LCFG_POOL_REM:
3963                 OBD_ALLOC(canceled_label, label_sz);
3964                 if (canceled_label == NULL)
3965                         GOTO(out_label, rc = -ENOMEM);
3966                 sprintf(label,
3967                         "rem %s.%s.%s", fsname, poolname, ostname);
3968                 sprintf(canceled_label,
3969                         "add %s.%s.%s", fsname, poolname, ostname);
3970                 break;
3971         case LCFG_POOL_DEL:
3972                 OBD_ALLOC(canceled_label, label_sz);
3973                 if (canceled_label == NULL)
3974                         GOTO(out_label, rc = -ENOMEM);
3975                 sprintf(label,
3976                         "del %s.%s", fsname, poolname);
3977                 sprintf(canceled_label,
3978                         "new %s.%s", fsname, poolname);
3979                 break;
3980         default:
3981                 break;
3982         }
3983
3984         mutex_lock(&fsdb->fsdb_mutex);
3985
3986         if (canceled_label != NULL) {
3987                 OBD_ALLOC_PTR(mti);
3988                 if (mti == NULL)
3989                         GOTO(out_cancel, rc = -ENOMEM);
3990         }
3991
3992         /* write pool def to all MDT logs */
3993         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3994                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
3995                         rc = name_create_mdt_and_lov(&logname, &lovname,
3996                                                      fsdb, i);
3997                         if (rc) {
3998                                 mutex_unlock(&fsdb->fsdb_mutex);
3999                                 GOTO(out_mti, rc);
4000                         }
4001                         if (canceled_label != NULL) {
4002                                 strcpy(mti->mti_svname, "lov pool");
4003                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4004                                                 lovname, canceled_label,
4005                                                 CM_SKIP);
4006                         }
4007
4008                         if (rc >= 0)
4009                                 rc = mgs_write_log_pool(env, mgs, logname,
4010                                                         fsdb, lovname, cmd,
4011                                                         fsname, poolname,
4012                                                         ostname, label);
4013                         name_destroy(&logname);
4014                         name_destroy(&lovname);
4015                         if (rc) {
4016                                 mutex_unlock(&fsdb->fsdb_mutex);
4017                                 GOTO(out_mti, rc);
4018                         }
4019                 }
4020         }
4021
4022         rc = name_create(&logname, fsname, "-client");
4023         if (rc) {
4024                 mutex_unlock(&fsdb->fsdb_mutex);
4025                 GOTO(out_mti, rc);
4026         }
4027         if (canceled_label != NULL) {
4028                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4029                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4030                 if (rc < 0) {
4031                         mutex_unlock(&fsdb->fsdb_mutex);
4032                         name_destroy(&logname);
4033                         GOTO(out_mti, rc);
4034                 }
4035         }
4036
4037         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4038                                 cmd, fsname, poolname, ostname, label);
4039         mutex_unlock(&fsdb->fsdb_mutex);
4040         name_destroy(&logname);
4041         /* request for update */
4042         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4043
4044         EXIT;
4045 out_mti:
4046         if (mti != NULL)
4047                 OBD_FREE_PTR(mti);
4048 out_cancel:
4049         if (canceled_label != NULL)
4050                 OBD_FREE(canceled_label, label_sz);
4051 out_label:
4052         OBD_FREE(label, label_sz);
4053         return rc;
4054 }
4055
4056