Whamcloud - gitweb
LU-4749 mgs: Recognize colons in failover.node values
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 /* Find all logs in CONFIG directory and link then into list */
59 int class_dentry_readdir(const struct lu_env *env,
60                          struct mgs_device *mgs, struct list_head *log_list)
61 {
62         struct dt_object    *dir = mgs->mgs_configs_dir;
63         const struct dt_it_ops *iops;
64         struct dt_it        *it;
65         struct mgs_direntry *de;
66         char                *key;
67         int                  rc, key_sz;
68
69         INIT_LIST_HEAD(log_list);
70
71         LASSERT(dir);
72         LASSERT(dir->do_index_ops);
73
74         iops = &dir->do_index_ops->dio_it;
75         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
76         if (IS_ERR(it))
77                 RETURN(PTR_ERR(it));
78
79         rc = iops->load(env, it, 0);
80         if (rc <= 0)
81                 GOTO(fini, rc = 0);
82
83         /* main cycle */
84         do {
85                 key = (void *)iops->key(env, it);
86                 if (IS_ERR(key)) {
87                         CERROR("%s: key failed when listing %s: rc = %d\n",
88                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
89                                (int) PTR_ERR(key));
90                         goto next;
91                 }
92                 key_sz = iops->key_size(env, it);
93                 LASSERT(key_sz > 0);
94
95                 /* filter out "." and ".." entries */
96                 if (key[0] == '.') {
97                         if (key_sz == 1)
98                                 goto next;
99                         if (key_sz == 2 && key[1] == '.')
100                                 goto next;
101                 }
102
103                 de = mgs_direntry_alloc(key_sz + 1);
104                 if (de == NULL) {
105                         rc = -ENOMEM;
106                         break;
107                 }
108
109                 memcpy(de->mde_name, key, key_sz);
110                 de->mde_name[key_sz] = 0;
111
112                 list_add(&de->mde_list, log_list);
113
114 next:
115                 rc = iops->next(env, it);
116         } while (rc == 0);
117         rc = 0;
118
119         iops->put(env, it);
120
121 fini:
122         iops->fini(env, it);
123         if (rc)
124                 CERROR("%s: key failed when listing %s: rc = %d\n",
125                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
126         RETURN(rc);
127 }
128
129 /******************** DB functions *********************/
130
131 static inline int name_create(char **newname, char *prefix, char *suffix)
132 {
133         LASSERT(newname);
134         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
135         if (!*newname)
136                 return -ENOMEM;
137         sprintf(*newname, "%s%s", prefix, suffix);
138         return 0;
139 }
140
141 static inline void name_destroy(char **name)
142 {
143         if (*name)
144                 OBD_FREE(*name, strlen(*name) + 1);
145         *name = NULL;
146 }
147
148 struct mgs_fsdb_handler_data
149 {
150         struct fs_db   *fsdb;
151         __u32           ver;
152 };
153
154 /* from the (client) config log, figure out:
155         1. which ost's/mdt's are configured (by index)
156         2. what the last config step is
157         3. COMPAT_18 osc name
158 */
159 /* It might be better to have a separate db file, instead of parsing the info
160    out of the client log.  This is slow and potentially error-prone. */
161 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
162                             struct llog_rec_hdr *rec, void *data)
163 {
164         struct mgs_fsdb_handler_data *d = data;
165         struct fs_db *fsdb = d->fsdb;
166         int cfg_len = rec->lrh_len;
167         char *cfg_buf = (char*) (rec + 1);
168         struct lustre_cfg *lcfg;
169         __u32 index;
170         int rc = 0;
171         ENTRY;
172
173         if (rec->lrh_type != OBD_CFG_REC) {
174                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
175                 RETURN(-EINVAL);
176         }
177
178         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
179         if (rc) {
180                 CERROR("Insane cfg\n");
181                 RETURN(rc);
182         }
183
184         lcfg = (struct lustre_cfg *)cfg_buf;
185
186         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
187                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
188
189         /* Figure out ost indicies */
190         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
191         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
192             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
193                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
194                                        NULL, 10);
195                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
196                        lustre_cfg_string(lcfg, 1), index,
197                        lustre_cfg_string(lcfg, 2));
198                 set_bit(index, fsdb->fsdb_ost_index_map);
199         }
200
201         /* Figure out mdt indicies */
202         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
203         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
204             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
205                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
206                                        &index, NULL);
207                 if (rc != LDD_F_SV_TYPE_MDT) {
208                         CWARN("Unparsable MDC name %s, assuming index 0\n",
209                               lustre_cfg_string(lcfg, 0));
210                         index = 0;
211                 }
212                 rc = 0;
213                 CDEBUG(D_MGS, "MDT index is %u\n", index);
214                 set_bit(index, fsdb->fsdb_mdt_index_map);
215                 fsdb->fsdb_mdt_count ++;
216         }
217
218         /**
219          * figure out the old config. fsdb_gen = 0 means old log
220          * It is obsoleted and not supported anymore
221          */
222         if (fsdb->fsdb_gen == 0) {
223                 CERROR("Old config format is not supported\n");
224                 RETURN(-EINVAL);
225         }
226
227         /*
228          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
229          */
230         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
231             lcfg->lcfg_command == LCFG_ATTACH &&
232             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
233                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
234                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
235                         CWARN("MDT using 1.8 OSC name scheme\n");
236                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
237                 }
238         }
239
240         if (lcfg->lcfg_command == LCFG_MARKER) {
241                 struct cfg_marker *marker;
242                 marker = lustre_cfg_buf(lcfg, 1);
243
244                 d->ver = marker->cm_vers;
245
246                 /* Keep track of the latest marker step */
247                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
248         }
249
250         RETURN(rc);
251 }
252
253 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
254 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
255                                   struct mgs_device *mgs,
256                                   struct fs_db *fsdb)
257 {
258         char                            *logname;
259         struct llog_handle              *loghandle;
260         struct llog_ctxt                *ctxt;
261         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
262         int rc;
263
264         ENTRY;
265
266         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
267         LASSERT(ctxt != NULL);
268         rc = name_create(&logname, fsdb->fsdb_name, "-client");
269         if (rc)
270                 GOTO(out_put, rc);
271         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
272         if (rc)
273                 GOTO(out_pop, rc);
274
275         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
276         if (rc)
277                 GOTO(out_close, rc);
278
279         if (llog_get_size(loghandle) <= 1)
280                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
281
282         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
283         CDEBUG(D_INFO, "get_db = %d\n", rc);
284 out_close:
285         llog_close(env, loghandle);
286 out_pop:
287         name_destroy(&logname);
288 out_put:
289         llog_ctxt_put(ctxt);
290
291         RETURN(rc);
292 }
293
294 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
295 {
296         struct mgs_tgt_srpc_conf *tgtconf;
297
298         /* free target-specific rules */
299         while (fsdb->fsdb_srpc_tgt) {
300                 tgtconf = fsdb->fsdb_srpc_tgt;
301                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
302
303                 LASSERT(tgtconf->mtsc_tgt);
304
305                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
306                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
307                 OBD_FREE_PTR(tgtconf);
308         }
309
310         /* free general rules */
311         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
312 }
313
314 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
315 {
316         struct fs_db *fsdb;
317         struct list_head *tmp;
318
319         list_for_each(tmp, &mgs->mgs_fs_db_list) {
320                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
321                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
322                         return fsdb;
323         }
324         return NULL;
325 }
326
327 /* caller must hold the mgs->mgs_fs_db_lock */
328 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
329                                   struct mgs_device *mgs, char *fsname)
330 {
331         struct fs_db *fsdb;
332         int rc;
333         ENTRY;
334
335         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
336                 CERROR("fsname %s is too long\n", fsname);
337                 RETURN(NULL);
338         }
339
340         OBD_ALLOC_PTR(fsdb);
341         if (!fsdb)
342                 RETURN(NULL);
343
344         strcpy(fsdb->fsdb_name, fsname);
345         mutex_init(&fsdb->fsdb_mutex);
346         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
347         fsdb->fsdb_gen = 1;
348
349         if (strcmp(fsname, MGSSELF_NAME) == 0) {
350                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
351         } else {
352                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
353                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
354                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
355                         CERROR("No memory for index maps\n");
356                         GOTO(err, rc = -ENOMEM);
357                 }
358
359                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
360                 if (rc)
361                         GOTO(err, rc);
362                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
363                 if (rc)
364                         GOTO(err, rc);
365
366                 /* initialise data for NID table */
367                 mgs_ir_init_fs(env, mgs, fsdb);
368
369                 lproc_mgs_add_live(mgs, fsdb);
370         }
371
372         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
373
374         RETURN(fsdb);
375 err:
376         if (fsdb->fsdb_ost_index_map)
377                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
378         if (fsdb->fsdb_mdt_index_map)
379                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
380         name_destroy(&fsdb->fsdb_clilov);
381         name_destroy(&fsdb->fsdb_clilmv);
382         OBD_FREE_PTR(fsdb);
383         RETURN(NULL);
384 }
385
386 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
387 {
388         /* wait for anyone with the sem */
389         mutex_lock(&fsdb->fsdb_mutex);
390         lproc_mgs_del_live(mgs, fsdb);
391         list_del(&fsdb->fsdb_list);
392
393         /* deinitialize fsr */
394         mgs_ir_fini_fs(mgs, fsdb);
395
396         if (fsdb->fsdb_ost_index_map)
397                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
398         if (fsdb->fsdb_mdt_index_map)
399                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
400         name_destroy(&fsdb->fsdb_clilov);
401         name_destroy(&fsdb->fsdb_clilmv);
402         mgs_free_fsdb_srpc(fsdb);
403         mutex_unlock(&fsdb->fsdb_mutex);
404         OBD_FREE_PTR(fsdb);
405 }
406
407 int mgs_init_fsdb_list(struct mgs_device *mgs)
408 {
409         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
410         return 0;
411 }
412
413 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
414 {
415         struct fs_db *fsdb;
416         struct list_head *tmp, *tmp2;
417
418         mutex_lock(&mgs->mgs_mutex);
419         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
420                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
421                 mgs_free_fsdb(mgs, fsdb);
422         }
423         mutex_unlock(&mgs->mgs_mutex);
424         return 0;
425 }
426
427 int mgs_find_or_make_fsdb(const struct lu_env *env,
428                           struct mgs_device *mgs, char *name,
429                           struct fs_db **dbh)
430 {
431         struct fs_db *fsdb;
432         int rc = 0;
433
434         ENTRY;
435         mutex_lock(&mgs->mgs_mutex);
436         fsdb = mgs_find_fsdb(mgs, name);
437         if (fsdb) {
438                 mutex_unlock(&mgs->mgs_mutex);
439                 *dbh = fsdb;
440                 RETURN(0);
441         }
442
443         CDEBUG(D_MGS, "Creating new db\n");
444         fsdb = mgs_new_fsdb(env, mgs, name);
445         /* lock fsdb_mutex until the db is loaded from llogs */
446         if (fsdb)
447                 mutex_lock(&fsdb->fsdb_mutex);
448         mutex_unlock(&mgs->mgs_mutex);
449         if (!fsdb)
450                 RETURN(-ENOMEM);
451
452         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
453                 /* populate the db from the client llog */
454                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
455                 if (rc) {
456                         CERROR("Can't get db from client log %d\n", rc);
457                         GOTO(out_free, rc);
458                 }
459         }
460
461         /* populate srpc rules from params llog */
462         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
463         if (rc) {
464                 CERROR("Can't get db from params log %d\n", rc);
465                 GOTO(out_free, rc);
466         }
467
468         mutex_unlock(&fsdb->fsdb_mutex);
469         *dbh = fsdb;
470
471         RETURN(0);
472
473 out_free:
474         mutex_unlock(&fsdb->fsdb_mutex);
475         mgs_free_fsdb(mgs, fsdb);
476         return rc;
477 }
478
479 /* 1 = index in use
480    0 = index unused
481    -1= empty client log */
482 int mgs_check_index(const struct lu_env *env,
483                     struct mgs_device *mgs,
484                     struct mgs_target_info *mti)
485 {
486         struct fs_db *fsdb;
487         void *imap;
488         int rc = 0;
489         ENTRY;
490
491         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
492
493         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
494         if (rc) {
495                 CERROR("Can't get db for %s\n", mti->mti_fsname);
496                 RETURN(rc);
497         }
498
499         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
500                 RETURN(-1);
501
502         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
503                 imap = fsdb->fsdb_ost_index_map;
504         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
505                 imap = fsdb->fsdb_mdt_index_map;
506         else
507                 RETURN(-EINVAL);
508
509         if (test_bit(mti->mti_stripe_index, imap))
510                 RETURN(1);
511         RETURN(0);
512 }
513
514 static __inline__ int next_index(void *index_map, int map_len)
515 {
516         int i;
517         for (i = 0; i < map_len * 8; i++)
518                 if (!test_bit(i, index_map)) {
519                          return i;
520                  }
521         CERROR("max index %d exceeded.\n", i);
522         return -1;
523 }
524
525 /* Return codes:
526         0  newly marked as in use
527         <0 err
528         +EALREADY for update of an old index */
529 static int mgs_set_index(const struct lu_env *env,
530                          struct mgs_device *mgs,
531                          struct mgs_target_info *mti)
532 {
533         struct fs_db *fsdb;
534         void *imap;
535         int rc = 0;
536         ENTRY;
537
538         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
539         if (rc) {
540                 CERROR("Can't get db for %s\n", mti->mti_fsname);
541                 RETURN(rc);
542         }
543
544         mutex_lock(&fsdb->fsdb_mutex);
545         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
546                 imap = fsdb->fsdb_ost_index_map;
547         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
548                 imap = fsdb->fsdb_mdt_index_map;
549         } else {
550                 GOTO(out_up, rc = -EINVAL);
551         }
552
553         if (mti->mti_flags & LDD_F_NEED_INDEX) {
554                 rc = next_index(imap, INDEX_MAP_SIZE);
555                 if (rc == -1)
556                         GOTO(out_up, rc = -ERANGE);
557                 mti->mti_stripe_index = rc;
558                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
559                         fsdb->fsdb_mdt_count ++;
560         }
561
562         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
563                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
564                                    "but the max index is %d.\n",
565                                    mti->mti_svname, mti->mti_stripe_index,
566                                    INDEX_MAP_SIZE * 8);
567                 GOTO(out_up, rc = -ERANGE);
568         }
569
570         if (test_bit(mti->mti_stripe_index, imap)) {
571                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
572                     !(mti->mti_flags & LDD_F_WRITECONF)) {
573                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
574                                            "%d, but that index is already in "
575                                            "use. Use --writeconf to force\n",
576                                            mti->mti_svname,
577                                            mti->mti_stripe_index);
578                         GOTO(out_up, rc = -EADDRINUSE);
579                 } else {
580                         CDEBUG(D_MGS, "Server %s updating index %d\n",
581                                mti->mti_svname, mti->mti_stripe_index);
582                         GOTO(out_up, rc = EALREADY);
583                 }
584         }
585
586         set_bit(mti->mti_stripe_index, imap);
587         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
588         mutex_unlock(&fsdb->fsdb_mutex);
589         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
590                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
591
592         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
593                mti->mti_stripe_index);
594
595         RETURN(0);
596 out_up:
597         mutex_unlock(&fsdb->fsdb_mutex);
598         return rc;
599 }
600
601 struct mgs_modify_lookup {
602         struct cfg_marker mml_marker;
603         int               mml_modified;
604 };
605
606 static int mgs_modify_handler(const struct lu_env *env,
607                               struct llog_handle *llh,
608                               struct llog_rec_hdr *rec, void *data)
609 {
610         struct mgs_modify_lookup *mml = data;
611         struct cfg_marker *marker;
612         struct lustre_cfg *lcfg = REC_DATA(rec);
613         int cfg_len = REC_DATA_LEN(rec);
614         int rc;
615         ENTRY;
616
617         if (rec->lrh_type != OBD_CFG_REC) {
618                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
619                 RETURN(-EINVAL);
620         }
621
622         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
623         if (rc) {
624                 CERROR("Insane cfg\n");
625                 RETURN(rc);
626         }
627
628         /* We only care about markers */
629         if (lcfg->lcfg_command != LCFG_MARKER)
630                 RETURN(0);
631
632         marker = lustre_cfg_buf(lcfg, 1);
633         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
634             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
635             !(marker->cm_flags & CM_SKIP)) {
636                 /* Found a non-skipped marker match */
637                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
638                        rec->lrh_index, marker->cm_step,
639                        marker->cm_flags, mml->mml_marker.cm_flags,
640                        marker->cm_tgtname, marker->cm_comment);
641                 /* Overwrite the old marker llog entry */
642                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
643                 marker->cm_flags |= mml->mml_marker.cm_flags;
644                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
645                 rc = llog_write(env, llh, rec, rec->lrh_index);
646                 if (!rc)
647                          mml->mml_modified++;
648         }
649
650         RETURN(rc);
651 }
652
653 /**
654  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
655  * Return code:
656  * 0 - modified successfully,
657  * 1 - no modification was done
658  * negative - error
659  */
660 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
661                       struct fs_db *fsdb, struct mgs_target_info *mti,
662                       char *logname, char *devname, char *comment, int flags)
663 {
664         struct llog_handle *loghandle;
665         struct llog_ctxt *ctxt;
666         struct mgs_modify_lookup *mml;
667         int rc;
668
669         ENTRY;
670
671         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
672         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
673                flags);
674
675         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
676         LASSERT(ctxt != NULL);
677         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
678         if (rc < 0) {
679                 if (rc == -ENOENT)
680                         rc = 0;
681                 GOTO(out_pop, rc);
682         }
683
684         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
685         if (rc)
686                 GOTO(out_close, rc);
687
688         if (llog_get_size(loghandle) <= 1)
689                 GOTO(out_close, rc = 0);
690
691         OBD_ALLOC_PTR(mml);
692         if (!mml)
693                 GOTO(out_close, rc = -ENOMEM);
694         if (strlcpy(mml->mml_marker.cm_comment, comment,
695                     sizeof(mml->mml_marker.cm_comment)) >=
696             sizeof(mml->mml_marker.cm_comment))
697                 GOTO(out_free, rc = -E2BIG);
698         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
699                     sizeof(mml->mml_marker.cm_tgtname)) >=
700             sizeof(mml->mml_marker.cm_tgtname))
701                 GOTO(out_free, rc = -E2BIG);
702         /* Modify mostly means cancel */
703         mml->mml_marker.cm_flags = flags;
704         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
705         mml->mml_modified = 0;
706         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
707                           NULL);
708         if (!rc && !mml->mml_modified)
709                 rc = 1;
710
711 out_free:
712         OBD_FREE_PTR(mml);
713
714 out_close:
715         llog_close(env, loghandle);
716 out_pop:
717         if (rc < 0)
718                 CERROR("%s: modify %s/%s failed: rc = %d\n",
719                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
720         llog_ctxt_put(ctxt);
721         RETURN(rc);
722 }
723
724 /** This structure is passed to mgs_replace_handler */
725 struct mgs_replace_uuid_lookup {
726         /* Nids are replaced for this target device */
727         struct mgs_target_info target;
728         /* Temporary modified llog */
729         struct llog_handle *temp_llh;
730         /* Flag is set if in target block*/
731         int in_target_device;
732         /* Nids already added. Just skip (multiple nids) */
733         int device_nids_added;
734         /* Flag is set if this block should not be copied */
735         int skip_it;
736 };
737
738 /**
739  * Check: a) if block should be skipped
740  * b) is it target block
741  *
742  * \param[in] lcfg
743  * \param[in] mrul
744  *
745  * \retval 0 should not to be skipped
746  * \retval 1 should to be skipped
747  */
748 static int check_markers(struct lustre_cfg *lcfg,
749                          struct mgs_replace_uuid_lookup *mrul)
750 {
751          struct cfg_marker *marker;
752
753         /* Track markers. Find given device */
754         if (lcfg->lcfg_command == LCFG_MARKER) {
755                 marker = lustre_cfg_buf(lcfg, 1);
756                 /* Clean llog from records marked as CM_EXCLUDE.
757                    CM_SKIP records are used for "active" command
758                    and can be restored if needed */
759                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
760                     (CM_EXCLUDE | CM_START)) {
761                         mrul->skip_it = 1;
762                         return 1;
763                 }
764
765                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
766                     (CM_EXCLUDE | CM_END)) {
767                         mrul->skip_it = 0;
768                         return 1;
769                 }
770
771                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
772                         LASSERT(!(marker->cm_flags & CM_START) ||
773                                 !(marker->cm_flags & CM_END));
774                         if (marker->cm_flags & CM_START) {
775                                 mrul->in_target_device = 1;
776                                 mrul->device_nids_added = 0;
777                         } else if (marker->cm_flags & CM_END)
778                                 mrul->in_target_device = 0;
779                 }
780         }
781
782         return 0;
783 }
784
785 static int record_base(const struct lu_env *env, struct llog_handle *llh,
786                      char *cfgname, lnet_nid_t nid, int cmd,
787                      char *s1, char *s2, char *s3, char *s4)
788 {
789         struct mgs_thread_info  *mgi = mgs_env_info(env);
790         struct llog_cfg_rec     *lcr;
791         int rc;
792
793         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
794                cmd, s1, s2, s3, s4);
795
796         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
797         if (s1)
798                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
799         if (s2)
800                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
801         if (s3)
802                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
803         if (s4)
804                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
805
806         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
807         if (lcr == NULL)
808                 return -ENOMEM;
809
810         lcr->lcr_cfg.lcfg_nid = nid;
811         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
812
813         lustre_cfg_rec_free(lcr);
814
815         if (rc < 0)
816                 CDEBUG(D_MGS,
817                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
818                        cfgname, cmd, s1, s2, s3, s4, rc);
819         return rc;
820 }
821
822 static inline int record_add_uuid(const struct lu_env *env,
823                                   struct llog_handle *llh,
824                                   uint64_t nid, char *uuid)
825 {
826         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
827 }
828
829 static inline int record_add_conn(const struct lu_env *env,
830                                   struct llog_handle *llh,
831                                   char *devname, char *uuid)
832 {
833         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
834 }
835
836 static inline int record_attach(const struct lu_env *env,
837                                 struct llog_handle *llh, char *devname,
838                                 char *type, char *uuid)
839 {
840         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
841 }
842
843 static inline int record_setup(const struct lu_env *env,
844                                struct llog_handle *llh, char *devname,
845                                char *s1, char *s2, char *s3, char *s4)
846 {
847         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
848 }
849
850 /**
851  * \retval <0 record processing error
852  * \retval n record is processed. No need copy original one.
853  * \retval 0 record is not processed.
854  */
855 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
856                            struct mgs_replace_uuid_lookup *mrul)
857 {
858         int nids_added = 0;
859         lnet_nid_t nid;
860         char *ptr;
861         int rc;
862
863         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
864                 /* LCFG_ADD_UUID command found. Let's skip original command
865                    and add passed nids */
866                 ptr = mrul->target.mti_params;
867                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
868                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
869                                "device %s\n", libcfs_nid2str(nid),
870                                 mrul->target.mti_params,
871                                 mrul->target.mti_svname);
872                         rc = record_add_uuid(env,
873                                              mrul->temp_llh, nid,
874                                              mrul->target.mti_params);
875                         if (!rc)
876                                 nids_added++;
877                 }
878
879                 if (nids_added == 0) {
880                         CERROR("No new nids were added, nid %s with uuid %s, "
881                                "device %s\n", libcfs_nid2str(nid),
882                                mrul->target.mti_params,
883                                mrul->target.mti_svname);
884                         RETURN(-ENXIO);
885                 } else {
886                         mrul->device_nids_added = 1;
887                 }
888
889                 return nids_added;
890         }
891
892         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
893                 /* LCFG_SETUP command found. UUID should be changed */
894                 rc = record_setup(env,
895                                   mrul->temp_llh,
896                                   /* devname the same */
897                                   lustre_cfg_string(lcfg, 0),
898                                   /* s1 is not changed */
899                                   lustre_cfg_string(lcfg, 1),
900                                   /* new uuid should be
901                                   the full nidlist */
902                                   mrul->target.mti_params,
903                                   /* s3 is not changed */
904                                   lustre_cfg_string(lcfg, 3),
905                                   /* s4 is not changed */
906                                   lustre_cfg_string(lcfg, 4));
907                 return rc ? rc : 1;
908         }
909
910         /* Another commands in target device block */
911         return 0;
912 }
913
914 /**
915  * Handler that called for every record in llog.
916  * Records are processed in order they placed in llog.
917  *
918  * \param[in] llh       log to be processed
919  * \param[in] rec       current record
920  * \param[in] data      mgs_replace_uuid_lookup structure
921  *
922  * \retval 0    success
923  */
924 static int mgs_replace_handler(const struct lu_env *env,
925                                struct llog_handle *llh,
926                                struct llog_rec_hdr *rec,
927                                void *data)
928 {
929         struct mgs_replace_uuid_lookup *mrul;
930         struct lustre_cfg *lcfg = REC_DATA(rec);
931         int cfg_len = REC_DATA_LEN(rec);
932         int rc;
933         ENTRY;
934
935         mrul = (struct mgs_replace_uuid_lookup *)data;
936
937         if (rec->lrh_type != OBD_CFG_REC) {
938                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
939                        rec->lrh_type, lcfg->lcfg_command,
940                        lustre_cfg_string(lcfg, 0),
941                        lustre_cfg_string(lcfg, 1));
942                 RETURN(-EINVAL);
943         }
944
945         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
946         if (rc) {
947                 /* Do not copy any invalidated records */
948                 GOTO(skip_out, rc = 0);
949         }
950
951         rc = check_markers(lcfg, mrul);
952         if (rc || mrul->skip_it)
953                 GOTO(skip_out, rc = 0);
954
955         /* Write to new log all commands outside target device block */
956         if (!mrul->in_target_device)
957                 GOTO(copy_out, rc = 0);
958
959         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
960            (failover nids) for this target, assuming that if then
961            primary is changing then so is the failover */
962         if (mrul->device_nids_added &&
963             (lcfg->lcfg_command == LCFG_ADD_UUID ||
964              lcfg->lcfg_command == LCFG_ADD_CONN))
965                 GOTO(skip_out, rc = 0);
966
967         rc = process_command(env, lcfg, mrul);
968         if (rc < 0)
969                 RETURN(rc);
970
971         if (rc)
972                 RETURN(0);
973 copy_out:
974         /* Record is placed in temporary llog as is */
975         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
976
977         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
978                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
979                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
980         RETURN(rc);
981
982 skip_out:
983         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
984                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
985                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
986         RETURN(rc);
987 }
988
989 static int mgs_log_is_empty(const struct lu_env *env,
990                             struct mgs_device *mgs, char *name)
991 {
992         struct llog_ctxt        *ctxt;
993         int                      rc;
994
995         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
996         LASSERT(ctxt != NULL);
997
998         rc = llog_is_empty(env, ctxt, name);
999         llog_ctxt_put(ctxt);
1000         return rc;
1001 }
1002
1003 static int mgs_replace_nids_log(const struct lu_env *env,
1004                                 struct obd_device *mgs, struct fs_db *fsdb,
1005                                 char *logname, char *devname, char *nids)
1006 {
1007         struct llog_handle *orig_llh, *backup_llh;
1008         struct llog_ctxt *ctxt;
1009         struct mgs_replace_uuid_lookup *mrul;
1010         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1011         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1012         char *backup;
1013         int rc, rc2;
1014         ENTRY;
1015
1016         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1017
1018         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1019         LASSERT(ctxt != NULL);
1020
1021         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1022                 /* Log is empty. Nothing to replace */
1023                 GOTO(out_put, rc = 0);
1024         }
1025
1026         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1027         if (backup == NULL)
1028                 GOTO(out_put, rc = -ENOMEM);
1029
1030         sprintf(backup, "%s.bak", logname);
1031
1032         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1033         if (rc == 0) {
1034                 /* Now erase original log file. Connections are not allowed.
1035                    Backup is already saved */
1036                 rc = llog_erase(env, ctxt, NULL, logname);
1037                 if (rc < 0)
1038                         GOTO(out_free, rc);
1039         } else if (rc != -ENOENT) {
1040                 CERROR("%s: can't make backup for %s: rc = %d\n",
1041                        mgs->obd_name, logname, rc);
1042                 GOTO(out_free,rc);
1043         }
1044
1045         /* open local log */
1046         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1047         if (rc)
1048                 GOTO(out_restore, rc);
1049
1050         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1051         if (rc)
1052                 GOTO(out_closel, rc);
1053
1054         /* open backup llog */
1055         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1056                        LLOG_OPEN_EXISTS);
1057         if (rc)
1058                 GOTO(out_closel, rc);
1059
1060         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1061         if (rc)
1062                 GOTO(out_close, rc);
1063
1064         if (llog_get_size(backup_llh) <= 1)
1065                 GOTO(out_close, rc = 0);
1066
1067         OBD_ALLOC_PTR(mrul);
1068         if (!mrul)
1069                 GOTO(out_close, rc = -ENOMEM);
1070         /* devname is only needed information to replace UUID records */
1071         strlcpy(mrul->target.mti_svname, devname,
1072                 sizeof(mrul->target.mti_svname));
1073         /* parse nids later */
1074         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1075         /* Copy records to this temporary llog */
1076         mrul->temp_llh = orig_llh;
1077
1078         rc = llog_process(env, backup_llh, mgs_replace_handler,
1079                           (void *)mrul, NULL);
1080         OBD_FREE_PTR(mrul);
1081 out_close:
1082         rc2 = llog_close(NULL, backup_llh);
1083         if (!rc)
1084                 rc = rc2;
1085 out_closel:
1086         rc2 = llog_close(NULL, orig_llh);
1087         if (!rc)
1088                 rc = rc2;
1089
1090 out_restore:
1091         if (rc) {
1092                 CERROR("%s: llog should be restored: rc = %d\n",
1093                        mgs->obd_name, rc);
1094                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1095                                   logname);
1096                 if (rc2 < 0)
1097                         CERROR("%s: can't restore backup %s: rc = %d\n",
1098                                mgs->obd_name, logname, rc2);
1099         }
1100
1101 out_free:
1102         OBD_FREE(backup, strlen(backup) + 1);
1103
1104 out_put:
1105         llog_ctxt_put(ctxt);
1106
1107         if (rc)
1108                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1109                        mgs->obd_name, logname, rc);
1110
1111         RETURN(rc);
1112 }
1113
1114 /**
1115  * Parse device name and get file system name and/or device index
1116  *
1117  * \param[in]   devname device name (ex. lustre-MDT0000)
1118  * \param[out]  fsname  file system name(optional)
1119  * \param[out]  index   device index(optional)
1120  *
1121  * \retval 0    success
1122  */
1123 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1124 {
1125         int rc;
1126         ENTRY;
1127
1128         /* Extract fsname */
1129         if (fsname) {
1130                 rc = server_name2fsname(devname, fsname, NULL);
1131                 if (rc < 0) {
1132                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1133                                devname);
1134                         RETURN(-EINVAL);
1135                 }
1136         }
1137
1138         if (index) {
1139                 rc = server_name2index(devname, index, NULL);
1140                 if (rc < 0) {
1141                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1142                                devname);
1143                         RETURN(-EINVAL);
1144                 }
1145         }
1146
1147         RETURN(0);
1148 }
1149
1150 /* This is only called during replace_nids */
1151 static int only_mgs_is_running(struct obd_device *mgs_obd)
1152 {
1153         /* TDB: Is global variable with devices count exists? */
1154         int num_devices = get_devices_count();
1155         int num_exports = 0;
1156         struct obd_export *exp;
1157
1158         spin_lock(&mgs_obd->obd_dev_lock);
1159         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1160                 /* skip self export */
1161                 if (exp == mgs_obd->obd_self_export)
1162                         continue;
1163                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1164                         continue;
1165
1166                 ++num_exports;
1167
1168                 CERROR("%s: node %s still connected during replace_nids "
1169                        "connect_flags:%llx\n",
1170                        mgs_obd->obd_name,
1171                        libcfs_nid2str(exp->exp_nid_stats->nid),
1172                        exp_connect_flags(exp));
1173
1174         }
1175         spin_unlock(&mgs_obd->obd_dev_lock);
1176
1177         /* osd, MGS and MGC + self_export
1178            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1179         return (num_devices <= 3) && (num_exports == 0);
1180 }
1181
1182 static int name_create_mdt(char **logname, char *fsname, int i)
1183 {
1184         char mdt_index[9];
1185
1186         sprintf(mdt_index, "-MDT%04x", i);
1187         return name_create(logname, fsname, mdt_index);
1188 }
1189
1190 /**
1191  * Replace nids for \a device to \a nids values
1192  *
1193  * \param obd           MGS obd device
1194  * \param devname       nids need to be replaced for this device
1195  * (ex. lustre-OST0000)
1196  * \param nids          nids list (ex. nid1,nid2,nid3)
1197  *
1198  * \retval 0    success
1199  */
1200 int mgs_replace_nids(const struct lu_env *env,
1201                      struct mgs_device *mgs,
1202                      char *devname, char *nids)
1203 {
1204         /* Assume fsname is part of device name */
1205         char fsname[MTI_NAME_MAXLEN];
1206         int rc;
1207         __u32 index;
1208         char *logname;
1209         struct fs_db *fsdb;
1210         unsigned int i;
1211         int conn_state;
1212         struct obd_device *mgs_obd = mgs->mgs_obd;
1213         ENTRY;
1214
1215         /* We can only change NIDs if no other nodes are connected */
1216         spin_lock(&mgs_obd->obd_dev_lock);
1217         conn_state = mgs_obd->obd_no_conn;
1218         mgs_obd->obd_no_conn = 1;
1219         spin_unlock(&mgs_obd->obd_dev_lock);
1220
1221         /* We can not change nids if not only MGS is started */
1222         if (!only_mgs_is_running(mgs_obd)) {
1223                 CERROR("Only MGS is allowed to be started\n");
1224                 GOTO(out, rc = -EINPROGRESS);
1225         }
1226
1227         /* Get fsname and index*/
1228         rc = mgs_parse_devname(devname, fsname, &index);
1229         if (rc)
1230                 GOTO(out, rc);
1231
1232         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1233         if (rc) {
1234                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1235                 GOTO(out, rc);
1236         }
1237
1238         /* Process client llogs */
1239         name_create(&logname, fsname, "-client");
1240         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1241         name_destroy(&logname);
1242         if (rc) {
1243                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1244                        fsname, devname, rc);
1245                 GOTO(out, rc);
1246         }
1247
1248         /* Process MDT llogs */
1249         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1250                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1251                         continue;
1252                 name_create_mdt(&logname, fsname, i);
1253                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1254                 name_destroy(&logname);
1255                 if (rc)
1256                         GOTO(out, rc);
1257         }
1258
1259 out:
1260         spin_lock(&mgs_obd->obd_dev_lock);
1261         mgs_obd->obd_no_conn = conn_state;
1262         spin_unlock(&mgs_obd->obd_dev_lock);
1263
1264         RETURN(rc);
1265 }
1266
1267 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1268                             char *devname, struct lov_desc *desc)
1269 {
1270         struct mgs_thread_info  *mgi = mgs_env_info(env);
1271         struct llog_cfg_rec     *lcr;
1272         int rc;
1273
1274         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1275         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1276         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1277         if (lcr == NULL)
1278                 return -ENOMEM;
1279
1280         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1281         lustre_cfg_rec_free(lcr);
1282         return rc;
1283 }
1284
1285 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1286                             char *devname, struct lmv_desc *desc)
1287 {
1288         struct mgs_thread_info  *mgi = mgs_env_info(env);
1289         struct llog_cfg_rec     *lcr;
1290         int rc;
1291
1292         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1293         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1294         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1295         if (lcr == NULL)
1296                 return -ENOMEM;
1297
1298         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1299         lustre_cfg_rec_free(lcr);
1300         return rc;
1301 }
1302
1303 static inline int record_mdc_add(const struct lu_env *env,
1304                                  struct llog_handle *llh,
1305                                  char *logname, char *mdcuuid,
1306                                  char *mdtuuid, char *index,
1307                                  char *gen)
1308 {
1309         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1310                            mdtuuid,index,gen,mdcuuid);
1311 }
1312
1313 static inline int record_lov_add(const struct lu_env *env,
1314                                  struct llog_handle *llh,
1315                                  char *lov_name, char *ost_uuid,
1316                                  char *index, char *gen)
1317 {
1318         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1319                            ost_uuid, index, gen, 0);
1320 }
1321
1322 static inline int record_mount_opt(const struct lu_env *env,
1323                                    struct llog_handle *llh,
1324                                    char *profile, char *lov_name,
1325                                    char *mdc_name)
1326 {
1327         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1328                            profile,lov_name,mdc_name,0);
1329 }
1330
1331 static int record_marker(const struct lu_env *env,
1332                          struct llog_handle *llh,
1333                          struct fs_db *fsdb, __u32 flags,
1334                          char *tgtname, char *comment)
1335 {
1336         struct mgs_thread_info *mgi = mgs_env_info(env);
1337         struct llog_cfg_rec *lcr;
1338         int rc;
1339         int cplen = 0;
1340
1341         if (flags & CM_START)
1342                 fsdb->fsdb_gen++;
1343         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1344         mgi->mgi_marker.cm_flags = flags;
1345         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1346         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1347                         sizeof(mgi->mgi_marker.cm_tgtname));
1348         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1349                 return -E2BIG;
1350         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1351                         sizeof(mgi->mgi_marker.cm_comment));
1352         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1353                 return -E2BIG;
1354         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1355         mgi->mgi_marker.cm_canceltime = 0;
1356         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1357         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1358                             sizeof(mgi->mgi_marker));
1359         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1360         if (lcr == NULL)
1361                 return -ENOMEM;
1362
1363         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1364         lustre_cfg_rec_free(lcr);
1365         return rc;
1366 }
1367
1368 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1369                             struct llog_handle **llh, char *name)
1370 {
1371         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1372         struct llog_ctxt        *ctxt;
1373         int                      rc = 0;
1374         ENTRY;
1375
1376         if (*llh)
1377                 GOTO(out, rc = -EBUSY);
1378
1379         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1380         if (!ctxt)
1381                 GOTO(out, rc = -ENODEV);
1382         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1383
1384         rc = llog_open_create(env, ctxt, llh, NULL, name);
1385         if (rc)
1386                 GOTO(out_ctxt, rc);
1387         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1388         if (rc)
1389                 llog_close(env, *llh);
1390 out_ctxt:
1391         llog_ctxt_put(ctxt);
1392 out:
1393         if (rc) {
1394                 CERROR("%s: can't start log %s: rc = %d\n",
1395                        mgs->mgs_obd->obd_name, name, rc);
1396                 *llh = NULL;
1397         }
1398         RETURN(rc);
1399 }
1400
1401 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1402 {
1403         int rc;
1404
1405         rc = llog_close(env, *llh);
1406         *llh = NULL;
1407
1408         return rc;
1409 }
1410
1411 /******************** config "macros" *********************/
1412
1413 /* write an lcfg directly into a log (with markers) */
1414 static int mgs_write_log_direct(const struct lu_env *env,
1415                                 struct mgs_device *mgs, struct fs_db *fsdb,
1416                                 char *logname, struct llog_cfg_rec *lcr,
1417                                 char *devname, char *comment)
1418 {
1419         struct llog_handle *llh = NULL;
1420         int rc;
1421
1422         ENTRY;
1423
1424         rc = record_start_log(env, mgs, &llh, logname);
1425         if (rc)
1426                 RETURN(rc);
1427
1428         /* FIXME These should be a single journal transaction */
1429         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1430         if (rc)
1431                 GOTO(out_end, rc);
1432         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1433         if (rc)
1434                 GOTO(out_end, rc);
1435         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1436         if (rc)
1437                 GOTO(out_end, rc);
1438 out_end:
1439         record_end_log(env, &llh);
1440         RETURN(rc);
1441 }
1442
1443 /* write the lcfg in all logs for the given fs */
1444 int mgs_write_log_direct_all(const struct lu_env *env, struct mgs_device *mgs,
1445                              struct fs_db *fsdb, struct mgs_target_info *mti,
1446                              struct llog_cfg_rec *lcr, char *devname,
1447                              char *comment, int server_only)
1448 {
1449         struct list_head         log_list;
1450         struct mgs_direntry     *dirent, *n;
1451         char                    *fsname = mti->mti_fsname;
1452         int                      rc = 0, len = strlen(fsname);
1453
1454         ENTRY;
1455         /* Find all the logs in the CONFIGS directory */
1456         rc = class_dentry_readdir(env, mgs, &log_list);
1457         if (rc)
1458                 RETURN(rc);
1459
1460         /* Could use fsdb index maps instead of directory listing */
1461         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1462                 list_del_init(&dirent->mde_list);
1463                 /* don't write to sptlrpc rule log */
1464                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1465                         goto next;
1466
1467                 /* caller wants write server logs only */
1468                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1469                         goto next;
1470
1471                 if (strncmp(fsname, dirent->mde_name, len) != 0)
1472                         goto next;
1473
1474                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1475                 /* Erase any old settings of this same parameter */
1476                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1477                                 devname, comment, CM_SKIP);
1478                 if (rc < 0)
1479                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1480                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1481                 if (lcr == NULL)
1482                         goto next;
1483                 /* Write the new one */
1484                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1485                                           lcr, devname, comment);
1486                 if (rc != 0)
1487                         CERROR("%s: writing log %s: rc = %d\n",
1488                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1489 next:
1490                 mgs_direntry_free(dirent);
1491         }
1492
1493         RETURN(rc);
1494 }
1495
1496 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1497                                     struct mgs_device *mgs,
1498                                     struct fs_db *fsdb,
1499                                     struct mgs_target_info *mti,
1500                                     int index, char *logname);
1501 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1502                                     struct mgs_device *mgs,
1503                                     struct fs_db *fsdb,
1504                                     struct mgs_target_info *mti,
1505                                     char *logname, char *suffix, char *lovname,
1506                                     enum lustre_sec_part sec_part, int flags);
1507 static int name_create_mdt_and_lov(char **logname, char **lovname,
1508                                    struct fs_db *fsdb, int i);
1509
1510 static int add_param(char *params, char *key, char *val)
1511 {
1512         char *start = params + strlen(params);
1513         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1514         int keylen = 0;
1515
1516         if (key != NULL)
1517                 keylen = strlen(key);
1518         if (start + 1 + keylen + strlen(val) >= end) {
1519                 CERROR("params are too long: %s %s%s\n",
1520                        params, key != NULL ? key : "", val);
1521                 return -EINVAL;
1522         }
1523
1524         sprintf(start, " %s%s", key != NULL ? key : "", val);
1525         return 0;
1526 }
1527
1528 /**
1529  * Walk through client config log record and convert the related records
1530  * into the target.
1531  **/
1532 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1533                                          struct llog_handle *llh,
1534                                          struct llog_rec_hdr *rec, void *data)
1535 {
1536         struct mgs_device *mgs;
1537         struct obd_device *obd;
1538         struct mgs_target_info *mti, *tmti;
1539         struct fs_db *fsdb;
1540         int cfg_len = rec->lrh_len;
1541         char *cfg_buf = (char*) (rec + 1);
1542         struct lustre_cfg *lcfg;
1543         int rc = 0;
1544         struct llog_handle *mdt_llh = NULL;
1545         static int got_an_osc_or_mdc = 0;
1546         /* 0: not found any osc/mdc;
1547            1: found osc;
1548            2: found mdc;
1549         */
1550         static int last_step = -1;
1551         int cplen = 0;
1552
1553         ENTRY;
1554
1555         mti = ((struct temp_comp*)data)->comp_mti;
1556         tmti = ((struct temp_comp*)data)->comp_tmti;
1557         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1558         obd = ((struct temp_comp *)data)->comp_obd;
1559         mgs = lu2mgs_dev(obd->obd_lu_dev);
1560         LASSERT(mgs);
1561
1562         if (rec->lrh_type != OBD_CFG_REC) {
1563                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1564                 RETURN(-EINVAL);
1565         }
1566
1567         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1568         if (rc) {
1569                 CERROR("Insane cfg\n");
1570                 RETURN(rc);
1571         }
1572
1573         lcfg = (struct lustre_cfg *)cfg_buf;
1574
1575         if (lcfg->lcfg_command == LCFG_MARKER) {
1576                 struct cfg_marker *marker;
1577                 marker = lustre_cfg_buf(lcfg, 1);
1578                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1579                     (marker->cm_flags & CM_START) &&
1580                      !(marker->cm_flags & CM_SKIP)) {
1581                         got_an_osc_or_mdc = 1;
1582                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1583                                         sizeof(tmti->mti_svname));
1584                         if (cplen >= sizeof(tmti->mti_svname))
1585                                 RETURN(-E2BIG);
1586                         rc = record_start_log(env, mgs, &mdt_llh,
1587                                               mti->mti_svname);
1588                         if (rc)
1589                                 RETURN(rc);
1590                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1591                                            mti->mti_svname, "add osc(copied)");
1592                         record_end_log(env, &mdt_llh);
1593                         last_step = marker->cm_step;
1594                         RETURN(rc);
1595                 }
1596                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1597                     (marker->cm_flags & CM_END) &&
1598                      !(marker->cm_flags & CM_SKIP)) {
1599                         LASSERT(last_step == marker->cm_step);
1600                         last_step = -1;
1601                         got_an_osc_or_mdc = 0;
1602                         memset(tmti, 0, sizeof(*tmti));
1603                         rc = record_start_log(env, mgs, &mdt_llh,
1604                                               mti->mti_svname);
1605                         if (rc)
1606                                 RETURN(rc);
1607                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1608                                            mti->mti_svname, "add osc(copied)");
1609                         record_end_log(env, &mdt_llh);
1610                         RETURN(rc);
1611                 }
1612                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1613                     (marker->cm_flags & CM_START) &&
1614                      !(marker->cm_flags & CM_SKIP)) {
1615                         got_an_osc_or_mdc = 2;
1616                         last_step = marker->cm_step;
1617                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1618                                strlen(marker->cm_tgtname));
1619
1620                         RETURN(rc);
1621                 }
1622                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1623                     (marker->cm_flags & CM_END) &&
1624                      !(marker->cm_flags & CM_SKIP)) {
1625                         LASSERT(last_step == marker->cm_step);
1626                         last_step = -1;
1627                         got_an_osc_or_mdc = 0;
1628                         memset(tmti, 0, sizeof(*tmti));
1629                         RETURN(rc);
1630                 }
1631         }
1632
1633         if (got_an_osc_or_mdc == 0 || last_step < 0)
1634                 RETURN(rc);
1635
1636         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1637                 uint64_t nodenid = lcfg->lcfg_nid;
1638
1639                 if (strlen(tmti->mti_uuid) == 0) {
1640                         /* target uuid not set, this config record is before
1641                          * LCFG_SETUP, this nid is one of target node nid.
1642                          */
1643                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1644                         tmti->mti_nid_count++;
1645                 } else {
1646                         /* failover node nid */
1647                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1648                                        libcfs_nid2str(nodenid));
1649                 }
1650
1651                 RETURN(rc);
1652         }
1653
1654         if (lcfg->lcfg_command == LCFG_SETUP) {
1655                 char *target;
1656
1657                 target = lustre_cfg_string(lcfg, 1);
1658                 memcpy(tmti->mti_uuid, target, strlen(target));
1659                 RETURN(rc);
1660         }
1661
1662         /* ignore client side sptlrpc_conf_log */
1663         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1664                 RETURN(rc);
1665
1666         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1667                 int index;
1668
1669                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1670                         RETURN (-EINVAL);
1671
1672                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1673                        strlen(mti->mti_fsname));
1674                 tmti->mti_stripe_index = index;
1675
1676                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1677                                               mti->mti_stripe_index,
1678                                               mti->mti_svname);
1679                 memset(tmti, 0, sizeof(*tmti));
1680                 RETURN(rc);
1681         }
1682
1683         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1684                 int index;
1685                 char mdt_index[9];
1686                 char *logname, *lovname;
1687
1688                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1689                                              mti->mti_stripe_index);
1690                 if (rc)
1691                         RETURN(rc);
1692                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1693
1694                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1695                         name_destroy(&logname);
1696                         name_destroy(&lovname);
1697                         RETURN(-EINVAL);
1698                 }
1699
1700                 tmti->mti_stripe_index = index;
1701                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1702                                          mdt_index, lovname,
1703                                          LUSTRE_SP_MDT, 0);
1704                 name_destroy(&logname);
1705                 name_destroy(&lovname);
1706                 RETURN(rc);
1707         }
1708         RETURN(rc);
1709 }
1710
1711 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1712 /* stealed from mgs_get_fsdb_from_llog*/
1713 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1714                                               struct mgs_device *mgs,
1715                                               char *client_name,
1716                                               struct temp_comp* comp)
1717 {
1718         struct llog_handle *loghandle;
1719         struct mgs_target_info *tmti;
1720         struct llog_ctxt *ctxt;
1721         int rc;
1722
1723         ENTRY;
1724
1725         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1726         LASSERT(ctxt != NULL);
1727
1728         OBD_ALLOC_PTR(tmti);
1729         if (tmti == NULL)
1730                 GOTO(out_ctxt, rc = -ENOMEM);
1731
1732         comp->comp_tmti = tmti;
1733         comp->comp_obd = mgs->mgs_obd;
1734
1735         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1736                        LLOG_OPEN_EXISTS);
1737         if (rc < 0) {
1738                 if (rc == -ENOENT)
1739                         rc = 0;
1740                 GOTO(out_pop, rc);
1741         }
1742
1743         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1744         if (rc)
1745                 GOTO(out_close, rc);
1746
1747         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1748                                   (void *)comp, NULL, false);
1749         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1750 out_close:
1751         llog_close(env, loghandle);
1752 out_pop:
1753         OBD_FREE_PTR(tmti);
1754 out_ctxt:
1755         llog_ctxt_put(ctxt);
1756         RETURN(rc);
1757 }
1758
1759 /* lmv is the second thing for client logs */
1760 /* copied from mgs_write_log_lov. Please refer to that.  */
1761 static int mgs_write_log_lmv(const struct lu_env *env,
1762                              struct mgs_device *mgs,
1763                              struct fs_db *fsdb,
1764                              struct mgs_target_info *mti,
1765                              char *logname, char *lmvname)
1766 {
1767         struct llog_handle *llh = NULL;
1768         struct lmv_desc *lmvdesc;
1769         char *uuid;
1770         int rc = 0;
1771         ENTRY;
1772
1773         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1774
1775         OBD_ALLOC_PTR(lmvdesc);
1776         if (lmvdesc == NULL)
1777                 RETURN(-ENOMEM);
1778         lmvdesc->ld_active_tgt_count = 0;
1779         lmvdesc->ld_tgt_count = 0;
1780         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1781         uuid = (char *)lmvdesc->ld_uuid.uuid;
1782
1783         rc = record_start_log(env, mgs, &llh, logname);
1784         if (rc)
1785                 GOTO(out_free, rc);
1786         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1787         if (rc)
1788                 GOTO(out_end, rc);
1789         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1790         if (rc)
1791                 GOTO(out_end, rc);
1792         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1793         if (rc)
1794                 GOTO(out_end, rc);
1795         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1796         if (rc)
1797                 GOTO(out_end, rc);
1798 out_end:
1799         record_end_log(env, &llh);
1800 out_free:
1801         OBD_FREE_PTR(lmvdesc);
1802         RETURN(rc);
1803 }
1804
1805 /* lov is the first thing in the mdt and client logs */
1806 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1807                              struct fs_db *fsdb, struct mgs_target_info *mti,
1808                              char *logname, char *lovname)
1809 {
1810         struct llog_handle *llh = NULL;
1811         struct lov_desc *lovdesc;
1812         char *uuid;
1813         int rc = 0;
1814         ENTRY;
1815
1816         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1817
1818         /*
1819         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1820         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1821               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1822         */
1823
1824         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1825         OBD_ALLOC_PTR(lovdesc);
1826         if (lovdesc == NULL)
1827                 RETURN(-ENOMEM);
1828         lovdesc->ld_magic = LOV_DESC_MAGIC;
1829         lovdesc->ld_tgt_count = 0;
1830         /* Defaults.  Can be changed later by lcfg config_param */
1831         lovdesc->ld_default_stripe_count = 1;
1832         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1833         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1834         lovdesc->ld_default_stripe_offset = -1;
1835         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1836         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1837         /* can these be the same? */
1838         uuid = (char *)lovdesc->ld_uuid.uuid;
1839
1840         /* This should always be the first entry in a log.
1841         rc = mgs_clear_log(obd, logname); */
1842         rc = record_start_log(env, mgs, &llh, logname);
1843         if (rc)
1844                 GOTO(out_free, rc);
1845         /* FIXME these should be a single journal transaction */
1846         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1847         if (rc)
1848                 GOTO(out_end, rc);
1849         rc = record_attach(env, llh, lovname, "lov", uuid);
1850         if (rc)
1851                 GOTO(out_end, rc);
1852         rc = record_lov_setup(env, llh, lovname, lovdesc);
1853         if (rc)
1854                 GOTO(out_end, rc);
1855         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1856         if (rc)
1857                 GOTO(out_end, rc);
1858         EXIT;
1859 out_end:
1860         record_end_log(env, &llh);
1861 out_free:
1862         OBD_FREE_PTR(lovdesc);
1863         return rc;
1864 }
1865
1866 /* add failnids to open log */
1867 static int mgs_write_log_failnids(const struct lu_env *env,
1868                                   struct mgs_target_info *mti,
1869                                   struct llog_handle *llh,
1870                                   char *cliname)
1871 {
1872         char *failnodeuuid = NULL;
1873         char *ptr = mti->mti_params;
1874         lnet_nid_t nid;
1875         int rc = 0;
1876
1877         /*
1878         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1879         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1880         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1881         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1882         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1883         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1884         */
1885
1886         /*
1887          * Pull failnid info out of params string, which may contain something
1888          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
1889          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
1890          * etc.  However, convert_hostnames() should have caught those.
1891          */
1892         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1893                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1894                         if (failnodeuuid == NULL) {
1895                                 /* We don't know the failover node name,
1896                                    so just use the first nid as the uuid */
1897                                 rc = name_create(&failnodeuuid,
1898                                                  libcfs_nid2str(nid), "");
1899                                 if (rc)
1900                                         return rc;
1901                         }
1902                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1903                                "client %s\n", libcfs_nid2str(nid),
1904                                failnodeuuid, cliname);
1905                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1906                         /*
1907                          * If *ptr is ':', we have added all NIDs for
1908                          * failnodeuuid.
1909                          */
1910                         if (*ptr == ':') {
1911                                 rc = record_add_conn(env, llh, cliname,
1912                                                      failnodeuuid);
1913                                 name_destroy(&failnodeuuid);
1914                                 failnodeuuid = NULL;
1915                         }
1916                 }
1917                 if (failnodeuuid) {
1918                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1919                         name_destroy(&failnodeuuid);
1920                         failnodeuuid = NULL;
1921                 }
1922         }
1923
1924         return rc;
1925 }
1926
1927 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1928                                     struct mgs_device *mgs,
1929                                     struct fs_db *fsdb,
1930                                     struct mgs_target_info *mti,
1931                                     char *logname, char *lmvname)
1932 {
1933         struct llog_handle *llh = NULL;
1934         char *mdcname = NULL;
1935         char *nodeuuid = NULL;
1936         char *mdcuuid = NULL;
1937         char *lmvuuid = NULL;
1938         char index[6];
1939         int i, rc;
1940         ENTRY;
1941
1942         if (mgs_log_is_empty(env, mgs, logname)) {
1943                 CERROR("log is empty! Logical error\n");
1944                 RETURN(-EINVAL);
1945         }
1946
1947         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1948                mti->mti_svname, logname, lmvname);
1949
1950         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1951         if (rc)
1952                 RETURN(rc);
1953         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1954         if (rc)
1955                 GOTO(out_free, rc);
1956         rc = name_create(&mdcuuid, mdcname, "_UUID");
1957         if (rc)
1958                 GOTO(out_free, rc);
1959         rc = name_create(&lmvuuid, lmvname, "_UUID");
1960         if (rc)
1961                 GOTO(out_free, rc);
1962
1963         rc = record_start_log(env, mgs, &llh, logname);
1964         if (rc)
1965                 GOTO(out_free, rc);
1966         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1967                            "add mdc");
1968         if (rc)
1969                 GOTO(out_end, rc);
1970         for (i = 0; i < mti->mti_nid_count; i++) {
1971                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1972                        libcfs_nid2str(mti->mti_nids[i]));
1973
1974                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1975                 if (rc)
1976                         GOTO(out_end, rc);
1977         }
1978
1979         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1980         if (rc)
1981                 GOTO(out_end, rc);
1982         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1983         if (rc)
1984                 GOTO(out_end, rc);
1985         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1986         if (rc)
1987                 GOTO(out_end, rc);
1988         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1989         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1990                             index, "1");
1991         if (rc)
1992                 GOTO(out_end, rc);
1993         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1994                            "add mdc");
1995         if (rc)
1996                 GOTO(out_end, rc);
1997 out_end:
1998         record_end_log(env, &llh);
1999 out_free:
2000         name_destroy(&lmvuuid);
2001         name_destroy(&mdcuuid);
2002         name_destroy(&mdcname);
2003         name_destroy(&nodeuuid);
2004         RETURN(rc);
2005 }
2006
2007 static inline int name_create_lov(char **lovname, char *mdtname,
2008                                   struct fs_db *fsdb, int index)
2009 {
2010         /* COMPAT_180 */
2011         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2012                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2013         else
2014                 return name_create(lovname, mdtname, "-mdtlov");
2015 }
2016
2017 static int name_create_mdt_and_lov(char **logname, char **lovname,
2018                                    struct fs_db *fsdb, int i)
2019 {
2020         int rc;
2021
2022         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2023         if (rc)
2024                 return rc;
2025         /* COMPAT_180 */
2026         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2027                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2028         else
2029                 rc = name_create(lovname, *logname, "-mdtlov");
2030         if (rc) {
2031                 name_destroy(logname);
2032                 *logname = NULL;
2033         }
2034         return rc;
2035 }
2036
2037 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2038                                       struct fs_db *fsdb, int i)
2039 {
2040         char suffix[16];
2041
2042         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2043                 sprintf(suffix, "-osc");
2044         else
2045                 sprintf(suffix, "-osc-MDT%04x", i);
2046         return name_create(oscname, ostname, suffix);
2047 }
2048
2049 /* add new mdc to already existent MDS */
2050 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2051                                     struct mgs_device *mgs,
2052                                     struct fs_db *fsdb,
2053                                     struct mgs_target_info *mti,
2054                                     int mdt_index, char *logname)
2055 {
2056         struct llog_handle      *llh = NULL;
2057         char    *nodeuuid = NULL;
2058         char    *ospname = NULL;
2059         char    *lovuuid = NULL;
2060         char    *mdtuuid = NULL;
2061         char    *svname = NULL;
2062         char    *mdtname = NULL;
2063         char    *lovname = NULL;
2064         char    index_str[16];
2065         int     i, rc;
2066
2067         ENTRY;
2068         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2069                 CERROR("log is empty! Logical error\n");
2070                 RETURN (-EINVAL);
2071         }
2072
2073         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2074                logname);
2075
2076         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2077         if (rc)
2078                 RETURN(rc);
2079
2080         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2081         if (rc)
2082                 GOTO(out_destory, rc);
2083
2084         rc = name_create(&svname, mdtname, "-osp");
2085         if (rc)
2086                 GOTO(out_destory, rc);
2087
2088         sprintf(index_str, "-MDT%04x", mdt_index);
2089         rc = name_create(&ospname, svname, index_str);
2090         if (rc)
2091                 GOTO(out_destory, rc);
2092
2093         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2094         if (rc)
2095                 GOTO(out_destory, rc);
2096
2097         rc = name_create(&lovuuid, lovname, "_UUID");
2098         if (rc)
2099                 GOTO(out_destory, rc);
2100
2101         rc = name_create(&mdtuuid, mdtname, "_UUID");
2102         if (rc)
2103                 GOTO(out_destory, rc);
2104
2105         rc = record_start_log(env, mgs, &llh, logname);
2106         if (rc)
2107                 GOTO(out_destory, rc);
2108
2109         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2110                            "add osp");
2111         if (rc)
2112                 GOTO(out_destory, rc);
2113
2114         for (i = 0; i < mti->mti_nid_count; i++) {
2115                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2116                        libcfs_nid2str(mti->mti_nids[i]));
2117                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2118                 if (rc)
2119                         GOTO(out_end, rc);
2120         }
2121
2122         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2123         if (rc)
2124                 GOTO(out_end, rc);
2125
2126         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2127                           NULL, NULL);
2128         if (rc)
2129                 GOTO(out_end, rc);
2130
2131         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2132         if (rc)
2133                 GOTO(out_end, rc);
2134
2135         /* Add mdc(osp) to lod */
2136         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2137                  mti->mti_stripe_index);
2138         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2139                          index_str, "1", NULL);
2140         if (rc)
2141                 GOTO(out_end, rc);
2142
2143         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2144         if (rc)
2145                 GOTO(out_end, rc);
2146
2147 out_end:
2148         record_end_log(env, &llh);
2149
2150 out_destory:
2151         name_destroy(&mdtuuid);
2152         name_destroy(&lovuuid);
2153         name_destroy(&lovname);
2154         name_destroy(&ospname);
2155         name_destroy(&svname);
2156         name_destroy(&nodeuuid);
2157         name_destroy(&mdtname);
2158         RETURN(rc);
2159 }
2160
2161 static int mgs_write_log_mdt0(const struct lu_env *env,
2162                               struct mgs_device *mgs,
2163                               struct fs_db *fsdb,
2164                               struct mgs_target_info *mti)
2165 {
2166         char *log = mti->mti_svname;
2167         struct llog_handle *llh = NULL;
2168         char *uuid, *lovname;
2169         char mdt_index[6];
2170         char *ptr = mti->mti_params;
2171         int rc = 0, failout = 0;
2172         ENTRY;
2173
2174         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2175         if (uuid == NULL)
2176                 RETURN(-ENOMEM);
2177
2178         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2179                 failout = (strncmp(ptr, "failout", 7) == 0);
2180
2181         rc = name_create(&lovname, log, "-mdtlov");
2182         if (rc)
2183                 GOTO(out_free, rc);
2184         if (mgs_log_is_empty(env, mgs, log)) {
2185                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2186                 if (rc)
2187                         GOTO(out_lod, rc);
2188         }
2189
2190         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2191
2192         rc = record_start_log(env, mgs, &llh, log);
2193         if (rc)
2194                 GOTO(out_lod, rc);
2195
2196         /* add MDT itself */
2197
2198         /* FIXME this whole fn should be a single journal transaction */
2199         sprintf(uuid, "%s_UUID", log);
2200         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2201         if (rc)
2202                 GOTO(out_lod, rc);
2203         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2204         if (rc)
2205                 GOTO(out_end, rc);
2206         rc = record_mount_opt(env, llh, log, lovname, NULL);
2207         if (rc)
2208                 GOTO(out_end, rc);
2209         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2210                         failout ? "n" : "f");
2211         if (rc)
2212                 GOTO(out_end, rc);
2213         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2214         if (rc)
2215                 GOTO(out_end, rc);
2216 out_end:
2217         record_end_log(env, &llh);
2218 out_lod:
2219         name_destroy(&lovname);
2220 out_free:
2221         OBD_FREE(uuid, sizeof(struct obd_uuid));
2222         RETURN(rc);
2223 }
2224
2225 /* envelope method for all layers log */
2226 static int mgs_write_log_mdt(const struct lu_env *env,
2227                              struct mgs_device *mgs,
2228                              struct fs_db *fsdb,
2229                              struct mgs_target_info *mti)
2230 {
2231         struct mgs_thread_info *mgi = mgs_env_info(env);
2232         struct llog_handle *llh = NULL;
2233         char *cliname;
2234         int rc, i = 0;
2235         ENTRY;
2236
2237         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2238
2239         if (mti->mti_uuid[0] == '\0') {
2240                 /* Make up our own uuid */
2241                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2242                          "%s_UUID", mti->mti_svname);
2243         }
2244
2245         /* add mdt */
2246         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2247         if (rc)
2248                 RETURN(rc);
2249         /* Append the mdt info to the client log */
2250         rc = name_create(&cliname, mti->mti_fsname, "-client");
2251         if (rc)
2252                 RETURN(rc);
2253
2254         if (mgs_log_is_empty(env, mgs, cliname)) {
2255                 /* Start client log */
2256                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2257                                        fsdb->fsdb_clilov);
2258                 if (rc)
2259                         GOTO(out_free, rc);
2260                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2261                                        fsdb->fsdb_clilmv);
2262                 if (rc)
2263                         GOTO(out_free, rc);
2264         }
2265
2266         /*
2267         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2268         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2269         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2270         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2271         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2272         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2273         */
2274
2275                 /* copy client info about lov/lmv */
2276                 mgi->mgi_comp.comp_mti = mti;
2277                 mgi->mgi_comp.comp_fsdb = fsdb;
2278
2279                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2280                                                         &mgi->mgi_comp);
2281                 if (rc)
2282                         GOTO(out_free, rc);
2283                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2284                                               fsdb->fsdb_clilmv);
2285                 if (rc)
2286                         GOTO(out_free, rc);
2287
2288                 /* add mountopts */
2289                 rc = record_start_log(env, mgs, &llh, cliname);
2290                 if (rc)
2291                         GOTO(out_free, rc);
2292
2293                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2294                                    "mount opts");
2295                 if (rc)
2296                         GOTO(out_end, rc);
2297                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2298                                       fsdb->fsdb_clilmv);
2299                 if (rc)
2300                         GOTO(out_end, rc);
2301                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2302                                    "mount opts");
2303
2304         if (rc)
2305                 GOTO(out_end, rc);
2306
2307         /* for_all_existing_mdt except current one */
2308         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2309                 if (i !=  mti->mti_stripe_index &&
2310                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2311                         char *logname;
2312
2313                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2314                         if (rc)
2315                                 GOTO(out_end, rc);
2316
2317                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2318                                                       i, logname);
2319                         name_destroy(&logname);
2320                         if (rc)
2321                                 GOTO(out_end, rc);
2322                 }
2323         }
2324 out_end:
2325         record_end_log(env, &llh);
2326 out_free:
2327         name_destroy(&cliname);
2328         RETURN(rc);
2329 }
2330
2331 /* Add the ost info to the client/mdt lov */
2332 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2333                                     struct mgs_device *mgs, struct fs_db *fsdb,
2334                                     struct mgs_target_info *mti,
2335                                     char *logname, char *suffix, char *lovname,
2336                                     enum lustre_sec_part sec_part, int flags)
2337 {
2338         struct llog_handle *llh = NULL;
2339         char *nodeuuid = NULL;
2340         char *oscname = NULL;
2341         char *oscuuid = NULL;
2342         char *lovuuid = NULL;
2343         char *svname = NULL;
2344         char index[6];
2345         int i, rc;
2346
2347         ENTRY;
2348         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2349                mti->mti_svname, logname);
2350
2351         if (mgs_log_is_empty(env, mgs, logname)) {
2352                 CERROR("log is empty! Logical error\n");
2353                 RETURN (-EINVAL);
2354         }
2355
2356         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2357         if (rc)
2358                 RETURN(rc);
2359         rc = name_create(&svname, mti->mti_svname, "-osc");
2360         if (rc)
2361                 GOTO(out_free, rc);
2362
2363         /* for the system upgraded from old 1.8, keep using the old osc naming
2364          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2365         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2366                 rc = name_create(&oscname, svname, "");
2367         else
2368                 rc = name_create(&oscname, svname, suffix);
2369         if (rc)
2370                 GOTO(out_free, rc);
2371
2372         rc = name_create(&oscuuid, oscname, "_UUID");
2373         if (rc)
2374                 GOTO(out_free, rc);
2375         rc = name_create(&lovuuid, lovname, "_UUID");
2376         if (rc)
2377                 GOTO(out_free, rc);
2378
2379
2380         /*
2381         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2382         multihomed (#4)
2383         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2384         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2385         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2386         failover (#6,7)
2387         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2388         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2389         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2390         */
2391
2392         rc = record_start_log(env, mgs, &llh, logname);
2393         if (rc)
2394                 GOTO(out_free, rc);
2395
2396         /* FIXME these should be a single journal transaction */
2397         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2398                            "add osc");
2399         if (rc)
2400                 GOTO(out_end, rc);
2401
2402         /* NB: don't change record order, because upon MDT steal OSC config
2403          * from client, it treats all nids before LCFG_SETUP as target nids
2404          * (multiple interfaces), while nids after as failover node nids.
2405          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2406          */
2407         for (i = 0; i < mti->mti_nid_count; i++) {
2408                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2409                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2410                 if (rc)
2411                         GOTO(out_end, rc);
2412         }
2413         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2414         if (rc)
2415                 GOTO(out_end, rc);
2416         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2417         if (rc)
2418                 GOTO(out_end, rc);
2419         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2420         if (rc)
2421                 GOTO(out_end, rc);
2422
2423         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2424
2425         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2426         if (rc)
2427                 GOTO(out_end, rc);
2428         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2429                            "add osc");
2430         if (rc)
2431                 GOTO(out_end, rc);
2432 out_end:
2433         record_end_log(env, &llh);
2434 out_free:
2435         name_destroy(&lovuuid);
2436         name_destroy(&oscuuid);
2437         name_destroy(&oscname);
2438         name_destroy(&svname);
2439         name_destroy(&nodeuuid);
2440         RETURN(rc);
2441 }
2442
2443 static int mgs_write_log_ost(const struct lu_env *env,
2444                              struct mgs_device *mgs, struct fs_db *fsdb,
2445                              struct mgs_target_info *mti)
2446 {
2447         struct llog_handle *llh = NULL;
2448         char *logname, *lovname;
2449         char *ptr = mti->mti_params;
2450         int rc, flags = 0, failout = 0, i;
2451         ENTRY;
2452
2453         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2454
2455         /* The ost startup log */
2456
2457         /* If the ost log already exists, that means that someone reformatted
2458            the ost and it called target_add again. */
2459         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2460                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2461                                    "exists, yet the server claims it never "
2462                                    "registered. It may have been reformatted, "
2463                                    "or the index changed. writeconf the MDT to "
2464                                    "regenerate all logs.\n", mti->mti_svname);
2465                 RETURN(-EALREADY);
2466         }
2467
2468         /*
2469         attach obdfilter ost1 ost1_UUID
2470         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2471         */
2472         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2473                 failout = (strncmp(ptr, "failout", 7) == 0);
2474         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2475         if (rc)
2476                 RETURN(rc);
2477         /* FIXME these should be a single journal transaction */
2478         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2479         if (rc)
2480                 GOTO(out_end, rc);
2481         if (*mti->mti_uuid == '\0')
2482                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2483                          "%s_UUID", mti->mti_svname);
2484         rc = record_attach(env, llh, mti->mti_svname,
2485                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2486         if (rc)
2487                 GOTO(out_end, rc);
2488         rc = record_setup(env, llh, mti->mti_svname,
2489                           "dev"/*ignored*/, "type"/*ignored*/,
2490                           failout ? "n" : "f", 0/*options*/);
2491         if (rc)
2492                 GOTO(out_end, rc);
2493         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2494         if (rc)
2495                 GOTO(out_end, rc);
2496 out_end:
2497         record_end_log(env, &llh);
2498         if (rc)
2499                 RETURN(rc);
2500         /* We also have to update the other logs where this osc is part of
2501            the lov */
2502
2503         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2504                 /* If we're upgrading, the old mdt log already has our
2505                    entry. Let's do a fake one for fun. */
2506                 /* Note that we can't add any new failnids, since we don't
2507                    know the old osc names. */
2508                 flags = CM_SKIP | CM_UPGRADE146;
2509
2510         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2511                 /* If the update flag isn't set, don't update client/mdt
2512                    logs. */
2513                 flags |= CM_SKIP;
2514                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2515                               "the MDT first to regenerate it.\n",
2516                               mti->mti_svname);
2517         }
2518
2519         /* Add ost to all MDT lov defs */
2520         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2521                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2522                         char mdt_index[9];
2523
2524                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2525                                                      i);
2526                         if (rc)
2527                                 RETURN(rc);
2528                         sprintf(mdt_index, "-MDT%04x", i);
2529                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2530                                                       logname, mdt_index,
2531                                                       lovname, LUSTRE_SP_MDT,
2532                                                       flags);
2533                         name_destroy(&logname);
2534                         name_destroy(&lovname);
2535                         if (rc)
2536                                 RETURN(rc);
2537                 }
2538         }
2539
2540         /* Append ost info to the client log */
2541         rc = name_create(&logname, mti->mti_fsname, "-client");
2542         if (rc)
2543                 RETURN(rc);
2544         if (mgs_log_is_empty(env, mgs, logname)) {
2545                 /* Start client log */
2546                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2547                                        fsdb->fsdb_clilov);
2548                 if (rc)
2549                         GOTO(out_free, rc);
2550                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2551                                        fsdb->fsdb_clilmv);
2552                 if (rc)
2553                         GOTO(out_free, rc);
2554         }
2555         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2556                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2557 out_free:
2558         name_destroy(&logname);
2559         RETURN(rc);
2560 }
2561
2562 static __inline__ int mgs_param_empty(char *ptr)
2563 {
2564         char *tmp;
2565
2566         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2567                 return 1;
2568         return 0;
2569 }
2570
2571 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2572                                           struct mgs_device *mgs,
2573                                           struct fs_db *fsdb,
2574                                           struct mgs_target_info *mti,
2575                                           char *logname, char *cliname)
2576 {
2577         int rc;
2578         struct llog_handle *llh = NULL;
2579
2580         if (mgs_param_empty(mti->mti_params)) {
2581                 /* Remove _all_ failnids */
2582                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2583                                 mti->mti_svname, "add failnid", CM_SKIP);
2584                 return rc < 0 ? rc : 0;
2585         }
2586
2587         /* Otherwise failover nids are additive */
2588         rc = record_start_log(env, mgs, &llh, logname);
2589         if (rc)
2590                 return rc;
2591                 /* FIXME this should be a single journal transaction */
2592         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2593                            "add failnid");
2594         if (rc)
2595                 goto out_end;
2596         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2597         if (rc)
2598                 goto out_end;
2599         rc = record_marker(env, llh, fsdb, CM_END,
2600                            mti->mti_svname, "add failnid");
2601 out_end:
2602         record_end_log(env, &llh);
2603         return rc;
2604 }
2605
2606
2607 /* Add additional failnids to an existing log.
2608    The mdc/osc must have been added to logs first */
2609 /* tcp nids must be in dotted-quad ascii -
2610    we can't resolve hostnames from the kernel. */
2611 static int mgs_write_log_add_failnid(const struct lu_env *env,
2612                                      struct mgs_device *mgs,
2613                                      struct fs_db *fsdb,
2614                                      struct mgs_target_info *mti)
2615 {
2616         char *logname, *cliname;
2617         int rc;
2618         ENTRY;
2619
2620         /* FIXME we currently can't erase the failnids
2621          * given when a target first registers, since they aren't part of
2622          * an "add uuid" stanza */
2623
2624         /* Verify that we know about this target */
2625         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2626                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2627                                    "yet. It must be started before failnids "
2628                                    "can be added.\n", mti->mti_svname);
2629                 RETURN(-ENOENT);
2630         }
2631
2632         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2633         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2634                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2635         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2636                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2637         } else {
2638                 RETURN(-EINVAL);
2639         }
2640         if (rc)
2641                 RETURN(rc);
2642         /* Add failover nids to the client log */
2643         rc = name_create(&logname, mti->mti_fsname, "-client");
2644         if (rc) {
2645                 name_destroy(&cliname);
2646                 RETURN(rc);
2647         }
2648         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2649         name_destroy(&logname);
2650         name_destroy(&cliname);
2651         if (rc)
2652                 RETURN(rc);
2653
2654         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2655                 /* Add OST failover nids to the MDT logs as well */
2656                 int i;
2657
2658                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2659                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2660                                 continue;
2661                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2662                         if (rc)
2663                                 RETURN(rc);
2664                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2665                                                  fsdb, i);
2666                         if (rc) {
2667                                 name_destroy(&logname);
2668                                 RETURN(rc);
2669                         }
2670                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2671                                                             mti, logname,
2672                                                             cliname);
2673                         name_destroy(&cliname);
2674                         name_destroy(&logname);
2675                         if (rc)
2676                                 RETURN(rc);
2677                 }
2678         }
2679
2680         RETURN(rc);
2681 }
2682
2683 static int mgs_wlp_lcfg(const struct lu_env *env,
2684                         struct mgs_device *mgs, struct fs_db *fsdb,
2685                         struct mgs_target_info *mti,
2686                         char *logname, struct lustre_cfg_bufs *bufs,
2687                         char *tgtname, char *ptr)
2688 {
2689         char comment[MTI_NAME_MAXLEN];
2690         char *tmp;
2691         struct llog_cfg_rec *lcr;
2692         int rc, del;
2693
2694         /* Erase any old settings of this same parameter */
2695         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2696         comment[MTI_NAME_MAXLEN - 1] = 0;
2697         /* But don't try to match the value. */
2698         tmp = strchr(comment, '=');
2699         if (tmp != NULL)
2700                 *tmp = 0;
2701         /* FIXME we should skip settings that are the same as old values */
2702         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2703         if (rc < 0)
2704                 return rc;
2705         del = mgs_param_empty(ptr);
2706
2707         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2708                       "Setting" : "Modifying", tgtname, comment, logname);
2709         if (del) {
2710                 /* mgs_modify() will return 1 if nothing had to be done */
2711                 if (rc == 1)
2712                         rc = 0;
2713                 return rc;
2714         }
2715
2716         lustre_cfg_bufs_reset(bufs, tgtname);
2717         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2718         if (mti->mti_flags & LDD_F_PARAM2)
2719                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2720
2721         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2722                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2723         if (lcr == NULL)
2724                 return -ENOMEM;
2725
2726         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2727                                   comment);
2728         lustre_cfg_rec_free(lcr);
2729         return rc;
2730 }
2731
2732 static int mgs_write_log_param2(const struct lu_env *env,
2733                                 struct mgs_device *mgs,
2734                                 struct fs_db *fsdb,
2735                                 struct mgs_target_info *mti, char *ptr)
2736 {
2737         struct lustre_cfg_bufs  bufs;
2738         int                     rc = 0;
2739         ENTRY;
2740
2741         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2742         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2743                           mti->mti_svname, ptr);
2744
2745         RETURN(rc);
2746 }
2747
2748 /* write global variable settings into log */
2749 static int mgs_write_log_sys(const struct lu_env *env,
2750                              struct mgs_device *mgs, struct fs_db *fsdb,
2751                              struct mgs_target_info *mti, char *sys, char *ptr)
2752 {
2753         struct mgs_thread_info  *mgi = mgs_env_info(env);
2754         struct lustre_cfg       *lcfg;
2755         struct llog_cfg_rec     *lcr;
2756         char *tmp, sep;
2757         int rc, cmd, convert = 1;
2758
2759         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2760                 cmd = LCFG_SET_TIMEOUT;
2761         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2762                 cmd = LCFG_SET_LDLM_TIMEOUT;
2763         /* Check for known params here so we can return error to lctl */
2764         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2765                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2766                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2767                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2768                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2769                 cmd = LCFG_PARAM;
2770         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2771                 convert = 0; /* Don't convert string value to integer */
2772                 cmd = LCFG_PARAM;
2773         } else {
2774                 return -EINVAL;
2775         }
2776
2777         if (mgs_param_empty(ptr))
2778                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2779         else
2780                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2781
2782         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2783         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2784         if (!convert && *tmp != '\0')
2785                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2786         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2787         if (lcr == NULL)
2788                 return -ENOMEM;
2789
2790         lcfg = &lcr->lcr_cfg;
2791         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2792         /* truncate the comment to the parameter name */
2793         ptr = tmp - 1;
2794         sep = *ptr;
2795         *ptr = '\0';
2796         /* modify all servers and clients */
2797         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2798                                       *tmp == '\0' ? NULL : lcr,
2799                                       mti->mti_fsname, sys, 0);
2800         if (rc == 0 && *tmp != '\0') {
2801                 switch (cmd) {
2802                 case LCFG_SET_TIMEOUT:
2803                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2804                                 class_process_config(lcfg);
2805                         break;
2806                 case LCFG_SET_LDLM_TIMEOUT:
2807                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2808                                 class_process_config(lcfg);
2809                         break;
2810                 default:
2811                         break;
2812                 }
2813         }
2814         *ptr = sep;
2815         lustre_cfg_rec_free(lcr);
2816         return rc;
2817 }
2818
2819 /* write quota settings into log */
2820 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2821                                struct fs_db *fsdb, struct mgs_target_info *mti,
2822                                char *quota, char *ptr)
2823 {
2824         struct mgs_thread_info  *mgi = mgs_env_info(env);
2825         struct llog_cfg_rec     *lcr;
2826         char                    *tmp;
2827         char                     sep;
2828         int                      rc, cmd = LCFG_PARAM;
2829
2830         /* support only 'meta' and 'data' pools so far */
2831         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2832             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2833                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2834                        "& quota.ost are)\n", ptr);
2835                 return -EINVAL;
2836         }
2837
2838         if (*tmp == '\0') {
2839                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2840         } else {
2841                 CDEBUG(D_MGS, "global '%s'\n", quota);
2842
2843                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2844                     strcmp(tmp, "none") != 0) {
2845                         CERROR("enable option(%s) isn't supported\n", tmp);
2846                         return -EINVAL;
2847                 }
2848         }
2849
2850         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2851         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2852         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2853         if (lcr == NULL)
2854                 return -ENOMEM;
2855
2856         /* truncate the comment to the parameter name */
2857         ptr = tmp - 1;
2858         sep = *ptr;
2859         *ptr = '\0';
2860
2861         /* XXX we duplicated quota enable information in all server
2862          *     config logs, it should be moved to a separate config
2863          *     log once we cleanup the config log for global param. */
2864         /* modify all servers */
2865         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2866                                       *tmp == '\0' ? NULL : lcr,
2867                                       mti->mti_fsname, quota, 1);
2868         *ptr = sep;
2869         lustre_cfg_rec_free(lcr);
2870         return rc < 0 ? rc : 0;
2871 }
2872
2873 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2874                                    struct mgs_device *mgs,
2875                                    struct fs_db *fsdb,
2876                                    struct mgs_target_info *mti,
2877                                    char *param)
2878 {
2879         struct mgs_thread_info  *mgi = mgs_env_info(env);
2880         struct llog_cfg_rec     *lcr;
2881         struct llog_handle      *llh = NULL;
2882         char                    *logname;
2883         char                    *comment, *ptr;
2884         int                      rc, len;
2885
2886         ENTRY;
2887
2888         /* get comment */
2889         ptr = strchr(param, '=');
2890         LASSERT(ptr != NULL);
2891         len = ptr - param;
2892
2893         OBD_ALLOC(comment, len + 1);
2894         if (comment == NULL)
2895                 RETURN(-ENOMEM);
2896         strncpy(comment, param, len);
2897         comment[len] = '\0';
2898
2899         /* prepare lcfg */
2900         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2901         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2902         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2903         if (lcr == NULL)
2904                 GOTO(out_comment, rc = -ENOMEM);
2905
2906         /* construct log name */
2907         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2908         if (rc < 0)
2909                 GOTO(out_lcfg, rc);
2910
2911         if (mgs_log_is_empty(env, mgs, logname)) {
2912                 rc = record_start_log(env, mgs, &llh, logname);
2913                 if (rc < 0)
2914                         GOTO(out, rc);
2915                 record_end_log(env, &llh);
2916         }
2917
2918         /* obsolete old one */
2919         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2920                         comment, CM_SKIP);
2921         if (rc < 0)
2922                 GOTO(out, rc);
2923         /* write the new one */
2924         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2925                                   mti->mti_svname, comment);
2926         if (rc)
2927                 CERROR("%s: error writing log %s: rc = %d\n",
2928                        mgs->mgs_obd->obd_name, logname, rc);
2929 out:
2930         name_destroy(&logname);
2931 out_lcfg:
2932         lustre_cfg_rec_free(lcr);
2933 out_comment:
2934         OBD_FREE(comment, len + 1);
2935         RETURN(rc);
2936 }
2937
2938 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2939                                         char *param)
2940 {
2941         char    *ptr;
2942
2943         /* disable the adjustable udesc parameter for now, i.e. use default
2944          * setting that client always ship udesc to MDT if possible. to enable
2945          * it simply remove the following line */
2946         goto error_out;
2947
2948         ptr = strchr(param, '=');
2949         if (ptr == NULL)
2950                 goto error_out;
2951         *ptr++ = '\0';
2952
2953         if (strcmp(param, PARAM_SRPC_UDESC))
2954                 goto error_out;
2955
2956         if (strcmp(ptr, "yes") == 0) {
2957                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2958                 CWARN("Enable user descriptor shipping from client to MDT\n");
2959         } else if (strcmp(ptr, "no") == 0) {
2960                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2961                 CWARN("Disable user descriptor shipping from client to MDT\n");
2962         } else {
2963                 *(ptr - 1) = '=';
2964                 goto error_out;
2965         }
2966         return 0;
2967
2968 error_out:
2969         CERROR("Invalid param: %s\n", param);
2970         return -EINVAL;
2971 }
2972
2973 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2974                                   const char *svname,
2975                                   char *param)
2976 {
2977         struct sptlrpc_rule      rule;
2978         struct sptlrpc_rule_set *rset;
2979         int                      rc;
2980         ENTRY;
2981
2982         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2983                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2984                 RETURN(-EINVAL);
2985         }
2986
2987         if (strncmp(param, PARAM_SRPC_UDESC,
2988                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2989                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2990         }
2991
2992         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2993                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2994                 RETURN(-EINVAL);
2995         }
2996
2997         param += sizeof(PARAM_SRPC_FLVR) - 1;
2998
2999         rc = sptlrpc_parse_rule(param, &rule);
3000         if (rc)
3001                 RETURN(rc);
3002
3003         /* mgs rules implies must be mgc->mgs */
3004         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3005                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3006                      rule.sr_from != LUSTRE_SP_ANY) ||
3007                     (rule.sr_to != LUSTRE_SP_MGS &&
3008                      rule.sr_to != LUSTRE_SP_ANY))
3009                         RETURN(-EINVAL);
3010         }
3011
3012         /* preapre room for this coming rule. svcname format should be:
3013          * - fsname: general rule
3014          * - fsname-tgtname: target-specific rule
3015          */
3016         if (strchr(svname, '-')) {
3017                 struct mgs_tgt_srpc_conf *tgtconf;
3018                 int                       found = 0;
3019
3020                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3021                      tgtconf = tgtconf->mtsc_next) {
3022                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3023                                 found = 1;
3024                                 break;
3025                         }
3026                 }
3027
3028                 if (!found) {
3029                         int name_len;
3030
3031                         OBD_ALLOC_PTR(tgtconf);
3032                         if (tgtconf == NULL)
3033                                 RETURN(-ENOMEM);
3034
3035                         name_len = strlen(svname);
3036
3037                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3038                         if (tgtconf->mtsc_tgt == NULL) {
3039                                 OBD_FREE_PTR(tgtconf);
3040                                 RETURN(-ENOMEM);
3041                         }
3042                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3043
3044                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3045                         fsdb->fsdb_srpc_tgt = tgtconf;
3046                 }
3047
3048                 rset = &tgtconf->mtsc_rset;
3049         } else {
3050                 rset = &fsdb->fsdb_srpc_gen;
3051         }
3052
3053         rc = sptlrpc_rule_set_merge(rset, &rule);
3054
3055         RETURN(rc);
3056 }
3057
3058 static int mgs_srpc_set_param(const struct lu_env *env,
3059                               struct mgs_device *mgs,
3060                               struct fs_db *fsdb,
3061                               struct mgs_target_info *mti,
3062                               char *param)
3063 {
3064         char                   *copy;
3065         int                     rc, copy_size;
3066         ENTRY;
3067
3068 #ifndef HAVE_GSS
3069         RETURN(-EINVAL);
3070 #endif
3071         /* keep a copy of original param, which could be destroied
3072          * during parsing */
3073         copy_size = strlen(param) + 1;
3074         OBD_ALLOC(copy, copy_size);
3075         if (copy == NULL)
3076                 return -ENOMEM;
3077         memcpy(copy, param, copy_size);
3078
3079         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3080         if (rc)
3081                 goto out_free;
3082
3083         /* previous steps guaranteed the syntax is correct */
3084         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3085         if (rc)
3086                 goto out_free;
3087
3088         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3089                 /*
3090                  * for mgs rules, make them effective immediately.
3091                  */
3092                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3093                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3094                                                  &fsdb->fsdb_srpc_gen);
3095         }
3096
3097 out_free:
3098         OBD_FREE(copy, copy_size);
3099         RETURN(rc);
3100 }
3101
3102 struct mgs_srpc_read_data {
3103         struct fs_db   *msrd_fsdb;
3104         int             msrd_skip;
3105 };
3106
3107 static int mgs_srpc_read_handler(const struct lu_env *env,
3108                                  struct llog_handle *llh,
3109                                  struct llog_rec_hdr *rec, void *data)
3110 {
3111         struct mgs_srpc_read_data *msrd = data;
3112         struct cfg_marker         *marker;
3113         struct lustre_cfg         *lcfg = REC_DATA(rec);
3114         char                      *svname, *param;
3115         int                        cfg_len, rc;
3116         ENTRY;
3117
3118         if (rec->lrh_type != OBD_CFG_REC) {
3119                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3120                 RETURN(-EINVAL);
3121         }
3122
3123         cfg_len = REC_DATA_LEN(rec);
3124
3125         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3126         if (rc) {
3127                 CERROR("Insane cfg\n");
3128                 RETURN(rc);
3129         }
3130
3131         if (lcfg->lcfg_command == LCFG_MARKER) {
3132                 marker = lustre_cfg_buf(lcfg, 1);
3133
3134                 if (marker->cm_flags & CM_START &&
3135                     marker->cm_flags & CM_SKIP)
3136                         msrd->msrd_skip = 1;
3137                 if (marker->cm_flags & CM_END)
3138                         msrd->msrd_skip = 0;
3139
3140                 RETURN(0);
3141         }
3142
3143         if (msrd->msrd_skip)
3144                 RETURN(0);
3145
3146         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3147                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3148                 RETURN(0);
3149         }
3150
3151         svname = lustre_cfg_string(lcfg, 0);
3152         if (svname == NULL) {
3153                 CERROR("svname is empty\n");
3154                 RETURN(0);
3155         }
3156
3157         param = lustre_cfg_string(lcfg, 1);
3158         if (param == NULL) {
3159                 CERROR("param is empty\n");
3160                 RETURN(0);
3161         }
3162
3163         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3164         if (rc)
3165                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3166
3167         RETURN(0);
3168 }
3169
3170 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3171                                 struct mgs_device *mgs,
3172                                 struct fs_db *fsdb)
3173 {
3174         struct llog_handle        *llh = NULL;
3175         struct llog_ctxt          *ctxt;
3176         char                      *logname;
3177         struct mgs_srpc_read_data  msrd;
3178         int                        rc;
3179         ENTRY;
3180
3181         /* construct log name */
3182         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3183         if (rc)
3184                 RETURN(rc);
3185
3186         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3187         LASSERT(ctxt != NULL);
3188
3189         if (mgs_log_is_empty(env, mgs, logname))
3190                 GOTO(out, rc = 0);
3191
3192         rc = llog_open(env, ctxt, &llh, NULL, logname,
3193                        LLOG_OPEN_EXISTS);
3194         if (rc < 0) {
3195                 if (rc == -ENOENT)
3196                         rc = 0;
3197                 GOTO(out, rc);
3198         }
3199
3200         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3201         if (rc)
3202                 GOTO(out_close, rc);
3203
3204         if (llog_get_size(llh) <= 1)
3205                 GOTO(out_close, rc = 0);
3206
3207         msrd.msrd_fsdb = fsdb;
3208         msrd.msrd_skip = 0;
3209
3210         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3211                           NULL);
3212
3213 out_close:
3214         llog_close(env, llh);
3215 out:
3216         llog_ctxt_put(ctxt);
3217         name_destroy(&logname);
3218
3219         if (rc)
3220                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3221         RETURN(rc);
3222 }
3223
3224 /* Permanent settings of all parameters by writing into the appropriate
3225  * configuration logs.
3226  * A parameter with null value ("<param>='\0'") means to erase it out of
3227  * the logs.
3228  */
3229 static int mgs_write_log_param(const struct lu_env *env,
3230                                struct mgs_device *mgs, struct fs_db *fsdb,
3231                                struct mgs_target_info *mti, char *ptr)
3232 {
3233         struct mgs_thread_info *mgi = mgs_env_info(env);
3234         char *logname;
3235         char *tmp;
3236         int rc = 0, rc2 = 0;
3237         ENTRY;
3238
3239         /* For various parameter settings, we have to figure out which logs
3240            care about them (e.g. both mdt and client for lov settings) */
3241         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3242
3243         /* The params are stored in MOUNT_DATA_FILE and modified via
3244            tunefs.lustre, or set using lctl conf_param */
3245
3246         /* Processed in lustre_start_mgc */
3247         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3248                 GOTO(end, rc);
3249
3250         /* Processed in ost/mdt */
3251         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3252                 GOTO(end, rc);
3253
3254         /* Processed in mgs_write_log_ost */
3255         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {