Whamcloud - gitweb
820acba1824b7f24d1d75aeebf9f401f5317dc87
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_param.h>
50 #include <lustre_sec.h>
51 #include <lustre_quota.h>
52
53 #include "mgs_internal.h"
54
55 /********************** Class functions ********************/
56
57 int class_dentry_readdir(const struct lu_env *env,
58                          struct mgs_device *mgs, cfs_list_t *list)
59 {
60         struct dt_object    *dir = mgs->mgs_configs_dir;
61         const struct dt_it_ops *iops;
62         struct dt_it        *it;
63         struct mgs_direntry *de;
64         char                *key;
65         int                  rc, key_sz;
66
67         CFS_INIT_LIST_HEAD(list);
68
69         LASSERT(dir);
70         LASSERT(dir->do_index_ops);
71
72         iops = &dir->do_index_ops->dio_it;
73         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
74         if (IS_ERR(it))
75                 RETURN(PTR_ERR(it));
76
77         rc = iops->load(env, it, 0);
78         if (rc <= 0)
79                 GOTO(fini, rc = 0);
80
81         /* main cycle */
82         do {
83                 key = (void *)iops->key(env, it);
84                 if (IS_ERR(key)) {
85                         CERROR("%s: key failed when listing %s: rc = %d\n",
86                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
87                                (int) PTR_ERR(key));
88                         goto next;
89                 }
90                 key_sz = iops->key_size(env, it);
91                 LASSERT(key_sz > 0);
92
93                 /* filter out "." and ".." entries */
94                 if (key[0] == '.') {
95                         if (key_sz == 1)
96                                 goto next;
97                         if (key_sz == 2 && key[1] == '.')
98                                 goto next;
99                 }
100
101                 de = mgs_direntry_alloc(key_sz + 1);
102                 if (de == NULL) {
103                         rc = -ENOMEM;
104                         break;
105                 }
106
107                 memcpy(de->name, key, key_sz);
108                 de->name[key_sz] = 0;
109
110                 cfs_list_add(&de->list, list);
111
112 next:
113                 rc = iops->next(env, it);
114         } while (rc == 0);
115         rc = 0;
116
117         iops->put(env, it);
118
119 fini:
120         iops->fini(env, it);
121         if (rc)
122                 CERROR("%s: key failed when listing %s: rc = %d\n",
123                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
124         RETURN(rc);
125 }
126
127 /******************** DB functions *********************/
128
129 static inline int name_create(char **newname, char *prefix, char *suffix)
130 {
131         LASSERT(newname);
132         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
133         if (!*newname)
134                 return -ENOMEM;
135         sprintf(*newname, "%s%s", prefix, suffix);
136         return 0;
137 }
138
139 static inline void name_destroy(char **name)
140 {
141         if (*name)
142                 OBD_FREE(*name, strlen(*name) + 1);
143         *name = NULL;
144 }
145
146 struct mgs_fsdb_handler_data
147 {
148         struct fs_db   *fsdb;
149         __u32           ver;
150 };
151
152 /* from the (client) config log, figure out:
153         1. which ost's/mdt's are configured (by index)
154         2. what the last config step is
155         3. COMPAT_18 osc name
156 */
157 /* It might be better to have a separate db file, instead of parsing the info
158    out of the client log.  This is slow and potentially error-prone. */
159 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
160                             struct llog_rec_hdr *rec, void *data)
161 {
162         struct mgs_fsdb_handler_data *d = data;
163         struct fs_db *fsdb = d->fsdb;
164         int cfg_len = rec->lrh_len;
165         char *cfg_buf = (char*) (rec + 1);
166         struct lustre_cfg *lcfg;
167         __u32 index;
168         int rc = 0;
169         ENTRY;
170
171         if (rec->lrh_type != OBD_CFG_REC) {
172                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
173                 RETURN(-EINVAL);
174         }
175
176         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
177         if (rc) {
178                 CERROR("Insane cfg\n");
179                 RETURN(rc);
180         }
181
182         lcfg = (struct lustre_cfg *)cfg_buf;
183
184         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
185                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
186
187         /* Figure out ost indicies */
188         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
189         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
190             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
191                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
192                                        NULL, 10);
193                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
194                        lustre_cfg_string(lcfg, 1), index,
195                        lustre_cfg_string(lcfg, 2));
196                 set_bit(index, fsdb->fsdb_ost_index_map);
197         }
198
199         /* Figure out mdt indicies */
200         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
201         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
202             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
203                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
204                                        &index, NULL);
205                 if (rc != LDD_F_SV_TYPE_MDT) {
206                         CWARN("Unparsable MDC name %s, assuming index 0\n",
207                               lustre_cfg_string(lcfg, 0));
208                         index = 0;
209                 }
210                 rc = 0;
211                 CDEBUG(D_MGS, "MDT index is %u\n", index);
212                 set_bit(index, fsdb->fsdb_mdt_index_map);
213                 fsdb->fsdb_mdt_count ++;
214         }
215
216         /**
217          * figure out the old config. fsdb_gen = 0 means old log
218          * It is obsoleted and not supported anymore
219          */
220         if (fsdb->fsdb_gen == 0) {
221                 CERROR("Old config format is not supported\n");
222                 RETURN(-EINVAL);
223         }
224
225         /*
226          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
227          */
228         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
229             lcfg->lcfg_command == LCFG_ATTACH &&
230             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
231                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
232                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
233                         CWARN("MDT using 1.8 OSC name scheme\n");
234                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
235                 }
236         }
237
238         if (lcfg->lcfg_command == LCFG_MARKER) {
239                 struct cfg_marker *marker;
240                 marker = lustre_cfg_buf(lcfg, 1);
241
242                 d->ver = marker->cm_vers;
243
244                 /* Keep track of the latest marker step */
245                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
246         }
247
248         RETURN(rc);
249 }
250
251 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
252 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
253                                   struct mgs_device *mgs,
254                                   struct fs_db *fsdb)
255 {
256         char                            *logname;
257         struct llog_handle              *loghandle;
258         struct llog_ctxt                *ctxt;
259         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
260         int rc;
261
262         ENTRY;
263
264         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
265         LASSERT(ctxt != NULL);
266         rc = name_create(&logname, fsdb->fsdb_name, "-client");
267         if (rc)
268                 GOTO(out_put, rc);
269         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
270         if (rc)
271                 GOTO(out_pop, rc);
272
273         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
274         if (rc)
275                 GOTO(out_close, rc);
276
277         if (llog_get_size(loghandle) <= 1)
278                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
279
280         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
281         CDEBUG(D_INFO, "get_db = %d\n", rc);
282 out_close:
283         llog_close(env, loghandle);
284 out_pop:
285         name_destroy(&logname);
286 out_put:
287         llog_ctxt_put(ctxt);
288
289         RETURN(rc);
290 }
291
292 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
293 {
294         struct mgs_tgt_srpc_conf *tgtconf;
295
296         /* free target-specific rules */
297         while (fsdb->fsdb_srpc_tgt) {
298                 tgtconf = fsdb->fsdb_srpc_tgt;
299                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
300
301                 LASSERT(tgtconf->mtsc_tgt);
302
303                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
304                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
305                 OBD_FREE_PTR(tgtconf);
306         }
307
308         /* free general rules */
309         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
310 }
311
312 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
313 {
314         struct fs_db *fsdb;
315         cfs_list_t *tmp;
316
317         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
318                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
319                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
320                         return fsdb;
321         }
322         return NULL;
323 }
324
325 /* caller must hold the mgs->mgs_fs_db_lock */
326 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
327                                   struct mgs_device *mgs, char *fsname)
328 {
329         struct fs_db *fsdb;
330         int rc;
331         ENTRY;
332
333         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
334                 CERROR("fsname %s is too long\n", fsname);
335                 RETURN(NULL);
336         }
337
338         OBD_ALLOC_PTR(fsdb);
339         if (!fsdb)
340                 RETURN(NULL);
341
342         strcpy(fsdb->fsdb_name, fsname);
343         mutex_init(&fsdb->fsdb_mutex);
344         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
345         fsdb->fsdb_gen = 1;
346
347         if (strcmp(fsname, MGSSELF_NAME) == 0) {
348                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
349         } else {
350                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
351                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
352                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
353                         CERROR("No memory for index maps\n");
354                         GOTO(err, rc = -ENOMEM);
355                 }
356
357                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
358                 if (rc)
359                         GOTO(err, rc);
360                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
361                 if (rc)
362                         GOTO(err, rc);
363
364                 /* initialise data for NID table */
365                 mgs_ir_init_fs(env, mgs, fsdb);
366
367                 lproc_mgs_add_live(mgs, fsdb);
368         }
369
370         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
371
372         RETURN(fsdb);
373 err:
374         if (fsdb->fsdb_ost_index_map)
375                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
376         if (fsdb->fsdb_mdt_index_map)
377                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
378         name_destroy(&fsdb->fsdb_clilov);
379         name_destroy(&fsdb->fsdb_clilmv);
380         OBD_FREE_PTR(fsdb);
381         RETURN(NULL);
382 }
383
384 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
385 {
386         /* wait for anyone with the sem */
387         mutex_lock(&fsdb->fsdb_mutex);
388         lproc_mgs_del_live(mgs, fsdb);
389         cfs_list_del(&fsdb->fsdb_list);
390
391         /* deinitialize fsr */
392         mgs_ir_fini_fs(mgs, fsdb);
393
394         if (fsdb->fsdb_ost_index_map)
395                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
396         if (fsdb->fsdb_mdt_index_map)
397                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
398         name_destroy(&fsdb->fsdb_clilov);
399         name_destroy(&fsdb->fsdb_clilmv);
400         mgs_free_fsdb_srpc(fsdb);
401         mutex_unlock(&fsdb->fsdb_mutex);
402         OBD_FREE_PTR(fsdb);
403 }
404
405 int mgs_init_fsdb_list(struct mgs_device *mgs)
406 {
407         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
408         return 0;
409 }
410
411 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
412 {
413         struct fs_db *fsdb;
414         cfs_list_t *tmp, *tmp2;
415         mutex_lock(&mgs->mgs_mutex);
416         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
417                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
418                 mgs_free_fsdb(mgs, fsdb);
419         }
420         mutex_unlock(&mgs->mgs_mutex);
421         return 0;
422 }
423
424 int mgs_find_or_make_fsdb(const struct lu_env *env,
425                           struct mgs_device *mgs, char *name,
426                           struct fs_db **dbh)
427 {
428         struct fs_db *fsdb;
429         int rc = 0;
430
431         ENTRY;
432         mutex_lock(&mgs->mgs_mutex);
433         fsdb = mgs_find_fsdb(mgs, name);
434         if (fsdb) {
435                 mutex_unlock(&mgs->mgs_mutex);
436                 *dbh = fsdb;
437                 RETURN(0);
438         }
439
440         CDEBUG(D_MGS, "Creating new db\n");
441         fsdb = mgs_new_fsdb(env, mgs, name);
442         /* lock fsdb_mutex until the db is loaded from llogs */
443         if (fsdb)
444                 mutex_lock(&fsdb->fsdb_mutex);
445         mutex_unlock(&mgs->mgs_mutex);
446         if (!fsdb)
447                 RETURN(-ENOMEM);
448
449         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
450                 /* populate the db from the client llog */
451                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
452                 if (rc) {
453                         CERROR("Can't get db from client log %d\n", rc);
454                         GOTO(out_free, rc);
455                 }
456         }
457
458         /* populate srpc rules from params llog */
459         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
460         if (rc) {
461                 CERROR("Can't get db from params log %d\n", rc);
462                 GOTO(out_free, rc);
463         }
464
465         mutex_unlock(&fsdb->fsdb_mutex);
466         *dbh = fsdb;
467
468         RETURN(0);
469
470 out_free:
471         mutex_unlock(&fsdb->fsdb_mutex);
472         mgs_free_fsdb(mgs, fsdb);
473         return rc;
474 }
475
476 /* 1 = index in use
477    0 = index unused
478    -1= empty client log */
479 int mgs_check_index(const struct lu_env *env,
480                     struct mgs_device *mgs,
481                     struct mgs_target_info *mti)
482 {
483         struct fs_db *fsdb;
484         void *imap;
485         int rc = 0;
486         ENTRY;
487
488         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
489
490         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
491         if (rc) {
492                 CERROR("Can't get db for %s\n", mti->mti_fsname);
493                 RETURN(rc);
494         }
495
496         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
497                 RETURN(-1);
498
499         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
500                 imap = fsdb->fsdb_ost_index_map;
501         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
502                 imap = fsdb->fsdb_mdt_index_map;
503         else
504                 RETURN(-EINVAL);
505
506         if (test_bit(mti->mti_stripe_index, imap))
507                 RETURN(1);
508         RETURN(0);
509 }
510
511 static __inline__ int next_index(void *index_map, int map_len)
512 {
513         int i;
514         for (i = 0; i < map_len * 8; i++)
515                 if (!test_bit(i, index_map)) {
516                          return i;
517                  }
518         CERROR("max index %d exceeded.\n", i);
519         return -1;
520 }
521
522 /* Return codes:
523         0  newly marked as in use
524         <0 err
525         +EALREADY for update of an old index */
526 static int mgs_set_index(const struct lu_env *env,
527                          struct mgs_device *mgs,
528                          struct mgs_target_info *mti)
529 {
530         struct fs_db *fsdb;
531         void *imap;
532         int rc = 0;
533         ENTRY;
534
535         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
536         if (rc) {
537                 CERROR("Can't get db for %s\n", mti->mti_fsname);
538                 RETURN(rc);
539         }
540
541         mutex_lock(&fsdb->fsdb_mutex);
542         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
543                 imap = fsdb->fsdb_ost_index_map;
544         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
545                 imap = fsdb->fsdb_mdt_index_map;
546         } else {
547                 GOTO(out_up, rc = -EINVAL);
548         }
549
550         if (mti->mti_flags & LDD_F_NEED_INDEX) {
551                 rc = next_index(imap, INDEX_MAP_SIZE);
552                 if (rc == -1)
553                         GOTO(out_up, rc = -ERANGE);
554                 mti->mti_stripe_index = rc;
555                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
556                         fsdb->fsdb_mdt_count ++;
557         }
558
559         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
560                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
561                                    "but the max index is %d.\n",
562                                    mti->mti_svname, mti->mti_stripe_index,
563                                    INDEX_MAP_SIZE * 8);
564                 GOTO(out_up, rc = -ERANGE);
565         }
566
567         if (test_bit(mti->mti_stripe_index, imap)) {
568                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
569                     !(mti->mti_flags & LDD_F_WRITECONF)) {
570                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
571                                            "%d, but that index is already in "
572                                            "use. Use --writeconf to force\n",
573                                            mti->mti_svname,
574                                            mti->mti_stripe_index);
575                         GOTO(out_up, rc = -EADDRINUSE);
576                 } else {
577                         CDEBUG(D_MGS, "Server %s updating index %d\n",
578                                mti->mti_svname, mti->mti_stripe_index);
579                         GOTO(out_up, rc = EALREADY);
580                 }
581         }
582
583         set_bit(mti->mti_stripe_index, imap);
584         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
585         mutex_unlock(&fsdb->fsdb_mutex);
586         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
587                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
588
589         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
590                mti->mti_stripe_index);
591
592         RETURN(0);
593 out_up:
594         mutex_unlock(&fsdb->fsdb_mutex);
595         return rc;
596 }
597
598 struct mgs_modify_lookup {
599         struct cfg_marker mml_marker;
600         int               mml_modified;
601 };
602
603 static int mgs_modify_handler(const struct lu_env *env,
604                               struct llog_handle *llh,
605                               struct llog_rec_hdr *rec, void *data)
606 {
607         struct mgs_modify_lookup *mml = data;
608         struct cfg_marker *marker;
609         struct lustre_cfg *lcfg = REC_DATA(rec);
610         int cfg_len = REC_DATA_LEN(rec);
611         int rc;
612         ENTRY;
613
614         if (rec->lrh_type != OBD_CFG_REC) {
615                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
616                 RETURN(-EINVAL);
617         }
618
619         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
620         if (rc) {
621                 CERROR("Insane cfg\n");
622                 RETURN(rc);
623         }
624
625         /* We only care about markers */
626         if (lcfg->lcfg_command != LCFG_MARKER)
627                 RETURN(0);
628
629         marker = lustre_cfg_buf(lcfg, 1);
630         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
631             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
632             !(marker->cm_flags & CM_SKIP)) {
633                 /* Found a non-skipped marker match */
634                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
635                        rec->lrh_index, marker->cm_step,
636                        marker->cm_flags, mml->mml_marker.cm_flags,
637                        marker->cm_tgtname, marker->cm_comment);
638                 /* Overwrite the old marker llog entry */
639                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
640                 marker->cm_flags |= mml->mml_marker.cm_flags;
641                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
642                 /* Header and tail are added back to lrh_len in
643                    llog_lvfs_write_rec */
644                 rec->lrh_len = cfg_len;
645                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
646                                 rec->lrh_index);
647                 if (!rc)
648                          mml->mml_modified++;
649         }
650
651         RETURN(rc);
652 }
653
654 /**
655  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
656  * Return code:
657  * 0 - modified successfully,
658  * 1 - no modification was done
659  * negative - error
660  */
661 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
662                       struct fs_db *fsdb, struct mgs_target_info *mti,
663                       char *logname, char *devname, char *comment, int flags)
664 {
665         struct llog_handle *loghandle;
666         struct llog_ctxt *ctxt;
667         struct mgs_modify_lookup *mml;
668         int rc;
669
670         ENTRY;
671
672         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
673         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
674                flags);
675
676         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
677         LASSERT(ctxt != NULL);
678         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
679         if (rc < 0) {
680                 if (rc == -ENOENT)
681                         rc = 0;
682                 GOTO(out_pop, rc);
683         }
684
685         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
686         if (rc)
687                 GOTO(out_close, rc);
688
689         if (llog_get_size(loghandle) <= 1)
690                 GOTO(out_close, rc = 0);
691
692         OBD_ALLOC_PTR(mml);
693         if (!mml)
694                 GOTO(out_close, rc = -ENOMEM);
695         if (strlcpy(mml->mml_marker.cm_comment, comment,
696                     sizeof(mml->mml_marker.cm_comment)) >=
697             sizeof(mml->mml_marker.cm_comment))
698                 GOTO(out_free, rc = -E2BIG);
699         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
700                     sizeof(mml->mml_marker.cm_tgtname)) >=
701             sizeof(mml->mml_marker.cm_tgtname))
702                 GOTO(out_free, rc = -E2BIG);
703         /* Modify mostly means cancel */
704         mml->mml_marker.cm_flags = flags;
705         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
706         mml->mml_modified = 0;
707         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
708                           NULL);
709         if (!rc && !mml->mml_modified)
710                 rc = 1;
711
712 out_free:
713         OBD_FREE_PTR(mml);
714
715 out_close:
716         llog_close(env, loghandle);
717 out_pop:
718         if (rc < 0)
719                 CERROR("%s: modify %s/%s failed: rc = %d\n",
720                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
721         llog_ctxt_put(ctxt);
722         RETURN(rc);
723 }
724
725 /** This structure is passed to mgs_replace_handler */
726 struct mgs_replace_uuid_lookup {
727         /* Nids are replaced for this target device */
728         struct mgs_target_info target;
729         /* Temporary modified llog */
730         struct llog_handle *temp_llh;
731         /* Flag is set if in target block*/
732         int in_target_device;
733         /* Nids already added. Just skip (multiple nids) */
734         int device_nids_added;
735         /* Flag is set if this block should not be copied */
736         int skip_it;
737 };
738
739 /**
740  * Check: a) if block should be skipped
741  * b) is it target block
742  *
743  * \param[in] lcfg
744  * \param[in] mrul
745  *
746  * \retval 0 should not to be skipped
747  * \retval 1 should to be skipped
748  */
749 static int check_markers(struct lustre_cfg *lcfg,
750                          struct mgs_replace_uuid_lookup *mrul)
751 {
752          struct cfg_marker *marker;
753
754         /* Track markers. Find given device */
755         if (lcfg->lcfg_command == LCFG_MARKER) {
756                 marker = lustre_cfg_buf(lcfg, 1);
757                 /* Clean llog from records marked as CM_EXCLUDE.
758                    CM_SKIP records are used for "active" command
759                    and can be restored if needed */
760                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
761                     (CM_EXCLUDE | CM_START)) {
762                         mrul->skip_it = 1;
763                         return 1;
764                 }
765
766                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
767                     (CM_EXCLUDE | CM_END)) {
768                         mrul->skip_it = 0;
769                         return 1;
770                 }
771
772                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
773                         LASSERT(!(marker->cm_flags & CM_START) ||
774                                 !(marker->cm_flags & CM_END));
775                         if (marker->cm_flags & CM_START) {
776                                 mrul->in_target_device = 1;
777                                 mrul->device_nids_added = 0;
778                         } else if (marker->cm_flags & CM_END)
779                                 mrul->in_target_device = 0;
780                 }
781         }
782
783         return 0;
784 }
785
786 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
787                        struct lustre_cfg *lcfg)
788 {
789         struct llog_rec_hdr      rec;
790         int                      buflen, rc;
791
792         if (!lcfg || !llh)
793                 return -ENOMEM;
794
795         LASSERT(llh->lgh_ctxt);
796
797         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
798                                 lcfg->lcfg_buflens);
799         rec.lrh_len = llog_data_len(buflen);
800         rec.lrh_type = OBD_CFG_REC;
801
802         /* idx = -1 means append */
803         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
804         if (rc)
805                 CERROR("failed %d\n", rc);
806         return rc;
807 }
808
809 static int record_base(const struct lu_env *env, struct llog_handle *llh,
810                      char *cfgname, lnet_nid_t nid, int cmd,
811                      char *s1, char *s2, char *s3, char *s4)
812 {
813         struct mgs_thread_info *mgi = mgs_env_info(env);
814         struct lustre_cfg     *lcfg;
815         int rc;
816
817         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
818                cmd, s1, s2, s3, s4);
819
820         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
821         if (s1)
822                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
823         if (s2)
824                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
825         if (s3)
826                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
827         if (s4)
828                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
829
830         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
831         if (!lcfg)
832                 return -ENOMEM;
833         lcfg->lcfg_nid = nid;
834
835         rc = record_lcfg(env, llh, lcfg);
836
837         lustre_cfg_free(lcfg);
838
839         if (rc) {
840                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
841                        cmd, s1, s2, s3, s4);
842         }
843         return rc;
844 }
845
846 static inline int record_add_uuid(const struct lu_env *env,
847                                   struct llog_handle *llh,
848                                   uint64_t nid, char *uuid)
849 {
850         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
851 }
852
853 static inline int record_add_conn(const struct lu_env *env,
854                                   struct llog_handle *llh,
855                                   char *devname, char *uuid)
856 {
857         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
858 }
859
860 static inline int record_attach(const struct lu_env *env,
861                                 struct llog_handle *llh, char *devname,
862                                 char *type, char *uuid)
863 {
864         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
865 }
866
867 static inline int record_setup(const struct lu_env *env,
868                                struct llog_handle *llh, char *devname,
869                                char *s1, char *s2, char *s3, char *s4)
870 {
871         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
872 }
873
874 /**
875  * \retval <0 record processing error
876  * \retval n record is processed. No need copy original one.
877  * \retval 0 record is not processed.
878  */
879 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
880                            struct mgs_replace_uuid_lookup *mrul)
881 {
882         int nids_added = 0;
883         lnet_nid_t nid;
884         char *ptr;
885         int rc;
886
887         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
888                 /* LCFG_ADD_UUID command found. Let's skip original command
889                    and add passed nids */
890                 ptr = mrul->target.mti_params;
891                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
892                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
893                                "device %s\n", libcfs_nid2str(nid),
894                                 mrul->target.mti_params,
895                                 mrul->target.mti_svname);
896                         rc = record_add_uuid(env,
897                                              mrul->temp_llh, nid,
898                                              mrul->target.mti_params);
899                         if (!rc)
900                                 nids_added++;
901                 }
902
903                 if (nids_added == 0) {
904                         CERROR("No new nids were added, nid %s with uuid %s, "
905                                "device %s\n", libcfs_nid2str(nid),
906                                mrul->target.mti_params,
907                                mrul->target.mti_svname);
908                         RETURN(-ENXIO);
909                 } else {
910                         mrul->device_nids_added = 1;
911                 }
912
913                 return nids_added;
914         }
915
916         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
917                 /* LCFG_SETUP command found. UUID should be changed */
918                 rc = record_setup(env,
919                                   mrul->temp_llh,
920                                   /* devname the same */
921                                   lustre_cfg_string(lcfg, 0),
922                                   /* s1 is not changed */
923                                   lustre_cfg_string(lcfg, 1),
924                                   /* new uuid should be
925                                   the full nidlist */
926                                   mrul->target.mti_params,
927                                   /* s3 is not changed */
928                                   lustre_cfg_string(lcfg, 3),
929                                   /* s4 is not changed */
930                                   lustre_cfg_string(lcfg, 4));
931                 return rc ? rc : 1;
932         }
933
934         /* Another commands in target device block */
935         return 0;
936 }
937
938 /**
939  * Handler that called for every record in llog.
940  * Records are processed in order they placed in llog.
941  *
942  * \param[in] llh       log to be processed
943  * \param[in] rec       current record
944  * \param[in] data      mgs_replace_uuid_lookup structure
945  *
946  * \retval 0    success
947  */
948 static int mgs_replace_handler(const struct lu_env *env,
949                                struct llog_handle *llh,
950                                struct llog_rec_hdr *rec,
951                                void *data)
952 {
953         struct mgs_replace_uuid_lookup *mrul;
954         struct lustre_cfg *lcfg = REC_DATA(rec);
955         int cfg_len = REC_DATA_LEN(rec);
956         int rc;
957         ENTRY;
958
959         mrul = (struct mgs_replace_uuid_lookup *)data;
960
961         if (rec->lrh_type != OBD_CFG_REC) {
962                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
963                        rec->lrh_type, lcfg->lcfg_command,
964                        lustre_cfg_string(lcfg, 0),
965                        lustre_cfg_string(lcfg, 1));
966                 RETURN(-EINVAL);
967         }
968
969         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
970         if (rc) {
971                 /* Do not copy any invalidated records */
972                 GOTO(skip_out, rc = 0);
973         }
974
975         rc = check_markers(lcfg, mrul);
976         if (rc || mrul->skip_it)
977                 GOTO(skip_out, rc = 0);
978
979         /* Write to new log all commands outside target device block */
980         if (!mrul->in_target_device)
981                 GOTO(copy_out, rc = 0);
982
983         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
984            (failover nids) for this target, assuming that if then
985            primary is changing then so is the failover */
986         if (mrul->device_nids_added &&
987             (lcfg->lcfg_command == LCFG_ADD_UUID ||
988              lcfg->lcfg_command == LCFG_ADD_CONN))
989                 GOTO(skip_out, rc = 0);
990
991         rc = process_command(env, lcfg, mrul);
992         if (rc < 0)
993                 RETURN(rc);
994
995         if (rc)
996                 RETURN(0);
997 copy_out:
998         /* Record is placed in temporary llog as is */
999         rc = llog_write(env, mrul->temp_llh, rec, NULL, 0, NULL, -1);
1000
1001         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1002                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1003                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1004         RETURN(rc);
1005
1006 skip_out:
1007         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1008                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1009                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1010         RETURN(rc);
1011 }
1012
1013 static int mgs_log_is_empty(const struct lu_env *env,
1014                             struct mgs_device *mgs, char *name)
1015 {
1016         struct llog_ctxt        *ctxt;
1017         int                      rc;
1018
1019         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1020         LASSERT(ctxt != NULL);
1021
1022         rc = llog_is_empty(env, ctxt, name);
1023         llog_ctxt_put(ctxt);
1024         return rc;
1025 }
1026
1027 static int mgs_replace_nids_log(const struct lu_env *env,
1028                                 struct obd_device *mgs, struct fs_db *fsdb,
1029                                 char *logname, char *devname, char *nids)
1030 {
1031         struct llog_handle *orig_llh, *backup_llh;
1032         struct llog_ctxt *ctxt;
1033         struct mgs_replace_uuid_lookup *mrul;
1034         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1035         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1036         char *backup;
1037         int rc, rc2;
1038         ENTRY;
1039
1040         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1041
1042         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1043         LASSERT(ctxt != NULL);
1044
1045         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1046                 /* Log is empty. Nothing to replace */
1047                 GOTO(out_put, rc = 0);
1048         }
1049
1050         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1051         if (backup == NULL)
1052                 GOTO(out_put, rc = -ENOMEM);
1053
1054         sprintf(backup, "%s.bak", logname);
1055
1056         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1057         if (rc == 0) {
1058                 /* Now erase original log file. Connections are not allowed.
1059                    Backup is already saved */
1060                 rc = llog_erase(env, ctxt, NULL, logname);
1061                 if (rc < 0)
1062                         GOTO(out_free, rc);
1063         } else if (rc != -ENOENT) {
1064                 CERROR("%s: can't make backup for %s: rc = %d\n",
1065                        mgs->obd_name, logname, rc);
1066                 GOTO(out_free,rc);
1067         }
1068
1069         /* open local log */
1070         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1071         if (rc)
1072                 GOTO(out_restore, rc);
1073
1074         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1075         if (rc)
1076                 GOTO(out_closel, rc);
1077
1078         /* open backup llog */
1079         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1080                        LLOG_OPEN_EXISTS);
1081         if (rc)
1082                 GOTO(out_closel, rc);
1083
1084         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1085         if (rc)
1086                 GOTO(out_close, rc);
1087
1088         if (llog_get_size(backup_llh) <= 1)
1089                 GOTO(out_close, rc = 0);
1090
1091         OBD_ALLOC_PTR(mrul);
1092         if (!mrul)
1093                 GOTO(out_close, rc = -ENOMEM);
1094         /* devname is only needed information to replace UUID records */
1095         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1096         /* parse nids later */
1097         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1098         /* Copy records to this temporary llog */
1099         mrul->temp_llh = orig_llh;
1100
1101         rc = llog_process(env, backup_llh, mgs_replace_handler,
1102                           (void *)mrul, NULL);
1103         OBD_FREE_PTR(mrul);
1104 out_close:
1105         rc2 = llog_close(NULL, backup_llh);
1106         if (!rc)
1107                 rc = rc2;
1108 out_closel:
1109         rc2 = llog_close(NULL, orig_llh);
1110         if (!rc)
1111                 rc = rc2;
1112
1113 out_restore:
1114         if (rc) {
1115                 CERROR("%s: llog should be restored: rc = %d\n",
1116                        mgs->obd_name, rc);
1117                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1118                                   logname);
1119                 if (rc2 < 0)
1120                         CERROR("%s: can't restore backup %s: rc = %d\n",
1121                                mgs->obd_name, logname, rc2);
1122         }
1123
1124 out_free:
1125         OBD_FREE(backup, strlen(backup) + 1);
1126
1127 out_put:
1128         llog_ctxt_put(ctxt);
1129
1130         if (rc)
1131                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1132                        mgs->obd_name, logname, rc);
1133
1134         RETURN(rc);
1135 }
1136
1137 /**
1138  * Parse device name and get file system name and/or device index
1139  *
1140  * \param[in]   devname device name (ex. lustre-MDT0000)
1141  * \param[out]  fsname  file system name(optional)
1142  * \param[out]  index   device index(optional)
1143  *
1144  * \retval 0    success
1145  */
1146 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1147 {
1148         int rc;
1149         ENTRY;
1150
1151         /* Extract fsname */
1152         if (fsname) {
1153                 rc = server_name2fsname(devname, fsname, NULL);
1154                 if (rc < 0) {
1155                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1156                                devname);
1157                         RETURN(-EINVAL);
1158                 }
1159         }
1160
1161         if (index) {
1162                 rc = server_name2index(devname, index, NULL);
1163                 if (rc < 0) {
1164                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1165                                devname);
1166                         RETURN(-EINVAL);
1167                 }
1168         }
1169
1170         RETURN(0);
1171 }
1172
1173 /* This is only called during replace_nids */
1174 static int only_mgs_is_running(struct obd_device *mgs_obd)
1175 {
1176         /* TDB: Is global variable with devices count exists? */
1177         int num_devices = get_devices_count();
1178         int num_exports = 0;
1179         struct obd_export *exp;
1180
1181         spin_lock(&mgs_obd->obd_dev_lock);
1182         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1183                 /* skip self export */
1184                 if (exp == mgs_obd->obd_self_export)
1185                         continue;
1186                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1187                         continue;
1188
1189                 ++num_exports;
1190
1191                 CERROR("%s: node %s still connected during replace_nids "
1192                        "connect_flags:%llx\n",
1193                        mgs_obd->obd_name,
1194                        libcfs_nid2str(exp->exp_nid_stats->nid),
1195                        exp_connect_flags(exp));
1196
1197         }
1198         spin_unlock(&mgs_obd->obd_dev_lock);
1199
1200         /* osd, MGS and MGC + self_export
1201            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1202         return (num_devices <= 3) && (num_exports == 0);
1203 }
1204
1205 static int name_create_mdt(char **logname, char *fsname, int i)
1206 {
1207         char mdt_index[9];
1208
1209         sprintf(mdt_index, "-MDT%04x", i);
1210         return name_create(logname, fsname, mdt_index);
1211 }
1212
1213 /**
1214  * Replace nids for \a device to \a nids values
1215  *
1216  * \param obd           MGS obd device
1217  * \param devname       nids need to be replaced for this device
1218  * (ex. lustre-OST0000)
1219  * \param nids          nids list (ex. nid1,nid2,nid3)
1220  *
1221  * \retval 0    success
1222  */
1223 int mgs_replace_nids(const struct lu_env *env,
1224                      struct mgs_device *mgs,
1225                      char *devname, char *nids)
1226 {
1227         /* Assume fsname is part of device name */
1228         char fsname[MTI_NAME_MAXLEN];
1229         int rc;
1230         __u32 index;
1231         char *logname;
1232         struct fs_db *fsdb;
1233         unsigned int i;
1234         int conn_state;
1235         struct obd_device *mgs_obd = mgs->mgs_obd;
1236         ENTRY;
1237
1238         /* We can only change NIDs if no other nodes are connected */
1239         spin_lock(&mgs_obd->obd_dev_lock);
1240         conn_state = mgs_obd->obd_no_conn;
1241         mgs_obd->obd_no_conn = 1;
1242         spin_unlock(&mgs_obd->obd_dev_lock);
1243
1244         /* We can not change nids if not only MGS is started */
1245         if (!only_mgs_is_running(mgs_obd)) {
1246                 CERROR("Only MGS is allowed to be started\n");
1247                 GOTO(out, rc = -EINPROGRESS);
1248         }
1249
1250         /* Get fsname and index*/
1251         rc = mgs_parse_devname(devname, fsname, &index);
1252         if (rc)
1253                 GOTO(out, rc);
1254
1255         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1256         if (rc) {
1257                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1258                 GOTO(out, rc);
1259         }
1260
1261         /* Process client llogs */
1262         name_create(&logname, fsname, "-client");
1263         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1264         name_destroy(&logname);
1265         if (rc) {
1266                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1267                        fsname, devname, rc);
1268                 GOTO(out, rc);
1269         }
1270
1271         /* Process MDT llogs */
1272         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1273                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1274                         continue;
1275                 name_create_mdt(&logname, fsname, i);
1276                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1277                 name_destroy(&logname);
1278                 if (rc)
1279                         GOTO(out, rc);
1280         }
1281
1282 out:
1283         spin_lock(&mgs_obd->obd_dev_lock);
1284         mgs_obd->obd_no_conn = conn_state;
1285         spin_unlock(&mgs_obd->obd_dev_lock);
1286
1287         RETURN(rc);
1288 }
1289
1290 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1291                             char *devname, struct lov_desc *desc)
1292 {
1293         struct mgs_thread_info *mgi = mgs_env_info(env);
1294         struct lustre_cfg *lcfg;
1295         int rc;
1296
1297         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1298         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1299         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1300         if (!lcfg)
1301                 return -ENOMEM;
1302         rc = record_lcfg(env, llh, lcfg);
1303
1304         lustre_cfg_free(lcfg);
1305         return rc;
1306 }
1307
1308 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1309                             char *devname, struct lmv_desc *desc)
1310 {
1311         struct mgs_thread_info *mgi = mgs_env_info(env);
1312         struct lustre_cfg *lcfg;
1313         int rc;
1314
1315         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1316         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1317         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1318
1319         rc = record_lcfg(env, llh, lcfg);
1320
1321         lustre_cfg_free(lcfg);
1322         return rc;
1323 }
1324
1325 static inline int record_mdc_add(const struct lu_env *env,
1326                                  struct llog_handle *llh,
1327                                  char *logname, char *mdcuuid,
1328                                  char *mdtuuid, char *index,
1329                                  char *gen)
1330 {
1331         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1332                            mdtuuid,index,gen,mdcuuid);
1333 }
1334
1335 static inline int record_lov_add(const struct lu_env *env,
1336                                  struct llog_handle *llh,
1337                                  char *lov_name, char *ost_uuid,
1338                                  char *index, char *gen)
1339 {
1340         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1341                            ost_uuid, index, gen, 0);
1342 }
1343
1344 static inline int record_mount_opt(const struct lu_env *env,
1345                                    struct llog_handle *llh,
1346                                    char *profile, char *lov_name,
1347                                    char *mdc_name)
1348 {
1349         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1350                            profile,lov_name,mdc_name,0);
1351 }
1352
1353 static int record_marker(const struct lu_env *env,
1354                          struct llog_handle *llh,
1355                          struct fs_db *fsdb, __u32 flags,
1356                          char *tgtname, char *comment)
1357 {
1358         struct mgs_thread_info *mgi = mgs_env_info(env);
1359         struct lustre_cfg *lcfg;
1360         int rc;
1361         int cplen = 0;
1362
1363         if (flags & CM_START)
1364                 fsdb->fsdb_gen++;
1365         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1366         mgi->mgi_marker.cm_flags = flags;
1367         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1368         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1369                         sizeof(mgi->mgi_marker.cm_tgtname));
1370         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1371                 return -E2BIG;
1372         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1373                         sizeof(mgi->mgi_marker.cm_comment));
1374         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1375                 return -E2BIG;
1376         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1377         mgi->mgi_marker.cm_canceltime = 0;
1378         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1379         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1380                             sizeof(mgi->mgi_marker));
1381         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1382         if (!lcfg)
1383                 return -ENOMEM;
1384         rc = record_lcfg(env, llh, lcfg);
1385
1386         lustre_cfg_free(lcfg);
1387         return rc;
1388 }
1389
1390 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1391                             struct llog_handle **llh, char *name)
1392 {
1393         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1394         struct llog_ctxt        *ctxt;
1395         int                      rc = 0;
1396
1397         if (*llh)
1398                 GOTO(out, rc = -EBUSY);
1399
1400         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1401         if (!ctxt)
1402                 GOTO(out, rc = -ENODEV);
1403         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1404
1405         rc = llog_open_create(env, ctxt, llh, NULL, name);
1406         if (rc)
1407                 GOTO(out_ctxt, rc);
1408         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1409         if (rc)
1410                 llog_close(env, *llh);
1411 out_ctxt:
1412         llog_ctxt_put(ctxt);
1413 out:
1414         if (rc) {
1415                 CERROR("%s: can't start log %s: rc = %d\n",
1416                        mgs->mgs_obd->obd_name, name, rc);
1417                 *llh = NULL;
1418         }
1419         RETURN(rc);
1420 }
1421
1422 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1423 {
1424         int rc;
1425
1426         rc = llog_close(env, *llh);
1427         *llh = NULL;
1428
1429         return rc;
1430 }
1431
1432 /******************** config "macros" *********************/
1433
1434 /* write an lcfg directly into a log (with markers) */
1435 static int mgs_write_log_direct(const struct lu_env *env,
1436                                 struct mgs_device *mgs, struct fs_db *fsdb,
1437                                 char *logname, struct lustre_cfg *lcfg,
1438                                 char *devname, char *comment)
1439 {
1440         struct llog_handle *llh = NULL;
1441         int rc;
1442         ENTRY;
1443
1444         if (!lcfg)
1445                 RETURN(-ENOMEM);
1446
1447         rc = record_start_log(env, mgs, &llh, logname);
1448         if (rc)
1449                 RETURN(rc);
1450
1451         /* FIXME These should be a single journal transaction */
1452         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1453         if (rc)
1454                 GOTO(out_end, rc);
1455         rc = record_lcfg(env, llh, lcfg);
1456         if (rc)
1457                 GOTO(out_end, rc);
1458         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1459         if (rc)
1460                 GOTO(out_end, rc);
1461 out_end:
1462         record_end_log(env, &llh);
1463         RETURN(rc);
1464 }
1465
1466 /* write the lcfg in all logs for the given fs */
1467 int mgs_write_log_direct_all(const struct lu_env *env,
1468                              struct mgs_device *mgs,
1469                              struct fs_db *fsdb,
1470                              struct mgs_target_info *mti,
1471                              struct lustre_cfg *lcfg,
1472                              char *devname, char *comment,
1473                              int server_only)
1474 {
1475         cfs_list_t list;
1476         struct mgs_direntry *dirent, *n;
1477         char *fsname = mti->mti_fsname;
1478         char *logname;
1479         int rc = 0, len = strlen(fsname);
1480         ENTRY;
1481
1482         /* We need to set params for any future logs
1483            as well. FIXME Append this file to every new log.
1484            Actually, we should store as params (text), not llogs.  Or
1485            in a database. */
1486         rc = name_create(&logname, fsname, "-params");
1487         if (rc)
1488                 RETURN(rc);
1489         if (mgs_log_is_empty(env, mgs, logname)) {
1490                 struct llog_handle *llh = NULL;
1491                 rc = record_start_log(env, mgs, &llh, logname);
1492                 if (rc == 0)
1493                         record_end_log(env, &llh);
1494         }
1495         name_destroy(&logname);
1496         if (rc)
1497                 RETURN(rc);
1498
1499         /* Find all the logs in the CONFIGS directory */
1500         rc = class_dentry_readdir(env, mgs, &list);
1501         if (rc)
1502                 RETURN(rc);
1503
1504         /* Could use fsdb index maps instead of directory listing */
1505         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1506                 cfs_list_del(&dirent->list);
1507                 /* don't write to sptlrpc rule log */
1508                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1509                         goto next;
1510
1511                 /* caller wants write server logs only */
1512                 if (server_only && strstr(dirent->name, "-client") != NULL)
1513                         goto next;
1514
1515                 if (strncmp(fsname, dirent->name, len) == 0) {
1516                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1517                         /* Erase any old settings of this same parameter */
1518                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1519                                         devname, comment, CM_SKIP);
1520                         if (rc < 0)
1521                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1522                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1523                         /* Write the new one */
1524                         if (lcfg) {
1525                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1526                                                           dirent->name,
1527                                                           lcfg, devname,
1528                                                           comment);
1529                                 if (rc)
1530                                         CERROR("%s: writing log %s: rc = %d\n",
1531                                                mgs->mgs_obd->obd_name,
1532                                                dirent->name, rc);
1533                         }
1534                 }
1535 next:
1536                 mgs_direntry_free(dirent);
1537         }
1538
1539         RETURN(rc);
1540 }
1541
1542 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1543                                     struct mgs_device *mgs,
1544                                     struct fs_db *fsdb,
1545                                     struct mgs_target_info *mti,
1546                                     int index, char *logname);
1547 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1548                                     struct mgs_device *mgs,
1549                                     struct fs_db *fsdb,
1550                                     struct mgs_target_info *mti,
1551                                     char *logname, char *suffix, char *lovname,
1552                                     enum lustre_sec_part sec_part, int flags);
1553 static int name_create_mdt_and_lov(char **logname, char **lovname,
1554                                    struct fs_db *fsdb, int i);
1555
1556 static int add_param(char *params, char *key, char *val)
1557 {
1558         char *start = params + strlen(params);
1559         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1560         int keylen = 0;
1561
1562         if (key != NULL)
1563                 keylen = strlen(key);
1564         if (start + 1 + keylen + strlen(val) >= end) {
1565                 CERROR("params are too long: %s %s%s\n",
1566                        params, key != NULL ? key : "", val);
1567                 return -EINVAL;
1568         }
1569
1570         sprintf(start, " %s%s", key != NULL ? key : "", val);
1571         return 0;
1572 }
1573
1574 /**
1575  * Walk through client config log record and convert the related records
1576  * into the target.
1577  **/
1578 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1579                                          struct llog_handle *llh,
1580                                          struct llog_rec_hdr *rec, void *data)
1581 {
1582         struct mgs_device *mgs;
1583         struct obd_device *obd;
1584         struct mgs_target_info *mti, *tmti;
1585         struct fs_db *fsdb;
1586         int cfg_len = rec->lrh_len;
1587         char *cfg_buf = (char*) (rec + 1);
1588         struct lustre_cfg *lcfg;
1589         int rc = 0;
1590         struct llog_handle *mdt_llh = NULL;
1591         static int got_an_osc_or_mdc = 0;
1592         /* 0: not found any osc/mdc;
1593            1: found osc;
1594            2: found mdc;
1595         */
1596         static int last_step = -1;
1597         int cplen = 0;
1598
1599         ENTRY;
1600
1601         mti = ((struct temp_comp*)data)->comp_mti;
1602         tmti = ((struct temp_comp*)data)->comp_tmti;
1603         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1604         obd = ((struct temp_comp *)data)->comp_obd;
1605         mgs = lu2mgs_dev(obd->obd_lu_dev);
1606         LASSERT(mgs);
1607
1608         if (rec->lrh_type != OBD_CFG_REC) {
1609                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1610                 RETURN(-EINVAL);
1611         }
1612
1613         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1614         if (rc) {
1615                 CERROR("Insane cfg\n");
1616                 RETURN(rc);
1617         }
1618
1619         lcfg = (struct lustre_cfg *)cfg_buf;
1620
1621         if (lcfg->lcfg_command == LCFG_MARKER) {
1622                 struct cfg_marker *marker;
1623                 marker = lustre_cfg_buf(lcfg, 1);
1624                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1625                     (marker->cm_flags & CM_START) &&
1626                      !(marker->cm_flags & CM_SKIP)) {
1627                         got_an_osc_or_mdc = 1;
1628                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1629                                         sizeof(tmti->mti_svname));
1630                         if (cplen >= sizeof(tmti->mti_svname))
1631                                 RETURN(-E2BIG);
1632                         rc = record_start_log(env, mgs, &mdt_llh,
1633                                               mti->mti_svname);
1634                         if (rc)
1635                                 RETURN(rc);
1636                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1637                                            mti->mti_svname, "add osc(copied)");
1638                         record_end_log(env, &mdt_llh);
1639                         last_step = marker->cm_step;
1640                         RETURN(rc);
1641                 }
1642                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1643                     (marker->cm_flags & CM_END) &&
1644                      !(marker->cm_flags & CM_SKIP)) {
1645                         LASSERT(last_step == marker->cm_step);
1646                         last_step = -1;
1647                         got_an_osc_or_mdc = 0;
1648                         memset(tmti, 0, sizeof(*tmti));
1649                         rc = record_start_log(env, mgs, &mdt_llh,
1650                                               mti->mti_svname);
1651                         if (rc)
1652                                 RETURN(rc);
1653                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1654                                            mti->mti_svname, "add osc(copied)");
1655                         record_end_log(env, &mdt_llh);
1656                         RETURN(rc);
1657                 }
1658                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1659                     (marker->cm_flags & CM_START) &&
1660                      !(marker->cm_flags & CM_SKIP)) {
1661                         got_an_osc_or_mdc = 2;
1662                         last_step = marker->cm_step;
1663                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1664                                strlen(marker->cm_tgtname));
1665
1666                         RETURN(rc);
1667                 }
1668                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1669                     (marker->cm_flags & CM_END) &&
1670                      !(marker->cm_flags & CM_SKIP)) {
1671                         LASSERT(last_step == marker->cm_step);
1672                         last_step = -1;
1673                         got_an_osc_or_mdc = 0;
1674                         memset(tmti, 0, sizeof(*tmti));
1675                         RETURN(rc);
1676                 }
1677         }
1678
1679         if (got_an_osc_or_mdc == 0 || last_step < 0)
1680                 RETURN(rc);
1681
1682         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1683                 uint64_t nodenid = lcfg->lcfg_nid;
1684
1685                 if (strlen(tmti->mti_uuid) == 0) {
1686                         /* target uuid not set, this config record is before
1687                          * LCFG_SETUP, this nid is one of target node nid.
1688                          */
1689                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1690                         tmti->mti_nid_count++;
1691                 } else {
1692                         /* failover node nid */
1693                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1694                                        libcfs_nid2str(nodenid));
1695                 }
1696
1697                 RETURN(rc);
1698         }
1699
1700         if (lcfg->lcfg_command == LCFG_SETUP) {
1701                 char *target;
1702
1703                 target = lustre_cfg_string(lcfg, 1);
1704                 memcpy(tmti->mti_uuid, target, strlen(target));
1705                 RETURN(rc);
1706         }
1707
1708         /* ignore client side sptlrpc_conf_log */
1709         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1710                 RETURN(rc);
1711
1712         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1713                 int index;
1714
1715                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1716                         RETURN (-EINVAL);
1717
1718                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1719                        strlen(mti->mti_fsname));
1720                 tmti->mti_stripe_index = index;
1721
1722                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1723                                               mti->mti_stripe_index,
1724                                               mti->mti_svname);
1725                 memset(tmti, 0, sizeof(*tmti));
1726                 RETURN(rc);
1727         }
1728
1729         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1730                 int index;
1731                 char mdt_index[9];
1732                 char *logname, *lovname;
1733
1734                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1735                                              mti->mti_stripe_index);
1736                 if (rc)
1737                         RETURN(rc);
1738                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1739
1740                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1741                         name_destroy(&logname);
1742                         name_destroy(&lovname);
1743                         RETURN(-EINVAL);
1744                 }
1745
1746                 tmti->mti_stripe_index = index;
1747                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1748                                          mdt_index, lovname,
1749                                          LUSTRE_SP_MDT, 0);
1750                 name_destroy(&logname);
1751                 name_destroy(&lovname);
1752                 RETURN(rc);
1753         }
1754         RETURN(rc);
1755 }
1756
1757 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1758 /* stealed from mgs_get_fsdb_from_llog*/
1759 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1760                                               struct mgs_device *mgs,
1761                                               char *client_name,
1762                                               struct temp_comp* comp)
1763 {
1764         struct llog_handle *loghandle;
1765         struct mgs_target_info *tmti;
1766         struct llog_ctxt *ctxt;
1767         int rc;
1768
1769         ENTRY;
1770
1771         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1772         LASSERT(ctxt != NULL);
1773
1774         OBD_ALLOC_PTR(tmti);
1775         if (tmti == NULL)
1776                 GOTO(out_ctxt, rc = -ENOMEM);
1777
1778         comp->comp_tmti = tmti;
1779         comp->comp_obd = mgs->mgs_obd;
1780
1781         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1782                        LLOG_OPEN_EXISTS);
1783         if (rc < 0) {
1784                 if (rc == -ENOENT)
1785                         rc = 0;
1786                 GOTO(out_pop, rc);
1787         }
1788
1789         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1790         if (rc)
1791                 GOTO(out_close, rc);
1792
1793         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1794                                   (void *)comp, NULL, false);
1795         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1796 out_close:
1797         llog_close(env, loghandle);
1798 out_pop:
1799         OBD_FREE_PTR(tmti);
1800 out_ctxt:
1801         llog_ctxt_put(ctxt);
1802         RETURN(rc);
1803 }
1804
1805 /* lmv is the second thing for client logs */
1806 /* copied from mgs_write_log_lov. Please refer to that.  */
1807 static int mgs_write_log_lmv(const struct lu_env *env,
1808                              struct mgs_device *mgs,
1809                              struct fs_db *fsdb,
1810                              struct mgs_target_info *mti,
1811                              char *logname, char *lmvname)
1812 {
1813         struct llog_handle *llh = NULL;
1814         struct lmv_desc *lmvdesc;
1815         char *uuid;
1816         int rc = 0;
1817         ENTRY;
1818
1819         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1820
1821         OBD_ALLOC_PTR(lmvdesc);
1822         if (lmvdesc == NULL)
1823                 RETURN(-ENOMEM);
1824         lmvdesc->ld_active_tgt_count = 0;
1825         lmvdesc->ld_tgt_count = 0;
1826         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1827         uuid = (char *)lmvdesc->ld_uuid.uuid;
1828
1829         rc = record_start_log(env, mgs, &llh, logname);
1830         if (rc)
1831                 GOTO(out_free, rc);
1832         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1833         if (rc)
1834                 GOTO(out_end, rc);
1835         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1836         if (rc)
1837                 GOTO(out_end, rc);
1838         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1839         if (rc)
1840                 GOTO(out_end, rc);
1841         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1842         if (rc)
1843                 GOTO(out_end, rc);
1844 out_end:
1845         record_end_log(env, &llh);
1846 out_free:
1847         OBD_FREE_PTR(lmvdesc);
1848         RETURN(rc);
1849 }
1850
1851 /* lov is the first thing in the mdt and client logs */
1852 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1853                              struct fs_db *fsdb, struct mgs_target_info *mti,
1854                              char *logname, char *lovname)
1855 {
1856         struct llog_handle *llh = NULL;
1857         struct lov_desc *lovdesc;
1858         char *uuid;
1859         int rc = 0;
1860         ENTRY;
1861
1862         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1863
1864         /*
1865         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1866         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1867               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1868         */
1869
1870         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1871         OBD_ALLOC_PTR(lovdesc);
1872         if (lovdesc == NULL)
1873                 RETURN(-ENOMEM);
1874         lovdesc->ld_magic = LOV_DESC_MAGIC;
1875         lovdesc->ld_tgt_count = 0;
1876         /* Defaults.  Can be changed later by lcfg config_param */
1877         lovdesc->ld_default_stripe_count = 1;
1878         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1879         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1880         lovdesc->ld_default_stripe_offset = -1;
1881         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1882         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1883         /* can these be the same? */
1884         uuid = (char *)lovdesc->ld_uuid.uuid;
1885
1886         /* This should always be the first entry in a log.
1887         rc = mgs_clear_log(obd, logname); */
1888         rc = record_start_log(env, mgs, &llh, logname);
1889         if (rc)
1890                 GOTO(out_free, rc);
1891         /* FIXME these should be a single journal transaction */
1892         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1893         if (rc)
1894                 GOTO(out_end, rc);
1895         rc = record_attach(env, llh, lovname, "lov", uuid);
1896         if (rc)
1897                 GOTO(out_end, rc);
1898         rc = record_lov_setup(env, llh, lovname, lovdesc);
1899         if (rc)
1900                 GOTO(out_end, rc);
1901         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1902         if (rc)
1903                 GOTO(out_end, rc);
1904         EXIT;
1905 out_end:
1906         record_end_log(env, &llh);
1907 out_free:
1908         OBD_FREE_PTR(lovdesc);
1909         return rc;
1910 }
1911
1912 /* add failnids to open log */
1913 static int mgs_write_log_failnids(const struct lu_env *env,
1914                                   struct mgs_target_info *mti,
1915                                   struct llog_handle *llh,
1916                                   char *cliname)
1917 {
1918         char *failnodeuuid = NULL;
1919         char *ptr = mti->mti_params;
1920         lnet_nid_t nid;
1921         int rc = 0;
1922
1923         /*
1924         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1925         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1926         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1927         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1928         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1929         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1930         */
1931
1932         /* Pull failnid info out of params string */
1933         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1934                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1935                         if (failnodeuuid == NULL) {
1936                                 /* We don't know the failover node name,
1937                                    so just use the first nid as the uuid */
1938                                 rc = name_create(&failnodeuuid,
1939                                                  libcfs_nid2str(nid), "");
1940                                 if (rc)
1941                                         return rc;
1942                         }
1943                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1944                                "client %s\n", libcfs_nid2str(nid),
1945                                failnodeuuid, cliname);
1946                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1947                 }
1948                 if (failnodeuuid) {
1949                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1950                         name_destroy(&failnodeuuid);
1951                         failnodeuuid = NULL;
1952                 }
1953         }
1954
1955         return rc;
1956 }
1957
1958 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1959                                     struct mgs_device *mgs,
1960                                     struct fs_db *fsdb,
1961                                     struct mgs_target_info *mti,
1962                                     char *logname, char *lmvname)
1963 {
1964         struct llog_handle *llh = NULL;
1965         char *mdcname = NULL;
1966         char *nodeuuid = NULL;
1967         char *mdcuuid = NULL;
1968         char *lmvuuid = NULL;
1969         char index[6];
1970         int i, rc;
1971         ENTRY;
1972
1973         if (mgs_log_is_empty(env, mgs, logname)) {
1974                 CERROR("log is empty! Logical error\n");
1975                 RETURN(-EINVAL);
1976         }
1977
1978         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1979                mti->mti_svname, logname, lmvname);
1980
1981         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1982         if (rc)
1983                 RETURN(rc);
1984         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1985         if (rc)
1986                 GOTO(out_free, rc);
1987         rc = name_create(&mdcuuid, mdcname, "_UUID");
1988         if (rc)
1989                 GOTO(out_free, rc);
1990         rc = name_create(&lmvuuid, lmvname, "_UUID");
1991         if (rc)
1992                 GOTO(out_free, rc);
1993
1994         rc = record_start_log(env, mgs, &llh, logname);
1995         if (rc)
1996                 GOTO(out_free, rc);
1997         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1998                            "add mdc");
1999         if (rc)
2000                 GOTO(out_end, rc);
2001         for (i = 0; i < mti->mti_nid_count; i++) {
2002                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2003                        libcfs_nid2str(mti->mti_nids[i]));
2004
2005                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2006                 if (rc)
2007                         GOTO(out_end, rc);
2008         }
2009
2010         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2011         if (rc)
2012                 GOTO(out_end, rc);
2013         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
2014         if (rc)
2015                 GOTO(out_end, rc);
2016         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2017         if (rc)
2018                 GOTO(out_end, rc);
2019         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2020         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2021                             index, "1");
2022         if (rc)
2023                 GOTO(out_end, rc);
2024         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2025                            "add mdc");
2026         if (rc)
2027                 GOTO(out_end, rc);
2028 out_end:
2029         record_end_log(env, &llh);
2030 out_free:
2031         name_destroy(&lmvuuid);
2032         name_destroy(&mdcuuid);
2033         name_destroy(&mdcname);
2034         name_destroy(&nodeuuid);
2035         RETURN(rc);
2036 }
2037
2038 static inline int name_create_lov(char **lovname, char *mdtname,
2039                                   struct fs_db *fsdb, int index)
2040 {
2041         /* COMPAT_180 */
2042         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2043                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2044         else
2045                 return name_create(lovname, mdtname, "-mdtlov");
2046 }
2047
2048 static int name_create_mdt_and_lov(char **logname, char **lovname,
2049                                    struct fs_db *fsdb, int i)
2050 {
2051         int rc;
2052
2053         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2054         if (rc)
2055                 return rc;
2056         /* COMPAT_180 */
2057         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2058                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2059         else
2060                 rc = name_create(lovname, *logname, "-mdtlov");
2061         if (rc) {
2062                 name_destroy(logname);
2063                 *logname = NULL;
2064         }
2065         return rc;
2066 }
2067
2068 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2069                                       struct fs_db *fsdb, int i)
2070 {
2071         char suffix[16];
2072
2073         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2074                 sprintf(suffix, "-osc");
2075         else
2076                 sprintf(suffix, "-osc-MDT%04x", i);
2077         return name_create(oscname, ostname, suffix);
2078 }
2079
2080 /* add new mdc to already existent MDS */
2081 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2082                                     struct mgs_device *mgs,
2083                                     struct fs_db *fsdb,
2084                                     struct mgs_target_info *mti,
2085                                     int mdt_index, char *logname)
2086 {
2087         struct llog_handle      *llh = NULL;
2088         char    *nodeuuid = NULL;
2089         char    *ospname = NULL;
2090         char    *lovuuid = NULL;
2091         char    *mdtuuid = NULL;
2092         char    *svname = NULL;
2093         char    *mdtname = NULL;
2094         char    *lovname = NULL;
2095         char    index_str[16];
2096         int     i, rc;
2097
2098         ENTRY;
2099         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2100                 CERROR("log is empty! Logical error\n");
2101                 RETURN (-EINVAL);
2102         }
2103
2104         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2105                logname);
2106
2107         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2108         if (rc)
2109                 RETURN(rc);
2110
2111         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2112         if (rc)
2113                 GOTO(out_destory, rc);
2114
2115         rc = name_create(&svname, mdtname, "-osp");
2116         if (rc)
2117                 GOTO(out_destory, rc);
2118
2119         sprintf(index_str, "-MDT%04x", mdt_index);
2120         rc = name_create(&ospname, svname, index_str);
2121         if (rc)
2122                 GOTO(out_destory, rc);
2123
2124         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2125         if (rc)
2126                 GOTO(out_destory, rc);
2127
2128         rc = name_create(&lovuuid, lovname, "_UUID");
2129         if (rc)
2130                 GOTO(out_destory, rc);
2131
2132         rc = name_create(&mdtuuid, mdtname, "_UUID");
2133         if (rc)
2134                 GOTO(out_destory, rc);
2135
2136         rc = record_start_log(env, mgs, &llh, logname);
2137         if (rc)
2138                 GOTO(out_destory, rc);
2139
2140         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2141                            "add osp");
2142         if (rc)
2143                 GOTO(out_destory, rc);
2144
2145         for (i = 0; i < mti->mti_nid_count; i++) {
2146                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2147                        libcfs_nid2str(mti->mti_nids[i]));
2148                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2149                 if (rc)
2150                         GOTO(out_end, rc);
2151         }
2152
2153         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2154         if (rc)
2155                 GOTO(out_end, rc);
2156
2157         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2158                           NULL, NULL);
2159         if (rc)
2160                 GOTO(out_end, rc);
2161
2162         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2163         if (rc)
2164                 GOTO(out_end, rc);
2165
2166         /* Add mdc(osp) to lod */
2167         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2168                  mti->mti_stripe_index);
2169         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2170                          index_str, "1", NULL);
2171         if (rc)
2172                 GOTO(out_end, rc);
2173
2174         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2175         if (rc)
2176                 GOTO(out_end, rc);
2177
2178 out_end:
2179         record_end_log(env, &llh);
2180
2181 out_destory:
2182         name_destroy(&mdtuuid);
2183         name_destroy(&lovuuid);
2184         name_destroy(&lovname);
2185         name_destroy(&ospname);
2186         name_destroy(&svname);
2187         name_destroy(&nodeuuid);
2188         name_destroy(&mdtname);
2189         RETURN(rc);
2190 }
2191
2192 static int mgs_write_log_mdt0(const struct lu_env *env,
2193                               struct mgs_device *mgs,
2194                               struct fs_db *fsdb,
2195                               struct mgs_target_info *mti)
2196 {
2197         char *log = mti->mti_svname;
2198         struct llog_handle *llh = NULL;
2199         char *uuid, *lovname;
2200         char mdt_index[6];
2201         char *ptr = mti->mti_params;
2202         int rc = 0, failout = 0;
2203         ENTRY;
2204
2205         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2206         if (uuid == NULL)
2207                 RETURN(-ENOMEM);
2208
2209         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2210                 failout = (strncmp(ptr, "failout", 7) == 0);
2211
2212         rc = name_create(&lovname, log, "-mdtlov");
2213         if (rc)
2214                 GOTO(out_free, rc);
2215         if (mgs_log_is_empty(env, mgs, log)) {
2216                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2217                 if (rc)
2218                         GOTO(out_lod, rc);
2219         }
2220
2221         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2222
2223         rc = record_start_log(env, mgs, &llh, log);
2224         if (rc)
2225                 GOTO(out_lod, rc);
2226
2227         /* add MDT itself */
2228
2229         /* FIXME this whole fn should be a single journal transaction */
2230         sprintf(uuid, "%s_UUID", log);
2231         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2232         if (rc)
2233                 GOTO(out_lod, rc);
2234         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2235         if (rc)
2236                 GOTO(out_end, rc);
2237         rc = record_mount_opt(env, llh, log, lovname, NULL);
2238         if (rc)
2239                 GOTO(out_end, rc);
2240         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2241                         failout ? "n" : "f");
2242         if (rc)
2243                 GOTO(out_end, rc);
2244         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2245         if (rc)
2246                 GOTO(out_end, rc);
2247 out_end:
2248         record_end_log(env, &llh);
2249 out_lod:
2250         name_destroy(&lovname);
2251 out_free:
2252         OBD_FREE(uuid, sizeof(struct obd_uuid));
2253         RETURN(rc);
2254 }
2255
2256 /* envelope method for all layers log */
2257 static int mgs_write_log_mdt(const struct lu_env *env,
2258                              struct mgs_device *mgs,
2259                              struct fs_db *fsdb,
2260                              struct mgs_target_info *mti)
2261 {
2262         struct mgs_thread_info *mgi = mgs_env_info(env);
2263         struct llog_handle *llh = NULL;
2264         char *cliname;
2265         int rc, i = 0;
2266         ENTRY;
2267
2268         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2269
2270         if (mti->mti_uuid[0] == '\0') {
2271                 /* Make up our own uuid */
2272                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2273                          "%s_UUID", mti->mti_svname);
2274         }
2275
2276         /* add mdt */
2277         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2278         if (rc)
2279                 RETURN(rc);
2280         /* Append the mdt info to the client log */
2281         rc = name_create(&cliname, mti->mti_fsname, "-client");
2282         if (rc)
2283                 RETURN(rc);
2284
2285         if (mgs_log_is_empty(env, mgs, cliname)) {
2286                 /* Start client log */
2287                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2288                                        fsdb->fsdb_clilov);
2289                 if (rc)
2290                         GOTO(out_free, rc);
2291                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2292                                        fsdb->fsdb_clilmv);
2293                 if (rc)
2294                         GOTO(out_free, rc);
2295         }
2296
2297         /*
2298         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2299         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2300         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2301         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2302         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2303         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2304         */
2305
2306                 /* copy client info about lov/lmv */
2307                 mgi->mgi_comp.comp_mti = mti;
2308                 mgi->mgi_comp.comp_fsdb = fsdb;
2309
2310                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2311                                                         &mgi->mgi_comp);
2312                 if (rc)
2313                         GOTO(out_free, rc);
2314                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2315                                               fsdb->fsdb_clilmv);
2316                 if (rc)
2317                         GOTO(out_free, rc);
2318
2319                 /* add mountopts */
2320                 rc = record_start_log(env, mgs, &llh, cliname);
2321                 if (rc)
2322                         GOTO(out_free, rc);
2323
2324                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2325                                    "mount opts");
2326                 if (rc)
2327                         GOTO(out_end, rc);
2328                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2329                                       fsdb->fsdb_clilmv);
2330                 if (rc)
2331                         GOTO(out_end, rc);
2332                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2333                                    "mount opts");
2334
2335         if (rc)
2336                 GOTO(out_end, rc);
2337
2338         /* for_all_existing_mdt except current one */
2339         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2340                 if (i !=  mti->mti_stripe_index &&
2341                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2342                         char *logname;
2343
2344                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2345                         if (rc)
2346                                 GOTO(out_end, rc);
2347
2348                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2349                                                       i, logname);
2350                         name_destroy(&logname);
2351                         if (rc)
2352                                 GOTO(out_end, rc);
2353                 }
2354         }
2355 out_end:
2356         record_end_log(env, &llh);
2357 out_free:
2358         name_destroy(&cliname);
2359         RETURN(rc);
2360 }
2361
2362 /* Add the ost info to the client/mdt lov */
2363 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2364                                     struct mgs_device *mgs, struct fs_db *fsdb,
2365                                     struct mgs_target_info *mti,
2366                                     char *logname, char *suffix, char *lovname,
2367                                     enum lustre_sec_part sec_part, int flags)
2368 {
2369         struct llog_handle *llh = NULL;
2370         char *nodeuuid = NULL;
2371         char *oscname = NULL;
2372         char *oscuuid = NULL;
2373         char *lovuuid = NULL;
2374         char *svname = NULL;
2375         char index[6];
2376         int i, rc;
2377
2378         ENTRY;
2379         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2380                mti->mti_svname, logname);
2381
2382         if (mgs_log_is_empty(env, mgs, logname)) {
2383                 CERROR("log is empty! Logical error\n");
2384                 RETURN (-EINVAL);
2385         }
2386
2387         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2388         if (rc)
2389                 RETURN(rc);
2390         rc = name_create(&svname, mti->mti_svname, "-osc");
2391         if (rc)
2392                 GOTO(out_free, rc);
2393
2394         /* for the system upgraded from old 1.8, keep using the old osc naming
2395          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2396         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2397                 rc = name_create(&oscname, svname, "");
2398         else
2399                 rc = name_create(&oscname, svname, suffix);
2400         if (rc)
2401                 GOTO(out_free, rc);
2402
2403         rc = name_create(&oscuuid, oscname, "_UUID");
2404         if (rc)
2405                 GOTO(out_free, rc);
2406         rc = name_create(&lovuuid, lovname, "_UUID");
2407         if (rc)
2408                 GOTO(out_free, rc);
2409
2410
2411         /*
2412         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2413         multihomed (#4)
2414         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2415         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2416         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2417         failover (#6,7)
2418         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2419         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2420         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2421         */
2422
2423         rc = record_start_log(env, mgs, &llh, logname);
2424         if (rc)
2425                 GOTO(out_free, rc);
2426
2427         /* FIXME these should be a single journal transaction */
2428         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2429                            "add osc");
2430         if (rc)
2431                 GOTO(out_end, rc);
2432
2433         /* NB: don't change record order, because upon MDT steal OSC config
2434          * from client, it treats all nids before LCFG_SETUP as target nids
2435          * (multiple interfaces), while nids after as failover node nids.
2436          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2437          */
2438         for (i = 0; i < mti->mti_nid_count; i++) {
2439                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2440                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2441                 if (rc)
2442                         GOTO(out_end, rc);
2443         }
2444         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2445         if (rc)
2446                 GOTO(out_end, rc);
2447         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2448         if (rc)
2449                 GOTO(out_end, rc);
2450         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2451         if (rc)
2452                 GOTO(out_end, rc);
2453
2454         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2455
2456         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2457         if (rc)
2458                 GOTO(out_end, rc);
2459         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2460                            "add osc");
2461         if (rc)
2462                 GOTO(out_end, rc);
2463 out_end:
2464         record_end_log(env, &llh);
2465 out_free:
2466         name_destroy(&lovuuid);
2467         name_destroy(&oscuuid);
2468         name_destroy(&oscname);
2469         name_destroy(&svname);
2470         name_destroy(&nodeuuid);
2471         RETURN(rc);
2472 }
2473
2474 static int mgs_write_log_ost(const struct lu_env *env,
2475                              struct mgs_device *mgs, struct fs_db *fsdb,
2476                              struct mgs_target_info *mti)
2477 {
2478         struct llog_handle *llh = NULL;
2479         char *logname, *lovname;
2480         char *ptr = mti->mti_params;
2481         int rc, flags = 0, failout = 0, i;
2482         ENTRY;
2483
2484         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2485
2486         /* The ost startup log */
2487
2488         /* If the ost log already exists, that means that someone reformatted
2489            the ost and it called target_add again. */
2490         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2491                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2492                                    "exists, yet the server claims it never "
2493                                    "registered. It may have been reformatted, "
2494                                    "or the index changed. writeconf the MDT to "
2495                                    "regenerate all logs.\n", mti->mti_svname);
2496                 RETURN(-EALREADY);
2497         }
2498
2499         /*
2500         attach obdfilter ost1 ost1_UUID
2501         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2502         */
2503         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2504                 failout = (strncmp(ptr, "failout", 7) == 0);
2505         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2506         if (rc)
2507                 RETURN(rc);
2508         /* FIXME these should be a single journal transaction */
2509         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2510         if (rc)
2511                 GOTO(out_end, rc);
2512         if (*mti->mti_uuid == '\0')
2513                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2514                          "%s_UUID", mti->mti_svname);
2515         rc = record_attach(env, llh, mti->mti_svname,
2516                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2517         if (rc)
2518                 GOTO(out_end, rc);
2519         rc = record_setup(env, llh, mti->mti_svname,
2520                           "dev"/*ignored*/, "type"/*ignored*/,
2521                           failout ? "n" : "f", 0/*options*/);
2522         if (rc)
2523                 GOTO(out_end, rc);
2524         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2525         if (rc)
2526                 GOTO(out_end, rc);
2527 out_end:
2528         record_end_log(env, &llh);
2529         if (rc)
2530                 RETURN(rc);
2531         /* We also have to update the other logs where this osc is part of
2532            the lov */
2533
2534         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2535                 /* If we're upgrading, the old mdt log already has our
2536                    entry. Let's do a fake one for fun. */
2537                 /* Note that we can't add any new failnids, since we don't
2538                    know the old osc names. */
2539                 flags = CM_SKIP | CM_UPGRADE146;
2540
2541         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2542                 /* If the update flag isn't set, don't update client/mdt
2543                    logs. */
2544                 flags |= CM_SKIP;
2545                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2546                               "the MDT first to regenerate it.\n",
2547                               mti->mti_svname);
2548         }
2549
2550         /* Add ost to all MDT lov defs */
2551         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2552                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2553                         char mdt_index[9];
2554
2555                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2556                                                      i);
2557                         if (rc)
2558                                 RETURN(rc);
2559                         sprintf(mdt_index, "-MDT%04x", i);
2560                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2561                                                       logname, mdt_index,
2562                                                       lovname, LUSTRE_SP_MDT,
2563                                                       flags);
2564                         name_destroy(&logname);
2565                         name_destroy(&lovname);
2566                         if (rc)
2567                                 RETURN(rc);
2568                 }
2569         }
2570
2571         /* Append ost info to the client log */
2572         rc = name_create(&logname, mti->mti_fsname, "-client");
2573         if (rc)
2574                 RETURN(rc);
2575         if (mgs_log_is_empty(env, mgs, logname)) {
2576                 /* Start client log */
2577                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2578                                        fsdb->fsdb_clilov);
2579                 if (rc)
2580                         GOTO(out_free, rc);
2581                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2582                                        fsdb->fsdb_clilmv);
2583                 if (rc)
2584                         GOTO(out_free, rc);
2585         }
2586         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2587                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2588 out_free:
2589         name_destroy(&logname);
2590         RETURN(rc);
2591 }
2592
2593 static __inline__ int mgs_param_empty(char *ptr)
2594 {
2595         char *tmp;
2596
2597         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2598                 return 1;
2599         return 0;
2600 }
2601
2602 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2603                                           struct mgs_device *mgs,
2604                                           struct fs_db *fsdb,
2605                                           struct mgs_target_info *mti,
2606                                           char *logname, char *cliname)
2607 {
2608         int rc;
2609         struct llog_handle *llh = NULL;
2610
2611         if (mgs_param_empty(mti->mti_params)) {
2612                 /* Remove _all_ failnids */
2613                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2614                                 mti->mti_svname, "add failnid", CM_SKIP);
2615                 return rc < 0 ? rc : 0;
2616         }
2617
2618         /* Otherwise failover nids are additive */
2619         rc = record_start_log(env, mgs, &llh, logname);
2620         if (rc)
2621                 return rc;
2622                 /* FIXME this should be a single journal transaction */
2623         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2624                            "add failnid");
2625         if (rc)
2626                 goto out_end;
2627         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2628         if (rc)
2629                 goto out_end;
2630         rc = record_marker(env, llh, fsdb, CM_END,
2631                            mti->mti_svname, "add failnid");
2632 out_end:
2633         record_end_log(env, &llh);
2634         return rc;
2635 }
2636
2637
2638 /* Add additional failnids to an existing log.
2639    The mdc/osc must have been added to logs first */
2640 /* tcp nids must be in dotted-quad ascii -
2641    we can't resolve hostnames from the kernel. */
2642 static int mgs_write_log_add_failnid(const struct lu_env *env,
2643                                      struct mgs_device *mgs,
2644                                      struct fs_db *fsdb,
2645                                      struct mgs_target_info *mti)
2646 {
2647         char *logname, *cliname;
2648         int rc;
2649         ENTRY;
2650
2651         /* FIXME we currently can't erase the failnids
2652          * given when a target first registers, since they aren't part of
2653          * an "add uuid" stanza */
2654
2655         /* Verify that we know about this target */
2656         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2657                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2658                                    "yet. It must be started before failnids "
2659                                    "can be added.\n", mti->mti_svname);
2660                 RETURN(-ENOENT);
2661         }
2662
2663         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2664         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2665                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2666         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2667                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2668         } else {
2669                 RETURN(-EINVAL);
2670         }
2671         if (rc)
2672                 RETURN(rc);
2673         /* Add failover nids to the client log */
2674         rc = name_create(&logname, mti->mti_fsname, "-client");
2675         if (rc) {
2676                 name_destroy(&cliname);
2677                 RETURN(rc);
2678         }
2679         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2680         name_destroy(&logname);
2681         name_destroy(&cliname);
2682         if (rc)
2683                 RETURN(rc);
2684
2685         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2686                 /* Add OST failover nids to the MDT logs as well */
2687                 int i;
2688
2689                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2690                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2691                                 continue;
2692                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2693                         if (rc)
2694                                 RETURN(rc);
2695                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2696                                                  fsdb, i);
2697                         if (rc) {
2698                                 name_destroy(&logname);
2699                                 RETURN(rc);
2700                         }
2701                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2702                                                             mti, logname,
2703                                                             cliname);
2704                         name_destroy(&cliname);
2705                         name_destroy(&logname);
2706                         if (rc)
2707                                 RETURN(rc);
2708                 }
2709         }
2710
2711         RETURN(rc);
2712 }
2713
2714 static int mgs_wlp_lcfg(const struct lu_env *env,
2715                         struct mgs_device *mgs, struct fs_db *fsdb,
2716                         struct mgs_target_info *mti,
2717                         char *logname, struct lustre_cfg_bufs *bufs,
2718                         char *tgtname, char *ptr)
2719 {
2720         char comment[MTI_NAME_MAXLEN];
2721         char *tmp;
2722         struct lustre_cfg *lcfg;
2723         int rc, del;
2724
2725         /* Erase any old settings of this same parameter */
2726         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2727         comment[MTI_NAME_MAXLEN - 1] = 0;
2728         /* But don't try to match the value. */
2729         if ((tmp = strchr(comment, '=')))
2730             *tmp = 0;
2731         /* FIXME we should skip settings that are the same as old values */
2732         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2733         if (rc < 0)
2734                 return rc;
2735         del = mgs_param_empty(ptr);
2736
2737         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2738                       "Sett" : "Modify", tgtname, comment, logname);
2739         if (del)
2740                 return rc;
2741
2742         lustre_cfg_bufs_reset(bufs, tgtname);
2743         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2744         if (mti->mti_flags & LDD_F_PARAM2)
2745                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2746
2747         lcfg = lustre_cfg_new((mti->mti_flags & LDD_F_PARAM2) ?
2748                               LCFG_SET_PARAM : LCFG_PARAM, bufs);
2749
2750         if (!lcfg)
2751                 return -ENOMEM;
2752         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2753         lustre_cfg_free(lcfg);
2754         return rc;
2755 }
2756
2757 static int mgs_write_log_param2(const struct lu_env *env,
2758                                 struct mgs_device *mgs,
2759                                 struct fs_db *fsdb,
2760                                 struct mgs_target_info *mti, char *ptr)
2761 {
2762         struct lustre_cfg_bufs  bufs;
2763         int                     rc = 0;
2764         ENTRY;
2765
2766         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2767         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2768                           mti->mti_svname, ptr);
2769
2770         RETURN(rc);
2771 }
2772
2773 /* write global variable settings into log */
2774 static int mgs_write_log_sys(const struct lu_env *env,
2775                              struct mgs_device *mgs, struct fs_db *fsdb,
2776                              struct mgs_target_info *mti, char *sys, char *ptr)
2777 {
2778         struct mgs_thread_info *mgi = mgs_env_info(env);
2779         struct lustre_cfg *lcfg;
2780         char *tmp, sep;
2781         int rc, cmd, convert = 1;
2782
2783         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2784                 cmd = LCFG_SET_TIMEOUT;
2785         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2786                 cmd = LCFG_SET_LDLM_TIMEOUT;
2787         /* Check for known params here so we can return error to lctl */
2788         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2789                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2790                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2791                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2792                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2793                 cmd = LCFG_PARAM;
2794         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2795                 convert = 0; /* Don't convert string value to integer */
2796                 cmd = LCFG_PARAM;
2797         } else {
2798                 return -EINVAL;
2799         }
2800
2801         if (mgs_param_empty(ptr))
2802                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2803         else
2804                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2805
2806         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2807         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2808         if (!convert && *tmp != '\0')
2809                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2810         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2811         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2812         /* truncate the comment to the parameter name */
2813         ptr = tmp - 1;
2814         sep = *ptr;
2815         *ptr = '\0';
2816         /* modify all servers and clients */
2817         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2818                                       *tmp == '\0' ? NULL : lcfg,
2819                                       mti->mti_fsname, sys, 0);
2820         if (rc == 0 && *tmp != '\0') {
2821                 switch (cmd) {
2822                 case LCFG_SET_TIMEOUT:
2823                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2824                                 class_process_config(lcfg);
2825                         break;
2826                 case LCFG_SET_LDLM_TIMEOUT:
2827                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2828                                 class_process_config(lcfg);
2829                         break;
2830                 default:
2831                         break;
2832                 }
2833         }
2834         *ptr = sep;
2835         lustre_cfg_free(lcfg);
2836         return rc;
2837 }
2838
2839 /* write quota settings into log */
2840 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2841                                struct fs_db *fsdb, struct mgs_target_info *mti,
2842                                char *quota, char *ptr)
2843 {
2844         struct mgs_thread_info  *mgi = mgs_env_info(env);
2845         struct lustre_cfg       *lcfg;
2846         char                    *tmp;
2847         char                     sep;
2848         int                      rc, cmd = LCFG_PARAM;
2849
2850         /* support only 'meta' and 'data' pools so far */
2851         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2852             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2853                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2854                        "& quota.ost are)\n", ptr);
2855                 return -EINVAL;
2856         }
2857
2858         if (*tmp == '\0') {
2859                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2860         } else {
2861                 CDEBUG(D_MGS, "global '%s'\n", quota);
2862
2863                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2864                     strcmp(tmp, "none") != 0) {
2865                         CERROR("enable option(%s) isn't supported\n", tmp);
2866                         return -EINVAL;
2867                 }
2868         }
2869
2870         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2871         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2872         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2873         /* truncate the comment to the parameter name */
2874         ptr = tmp - 1;
2875         sep = *ptr;
2876         *ptr = '\0';
2877
2878         /* XXX we duplicated quota enable information in all server
2879          *     config logs, it should be moved to a separate config
2880          *     log once we cleanup the config log for global param. */
2881         /* modify all servers */
2882         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2883                                       *tmp == '\0' ? NULL : lcfg,
2884                                       mti->mti_fsname, quota, 1);
2885         *ptr = sep;
2886         lustre_cfg_free(lcfg);
2887         return rc < 0 ? rc : 0;
2888 }
2889
2890 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2891                                    struct mgs_device *mgs,
2892                                    struct fs_db *fsdb,
2893                                    struct mgs_target_info *mti,
2894                                    char *param)
2895 {
2896         struct mgs_thread_info *mgi = mgs_env_info(env);
2897         struct llog_handle     *llh = NULL;
2898         char                   *logname;
2899         char                   *comment, *ptr;
2900         struct lustre_cfg      *lcfg;
2901         int                     rc, len;
2902         ENTRY;
2903
2904         /* get comment */
2905         ptr = strchr(param, '=');
2906         LASSERT(ptr);
2907         len = ptr - param;
2908
2909         OBD_ALLOC(comment, len + 1);
2910         if (comment == NULL)
2911                 RETURN(-ENOMEM);
2912         strncpy(comment, param, len);
2913         comment[len] = '\0';
2914
2915         /* prepare lcfg */
2916         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2917         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2918         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2919         if (lcfg == NULL)
2920                 GOTO(out_comment, rc = -ENOMEM);
2921
2922         /* construct log name */
2923         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2924         if (rc)
2925                 GOTO(out_lcfg, rc);
2926
2927         if (mgs_log_is_empty(env, mgs, logname)) {
2928                 rc = record_start_log(env, mgs, &llh, logname);
2929                 if (rc)
2930                         GOTO(out, rc);
2931                 record_end_log(env, &llh);
2932         }
2933
2934         /* obsolete old one */
2935         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2936                         comment, CM_SKIP);
2937         if (rc < 0)
2938                 GOTO(out, rc);
2939         /* write the new one */
2940         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2941                                   mti->mti_svname, comment);
2942         if (rc)
2943                 CERROR("err %d writing log %s\n", rc, logname);
2944 out:
2945         name_destroy(&logname);
2946 out_lcfg:
2947         lustre_cfg_free(lcfg);
2948 out_comment:
2949         OBD_FREE(comment, len + 1);
2950         RETURN(rc);
2951 }
2952
2953 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2954                                         char *param)
2955 {
2956         char    *ptr;
2957
2958         /* disable the adjustable udesc parameter for now, i.e. use default
2959          * setting that client always ship udesc to MDT if possible. to enable
2960          * it simply remove the following line */
2961         goto error_out;
2962
2963         ptr = strchr(param, '=');
2964         if (ptr == NULL)
2965                 goto error_out;
2966         *ptr++ = '\0';
2967
2968         if (strcmp(param, PARAM_SRPC_UDESC))
2969                 goto error_out;
2970
2971         if (strcmp(ptr, "yes") == 0) {
2972                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2973                 CWARN("Enable user descriptor shipping from client to MDT\n");
2974         } else if (strcmp(ptr, "no") == 0) {
2975                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2976                 CWARN("Disable user descriptor shipping from client to MDT\n");
2977         } else {
2978                 *(ptr - 1) = '=';
2979                 goto error_out;
2980         }
2981         return 0;
2982
2983 error_out:
2984         CERROR("Invalid param: %s\n", param);
2985         return -EINVAL;
2986 }
2987
2988 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2989                                   const char *svname,
2990                                   char *param)
2991 {
2992         struct sptlrpc_rule      rule;
2993         struct sptlrpc_rule_set *rset;
2994         int                      rc;
2995         ENTRY;
2996
2997         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2998                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2999                 RETURN(-EINVAL);
3000         }
3001
3002         if (strncmp(param, PARAM_SRPC_UDESC,
3003                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3004                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3005         }
3006
3007         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3008                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3009                 RETURN(-EINVAL);
3010         }
3011
3012         param += sizeof(PARAM_SRPC_FLVR) - 1;
3013
3014         rc = sptlrpc_parse_rule(param, &rule);
3015         if (rc)
3016                 RETURN(rc);
3017
3018         /* mgs rules implies must be mgc->mgs */
3019         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3020                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3021                      rule.sr_from != LUSTRE_SP_ANY) ||
3022                     (rule.sr_to != LUSTRE_SP_MGS &&
3023                      rule.sr_to != LUSTRE_SP_ANY))
3024                         RETURN(-EINVAL);
3025         }
3026
3027         /* preapre room for this coming rule. svcname format should be:
3028          * - fsname: general rule
3029          * - fsname-tgtname: target-specific rule
3030          */
3031         if (strchr(svname, '-')) {
3032                 struct mgs_tgt_srpc_conf *tgtconf;
3033                 int                       found = 0;
3034
3035                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3036                      tgtconf = tgtconf->mtsc_next) {
3037                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3038                                 found = 1;
3039                                 break;
3040                         }
3041                 }
3042
3043                 if (!found) {
3044                         int name_len;
3045
3046                         OBD_ALLOC_PTR(tgtconf);
3047                         if (tgtconf == NULL)
3048                                 RETURN(-ENOMEM);
3049
3050                         name_len = strlen(svname);
3051
3052                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3053                         if (tgtconf->mtsc_tgt == NULL) {
3054                                 OBD_FREE_PTR(tgtconf);
3055                                 RETURN(-ENOMEM);
3056                         }
3057                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3058
3059                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3060                         fsdb->fsdb_srpc_tgt = tgtconf;
3061                 }
3062
3063                 rset = &tgtconf->mtsc_rset;
3064         } else {
3065                 rset = &fsdb->fsdb_srpc_gen;
3066         }
3067
3068         rc = sptlrpc_rule_set_merge(rset, &rule);
3069
3070         RETURN(rc);
3071 }
3072
3073 static int mgs_srpc_set_param(const struct lu_env *env,
3074                               struct mgs_device *mgs,
3075                               struct fs_db *fsdb,
3076                               struct mgs_target_info *mti,
3077                               char *param)
3078 {
3079         char                   *copy;
3080         int                     rc, copy_size;
3081         ENTRY;
3082
3083 #ifndef HAVE_GSS
3084         RETURN(-EINVAL);
3085 #endif
3086         /* keep a copy of original param, which could be destroied
3087          * during parsing */
3088         copy_size = strlen(param) + 1;
3089         OBD_ALLOC(copy, copy_size);
3090         if (copy == NULL)
3091                 return -ENOMEM;
3092         memcpy(copy, param, copy_size);
3093
3094         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3095         if (rc)
3096                 goto out_free;
3097
3098         /* previous steps guaranteed the syntax is correct */
3099         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3100         if (rc)
3101                 goto out_free;
3102
3103         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3104                 /*
3105                  * for mgs rules, make them effective immediately.
3106                  */
3107                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3108                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3109                                                  &fsdb->fsdb_srpc_gen);
3110         }
3111
3112 out_free:
3113         OBD_FREE(copy, copy_size);
3114         RETURN(rc);
3115 }
3116
3117 struct mgs_srpc_read_data {
3118         struct fs_db   *msrd_fsdb;
3119         int             msrd_skip;
3120 };
3121
3122 static int mgs_srpc_read_handler(const struct lu_env *env,
3123                                  struct llog_handle *llh,
3124                                  struct llog_rec_hdr *rec, void *data)
3125 {
3126         struct mgs_srpc_read_data *msrd = data;
3127         struct cfg_marker         *marker;
3128         struct lustre_cfg         *lcfg = REC_DATA(rec);
3129         char                      *svname, *param;
3130         int                        cfg_len, rc;
3131         ENTRY;
3132
3133         if (rec->lrh_type != OBD_CFG_REC) {
3134                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3135                 RETURN(-EINVAL);
3136         }
3137
3138         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3139                   sizeof(struct llog_rec_tail);
3140
3141         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3142         if (rc) {
3143                 CERROR("Insane cfg\n");
3144                 RETURN(rc);
3145         }
3146
3147         if (lcfg->lcfg_command == LCFG_MARKER) {
3148                 marker = lustre_cfg_buf(lcfg, 1);
3149
3150                 if (marker->cm_flags & CM_START &&
3151                     marker->cm_flags & CM_SKIP)
3152                         msrd->msrd_skip = 1;
3153                 if (marker->cm_flags & CM_END)
3154                         msrd->msrd_skip = 0;
3155
3156                 RETURN(0);
3157         }
3158
3159         if (msrd->msrd_skip)
3160                 RETURN(0);
3161
3162         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3163                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3164                 RETURN(0);
3165         }
3166
3167         svname = lustre_cfg_string(lcfg, 0);
3168         if (svname == NULL) {
3169                 CERROR("svname is empty\n");
3170                 RETURN(0);
3171         }
3172
3173         param = lustre_cfg_string(lcfg, 1);
3174         if (param == NULL) {
3175                 CERROR("param is empty\n");
3176                 RETURN(0);
3177         }
3178
3179         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3180         if (rc)
3181                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3182
3183         RETURN(0);
3184 }
3185
3186 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3187                                 struct mgs_device *mgs,
3188                                 struct fs_db *fsdb)
3189 {
3190         struct llog_handle        *llh = NULL;
3191         struct llog_ctxt          *ctxt;
3192         char                      *logname;
3193         struct mgs_srpc_read_data  msrd;
3194         int                        rc;
3195         ENTRY;
3196
3197         /* construct log name */
3198         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3199         if (rc)
3200                 RETURN(rc);
3201
3202         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3203         LASSERT(ctxt != NULL);
3204
3205         if (mgs_log_is_empty(env, mgs, logname))
3206                 GOTO(out, rc = 0);
3207
3208         rc = llog_open(env, ctxt, &llh, NULL, logname,
3209                        LLOG_OPEN_EXISTS);
3210         if (rc < 0) {
3211                 if (rc == -ENOENT)
3212                         rc = 0;
3213                 GOTO(out, rc);
3214         }
3215
3216         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3217         if (rc)
3218                 GOTO(out_close, rc);
3219
3220         if (llog_get_size(llh) <= 1)
3221                 GOTO(out_close, rc = 0);
3222
3223         msrd.msrd_fsdb = fsdb;
3224         msrd.msrd_skip = 0;
3225
3226         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3227                           NULL);
3228
3229 out_close:
3230         llog_close(env, llh);
3231 out:
3232         llog_ctxt_put(ctxt);
3233         name_destroy(&logname);
3234
3235         if (rc)
3236                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3237         RETURN(rc);
3238 }
3239
3240 /* Permanent settings of all parameters by writing into the appropriate
3241  * configuration logs.
3242  * A parameter with null value ("<param>='\0'") means to erase it out of
3243  * the logs.
3244  */
3245 static int mgs_write_log_param(const struct lu_env *env,
3246                                struct mgs_device *mgs, struct fs_db *fsdb,
3247                                struct mgs_target_info *mti, char *ptr)
3248 {
3249         struct mgs_thread_info *mgi = mgs_env_info(env);
3250         char *logname;
3251         char *tmp;
3252         int rc = 0, rc2 = 0;
3253         ENTRY;
3254
3255         /* For various parameter settings, we have to figure out which logs
3256            care about them (e.g. both mdt and client for lov settings) */
3257         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3258
3259         /* The params are stored in MOUNT_DATA_FILE and modified via
3260            tunefs.lustre, or set using lctl conf_param */
3261
3262         /* Processed in lustre_start_mgc */
3263         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3264                 GOTO(end, rc);
3265
3266         /* Processed in ost/mdt */
3267         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3268                 GOTO(end, rc);
3269
3270         /* Processed in mgs_write_log_ost */
3271         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3272                 if (mti->mti_flags & LDD_F_PARAM) {
3273                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3274                                            "changed with tunefs.lustre"
3275                                            "and --writeconf\n", ptr);
3276                         rc = -EPERM;
3277                 }
3278                 GOTO(end, rc);
3279         }
3280
3281         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3282                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3283                 GOTO(end, rc);
3284         }
3285
3286         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3287                 /* Add a failover nidlist */
3288                 rc = 0;
3289                 /* We already processed failovers params for new
3290                    targets in mgs_write_log_target */
3291                 if (mti->mti_flags & LDD_F_PARAM) {
3292                         CDEBUG(D_MGS, "Adding failnode\n");
3293                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3294                 }
3295                 GOTO(end, rc);
3296         }
3297
3298         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3299                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3300                 GOTO(end, rc);
3301         }
3302
3303         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3304                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3305                 GOTO(end, rc);
3306         }
3307
3308         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3309                 /* active=0 means off, anything else means on */
3310                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3311                 int i;
3312
3313                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3314                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3315                                            "be (de)activated.\n",
3316                                            mti->mti_svname);
3317                         GOTO(end, rc = -EINVAL);
3318                 }
3319                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3320                               flag ? "de": "re", mti->mti_svname);
3321                 /* Modify clilov */
3322                 rc = name_create(&logname, mti->mti_fsname, "-client");
3323                 if (rc)
3324                         GOTO(end, rc);
3325                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3326                                 mti->mti_svname, "add osc", flag);
3327                 name_destroy(&logname);
3328                 if (rc)
3329                         goto active_err;
3330                 /* Modify mdtlov */
3331                 /* Add to all MDT logs for CMD */
3332                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3333                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3334                                 continue;
3335                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3336                         if (rc)
3337                                 GOTO(end, rc);
3338                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3339                                         mti->mti_svname, "add osc", flag);
3340                         name_destroy(&logname);
3341                         if (rc)
3342                                 goto active_err;
3343                 }
3344         active_err:
3345                 if (rc) {
3346                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3347                                            "log (%d). No permanent "
3348                                            "changes were made to the "
3349                                            "config log.\n",
3350                                            mti->mti_svname, rc);
3351                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3352                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3353                                                    " because the log"
3354                                                    "is in the old 1.4"
3355                                                    "style. Consider "
3356                                                    " --writeconf to "
3357                                                    "update the logs.\n");
3358                         GOTO(end, rc);
3359                 }
3360                 /* Fall through to osc proc for deactivating live OSC
3361                    on running MDT / clients. */
3362         }
3363         /* Below here, let obd's XXX_process_config methods handle it */
3364
3365         /* All lov. in proc */
3366         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3367                 char *mdtlovname;
3368
3369                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3370                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3371                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3372                                            "set on the MDT, not %s. "
3373                                            "Ignoring.\n",
3374                                            mti->mti_svname);
3375                         GOTO(end, rc = 0);
3376                 }
3377
3378                 /* Modify mdtlov */
3379                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3380                         GOTO(end, rc = -ENODEV);
3381
3382                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3383                                              mti->mti_stripe_index);
3384                 if (rc)
3385                         GOTO(end, rc);
3386                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3387                                   &mgi->mgi_bufs, mdtlovname, ptr);
3388                 name_destroy(&logname);
3389                 name_destroy(&mdtlovname);
3390                 if (rc)
3391                         GOTO(end, rc);
3392
3393                 /* Modify clilov */
3394                 rc = name_create(&logname, mti->mti_fsname, "-client");
3395                 if (rc)
3396                         GOTO(end, rc);
3397                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3398                                   fsdb->fsdb_clilov, ptr);
3399                 name_destroy(&logname);
3400                 GOTO(end, rc);
3401         }
3402
3403         /* All osc., mdc., llite. params in proc */
3404         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3405             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3406             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3407                 char *cname;
3408
3409                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3410                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3411                                            " cannot be modified. Consider"
3412                                            " updating the configuration with"
3413                                            " --writeconf\n",
3414                                            mti->mti_svname);
3415                         GOTO(end, rc = -EINVAL);
3416                 }
3417                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3418                         rc = name_create(&cname, mti->mti_fsname, "-client");
3419                         /* Add the client type to match the obdname in
3420                            class_config_llog_handler */
3421                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3422                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3423                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3424                         rc = name_create(&cname, mti->mti_svname, "-osc");
3425                 } else {
3426                         GOTO(end, rc = -EINVAL);
3427                 }
3428                 if (rc)
3429                         GOTO(end, rc);
3430
3431                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3432
3433                 /* Modify client */
3434                 rc = name_create(&logname, mti->mti_fsname, "-client");
3435                 if (rc) {
3436                         name_destroy(&cname);
3437                         GOTO(end, rc);
3438                 }
3439                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3440                                   cname, ptr);
3441
3442                 /* osc params affect the MDT as well */
3443                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3444                         int i;
3445
3446                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3447                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3448                                         continue;
3449                                 name_destroy(&cname);
3450                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3451                                                          fsdb, i);
3452                                 name_destroy(&logname);
3453                                 if (rc)
3454                                         break;
3455                                 rc = name_create_mdt(&logname,
3456                                                      mti->mti_fsname, i);
3457                                 if (rc)
3458                                         break;
3459                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3460                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3461                                                           mti, logname,
3462                                                           &mgi->mgi_bufs,
3463                                                           cname, ptr);
3464                                         if (rc)
3465                                                 break;
3466                                 }
3467                         }
3468                 }
3469                 name_destroy(&logname);
3470                 name_destroy(&cname);
3471                 GOTO(end, rc);
3472         }
3473
3474         /* All mdt. params in proc */
3475         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
3476                 int i;
3477                 __u32 idx;
3478
3479                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3480                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3481                             MTI_NAME_MAXLEN) == 0)
3482                         /* device is unspecified completely? */
3483                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3484                 else
3485                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3486                 if (rc < 0)
3487                         goto active_err;
3488                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3489                         goto active_err;
3490                 if (rc & LDD_F_SV_ALL) {
3491                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3492                                 if (!test_bit(i,
3493                                                   fsdb->fsdb_mdt_index_map))
3494                                         continue;
3495                                 rc = name_create_mdt(&logname,
3496                                                 mti->mti_fsname, i);
3497                                 if (rc)
3498                                         goto active_err;
3499                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3500                                                   logname, &mgi->mgi_bufs,
3501                                                   logname, ptr);
3502                                 name_destroy(&logname);
3503                                 if (rc)
3504                                         goto active_err;
3505                         }
3506                 } else {
3507                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3508                                           mti->mti_svname, &mgi->mgi_bufs,
3509                                           mti->mti_svname, ptr);
3510                         if (rc)
3511                                 goto active_err;
3512                 }
3513                 GOTO(end, rc);
3514         }
3515
3516         /* All mdd., ost. and osd. params in proc */
3517         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3518             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3519             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3520                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3521                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3522                         GOTO(end, rc = -ENODEV);
3523
3524                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3525                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3526                 GOTO(end, rc);
3527         }
3528
3529         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3530         rc2 = -ENOSYS;
3531
3532 end:
3533         if (rc)
3534                 CERROR("err %d on param '%s'\n", rc, ptr);
3535
3536         RETURN(rc ?: rc2);
3537 }
3538
3539 /* Not implementing automatic failover nid addition at this time. */
3540 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3541                       struct mgs_target_info *mti)
3542 {
3543 #if 0
3544         struct fs_db *fsdb;
3545         int rc;
3546         ENTRY;
3547
3548         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3549         if (rc)
3550                 RETURN(rc);
3551
3552         if (mgs_log_is_empty(obd, mti->mti_svname))
3553                 /* should never happen */
3554                 RETURN(-ENOENT);
3555
3556         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3557
3558         /* FIXME We can just check mti->params to see if we're already in
3559            the failover list.  Modify mti->params for rewriting back at
3560            server_register_target(). */
3561
3562         mutex_lock(&fsdb->fsdb_mutex);
3563         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3564         mutex_unlock(&fsdb->fsdb_mutex);
3565
3566         RETURN(rc);
3567 #endif
3568         return 0;
3569 }
3570
3571 int mgs_write_log_target(const struct lu_env *env,
3572                          struct mgs_device *mgs,
3573                          struct mgs_target_info *mti,
3574                          struct fs_db *fsdb)
3575 {
3576         int rc = -EINVAL;
3577         char *buf, *params;
3578         ENTRY;
3579
3580         /* set/check the new target index */
3581         rc = mgs_set_index(env, mgs, mti);
3582         if (rc < 0) {
3583                 CERROR("Can't get index (%d)\n", rc);
3584                 RETURN(rc);
3585         }
3586
3587         if (rc == EALREADY) {
3588                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3589                               mti->mti_stripe_index, mti->mti_svname);
3590                 /* We would like to mark old log sections as invalid
3591                    and add new log sections in the client and mdt logs.
3592                    But if we add new sections, then live clients will
3593                    get repeat setup instructions for already running
3594                    osc's. So don't update the client/mdt logs. */
3595                 mti->mti_flags &= ~LDD_F_UPDATE;
3596                 rc = 0;
3597         }
3598
3599         mutex_lock(&fsdb->fsdb_mutex);
3600
3601         if (mti->mti_flags &
3602             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3603                 /* Generate a log from scratch */
3604                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3605                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3606                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3607                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3608                 } else {
3609                         CERROR("Unknown target type %#x, can't create log for "
3610                                "%s\n", mti->mti_flags, mti->mti_svname);
3611                 }
3612                 if (rc) {
3613                         CERROR("Can't write logs for %s (%d)\n",
3614                                mti->mti_svname, rc);
3615                         GOTO(out_up, rc);
3616                 }
3617         } else {
3618                 /* Just update the params from tunefs in mgs_write_log_params */
3619                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3620                 mti->mti_flags |= LDD_F_PARAM;
3621         }
3622
3623         /* allocate temporary buffer, where class_get_next_param will
3624            make copy of a current  parameter */
3625         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3626         if (buf == NULL)
3627                 GOTO(out_up, rc = -ENOMEM);
3628         params = mti->mti_params;
3629         while (params != NULL) {
3630                 rc = class_get_next_param(&params, buf);
3631                 if (rc) {
3632                         if (rc == 1)
3633                                 /* there is no next parameter, that is
3634                                    not an error */
3635                                 rc = 0;
3636                         break;
3637                 }
3638                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3639                        params, buf);
3640                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3641                 if (rc)
3642                         break;
3643         }
3644
3645         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3646
3647 out_up:
3648         mutex_unlock(&fsdb->fsdb_mutex);
3649         RETURN(rc);
3650 }
3651
3652 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3653 {
3654         struct llog_ctxt        *ctxt;
3655         int                      rc = 0;
3656
3657         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3658         if (ctxt == NULL) {
3659                 CERROR("%s: MGS config context doesn't exist\n",
3660                        mgs->mgs_obd->obd_name);
3661                 rc = -ENODEV;
3662         } else {
3663                 rc = llog_erase(env, ctxt, NULL, name);
3664                 /* llog may not exist */
3665                 if (rc == -ENOENT)
3666                         rc = 0;
3667                 llog_ctxt_put(ctxt);
3668         }
3669
3670         if (rc)
3671                 CERROR("%s: failed to clear log %s: %d\n",
3672                        mgs->mgs_obd->obd_name, name, rc);
3673
3674         return rc;
3675 }
3676
3677 /* erase all logs for the given fs */
3678 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3679 {
3680         struct fs_db *fsdb;
3681         cfs_list_t list;
3682         struct mgs_direntry *dirent, *n;
3683         int rc, len = strlen(fsname);
3684         char *suffix;
3685         ENTRY;
3686
3687         /* Find all the logs in the CONFIGS directory */
3688         rc = class_dentry_readdir(env, mgs, &list);
3689         if (rc)
3690                 RETURN(rc);
3691
3692         mutex_lock(&mgs->mgs_mutex);
3693
3694         /* Delete the fs db */
3695         fsdb = mgs_find_fsdb(mgs, fsname);
3696         if (fsdb)
3697                 mgs_free_fsdb(mgs, fsdb);
3698
3699         mutex_unlock(&mgs->mgs_mutex);
3700
3701         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3702                 cfs_list_del(&dirent->list);
3703                 suffix = strrchr(dirent->name, '-');
3704                 if (suffix != NULL) {
3705                         if ((len == suffix - dirent->name) &&
3706                             (strncmp(fsname, dirent->name, len) == 0)) {
3707                                 CDEBUG(D_MGS, "Removing log %s\n",
3708                                        dirent->name);
3709                                 mgs_erase_log(env, mgs, dirent->name);
3710                         }
3711                 }
3712                 mgs_direntry_free(dirent);
3713         }
3714
3715         RETURN(rc);
3716 }
3717
3718 /* list all logs for the given fs */
3719 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
3720                   struct obd_ioctl_data *data)
3721 {
3722         cfs_list_t               list;
3723         struct mgs_direntry     *dirent, *n;
3724         char                    *out, *suffix;
3725         int                      l, remains, rc;
3726
3727         ENTRY;
3728
3729         /* Find all the logs in the CONFIGS directory */
3730         rc = class_dentry_readdir(env, mgs, &list);
3731         if (rc) {
3732                 CERROR("%s: can't read %s dir = %d\n",
3733                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
3734                 RETURN(rc);
3735         }
3736
3737         out = data->ioc_bulk;
3738         remains = data->ioc_inllen1;
3739         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3740                 cfs_list_del(&dirent->list);
3741                 suffix = strrchr(dirent->name, '-');
3742                 if (suffix != NULL) {
3743                         l = snprintf(out, remains, "config log: $%s\n",
3744                                      dirent->name);
3745                         out += l;
3746                         remains -= l;
3747                 }
3748                 mgs_direntry_free(dirent);
3749                 if (remains <= 0)
3750                         break;
3751         }
3752         RETURN(rc);
3753 }
3754
3755 /* from llog_swab */
3756 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3757 {
3758         int i;
3759         ENTRY;
3760
3761         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3762         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3763
3764         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3765         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3766         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3767         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3768
3769         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3770         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3771                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3772                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3773                                i, lcfg->lcfg_buflens[i],
3774                                lustre_cfg_string(lcfg, i));
3775                 }
3776         EXIT;
3777 }
3778
3779 /* Set a permanent (config log) param for a target or fs
3780  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3781  *             buf1 contains the single parameter
3782  */
3783 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3784                  struct lustre_cfg *lcfg, char *fsname)
3785 {
3786         struct fs_db *fsdb;
3787         struct mgs_target_info *mti;
3788         char *devname, *param;
3789         char *ptr;
3790         const char *tmp;
3791         __u32 index;
3792         int rc = 0;
3793         ENTRY;
3794
3795         print_lustre_cfg(lcfg);
3796
3797         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3798         devname = lustre_cfg_string(lcfg, 0);
3799         param = lustre_cfg_string(lcfg, 1);
3800         if (!devname) {
3801                 /* Assume device name embedded in param:
3802                    lustre-OST0000.osc.max_dirty_mb=32 */
3803                 ptr = strchr(param, '.');
3804                 if (ptr) {
3805                         devname = param;
3806                         *ptr = 0;
3807                         param = ptr + 1;
3808                 }
3809         }
3810         if (!devname) {
3811                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3812                 RETURN(-ENOSYS);
3813         }
3814
3815         rc = mgs_parse_devname(devname, fsname, NULL);
3816         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3817                 /* param related to llite isn't allowed to set by OST or MDT */
3818                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3819                                        sizeof(PARAM_LLITE) - 1) == 0)
3820                         RETURN(-EINVAL);
3821         } else {
3822                 /* assume devname is the fsname */
3823                 memset(fsname, 0, MTI_NAME_MAXLEN);
3824                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3825                 fsname[MTI_NAME_MAXLEN - 1] = 0;
3826         }
3827         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3828
3829         rc = mgs_find_or_make_fsdb(env, mgs,
3830                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
3831                                    PARAMS_FILENAME : fsname, &fsdb);
3832         if (rc)
3833                 RETURN(rc);
3834
3835         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
3836             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3837             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3838                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3839                        "is '%s'\n", fsname, devname);
3840                 mgs_free_fsdb(mgs, fsdb);
3841                 RETURN(-EINVAL);
3842         }
3843
3844         /* Create a fake mti to hold everything */
3845         OBD_ALLOC_PTR(mti);
3846         if (!mti)
3847                 GOTO(out, rc = -ENOMEM);
3848         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3849             >= sizeof(mti->mti_fsname))
3850                 GOTO(out, rc = -E2BIG);
3851         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3852             >= sizeof(mti->mti_svname))
3853                 GOTO(out, rc = -E2BIG);
3854         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3855             >= sizeof(mti->mti_params))
3856                 GOTO(out, rc = -E2BIG);
3857         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3858         if (rc < 0)
3859                 /* Not a valid server; may be only fsname */
3860                 rc = 0;
3861         else
3862                 /* Strip -osc or -mdc suffix from svname */
3863                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3864                                      mti->mti_svname))
3865                         GOTO(out, rc = -EINVAL);
3866         /*
3867          * Revoke lock so everyone updates.  Should be alright if
3868          * someone was already reading while we were updating the logs,
3869          * so we don't really need to hold the lock while we're
3870          * writing (above).
3871          */
3872         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
3873                 mti->mti_flags = rc | LDD_F_PARAM2;
3874                 mutex_lock(&fsdb->fsdb_mutex);
3875                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
3876                 mutex_unlock(&fsdb->fsdb_mutex);
3877                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
3878         } else {
3879                 mti->mti_flags = rc | LDD_F_PARAM;
3880                 mutex_lock(&fsdb->fsdb_mutex);
3881                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3882                 mutex_unlock(&fsdb->fsdb_mutex);
3883                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3884         }
3885
3886 out:
3887         OBD_FREE_PTR(mti);
3888         RETURN(rc);
3889 }
3890
3891 static int mgs_write_log_pool(const struct lu_env *env,
3892                               struct mgs_device *mgs, char *logname,
3893                               struct fs_db *fsdb, char *tgtname,
3894                               enum lcfg_command_type cmd,
3895                               char *fsname, char *poolname,
3896                               char *ostname, char *comment)
3897 {
3898         struct llog_handle *llh = NULL;
3899         int rc;
3900
3901         rc = record_start_log(env, mgs, &llh, logname);
3902         if (rc)
3903                 return rc;
3904         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
3905         if (rc)
3906                 goto out;
3907         rc = record_base(env, llh, tgtname, 0, cmd,
3908                          fsname, poolname, ostname, 0);
3909         if (rc)
3910                 goto out;
3911         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
3912 out:
3913         record_end_log(env, &llh);
3914         return rc;
3915 }
3916
3917 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
3918                     enum lcfg_command_type cmd, const char *nodemap_name,
3919                     const char *param)
3920 {
3921         lnet_nid_t      nid[2];
3922         __u32           idmap[2];
3923         bool            bool_switch;
3924         __u32           int_id;
3925         int             rc = 0;
3926         ENTRY;
3927
3928         switch (cmd) {
3929         case LCFG_NODEMAP_ADD:
3930                 rc = nodemap_add(nodemap_name);
3931                 break;
3932         case LCFG_NODEMAP_DEL:
3933                 rc = nodemap_del(nodemap_name);
3934                 break;
3935         case LCFG_NODEMAP_ADD_RANGE:
3936                 rc = nodemap_parse_range(param, nid);
3937                 if (rc != 0)
3938                         break;
3939                 rc = nodemap_add_range(nodemap_name, nid);
3940                 break;
3941         case LCFG_NODEMAP_DEL_RANGE:
3942                 rc = nodemap_parse_range(param, nid);
3943                 if (rc != 0)
3944                         break;
3945                 rc = nodemap_del_range(nodemap_name, nid);
3946                 break;
3947         case LCFG_NODEMAP_ADMIN:
3948                 bool_switch = simple_strtoul(param, NULL, 10);
3949                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
3950                 break;
3951         case LCFG_NODEMAP_TRUSTED:
3952                 bool_switch = simple_strtoul(param, NULL, 10);
3953                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
3954                 break;
3955         case LCFG_NODEMAP_SQUASH_UID:
3956                 int_id = simple_strtoul(param, NULL, 10);
3957                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
3958                 break;
3959         case LCFG_NODEMAP_SQUASH_GID:
3960                 int_id = simple_strtoul(param, NULL, 10);
3961                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
3962                 break;
3963         case LCFG_NODEMAP_ADD_UIDMAP:
3964         case LCFG_NODEMAP_ADD_GIDMAP:
3965                 rc = nodemap_parse_idmap(param, idmap);
3966                 if (rc != 0)
3967                         break;
3968                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
3969                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
3970                                                idmap);
3971                 else
3972                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
3973                                                idmap);
3974                 break;
3975         case LCFG_NODEMAP_DEL_UIDMAP:
3976         case LCFG_NODEMAP_DEL_GIDMAP:
3977                 rc = nodemap_parse_idmap(param, idmap);
3978                 if (rc != 0)
3979                         break;
3980                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
3981                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
3982                                                idmap);
3983                 else
3984                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
3985                                                idmap);
3986                 break;
3987         default:
3988                 rc = -EINVAL;
3989         }
3990
3991         RETURN(rc);
3992 }
3993
3994 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3995                  enum lcfg_command_type cmd, char *fsname,
3996                  char *poolname, char *ostname)
3997 {
3998         struct fs_db *fsdb;
3999         char *lovname;
4000         char *logname;
4001         char *label = NULL, *canceled_label = NULL;
4002         int label_sz;
4003         struct mgs_target_info *mti = NULL;
4004         int rc, i;
4005         ENTRY;
4006
4007         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
4008         if (rc) {
4009                 CERROR("Can't get db for %s\n", fsname);
4010                 RETURN(rc);
4011         }
4012         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4013                 CERROR("%s is not defined\n", fsname);
4014                 mgs_free_fsdb(mgs, fsdb);
4015                 RETURN(-EINVAL);
4016         }
4017
4018         label_sz = 10 + strlen(fsname) + strlen(poolname);
4019
4020         /* check if ostname match fsname */
4021         if (ostname != NULL) {
4022                 char *ptr;
4023
4024                 ptr = strrchr(ostname, '-');
4025                 if ((ptr == NULL) ||
4026                     (strncmp(fsname, ostname, ptr-ostname) != 0))
4027                         RETURN(-EINVAL);
4028                 label_sz += strlen(ostname);
4029         }
4030
4031         OBD_ALLOC(label, label_sz);
4032         if (label == NULL)
4033                 RETURN(-ENOMEM);
4034
4035         switch(cmd) {
4036         case LCFG_POOL_NEW:
4037                 sprintf(label,
4038                         "new %s.%s", fsname, poolname);
4039                 break;
4040         case LCFG_POOL_ADD:
4041                 sprintf(label,
4042                         "add %s.%s.%s", fsname, poolname, ostname);
4043                 break;
4044         case LCFG_POOL_REM:
4045                 OBD_ALLOC(canceled_label, label_sz);
4046                 if (canceled_label == NULL)
4047                         GOTO(out_label, rc = -ENOMEM);
4048                 sprintf(label,
4049                         "rem %s.%s.%s", fsname, poolname, ostname);
4050                 sprintf(canceled_label,
4051                         "add %s.%s.%s", fsname, poolname, ostname);
4052                 break;
4053         case LCFG_POOL_DEL:
4054                 OBD_ALLOC(canceled_label, label_sz);
4055                 if (canceled_label == NULL)
4056                         GOTO(out_label, rc = -ENOMEM);
4057                 sprintf(label,
4058                         "del %s.%s", fsname, poolname);
4059                 sprintf(canceled_label,
4060                         "new %s.%s", fsname, poolname);
4061                 break;
4062         default:
4063                 break;
4064         }
4065
4066         if (canceled_label != NULL) {
4067                 OBD_ALLOC_PTR(mti);
4068                 if (mti == NULL)
4069                         GOTO(out_cancel, rc = -ENOMEM);
4070         }
4071
4072         mutex_lock(&fsdb->fsdb_mutex);
4073         /* write pool def to all MDT logs */
4074         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4075                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4076                         rc = name_create_mdt_and_lov(&logname, &lovname,
4077                                                      fsdb, i);
4078                         if (rc) {
4079                                 mutex_unlock(&fsdb->fsdb_mutex);
4080                                 GOTO(out_mti, rc);
4081                         }
4082                         if (canceled_label != NULL) {
4083                                 strcpy(mti->mti_svname, "lov pool");
4084                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4085                                                 lovname, canceled_label,
4086                                                 CM_SKIP);
4087                         }
4088
4089                         if (rc >= 0)
4090                                 rc = mgs_write_log_pool(env, mgs, logname,
4091                                                         fsdb, lovname, cmd,
4092                                                         fsname, poolname,
4093                                                         ostname, label);
4094                         name_destroy(&logname);
4095                         name_destroy(&lovname);
4096                         if (rc) {
4097                                 mutex_unlock(&fsdb->fsdb_mutex);
4098                                 GOTO(out_mti, rc);
4099                         }
4100                 }
4101         }
4102
4103         rc = name_create(&logname, fsname, "-client");
4104         if (rc) {
4105                 mutex_unlock(&fsdb->fsdb_mutex);
4106                 GOTO(out_mti, rc);
4107         }
4108         if (canceled_label != NULL) {
4109                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4110                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4111                 if (rc < 0) {
4112                         mutex_unlock(&fsdb->fsdb_mutex);
4113                         name_destroy(&logname);
4114                         GOTO(out_mti, rc);
4115                 }
4116         }
4117
4118         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4119                                 cmd, fsname, poolname, ostname, label);
4120         mutex_unlock(&fsdb->fsdb_mutex);
4121         name_destroy(&logname);
4122         /* request for update */
4123         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4124
4125         EXIT;
4126 out_mti:
4127         if (mti != NULL)
4128                 OBD_FREE_PTR(mti);
4129 out_cancel:
4130         if (canceled_label != NULL)
4131                 OBD_FREE(canceled_label, label_sz);
4132 out_label:
4133         OBD_FREE(label, label_sz);
4134         return rc;
4135 }