Whamcloud - gitweb
LU-5396 lod: (and mdt, mgs) make some symbols static
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 /* Find all logs in CONFIG directory and link then into list */
59 int class_dentry_readdir(const struct lu_env *env,
60                          struct mgs_device *mgs, struct list_head *log_list)
61 {
62         struct dt_object    *dir = mgs->mgs_configs_dir;
63         const struct dt_it_ops *iops;
64         struct dt_it        *it;
65         struct mgs_direntry *de;
66         char                *key;
67         int                  rc, key_sz;
68
69         INIT_LIST_HEAD(log_list);
70
71         LASSERT(dir);
72         LASSERT(dir->do_index_ops);
73
74         iops = &dir->do_index_ops->dio_it;
75         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
76         if (IS_ERR(it))
77                 RETURN(PTR_ERR(it));
78
79         rc = iops->load(env, it, 0);
80         if (rc <= 0)
81                 GOTO(fini, rc = 0);
82
83         /* main cycle */
84         do {
85                 key = (void *)iops->key(env, it);
86                 if (IS_ERR(key)) {
87                         CERROR("%s: key failed when listing %s: rc = %d\n",
88                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
89                                (int) PTR_ERR(key));
90                         goto next;
91                 }
92                 key_sz = iops->key_size(env, it);
93                 LASSERT(key_sz > 0);
94
95                 /* filter out "." and ".." entries */
96                 if (key[0] == '.') {
97                         if (key_sz == 1)
98                                 goto next;
99                         if (key_sz == 2 && key[1] == '.')
100                                 goto next;
101                 }
102
103                 de = mgs_direntry_alloc(key_sz + 1);
104                 if (de == NULL) {
105                         rc = -ENOMEM;
106                         break;
107                 }
108
109                 memcpy(de->mde_name, key, key_sz);
110                 de->mde_name[key_sz] = 0;
111
112                 list_add(&de->mde_list, log_list);
113
114 next:
115                 rc = iops->next(env, it);
116         } while (rc == 0);
117         rc = 0;
118
119         iops->put(env, it);
120
121 fini:
122         iops->fini(env, it);
123         if (rc)
124                 CERROR("%s: key failed when listing %s: rc = %d\n",
125                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
126         RETURN(rc);
127 }
128
129 /******************** DB functions *********************/
130
131 static inline int name_create(char **newname, char *prefix, char *suffix)
132 {
133         LASSERT(newname);
134         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
135         if (!*newname)
136                 return -ENOMEM;
137         sprintf(*newname, "%s%s", prefix, suffix);
138         return 0;
139 }
140
141 static inline void name_destroy(char **name)
142 {
143         if (*name)
144                 OBD_FREE(*name, strlen(*name) + 1);
145         *name = NULL;
146 }
147
148 struct mgs_fsdb_handler_data
149 {
150         struct fs_db   *fsdb;
151         __u32           ver;
152 };
153
154 /* from the (client) config log, figure out:
155         1. which ost's/mdt's are configured (by index)
156         2. what the last config step is
157         3. COMPAT_18 osc name
158 */
159 /* It might be better to have a separate db file, instead of parsing the info
160    out of the client log.  This is slow and potentially error-prone. */
161 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
162                             struct llog_rec_hdr *rec, void *data)
163 {
164         struct mgs_fsdb_handler_data *d = data;
165         struct fs_db *fsdb = d->fsdb;
166         int cfg_len = rec->lrh_len;
167         char *cfg_buf = (char*) (rec + 1);
168         struct lustre_cfg *lcfg;
169         __u32 index;
170         int rc = 0;
171         ENTRY;
172
173         if (rec->lrh_type != OBD_CFG_REC) {
174                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
175                 RETURN(-EINVAL);
176         }
177
178         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
179         if (rc) {
180                 CERROR("Insane cfg\n");
181                 RETURN(rc);
182         }
183
184         lcfg = (struct lustre_cfg *)cfg_buf;
185
186         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
187                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
188
189         /* Figure out ost indicies */
190         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
191         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
192             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
193                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
194                                        NULL, 10);
195                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
196                        lustre_cfg_string(lcfg, 1), index,
197                        lustre_cfg_string(lcfg, 2));
198                 set_bit(index, fsdb->fsdb_ost_index_map);
199         }
200
201         /* Figure out mdt indicies */
202         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
203         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
204             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
205                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
206                                        &index, NULL);
207                 if (rc != LDD_F_SV_TYPE_MDT) {
208                         CWARN("Unparsable MDC name %s, assuming index 0\n",
209                               lustre_cfg_string(lcfg, 0));
210                         index = 0;
211                 }
212                 rc = 0;
213                 CDEBUG(D_MGS, "MDT index is %u\n", index);
214                 set_bit(index, fsdb->fsdb_mdt_index_map);
215                 fsdb->fsdb_mdt_count ++;
216         }
217
218         /**
219          * figure out the old config. fsdb_gen = 0 means old log
220          * It is obsoleted and not supported anymore
221          */
222         if (fsdb->fsdb_gen == 0) {
223                 CERROR("Old config format is not supported\n");
224                 RETURN(-EINVAL);
225         }
226
227         /*
228          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
229          */
230         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
231             lcfg->lcfg_command == LCFG_ATTACH &&
232             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
233                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
234                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
235                         CWARN("MDT using 1.8 OSC name scheme\n");
236                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
237                 }
238         }
239
240         if (lcfg->lcfg_command == LCFG_MARKER) {
241                 struct cfg_marker *marker;
242                 marker = lustre_cfg_buf(lcfg, 1);
243
244                 d->ver = marker->cm_vers;
245
246                 /* Keep track of the latest marker step */
247                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
248         }
249
250         RETURN(rc);
251 }
252
253 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
254 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
255                                   struct mgs_device *mgs,
256                                   struct fs_db *fsdb)
257 {
258         char                            *logname;
259         struct llog_handle              *loghandle;
260         struct llog_ctxt                *ctxt;
261         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
262         int rc;
263
264         ENTRY;
265
266         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
267         LASSERT(ctxt != NULL);
268         rc = name_create(&logname, fsdb->fsdb_name, "-client");
269         if (rc)
270                 GOTO(out_put, rc);
271         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
272         if (rc)
273                 GOTO(out_pop, rc);
274
275         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
276         if (rc)
277                 GOTO(out_close, rc);
278
279         if (llog_get_size(loghandle) <= 1)
280                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
281
282         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
283         CDEBUG(D_INFO, "get_db = %d\n", rc);
284 out_close:
285         llog_close(env, loghandle);
286 out_pop:
287         name_destroy(&logname);
288 out_put:
289         llog_ctxt_put(ctxt);
290
291         RETURN(rc);
292 }
293
294 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
295 {
296         struct mgs_tgt_srpc_conf *tgtconf;
297
298         /* free target-specific rules */
299         while (fsdb->fsdb_srpc_tgt) {
300                 tgtconf = fsdb->fsdb_srpc_tgt;
301                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
302
303                 LASSERT(tgtconf->mtsc_tgt);
304
305                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
306                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
307                 OBD_FREE_PTR(tgtconf);
308         }
309
310         /* free general rules */
311         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
312 }
313
314 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
315 {
316         struct fs_db *fsdb;
317         struct list_head *tmp;
318
319         list_for_each(tmp, &mgs->mgs_fs_db_list) {
320                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
321                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
322                         return fsdb;
323         }
324         return NULL;
325 }
326
327 /* caller must hold the mgs->mgs_fs_db_lock */
328 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
329                                   struct mgs_device *mgs, char *fsname)
330 {
331         struct fs_db *fsdb;
332         int rc;
333         ENTRY;
334
335         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
336                 CERROR("fsname %s is too long\n", fsname);
337                 RETURN(NULL);
338         }
339
340         OBD_ALLOC_PTR(fsdb);
341         if (!fsdb)
342                 RETURN(NULL);
343
344         strcpy(fsdb->fsdb_name, fsname);
345         mutex_init(&fsdb->fsdb_mutex);
346         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
347         fsdb->fsdb_gen = 1;
348
349         if (strcmp(fsname, MGSSELF_NAME) == 0) {
350                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
351         } else {
352                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
353                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
354                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
355                         CERROR("No memory for index maps\n");
356                         GOTO(err, rc = -ENOMEM);
357                 }
358
359                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
360                 if (rc)
361                         GOTO(err, rc);
362                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
363                 if (rc)
364                         GOTO(err, rc);
365
366                 /* initialise data for NID table */
367                 mgs_ir_init_fs(env, mgs, fsdb);
368
369                 lproc_mgs_add_live(mgs, fsdb);
370         }
371
372         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
373
374         RETURN(fsdb);
375 err:
376         if (fsdb->fsdb_ost_index_map)
377                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
378         if (fsdb->fsdb_mdt_index_map)
379                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
380         name_destroy(&fsdb->fsdb_clilov);
381         name_destroy(&fsdb->fsdb_clilmv);
382         OBD_FREE_PTR(fsdb);
383         RETURN(NULL);
384 }
385
386 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
387 {
388         /* wait for anyone with the sem */
389         mutex_lock(&fsdb->fsdb_mutex);
390         lproc_mgs_del_live(mgs, fsdb);
391         list_del(&fsdb->fsdb_list);
392
393         /* deinitialize fsr */
394         mgs_ir_fini_fs(mgs, fsdb);
395
396         if (fsdb->fsdb_ost_index_map)
397                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
398         if (fsdb->fsdb_mdt_index_map)
399                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
400         name_destroy(&fsdb->fsdb_clilov);
401         name_destroy(&fsdb->fsdb_clilmv);
402         mgs_free_fsdb_srpc(fsdb);
403         mutex_unlock(&fsdb->fsdb_mutex);
404         OBD_FREE_PTR(fsdb);
405 }
406
407 int mgs_init_fsdb_list(struct mgs_device *mgs)
408 {
409         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
410         return 0;
411 }
412
413 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
414 {
415         struct fs_db *fsdb;
416         struct list_head *tmp, *tmp2;
417
418         mutex_lock(&mgs->mgs_mutex);
419         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
420                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
421                 mgs_free_fsdb(mgs, fsdb);
422         }
423         mutex_unlock(&mgs->mgs_mutex);
424         return 0;
425 }
426
427 int mgs_find_or_make_fsdb(const struct lu_env *env,
428                           struct mgs_device *mgs, char *name,
429                           struct fs_db **dbh)
430 {
431         struct fs_db *fsdb;
432         int rc = 0;
433
434         ENTRY;
435         mutex_lock(&mgs->mgs_mutex);
436         fsdb = mgs_find_fsdb(mgs, name);
437         if (fsdb) {
438                 mutex_unlock(&mgs->mgs_mutex);
439                 *dbh = fsdb;
440                 RETURN(0);
441         }
442
443         CDEBUG(D_MGS, "Creating new db\n");
444         fsdb = mgs_new_fsdb(env, mgs, name);
445         /* lock fsdb_mutex until the db is loaded from llogs */
446         if (fsdb)
447                 mutex_lock(&fsdb->fsdb_mutex);
448         mutex_unlock(&mgs->mgs_mutex);
449         if (!fsdb)
450                 RETURN(-ENOMEM);
451
452         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
453                 /* populate the db from the client llog */
454                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
455                 if (rc) {
456                         CERROR("Can't get db from client log %d\n", rc);
457                         GOTO(out_free, rc);
458                 }
459         }
460
461         /* populate srpc rules from params llog */
462         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
463         if (rc) {
464                 CERROR("Can't get db from params log %d\n", rc);
465                 GOTO(out_free, rc);
466         }
467
468         mutex_unlock(&fsdb->fsdb_mutex);
469         *dbh = fsdb;
470
471         RETURN(0);
472
473 out_free:
474         mutex_unlock(&fsdb->fsdb_mutex);
475         mgs_free_fsdb(mgs, fsdb);
476         return rc;
477 }
478
479 /* 1 = index in use
480    0 = index unused
481    -1= empty client log */
482 int mgs_check_index(const struct lu_env *env,
483                     struct mgs_device *mgs,
484                     struct mgs_target_info *mti)
485 {
486         struct fs_db *fsdb;
487         void *imap;
488         int rc = 0;
489         ENTRY;
490
491         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
492
493         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
494         if (rc) {
495                 CERROR("Can't get db for %s\n", mti->mti_fsname);
496                 RETURN(rc);
497         }
498
499         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
500                 RETURN(-1);
501
502         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
503                 imap = fsdb->fsdb_ost_index_map;
504         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
505                 imap = fsdb->fsdb_mdt_index_map;
506         else
507                 RETURN(-EINVAL);
508
509         if (test_bit(mti->mti_stripe_index, imap))
510                 RETURN(1);
511         RETURN(0);
512 }
513
514 static __inline__ int next_index(void *index_map, int map_len)
515 {
516         int i;
517         for (i = 0; i < map_len * 8; i++)
518                 if (!test_bit(i, index_map)) {
519                          return i;
520                  }
521         CERROR("max index %d exceeded.\n", i);
522         return -1;
523 }
524
525 /* Return codes:
526         0  newly marked as in use
527         <0 err
528         +EALREADY for update of an old index */
529 static int mgs_set_index(const struct lu_env *env,
530                          struct mgs_device *mgs,
531                          struct mgs_target_info *mti)
532 {
533         struct fs_db *fsdb;
534         void *imap;
535         int rc = 0;
536         ENTRY;
537
538         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
539         if (rc) {
540                 CERROR("Can't get db for %s\n", mti->mti_fsname);
541                 RETURN(rc);
542         }
543
544         mutex_lock(&fsdb->fsdb_mutex);
545         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
546                 imap = fsdb->fsdb_ost_index_map;
547         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
548                 imap = fsdb->fsdb_mdt_index_map;
549         } else {
550                 GOTO(out_up, rc = -EINVAL);
551         }
552
553         if (mti->mti_flags & LDD_F_NEED_INDEX) {
554                 rc = next_index(imap, INDEX_MAP_SIZE);
555                 if (rc == -1)
556                         GOTO(out_up, rc = -ERANGE);
557                 mti->mti_stripe_index = rc;
558                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
559                         fsdb->fsdb_mdt_count ++;
560         }
561
562         /* the last index(0xffff) is reserved for default value. */
563         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
564                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
565                                    "but index must be less than %u.\n",
566                                    mti->mti_svname, mti->mti_stripe_index,
567                                    INDEX_MAP_SIZE * 8 - 1);
568                 GOTO(out_up, rc = -ERANGE);
569         }
570
571         if (test_bit(mti->mti_stripe_index, imap)) {
572                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
573                     !(mti->mti_flags & LDD_F_WRITECONF)) {
574                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
575                                            "%d, but that index is already in "
576                                            "use. Use --writeconf to force\n",
577                                            mti->mti_svname,
578                                            mti->mti_stripe_index);
579                         GOTO(out_up, rc = -EADDRINUSE);
580                 } else {
581                         CDEBUG(D_MGS, "Server %s updating index %d\n",
582                                mti->mti_svname, mti->mti_stripe_index);
583                         GOTO(out_up, rc = EALREADY);
584                 }
585         }
586
587         set_bit(mti->mti_stripe_index, imap);
588         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
589         mutex_unlock(&fsdb->fsdb_mutex);
590         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
591                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
592
593         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
594                mti->mti_stripe_index);
595
596         RETURN(0);
597 out_up:
598         mutex_unlock(&fsdb->fsdb_mutex);
599         return rc;
600 }
601
602 struct mgs_modify_lookup {
603         struct cfg_marker mml_marker;
604         int               mml_modified;
605 };
606
607 static int mgs_modify_handler(const struct lu_env *env,
608                               struct llog_handle *llh,
609                               struct llog_rec_hdr *rec, void *data)
610 {
611         struct mgs_modify_lookup *mml = data;
612         struct cfg_marker *marker;
613         struct lustre_cfg *lcfg = REC_DATA(rec);
614         int cfg_len = REC_DATA_LEN(rec);
615         int rc;
616         ENTRY;
617
618         if (rec->lrh_type != OBD_CFG_REC) {
619                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
620                 RETURN(-EINVAL);
621         }
622
623         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
624         if (rc) {
625                 CERROR("Insane cfg\n");
626                 RETURN(rc);
627         }
628
629         /* We only care about markers */
630         if (lcfg->lcfg_command != LCFG_MARKER)
631                 RETURN(0);
632
633         marker = lustre_cfg_buf(lcfg, 1);
634         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
635             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
636             !(marker->cm_flags & CM_SKIP)) {
637                 /* Found a non-skipped marker match */
638                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
639                        rec->lrh_index, marker->cm_step,
640                        marker->cm_flags, mml->mml_marker.cm_flags,
641                        marker->cm_tgtname, marker->cm_comment);
642                 /* Overwrite the old marker llog entry */
643                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
644                 marker->cm_flags |= mml->mml_marker.cm_flags;
645                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
646                 rc = llog_write(env, llh, rec, rec->lrh_index);
647                 if (!rc)
648                          mml->mml_modified++;
649         }
650
651         RETURN(rc);
652 }
653
654 /**
655  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
656  * Return code:
657  * 0 - modified successfully,
658  * 1 - no modification was done
659  * negative - error
660  */
661 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
662                       struct fs_db *fsdb, struct mgs_target_info *mti,
663                       char *logname, char *devname, char *comment, int flags)
664 {
665         struct llog_handle *loghandle;
666         struct llog_ctxt *ctxt;
667         struct mgs_modify_lookup *mml;
668         int rc;
669
670         ENTRY;
671
672         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
673         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
674                flags);
675
676         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
677         LASSERT(ctxt != NULL);
678         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
679         if (rc < 0) {
680                 if (rc == -ENOENT)
681                         rc = 0;
682                 GOTO(out_pop, rc);
683         }
684
685         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
686         if (rc)
687                 GOTO(out_close, rc);
688
689         if (llog_get_size(loghandle) <= 1)
690                 GOTO(out_close, rc = 0);
691
692         OBD_ALLOC_PTR(mml);
693         if (!mml)
694                 GOTO(out_close, rc = -ENOMEM);
695         if (strlcpy(mml->mml_marker.cm_comment, comment,
696                     sizeof(mml->mml_marker.cm_comment)) >=
697             sizeof(mml->mml_marker.cm_comment))
698                 GOTO(out_free, rc = -E2BIG);
699         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
700                     sizeof(mml->mml_marker.cm_tgtname)) >=
701             sizeof(mml->mml_marker.cm_tgtname))
702                 GOTO(out_free, rc = -E2BIG);
703         /* Modify mostly means cancel */
704         mml->mml_marker.cm_flags = flags;
705         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
706         mml->mml_modified = 0;
707         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
708                           NULL);
709         if (!rc && !mml->mml_modified)
710                 rc = 1;
711
712 out_free:
713         OBD_FREE_PTR(mml);
714
715 out_close:
716         llog_close(env, loghandle);
717 out_pop:
718         if (rc < 0)
719                 CERROR("%s: modify %s/%s failed: rc = %d\n",
720                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
721         llog_ctxt_put(ctxt);
722         RETURN(rc);
723 }
724
725 /** This structure is passed to mgs_replace_handler */
726 struct mgs_replace_uuid_lookup {
727         /* Nids are replaced for this target device */
728         struct mgs_target_info target;
729         /* Temporary modified llog */
730         struct llog_handle *temp_llh;
731         /* Flag is set if in target block*/
732         int in_target_device;
733         /* Nids already added. Just skip (multiple nids) */
734         int device_nids_added;
735         /* Flag is set if this block should not be copied */
736         int skip_it;
737 };
738
739 /**
740  * Check: a) if block should be skipped
741  * b) is it target block
742  *
743  * \param[in] lcfg
744  * \param[in] mrul
745  *
746  * \retval 0 should not to be skipped
747  * \retval 1 should to be skipped
748  */
749 static int check_markers(struct lustre_cfg *lcfg,
750                          struct mgs_replace_uuid_lookup *mrul)
751 {
752          struct cfg_marker *marker;
753
754         /* Track markers. Find given device */
755         if (lcfg->lcfg_command == LCFG_MARKER) {
756                 marker = lustre_cfg_buf(lcfg, 1);
757                 /* Clean llog from records marked as CM_EXCLUDE.
758                    CM_SKIP records are used for "active" command
759                    and can be restored if needed */
760                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
761                     (CM_EXCLUDE | CM_START)) {
762                         mrul->skip_it = 1;
763                         return 1;
764                 }
765
766                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
767                     (CM_EXCLUDE | CM_END)) {
768                         mrul->skip_it = 0;
769                         return 1;
770                 }
771
772                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
773                         LASSERT(!(marker->cm_flags & CM_START) ||
774                                 !(marker->cm_flags & CM_END));
775                         if (marker->cm_flags & CM_START) {
776                                 mrul->in_target_device = 1;
777                                 mrul->device_nids_added = 0;
778                         } else if (marker->cm_flags & CM_END)
779                                 mrul->in_target_device = 0;
780                 }
781         }
782
783         return 0;
784 }
785
786 static int record_base(const struct lu_env *env, struct llog_handle *llh,
787                      char *cfgname, lnet_nid_t nid, int cmd,
788                      char *s1, char *s2, char *s3, char *s4)
789 {
790         struct mgs_thread_info  *mgi = mgs_env_info(env);
791         struct llog_cfg_rec     *lcr;
792         int rc;
793
794         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
795                cmd, s1, s2, s3, s4);
796
797         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
798         if (s1)
799                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
800         if (s2)
801                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
802         if (s3)
803                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
804         if (s4)
805                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
806
807         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
808         if (lcr == NULL)
809                 return -ENOMEM;
810
811         lcr->lcr_cfg.lcfg_nid = nid;
812         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
813
814         lustre_cfg_rec_free(lcr);
815
816         if (rc < 0)
817                 CDEBUG(D_MGS,
818                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
819                        cfgname, cmd, s1, s2, s3, s4, rc);
820         return rc;
821 }
822
823 static inline int record_add_uuid(const struct lu_env *env,
824                                   struct llog_handle *llh,
825                                   uint64_t nid, char *uuid)
826 {
827         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
828 }
829
830 static inline int record_add_conn(const struct lu_env *env,
831                                   struct llog_handle *llh,
832                                   char *devname, char *uuid)
833 {
834         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
835 }
836
837 static inline int record_attach(const struct lu_env *env,
838                                 struct llog_handle *llh, char *devname,
839                                 char *type, char *uuid)
840 {
841         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
842 }
843
844 static inline int record_setup(const struct lu_env *env,
845                                struct llog_handle *llh, char *devname,
846                                char *s1, char *s2, char *s3, char *s4)
847 {
848         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
849 }
850
851 /**
852  * \retval <0 record processing error
853  * \retval n record is processed. No need copy original one.
854  * \retval 0 record is not processed.
855  */
856 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
857                            struct mgs_replace_uuid_lookup *mrul)
858 {
859         int nids_added = 0;
860         lnet_nid_t nid;
861         char *ptr;
862         int rc;
863
864         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
865                 /* LCFG_ADD_UUID command found. Let's skip original command
866                    and add passed nids */
867                 ptr = mrul->target.mti_params;
868                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
869                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
870                                "device %s\n", libcfs_nid2str(nid),
871                                 mrul->target.mti_params,
872                                 mrul->target.mti_svname);
873                         rc = record_add_uuid(env,
874                                              mrul->temp_llh, nid,
875                                              mrul->target.mti_params);
876                         if (!rc)
877                                 nids_added++;
878                 }
879
880                 if (nids_added == 0) {
881                         CERROR("No new nids were added, nid %s with uuid %s, "
882                                "device %s\n", libcfs_nid2str(nid),
883                                mrul->target.mti_params,
884                                mrul->target.mti_svname);
885                         RETURN(-ENXIO);
886                 } else {
887                         mrul->device_nids_added = 1;
888                 }
889
890                 return nids_added;
891         }
892
893         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
894                 /* LCFG_SETUP command found. UUID should be changed */
895                 rc = record_setup(env,
896                                   mrul->temp_llh,
897                                   /* devname the same */
898                                   lustre_cfg_string(lcfg, 0),
899                                   /* s1 is not changed */
900                                   lustre_cfg_string(lcfg, 1),
901                                   /* new uuid should be
902                                   the full nidlist */
903                                   mrul->target.mti_params,
904                                   /* s3 is not changed */
905                                   lustre_cfg_string(lcfg, 3),
906                                   /* s4 is not changed */
907                                   lustre_cfg_string(lcfg, 4));
908                 return rc ? rc : 1;
909         }
910
911         /* Another commands in target device block */
912         return 0;
913 }
914
915 /**
916  * Handler that called for every record in llog.
917  * Records are processed in order they placed in llog.
918  *
919  * \param[in] llh       log to be processed
920  * \param[in] rec       current record
921  * \param[in] data      mgs_replace_uuid_lookup structure
922  *
923  * \retval 0    success
924  */
925 static int mgs_replace_handler(const struct lu_env *env,
926                                struct llog_handle *llh,
927                                struct llog_rec_hdr *rec,
928                                void *data)
929 {
930         struct mgs_replace_uuid_lookup *mrul;
931         struct lustre_cfg *lcfg = REC_DATA(rec);
932         int cfg_len = REC_DATA_LEN(rec);
933         int rc;
934         ENTRY;
935
936         mrul = (struct mgs_replace_uuid_lookup *)data;
937
938         if (rec->lrh_type != OBD_CFG_REC) {
939                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
940                        rec->lrh_type, lcfg->lcfg_command,
941                        lustre_cfg_string(lcfg, 0),
942                        lustre_cfg_string(lcfg, 1));
943                 RETURN(-EINVAL);
944         }
945
946         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
947         if (rc) {
948                 /* Do not copy any invalidated records */
949                 GOTO(skip_out, rc = 0);
950         }
951
952         rc = check_markers(lcfg, mrul);
953         if (rc || mrul->skip_it)
954                 GOTO(skip_out, rc = 0);
955
956         /* Write to new log all commands outside target device block */
957         if (!mrul->in_target_device)
958                 GOTO(copy_out, rc = 0);
959
960         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
961            (failover nids) for this target, assuming that if then
962            primary is changing then so is the failover */
963         if (mrul->device_nids_added &&
964             (lcfg->lcfg_command == LCFG_ADD_UUID ||
965              lcfg->lcfg_command == LCFG_ADD_CONN))
966                 GOTO(skip_out, rc = 0);
967
968         rc = process_command(env, lcfg, mrul);
969         if (rc < 0)
970                 RETURN(rc);
971
972         if (rc)
973                 RETURN(0);
974 copy_out:
975         /* Record is placed in temporary llog as is */
976         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
977
978         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
979                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
980                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
981         RETURN(rc);
982
983 skip_out:
984         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
985                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
986                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
987         RETURN(rc);
988 }
989
990 static int mgs_log_is_empty(const struct lu_env *env,
991                             struct mgs_device *mgs, char *name)
992 {
993         struct llog_ctxt        *ctxt;
994         int                      rc;
995
996         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
997         LASSERT(ctxt != NULL);
998
999         rc = llog_is_empty(env, ctxt, name);
1000         llog_ctxt_put(ctxt);
1001         return rc;
1002 }
1003
1004 static int mgs_replace_nids_log(const struct lu_env *env,
1005                                 struct obd_device *mgs, struct fs_db *fsdb,
1006                                 char *logname, char *devname, char *nids)
1007 {
1008         struct llog_handle *orig_llh, *backup_llh;
1009         struct llog_ctxt *ctxt;
1010         struct mgs_replace_uuid_lookup *mrul;
1011         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1012         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1013         char *backup;
1014         int rc, rc2;
1015         ENTRY;
1016
1017         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1018
1019         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1020         LASSERT(ctxt != NULL);
1021
1022         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1023                 /* Log is empty. Nothing to replace */
1024                 GOTO(out_put, rc = 0);
1025         }
1026
1027         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1028         if (backup == NULL)
1029                 GOTO(out_put, rc = -ENOMEM);
1030
1031         sprintf(backup, "%s.bak", logname);
1032
1033         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1034         if (rc == 0) {
1035                 /* Now erase original log file. Connections are not allowed.
1036                    Backup is already saved */
1037                 rc = llog_erase(env, ctxt, NULL, logname);
1038                 if (rc < 0)
1039                         GOTO(out_free, rc);
1040         } else if (rc != -ENOENT) {
1041                 CERROR("%s: can't make backup for %s: rc = %d\n",
1042                        mgs->obd_name, logname, rc);
1043                 GOTO(out_free,rc);
1044         }
1045
1046         /* open local log */
1047         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1048         if (rc)
1049                 GOTO(out_restore, rc);
1050
1051         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1052         if (rc)
1053                 GOTO(out_closel, rc);
1054
1055         /* open backup llog */
1056         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1057                        LLOG_OPEN_EXISTS);
1058         if (rc)
1059                 GOTO(out_closel, rc);
1060
1061         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1062         if (rc)
1063                 GOTO(out_close, rc);
1064
1065         if (llog_get_size(backup_llh) <= 1)
1066                 GOTO(out_close, rc = 0);
1067
1068         OBD_ALLOC_PTR(mrul);
1069         if (!mrul)
1070                 GOTO(out_close, rc = -ENOMEM);
1071         /* devname is only needed information to replace UUID records */
1072         strlcpy(mrul->target.mti_svname, devname,
1073                 sizeof(mrul->target.mti_svname));
1074         /* parse nids later */
1075         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1076         /* Copy records to this temporary llog */
1077         mrul->temp_llh = orig_llh;
1078
1079         rc = llog_process(env, backup_llh, mgs_replace_handler,
1080                           (void *)mrul, NULL);
1081         OBD_FREE_PTR(mrul);
1082 out_close:
1083         rc2 = llog_close(NULL, backup_llh);
1084         if (!rc)
1085                 rc = rc2;
1086 out_closel:
1087         rc2 = llog_close(NULL, orig_llh);
1088         if (!rc)
1089                 rc = rc2;
1090
1091 out_restore:
1092         if (rc) {
1093                 CERROR("%s: llog should be restored: rc = %d\n",
1094                        mgs->obd_name, rc);
1095                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1096                                   logname);
1097                 if (rc2 < 0)
1098                         CERROR("%s: can't restore backup %s: rc = %d\n",
1099                                mgs->obd_name, logname, rc2);
1100         }
1101
1102 out_free:
1103         OBD_FREE(backup, strlen(backup) + 1);
1104
1105 out_put:
1106         llog_ctxt_put(ctxt);
1107
1108         if (rc)
1109                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1110                        mgs->obd_name, logname, rc);
1111
1112         RETURN(rc);
1113 }
1114
1115 /**
1116  * Parse device name and get file system name and/or device index
1117  *
1118  * \param[in]   devname device name (ex. lustre-MDT0000)
1119  * \param[out]  fsname  file system name(optional)
1120  * \param[out]  index   device index(optional)
1121  *
1122  * \retval 0    success
1123  */
1124 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1125 {
1126         int rc;
1127         ENTRY;
1128
1129         /* Extract fsname */
1130         if (fsname) {
1131                 rc = server_name2fsname(devname, fsname, NULL);
1132                 if (rc < 0) {
1133                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1134                                devname);
1135                         RETURN(-EINVAL);
1136                 }
1137         }
1138
1139         if (index) {
1140                 rc = server_name2index(devname, index, NULL);
1141                 if (rc < 0) {
1142                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1143                                devname);
1144                         RETURN(-EINVAL);
1145                 }
1146         }
1147
1148         RETURN(0);
1149 }
1150
1151 /* This is only called during replace_nids */
1152 static int only_mgs_is_running(struct obd_device *mgs_obd)
1153 {
1154         /* TDB: Is global variable with devices count exists? */
1155         int num_devices = get_devices_count();
1156         int num_exports = 0;
1157         struct obd_export *exp;
1158
1159         spin_lock(&mgs_obd->obd_dev_lock);
1160         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1161                 /* skip self export */
1162                 if (exp == mgs_obd->obd_self_export)
1163                         continue;
1164                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1165                         continue;
1166
1167                 ++num_exports;
1168
1169                 CERROR("%s: node %s still connected during replace_nids "
1170                        "connect_flags:%llx\n",
1171                        mgs_obd->obd_name,
1172                        libcfs_nid2str(exp->exp_nid_stats->nid),
1173                        exp_connect_flags(exp));
1174
1175         }
1176         spin_unlock(&mgs_obd->obd_dev_lock);
1177
1178         /* osd, MGS and MGC + self_export
1179            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1180         return (num_devices <= 3) && (num_exports == 0);
1181 }
1182
1183 static int name_create_mdt(char **logname, char *fsname, int i)
1184 {
1185         char mdt_index[9];
1186
1187         sprintf(mdt_index, "-MDT%04x", i);
1188         return name_create(logname, fsname, mdt_index);
1189 }
1190
1191 /**
1192  * Replace nids for \a device to \a nids values
1193  *
1194  * \param obd           MGS obd device
1195  * \param devname       nids need to be replaced for this device
1196  * (ex. lustre-OST0000)
1197  * \param nids          nids list (ex. nid1,nid2,nid3)
1198  *
1199  * \retval 0    success
1200  */
1201 int mgs_replace_nids(const struct lu_env *env,
1202                      struct mgs_device *mgs,
1203                      char *devname, char *nids)
1204 {
1205         /* Assume fsname is part of device name */
1206         char fsname[MTI_NAME_MAXLEN];
1207         int rc;
1208         __u32 index;
1209         char *logname;
1210         struct fs_db *fsdb;
1211         unsigned int i;
1212         int conn_state;
1213         struct obd_device *mgs_obd = mgs->mgs_obd;
1214         ENTRY;
1215
1216         /* We can only change NIDs if no other nodes are connected */
1217         spin_lock(&mgs_obd->obd_dev_lock);
1218         conn_state = mgs_obd->obd_no_conn;
1219         mgs_obd->obd_no_conn = 1;
1220         spin_unlock(&mgs_obd->obd_dev_lock);
1221
1222         /* We can not change nids if not only MGS is started */
1223         if (!only_mgs_is_running(mgs_obd)) {
1224                 CERROR("Only MGS is allowed to be started\n");
1225                 GOTO(out, rc = -EINPROGRESS);
1226         }
1227
1228         /* Get fsname and index*/
1229         rc = mgs_parse_devname(devname, fsname, &index);
1230         if (rc)
1231                 GOTO(out, rc);
1232
1233         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1234         if (rc) {
1235                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1236                 GOTO(out, rc);
1237         }
1238
1239         /* Process client llogs */
1240         name_create(&logname, fsname, "-client");
1241         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1242         name_destroy(&logname);
1243         if (rc) {
1244                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1245                        fsname, devname, rc);
1246                 GOTO(out, rc);
1247         }
1248
1249         /* Process MDT llogs */
1250         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1251                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1252                         continue;
1253                 name_create_mdt(&logname, fsname, i);
1254                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1255                 name_destroy(&logname);
1256                 if (rc)
1257                         GOTO(out, rc);
1258         }
1259
1260 out:
1261         spin_lock(&mgs_obd->obd_dev_lock);
1262         mgs_obd->obd_no_conn = conn_state;
1263         spin_unlock(&mgs_obd->obd_dev_lock);
1264
1265         RETURN(rc);
1266 }
1267
1268 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1269                             char *devname, struct lov_desc *desc)
1270 {
1271         struct mgs_thread_info  *mgi = mgs_env_info(env);
1272         struct llog_cfg_rec     *lcr;
1273         int rc;
1274
1275         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1276         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1277         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1278         if (lcr == NULL)
1279                 return -ENOMEM;
1280
1281         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1282         lustre_cfg_rec_free(lcr);
1283         return rc;
1284 }
1285
1286 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1287                             char *devname, struct lmv_desc *desc)
1288 {
1289         struct mgs_thread_info  *mgi = mgs_env_info(env);
1290         struct llog_cfg_rec     *lcr;
1291         int rc;
1292
1293         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1294         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1295         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1296         if (lcr == NULL)
1297                 return -ENOMEM;
1298
1299         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1300         lustre_cfg_rec_free(lcr);
1301         return rc;
1302 }
1303
1304 static inline int record_mdc_add(const struct lu_env *env,
1305                                  struct llog_handle *llh,
1306                                  char *logname, char *mdcuuid,
1307                                  char *mdtuuid, char *index,
1308                                  char *gen)
1309 {
1310         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1311                            mdtuuid,index,gen,mdcuuid);
1312 }
1313
1314 static inline int record_lov_add(const struct lu_env *env,
1315                                  struct llog_handle *llh,
1316                                  char *lov_name, char *ost_uuid,
1317                                  char *index, char *gen)
1318 {
1319         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1320                            ost_uuid, index, gen, 0);
1321 }
1322
1323 static inline int record_mount_opt(const struct lu_env *env,
1324                                    struct llog_handle *llh,
1325                                    char *profile, char *lov_name,
1326                                    char *mdc_name)
1327 {
1328         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1329                            profile,lov_name,mdc_name,0);
1330 }
1331
1332 static int record_marker(const struct lu_env *env,
1333                          struct llog_handle *llh,
1334                          struct fs_db *fsdb, __u32 flags,
1335                          char *tgtname, char *comment)
1336 {
1337         struct mgs_thread_info *mgi = mgs_env_info(env);
1338         struct llog_cfg_rec *lcr;
1339         int rc;
1340         int cplen = 0;
1341
1342         if (flags & CM_START)
1343                 fsdb->fsdb_gen++;
1344         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1345         mgi->mgi_marker.cm_flags = flags;
1346         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1347         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1348                         sizeof(mgi->mgi_marker.cm_tgtname));
1349         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1350                 return -E2BIG;
1351         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1352                         sizeof(mgi->mgi_marker.cm_comment));
1353         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1354                 return -E2BIG;
1355         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1356         mgi->mgi_marker.cm_canceltime = 0;
1357         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1358         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1359                             sizeof(mgi->mgi_marker));
1360         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1361         if (lcr == NULL)
1362                 return -ENOMEM;
1363
1364         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1365         lustre_cfg_rec_free(lcr);
1366         return rc;
1367 }
1368
1369 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1370                             struct llog_handle **llh, char *name)
1371 {
1372         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1373         struct llog_ctxt        *ctxt;
1374         int                      rc = 0;
1375         ENTRY;
1376
1377         if (*llh)
1378                 GOTO(out, rc = -EBUSY);
1379
1380         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1381         if (!ctxt)
1382                 GOTO(out, rc = -ENODEV);
1383         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1384
1385         rc = llog_open_create(env, ctxt, llh, NULL, name);
1386         if (rc)
1387                 GOTO(out_ctxt, rc);
1388         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1389         if (rc)
1390                 llog_close(env, *llh);
1391 out_ctxt:
1392         llog_ctxt_put(ctxt);
1393 out:
1394         if (rc) {
1395                 CERROR("%s: can't start log %s: rc = %d\n",
1396                        mgs->mgs_obd->obd_name, name, rc);
1397                 *llh = NULL;
1398         }
1399         RETURN(rc);
1400 }
1401
1402 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1403 {
1404         int rc;
1405
1406         rc = llog_close(env, *llh);
1407         *llh = NULL;
1408
1409         return rc;
1410 }
1411
1412 /******************** config "macros" *********************/
1413
1414 /* write an lcfg directly into a log (with markers) */
1415 static int mgs_write_log_direct(const struct lu_env *env,
1416                                 struct mgs_device *mgs, struct fs_db *fsdb,
1417                                 char *logname, struct llog_cfg_rec *lcr,
1418                                 char *devname, char *comment)
1419 {
1420         struct llog_handle *llh = NULL;
1421         int rc;
1422
1423         ENTRY;
1424
1425         rc = record_start_log(env, mgs, &llh, logname);
1426         if (rc)
1427                 RETURN(rc);
1428
1429         /* FIXME These should be a single journal transaction */
1430         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1431         if (rc)
1432                 GOTO(out_end, rc);
1433         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1434         if (rc)
1435                 GOTO(out_end, rc);
1436         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1437         if (rc)
1438                 GOTO(out_end, rc);
1439 out_end:
1440         record_end_log(env, &llh);
1441         RETURN(rc);
1442 }
1443
1444 /* write the lcfg in all logs for the given fs */
1445 static int mgs_write_log_direct_all(const struct lu_env *env,
1446                                     struct mgs_device *mgs,
1447                                     struct fs_db *fsdb,
1448                                     struct mgs_target_info *mti,
1449                                     struct llog_cfg_rec *lcr, char *devname,
1450                                     char *comment, int server_only)
1451 {
1452         struct list_head         log_list;
1453         struct mgs_direntry     *dirent, *n;
1454         char                    *fsname = mti->mti_fsname;
1455         int                      rc = 0, len = strlen(fsname);
1456
1457         ENTRY;
1458         /* Find all the logs in the CONFIGS directory */
1459         rc = class_dentry_readdir(env, mgs, &log_list);
1460         if (rc)
1461                 RETURN(rc);
1462
1463         /* Could use fsdb index maps instead of directory listing */
1464         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1465                 list_del_init(&dirent->mde_list);
1466                 /* don't write to sptlrpc rule log */
1467                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1468                         goto next;
1469
1470                 /* caller wants write server logs only */
1471                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1472                         goto next;
1473
1474                 if (strlen(dirent->mde_name) <= len ||
1475                     strncmp(fsname, dirent->mde_name, len) != 0 ||
1476                     dirent->mde_name[len] != '-')
1477                         goto next;
1478
1479                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1480                 /* Erase any old settings of this same parameter */
1481                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1482                                 devname, comment, CM_SKIP);
1483                 if (rc < 0)
1484                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1485                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1486                 if (lcr == NULL)
1487                         goto next;
1488                 /* Write the new one */
1489                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1490                                           lcr, devname, comment);
1491                 if (rc != 0)
1492                         CERROR("%s: writing log %s: rc = %d\n",
1493                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1494 next:
1495                 mgs_direntry_free(dirent);
1496         }
1497
1498         RETURN(rc);
1499 }
1500
1501 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1502                                     struct mgs_device *mgs,
1503                                     struct fs_db *fsdb,
1504                                     struct mgs_target_info *mti,
1505                                     int index, char *logname);
1506 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1507                                     struct mgs_device *mgs,
1508                                     struct fs_db *fsdb,
1509                                     struct mgs_target_info *mti,
1510                                     char *logname, char *suffix, char *lovname,
1511                                     enum lustre_sec_part sec_part, int flags);
1512 static int name_create_mdt_and_lov(char **logname, char **lovname,
1513                                    struct fs_db *fsdb, int i);
1514
1515 static int add_param(char *params, char *key, char *val)
1516 {
1517         char *start = params + strlen(params);
1518         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1519         int keylen = 0;
1520
1521         if (key != NULL)
1522                 keylen = strlen(key);
1523         if (start + 1 + keylen + strlen(val) >= end) {
1524                 CERROR("params are too long: %s %s%s\n",
1525                        params, key != NULL ? key : "", val);
1526                 return -EINVAL;
1527         }
1528
1529         sprintf(start, " %s%s", key != NULL ? key : "", val);
1530         return 0;
1531 }
1532
1533 /**
1534  * Walk through client config log record and convert the related records
1535  * into the target.
1536  **/
1537 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1538                                          struct llog_handle *llh,
1539                                          struct llog_rec_hdr *rec, void *data)
1540 {
1541         struct mgs_device *mgs;
1542         struct obd_device *obd;
1543         struct mgs_target_info *mti, *tmti;
1544         struct fs_db *fsdb;
1545         int cfg_len = rec->lrh_len;
1546         char *cfg_buf = (char*) (rec + 1);
1547         struct lustre_cfg *lcfg;
1548         int rc = 0;
1549         struct llog_handle *mdt_llh = NULL;
1550         static int got_an_osc_or_mdc = 0;
1551         /* 0: not found any osc/mdc;
1552            1: found osc;
1553            2: found mdc;
1554         */
1555         static int last_step = -1;
1556         int cplen = 0;
1557
1558         ENTRY;
1559
1560         mti = ((struct temp_comp*)data)->comp_mti;
1561         tmti = ((struct temp_comp*)data)->comp_tmti;
1562         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1563         obd = ((struct temp_comp *)data)->comp_obd;
1564         mgs = lu2mgs_dev(obd->obd_lu_dev);
1565         LASSERT(mgs);
1566
1567         if (rec->lrh_type != OBD_CFG_REC) {
1568                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1569                 RETURN(-EINVAL);
1570         }
1571
1572         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1573         if (rc) {
1574                 CERROR("Insane cfg\n");
1575                 RETURN(rc);
1576         }
1577
1578         lcfg = (struct lustre_cfg *)cfg_buf;
1579
1580         if (lcfg->lcfg_command == LCFG_MARKER) {
1581                 struct cfg_marker *marker;
1582                 marker = lustre_cfg_buf(lcfg, 1);
1583                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1584                     (marker->cm_flags & CM_START) &&
1585                      !(marker->cm_flags & CM_SKIP)) {
1586                         got_an_osc_or_mdc = 1;
1587                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1588                                         sizeof(tmti->mti_svname));
1589                         if (cplen >= sizeof(tmti->mti_svname))
1590                                 RETURN(-E2BIG);
1591                         rc = record_start_log(env, mgs, &mdt_llh,
1592                                               mti->mti_svname);
1593                         if (rc)
1594                                 RETURN(rc);
1595                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1596                                            mti->mti_svname, "add osc(copied)");
1597                         record_end_log(env, &mdt_llh);
1598                         last_step = marker->cm_step;
1599                         RETURN(rc);
1600                 }
1601                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1602                     (marker->cm_flags & CM_END) &&
1603                      !(marker->cm_flags & CM_SKIP)) {
1604                         LASSERT(last_step == marker->cm_step);
1605                         last_step = -1;
1606                         got_an_osc_or_mdc = 0;
1607                         memset(tmti, 0, sizeof(*tmti));
1608                         rc = record_start_log(env, mgs, &mdt_llh,
1609                                               mti->mti_svname);
1610                         if (rc)
1611                                 RETURN(rc);
1612                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1613                                            mti->mti_svname, "add osc(copied)");
1614                         record_end_log(env, &mdt_llh);
1615                         RETURN(rc);
1616                 }
1617                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1618                     (marker->cm_flags & CM_START) &&
1619                      !(marker->cm_flags & CM_SKIP)) {
1620                         got_an_osc_or_mdc = 2;
1621                         last_step = marker->cm_step;
1622                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1623                                strlen(marker->cm_tgtname));
1624
1625                         RETURN(rc);
1626                 }
1627                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1628                     (marker->cm_flags & CM_END) &&
1629                      !(marker->cm_flags & CM_SKIP)) {
1630                         LASSERT(last_step == marker->cm_step);
1631                         last_step = -1;
1632                         got_an_osc_or_mdc = 0;
1633                         memset(tmti, 0, sizeof(*tmti));
1634                         RETURN(rc);
1635                 }
1636         }
1637
1638         if (got_an_osc_or_mdc == 0 || last_step < 0)
1639                 RETURN(rc);
1640
1641         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1642                 uint64_t nodenid = lcfg->lcfg_nid;
1643
1644                 if (strlen(tmti->mti_uuid) == 0) {
1645                         /* target uuid not set, this config record is before
1646                          * LCFG_SETUP, this nid is one of target node nid.
1647                          */
1648                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1649                         tmti->mti_nid_count++;
1650                 } else {
1651                         /* failover node nid */
1652                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1653                                        libcfs_nid2str(nodenid));
1654                 }
1655
1656                 RETURN(rc);
1657         }
1658
1659         if (lcfg->lcfg_command == LCFG_SETUP) {
1660                 char *target;
1661
1662                 target = lustre_cfg_string(lcfg, 1);
1663                 memcpy(tmti->mti_uuid, target, strlen(target));
1664                 RETURN(rc);
1665         }
1666
1667         /* ignore client side sptlrpc_conf_log */
1668         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1669                 RETURN(rc);
1670
1671         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1672                 int index;
1673
1674                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1675                         RETURN (-EINVAL);
1676
1677                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1678                        strlen(mti->mti_fsname));
1679                 tmti->mti_stripe_index = index;
1680
1681                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1682                                               mti->mti_stripe_index,
1683                                               mti->mti_svname);
1684                 memset(tmti, 0, sizeof(*tmti));
1685                 RETURN(rc);
1686         }
1687
1688         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1689                 int index;
1690                 char mdt_index[9];
1691                 char *logname, *lovname;
1692
1693                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1694                                              mti->mti_stripe_index);
1695                 if (rc)
1696                         RETURN(rc);
1697                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1698
1699                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1700                         name_destroy(&logname);
1701                         name_destroy(&lovname);
1702                         RETURN(-EINVAL);
1703                 }
1704
1705                 tmti->mti_stripe_index = index;
1706                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1707                                          mdt_index, lovname,
1708                                          LUSTRE_SP_MDT, 0);
1709                 name_destroy(&logname);
1710                 name_destroy(&lovname);
1711                 RETURN(rc);
1712         }
1713         RETURN(rc);
1714 }
1715
1716 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1717 /* stealed from mgs_get_fsdb_from_llog*/
1718 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1719                                               struct mgs_device *mgs,
1720                                               char *client_name,
1721                                               struct temp_comp* comp)
1722 {
1723         struct llog_handle *loghandle;
1724         struct mgs_target_info *tmti;
1725         struct llog_ctxt *ctxt;
1726         int rc;
1727
1728         ENTRY;
1729
1730         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1731         LASSERT(ctxt != NULL);
1732
1733         OBD_ALLOC_PTR(tmti);
1734         if (tmti == NULL)
1735                 GOTO(out_ctxt, rc = -ENOMEM);
1736
1737         comp->comp_tmti = tmti;
1738         comp->comp_obd = mgs->mgs_obd;
1739
1740         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1741                        LLOG_OPEN_EXISTS);
1742         if (rc < 0) {
1743                 if (rc == -ENOENT)
1744                         rc = 0;
1745                 GOTO(out_pop, rc);
1746         }
1747
1748         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1749         if (rc)
1750                 GOTO(out_close, rc);
1751
1752         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1753                                   (void *)comp, NULL, false);
1754         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1755 out_close:
1756         llog_close(env, loghandle);
1757 out_pop:
1758         OBD_FREE_PTR(tmti);
1759 out_ctxt:
1760         llog_ctxt_put(ctxt);
1761         RETURN(rc);
1762 }
1763
1764 /* lmv is the second thing for client logs */
1765 /* copied from mgs_write_log_lov. Please refer to that.  */
1766 static int mgs_write_log_lmv(const struct lu_env *env,
1767                              struct mgs_device *mgs,
1768                              struct fs_db *fsdb,
1769                              struct mgs_target_info *mti,
1770                              char *logname, char *lmvname)
1771 {
1772         struct llog_handle *llh = NULL;
1773         struct lmv_desc *lmvdesc;
1774         char *uuid;
1775         int rc = 0;
1776         ENTRY;
1777
1778         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1779
1780         OBD_ALLOC_PTR(lmvdesc);
1781         if (lmvdesc == NULL)
1782                 RETURN(-ENOMEM);
1783         lmvdesc->ld_active_tgt_count = 0;
1784         lmvdesc->ld_tgt_count = 0;
1785         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1786         uuid = (char *)lmvdesc->ld_uuid.uuid;
1787
1788         rc = record_start_log(env, mgs, &llh, logname);
1789         if (rc)
1790                 GOTO(out_free, rc);
1791         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1792         if (rc)
1793                 GOTO(out_end, rc);
1794         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1795         if (rc)
1796                 GOTO(out_end, rc);
1797         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1798         if (rc)
1799                 GOTO(out_end, rc);
1800         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1801         if (rc)
1802                 GOTO(out_end, rc);
1803 out_end:
1804         record_end_log(env, &llh);
1805 out_free:
1806         OBD_FREE_PTR(lmvdesc);
1807         RETURN(rc);
1808 }
1809
1810 /* lov is the first thing in the mdt and client logs */
1811 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1812                              struct fs_db *fsdb, struct mgs_target_info *mti,
1813                              char *logname, char *lovname)
1814 {
1815         struct llog_handle *llh = NULL;
1816         struct lov_desc *lovdesc;
1817         char *uuid;
1818         int rc = 0;
1819         ENTRY;
1820
1821         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1822
1823         /*
1824         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1825         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1826               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1827         */
1828
1829         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1830         OBD_ALLOC_PTR(lovdesc);
1831         if (lovdesc == NULL)
1832                 RETURN(-ENOMEM);
1833         lovdesc->ld_magic = LOV_DESC_MAGIC;
1834         lovdesc->ld_tgt_count = 0;
1835         /* Defaults.  Can be changed later by lcfg config_param */
1836         lovdesc->ld_default_stripe_count = 1;
1837         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1838         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1839         lovdesc->ld_default_stripe_offset = -1;
1840         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1841         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1842         /* can these be the same? */
1843         uuid = (char *)lovdesc->ld_uuid.uuid;
1844
1845         /* This should always be the first entry in a log.
1846         rc = mgs_clear_log(obd, logname); */
1847         rc = record_start_log(env, mgs, &llh, logname);
1848         if (rc)
1849                 GOTO(out_free, rc);
1850         /* FIXME these should be a single journal transaction */
1851         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1852         if (rc)
1853                 GOTO(out_end, rc);
1854         rc = record_attach(env, llh, lovname, "lov", uuid);
1855         if (rc)
1856                 GOTO(out_end, rc);
1857         rc = record_lov_setup(env, llh, lovname, lovdesc);
1858         if (rc)
1859                 GOTO(out_end, rc);
1860         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1861         if (rc)
1862                 GOTO(out_end, rc);
1863         EXIT;
1864 out_end:
1865         record_end_log(env, &llh);
1866 out_free:
1867         OBD_FREE_PTR(lovdesc);
1868         return rc;
1869 }
1870
1871 /* add failnids to open log */
1872 static int mgs_write_log_failnids(const struct lu_env *env,
1873                                   struct mgs_target_info *mti,
1874                                   struct llog_handle *llh,
1875                                   char *cliname)
1876 {
1877         char *failnodeuuid = NULL;
1878         char *ptr = mti->mti_params;
1879         lnet_nid_t nid;
1880         int rc = 0;
1881
1882         /*
1883         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1884         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1885         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1886         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1887         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1888         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1889         */
1890
1891         /*
1892          * Pull failnid info out of params string, which may contain something
1893          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
1894          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
1895          * etc.  However, convert_hostnames() should have caught those.
1896          */
1897         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1898                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1899                         if (failnodeuuid == NULL) {
1900                                 /* We don't know the failover node name,
1901                                    so just use the first nid as the uuid */
1902                                 rc = name_create(&failnodeuuid,
1903                                                  libcfs_nid2str(nid), "");
1904                                 if (rc)
1905                                         return rc;
1906                         }
1907                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1908                                "client %s\n", libcfs_nid2str(nid),
1909                                failnodeuuid, cliname);
1910                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1911                         /*
1912                          * If *ptr is ':', we have added all NIDs for
1913                          * failnodeuuid.
1914                          */
1915                         if (*ptr == ':') {
1916                                 rc = record_add_conn(env, llh, cliname,
1917                                                      failnodeuuid);
1918                                 name_destroy(&failnodeuuid);
1919                                 failnodeuuid = NULL;
1920                         }
1921                 }
1922                 if (failnodeuuid) {
1923                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1924                         name_destroy(&failnodeuuid);
1925                         failnodeuuid = NULL;
1926                 }
1927         }
1928
1929         return rc;
1930 }
1931
1932 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1933                                     struct mgs_device *mgs,
1934                                     struct fs_db *fsdb,
1935                                     struct mgs_target_info *mti,
1936                                     char *logname, char *lmvname)
1937 {
1938         struct llog_handle *llh = NULL;
1939         char *mdcname = NULL;
1940         char *nodeuuid = NULL;
1941         char *mdcuuid = NULL;
1942         char *lmvuuid = NULL;
1943         char index[6];
1944         int i, rc;
1945         ENTRY;
1946
1947         if (mgs_log_is_empty(env, mgs, logname)) {
1948                 CERROR("log is empty! Logical error\n");
1949                 RETURN(-EINVAL);
1950         }
1951
1952         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1953                mti->mti_svname, logname, lmvname);
1954
1955         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1956         if (rc)
1957                 RETURN(rc);
1958         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1959         if (rc)
1960                 GOTO(out_free, rc);
1961         rc = name_create(&mdcuuid, mdcname, "_UUID");
1962         if (rc)
1963                 GOTO(out_free, rc);
1964         rc = name_create(&lmvuuid, lmvname, "_UUID");
1965         if (rc)
1966                 GOTO(out_free, rc);
1967
1968         rc = record_start_log(env, mgs, &llh, logname);
1969         if (rc)
1970                 GOTO(out_free, rc);
1971         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1972                            "add mdc");
1973         if (rc)
1974                 GOTO(out_end, rc);
1975         for (i = 0; i < mti->mti_nid_count; i++) {
1976                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1977                        libcfs_nid2str(mti->mti_nids[i]));
1978
1979                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1980                 if (rc)
1981                         GOTO(out_end, rc);
1982         }
1983
1984         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1985         if (rc)
1986                 GOTO(out_end, rc);
1987         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1988         if (rc)
1989                 GOTO(out_end, rc);
1990         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1991         if (rc)
1992                 GOTO(out_end, rc);
1993         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1994         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1995                             index, "1");
1996         if (rc)
1997                 GOTO(out_end, rc);
1998         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1999                            "add mdc");
2000         if (rc)
2001                 GOTO(out_end, rc);
2002 out_end:
2003         record_end_log(env, &llh);
2004 out_free:
2005         name_destroy(&lmvuuid);
2006         name_destroy(&mdcuuid);
2007         name_destroy(&mdcname);
2008         name_destroy(&nodeuuid);
2009         RETURN(rc);
2010 }
2011
2012 static inline int name_create_lov(char **lovname, char *mdtname,
2013                                   struct fs_db *fsdb, int index)
2014 {
2015         /* COMPAT_180 */
2016         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2017                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2018         else
2019                 return name_create(lovname, mdtname, "-mdtlov");
2020 }
2021
2022 static int name_create_mdt_and_lov(char **logname, char **lovname,
2023                                    struct fs_db *fsdb, int i)
2024 {
2025         int rc;
2026
2027         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2028         if (rc)
2029                 return rc;
2030         /* COMPAT_180 */
2031         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2032                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2033         else
2034                 rc = name_create(lovname, *logname, "-mdtlov");
2035         if (rc) {
2036                 name_destroy(logname);
2037                 *logname = NULL;
2038         }
2039         return rc;
2040 }
2041
2042 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2043                                       struct fs_db *fsdb, int i)
2044 {
2045         char suffix[16];
2046
2047         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2048                 sprintf(suffix, "-osc");
2049         else
2050                 sprintf(suffix, "-osc-MDT%04x", i);
2051         return name_create(oscname, ostname, suffix);
2052 }
2053
2054 /* add new mdc to already existent MDS */
2055 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2056                                     struct mgs_device *mgs,
2057                                     struct fs_db *fsdb,
2058                                     struct mgs_target_info *mti,
2059                                     int mdt_index, char *logname)
2060 {
2061         struct llog_handle      *llh = NULL;
2062         char    *nodeuuid = NULL;
2063         char    *ospname = NULL;
2064         char    *lovuuid = NULL;
2065         char    *mdtuuid = NULL;
2066         char    *svname = NULL;
2067         char    *mdtname = NULL;
2068         char    *lovname = NULL;
2069         char    index_str[16];
2070         int     i, rc;
2071
2072         ENTRY;
2073         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2074                 CERROR("log is empty! Logical error\n");
2075                 RETURN (-EINVAL);
2076         }
2077
2078         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2079                logname);
2080
2081         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2082         if (rc)
2083                 RETURN(rc);
2084
2085         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2086         if (rc)
2087                 GOTO(out_destory, rc);
2088
2089         rc = name_create(&svname, mdtname, "-osp");
2090         if (rc)
2091                 GOTO(out_destory, rc);
2092
2093         sprintf(index_str, "-MDT%04x", mdt_index);
2094         rc = name_create(&ospname, svname, index_str);
2095         if (rc)
2096                 GOTO(out_destory, rc);
2097
2098         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2099         if (rc)
2100                 GOTO(out_destory, rc);
2101
2102         rc = name_create(&lovuuid, lovname, "_UUID");
2103         if (rc)
2104                 GOTO(out_destory, rc);
2105
2106         rc = name_create(&mdtuuid, mdtname, "_UUID");
2107         if (rc)
2108                 GOTO(out_destory, rc);
2109
2110         rc = record_start_log(env, mgs, &llh, logname);
2111         if (rc)
2112                 GOTO(out_destory, rc);
2113
2114         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2115                            "add osp");
2116         if (rc)
2117                 GOTO(out_destory, rc);
2118
2119         for (i = 0; i < mti->mti_nid_count; i++) {
2120                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2121                        libcfs_nid2str(mti->mti_nids[i]));
2122                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2123                 if (rc)
2124                         GOTO(out_end, rc);
2125         }
2126
2127         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2128         if (rc)
2129                 GOTO(out_end, rc);
2130
2131         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2132                           NULL, NULL);
2133         if (rc)
2134                 GOTO(out_end, rc);
2135
2136         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2137         if (rc)
2138                 GOTO(out_end, rc);
2139
2140         /* Add mdc(osp) to lod */
2141         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2142                  mti->mti_stripe_index);
2143         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2144                          index_str, "1", NULL);
2145         if (rc)
2146                 GOTO(out_end, rc);
2147
2148         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2149         if (rc)
2150                 GOTO(out_end, rc);
2151
2152 out_end:
2153         record_end_log(env, &llh);
2154
2155 out_destory:
2156         name_destroy(&mdtuuid);
2157         name_destroy(&lovuuid);
2158         name_destroy(&lovname);
2159         name_destroy(&ospname);
2160         name_destroy(&svname);
2161         name_destroy(&nodeuuid);
2162         name_destroy(&mdtname);
2163         RETURN(rc);
2164 }
2165
2166 static int mgs_write_log_mdt0(const struct lu_env *env,
2167                               struct mgs_device *mgs,
2168                               struct fs_db *fsdb,
2169                               struct mgs_target_info *mti)
2170 {
2171         char *log = mti->mti_svname;
2172         struct llog_handle *llh = NULL;
2173         char *uuid, *lovname;
2174         char mdt_index[6];
2175         char *ptr = mti->mti_params;
2176         int rc = 0, failout = 0;
2177         ENTRY;
2178
2179         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2180         if (uuid == NULL)
2181                 RETURN(-ENOMEM);
2182
2183         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2184                 failout = (strncmp(ptr, "failout", 7) == 0);
2185
2186         rc = name_create(&lovname, log, "-mdtlov");
2187         if (rc)
2188                 GOTO(out_free, rc);
2189         if (mgs_log_is_empty(env, mgs, log)) {
2190                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2191                 if (rc)
2192                         GOTO(out_lod, rc);
2193         }
2194
2195         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2196
2197         rc = record_start_log(env, mgs, &llh, log);
2198         if (rc)
2199                 GOTO(out_lod, rc);
2200
2201         /* add MDT itself */
2202
2203         /* FIXME this whole fn should be a single journal transaction */
2204         sprintf(uuid, "%s_UUID", log);
2205         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2206         if (rc)
2207                 GOTO(out_lod, rc);
2208         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2209         if (rc)
2210                 GOTO(out_end, rc);
2211         rc = record_mount_opt(env, llh, log, lovname, NULL);
2212         if (rc)
2213                 GOTO(out_end, rc);
2214         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2215                         failout ? "n" : "f");
2216         if (rc)
2217                 GOTO(out_end, rc);
2218         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2219         if (rc)
2220                 GOTO(out_end, rc);
2221 out_end:
2222         record_end_log(env, &llh);
2223 out_lod:
2224         name_destroy(&lovname);
2225 out_free:
2226         OBD_FREE(uuid, sizeof(struct obd_uuid));
2227         RETURN(rc);
2228 }
2229
2230 /* envelope method for all layers log */
2231 static int mgs_write_log_mdt(const struct lu_env *env,
2232                              struct mgs_device *mgs,
2233                              struct fs_db *fsdb,
2234                              struct mgs_target_info *mti)
2235 {
2236         struct mgs_thread_info *mgi = mgs_env_info(env);
2237         struct llog_handle *llh = NULL;
2238         char *cliname;
2239         int rc, i = 0;
2240         ENTRY;
2241
2242         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2243
2244         if (mti->mti_uuid[0] == '\0') {
2245                 /* Make up our own uuid */
2246                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2247                          "%s_UUID", mti->mti_svname);
2248         }
2249
2250         /* add mdt */
2251         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2252         if (rc)
2253                 RETURN(rc);
2254         /* Append the mdt info to the client log */
2255         rc = name_create(&cliname, mti->mti_fsname, "-client");
2256         if (rc)
2257                 RETURN(rc);
2258
2259         if (mgs_log_is_empty(env, mgs, cliname)) {
2260                 /* Start client log */
2261                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2262                                        fsdb->fsdb_clilov);
2263                 if (rc)
2264                         GOTO(out_free, rc);
2265                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2266                                        fsdb->fsdb_clilmv);
2267                 if (rc)
2268                         GOTO(out_free, rc);
2269         }
2270
2271         /*
2272         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2273         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2274         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2275         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2276         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2277         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2278         */
2279
2280                 /* copy client info about lov/lmv */
2281                 mgi->mgi_comp.comp_mti = mti;
2282                 mgi->mgi_comp.comp_fsdb = fsdb;
2283
2284                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2285                                                         &mgi->mgi_comp);
2286                 if (rc)
2287                         GOTO(out_free, rc);
2288                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2289                                               fsdb->fsdb_clilmv);
2290                 if (rc)
2291                         GOTO(out_free, rc);
2292
2293                 /* add mountopts */
2294                 rc = record_start_log(env, mgs, &llh, cliname);
2295                 if (rc)
2296                         GOTO(out_free, rc);
2297
2298                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2299                                    "mount opts");
2300                 if (rc)
2301                         GOTO(out_end, rc);
2302                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2303                                       fsdb->fsdb_clilmv);
2304                 if (rc)
2305                         GOTO(out_end, rc);
2306                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2307                                    "mount opts");
2308
2309         if (rc)
2310                 GOTO(out_end, rc);
2311
2312         /* for_all_existing_mdt except current one */
2313         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2314                 if (i !=  mti->mti_stripe_index &&
2315                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2316                         char *logname;
2317
2318                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2319                         if (rc)
2320                                 GOTO(out_end, rc);
2321
2322                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2323                                                       i, logname);
2324                         name_destroy(&logname);
2325                         if (rc)
2326                                 GOTO(out_end, rc);
2327                 }
2328         }
2329 out_end:
2330         record_end_log(env, &llh);
2331 out_free:
2332         name_destroy(&cliname);
2333         RETURN(rc);
2334 }
2335
2336 /* Add the ost info to the client/mdt lov */
2337 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2338                                     struct mgs_device *mgs, struct fs_db *fsdb,
2339                                     struct mgs_target_info *mti,
2340                                     char *logname, char *suffix, char *lovname,
2341                                     enum lustre_sec_part sec_part, int flags)
2342 {
2343         struct llog_handle *llh = NULL;
2344         char *nodeuuid = NULL;
2345         char *oscname = NULL;
2346         char *oscuuid = NULL;
2347         char *lovuuid = NULL;
2348         char *svname = NULL;
2349         char index[6];
2350         int i, rc;
2351
2352         ENTRY;
2353         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2354                mti->mti_svname, logname);
2355
2356         if (mgs_log_is_empty(env, mgs, logname)) {
2357                 CERROR("log is empty! Logical error\n");
2358                 RETURN (-EINVAL);
2359         }
2360
2361         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2362         if (rc)
2363                 RETURN(rc);
2364         rc = name_create(&svname, mti->mti_svname, "-osc");
2365         if (rc)
2366                 GOTO(out_free, rc);
2367
2368         /* for the system upgraded from old 1.8, keep using the old osc naming
2369          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2370         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2371                 rc = name_create(&oscname, svname, "");
2372         else
2373                 rc = name_create(&oscname, svname, suffix);
2374         if (rc)
2375                 GOTO(out_free, rc);
2376
2377         rc = name_create(&oscuuid, oscname, "_UUID");
2378         if (rc)
2379                 GOTO(out_free, rc);
2380         rc = name_create(&lovuuid, lovname, "_UUID");
2381         if (rc)
2382                 GOTO(out_free, rc);
2383
2384
2385         /*
2386         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2387         multihomed (#4)
2388         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2389         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2390         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2391         failover (#6,7)
2392         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2393         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2394         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2395         */
2396
2397         rc = record_start_log(env, mgs, &llh, logname);
2398         if (rc)
2399                 GOTO(out_free, rc);
2400
2401         /* FIXME these should be a single journal transaction */
2402         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2403                            "add osc");
2404         if (rc)
2405                 GOTO(out_end, rc);
2406
2407         /* NB: don't change record order, because upon MDT steal OSC config
2408          * from client, it treats all nids before LCFG_SETUP as target nids
2409          * (multiple interfaces), while nids after as failover node nids.
2410          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2411          */
2412         for (i = 0; i < mti->mti_nid_count; i++) {
2413                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2414                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2415                 if (rc)
2416                         GOTO(out_end, rc);
2417         }
2418         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2419         if (rc)
2420                 GOTO(out_end, rc);
2421         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2422         if (rc)
2423                 GOTO(out_end, rc);
2424         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2425         if (rc)
2426                 GOTO(out_end, rc);
2427
2428         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2429
2430         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2431         if (rc)
2432                 GOTO(out_end, rc);
2433         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2434                            "add osc");
2435         if (rc)
2436                 GOTO(out_end, rc);
2437 out_end:
2438         record_end_log(env, &llh);
2439 out_free:
2440         name_destroy(&lovuuid);
2441         name_destroy(&oscuuid);
2442         name_destroy(&oscname);
2443         name_destroy(&svname);
2444         name_destroy(&nodeuuid);
2445         RETURN(rc);
2446 }
2447
2448 static int mgs_write_log_ost(const struct lu_env *env,
2449                              struct mgs_device *mgs, struct fs_db *fsdb,
2450                              struct mgs_target_info *mti)
2451 {
2452         struct llog_handle *llh = NULL;
2453         char *logname, *lovname;
2454         char *ptr = mti->mti_params;
2455         int rc, flags = 0, failout = 0, i;
2456         ENTRY;
2457
2458         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2459
2460         /* The ost startup log */
2461
2462         /* If the ost log already exists, that means that someone reformatted
2463            the ost and it called target_add again. */
2464         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2465                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2466                                    "exists, yet the server claims it never "
2467                                    "registered. It may have been reformatted, "
2468                                    "or the index changed. writeconf the MDT to "
2469                                    "regenerate all logs.\n", mti->mti_svname);
2470                 RETURN(-EALREADY);
2471         }
2472
2473         /*
2474         attach obdfilter ost1 ost1_UUID
2475         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2476         */
2477         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2478                 failout = (strncmp(ptr, "failout", 7) == 0);
2479         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2480         if (rc)
2481                 RETURN(rc);
2482         /* FIXME these should be a single journal transaction */
2483         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2484         if (rc)
2485                 GOTO(out_end, rc);
2486         if (*mti->mti_uuid == '\0')
2487                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2488                          "%s_UUID", mti->mti_svname);
2489         rc = record_attach(env, llh, mti->mti_svname,
2490                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2491         if (rc)
2492                 GOTO(out_end, rc);
2493         rc = record_setup(env, llh, mti->mti_svname,
2494                           "dev"/*ignored*/, "type"/*ignored*/,
2495                           failout ? "n" : "f", 0/*options*/);
2496         if (rc)
2497                 GOTO(out_end, rc);
2498         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2499         if (rc)
2500                 GOTO(out_end, rc);
2501 out_end:
2502         record_end_log(env, &llh);
2503         if (rc)
2504                 RETURN(rc);
2505         /* We also have to update the other logs where this osc is part of
2506            the lov */
2507
2508         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2509                 /* If we're upgrading, the old mdt log already has our
2510                    entry. Let's do a fake one for fun. */
2511                 /* Note that we can't add any new failnids, since we don't
2512                    know the old osc names. */
2513                 flags = CM_SKIP | CM_UPGRADE146;
2514
2515         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2516                 /* If the update flag isn't set, don't update client/mdt
2517                    logs. */
2518                 flags |= CM_SKIP;
2519                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2520                               "the MDT first to regenerate it.\n",
2521                               mti->mti_svname);
2522         }
2523
2524         /* Add ost to all MDT lov defs */
2525         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2526                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2527                         char mdt_index[9];
2528
2529                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2530                                                      i);
2531                         if (rc)
2532                                 RETURN(rc);
2533                         sprintf(mdt_index, "-MDT%04x", i);
2534                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2535                                                       logname, mdt_index,
2536                                                       lovname, LUSTRE_SP_MDT,
2537                                                       flags);
2538                         name_destroy(&logname);
2539                         name_destroy(&lovname);
2540                         if (rc)
2541                                 RETURN(rc);
2542                 }
2543         }
2544
2545         /* Append ost info to the client log */
2546         rc = name_create(&logname, mti->mti_fsname, "-client");
2547         if (rc)
2548                 RETURN(rc);
2549         if (mgs_log_is_empty(env, mgs, logname)) {
2550                 /* Start client log */
2551                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2552                                        fsdb->fsdb_clilov);
2553                 if (rc)
2554                         GOTO(out_free, rc);
2555                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2556                                        fsdb->fsdb_clilmv);
2557                 if (rc)
2558                         GOTO(out_free, rc);
2559         }
2560         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2561                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2562 out_free:
2563         name_destroy(&logname);
2564         RETURN(rc);
2565 }
2566
2567 static __inline__ int mgs_param_empty(char *ptr)
2568 {
2569         char *tmp;
2570
2571         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2572                 return 1;
2573         return 0;
2574 }
2575
2576 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2577                                           struct mgs_device *mgs,
2578                                           struct fs_db *fsdb,
2579                                           struct mgs_target_info *mti,
2580                                           char *logname, char *cliname)
2581 {
2582         int rc;
2583         struct llog_handle *llh = NULL;
2584
2585         if (mgs_param_empty(mti->mti_params)) {
2586                 /* Remove _all_ failnids */
2587                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2588                                 mti->mti_svname, "add failnid", CM_SKIP);
2589                 return rc < 0 ? rc : 0;
2590         }
2591
2592         /* Otherwise failover nids are additive */
2593         rc = record_start_log(env, mgs, &llh, logname);
2594         if (rc)
2595                 return rc;
2596                 /* FIXME this should be a single journal transaction */
2597         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2598                            "add failnid");
2599         if (rc)
2600                 goto out_end;
2601         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2602         if (rc)
2603                 goto out_end;
2604         rc = record_marker(env, llh, fsdb, CM_END,
2605                            mti->mti_svname, "add failnid");
2606 out_end:
2607         record_end_log(env, &llh);
2608         return rc;
2609 }
2610
2611
2612 /* Add additional failnids to an existing log.
2613    The mdc/osc must have been added to logs first */
2614 /* tcp nids must be in dotted-quad ascii -
2615    we can't resolve hostnames from the kernel. */
2616 static int mgs_write_log_add_failnid(const struct lu_env *env,
2617                                      struct mgs_device *mgs,
2618                                      struct fs_db *fsdb,
2619                                      struct mgs_target_info *mti)
2620 {
2621         char *logname, *cliname;
2622         int rc;
2623         ENTRY;
2624
2625         /* FIXME we currently can't erase the failnids
2626          * given when a target first registers, since they aren't part of
2627          * an "add uuid" stanza */
2628
2629         /* Verify that we know about this target */
2630         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2631                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2632                                    "yet. It must be started before failnids "
2633                                    "can be added.\n", mti->mti_svname);
2634                 RETURN(-ENOENT);
2635         }
2636
2637         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2638         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2639                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2640         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2641                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2642         } else {
2643                 RETURN(-EINVAL);
2644         }
2645         if (rc)
2646                 RETURN(rc);
2647         /* Add failover nids to the client log */
2648         rc = name_create(&logname, mti->mti_fsname, "-client");
2649         if (rc) {
2650                 name_destroy(&cliname);
2651                 RETURN(rc);
2652         }
2653         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2654         name_destroy(&logname);
2655         name_destroy(&cliname);
2656         if (rc)
2657                 RETURN(rc);
2658
2659         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2660                 /* Add OST failover nids to the MDT logs as well */
2661                 int i;
2662
2663                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2664                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2665                                 continue;
2666                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2667                         if (rc)
2668                                 RETURN(rc);
2669                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2670                                                  fsdb, i);
2671                         if (rc) {
2672                                 name_destroy(&logname);
2673                                 RETURN(rc);
2674                         }
2675                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2676                                                             mti, logname,
2677                                                             cliname);
2678                         name_destroy(&cliname);
2679                         name_destroy(&logname);
2680                         if (rc)
2681                                 RETURN(rc);
2682                 }
2683         }
2684
2685         RETURN(rc);
2686 }
2687
2688 static int mgs_wlp_lcfg(const struct lu_env *env,
2689                         struct mgs_device *mgs, struct fs_db *fsdb,
2690                         struct mgs_target_info *mti,
2691                         char *logname, struct lustre_cfg_bufs *bufs,
2692                         char *tgtname, char *ptr)
2693 {
2694         char comment[MTI_NAME_MAXLEN];
2695         char *tmp;
2696         struct llog_cfg_rec *lcr;
2697         int rc, del;
2698
2699         /* Erase any old settings of this same parameter */
2700         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2701         comment[MTI_NAME_MAXLEN - 1] = 0;
2702         /* But don't try to match the value. */
2703         tmp = strchr(comment, '=');
2704         if (tmp != NULL)
2705                 *tmp = 0;
2706         /* FIXME we should skip settings that are the same as old values */
2707         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2708         if (rc < 0)
2709                 return rc;
2710         del = mgs_param_empty(ptr);
2711
2712         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2713                       "Setting" : "Modifying", tgtname, comment, logname);
2714         if (del) {
2715                 /* mgs_modify() will return 1 if nothing had to be done */
2716                 if (rc == 1)
2717                         rc = 0;
2718                 return rc;
2719         }
2720
2721         lustre_cfg_bufs_reset(bufs, tgtname);
2722         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2723         if (mti->mti_flags & LDD_F_PARAM2)
2724                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2725
2726         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2727                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2728         if (lcr == NULL)
2729                 return -ENOMEM;
2730
2731         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2732                                   comment);
2733         lustre_cfg_rec_free(lcr);
2734         return rc;
2735 }
2736
2737 static int mgs_write_log_param2(const struct lu_env *env,
2738                                 struct mgs_device *mgs,
2739                                 struct fs_db *fsdb,
2740                                 struct mgs_target_info *mti, char *ptr)
2741 {
2742         struct lustre_cfg_bufs  bufs;
2743         int                     rc = 0;
2744         ENTRY;
2745
2746         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2747         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2748                           mti->mti_svname, ptr);
2749
2750         RETURN(rc);
2751 }
2752
2753 /* write global variable settings into log */
2754 static int mgs_write_log_sys(const struct lu_env *env,
2755                              struct mgs_device *mgs, struct fs_db *fsdb,
2756                              struct mgs_target_info *mti, char *sys, char *ptr)
2757 {
2758         struct mgs_thread_info  *mgi = mgs_env_info(env);
2759         struct lustre_cfg       *lcfg;
2760         struct llog_cfg_rec     *lcr;
2761         char *tmp, sep;
2762         int rc, cmd, convert = 1;
2763
2764         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2765                 cmd = LCFG_SET_TIMEOUT;
2766         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2767                 cmd = LCFG_SET_LDLM_TIMEOUT;
2768         /* Check for known params here so we can return error to lctl */
2769         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2770                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2771                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2772                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2773                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2774                 cmd = LCFG_PARAM;
2775         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2776                 convert = 0; /* Don't convert string value to integer */
2777                 cmd = LCFG_PARAM;
2778         } else {
2779                 return -EINVAL;
2780         }
2781
2782         if (mgs_param_empty(ptr))
2783                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2784         else
2785                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2786
2787         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2788         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2789         if (!convert && *tmp != '\0')
2790                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2791         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2792         if (lcr == NULL)
2793                 return -ENOMEM;
2794
2795         lcfg = &lcr->lcr_cfg;
2796         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2797         /* truncate the comment to the parameter name */
2798         ptr = tmp - 1;
2799         sep = *ptr;
2800         *ptr = '\0';
2801         /* modify all servers and clients */
2802         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2803                                       *tmp == '\0' ? NULL : lcr,
2804                                       mti->mti_fsname, sys, 0);
2805         if (rc == 0 && *tmp != '\0') {
2806                 switch (cmd) {
2807                 case LCFG_SET_TIMEOUT:
2808                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2809                                 class_process_config(lcfg);
2810                         break;
2811                 case LCFG_SET_LDLM_TIMEOUT:
2812                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2813                                 class_process_config(lcfg);
2814                         break;
2815                 default:
2816                         break;
2817                 }
2818         }
2819         *ptr = sep;
2820         lustre_cfg_rec_free(lcr);
2821         return rc;
2822 }
2823
2824 /* write quota settings into log */
2825 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2826                                struct fs_db *fsdb, struct mgs_target_info *mti,
2827                                char *quota, char *ptr)
2828 {
2829         struct mgs_thread_info  *mgi = mgs_env_info(env);
2830         struct llog_cfg_rec     *lcr;
2831         char                    *tmp;
2832         char                     sep;
2833         int                      rc, cmd = LCFG_PARAM;
2834
2835         /* support only 'meta' and 'data' pools so far */
2836         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2837             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2838                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2839                        "& quota.ost are)\n", ptr);
2840                 return -EINVAL;
2841         }
2842
2843         if (*tmp == '\0') {
2844                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2845         } else {
2846                 CDEBUG(D_MGS, "global '%s'\n", quota);
2847
2848                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2849                     strcmp(tmp, "none") != 0) {
2850                         CERROR("enable option(%s) isn't supported\n", tmp);
2851                         return -EINVAL;
2852                 }
2853         }
2854
2855         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2856         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2857         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2858         if (lcr == NULL)
2859                 return -ENOMEM;
2860
2861         /* truncate the comment to the parameter name */
2862         ptr = tmp - 1;
2863         sep = *ptr;
2864         *ptr = '\0';
2865
2866         /* XXX we duplicated quota enable information in all server
2867          *     config logs, it should be moved to a separate config
2868          *     log once we cleanup the config log for global param. */
2869         /* modify all servers */
2870         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2871                                       *tmp == '\0' ? NULL : lcr,
2872                                       mti->mti_fsname, quota, 1);
2873         *ptr = sep;
2874         lustre_cfg_rec_free(lcr);
2875         return rc < 0 ? rc : 0;
2876 }
2877
2878 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2879                                    struct mgs_device *mgs,
2880                                    struct fs_db *fsdb,
2881                                    struct mgs_target_info *mti,
2882                                    char *param)
2883 {
2884         struct mgs_thread_info  *mgi = mgs_env_info(env);
2885         struct llog_cfg_rec     *lcr;
2886         struct llog_handle      *llh = NULL;
2887         char                    *logname;
2888         char                    *comment, *ptr;
2889         int                      rc, len;
2890
2891         ENTRY;
2892
2893         /* get comment */
2894         ptr = strchr(param, '=');
2895         LASSERT(ptr != NULL);
2896         len = ptr - param;
2897
2898         OBD_ALLOC(comment, len + 1);
2899         if (comment == NULL)
2900                 RETURN(-ENOMEM);
2901         strncpy(comment, param, len);
2902         comment[len] = '\0';
2903
2904         /* prepare lcfg */
2905         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2906         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2907         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2908         if (lcr == NULL)
2909                 GOTO(out_comment, rc = -ENOMEM);
2910
2911         /* construct log name */
2912         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2913         if (rc < 0)
2914                 GOTO(out_lcfg, rc);
2915
2916         if (mgs_log_is_empty(env, mgs, logname)) {
2917                 rc = record_start_log(env, mgs, &llh, logname);
2918                 if (rc < 0)
2919                         GOTO(out, rc);
2920                 record_end_log(env, &llh);
2921         }
2922
2923         /* obsolete old one */
2924         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2925                         comment, CM_SKIP);
2926         if (rc < 0)
2927                 GOTO(out, rc);
2928         /* write the new one */
2929         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2930                                   mti->mti_svname, comment);
2931         if (rc)
2932                 CERROR("%s: error writing log %s: rc = %d\n",
2933                        mgs->mgs_obd->obd_name, logname, rc);
2934 out:
2935         name_destroy(&logname);
2936 out_lcfg:
2937         lustre_cfg_rec_free(lcr);
2938 out_comment:
2939         OBD_FREE(comment, len + 1);
2940         RETURN(rc);
2941 }
2942
2943 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2944                                         char *param)
2945 {
2946         char    *ptr;
2947
2948         /* disable the adjustable udesc parameter for now, i.e. use default
2949          * setting that client always ship udesc to MDT if possible. to enable
2950          * it simply remove the following line */
2951         goto error_out;
2952
2953         ptr = strchr(param, '=');
2954         if (ptr == NULL)
2955                 goto error_out;
2956         *ptr++ = '\0';
2957
2958         if (strcmp(param, PARAM_SRPC_UDESC))
2959                 goto error_out;
2960
2961         if (strcmp(ptr, "yes") == 0) {
2962                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2963                 CWARN("Enable user descriptor shipping from client to MDT\n");
2964         } else if (strcmp(ptr, "no") == 0) {
2965                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2966                 CWARN("Disable user descriptor shipping from client to MDT\n");
2967         } else {
2968                 *(ptr - 1) = '=';
2969                 goto error_out;
2970         }
2971         return 0;
2972
2973 error_out:
2974         CERROR("Invalid param: %s\n", param);
2975         return -EINVAL;
2976 }
2977
2978 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2979                                   const char *svname,
2980                                   char *param)
2981 {
2982         struct sptlrpc_rule      rule;
2983         struct sptlrpc_rule_set *rset;
2984         int                      rc;
2985         ENTRY;
2986
2987         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2988                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2989                 RETURN(-EINVAL);
2990         }
2991
2992         if (strncmp(param, PARAM_SRPC_UDESC,
2993                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2994                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2995         }
2996
2997         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2998                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2999                 RETURN(-EINVAL);
3000         }
3001
3002         param += sizeof(PARAM_SRPC_FLVR) - 1;
3003
3004         rc = sptlrpc_parse_rule(param, &rule);
3005         if (rc)
3006                 RETURN(rc);
3007
3008         /* mgs rules implies must be mgc->mgs */
3009         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3010                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3011                      rule.sr_from != LUSTRE_SP_ANY) ||
3012                     (rule.sr_to != LUSTRE_SP_MGS &&
3013                      rule.sr_to != LUSTRE_SP_ANY))
3014                         RETURN(-EINVAL);
3015         }
3016
3017         /* preapre room for this coming rule. svcname format should be:
3018          * - fsname: general rule
3019          * - fsname-tgtname: target-specific rule
3020          */
3021         if (strchr(svname, '-')) {
3022                 struct mgs_tgt_srpc_conf *tgtconf;
3023                 int                       found = 0;
3024
3025                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3026                      tgtconf = tgtconf->mtsc_next) {
3027                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3028                                 found = 1;
3029                                 break;
3030                         }
3031                 }
3032
3033                 if (!found) {
3034                         int name_len;
3035
3036                         OBD_ALLOC_PTR(tgtconf);
3037                         if (tgtconf == NULL)
3038                                 RETURN(-ENOMEM);
3039
3040                         name_len = strlen(svname);
3041
3042                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3043                         if (tgtconf->mtsc_tgt == NULL) {
3044                                 OBD_FREE_PTR(tgtconf);
3045                                 RETURN(-ENOMEM);
3046                         }
3047                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3048
3049                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3050                         fsdb->fsdb_srpc_tgt = tgtconf;
3051                 }
3052
3053                 rset = &tgtconf->mtsc_rset;
3054         } else {
3055                 rset = &fsdb->fsdb_srpc_gen;
3056         }
3057
3058         rc = sptlrpc_rule_set_merge(rset, &rule);
3059
3060         RETURN(rc);
3061 }
3062
3063 static int mgs_srpc_set_param(const struct lu_env *env,
3064                               struct mgs_device *mgs,
3065                               struct fs_db *fsdb,
3066                               struct mgs_target_info *mti,
3067                               char *param)
3068 {
3069         char                   *copy;
3070         int                     rc, copy_size;
3071         ENTRY;
3072
3073 #ifndef HAVE_GSS
3074         RETURN(-EINVAL);
3075 #endif
3076         /* keep a copy of original param, which could be destroied
3077          * during parsing */
3078         copy_size = strlen(param) + 1;
3079         OBD_ALLOC(copy, copy_size);
3080         if (copy == NULL)
3081                 return -ENOMEM;
3082         memcpy(copy, param, copy_size);
3083
3084         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3085         if (rc)
3086                 goto out_free;
3087
3088         /* previous steps guaranteed the syntax is correct */
3089         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3090         if (rc)
3091                 goto out_free;
3092
3093         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3094                 /*
3095                  * for mgs rules, make them effective immediately.
3096                  */
3097                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3098                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3099                                                  &fsdb->fsdb_srpc_gen);
3100         }
3101
3102 out_free:
3103         OBD_FREE(copy, copy_size);
3104         RETURN(rc);
3105 }
3106
3107 struct mgs_srpc_read_data {
3108         struct fs_db   *msrd_fsdb;
3109         int             msrd_skip;
3110 };
3111
3112 static int mgs_srpc_read_handler(const struct lu_env *env,
3113                                  struct llog_handle *llh,
3114                                  struct llog_rec_hdr *rec, void *data)
3115 {
3116         struct mgs_srpc_read_data *msrd = data;
3117         struct cfg_marker         *marker;
3118         struct lustre_cfg         *lcfg = REC_DATA(rec);
3119         char                      *svname, *param;
3120         int                        cfg_len, rc;
3121         ENTRY;
3122
3123         if (rec->lrh_type != OBD_CFG_REC) {
3124                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3125                 RETURN(-EINVAL);
3126         }
3127
3128         cfg_len = REC_DATA_LEN(rec);
3129
3130         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3131         if (rc) {
3132                 CERROR("Insane cfg\n");
3133                 RETURN(rc);
3134         }
3135
3136         if (lcfg->lcfg_command == LCFG_MARKER) {
3137                 marker = lustre_cfg_buf(lcfg, 1);
3138
3139                 if (marker->cm_flags & CM_START &&
3140                     marker->cm_flags & CM_SKIP)
3141                         msrd->msrd_skip = 1;
3142                 if (marker->cm_flags & CM_END)
3143                         msrd->msrd_skip = 0;
3144
3145                 RETURN(0);
3146         }
3147
3148         if (msrd->msrd_skip)
3149                 RETURN(0);
3150
3151         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3152                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3153                 RETURN(0);
3154         }
3155
3156         svname = lustre_cfg_string(lcfg, 0);
3157         if (svname == NULL) {
3158                 CERROR("svname is empty\n");
3159                 RETURN(0);
3160         }
3161
3162         param = lustre_cfg_string(lcfg, 1);
3163         if (param == NULL) {
3164                 CERROR("param is empty\n");
3165                 RETURN(0);
3166         }
3167
3168         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3169         if (rc)
3170                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3171
3172         RETURN(0);
3173 }
3174
3175 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3176                                 struct mgs_device *mgs,
3177                                 struct fs_db *fsdb)
3178 {
3179         struct llog_handle        *llh = NULL;
3180         struct llog_ctxt          *ctxt;
3181         char                      *logname;
3182         struct mgs_srpc_read_data  msrd;
3183         int                        rc;
3184         ENTRY;
3185
3186         /* construct log name */
3187         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3188         if (rc)
3189                 RETURN(rc);
3190
3191         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3192         LASSERT(ctxt != NULL);
3193
3194         if (mgs_log_is_empty(env, mgs, logname))
3195                 GOTO(out, rc = 0);
3196
3197         rc = llog_open(env, ctxt, &llh, NULL, logname,
3198                        LLOG_OPEN_EXISTS);
3199         if (rc < 0) {
3200                 if (rc == -ENOENT)
3201                         rc = 0;
3202                 GOTO(out, rc);
3203         }
3204
3205         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3206         if (rc)
3207                 GOTO(out_close, rc);
3208
3209         if (llog_get_size(llh) <= 1)
3210                 GOTO(out_close, rc = 0);
3211
3212         msrd.msrd_fsdb = fsdb;
3213         msrd.msrd_skip = 0;
3214
3215         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3216                           NULL);
3217
3218 out_close:
3219         llog_close(env, llh);
3220 out:
3221         llog_ctxt_put(ctxt);
3222         name_destroy(&logname);
3223
3224         if (rc)
3225                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3226         RETURN(rc);
3227 }
3228
3229 /* Permanent settings of all parameters by writing into the appropriate
3230  * configuration logs.
3231  * A parameter with null value ("<param>='\0'") means to erase it out of
3232  * the logs.
3233  */
3234 static int mgs_write_log_param(const struct lu_env *env,
3235                                struct mgs_device *mgs, struct fs_db *fsdb,
3236                                struct mgs_target_info *mti, char *ptr)
3237 {
3238         struct mgs_thread_info *mgi = mgs_env_info(env);
3239         char *logname;
3240         char *tmp;
3241         int rc = 0, rc2 = 0;
3242         ENTRY;
3243
3244         /* For various parameter settings, we have to figure out which logs
3245            care about them (e.g. both mdt and client for lov settings) */
3246         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3247
3248         /* The params are stored in MOUNT_DATA_FILE and modified via
3249            tunefs.lustre, or set using lctl conf_param */
3250
3251         /* Processed in lustre_start_mgc */
3252         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3253                 GOTO(end, rc);