Whamcloud - gitweb
LU-1778 llite: fix inconsistencies of root squash feature
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         LASSERT(dir);
71         LASSERT(dir->do_index_ops);
72
73         iops = &dir->do_index_ops->dio_it;
74         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
75         if (IS_ERR(it))
76                 RETURN(PTR_ERR(it));
77
78         rc = iops->load(env, it, 0);
79         if (rc <= 0)
80                 GOTO(fini, rc = 0);
81
82         /* main cycle */
83         do {
84                 key = (void *)iops->key(env, it);
85                 if (IS_ERR(key)) {
86                         CERROR("%s: key failed when listing %s: rc = %d\n",
87                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
88                                (int) PTR_ERR(key));
89                         goto next;
90                 }
91                 key_sz = iops->key_size(env, it);
92                 LASSERT(key_sz > 0);
93
94                 /* filter out "." and ".." entries */
95                 if (key[0] == '.') {
96                         if (key_sz == 1)
97                                 goto next;
98                         if (key_sz == 2 && key[1] == '.')
99                                 goto next;
100                 }
101
102                 de = mgs_direntry_alloc(key_sz + 1);
103                 if (de == NULL) {
104                         rc = -ENOMEM;
105                         break;
106                 }
107
108                 memcpy(de->name, key, key_sz);
109                 de->name[key_sz] = 0;
110
111                 cfs_list_add(&de->list, list);
112
113 next:
114                 rc = iops->next(env, it);
115         } while (rc == 0);
116         rc = 0;
117
118         iops->put(env, it);
119
120 fini:
121         iops->fini(env, it);
122         if (rc)
123                 CERROR("%s: key failed when listing %s: rc = %d\n",
124                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
125         RETURN(rc);
126 }
127
128 /******************** DB functions *********************/
129
130 static inline int name_create(char **newname, char *prefix, char *suffix)
131 {
132         LASSERT(newname);
133         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
134         if (!*newname)
135                 return -ENOMEM;
136         sprintf(*newname, "%s%s", prefix, suffix);
137         return 0;
138 }
139
140 static inline void name_destroy(char **name)
141 {
142         if (*name)
143                 OBD_FREE(*name, strlen(*name) + 1);
144         *name = NULL;
145 }
146
147 struct mgs_fsdb_handler_data
148 {
149         struct fs_db   *fsdb;
150         __u32           ver;
151 };
152
153 /* from the (client) config log, figure out:
154         1. which ost's/mdt's are configured (by index)
155         2. what the last config step is
156         3. COMPAT_18 osc name
157 */
158 /* It might be better to have a separate db file, instead of parsing the info
159    out of the client log.  This is slow and potentially error-prone. */
160 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
161                             struct llog_rec_hdr *rec, void *data)
162 {
163         struct mgs_fsdb_handler_data *d = data;
164         struct fs_db *fsdb = d->fsdb;
165         int cfg_len = rec->lrh_len;
166         char *cfg_buf = (char*) (rec + 1);
167         struct lustre_cfg *lcfg;
168         __u32 index;
169         int rc = 0;
170         ENTRY;
171
172         if (rec->lrh_type != OBD_CFG_REC) {
173                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
174                 RETURN(-EINVAL);
175         }
176
177         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
178         if (rc) {
179                 CERROR("Insane cfg\n");
180                 RETURN(rc);
181         }
182
183         lcfg = (struct lustre_cfg *)cfg_buf;
184
185         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
186                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
187
188         /* Figure out ost indicies */
189         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
190         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
191             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
192                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
193                                        NULL, 10);
194                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
195                        lustre_cfg_string(lcfg, 1), index,
196                        lustre_cfg_string(lcfg, 2));
197                 set_bit(index, fsdb->fsdb_ost_index_map);
198         }
199
200         /* Figure out mdt indicies */
201         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
202         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
203             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
204                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
205                                        &index, NULL);
206                 if (rc != LDD_F_SV_TYPE_MDT) {
207                         CWARN("Unparsable MDC name %s, assuming index 0\n",
208                               lustre_cfg_string(lcfg, 0));
209                         index = 0;
210                 }
211                 rc = 0;
212                 CDEBUG(D_MGS, "MDT index is %u\n", index);
213                 set_bit(index, fsdb->fsdb_mdt_index_map);
214                 fsdb->fsdb_mdt_count ++;
215         }
216
217         /**
218          * figure out the old config. fsdb_gen = 0 means old log
219          * It is obsoleted and not supported anymore
220          */
221         if (fsdb->fsdb_gen == 0) {
222                 CERROR("Old config format is not supported\n");
223                 RETURN(-EINVAL);
224         }
225
226         /*
227          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
228          */
229         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
230             lcfg->lcfg_command == LCFG_ATTACH &&
231             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
232                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
233                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
234                         CWARN("MDT using 1.8 OSC name scheme\n");
235                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
236                 }
237         }
238
239         if (lcfg->lcfg_command == LCFG_MARKER) {
240                 struct cfg_marker *marker;
241                 marker = lustre_cfg_buf(lcfg, 1);
242
243                 d->ver = marker->cm_vers;
244
245                 /* Keep track of the latest marker step */
246                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
247         }
248
249         RETURN(rc);
250 }
251
252 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
253 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
254                                   struct mgs_device *mgs,
255                                   struct fs_db *fsdb)
256 {
257         char                            *logname;
258         struct llog_handle              *loghandle;
259         struct llog_ctxt                *ctxt;
260         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
261         int rc;
262
263         ENTRY;
264
265         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
266         LASSERT(ctxt != NULL);
267         rc = name_create(&logname, fsdb->fsdb_name, "-client");
268         if (rc)
269                 GOTO(out_put, rc);
270         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
271         if (rc)
272                 GOTO(out_pop, rc);
273
274         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
275         if (rc)
276                 GOTO(out_close, rc);
277
278         if (llog_get_size(loghandle) <= 1)
279                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
280
281         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
282         CDEBUG(D_INFO, "get_db = %d\n", rc);
283 out_close:
284         llog_close(env, loghandle);
285 out_pop:
286         name_destroy(&logname);
287 out_put:
288         llog_ctxt_put(ctxt);
289
290         RETURN(rc);
291 }
292
293 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
294 {
295         struct mgs_tgt_srpc_conf *tgtconf;
296
297         /* free target-specific rules */
298         while (fsdb->fsdb_srpc_tgt) {
299                 tgtconf = fsdb->fsdb_srpc_tgt;
300                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
301
302                 LASSERT(tgtconf->mtsc_tgt);
303
304                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
305                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
306                 OBD_FREE_PTR(tgtconf);
307         }
308
309         /* free general rules */
310         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
311 }
312
313 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
314 {
315         struct fs_db *fsdb;
316         cfs_list_t *tmp;
317
318         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
319                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
320                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
321                         return fsdb;
322         }
323         return NULL;
324 }
325
326 /* caller must hold the mgs->mgs_fs_db_lock */
327 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
328                                   struct mgs_device *mgs, char *fsname)
329 {
330         struct fs_db *fsdb;
331         int rc;
332         ENTRY;
333
334         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
335                 CERROR("fsname %s is too long\n", fsname);
336                 RETURN(NULL);
337         }
338
339         OBD_ALLOC_PTR(fsdb);
340         if (!fsdb)
341                 RETURN(NULL);
342
343         strcpy(fsdb->fsdb_name, fsname);
344         mutex_init(&fsdb->fsdb_mutex);
345         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
346         fsdb->fsdb_gen = 1;
347
348         if (strcmp(fsname, MGSSELF_NAME) == 0) {
349                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
350         } else {
351                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
352                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
353                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
354                         CERROR("No memory for index maps\n");
355                         GOTO(err, rc = -ENOMEM);
356                 }
357
358                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
359                 if (rc)
360                         GOTO(err, rc);
361                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
362                 if (rc)
363                         GOTO(err, rc);
364
365                 /* initialise data for NID table */
366                 mgs_ir_init_fs(env, mgs, fsdb);
367
368                 lproc_mgs_add_live(mgs, fsdb);
369         }
370
371         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
372
373         RETURN(fsdb);
374 err:
375         if (fsdb->fsdb_ost_index_map)
376                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
377         if (fsdb->fsdb_mdt_index_map)
378                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
379         name_destroy(&fsdb->fsdb_clilov);
380         name_destroy(&fsdb->fsdb_clilmv);
381         OBD_FREE_PTR(fsdb);
382         RETURN(NULL);
383 }
384
385 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
386 {
387         /* wait for anyone with the sem */
388         mutex_lock(&fsdb->fsdb_mutex);
389         lproc_mgs_del_live(mgs, fsdb);
390         cfs_list_del(&fsdb->fsdb_list);
391
392         /* deinitialize fsr */
393         mgs_ir_fini_fs(mgs, fsdb);
394
395         if (fsdb->fsdb_ost_index_map)
396                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
397         if (fsdb->fsdb_mdt_index_map)
398                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
399         name_destroy(&fsdb->fsdb_clilov);
400         name_destroy(&fsdb->fsdb_clilmv);
401         mgs_free_fsdb_srpc(fsdb);
402         mutex_unlock(&fsdb->fsdb_mutex);
403         OBD_FREE_PTR(fsdb);
404 }
405
406 int mgs_init_fsdb_list(struct mgs_device *mgs)
407 {
408         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
409         return 0;
410 }
411
412 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
413 {
414         struct fs_db *fsdb;
415         cfs_list_t *tmp, *tmp2;
416         mutex_lock(&mgs->mgs_mutex);
417         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
418                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
419                 mgs_free_fsdb(mgs, fsdb);
420         }
421         mutex_unlock(&mgs->mgs_mutex);
422         return 0;
423 }
424
425 int mgs_find_or_make_fsdb(const struct lu_env *env,
426                           struct mgs_device *mgs, char *name,
427                           struct fs_db **dbh)
428 {
429         struct fs_db *fsdb;
430         int rc = 0;
431
432         ENTRY;
433         mutex_lock(&mgs->mgs_mutex);
434         fsdb = mgs_find_fsdb(mgs, name);
435         if (fsdb) {
436                 mutex_unlock(&mgs->mgs_mutex);
437                 *dbh = fsdb;
438                 RETURN(0);
439         }
440
441         CDEBUG(D_MGS, "Creating new db\n");
442         fsdb = mgs_new_fsdb(env, mgs, name);
443         /* lock fsdb_mutex until the db is loaded from llogs */
444         if (fsdb)
445                 mutex_lock(&fsdb->fsdb_mutex);
446         mutex_unlock(&mgs->mgs_mutex);
447         if (!fsdb)
448                 RETURN(-ENOMEM);
449
450         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
451                 /* populate the db from the client llog */
452                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
453                 if (rc) {
454                         CERROR("Can't get db from client log %d\n", rc);
455                         GOTO(out_free, rc);
456                 }
457         }
458
459         /* populate srpc rules from params llog */
460         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
461         if (rc) {
462                 CERROR("Can't get db from params log %d\n", rc);
463                 GOTO(out_free, rc);
464         }
465
466         mutex_unlock(&fsdb->fsdb_mutex);
467         *dbh = fsdb;
468
469         RETURN(0);
470
471 out_free:
472         mutex_unlock(&fsdb->fsdb_mutex);
473         mgs_free_fsdb(mgs, fsdb);
474         return rc;
475 }
476
477 /* 1 = index in use
478    0 = index unused
479    -1= empty client log */
480 int mgs_check_index(const struct lu_env *env,
481                     struct mgs_device *mgs,
482                     struct mgs_target_info *mti)
483 {
484         struct fs_db *fsdb;
485         void *imap;
486         int rc = 0;
487         ENTRY;
488
489         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
490
491         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
492         if (rc) {
493                 CERROR("Can't get db for %s\n", mti->mti_fsname);
494                 RETURN(rc);
495         }
496
497         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
498                 RETURN(-1);
499
500         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
501                 imap = fsdb->fsdb_ost_index_map;
502         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
503                 imap = fsdb->fsdb_mdt_index_map;
504         else
505                 RETURN(-EINVAL);
506
507         if (test_bit(mti->mti_stripe_index, imap))
508                 RETURN(1);
509         RETURN(0);
510 }
511
512 static __inline__ int next_index(void *index_map, int map_len)
513 {
514         int i;
515         for (i = 0; i < map_len * 8; i++)
516                 if (!test_bit(i, index_map)) {
517                          return i;
518                  }
519         CERROR("max index %d exceeded.\n", i);
520         return -1;
521 }
522
523 /* Return codes:
524         0  newly marked as in use
525         <0 err
526         +EALREADY for update of an old index */
527 static int mgs_set_index(const struct lu_env *env,
528                          struct mgs_device *mgs,
529                          struct mgs_target_info *mti)
530 {
531         struct fs_db *fsdb;
532         void *imap;
533         int rc = 0;
534         ENTRY;
535
536         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
537         if (rc) {
538                 CERROR("Can't get db for %s\n", mti->mti_fsname);
539                 RETURN(rc);
540         }
541
542         mutex_lock(&fsdb->fsdb_mutex);
543         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
544                 imap = fsdb->fsdb_ost_index_map;
545         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
546                 imap = fsdb->fsdb_mdt_index_map;
547         } else {
548                 GOTO(out_up, rc = -EINVAL);
549         }
550
551         if (mti->mti_flags & LDD_F_NEED_INDEX) {
552                 rc = next_index(imap, INDEX_MAP_SIZE);
553                 if (rc == -1)
554                         GOTO(out_up, rc = -ERANGE);
555                 mti->mti_stripe_index = rc;
556                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
557                         fsdb->fsdb_mdt_count ++;
558         }
559
560         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
561                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
562                                    "but the max index is %d.\n",
563                                    mti->mti_svname, mti->mti_stripe_index,
564                                    INDEX_MAP_SIZE * 8);
565                 GOTO(out_up, rc = -ERANGE);
566         }
567
568         if (test_bit(mti->mti_stripe_index, imap)) {
569                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
570                     !(mti->mti_flags & LDD_F_WRITECONF)) {
571                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
572                                            "%d, but that index is already in "
573                                            "use. Use --writeconf to force\n",
574                                            mti->mti_svname,
575                                            mti->mti_stripe_index);
576                         GOTO(out_up, rc = -EADDRINUSE);
577                 } else {
578                         CDEBUG(D_MGS, "Server %s updating index %d\n",
579                                mti->mti_svname, mti->mti_stripe_index);
580                         GOTO(out_up, rc = EALREADY);
581                 }
582         }
583
584         set_bit(mti->mti_stripe_index, imap);
585         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
586         mutex_unlock(&fsdb->fsdb_mutex);
587         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
588                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
589
590         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
591                mti->mti_stripe_index);
592
593         RETURN(0);
594 out_up:
595         mutex_unlock(&fsdb->fsdb_mutex);
596         return rc;
597 }
598
599 struct mgs_modify_lookup {
600         struct cfg_marker mml_marker;
601         int               mml_modified;
602 };
603
604 static int mgs_modify_handler(const struct lu_env *env,
605                               struct llog_handle *llh,
606                               struct llog_rec_hdr *rec, void *data)
607 {
608         struct mgs_modify_lookup *mml = data;
609         struct cfg_marker *marker;
610         struct lustre_cfg *lcfg = REC_DATA(rec);
611         int cfg_len = REC_DATA_LEN(rec);
612         int rc;
613         ENTRY;
614
615         if (rec->lrh_type != OBD_CFG_REC) {
616                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
617                 RETURN(-EINVAL);
618         }
619
620         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
621         if (rc) {
622                 CERROR("Insane cfg\n");
623                 RETURN(rc);
624         }
625
626         /* We only care about markers */
627         if (lcfg->lcfg_command != LCFG_MARKER)
628                 RETURN(0);
629
630         marker = lustre_cfg_buf(lcfg, 1);
631         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
632             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
633             !(marker->cm_flags & CM_SKIP)) {
634                 /* Found a non-skipped marker match */
635                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
636                        rec->lrh_index, marker->cm_step,
637                        marker->cm_flags, mml->mml_marker.cm_flags,
638                        marker->cm_tgtname, marker->cm_comment);
639                 /* Overwrite the old marker llog entry */
640                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
641                 marker->cm_flags |= mml->mml_marker.cm_flags;
642                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
643                 /* Header and tail are added back to lrh_len in
644                    llog_lvfs_write_rec */
645                 rec->lrh_len = cfg_len;
646                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
647                                 rec->lrh_index);
648                 if (!rc)
649                          mml->mml_modified++;
650         }
651
652         RETURN(rc);
653 }
654
655 /**
656  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
657  * Return code:
658  * 0 - modified successfully,
659  * 1 - no modification was done
660  * negative - error
661  */
662 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
663                       struct fs_db *fsdb, struct mgs_target_info *mti,
664                       char *logname, char *devname, char *comment, int flags)
665 {
666         struct llog_handle *loghandle;
667         struct llog_ctxt *ctxt;
668         struct mgs_modify_lookup *mml;
669         int rc;
670
671         ENTRY;
672
673         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
674         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
675                flags);
676
677         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
678         LASSERT(ctxt != NULL);
679         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
680         if (rc < 0) {
681                 if (rc == -ENOENT)
682                         rc = 0;
683                 GOTO(out_pop, rc);
684         }
685
686         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
687         if (rc)
688                 GOTO(out_close, rc);
689
690         if (llog_get_size(loghandle) <= 1)
691                 GOTO(out_close, rc = 0);
692
693         OBD_ALLOC_PTR(mml);
694         if (!mml)
695                 GOTO(out_close, rc = -ENOMEM);
696         if (strlcpy(mml->mml_marker.cm_comment, comment,
697                     sizeof(mml->mml_marker.cm_comment)) >=
698             sizeof(mml->mml_marker.cm_comment))
699                 GOTO(out_close, rc = -E2BIG);
700         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
701                     sizeof(mml->mml_marker.cm_tgtname)) >=
702             sizeof(mml->mml_marker.cm_tgtname))
703                 GOTO(out_close, rc = -E2BIG);
704         /* Modify mostly means cancel */
705         mml->mml_marker.cm_flags = flags;
706         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
707         mml->mml_modified = 0;
708         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
709                           NULL);
710         if (!rc && !mml->mml_modified)
711                 rc = 1;
712         OBD_FREE_PTR(mml);
713
714 out_close:
715         llog_close(env, loghandle);
716 out_pop:
717         if (rc < 0)
718                 CERROR("%s: modify %s/%s failed: rc = %d\n",
719                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
720         llog_ctxt_put(ctxt);
721         RETURN(rc);
722 }
723
724 /** This structure is passed to mgs_replace_handler */
725 struct mgs_replace_uuid_lookup {
726         /* Nids are replaced for this target device */
727         struct mgs_target_info target;
728         /* Temporary modified llog */
729         struct llog_handle *temp_llh;
730         /* Flag is set if in target block*/
731         int in_target_device;
732         /* Nids already added. Just skip (multiple nids) */
733         int device_nids_added;
734         /* Flag is set if this block should not be copied */
735         int skip_it;
736 };
737
738 /**
739  * Check: a) if block should be skipped
740  * b) is it target block
741  *
742  * \param[in] lcfg
743  * \param[in] mrul
744  *
745  * \retval 0 should not to be skipped
746  * \retval 1 should to be skipped
747  */
748 static int check_markers(struct lustre_cfg *lcfg,
749                          struct mgs_replace_uuid_lookup *mrul)
750 {
751          struct cfg_marker *marker;
752
753         /* Track markers. Find given device */
754         if (lcfg->lcfg_command == LCFG_MARKER) {
755                 marker = lustre_cfg_buf(lcfg, 1);
756                 /* Clean llog from records marked as CM_EXCLUDE.
757                    CM_SKIP records are used for "active" command
758                    and can be restored if needed */
759                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
760                     (CM_EXCLUDE | CM_START)) {
761                         mrul->skip_it = 1;
762                         return 1;
763                 }
764
765                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
766                     (CM_EXCLUDE | CM_END)) {
767                         mrul->skip_it = 0;
768                         return 1;
769                 }
770
771                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
772                         LASSERT(!(marker->cm_flags & CM_START) ||
773                                 !(marker->cm_flags & CM_END));
774                         if (marker->cm_flags & CM_START) {
775                                 mrul->in_target_device = 1;
776                                 mrul->device_nids_added = 0;
777                         } else if (marker->cm_flags & CM_END)
778                                 mrul->in_target_device = 0;
779                 }
780         }
781
782         return 0;
783 }
784
785 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
786                        struct lustre_cfg *lcfg)
787 {
788         struct llog_rec_hdr      rec;
789         int                      buflen, rc;
790
791         if (!lcfg || !llh)
792                 return -ENOMEM;
793
794         LASSERT(llh->lgh_ctxt);
795
796         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
797                                 lcfg->lcfg_buflens);
798         rec.lrh_len = llog_data_len(buflen);
799         rec.lrh_type = OBD_CFG_REC;
800
801         /* idx = -1 means append */
802         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
803         if (rc)
804                 CERROR("failed %d\n", rc);
805         return rc;
806 }
807
808 static int record_base(const struct lu_env *env, struct llog_handle *llh,
809                      char *cfgname, lnet_nid_t nid, int cmd,
810                      char *s1, char *s2, char *s3, char *s4)
811 {
812         struct mgs_thread_info *mgi = mgs_env_info(env);
813         struct lustre_cfg     *lcfg;
814         int rc;
815
816         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
817                cmd, s1, s2, s3, s4);
818
819         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
820         if (s1)
821                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
822         if (s2)
823                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
824         if (s3)
825                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
826         if (s4)
827                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
828
829         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
830         if (!lcfg)
831                 return -ENOMEM;
832         lcfg->lcfg_nid = nid;
833
834         rc = record_lcfg(env, llh, lcfg);
835
836         lustre_cfg_free(lcfg);
837
838         if (rc) {
839                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
840                        cmd, s1, s2, s3, s4);
841         }
842         return rc;
843 }
844
845 static inline int record_add_uuid(const struct lu_env *env,
846                                   struct llog_handle *llh,
847                                   uint64_t nid, char *uuid)
848 {
849         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
850 }
851
852 static inline int record_add_conn(const struct lu_env *env,
853                                   struct llog_handle *llh,
854                                   char *devname, char *uuid)
855 {
856         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
857 }
858
859 static inline int record_attach(const struct lu_env *env,
860                                 struct llog_handle *llh, char *devname,
861                                 char *type, char *uuid)
862 {
863         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
864 }
865
866 static inline int record_setup(const struct lu_env *env,
867                                struct llog_handle *llh, char *devname,
868                                char *s1, char *s2, char *s3, char *s4)
869 {
870         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
871 }
872
873 /**
874  * \retval <0 record processing error
875  * \retval n record is processed. No need copy original one.
876  * \retval 0 record is not processed.
877  */
878 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
879                            struct mgs_replace_uuid_lookup *mrul)
880 {
881         int nids_added = 0;
882         lnet_nid_t nid;
883         char *ptr;
884         int rc;
885
886         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
887                 /* LCFG_ADD_UUID command found. Let's skip original command
888                    and add passed nids */
889                 ptr = mrul->target.mti_params;
890                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
891                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
892                                "device %s\n", libcfs_nid2str(nid),
893                                 mrul->target.mti_params,
894                                 mrul->target.mti_svname);
895                         rc = record_add_uuid(env,
896                                              mrul->temp_llh, nid,
897                                              mrul->target.mti_params);
898                         if (!rc)
899                                 nids_added++;
900                 }
901
902                 if (nids_added == 0) {
903                         CERROR("No new nids were added, nid %s with uuid %s, "
904                                "device %s\n", libcfs_nid2str(nid),
905                                mrul->target.mti_params,
906                                mrul->target.mti_svname);
907                         RETURN(-ENXIO);
908                 } else {
909                         mrul->device_nids_added = 1;
910                 }
911
912                 return nids_added;
913         }
914
915         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
916                 /* LCFG_SETUP command found. UUID should be changed */
917                 rc = record_setup(env,
918                                   mrul->temp_llh,
919                                   /* devname the same */
920                                   lustre_cfg_string(lcfg, 0),
921                                   /* s1 is not changed */
922                                   lustre_cfg_string(lcfg, 1),
923                                   /* new uuid should be
924                                   the full nidlist */
925                                   mrul->target.mti_params,
926                                   /* s3 is not changed */
927                                   lustre_cfg_string(lcfg, 3),
928                                   /* s4 is not changed */
929                                   lustre_cfg_string(lcfg, 4));
930                 return rc ? rc : 1;
931         }
932
933         /* Another commands in target device block */
934         return 0;
935 }
936
937 /**
938  * Handler that called for every record in llog.
939  * Records are processed in order they placed in llog.
940  *
941  * \param[in] llh       log to be processed
942  * \param[in] rec       current record
943  * \param[in] data      mgs_replace_uuid_lookup structure
944  *
945  * \retval 0    success
946  */
947 static int mgs_replace_handler(const struct lu_env *env,
948                                struct llog_handle *llh,
949                                struct llog_rec_hdr *rec,
950                                void *data)
951 {
952         struct mgs_replace_uuid_lookup *mrul;
953         struct lustre_cfg *lcfg = REC_DATA(rec);
954         int cfg_len = REC_DATA_LEN(rec);
955         int rc;
956         ENTRY;
957
958         mrul = (struct mgs_replace_uuid_lookup *)data;
959
960         if (rec->lrh_type != OBD_CFG_REC) {
961                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
962                        rec->lrh_type, lcfg->lcfg_command,
963                        lustre_cfg_string(lcfg, 0),
964                        lustre_cfg_string(lcfg, 1));
965                 RETURN(-EINVAL);
966         }
967
968         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
969         if (rc) {
970                 /* Do not copy any invalidated records */
971                 GOTO(skip_out, rc = 0);
972         }
973
974         rc = check_markers(lcfg, mrul);
975         if (rc || mrul->skip_it)
976                 GOTO(skip_out, rc = 0);
977
978         /* Write to new log all commands outside target device block */
979         if (!mrul->in_target_device)
980                 GOTO(copy_out, rc = 0);
981
982         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
983            (failover nids) for this target, assuming that if then
984            primary is changing then so is the failover */
985         if (mrul->device_nids_added &&
986             (lcfg->lcfg_command == LCFG_ADD_UUID ||
987              lcfg->lcfg_command == LCFG_ADD_CONN))
988                 GOTO(skip_out, rc = 0);
989
990         rc = process_command(env, lcfg, mrul);
991         if (rc < 0)
992                 RETURN(rc);
993
994         if (rc)
995                 RETURN(0);
996 copy_out:
997         /* Record is placed in temporary llog as is */
998         rc = llog_write(env, mrul->temp_llh, rec, NULL, 0, NULL, -1);
999
1000         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1001                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1002                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1003         RETURN(rc);
1004
1005 skip_out:
1006         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1007                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1008                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1009         RETURN(rc);
1010 }
1011
1012 static int mgs_log_is_empty(const struct lu_env *env,
1013                             struct mgs_device *mgs, char *name)
1014 {
1015         struct llog_ctxt        *ctxt;
1016         int                      rc;
1017
1018         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1019         LASSERT(ctxt != NULL);
1020
1021         rc = llog_is_empty(env, ctxt, name);
1022         llog_ctxt_put(ctxt);
1023         return rc;
1024 }
1025
1026 static int mgs_replace_nids_log(const struct lu_env *env,
1027                                 struct obd_device *mgs, struct fs_db *fsdb,
1028                                 char *logname, char *devname, char *nids)
1029 {
1030         struct llog_handle *orig_llh, *backup_llh;
1031         struct llog_ctxt *ctxt;
1032         struct mgs_replace_uuid_lookup *mrul;
1033         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1034         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1035         char *backup;
1036         int rc, rc2;
1037         ENTRY;
1038
1039         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1040
1041         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1042         LASSERT(ctxt != NULL);
1043
1044         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1045                 /* Log is empty. Nothing to replace */
1046                 GOTO(out_put, rc = 0);
1047         }
1048
1049         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1050         if (backup == NULL)
1051                 GOTO(out_put, rc = -ENOMEM);
1052
1053         sprintf(backup, "%s.bak", logname);
1054
1055         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1056         if (rc == 0) {
1057                 /* Now erase original log file. Connections are not allowed.
1058                    Backup is already saved */
1059                 rc = llog_erase(env, ctxt, NULL, logname);
1060                 if (rc < 0)
1061                         GOTO(out_free, rc);
1062         } else if (rc != -ENOENT) {
1063                 CERROR("%s: can't make backup for %s: rc = %d\n",
1064                        mgs->obd_name, logname, rc);
1065                 GOTO(out_free,rc);
1066         }
1067
1068         /* open local log */
1069         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1070         if (rc)
1071                 GOTO(out_restore, rc);
1072
1073         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1074         if (rc)
1075                 GOTO(out_closel, rc);
1076
1077         /* open backup llog */
1078         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1079                        LLOG_OPEN_EXISTS);
1080         if (rc)
1081                 GOTO(out_closel, rc);
1082
1083         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1084         if (rc)
1085                 GOTO(out_close, rc);
1086
1087         if (llog_get_size(backup_llh) <= 1)
1088                 GOTO(out_close, rc = 0);
1089
1090         OBD_ALLOC_PTR(mrul);
1091         if (!mrul)
1092                 GOTO(out_close, rc = -ENOMEM);
1093         /* devname is only needed information to replace UUID records */
1094         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1095         /* parse nids later */
1096         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1097         /* Copy records to this temporary llog */
1098         mrul->temp_llh = orig_llh;
1099
1100         rc = llog_process(env, backup_llh, mgs_replace_handler,
1101                           (void *)mrul, NULL);
1102         OBD_FREE_PTR(mrul);
1103 out_close:
1104         rc2 = llog_close(NULL, backup_llh);
1105         if (!rc)
1106                 rc = rc2;
1107 out_closel:
1108         rc2 = llog_close(NULL, orig_llh);
1109         if (!rc)
1110                 rc = rc2;
1111
1112 out_restore:
1113         if (rc) {
1114                 CERROR("%s: llog should be restored: rc = %d\n",
1115                        mgs->obd_name, rc);
1116                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1117                                   logname);
1118                 if (rc2 < 0)
1119                         CERROR("%s: can't restore backup %s: rc = %d\n",
1120                                mgs->obd_name, logname, rc2);
1121         }
1122
1123 out_free:
1124         OBD_FREE(backup, strlen(backup) + 1);
1125
1126 out_put:
1127         llog_ctxt_put(ctxt);
1128
1129         if (rc)
1130                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1131                        mgs->obd_name, logname, rc);
1132
1133         RETURN(rc);
1134 }
1135
1136 /**
1137  * Parse device name and get file system name and/or device index
1138  *
1139  * \param[in]   devname device name (ex. lustre-MDT0000)
1140  * \param[out]  fsname  file system name(optional)
1141  * \param[out]  index   device index(optional)
1142  *
1143  * \retval 0    success
1144  */
1145 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1146 {
1147         int rc;
1148         ENTRY;
1149
1150         /* Extract fsname */
1151         if (fsname) {
1152                 rc = server_name2fsname(devname, fsname, NULL);
1153                 if (rc < 0) {
1154                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1155                                devname);
1156                         RETURN(-EINVAL);
1157                 }
1158         }
1159
1160         if (index) {
1161                 rc = server_name2index(devname, index, NULL);
1162                 if (rc < 0) {
1163                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1164                                devname);
1165                         RETURN(-EINVAL);
1166                 }
1167         }
1168
1169         RETURN(0);
1170 }
1171
1172 static int only_mgs_is_running(struct obd_device *mgs_obd)
1173 {
1174         /* TDB: Is global variable with devices count exists? */
1175         int num_devices = get_devices_count();
1176         /* osd, MGS and MGC + self_export
1177            (wc -l /proc/fs/lustre/devices <= 2) && (num_exports <= 2) */
1178         return (num_devices <= 3) && (mgs_obd->obd_num_exports <= 2);
1179 }
1180
1181 static int name_create_mdt(char **logname, char *fsname, int i)
1182 {
1183         char mdt_index[9];
1184
1185         sprintf(mdt_index, "-MDT%04x", i);
1186         return name_create(logname, fsname, mdt_index);
1187 }
1188
1189 /**
1190  * Replace nids for \a device to \a nids values
1191  *
1192  * \param obd           MGS obd device
1193  * \param devname       nids need to be replaced for this device
1194  * (ex. lustre-OST0000)
1195  * \param nids          nids list (ex. nid1,nid2,nid3)
1196  *
1197  * \retval 0    success
1198  */
1199 int mgs_replace_nids(const struct lu_env *env,
1200                      struct mgs_device *mgs,
1201                      char *devname, char *nids)
1202 {
1203         /* Assume fsname is part of device name */
1204         char fsname[MTI_NAME_MAXLEN];
1205         int rc;
1206         __u32 index;
1207         char *logname;
1208         struct fs_db *fsdb;
1209         unsigned int i;
1210         int conn_state;
1211         struct obd_device *mgs_obd = mgs->mgs_obd;
1212         ENTRY;
1213
1214         /* We can only change NIDs if no other nodes are connected */
1215         spin_lock(&mgs_obd->obd_dev_lock);
1216         conn_state = mgs_obd->obd_no_conn;
1217         mgs_obd->obd_no_conn = 1;
1218         spin_unlock(&mgs_obd->obd_dev_lock);
1219
1220         /* We can not change nids if not only MGS is started */
1221         if (!only_mgs_is_running(mgs_obd)) {
1222                 CERROR("Only MGS is allowed to be started\n");
1223                 GOTO(out, rc = -EINPROGRESS);
1224         }
1225
1226         /* Get fsname and index*/
1227         rc = mgs_parse_devname(devname, fsname, &index);
1228         if (rc)
1229                 GOTO(out, rc);
1230
1231         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1232         if (rc) {
1233                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1234                 GOTO(out, rc);
1235         }
1236
1237         /* Process client llogs */
1238         name_create(&logname, fsname, "-client");
1239         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1240         name_destroy(&logname);
1241         if (rc) {
1242                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1243                        fsname, devname, rc);
1244                 GOTO(out, rc);
1245         }
1246
1247         /* Process MDT llogs */
1248         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1249                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1250                         continue;
1251                 name_create_mdt(&logname, fsname, i);
1252                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1253                 name_destroy(&logname);
1254                 if (rc)
1255                         GOTO(out, rc);
1256         }
1257
1258 out:
1259         spin_lock(&mgs_obd->obd_dev_lock);
1260         mgs_obd->obd_no_conn = conn_state;
1261         spin_unlock(&mgs_obd->obd_dev_lock);
1262
1263         RETURN(rc);
1264 }
1265
1266 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1267                             char *devname, struct lov_desc *desc)
1268 {
1269         struct mgs_thread_info *mgi = mgs_env_info(env);
1270         struct lustre_cfg *lcfg;
1271         int rc;
1272
1273         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1274         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1275         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1276         if (!lcfg)
1277                 return -ENOMEM;
1278         rc = record_lcfg(env, llh, lcfg);
1279
1280         lustre_cfg_free(lcfg);
1281         return rc;
1282 }
1283
1284 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1285                             char *devname, struct lmv_desc *desc)
1286 {
1287         struct mgs_thread_info *mgi = mgs_env_info(env);
1288         struct lustre_cfg *lcfg;
1289         int rc;
1290
1291         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1292         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1293         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1294
1295         rc = record_lcfg(env, llh, lcfg);
1296
1297         lustre_cfg_free(lcfg);
1298         return rc;
1299 }
1300
1301 static inline int record_mdc_add(const struct lu_env *env,
1302                                  struct llog_handle *llh,
1303                                  char *logname, char *mdcuuid,
1304                                  char *mdtuuid, char *index,
1305                                  char *gen)
1306 {
1307         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1308                            mdtuuid,index,gen,mdcuuid);
1309 }
1310
1311 static inline int record_lov_add(const struct lu_env *env,
1312                                  struct llog_handle *llh,
1313                                  char *lov_name, char *ost_uuid,
1314                                  char *index, char *gen)
1315 {
1316         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1317                            ost_uuid, index, gen, 0);
1318 }
1319
1320 static inline int record_mount_opt(const struct lu_env *env,
1321                                    struct llog_handle *llh,
1322                                    char *profile, char *lov_name,
1323                                    char *mdc_name)
1324 {
1325         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1326                            profile,lov_name,mdc_name,0);
1327 }
1328
1329 static int record_marker(const struct lu_env *env,
1330                          struct llog_handle *llh,
1331                          struct fs_db *fsdb, __u32 flags,
1332                          char *tgtname, char *comment)
1333 {
1334         struct mgs_thread_info *mgi = mgs_env_info(env);
1335         struct lustre_cfg *lcfg;
1336         int rc;
1337         int cplen = 0;
1338
1339         if (flags & CM_START)
1340                 fsdb->fsdb_gen++;
1341         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1342         mgi->mgi_marker.cm_flags = flags;
1343         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1344         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1345                         sizeof(mgi->mgi_marker.cm_tgtname));
1346         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1347                 return -E2BIG;
1348         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1349                         sizeof(mgi->mgi_marker.cm_comment));
1350         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1351                 return -E2BIG;
1352         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1353         mgi->mgi_marker.cm_canceltime = 0;
1354         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1355         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1356                             sizeof(mgi->mgi_marker));
1357         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1358         if (!lcfg)
1359                 return -ENOMEM;
1360         rc = record_lcfg(env, llh, lcfg);
1361
1362         lustre_cfg_free(lcfg);
1363         return rc;
1364 }
1365
1366 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1367                             struct llog_handle **llh, char *name)
1368 {
1369         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1370         struct llog_ctxt        *ctxt;
1371         int                      rc = 0;
1372         ENTRY;
1373
1374         if (*llh)
1375                 GOTO(out, rc = -EBUSY);
1376
1377         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1378         if (!ctxt)
1379                 GOTO(out, rc = -ENODEV);
1380         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1381
1382         rc = llog_open_create(env, ctxt, llh, NULL, name);
1383         if (rc)
1384                 GOTO(out_ctxt, rc);
1385         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1386         if (rc)
1387                 llog_close(env, *llh);
1388 out_ctxt:
1389         llog_ctxt_put(ctxt);
1390 out:
1391         if (rc) {
1392                 CERROR("%s: can't start log %s: rc = %d\n",
1393                        mgs->mgs_obd->obd_name, name, rc);
1394                 *llh = NULL;
1395         }
1396         RETURN(rc);
1397 }
1398
1399 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1400 {
1401         int rc;
1402
1403         rc = llog_close(env, *llh);
1404         *llh = NULL;
1405
1406         return rc;
1407 }
1408
1409 /******************** config "macros" *********************/
1410
1411 /* write an lcfg directly into a log (with markers) */
1412 static int mgs_write_log_direct(const struct lu_env *env,
1413                                 struct mgs_device *mgs, struct fs_db *fsdb,
1414                                 char *logname, struct lustre_cfg *lcfg,
1415                                 char *devname, char *comment)
1416 {
1417         struct llog_handle *llh = NULL;
1418         int rc;
1419         ENTRY;
1420
1421         if (!lcfg)
1422                 RETURN(-ENOMEM);
1423
1424         rc = record_start_log(env, mgs, &llh, logname);
1425         if (rc)
1426                 RETURN(rc);
1427
1428         /* FIXME These should be a single journal transaction */
1429         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1430         if (rc)
1431                 GOTO(out_end, rc);
1432         rc = record_lcfg(env, llh, lcfg);
1433         if (rc)
1434                 GOTO(out_end, rc);
1435         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1436         if (rc)
1437                 GOTO(out_end, rc);
1438 out_end:
1439         record_end_log(env, &llh);
1440         RETURN(rc);
1441 }
1442
1443 /* write the lcfg in all logs for the given fs */
1444 int mgs_write_log_direct_all(const struct lu_env *env,
1445                              struct mgs_device *mgs,
1446                              struct fs_db *fsdb,
1447                              struct mgs_target_info *mti,
1448                              struct lustre_cfg *lcfg,
1449                              char *devname, char *comment,
1450                              int server_only)
1451 {
1452         cfs_list_t list;
1453         struct mgs_direntry *dirent, *n;
1454         char *fsname = mti->mti_fsname;
1455         int rc = 0, len = strlen(fsname);
1456         ENTRY;
1457
1458         /* Find all the logs in the CONFIGS directory */
1459         rc = class_dentry_readdir(env, mgs, &list);
1460         if (rc)
1461                 RETURN(rc);
1462
1463         /* Could use fsdb index maps instead of directory listing */
1464         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1465                 cfs_list_del(&dirent->list);
1466                 /* don't write to sptlrpc rule log */
1467                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1468                         goto next;
1469
1470                 /* caller wants write server logs only */
1471                 if (server_only && strstr(dirent->name, "-client") != NULL)
1472                         goto next;
1473
1474                 if (strncmp(fsname, dirent->name, len) == 0) {
1475                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1476                         /* Erase any old settings of this same parameter */
1477                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1478                                         devname, comment, CM_SKIP);
1479                         if (rc < 0)
1480                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1481                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1482                         /* Write the new one */
1483                         if (lcfg) {
1484                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1485                                                           dirent->name,
1486                                                           lcfg, devname,
1487                                                           comment);
1488                                 if (rc)
1489                                         CERROR("%s: writing log %s: rc = %d\n",
1490                                                mgs->mgs_obd->obd_name,
1491                                                dirent->name, rc);
1492                         }
1493                 }
1494 next:
1495                 mgs_direntry_free(dirent);
1496         }
1497
1498         RETURN(rc);
1499 }
1500
1501 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1502                                     struct mgs_device *mgs,
1503                                     struct fs_db *fsdb,
1504                                     struct mgs_target_info *mti,
1505                                     int index, char *logname);
1506 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1507                                     struct mgs_device *mgs,
1508                                     struct fs_db *fsdb,
1509                                     struct mgs_target_info *mti,
1510                                     char *logname, char *suffix, char *lovname,
1511                                     enum lustre_sec_part sec_part, int flags);
1512 static int name_create_mdt_and_lov(char **logname, char **lovname,
1513                                    struct fs_db *fsdb, int i);
1514
1515 static int add_param(char *params, char *key, char *val)
1516 {
1517         char *start = params + strlen(params);
1518         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1519         int keylen = 0;
1520
1521         if (key != NULL)
1522                 keylen = strlen(key);
1523         if (start + 1 + keylen + strlen(val) >= end) {
1524                 CERROR("params are too long: %s %s%s\n",
1525                        params, key != NULL ? key : "", val);
1526                 return -EINVAL;
1527         }
1528
1529         sprintf(start, " %s%s", key != NULL ? key : "", val);
1530         return 0;
1531 }
1532
1533 /**
1534  * Walk through client config log record and convert the related records
1535  * into the target.
1536  **/
1537 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1538                                          struct llog_handle *llh,
1539                                          struct llog_rec_hdr *rec, void *data)
1540 {
1541         struct mgs_device *mgs;
1542         struct obd_device *obd;
1543         struct mgs_target_info *mti, *tmti;
1544         struct fs_db *fsdb;
1545         int cfg_len = rec->lrh_len;
1546         char *cfg_buf = (char*) (rec + 1);
1547         struct lustre_cfg *lcfg;
1548         int rc = 0;
1549         struct llog_handle *mdt_llh = NULL;
1550         static int got_an_osc_or_mdc = 0;
1551         /* 0: not found any osc/mdc;
1552            1: found osc;
1553            2: found mdc;
1554         */
1555         static int last_step = -1;
1556         int cplen = 0;
1557
1558         ENTRY;
1559
1560         mti = ((struct temp_comp*)data)->comp_mti;
1561         tmti = ((struct temp_comp*)data)->comp_tmti;
1562         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1563         obd = ((struct temp_comp *)data)->comp_obd;
1564         mgs = lu2mgs_dev(obd->obd_lu_dev);
1565         LASSERT(mgs);
1566
1567         if (rec->lrh_type != OBD_CFG_REC) {
1568                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1569                 RETURN(-EINVAL);
1570         }
1571
1572         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1573         if (rc) {
1574                 CERROR("Insane cfg\n");
1575                 RETURN(rc);
1576         }
1577
1578         lcfg = (struct lustre_cfg *)cfg_buf;
1579
1580         if (lcfg->lcfg_command == LCFG_MARKER) {
1581                 struct cfg_marker *marker;
1582                 marker = lustre_cfg_buf(lcfg, 1);
1583                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1584                     (marker->cm_flags & CM_START) &&
1585                      !(marker->cm_flags & CM_SKIP)) {
1586                         got_an_osc_or_mdc = 1;
1587                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1588                                         sizeof(tmti->mti_svname));
1589                         if (cplen >= sizeof(tmti->mti_svname))
1590                                 RETURN(-E2BIG);
1591                         rc = record_start_log(env, mgs, &mdt_llh,
1592                                               mti->mti_svname);
1593                         if (rc)
1594                                 RETURN(rc);
1595                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1596                                            mti->mti_svname, "add osc(copied)");
1597                         record_end_log(env, &mdt_llh);
1598                         last_step = marker->cm_step;
1599                         RETURN(rc);
1600                 }
1601                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1602                     (marker->cm_flags & CM_END) &&
1603                      !(marker->cm_flags & CM_SKIP)) {
1604                         LASSERT(last_step == marker->cm_step);
1605                         last_step = -1;
1606                         got_an_osc_or_mdc = 0;
1607                         memset(tmti, 0, sizeof(*tmti));
1608                         rc = record_start_log(env, mgs, &mdt_llh,
1609                                               mti->mti_svname);
1610                         if (rc)
1611                                 RETURN(rc);
1612                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1613                                            mti->mti_svname, "add osc(copied)");
1614                         record_end_log(env, &mdt_llh);
1615                         RETURN(rc);
1616                 }
1617                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1618                     (marker->cm_flags & CM_START) &&
1619                      !(marker->cm_flags & CM_SKIP)) {
1620                         got_an_osc_or_mdc = 2;
1621                         last_step = marker->cm_step;
1622                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1623                                strlen(marker->cm_tgtname));
1624
1625                         RETURN(rc);
1626                 }
1627                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1628                     (marker->cm_flags & CM_END) &&
1629                      !(marker->cm_flags & CM_SKIP)) {
1630                         LASSERT(last_step == marker->cm_step);
1631                         last_step = -1;
1632                         got_an_osc_or_mdc = 0;
1633                         memset(tmti, 0, sizeof(*tmti));
1634                         RETURN(rc);
1635                 }
1636         }
1637
1638         if (got_an_osc_or_mdc == 0 || last_step < 0)
1639                 RETURN(rc);
1640
1641         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1642                 uint64_t nodenid = lcfg->lcfg_nid;
1643
1644                 if (strlen(tmti->mti_uuid) == 0) {
1645                         /* target uuid not set, this config record is before
1646                          * LCFG_SETUP, this nid is one of target node nid.
1647                          */
1648                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1649                         tmti->mti_nid_count++;
1650                 } else {
1651                         /* failover node nid */
1652                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1653                                        libcfs_nid2str(nodenid));
1654                 }
1655
1656                 RETURN(rc);
1657         }
1658
1659         if (lcfg->lcfg_command == LCFG_SETUP) {
1660                 char *target;
1661
1662                 target = lustre_cfg_string(lcfg, 1);
1663                 memcpy(tmti->mti_uuid, target, strlen(target));
1664                 RETURN(rc);
1665         }
1666
1667         /* ignore client side sptlrpc_conf_log */
1668         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1669                 RETURN(rc);
1670
1671         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1672                 int index;
1673
1674                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1675                         RETURN (-EINVAL);
1676
1677                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1678                        strlen(mti->mti_fsname));
1679                 tmti->mti_stripe_index = index;
1680
1681                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1682                                               mti->mti_stripe_index,
1683                                               mti->mti_svname);
1684                 memset(tmti, 0, sizeof(*tmti));
1685                 RETURN(rc);
1686         }
1687
1688         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1689                 int index;
1690                 char mdt_index[9];
1691                 char *logname, *lovname;
1692
1693                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1694                                              mti->mti_stripe_index);
1695                 if (rc)
1696                         RETURN(rc);
1697                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1698
1699                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1700                         name_destroy(&logname);
1701                         name_destroy(&lovname);
1702                         RETURN(-EINVAL);
1703                 }
1704
1705                 tmti->mti_stripe_index = index;
1706                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1707                                          mdt_index, lovname,
1708                                          LUSTRE_SP_MDT, 0);
1709                 name_destroy(&logname);
1710                 name_destroy(&lovname);
1711                 RETURN(rc);
1712         }
1713         RETURN(rc);
1714 }
1715
1716 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1717 /* stealed from mgs_get_fsdb_from_llog*/
1718 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1719                                               struct mgs_device *mgs,
1720                                               char *client_name,
1721                                               struct temp_comp* comp)
1722 {
1723         struct llog_handle *loghandle;
1724         struct mgs_target_info *tmti;
1725         struct llog_ctxt *ctxt;
1726         int rc;
1727
1728         ENTRY;
1729
1730         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1731         LASSERT(ctxt != NULL);
1732
1733         OBD_ALLOC_PTR(tmti);
1734         if (tmti == NULL)
1735                 GOTO(out_ctxt, rc = -ENOMEM);
1736
1737         comp->comp_tmti = tmti;
1738         comp->comp_obd = mgs->mgs_obd;
1739
1740         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1741                        LLOG_OPEN_EXISTS);
1742         if (rc < 0) {
1743                 if (rc == -ENOENT)
1744                         rc = 0;
1745                 GOTO(out_pop, rc);
1746         }
1747
1748         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1749         if (rc)
1750                 GOTO(out_close, rc);
1751
1752         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1753                                   (void *)comp, NULL, false);
1754         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1755 out_close:
1756         llog_close(env, loghandle);
1757 out_pop:
1758         OBD_FREE_PTR(tmti);
1759 out_ctxt:
1760         llog_ctxt_put(ctxt);
1761         RETURN(rc);
1762 }
1763
1764 /* lmv is the second thing for client logs */
1765 /* copied from mgs_write_log_lov. Please refer to that.  */
1766 static int mgs_write_log_lmv(const struct lu_env *env,
1767                              struct mgs_device *mgs,
1768                              struct fs_db *fsdb,
1769                              struct mgs_target_info *mti,
1770                              char *logname, char *lmvname)
1771 {
1772         struct llog_handle *llh = NULL;
1773         struct lmv_desc *lmvdesc;
1774         char *uuid;
1775         int rc = 0;
1776         ENTRY;
1777
1778         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1779
1780         OBD_ALLOC_PTR(lmvdesc);
1781         if (lmvdesc == NULL)
1782                 RETURN(-ENOMEM);
1783         lmvdesc->ld_active_tgt_count = 0;
1784         lmvdesc->ld_tgt_count = 0;
1785         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1786         uuid = (char *)lmvdesc->ld_uuid.uuid;
1787
1788         rc = record_start_log(env, mgs, &llh, logname);
1789         if (rc)
1790                 GOTO(out_free, rc);
1791         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1792         if (rc)
1793                 GOTO(out_end, rc);
1794         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1795         if (rc)
1796                 GOTO(out_end, rc);
1797         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1798         if (rc)
1799                 GOTO(out_end, rc);
1800         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1801         if (rc)
1802                 GOTO(out_end, rc);
1803 out_end:
1804         record_end_log(env, &llh);
1805 out_free:
1806         OBD_FREE_PTR(lmvdesc);
1807         RETURN(rc);
1808 }
1809
1810 /* lov is the first thing in the mdt and client logs */
1811 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1812                              struct fs_db *fsdb, struct mgs_target_info *mti,
1813                              char *logname, char *lovname)
1814 {
1815         struct llog_handle *llh = NULL;
1816         struct lov_desc *lovdesc;
1817         char *uuid;
1818         int rc = 0;
1819         ENTRY;
1820
1821         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1822
1823         /*
1824         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1825         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1826               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1827         */
1828
1829         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1830         OBD_ALLOC_PTR(lovdesc);
1831         if (lovdesc == NULL)
1832                 RETURN(-ENOMEM);
1833         lovdesc->ld_magic = LOV_DESC_MAGIC;
1834         lovdesc->ld_tgt_count = 0;
1835         /* Defaults.  Can be changed later by lcfg config_param */
1836         lovdesc->ld_default_stripe_count = 1;
1837         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1838         lovdesc->ld_default_stripe_size = 1024 * 1024;
1839         lovdesc->ld_default_stripe_offset = -1;
1840         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1841         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1842         /* can these be the same? */
1843         uuid = (char *)lovdesc->ld_uuid.uuid;
1844
1845         /* This should always be the first entry in a log.
1846         rc = mgs_clear_log(obd, logname); */
1847         rc = record_start_log(env, mgs, &llh, logname);
1848         if (rc)
1849                 GOTO(out_free, rc);
1850         /* FIXME these should be a single journal transaction */
1851         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1852         if (rc)
1853                 GOTO(out_end, rc);
1854         rc = record_attach(env, llh, lovname, "lov", uuid);
1855         if (rc)
1856                 GOTO(out_end, rc);
1857         rc = record_lov_setup(env, llh, lovname, lovdesc);
1858         if (rc)
1859                 GOTO(out_end, rc);
1860         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1861         if (rc)
1862                 GOTO(out_end, rc);
1863         EXIT;
1864 out_end:
1865         record_end_log(env, &llh);
1866 out_free:
1867         OBD_FREE_PTR(lovdesc);
1868         return rc;
1869 }
1870
1871 /* add failnids to open log */
1872 static int mgs_write_log_failnids(const struct lu_env *env,
1873                                   struct mgs_target_info *mti,
1874                                   struct llog_handle *llh,
1875                                   char *cliname)
1876 {
1877         char *failnodeuuid = NULL;
1878         char *ptr = mti->mti_params;
1879         lnet_nid_t nid;
1880         int rc = 0;
1881
1882         /*
1883         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1884         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1885         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1886         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1887         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1888         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1889         */
1890
1891         /*
1892          * Pull failnid info out of params string, which may contain something
1893          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
1894          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
1895          * etc.  However, convert_hostnames() should have caught those.
1896          */
1897         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1898                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1899                         if (failnodeuuid == NULL) {
1900                                 /* We don't know the failover node name,
1901                                    so just use the first nid as the uuid */
1902                                 rc = name_create(&failnodeuuid,
1903                                                  libcfs_nid2str(nid), "");
1904                                 if (rc)
1905                                         return rc;
1906                         }
1907                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1908                                "client %s\n", libcfs_nid2str(nid),
1909                                failnodeuuid, cliname);
1910                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1911                         /*
1912                          * If *ptr is ':', we have added all NIDs for
1913                          * failnodeuuid.
1914                          */
1915                         if (*ptr == ':') {
1916                                 rc = record_add_conn(env, llh, cliname,
1917                                                      failnodeuuid);
1918                                 name_destroy(&failnodeuuid);
1919                                 failnodeuuid = NULL;
1920                         }
1921                 }
1922                 if (failnodeuuid) {
1923                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1924                         name_destroy(&failnodeuuid);
1925                         failnodeuuid = NULL;
1926                 }
1927         }
1928
1929         return rc;
1930 }
1931
1932 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1933                                     struct mgs_device *mgs,
1934                                     struct fs_db *fsdb,
1935                                     struct mgs_target_info *mti,
1936                                     char *logname, char *lmvname)
1937 {
1938         struct llog_handle *llh = NULL;
1939         char *mdcname = NULL;
1940         char *nodeuuid = NULL;
1941         char *mdcuuid = NULL;
1942         char *lmvuuid = NULL;
1943         char index[6];
1944         int i, rc;
1945         ENTRY;
1946
1947         if (mgs_log_is_empty(env, mgs, logname)) {
1948                 CERROR("log is empty! Logical error\n");
1949                 RETURN(-EINVAL);
1950         }
1951
1952         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1953                mti->mti_svname, logname, lmvname);
1954
1955         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1956         if (rc)
1957                 RETURN(rc);
1958         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1959         if (rc)
1960                 GOTO(out_free, rc);
1961         rc = name_create(&mdcuuid, mdcname, "_UUID");
1962         if (rc)
1963                 GOTO(out_free, rc);
1964         rc = name_create(&lmvuuid, lmvname, "_UUID");
1965         if (rc)
1966                 GOTO(out_free, rc);
1967
1968         rc = record_start_log(env, mgs, &llh, logname);
1969         if (rc)
1970                 GOTO(out_free, rc);
1971         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1972                            "add mdc");
1973         if (rc)
1974                 GOTO(out_end, rc);
1975         for (i = 0; i < mti->mti_nid_count; i++) {
1976                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1977                        libcfs_nid2str(mti->mti_nids[i]));
1978
1979                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1980                 if (rc)
1981                         GOTO(out_end, rc);
1982         }
1983
1984         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1985         if (rc)
1986                 GOTO(out_end, rc);
1987         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1988         if (rc)
1989                 GOTO(out_end, rc);
1990         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1991         if (rc)
1992                 GOTO(out_end, rc);
1993         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1994         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1995                             index, "1");
1996         if (rc)
1997                 GOTO(out_end, rc);
1998         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1999                            "add mdc");
2000         if (rc)
2001                 GOTO(out_end, rc);
2002 out_end:
2003         record_end_log(env, &llh);
2004 out_free:
2005         name_destroy(&lmvuuid);
2006         name_destroy(&mdcuuid);
2007         name_destroy(&mdcname);
2008         name_destroy(&nodeuuid);
2009         RETURN(rc);
2010 }
2011
2012 static inline int name_create_lov(char **lovname, char *mdtname,
2013                                   struct fs_db *fsdb, int index)
2014 {
2015         /* COMPAT_180 */
2016         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2017                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2018         else
2019                 return name_create(lovname, mdtname, "-mdtlov");
2020 }
2021
2022 static int name_create_mdt_and_lov(char **logname, char **lovname,
2023                                    struct fs_db *fsdb, int i)
2024 {
2025         int rc;
2026
2027         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2028         if (rc)
2029                 return rc;
2030         /* COMPAT_180 */
2031         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2032                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2033         else
2034                 rc = name_create(lovname, *logname, "-mdtlov");
2035         if (rc) {
2036                 name_destroy(logname);
2037                 *logname = NULL;
2038         }
2039         return rc;
2040 }
2041
2042 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2043                                       struct fs_db *fsdb, int i)
2044 {
2045         char suffix[16];
2046
2047         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2048                 sprintf(suffix, "-osc");
2049         else
2050                 sprintf(suffix, "-osc-MDT%04x", i);
2051         return name_create(oscname, ostname, suffix);
2052 }
2053
2054 /* add new mdc to already existent MDS */
2055 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2056                                     struct mgs_device *mgs,
2057                                     struct fs_db *fsdb,
2058                                     struct mgs_target_info *mti,
2059                                     int mdt_index, char *logname)
2060 {
2061         struct llog_handle      *llh = NULL;
2062         char    *nodeuuid = NULL;
2063         char    *ospname = NULL;
2064         char    *lovuuid = NULL;
2065         char    *mdtuuid = NULL;
2066         char    *svname = NULL;
2067         char    *mdtname = NULL;
2068         char    *lovname = NULL;
2069         char    index_str[16];
2070         int     i, rc;
2071
2072         ENTRY;
2073         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2074                 CERROR("log is empty! Logical error\n");
2075                 RETURN (-EINVAL);
2076         }
2077
2078         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2079                logname);
2080
2081         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2082         if (rc)
2083                 RETURN(rc);
2084
2085         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2086         if (rc)
2087                 GOTO(out_destory, rc);
2088
2089         rc = name_create(&svname, mdtname, "-osp");
2090         if (rc)
2091                 GOTO(out_destory, rc);
2092
2093         sprintf(index_str, "-MDT%04x", mdt_index);
2094         rc = name_create(&ospname, svname, index_str);
2095         if (rc)
2096                 GOTO(out_destory, rc);
2097
2098         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2099         if (rc)
2100                 GOTO(out_destory, rc);
2101
2102         rc = name_create(&lovuuid, lovname, "_UUID");
2103         if (rc)
2104                 GOTO(out_destory, rc);
2105
2106         rc = name_create(&mdtuuid, mdtname, "_UUID");
2107         if (rc)
2108                 GOTO(out_destory, rc);
2109
2110         rc = record_start_log(env, mgs, &llh, logname);
2111         if (rc)
2112                 GOTO(out_destory, rc);
2113
2114         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2115                            "add osp");
2116         if (rc)
2117                 GOTO(out_destory, rc);
2118
2119         for (i = 0; i < mti->mti_nid_count; i++) {
2120                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2121                        libcfs_nid2str(mti->mti_nids[i]));
2122                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2123                 if (rc)
2124                         GOTO(out_end, rc);
2125         }
2126
2127         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2128         if (rc)
2129                 GOTO(out_end, rc);
2130
2131         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2132                           NULL, NULL);
2133         if (rc)
2134                 GOTO(out_end, rc);
2135
2136         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2137         if (rc)
2138                 GOTO(out_end, rc);
2139
2140         /* Add mdc(osp) to lod */
2141         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2142                  mti->mti_stripe_index);
2143         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2144                          index_str, "1", NULL);
2145         if (rc)
2146                 GOTO(out_end, rc);
2147
2148         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2149         if (rc)
2150                 GOTO(out_end, rc);
2151
2152 out_end:
2153         record_end_log(env, &llh);
2154
2155 out_destory:
2156         name_destroy(&mdtuuid);
2157         name_destroy(&lovuuid);
2158         name_destroy(&lovname);
2159         name_destroy(&ospname);
2160         name_destroy(&svname);
2161         name_destroy(&nodeuuid);
2162         name_destroy(&mdtname);
2163         RETURN(rc);
2164 }
2165
2166 static int mgs_write_log_mdt0(const struct lu_env *env,
2167                               struct mgs_device *mgs,
2168                               struct fs_db *fsdb,
2169                               struct mgs_target_info *mti)
2170 {
2171         char *log = mti->mti_svname;
2172         struct llog_handle *llh = NULL;
2173         char *uuid, *lovname;
2174         char mdt_index[6];
2175         char *ptr = mti->mti_params;
2176         int rc = 0, failout = 0;
2177         ENTRY;
2178
2179         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2180         if (uuid == NULL)
2181                 RETURN(-ENOMEM);
2182
2183         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2184                 failout = (strncmp(ptr, "failout", 7) == 0);
2185
2186         rc = name_create(&lovname, log, "-mdtlov");
2187         if (rc)
2188                 GOTO(out_free, rc);
2189         if (mgs_log_is_empty(env, mgs, log)) {
2190                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2191                 if (rc)
2192                         GOTO(out_lod, rc);
2193         }
2194
2195         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2196
2197         rc = record_start_log(env, mgs, &llh, log);
2198         if (rc)
2199                 GOTO(out_lod, rc);
2200
2201         /* add MDT itself */
2202
2203         /* FIXME this whole fn should be a single journal transaction */
2204         sprintf(uuid, "%s_UUID", log);
2205         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2206         if (rc)
2207                 GOTO(out_lod, rc);
2208         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2209         if (rc)
2210                 GOTO(out_end, rc);
2211         rc = record_mount_opt(env, llh, log, lovname, NULL);
2212         if (rc)
2213                 GOTO(out_end, rc);
2214         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2215                         failout ? "n" : "f");
2216         if (rc)
2217                 GOTO(out_end, rc);
2218         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2219         if (rc)
2220                 GOTO(out_end, rc);
2221 out_end:
2222         record_end_log(env, &llh);
2223 out_lod:
2224         name_destroy(&lovname);
2225 out_free:
2226         OBD_FREE(uuid, sizeof(struct obd_uuid));
2227         RETURN(rc);
2228 }
2229
2230 /* envelope method for all layers log */
2231 static int mgs_write_log_mdt(const struct lu_env *env,
2232                              struct mgs_device *mgs,
2233                              struct fs_db *fsdb,
2234                              struct mgs_target_info *mti)
2235 {
2236         struct mgs_thread_info *mgi = mgs_env_info(env);
2237         struct llog_handle *llh = NULL;
2238         char *cliname;
2239         int rc, i = 0;
2240         ENTRY;
2241
2242         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2243
2244         if (mti->mti_uuid[0] == '\0') {
2245                 /* Make up our own uuid */
2246                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2247                          "%s_UUID", mti->mti_svname);
2248         }
2249
2250         /* add mdt */
2251         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2252         if (rc)
2253                 RETURN(rc);
2254         /* Append the mdt info to the client log */
2255         rc = name_create(&cliname, mti->mti_fsname, "-client");
2256         if (rc)
2257                 RETURN(rc);
2258
2259         if (mgs_log_is_empty(env, mgs, cliname)) {
2260                 /* Start client log */
2261                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2262                                        fsdb->fsdb_clilov);
2263                 if (rc)
2264                         GOTO(out_free, rc);
2265                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2266                                        fsdb->fsdb_clilmv);
2267                 if (rc)
2268                         GOTO(out_free, rc);
2269         }
2270
2271         /*
2272         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2273         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2274         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2275         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2276         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2277         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2278         */
2279
2280                 /* copy client info about lov/lmv */
2281                 mgi->mgi_comp.comp_mti = mti;
2282                 mgi->mgi_comp.comp_fsdb = fsdb;
2283
2284                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2285                                                         &mgi->mgi_comp);
2286                 if (rc)
2287                         GOTO(out_free, rc);
2288                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2289                                               fsdb->fsdb_clilmv);
2290                 if (rc)
2291                         GOTO(out_free, rc);
2292
2293                 /* add mountopts */
2294                 rc = record_start_log(env, mgs, &llh, cliname);
2295                 if (rc)
2296                         GOTO(out_free, rc);
2297
2298                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2299                                    "mount opts");
2300                 if (rc)
2301                         GOTO(out_end, rc);
2302                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2303                                       fsdb->fsdb_clilmv);
2304                 if (rc)
2305                         GOTO(out_end, rc);
2306                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2307                                    "mount opts");
2308
2309         if (rc)
2310                 GOTO(out_end, rc);
2311
2312         /* for_all_existing_mdt except current one */
2313         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2314                 if (i !=  mti->mti_stripe_index &&
2315                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2316                         char *logname;
2317
2318                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2319                         if (rc)
2320                                 GOTO(out_end, rc);
2321
2322                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2323                                                       i, logname);
2324                         name_destroy(&logname);
2325                         if (rc)
2326                                 GOTO(out_end, rc);
2327                 }
2328         }
2329 out_end:
2330         record_end_log(env, &llh);
2331 out_free:
2332         name_destroy(&cliname);
2333         RETURN(rc);
2334 }
2335
2336 /* Add the ost info to the client/mdt lov */
2337 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2338                                     struct mgs_device *mgs, struct fs_db *fsdb,
2339                                     struct mgs_target_info *mti,
2340                                     char *logname, char *suffix, char *lovname,
2341                                     enum lustre_sec_part sec_part, int flags)
2342 {
2343         struct llog_handle *llh = NULL;
2344         char *nodeuuid = NULL;
2345         char *oscname = NULL;
2346         char *oscuuid = NULL;
2347         char *lovuuid = NULL;
2348         char *svname = NULL;
2349         char index[6];
2350         int i, rc;
2351
2352         ENTRY;
2353         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2354                mti->mti_svname, logname);
2355
2356         if (mgs_log_is_empty(env, mgs, logname)) {
2357                 CERROR("log is empty! Logical error\n");
2358                 RETURN (-EINVAL);
2359         }
2360
2361         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2362         if (rc)
2363                 RETURN(rc);
2364         rc = name_create(&svname, mti->mti_svname, "-osc");
2365         if (rc)
2366                 GOTO(out_free, rc);
2367
2368         /* for the system upgraded from old 1.8, keep using the old osc naming
2369          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2370         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2371                 rc = name_create(&oscname, svname, "");
2372         else
2373                 rc = name_create(&oscname, svname, suffix);
2374         if (rc)
2375                 GOTO(out_free, rc);
2376
2377         rc = name_create(&oscuuid, oscname, "_UUID");
2378         if (rc)
2379                 GOTO(out_free, rc);
2380         rc = name_create(&lovuuid, lovname, "_UUID");
2381         if (rc)
2382                 GOTO(out_free, rc);
2383
2384
2385         /*
2386         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2387         multihomed (#4)
2388         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2389         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2390         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2391         failover (#6,7)
2392         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2393         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2394         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2395         */
2396
2397         rc = record_start_log(env, mgs, &llh, logname);
2398         if (rc)
2399                 GOTO(out_free, rc);
2400
2401         /* FIXME these should be a single journal transaction */
2402         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2403                            "add osc");
2404         if (rc)
2405                 GOTO(out_end, rc);
2406
2407         /* NB: don't change record order, because upon MDT steal OSC config
2408          * from client, it treats all nids before LCFG_SETUP as target nids
2409          * (multiple interfaces), while nids after as failover node nids.
2410          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2411          */
2412         for (i = 0; i < mti->mti_nid_count; i++) {
2413                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2414                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2415                 if (rc)
2416                         GOTO(out_end, rc);
2417         }
2418         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2419         if (rc)
2420                 GOTO(out_end, rc);
2421         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2422         if (rc)
2423                 GOTO(out_end, rc);
2424         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2425         if (rc)
2426                 GOTO(out_end, rc);
2427
2428         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2429
2430         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2431         if (rc)
2432                 GOTO(out_end, rc);
2433         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2434                            "add osc");
2435         if (rc)
2436                 GOTO(out_end, rc);
2437 out_end:
2438         record_end_log(env, &llh);
2439 out_free:
2440         name_destroy(&lovuuid);
2441         name_destroy(&oscuuid);
2442         name_destroy(&oscname);
2443         name_destroy(&svname);
2444         name_destroy(&nodeuuid);
2445         RETURN(rc);
2446 }
2447
2448 static int mgs_write_log_ost(const struct lu_env *env,
2449                              struct mgs_device *mgs, struct fs_db *fsdb,
2450                              struct mgs_target_info *mti)
2451 {
2452         struct llog_handle *llh = NULL;
2453         char *logname, *lovname;
2454         char *ptr = mti->mti_params;
2455         int rc, flags = 0, failout = 0, i;
2456         ENTRY;
2457
2458         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2459
2460         /* The ost startup log */
2461
2462         /* If the ost log already exists, that means that someone reformatted
2463            the ost and it called target_add again. */
2464         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2465                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2466                                    "exists, yet the server claims it never "
2467                                    "registered. It may have been reformatted, "
2468                                    "or the index changed. writeconf the MDT to "
2469                                    "regenerate all logs.\n", mti->mti_svname);
2470                 RETURN(-EALREADY);
2471         }
2472
2473         /*
2474         attach obdfilter ost1 ost1_UUID
2475         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2476         */
2477         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2478                 failout = (strncmp(ptr, "failout", 7) == 0);
2479         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2480         if (rc)
2481                 RETURN(rc);
2482         /* FIXME these should be a single journal transaction */
2483         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2484         if (rc)
2485                 GOTO(out_end, rc);
2486         if (*mti->mti_uuid == '\0')
2487                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2488                          "%s_UUID", mti->mti_svname);
2489         rc = record_attach(env, llh, mti->mti_svname,
2490                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2491         if (rc)
2492                 GOTO(out_end, rc);
2493         rc = record_setup(env, llh, mti->mti_svname,
2494                           "dev"/*ignored*/, "type"/*ignored*/,
2495                           failout ? "n" : "f", 0/*options*/);
2496         if (rc)
2497                 GOTO(out_end, rc);
2498         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2499         if (rc)
2500                 GOTO(out_end, rc);
2501 out_end:
2502         record_end_log(env, &llh);
2503         if (rc)
2504                 RETURN(rc);
2505         /* We also have to update the other logs where this osc is part of
2506            the lov */
2507
2508         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2509                 /* If we're upgrading, the old mdt log already has our
2510                    entry. Let's do a fake one for fun. */
2511                 /* Note that we can't add any new failnids, since we don't
2512                    know the old osc names. */
2513                 flags = CM_SKIP | CM_UPGRADE146;
2514
2515         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2516                 /* If the update flag isn't set, don't update client/mdt
2517                    logs. */
2518                 flags |= CM_SKIP;
2519                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2520                               "the MDT first to regenerate it.\n",
2521                               mti->mti_svname);
2522         }
2523
2524         /* Add ost to all MDT lov defs */
2525         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2526                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2527                         char mdt_index[9];
2528
2529                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2530                                                      i);
2531                         if (rc)
2532                                 RETURN(rc);
2533                         sprintf(mdt_index, "-MDT%04x", i);
2534                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2535                                                       logname, mdt_index,
2536                                                       lovname, LUSTRE_SP_MDT,
2537                                                       flags);
2538                         name_destroy(&logname);
2539                         name_destroy(&lovname);
2540                         if (rc)
2541                                 RETURN(rc);
2542                 }
2543         }
2544
2545         /* Append ost info to the client log */
2546         rc = name_create(&logname, mti->mti_fsname, "-client");
2547         if (rc)
2548                 RETURN(rc);
2549         if (mgs_log_is_empty(env, mgs, logname)) {
2550                 /* Start client log */
2551                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2552                                        fsdb->fsdb_clilov);
2553                 if (rc)
2554                         GOTO(out_free, rc);
2555                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2556                                        fsdb->fsdb_clilmv);
2557                 if (rc)
2558                         GOTO(out_free, rc);
2559         }
2560         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2561                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2562 out_free:
2563         name_destroy(&logname);
2564         RETURN(rc);
2565 }
2566
2567 static __inline__ int mgs_param_empty(char *ptr)
2568 {
2569         char *tmp;
2570
2571         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2572                 return 1;
2573         return 0;
2574 }
2575
2576 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2577                                           struct mgs_device *mgs,
2578                                           struct fs_db *fsdb,
2579                                           struct mgs_target_info *mti,
2580                                           char *logname, char *cliname)
2581 {
2582         int rc;
2583         struct llog_handle *llh = NULL;
2584
2585         if (mgs_param_empty(mti->mti_params)) {
2586                 /* Remove _all_ failnids */
2587                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2588                                 mti->mti_svname, "add failnid", CM_SKIP);
2589                 return rc < 0 ? rc : 0;
2590         }
2591
2592         /* Otherwise failover nids are additive */
2593         rc = record_start_log(env, mgs, &llh, logname);
2594         if (rc)
2595                 return rc;
2596                 /* FIXME this should be a single journal transaction */
2597         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2598                            "add failnid");
2599         if (rc)
2600                 goto out_end;
2601         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2602         if (rc)
2603                 goto out_end;
2604         rc = record_marker(env, llh, fsdb, CM_END,
2605                            mti->mti_svname, "add failnid");
2606 out_end:
2607         record_end_log(env, &llh);
2608         return rc;
2609 }
2610
2611
2612 /* Add additional failnids to an existing log.
2613    The mdc/osc must have been added to logs first */
2614 /* tcp nids must be in dotted-quad ascii -
2615    we can't resolve hostnames from the kernel. */
2616 static int mgs_write_log_add_failnid(const struct lu_env *env,
2617                                      struct mgs_device *mgs,
2618                                      struct fs_db *fsdb,
2619                                      struct mgs_target_info *mti)
2620 {
2621         char *logname, *cliname;
2622         int rc;
2623         ENTRY;
2624
2625         /* FIXME we currently can't erase the failnids
2626          * given when a target first registers, since they aren't part of
2627          * an "add uuid" stanza */
2628
2629         /* Verify that we know about this target */
2630         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2631                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2632                                    "yet. It must be started before failnids "
2633                                    "can be added.\n", mti->mti_svname);
2634                 RETURN(-ENOENT);
2635         }
2636
2637         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2638         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2639                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2640         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2641                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2642         } else {
2643                 RETURN(-EINVAL);
2644         }
2645         if (rc)
2646                 RETURN(rc);
2647         /* Add failover nids to the client log */
2648         rc = name_create(&logname, mti->mti_fsname, "-client");
2649         if (rc) {
2650                 name_destroy(&cliname);
2651                 RETURN(rc);
2652         }
2653         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2654         name_destroy(&logname);
2655         name_destroy(&cliname);
2656         if (rc)
2657                 RETURN(rc);
2658
2659         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2660                 /* Add OST failover nids to the MDT logs as well */
2661                 int i;
2662
2663                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2664                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2665                                 continue;
2666                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2667                         if (rc)
2668                                 RETURN(rc);
2669                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2670                                                  fsdb, i);
2671                         if (rc) {
2672                                 name_destroy(&logname);
2673                                 RETURN(rc);
2674                         }
2675                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2676                                                             mti, logname,
2677                                                             cliname);
2678                         name_destroy(&cliname);
2679                         name_destroy(&logname);
2680                         if (rc)
2681                                 RETURN(rc);
2682                 }
2683         }
2684
2685         RETURN(rc);
2686 }
2687
2688 static int mgs_wlp_lcfg(const struct lu_env *env,
2689                         struct mgs_device *mgs, struct fs_db *fsdb,
2690                         struct mgs_target_info *mti,
2691                         char *logname, struct lustre_cfg_bufs *bufs,
2692                         char *tgtname, char *ptr)
2693 {
2694         char comment[MTI_NAME_MAXLEN];
2695         char *tmp;
2696         struct lustre_cfg *lcfg;
2697         int rc, del;
2698
2699         /* Erase any old settings of this same parameter */
2700         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2701         comment[MTI_NAME_MAXLEN - 1] = 0;
2702         /* But don't try to match the value. */
2703         if ((tmp = strchr(comment, '=')))
2704             *tmp = 0;
2705         /* FIXME we should skip settings that are the same as old values */
2706         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2707         if (rc < 0)
2708                 return rc;
2709         del = mgs_param_empty(ptr);
2710
2711         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2712                       "Sett" : "Modify", tgtname, comment, logname);
2713         if (del)
2714                 return rc;
2715
2716         lustre_cfg_bufs_reset(bufs, tgtname);
2717         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2718         if (mti->mti_flags & LDD_F_PARAM2)
2719                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2720
2721         lcfg = lustre_cfg_new((mti->mti_flags & LDD_F_PARAM2) ?
2722                               LCFG_SET_PARAM : LCFG_PARAM, bufs);
2723
2724         if (!lcfg)
2725                 return -ENOMEM;
2726         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2727         lustre_cfg_free(lcfg);
2728         return rc;
2729 }
2730
2731 static int mgs_write_log_param2(const struct lu_env *env,
2732                                 struct mgs_device *mgs,
2733                                 struct fs_db *fsdb,
2734                                 struct mgs_target_info *mti, char *ptr)
2735 {
2736         struct lustre_cfg_bufs  bufs;
2737         int                     rc = 0;
2738         ENTRY;
2739
2740         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2741         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2742                           mti->mti_svname, ptr);
2743
2744         RETURN(rc);
2745 }
2746
2747 /* write global variable settings into log */
2748 static int mgs_write_log_sys(const struct lu_env *env,
2749                              struct mgs_device *mgs, struct fs_db *fsdb,
2750                              struct mgs_target_info *mti, char *sys, char *ptr)
2751 {
2752         struct mgs_thread_info *mgi = mgs_env_info(env);
2753         struct lustre_cfg *lcfg;
2754         char *tmp, sep;
2755         int rc, cmd, convert = 1;
2756
2757         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2758                 cmd = LCFG_SET_TIMEOUT;
2759         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2760                 cmd = LCFG_SET_LDLM_TIMEOUT;
2761         /* Check for known params here so we can return error to lctl */
2762         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2763                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2764                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2765                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2766                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2767                 cmd = LCFG_PARAM;
2768         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2769                 convert = 0; /* Don't convert string value to integer */
2770                 cmd = LCFG_PARAM;
2771         } else {
2772                 return -EINVAL;
2773         }
2774
2775         if (mgs_param_empty(ptr))
2776                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2777         else
2778                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2779
2780         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2781         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2782         if (!convert && *tmp != '\0')
2783                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2784         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2785         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2786         /* truncate the comment to the parameter name */
2787         ptr = tmp - 1;
2788         sep = *ptr;
2789         *ptr = '\0';
2790         /* modify all servers and clients */
2791         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2792                                       *tmp == '\0' ? NULL : lcfg,
2793                                       mti->mti_fsname, sys, 0);
2794         if (rc == 0 && *tmp != '\0') {
2795                 switch (cmd) {
2796                 case LCFG_SET_TIMEOUT:
2797                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2798                                 class_process_config(lcfg);
2799                         break;
2800                 case LCFG_SET_LDLM_TIMEOUT:
2801                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2802                                 class_process_config(lcfg);
2803                         break;
2804                 default:
2805                         break;
2806                 }
2807         }
2808         *ptr = sep;
2809         lustre_cfg_free(lcfg);
2810         return rc;
2811 }
2812
2813 /* write quota settings into log */
2814 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2815                                struct fs_db *fsdb, struct mgs_target_info *mti,
2816                                char *quota, char *ptr)
2817 {
2818         struct mgs_thread_info  *mgi = mgs_env_info(env);
2819         struct lustre_cfg       *lcfg;
2820         char                    *tmp;
2821         char                     sep;
2822         int                      rc, cmd = LCFG_PARAM;
2823
2824         /* support only 'meta' and 'data' pools so far */
2825         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2826             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2827                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2828                        "& quota.ost are)\n", ptr);
2829                 return -EINVAL;
2830         }
2831
2832         if (*tmp == '\0') {
2833                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2834         } else {
2835                 CDEBUG(D_MGS, "global '%s'\n", quota);
2836
2837                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2838                     strcmp(tmp, "none") != 0) {
2839                         CERROR("enable option(%s) isn't supported\n", tmp);
2840                         return -EINVAL;
2841                 }
2842         }
2843
2844         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2845         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2846         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2847         /* truncate the comment to the parameter name */
2848         ptr = tmp - 1;
2849         sep = *ptr;
2850         *ptr = '\0';
2851
2852         /* XXX we duplicated quota enable information in all server
2853          *     config logs, it should be moved to a separate config
2854          *     log once we cleanup the config log for global param. */
2855         /* modify all servers */
2856         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2857                                       *tmp == '\0' ? NULL : lcfg,
2858                                       mti->mti_fsname, quota, 1);
2859         *ptr = sep;
2860         lustre_cfg_free(lcfg);
2861         return rc < 0 ? rc : 0;
2862 }
2863
2864 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2865                                    struct mgs_device *mgs,
2866                                    struct fs_db *fsdb,
2867                                    struct mgs_target_info *mti,
2868                                    char *param)
2869 {
2870         struct mgs_thread_info *mgi = mgs_env_info(env);
2871         struct llog_handle     *llh = NULL;
2872         char                   *logname;
2873         char                   *comment, *ptr;
2874         struct lustre_cfg      *lcfg;
2875         int                     rc, len;
2876         ENTRY;
2877
2878         /* get comment */
2879         ptr = strchr(param, '=');
2880         LASSERT(ptr);
2881         len = ptr - param;
2882
2883         OBD_ALLOC(comment, len + 1);
2884         if (comment == NULL)
2885                 RETURN(-ENOMEM);
2886         strncpy(comment, param, len);
2887         comment[len] = '\0';
2888
2889         /* prepare lcfg */
2890         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2891         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2892         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2893         if (lcfg == NULL)
2894                 GOTO(out_comment, rc = -ENOMEM);
2895
2896         /* construct log name */
2897         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2898         if (rc)
2899                 GOTO(out_lcfg, rc);
2900
2901         if (mgs_log_is_empty(env, mgs, logname)) {
2902                 rc = record_start_log(env, mgs, &llh, logname);
2903                 if (rc)
2904                         GOTO(out, rc);
2905                 record_end_log(env, &llh);
2906         }
2907
2908         /* obsolete old one */
2909         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2910                         comment, CM_SKIP);
2911         if (rc < 0)
2912                 GOTO(out, rc);
2913         /* write the new one */
2914         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2915                                   mti->mti_svname, comment);
2916         if (rc)
2917                 CERROR("err %d writing log %s\n", rc, logname);
2918 out:
2919         name_destroy(&logname);
2920 out_lcfg:
2921         lustre_cfg_free(lcfg);
2922 out_comment:
2923         OBD_FREE(comment, len + 1);
2924         RETURN(rc);
2925 }
2926
2927 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2928                                         char *param)
2929 {
2930         char    *ptr;
2931
2932         /* disable the adjustable udesc parameter for now, i.e. use default
2933          * setting that client always ship udesc to MDT if possible. to enable
2934          * it simply remove the following line */
2935         goto error_out;
2936
2937         ptr = strchr(param, '=');
2938         if (ptr == NULL)
2939                 goto error_out;
2940         *ptr++ = '\0';
2941
2942         if (strcmp(param, PARAM_SRPC_UDESC))
2943                 goto error_out;
2944
2945         if (strcmp(ptr, "yes") == 0) {
2946                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2947                 CWARN("Enable user descriptor shipping from client to MDT\n");
2948         } else if (strcmp(ptr, "no") == 0) {
2949                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2950                 CWARN("Disable user descriptor shipping from client to MDT\n");
2951         } else {
2952                 *(ptr - 1) = '=';
2953                 goto error_out;
2954         }
2955         return 0;
2956
2957 error_out:
2958         CERROR("Invalid param: %s\n", param);
2959         return -EINVAL;
2960 }
2961
2962 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2963                                   const char *svname,
2964                                   char *param)
2965 {
2966         struct sptlrpc_rule      rule;
2967         struct sptlrpc_rule_set *rset;
2968         int                      rc;
2969         ENTRY;
2970
2971         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2972                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2973                 RETURN(-EINVAL);
2974         }
2975
2976         if (strncmp(param, PARAM_SRPC_UDESC,
2977                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2978                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2979         }
2980
2981         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2982                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2983                 RETURN(-EINVAL);
2984         }
2985
2986         param += sizeof(PARAM_SRPC_FLVR) - 1;
2987
2988         rc = sptlrpc_parse_rule(param, &rule);
2989         if (rc)
2990                 RETURN(rc);
2991
2992         /* mgs rules implies must be mgc->mgs */
2993         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2994                 if ((rule.sr_from != LUSTRE_SP_MGC &&
2995                      rule.sr_from != LUSTRE_SP_ANY) ||
2996                     (rule.sr_to != LUSTRE_SP_MGS &&
2997                      rule.sr_to != LUSTRE_SP_ANY))
2998                         RETURN(-EINVAL);
2999         }
3000
3001         /* preapre room for this coming rule. svcname format should be:
3002          * - fsname: general rule
3003          * - fsname-tgtname: target-specific rule
3004          */
3005         if (strchr(svname, '-')) {
3006                 struct mgs_tgt_srpc_conf *tgtconf;
3007                 int                       found = 0;
3008
3009                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3010                      tgtconf = tgtconf->mtsc_next) {
3011                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3012                                 found = 1;
3013                                 break;
3014                         }
3015                 }
3016
3017                 if (!found) {
3018                         int name_len;
3019
3020                         OBD_ALLOC_PTR(tgtconf);
3021                         if (tgtconf == NULL)
3022                                 RETURN(-ENOMEM);
3023
3024                         name_len = strlen(svname);
3025
3026                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3027                         if (tgtconf->mtsc_tgt == NULL) {
3028                                 OBD_FREE_PTR(tgtconf);
3029                                 RETURN(-ENOMEM);
3030                         }
3031                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3032
3033                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3034                         fsdb->fsdb_srpc_tgt = tgtconf;
3035                 }
3036
3037                 rset = &tgtconf->mtsc_rset;
3038         } else {
3039                 rset = &fsdb->fsdb_srpc_gen;
3040         }
3041
3042         rc = sptlrpc_rule_set_merge(rset, &rule);
3043
3044         RETURN(rc);
3045 }
3046
3047 static int mgs_srpc_set_param(const struct lu_env *env,
3048                               struct mgs_device *mgs,
3049                               struct fs_db *fsdb,
3050                               struct mgs_target_info *mti,
3051                               char *param)
3052 {
3053         char                   *copy;
3054         int                     rc, copy_size;
3055         ENTRY;
3056
3057 #ifndef HAVE_GSS
3058         RETURN(-EINVAL);
3059 #endif
3060         /* keep a copy of original param, which could be destroied
3061          * during parsing */
3062         copy_size = strlen(param) + 1;
3063         OBD_ALLOC(copy, copy_size);
3064         if (copy == NULL)
3065                 return -ENOMEM;
3066         memcpy(copy, param, copy_size);
3067
3068         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3069         if (rc)
3070                 goto out_free;
3071
3072         /* previous steps guaranteed the syntax is correct */
3073         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3074         if (rc)
3075                 goto out_free;
3076
3077         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3078                 /*
3079                  * for mgs rules, make them effective immediately.
3080                  */
3081                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3082                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3083                                                  &fsdb->fsdb_srpc_gen);
3084         }
3085
3086 out_free:
3087         OBD_FREE(copy, copy_size);
3088         RETURN(rc);
3089 }
3090
3091 struct mgs_srpc_read_data {
3092         struct fs_db   *msrd_fsdb;
3093         int             msrd_skip;
3094 };
3095
3096 static int mgs_srpc_read_handler(const struct lu_env *env,
3097                                  struct llog_handle *llh,
3098                                  struct llog_rec_hdr *rec, void *data)
3099 {
3100         struct mgs_srpc_read_data *msrd = data;
3101         struct cfg_marker         *marker;
3102         struct lustre_cfg         *lcfg = REC_DATA(rec);
3103         char                      *svname, *param;
3104         int                        cfg_len, rc;
3105         ENTRY;
3106
3107         if (rec->lrh_type != OBD_CFG_REC) {
3108                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3109                 RETURN(-EINVAL);
3110         }
3111
3112         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3113                   sizeof(struct llog_rec_tail);
3114
3115         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3116         if (rc) {
3117                 CERROR("Insane cfg\n");
3118                 RETURN(rc);
3119         }
3120
3121         if (lcfg->lcfg_command == LCFG_MARKER) {
3122                 marker = lustre_cfg_buf(lcfg, 1);
3123
3124                 if (marker->cm_flags & CM_START &&
3125                     marker->cm_flags & CM_SKIP)
3126                         msrd->msrd_skip = 1;
3127                 if (marker->cm_flags & CM_END)
3128                         msrd->msrd_skip = 0;
3129
3130                 RETURN(0);
3131         }
3132
3133         if (msrd->msrd_skip)
3134                 RETURN(0);
3135
3136         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3137                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3138                 RETURN(0);
3139         }
3140
3141         svname = lustre_cfg_string(lcfg, 0);
3142         if (svname == NULL) {
3143                 CERROR("svname is empty\n");
3144                 RETURN(0);
3145         }
3146
3147         param = lustre_cfg_string(lcfg, 1);
3148         if (param == NULL) {
3149                 CERROR("param is empty\n");
3150                 RETURN(0);
3151         }
3152
3153         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3154         if (rc)
3155                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3156
3157         RETURN(0);
3158 }
3159
3160 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3161                                 struct mgs_device *mgs,
3162                                 struct fs_db *fsdb)
3163 {
3164         struct llog_handle        *llh = NULL;
3165         struct llog_ctxt          *ctxt;
3166         char                      *logname;
3167         struct mgs_srpc_read_data  msrd;
3168         int                        rc;
3169         ENTRY;
3170
3171         /* construct log name */
3172         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3173         if (rc)
3174                 RETURN(rc);
3175
3176         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3177         LASSERT(ctxt != NULL);
3178
3179         if (mgs_log_is_empty(env, mgs, logname))
3180                 GOTO(out, rc = 0);
3181
3182         rc = llog_open(env, ctxt, &llh, NULL, logname,
3183                        LLOG_OPEN_EXISTS);
3184         if (rc < 0) {
3185                 if (rc == -ENOENT)
3186                         rc = 0;
3187                 GOTO(out, rc);
3188         }
3189
3190         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3191         if (rc)
3192                 GOTO(out_close, rc);
3193
3194         if (llog_get_size(llh) <= 1)
3195                 GOTO(out_close, rc = 0);
3196
3197         msrd.msrd_fsdb = fsdb;
3198         msrd.msrd_skip = 0;
3199
3200         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3201                           NULL);
3202
3203 out_close:
3204         llog_close(env, llh);
3205 out:
3206         llog_ctxt_put(ctxt);
3207         name_destroy(&logname);
3208
3209         if (rc)
3210                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3211         RETURN(rc);
3212 }
3213
3214 /* Permanent settings of all parameters by writing into the appropriate
3215  * configuration logs.
3216  * A parameter with null value ("<param>='\0'") means to erase it out of
3217  * the logs.
3218  */
3219 static int mgs_write_log_param(const struct lu_env *env,
3220                                struct mgs_device *mgs, struct fs_db *fsdb,
3221                                struct mgs_target_info *mti, char *ptr)
3222 {
3223         struct mgs_thread_info *mgi = mgs_env_info(env);
3224         char *logname;
3225         char *tmp;
3226         int rc = 0, rc2 = 0;
3227         ENTRY;
3228
3229         /* For various parameter settings, we have to figure out which logs
3230            care about them (e.g. both mdt and client for lov settings) */
3231         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3232
3233         /* The params are stored in MOUNT_DATA_FILE and modified via
3234            tunefs.lustre, or set using lctl conf_param */
3235
3236         /* Processed in lustre_start_mgc */
3237         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3238                 GOTO(end, rc);
3239
3240         /* Processed in ost/mdt */
3241         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3242                 GOTO(end, rc);
3243
3244         /* Processed in mgs_write_log_ost */
3245         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3246                 if (mti->mti_flags & LDD_F_PARAM) {
3247                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3248                                            "changed with tunefs.lustre"
3249                                            "and --writeconf\n", ptr);
3250                         rc = -EPERM;
3251                 }
3252                 GOTO(end, rc);
3253         }
3254
3255         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3256                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3257                 GOTO(end, rc);
3258         }
3259
3260         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3261                 /* Add a failover nidlist */
3262                 rc = 0;
3263                 /* We already processed failovers params for new
3264                    targets in mgs_write_log_target */
3265                 if (mti->mti_flags & LDD_F_PARAM) {
3266                         CDEBUG(D_MGS, "Adding failnode\n");
3267                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3268                 }
3269                 GOTO(end, rc);
3270         }
3271
3272         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3273                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3274                 GOTO(end, rc);
3275         }
3276
3277         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3278                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3279                 GOTO(end, rc);
3280         }
3281
3282         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3283                 /* active=0 means off, anything else means on */
3284                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3285                 int i;
3286
3287                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3288                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3289                                            "be (de)activated.\n",
3290                                            mti->mti_svname);
3291                         GOTO(end, rc = -EINVAL);
3292                 }
3293                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3294                               flag ? "de": "re", mti->mti_svname);
3295                 /* Modify clilov */
3296                 rc = name_create(&logname, mti->mti_fsname, "-client");
3297                 if (rc)
3298                         GOTO(end, rc);
3299                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3300                                 mti->mti_svname, "add osc", flag);
3301                 name_destroy(&logname);
3302                 if (rc)
3303                         goto active_err;
3304                 /* Modify mdtlov */
3305                 /* Add to all MDT logs for CMD */
3306                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3307                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3308                                 continue;
3309                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3310                         if (rc)
3311                                 GOTO(end, rc);
3312                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3313                                         mti->mti_svname, "add osc", flag);
3314                         name_destroy(&logname);
3315                         if (rc)
3316                                 goto active_err;
3317                 }
3318         active_err:
3319                 if (rc) {
3320                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3321                                            "log (%d). No permanent "
3322                                            "changes were made to the "
3323                                            "config log.\n",
3324                                            mti->mti_svname, rc);
3325                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3326                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3327                                                    " because the log"
3328                                                    "is in the old 1.4"
3329                                                    "style. Consider "
3330                                                    " --writeconf to "
3331                                                    "update the logs.\n");
3332                         GOTO(end, rc);
3333                 }
3334                 /* Fall through to osc proc for deactivating live OSC
3335                    on running MDT / clients. */
3336         }
3337         /* Below here, let obd's XXX_process_config methods handle it */
3338
3339         /* All lov. in proc */
3340         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3341                 char *mdtlovname;
3342
3343                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3344                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3345                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3346                                            "set on the MDT, not %s. "
3347                                            "Ignoring.\n",
3348                                            mti->mti_svname);
3349                         GOTO(end, rc = 0);
3350                 }
3351
3352                 /* Modify mdtlov */
3353                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3354                         GOTO(end, rc = -ENODEV);
3355
3356                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3357                                              mti->mti_stripe_index);
3358                 if (rc)
3359                         GOTO(end, rc);
3360                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3361                                   &mgi->mgi_bufs, mdtlovname, ptr);
3362                 name_destroy(&logname);
3363                 name_destroy(&mdtlovname);
3364                 if (rc)
3365                         GOTO(end, rc);
3366
3367                 /* Modify clilov */
3368                 rc = name_create(&logname, mti->mti_fsname, "-client");
3369                 if (rc)
3370                         GOTO(end, rc);
3371                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3372                                   fsdb->fsdb_clilov, ptr);
3373                 name_destroy(&logname);
3374                 GOTO(end, rc);
3375         }
3376
3377         /* All osc., mdc., llite. params in proc */
3378         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3379             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3380             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3381                 char *cname;
3382
3383                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3384                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3385                                            " cannot be modified. Consider"
3386                                            " updating the configuration with"
3387                                            " --writeconf\n",
3388                                            mti->mti_svname);
3389                         GOTO(end, rc = -EINVAL);
3390                 }
3391                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3392                         rc = name_create(&cname, mti->mti_fsname, "-client");
3393                         /* Add the client type to match the obdname in
3394                            class_config_llog_handler */
3395                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3396                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3397                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3398                         rc = name_create(&cname, mti->mti_svname, "-osc");
3399                 } else {
3400                         GOTO(end, rc = -EINVAL);
3401                 }
3402                 if (rc)
3403                         GOTO(end, rc);
3404
3405                 /* Forbid direct update of llite root squash parameters.
3406                  * These parameters are indirectly set via the MDT settings.
3407                  * See (LU-1778) */
3408                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
3409                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3410                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3411                         LCONSOLE_ERROR("%s: root squash parameters can only "
3412                                 "be updated through MDT component\n",
3413                                 mti->mti_fsname);
3414                         name_destroy(&cname);
3415                         GOTO(end, rc = -EINVAL);
3416                 }
3417
3418                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3419
3420                 /* Modify client */
3421                 rc = name_create(&logname, mti->mti_fsname, "-client");
3422                 if (rc) {
3423                         name_destroy(&cname);
3424                         GOTO(end, rc);
3425                 }
3426                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3427                                   cname, ptr);
3428
3429                 /* osc params affect the MDT as well */
3430                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3431                         int i;
3432
3433                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3434                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3435                                         continue;
3436                                 name_destroy(&cname);
3437                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3438                                                          fsdb, i);
3439                                 name_destroy(&logname);
3440                                 if (rc)
3441                                         break;
3442                                 rc = name_create_mdt(&logname,
3443                                                      mti->mti_fsname, i);
3444                                 if (rc)
3445                                         break;
3446                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3447                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3448                                                           mti, logname,
3449                                                           &mgi->mgi_bufs,
3450                                                           cname, ptr);
3451                                         if (rc)
3452                                                 break;
3453                                 }
3454                         }
3455                 }
3456                 name_destroy(&logname);
3457                 name_destroy(&cname);
3458                 GOTO(end, rc);
3459         }
3460
3461         /* All mdt. params in proc */
3462         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
3463                 int i;
3464                 __u32 idx;
3465
3466                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3467                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3468                             MTI_NAME_MAXLEN) == 0)
3469                         /* device is unspecified completely? */
3470                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3471                 else
3472                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3473                 if (rc < 0)
3474                         goto active_err;
3475                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3476                         goto active_err;
3477                 if (rc & LDD_F_SV_ALL) {
3478                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3479                                 if (!test_bit(i,
3480                                                   fsdb->fsdb_mdt_index_map))
3481                                         continue;
3482                                 rc = name_create_mdt(&logname,
3483                                                 mti->mti_fsname, i);
3484                                 if (rc)
3485                                         goto active_err;
3486                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3487                                                   logname, &mgi->mgi_bufs,
3488                                                   logname, ptr);
3489                                 name_destroy(&logname);
3490                                 if (rc)
3491                                         goto active_err;
3492                         }
3493                 } else {
3494                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
3495                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
3496                                 LCONSOLE_ERROR("%s: root squash parameters "
3497                                         "cannot be applied to a single MDT\n",
3498                                         mti->mti_fsname);
3499                                 GOTO(end, rc = -EINVAL);
3500                         }
3501                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3502                                           mti->mti_svname, &mgi->mgi_bufs,
3503                                           mti->mti_svname, ptr);
3504                         if (rc)
3505                                 goto active_err;
3506                 }
3507
3508                 /* root squash settings are also applied to llite
3509                  * config log (see LU-1778) */
3510                 if (rc == 0 &&
3511                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3512                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3513                         char *cname;
3514                         char *ptr2;
3515
3516                         rc = name_create(&cname, mti->mti_fsname, "-client");
3517                         if (rc)
3518                                 GOTO(end, rc);
3519                         rc = name_create(&logname, mti->mti_fsname, "-client");
3520                         if (rc) {
3521                                 name_destroy(&cname);
3522                                 GOTO(end, rc);
3523                         }
3524                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
3525                         if (rc) {
3526                                 name_destroy(&cname);
3527                                 name_destroy(&logname);
3528                                 GOTO(end, rc);
3529                         }
3530                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3531                                           &mgi->mgi_bufs, cname, ptr2);
3532                         name_destroy(&ptr2);
3533                         name_destroy(&logname);
3534                         name_destroy(&cname);
3535                 }
3536                 GOTO(end, rc);
3537         }
3538
3539         /* All mdd., ost. and osd. params in proc */
3540         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3541             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3542             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3543                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3544                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3545                         GOTO(end, rc = -ENODEV);
3546
3547                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3548                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3549                 GOTO(end, rc);
3550         }
3551
3552         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3553         rc2 = -ENOSYS;
3554
3555 end:
3556         if (rc)
3557                 CERROR("err %d on param '%s'\n", rc, ptr);
3558
3559         RETURN(rc ?: rc2);
3560 }
3561
3562 /* Not implementing automatic failover nid addition at this time. */
3563 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3564                       struct mgs_target_info *mti)
3565 {
3566 #if 0
3567         struct fs_db *fsdb;
3568         int rc;
3569         ENTRY;
3570
3571         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3572         if (rc)
3573                 RETURN(rc);
3574
3575         if (mgs_log_is_empty(obd, mti->mti_svname))
3576                 /* should never happen */
3577                 RETURN(-ENOENT);
3578
3579         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3580
3581         /* FIXME We can just check mti->params to see if we're already in
3582            the failover list.  Modify mti->params for rewriting back at
3583            server_register_target(). */
3584
3585         mutex_lock(&fsdb->fsdb_mutex);
3586         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3587         mutex_unlock(&fsdb->fsdb_mutex);
3588
3589         RETURN(rc);
3590 #endif
3591         return 0;
3592 }
3593
3594 int mgs_write_log_target(const struct lu_env *env,
3595                          struct mgs_device *mgs,
3596                          struct mgs_target_info *mti,
3597                          struct fs_db *fsdb)
3598 {
3599         int rc = -EINVAL;
3600         char *buf, *params;
3601         ENTRY;
3602
3603         /* set/check the new target index */
3604         rc = mgs_set_index(env, mgs, mti);
3605         if (rc < 0) {
3606                 CERROR("Can't get index (%d)\n", rc);
3607                 RETURN(rc);
3608         }
3609
3610         if (rc == EALREADY) {
3611                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3612                               mti->mti_stripe_index, mti->mti_svname);
3613                 /* We would like to mark old log sections as invalid
3614                    and add new log sections in the client and mdt logs.
3615                    But if we add new sections, then live clients will
3616                    get repeat setup instructions for already running
3617                    osc's. So don't update the client/mdt logs. */
3618                 mti->mti_flags &= ~LDD_F_UPDATE;
3619                 rc = 0;
3620         }
3621
3622         mutex_lock(&fsdb->fsdb_mutex);
3623
3624         if (mti->mti_flags &
3625             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3626                 /* Generate a log from scratch */
3627                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3628                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3629                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3630                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3631                 } else {
3632                         CERROR("Unknown target type %#x, can't create log for "
3633                                "%s\n", mti->mti_flags, mti->mti_svname);
3634                 }
3635                 if (rc) {
3636                         CERROR("Can't write logs for %s (%d)\n",
3637                                mti->mti_svname, rc);
3638                         GOTO(out_up, rc);
3639                 }
3640         } else {
3641                 /* Just update the params from tunefs in mgs_write_log_params */
3642                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3643                 mti->mti_flags |= LDD_F_PARAM;
3644         }
3645
3646         /* allocate temporary buffer, where class_get_next_param will
3647            make copy of a current  parameter */
3648         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3649         if (buf == NULL)
3650                 GOTO(out_up, rc = -ENOMEM);
3651         params = mti->mti_params;
3652         while (params != NULL) {
3653                 rc = class_get_next_param(&params, buf);
3654                 if (rc) {
3655                         if (rc == 1)
3656                                 /* there is no next parameter, that is
3657                                    not an error */
3658                                 rc = 0;
3659                         break;
3660                 }
3661                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3662                        params, buf);
3663                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3664                 if (rc)
3665                         break;
3666         }
3667
3668         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3669
3670 out_up:
3671         mutex_unlock(&fsdb->fsdb_mutex);
3672         RETURN(rc);
3673 }
3674
3675 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3676 {
3677         struct llog_ctxt        *ctxt;
3678         int                      rc = 0;
3679
3680         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3681         if (ctxt == NULL) {
3682                 CERROR("%s: MGS config context doesn't exist\n",
3683                        mgs->mgs_obd->obd_name);
3684                 rc = -ENODEV;
3685         } else {
3686                 rc = llog_erase(env, ctxt, NULL, name);
3687                 /* llog may not exist */
3688                 if (rc == -ENOENT)
3689                         rc = 0;
3690                 llog_ctxt_put(ctxt);
3691         }
3692
3693         if (rc)
3694                 CERROR("%s: failed to clear log %s: %d\n",
3695                        mgs->mgs_obd->obd_name, name, rc);
3696
3697         return rc;
3698 }
3699
3700 /* erase all logs for the given fs */
3701 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3702 {
3703         struct fs_db *fsdb;
3704         cfs_list_t list;
3705         struct mgs_direntry *dirent, *n;
3706         int rc, len = strlen(fsname);
3707         char *suffix;
3708         ENTRY;
3709
3710         /* Find all the logs in the CONFIGS directory */
3711         rc = class_dentry_readdir(env, mgs, &list);
3712         if (rc)
3713                 RETURN(rc);
3714
3715         mutex_lock(&mgs->mgs_mutex);
3716
3717         /* Delete the fs db */
3718         fsdb = mgs_find_fsdb(mgs, fsname);
3719         if (fsdb)
3720                 mgs_free_fsdb(mgs, fsdb);
3721
3722         mutex_unlock(&mgs->mgs_mutex);
3723
3724         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3725                 cfs_list_del(&dirent->list);
3726                 suffix = strrchr(dirent->name, '-');
3727                 if (suffix != NULL) {
3728                         if ((len == suffix - dirent->name) &&
3729                             (strncmp(fsname, dirent->name, len) == 0)) {
3730                                 CDEBUG(D_MGS, "Removing log %s\n",
3731                                        dirent->name);
3732                                 mgs_erase_log(env, mgs, dirent->name);
3733                         }
3734                 }
3735                 mgs_direntry_free(dirent);
3736         }
3737
3738         RETURN(rc);
3739 }
3740
3741 /* from llog_swab */
3742 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3743 {
3744         int i;
3745         ENTRY;
3746
3747         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3748         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3749
3750         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3751         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3752         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3753         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3754
3755         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3756         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3757                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3758                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3759                                i, lcfg->lcfg_buflens[i],
3760                                lustre_cfg_string(lcfg, i));
3761                 }
3762         EXIT;
3763 }
3764
3765 /* Setup params fsdb and log
3766  */
3767 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3768                           struct fs_db *fsdb)
3769 {
3770         struct llog_handle      *params_llh = NULL;
3771         int                     rc;
3772         ENTRY;
3773
3774         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
3775         if (fsdb != NULL) {
3776                 mutex_lock(&fsdb->fsdb_mutex);
3777                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
3778                 if (rc == 0)
3779                         rc = record_end_log(env, &params_llh);
3780                 mutex_unlock(&fsdb->fsdb_mutex);
3781         }
3782
3783         RETURN(rc);
3784 }
3785
3786 /* Cleanup params fsdb and log
3787  */
3788 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
3789 {
3790         return mgs_erase_logs(env, mgs, PARAMS_FILENAME);
3791 }
3792
3793 /* Set a permanent (config log) param for a target or fs
3794  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3795  *             buf1 contains the single parameter
3796  */
3797 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3798                  struct lustre_cfg *lcfg, char *fsname)
3799 {
3800         struct fs_db *fsdb;
3801         struct mgs_target_info *mti;
3802         char *devname, *param;
3803         char *ptr;
3804         const char *tmp;
3805         __u32 index;
3806         int rc = 0;
3807         ENTRY;
3808
3809         print_lustre_cfg(lcfg);
3810
3811         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3812         devname = lustre_cfg_string(lcfg, 0);
3813         param = lustre_cfg_string(lcfg, 1);
3814         if (!devname) {
3815                 /* Assume device name embedded in param:
3816                    lustre-OST0000.osc.max_dirty_mb=32 */
3817                 ptr = strchr(param, '.');
3818                 if (ptr) {
3819                         devname = param;
3820                         *ptr = 0;
3821                         param = ptr + 1;
3822                 }
3823         }
3824         if (!devname) {
3825                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3826                 RETURN(-ENOSYS);
3827         }
3828
3829         rc = mgs_parse_devname(devname, fsname, NULL);
3830         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3831                 /* param related to llite isn't allowed to set by OST or MDT */
3832                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3833                                    sizeof(PARAM_LLITE)) == 0)
3834                         RETURN(-EINVAL);
3835         } else {
3836                 /* assume devname is the fsname */
3837                 memset(fsname, 0, MTI_NAME_MAXLEN);
3838                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3839                 fsname[MTI_NAME_MAXLEN - 1] = 0;
3840         }
3841         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3842
3843         rc = mgs_find_or_make_fsdb(env, mgs,
3844                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
3845                                    PARAMS_FILENAME : fsname, &fsdb);
3846         if (rc)
3847                 RETURN(rc);
3848
3849         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
3850             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3851             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3852                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3853                        "is '%s'\n", fsname, devname);
3854                 mgs_free_fsdb(mgs, fsdb);
3855                 RETURN(-EINVAL);
3856         }
3857
3858         /* Create a fake mti to hold everything */
3859         OBD_ALLOC_PTR(mti);
3860         if (!mti)
3861                 GOTO(out, rc = -ENOMEM);
3862         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3863             >= sizeof(mti->mti_fsname))
3864                 GOTO(out, rc = -E2BIG);
3865         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3866             >= sizeof(mti->mti_svname))
3867                 GOTO(out, rc = -E2BIG);
3868         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3869             >= sizeof(mti->mti_params))
3870                 GOTO(out, rc = -E2BIG);
3871         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3872         if (rc < 0)
3873                 /* Not a valid server; may be only fsname */
3874                 rc = 0;
3875         else
3876                 /* Strip -osc or -mdc suffix from svname */
3877                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3878                                      mti->mti_svname))
3879                         GOTO(out, rc = -EINVAL);
3880         /*
3881          * Revoke lock so everyone updates.  Should be alright if
3882          * someone was already reading while we were updating the logs,
3883          * so we don't really need to hold the lock while we're
3884          * writing (above).
3885          */
3886         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
3887                 mti->mti_flags = rc | LDD_F_PARAM2;
3888                 mutex_lock(&fsdb->fsdb_mutex);
3889                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
3890                 mutex_unlock(&fsdb->fsdb_mutex);
3891                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
3892         } else {
3893                 mti->mti_flags = rc | LDD_F_PARAM;
3894                 mutex_lock(&fsdb->fsdb_mutex);
3895                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3896                 mutex_unlock(&fsdb->fsdb_mutex);
3897                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3898         }
3899
3900 out:
3901         OBD_FREE_PTR(mti);
3902         RETURN(rc);
3903 }
3904
3905 static int mgs_write_log_pool(const struct lu_env *env,
3906                               struct mgs_device *mgs, char *logname,
3907                               struct fs_db *fsdb, char *tgtname,
3908                               enum lcfg_command_type cmd,
3909                               char *fsname, char *poolname,
3910                               char *ostname, char *comment)
3911 {
3912         struct llog_handle *llh = NULL;
3913         int rc;
3914
3915         rc = record_start_log(env, mgs, &llh, logname);
3916         if (rc)
3917                 return rc;
3918         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
3919         if (rc)
3920                 goto out;
3921         rc = record_base(env, llh, tgtname, 0, cmd,
3922                          fsname, poolname, ostname, 0);
3923         if (rc)
3924                 goto out;
3925         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
3926 out:
3927         record_end_log(env, &llh);
3928         return rc;
3929 }
3930
3931 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3932                  enum lcfg_command_type cmd, char *fsname,
3933                  char *poolname, char *ostname)
3934 {
3935         struct fs_db *fsdb;
3936         char *lovname;
3937         char *logname;
3938         char *label = NULL, *canceled_label = NULL;
3939         int label_sz;
3940         struct mgs_target_info *mti = NULL;
3941         int rc, i;
3942         ENTRY;
3943
3944         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3945         if (rc) {
3946                 CERROR("Can't get db for %s\n", fsname);
3947                 RETURN(rc);
3948         }
3949         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3950                 CERROR("%s is not defined\n", fsname);
3951                 mgs_free_fsdb(mgs, fsdb);
3952                 RETURN(-EINVAL);
3953         }
3954
3955         label_sz = 10 + strlen(fsname) + strlen(poolname);
3956
3957         /* check if ostname match fsname */
3958         if (ostname != NULL) {
3959                 char *ptr;
3960
3961                 ptr = strrchr(ostname, '-');
3962                 if ((ptr == NULL) ||
3963                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3964                         RETURN(-EINVAL);
3965                 label_sz += strlen(ostname);
3966         }
3967
3968         OBD_ALLOC(label, label_sz);
3969         if (label == NULL)
3970                 RETURN(-ENOMEM);
3971
3972         switch(cmd) {
3973         case LCFG_POOL_NEW:
3974                 sprintf(label,
3975                         "new %s.%s", fsname, poolname);
3976                 break;
3977         case LCFG_POOL_ADD:
3978                 sprintf(label,
3979                         "add %s.%s.%s", fsname, poolname, ostname);
3980                 break;
3981         case LCFG_POOL_REM:
3982                 OBD_ALLOC(canceled_label, label_sz);
3983                 if (canceled_label == NULL)
3984                         GOTO(out_label, rc = -ENOMEM);
3985                 sprintf(label,
3986                         "rem %s.%s.%s", fsname, poolname, ostname);
3987                 sprintf(canceled_label,
3988                         "add %s.%s.%s", fsname, poolname, ostname);
3989                 break;
3990         case LCFG_POOL_DEL:
3991                 OBD_ALLOC(canceled_label, label_sz);
3992                 if (canceled_label == NULL)
3993                         GOTO(out_label, rc = -ENOMEM);
3994                 sprintf(label,
3995                         "del %s.%s", fsname, poolname);
3996                 sprintf(canceled_label,
3997                         "new %s.%s", fsname, poolname);
3998                 break;
3999         default:
4000                 break;
4001         }
4002
4003         if (canceled_label != NULL) {
4004                 OBD_ALLOC_PTR(mti);
4005                 if (mti == NULL)
4006                         GOTO(out_cancel, rc = -ENOMEM);
4007         }
4008
4009         mutex_lock(&fsdb->fsdb_mutex);
4010         /* write pool def to all MDT logs */
4011         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4012                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4013                         rc = name_create_mdt_and_lov(&logname, &lovname,
4014                                                      fsdb, i);
4015                         if (rc) {
4016                                 mutex_unlock(&fsdb->fsdb_mutex);
4017                                 GOTO(out_mti, rc);
4018                         }
4019                         if (canceled_label != NULL) {
4020                                 strcpy(mti->mti_svname, "lov pool");
4021                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4022                                                 lovname, canceled_label,
4023                                                 CM_SKIP);
4024                         }
4025
4026                         if (rc >= 0)
4027                                 rc = mgs_write_log_pool(env, mgs, logname,
4028                                                         fsdb, lovname, cmd,
4029                                                         fsname, poolname,
4030                                                         ostname, label);
4031                         name_destroy(&logname);
4032                         name_destroy(&lovname);
4033                         if (rc) {
4034                                 mutex_unlock(&fsdb->fsdb_mutex);
4035                                 GOTO(out_mti, rc);
4036                         }
4037                 }
4038         }
4039
4040         rc = name_create(&logname, fsname, "-client");
4041         if (rc) {
4042                 mutex_unlock(&fsdb->fsdb_mutex);
4043                 GOTO(out_mti, rc);
4044         }
4045         if (canceled_label != NULL) {
4046                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4047                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4048                 if (rc < 0) {
4049                         mutex_unlock(&fsdb->fsdb_mutex);
4050                         name_destroy(&logname);
4051                         GOTO(out_mti, rc);
4052                 }
4053         }
4054
4055         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4056                                 cmd, fsname, poolname, ostname, label);
4057         mutex_unlock(&fsdb->fsdb_mutex);
4058         name_destroy(&logname);
4059         /* request for update */
4060         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4061
4062         EXIT;
4063 out_mti:
4064         if (mti != NULL)
4065                 OBD_FREE_PTR(mti);
4066 out_cancel:
4067         if (canceled_label != NULL)
4068                 OBD_FREE(canceled_label, label_sz);
4069 out_label:
4070         OBD_FREE(label, label_sz);
4071         return rc;
4072 }