Whamcloud - gitweb
LU-2200 obdclass: be more careful processing server name
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         if (!dt_try_as_dir(env, dir))
71                 GOTO(out, rc = -ENOTDIR);
72
73         LASSERT(dir);
74         LASSERT(dir->do_index_ops);
75
76         iops = &dir->do_index_ops->dio_it;
77         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
78         if (IS_ERR(it))
79                 RETURN(PTR_ERR(it));
80
81         rc = iops->load(env, it, 0);
82         if (rc <= 0)
83                 GOTO(fini, rc = 0);
84
85         /* main cycle */
86         do {
87                 key = (void *)iops->key(env, it);
88                 if (IS_ERR(key)) {
89                         CERROR("%s: key failed when listing %s: rc = %d\n",
90                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
91                                (int) PTR_ERR(key));
92                         goto next;
93                 }
94                 key_sz = iops->key_size(env, it);
95                 LASSERT(key_sz > 0);
96
97                 /* filter out "." and ".." entries */
98                 if (key[0] == '.') {
99                         if (key_sz == 1)
100                                 goto next;
101                         if (key_sz == 2 && key[1] == '.')
102                                 goto next;
103                 }
104
105                 de = mgs_direntry_alloc(key_sz + 1);
106                 if (de == NULL) {
107                         rc = -ENOMEM;
108                         break;
109                 }
110
111                 memcpy(de->name, key, key_sz);
112                 de->name[key_sz] = 0;
113
114                 cfs_list_add(&de->list, list);
115
116 next:
117                 rc = iops->next(env, it);
118         } while (rc == 0);
119         rc = 0;
120
121         iops->put(env, it);
122
123 fini:
124         iops->fini(env, it);
125 out:
126         if (rc)
127                 CERROR("%s: key failed when listing %s: rc = %d\n",
128                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
129         RETURN(rc);
130 }
131
132 /******************** DB functions *********************/
133
134 static inline int name_create(char **newname, char *prefix, char *suffix)
135 {
136         LASSERT(newname);
137         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
138         if (!*newname)
139                 return -ENOMEM;
140         sprintf(*newname, "%s%s", prefix, suffix);
141         return 0;
142 }
143
144 static inline void name_destroy(char **name)
145 {
146         if (*name)
147                 OBD_FREE(*name, strlen(*name) + 1);
148         *name = NULL;
149 }
150
151 struct mgs_fsdb_handler_data
152 {
153         struct fs_db   *fsdb;
154         __u32           ver;
155 };
156
157 /* from the (client) config log, figure out:
158         1. which ost's/mdt's are configured (by index)
159         2. what the last config step is
160         3. COMPAT_18 osc name
161 */
162 /* It might be better to have a separate db file, instead of parsing the info
163    out of the client log.  This is slow and potentially error-prone. */
164 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
165                             struct llog_rec_hdr *rec, void *data)
166 {
167         struct mgs_fsdb_handler_data *d = data;
168         struct fs_db *fsdb = d->fsdb;
169         int cfg_len = rec->lrh_len;
170         char *cfg_buf = (char*) (rec + 1);
171         struct lustre_cfg *lcfg;
172         __u32 index;
173         int rc = 0;
174         ENTRY;
175
176         if (rec->lrh_type != OBD_CFG_REC) {
177                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
178                 RETURN(-EINVAL);
179         }
180
181         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
182         if (rc) {
183                 CERROR("Insane cfg\n");
184                 RETURN(rc);
185         }
186
187         lcfg = (struct lustre_cfg *)cfg_buf;
188
189         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
190                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
191
192         /* Figure out ost indicies */
193         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
194         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
195             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
196                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
197                                        NULL, 10);
198                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
199                        lustre_cfg_string(lcfg, 1), index,
200                        lustre_cfg_string(lcfg, 2));
201                 set_bit(index, fsdb->fsdb_ost_index_map);
202         }
203
204         /* Figure out mdt indicies */
205         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
206         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
207             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
208                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
209                                        &index, NULL);
210                 if (rc != LDD_F_SV_TYPE_MDT) {
211                         CWARN("Unparsable MDC name %s, assuming index 0\n",
212                               lustre_cfg_string(lcfg, 0));
213                         index = 0;
214                 }
215                 rc = 0;
216                 CDEBUG(D_MGS, "MDT index is %u\n", index);
217                 set_bit(index, fsdb->fsdb_mdt_index_map);
218                 fsdb->fsdb_mdt_count ++;
219         }
220
221         /**
222          * figure out the old config. fsdb_gen = 0 means old log
223          * It is obsoleted and not supported anymore
224          */
225         if (fsdb->fsdb_gen == 0) {
226                 CERROR("Old config format is not supported\n");
227                 RETURN(-EINVAL);
228         }
229
230         /*
231          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
232          */
233         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
234             lcfg->lcfg_command == LCFG_ATTACH &&
235             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
236                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
237                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
238                         CWARN("MDT using 1.8 OSC name scheme\n");
239                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
240                 }
241         }
242
243         if (lcfg->lcfg_command == LCFG_MARKER) {
244                 struct cfg_marker *marker;
245                 marker = lustre_cfg_buf(lcfg, 1);
246
247                 d->ver = marker->cm_vers;
248
249                 /* Keep track of the latest marker step */
250                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
251         }
252
253         RETURN(rc);
254 }
255
256 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
257 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
258                                   struct mgs_device *mgs,
259                                   struct fs_db *fsdb)
260 {
261         char                            *logname;
262         struct llog_handle              *loghandle;
263         struct llog_ctxt                *ctxt;
264         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
265         int rc;
266
267         ENTRY;
268
269         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
270         LASSERT(ctxt != NULL);
271         rc = name_create(&logname, fsdb->fsdb_name, "-client");
272         if (rc)
273                 GOTO(out_put, rc);
274         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
275         if (rc)
276                 GOTO(out_pop, rc);
277
278         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
279         if (rc)
280                 GOTO(out_close, rc);
281
282         if (llog_get_size(loghandle) <= 1)
283                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
284
285         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
286         CDEBUG(D_INFO, "get_db = %d\n", rc);
287 out_close:
288         llog_close(env, loghandle);
289 out_pop:
290         name_destroy(&logname);
291 out_put:
292         llog_ctxt_put(ctxt);
293
294         RETURN(rc);
295 }
296
297 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
298 {
299         struct mgs_tgt_srpc_conf *tgtconf;
300
301         /* free target-specific rules */
302         while (fsdb->fsdb_srpc_tgt) {
303                 tgtconf = fsdb->fsdb_srpc_tgt;
304                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
305
306                 LASSERT(tgtconf->mtsc_tgt);
307
308                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
309                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
310                 OBD_FREE_PTR(tgtconf);
311         }
312
313         /* free general rules */
314         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
315 }
316
317 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
318 {
319         struct fs_db *fsdb;
320         cfs_list_t *tmp;
321
322         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
323                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
324                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
325                         return fsdb;
326         }
327         return NULL;
328 }
329
330 /* caller must hold the mgs->mgs_fs_db_lock */
331 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
332                                   struct mgs_device *mgs, char *fsname)
333 {
334         struct fs_db *fsdb;
335         int rc;
336         ENTRY;
337
338         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
339                 CERROR("fsname %s is too long\n", fsname);
340                 RETURN(NULL);
341         }
342
343         OBD_ALLOC_PTR(fsdb);
344         if (!fsdb)
345                 RETURN(NULL);
346
347         strcpy(fsdb->fsdb_name, fsname);
348         mutex_init(&fsdb->fsdb_mutex);
349         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
350         fsdb->fsdb_gen = 1;
351
352         if (strcmp(fsname, MGSSELF_NAME) == 0) {
353                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
354         } else {
355                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
356                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
357                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
358                         CERROR("No memory for index maps\n");
359                         GOTO(err, rc = -ENOMEM);
360                 }
361
362                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
363                 if (rc)
364                         GOTO(err, rc);
365                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
366                 if (rc)
367                         GOTO(err, rc);
368
369                 /* initialise data for NID table */
370                 mgs_ir_init_fs(env, mgs, fsdb);
371
372                 lproc_mgs_add_live(mgs, fsdb);
373         }
374
375         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
376
377         RETURN(fsdb);
378 err:
379         if (fsdb->fsdb_ost_index_map)
380                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
381         if (fsdb->fsdb_mdt_index_map)
382                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
383         name_destroy(&fsdb->fsdb_clilov);
384         name_destroy(&fsdb->fsdb_clilmv);
385         OBD_FREE_PTR(fsdb);
386         RETURN(NULL);
387 }
388
389 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
390 {
391         /* wait for anyone with the sem */
392         mutex_lock(&fsdb->fsdb_mutex);
393         lproc_mgs_del_live(mgs, fsdb);
394         cfs_list_del(&fsdb->fsdb_list);
395
396         /* deinitialize fsr */
397         mgs_ir_fini_fs(mgs, fsdb);
398
399         if (fsdb->fsdb_ost_index_map)
400                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
401         if (fsdb->fsdb_mdt_index_map)
402                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
403         name_destroy(&fsdb->fsdb_clilov);
404         name_destroy(&fsdb->fsdb_clilmv);
405         mgs_free_fsdb_srpc(fsdb);
406         mutex_unlock(&fsdb->fsdb_mutex);
407         OBD_FREE_PTR(fsdb);
408 }
409
410 int mgs_init_fsdb_list(struct mgs_device *mgs)
411 {
412         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
413         return 0;
414 }
415
416 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
417 {
418         struct fs_db *fsdb;
419         cfs_list_t *tmp, *tmp2;
420         mutex_lock(&mgs->mgs_mutex);
421         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
422                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
423                 mgs_free_fsdb(mgs, fsdb);
424         }
425         mutex_unlock(&mgs->mgs_mutex);
426         return 0;
427 }
428
429 int mgs_find_or_make_fsdb(const struct lu_env *env,
430                           struct mgs_device *mgs, char *name,
431                           struct fs_db **dbh)
432 {
433         struct fs_db *fsdb;
434         int rc = 0;
435
436         ENTRY;
437         mutex_lock(&mgs->mgs_mutex);
438         fsdb = mgs_find_fsdb(mgs, name);
439         if (fsdb) {
440                 mutex_unlock(&mgs->mgs_mutex);
441                 *dbh = fsdb;
442                 RETURN(0);
443         }
444
445         CDEBUG(D_MGS, "Creating new db\n");
446         fsdb = mgs_new_fsdb(env, mgs, name);
447         /* lock fsdb_mutex until the db is loaded from llogs */
448         if (fsdb)
449                 mutex_lock(&fsdb->fsdb_mutex);
450         mutex_unlock(&mgs->mgs_mutex);
451         if (!fsdb)
452                 RETURN(-ENOMEM);
453
454         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
455                 /* populate the db from the client llog */
456                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
457                 if (rc) {
458                         CERROR("Can't get db from client log %d\n", rc);
459                         GOTO(out_free, rc);
460                 }
461         }
462
463         /* populate srpc rules from params llog */
464         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
465         if (rc) {
466                 CERROR("Can't get db from params log %d\n", rc);
467                 GOTO(out_free, rc);
468         }
469
470         mutex_unlock(&fsdb->fsdb_mutex);
471         *dbh = fsdb;
472
473         RETURN(0);
474
475 out_free:
476         mutex_unlock(&fsdb->fsdb_mutex);
477         mgs_free_fsdb(mgs, fsdb);
478         return rc;
479 }
480
481 /* 1 = index in use
482    0 = index unused
483    -1= empty client log */
484 int mgs_check_index(const struct lu_env *env,
485                     struct mgs_device *mgs,
486                     struct mgs_target_info *mti)
487 {
488         struct fs_db *fsdb;
489         void *imap;
490         int rc = 0;
491         ENTRY;
492
493         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
494
495         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
496         if (rc) {
497                 CERROR("Can't get db for %s\n", mti->mti_fsname);
498                 RETURN(rc);
499         }
500
501         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
502                 RETURN(-1);
503
504         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
505                 imap = fsdb->fsdb_ost_index_map;
506         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
507                 imap = fsdb->fsdb_mdt_index_map;
508         else
509                 RETURN(-EINVAL);
510
511         if (test_bit(mti->mti_stripe_index, imap))
512                 RETURN(1);
513         RETURN(0);
514 }
515
516 static __inline__ int next_index(void *index_map, int map_len)
517 {
518         int i;
519         for (i = 0; i < map_len * 8; i++)
520                 if (!test_bit(i, index_map)) {
521                          return i;
522                  }
523         CERROR("max index %d exceeded.\n", i);
524         return -1;
525 }
526
527 /* Return codes:
528         0  newly marked as in use
529         <0 err
530         +EALREADY for update of an old index */
531 static int mgs_set_index(const struct lu_env *env,
532                          struct mgs_device *mgs,
533                          struct mgs_target_info *mti)
534 {
535         struct fs_db *fsdb;
536         void *imap;
537         int rc = 0;
538         ENTRY;
539
540         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
541         if (rc) {
542                 CERROR("Can't get db for %s\n", mti->mti_fsname);
543                 RETURN(rc);
544         }
545
546         mutex_lock(&fsdb->fsdb_mutex);
547         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
548                 imap = fsdb->fsdb_ost_index_map;
549         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
550                 imap = fsdb->fsdb_mdt_index_map;
551         } else {
552                 GOTO(out_up, rc = -EINVAL);
553         }
554
555         if (mti->mti_flags & LDD_F_NEED_INDEX) {
556                 rc = next_index(imap, INDEX_MAP_SIZE);
557                 if (rc == -1)
558                         GOTO(out_up, rc = -ERANGE);
559                 mti->mti_stripe_index = rc;
560                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
561                         fsdb->fsdb_mdt_count ++;
562         }
563
564         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
565                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
566                                    "but the max index is %d.\n",
567                                    mti->mti_svname, mti->mti_stripe_index,
568                                    INDEX_MAP_SIZE * 8);
569                 GOTO(out_up, rc = -ERANGE);
570         }
571
572         if (test_bit(mti->mti_stripe_index, imap)) {
573                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
574                     !(mti->mti_flags & LDD_F_WRITECONF)) {
575                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
576                                            "%d, but that index is already in "
577                                            "use. Use --writeconf to force\n",
578                                            mti->mti_svname,
579                                            mti->mti_stripe_index);
580                         GOTO(out_up, rc = -EADDRINUSE);
581                 } else {
582                         CDEBUG(D_MGS, "Server %s updating index %d\n",
583                                mti->mti_svname, mti->mti_stripe_index);
584                         GOTO(out_up, rc = EALREADY);
585                 }
586         }
587
588         set_bit(mti->mti_stripe_index, imap);
589         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
590         mutex_unlock(&fsdb->fsdb_mutex);
591         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
592                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
593
594         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
595                mti->mti_stripe_index);
596
597         RETURN(0);
598 out_up:
599         mutex_unlock(&fsdb->fsdb_mutex);
600         return rc;
601 }
602
603 struct mgs_modify_lookup {
604         struct cfg_marker mml_marker;
605         int               mml_modified;
606 };
607
608 static int mgs_modify_handler(const struct lu_env *env,
609                               struct llog_handle *llh,
610                               struct llog_rec_hdr *rec, void *data)
611 {
612         struct mgs_modify_lookup *mml = data;
613         struct cfg_marker *marker;
614         struct lustre_cfg *lcfg = REC_DATA(rec);
615         int cfg_len = REC_DATA_LEN(rec);
616         int rc;
617         ENTRY;
618
619         if (rec->lrh_type != OBD_CFG_REC) {
620                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
621                 RETURN(-EINVAL);
622         }
623
624         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
625         if (rc) {
626                 CERROR("Insane cfg\n");
627                 RETURN(rc);
628         }
629
630         /* We only care about markers */
631         if (lcfg->lcfg_command != LCFG_MARKER)
632                 RETURN(0);
633
634         marker = lustre_cfg_buf(lcfg, 1);
635         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
636             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
637             !(marker->cm_flags & CM_SKIP)) {
638                 /* Found a non-skipped marker match */
639                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
640                        rec->lrh_index, marker->cm_step,
641                        marker->cm_flags, mml->mml_marker.cm_flags,
642                        marker->cm_tgtname, marker->cm_comment);
643                 /* Overwrite the old marker llog entry */
644                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
645                 marker->cm_flags |= mml->mml_marker.cm_flags;
646                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
647                 /* Header and tail are added back to lrh_len in
648                    llog_lvfs_write_rec */
649                 rec->lrh_len = cfg_len;
650                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
651                                 rec->lrh_index);
652                 if (!rc)
653                          mml->mml_modified++;
654         }
655
656         RETURN(rc);
657 }
658
659 /**
660  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
661  * Return code:
662  * 0 - modified successfully,
663  * 1 - no modification was done
664  * negative - error
665  */
666 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
667                       struct fs_db *fsdb, struct mgs_target_info *mti,
668                       char *logname, char *devname, char *comment, int flags)
669 {
670         struct llog_handle *loghandle;
671         struct llog_ctxt *ctxt;
672         struct mgs_modify_lookup *mml;
673         int rc;
674
675         ENTRY;
676
677         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
678         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
679                flags);
680
681         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
682         LASSERT(ctxt != NULL);
683         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
684         if (rc < 0) {
685                 if (rc == -ENOENT)
686                         rc = 0;
687                 GOTO(out_pop, rc);
688         }
689
690         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
691         if (rc)
692                 GOTO(out_close, rc);
693
694         if (llog_get_size(loghandle) <= 1)
695                 GOTO(out_close, rc = 0);
696
697         OBD_ALLOC_PTR(mml);
698         if (!mml)
699                 GOTO(out_close, rc = -ENOMEM);
700         strcpy(mml->mml_marker.cm_comment, comment);
701         strcpy(mml->mml_marker.cm_tgtname, devname);
702         /* Modify mostly means cancel */
703         mml->mml_marker.cm_flags = flags;
704         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
705         mml->mml_modified = 0;
706         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
707                           NULL);
708         if (!rc && !mml->mml_modified)
709                 rc = 1;
710         OBD_FREE_PTR(mml);
711
712 out_close:
713         llog_close(env, loghandle);
714 out_pop:
715         if (rc < 0)
716                 CERROR("%s: modify %s/%s failed: rc = %d\n",
717                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
718         llog_ctxt_put(ctxt);
719         RETURN(rc);
720 }
721
722 /** This structure is passed to mgs_replace_handler */
723 struct mgs_replace_uuid_lookup {
724         /* Nids are replaced for this target device */
725         struct mgs_target_info target;
726         /* Temporary modified llog */
727         struct llog_handle *temp_llh;
728         /* Flag is set if in target block*/
729         int in_target_device;
730         /* Nids already added. Just skip (multiple nids) */
731         int device_nids_added;
732         /* Flag is set if this block should not be copied */
733         int skip_it;
734 };
735
736 /**
737  * Check: a) if block should be skipped
738  * b) is it target block
739  *
740  * \param[in] lcfg
741  * \param[in] mrul
742  *
743  * \retval 0 should not to be skipped
744  * \retval 1 should to be skipped
745  */
746 static int check_markers(struct lustre_cfg *lcfg,
747                          struct mgs_replace_uuid_lookup *mrul)
748 {
749          struct cfg_marker *marker;
750
751         /* Track markers. Find given device */
752         if (lcfg->lcfg_command == LCFG_MARKER) {
753                 marker = lustre_cfg_buf(lcfg, 1);
754                 /* Clean llog from records marked as CM_EXCLUDE.
755                    CM_SKIP records are used for "active" command
756                    and can be restored if needed */
757                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
758                     (CM_EXCLUDE | CM_START)) {
759                         mrul->skip_it = 1;
760                         return 1;
761                 }
762
763                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
764                     (CM_EXCLUDE | CM_END)) {
765                         mrul->skip_it = 0;
766                         return 1;
767                 }
768
769                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
770                         LASSERT(!(marker->cm_flags & CM_START) ||
771                                 !(marker->cm_flags & CM_END));
772                         if (marker->cm_flags & CM_START) {
773                                 mrul->in_target_device = 1;
774                                 mrul->device_nids_added = 0;
775                         } else if (marker->cm_flags & CM_END)
776                                 mrul->in_target_device = 0;
777                 }
778         }
779
780         return 0;
781 }
782
783 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
784                        struct lustre_cfg *lcfg)
785 {
786         struct llog_rec_hdr      rec;
787         int                      buflen, rc;
788
789         if (!lcfg || !llh)
790                 return -ENOMEM;
791
792         LASSERT(llh->lgh_ctxt);
793
794         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
795                                 lcfg->lcfg_buflens);
796         rec.lrh_len = llog_data_len(buflen);
797         rec.lrh_type = OBD_CFG_REC;
798
799         /* idx = -1 means append */
800         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
801         if (rc)
802                 CERROR("failed %d\n", rc);
803         return rc;
804 }
805
806 static int record_base(const struct lu_env *env, struct llog_handle *llh,
807                      char *cfgname, lnet_nid_t nid, int cmd,
808                      char *s1, char *s2, char *s3, char *s4)
809 {
810         struct mgs_thread_info *mgi = mgs_env_info(env);
811         struct lustre_cfg     *lcfg;
812         int rc;
813
814         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
815                cmd, s1, s2, s3, s4);
816
817         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
818         if (s1)
819                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
820         if (s2)
821                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
822         if (s3)
823                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
824         if (s4)
825                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
826
827         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
828         if (!lcfg)
829                 return -ENOMEM;
830         lcfg->lcfg_nid = nid;
831
832         rc = record_lcfg(env, llh, lcfg);
833
834         lustre_cfg_free(lcfg);
835
836         if (rc) {
837                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
838                        cmd, s1, s2, s3, s4);
839         }
840         return rc;
841 }
842
843 static inline int record_add_uuid(const struct lu_env *env,
844                                   struct llog_handle *llh,
845                                   uint64_t nid, char *uuid)
846 {
847         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
848 }
849
850 static inline int record_add_conn(const struct lu_env *env,
851                                   struct llog_handle *llh,
852                                   char *devname, char *uuid)
853 {
854         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
855 }
856
857 static inline int record_attach(const struct lu_env *env,
858                                 struct llog_handle *llh, char *devname,
859                                 char *type, char *uuid)
860 {
861         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
862 }
863
864 static inline int record_setup(const struct lu_env *env,
865                                struct llog_handle *llh, char *devname,
866                                char *s1, char *s2, char *s3, char *s4)
867 {
868         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
869 }
870
871 /**
872  * \retval <0 record processing error
873  * \retval n record is processed. No need copy original one.
874  * \retval 0 record is not processed.
875  */
876 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
877                            struct mgs_replace_uuid_lookup *mrul)
878 {
879         int nids_added = 0;
880         lnet_nid_t nid;
881         char *ptr;
882         int rc;
883
884         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
885                 /* LCFG_ADD_UUID command found. Let's skip original command
886                    and add passed nids */
887                 ptr = mrul->target.mti_params;
888                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
889                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
890                                "device %s\n", libcfs_nid2str(nid),
891                                 mrul->target.mti_params,
892                                 mrul->target.mti_svname);
893                         rc = record_add_uuid(env,
894                                              mrul->temp_llh, nid,
895                                              mrul->target.mti_params);
896                         if (!rc)
897                                 nids_added++;
898                 }
899
900                 if (nids_added == 0) {
901                         CERROR("No new nids were added, nid %s with uuid %s, "
902                                "device %s\n", libcfs_nid2str(nid),
903                                mrul->target.mti_params,
904                                mrul->target.mti_svname);
905                         RETURN(-ENXIO);
906                 } else {
907                         mrul->device_nids_added = 1;
908                 }
909
910                 return nids_added;
911         }
912
913         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
914                 /* LCFG_SETUP command found. UUID should be changed */
915                 rc = record_setup(env,
916                                   mrul->temp_llh,
917                                   /* devname the same */
918                                   lustre_cfg_string(lcfg, 0),
919                                   /* s1 is not changed */
920                                   lustre_cfg_string(lcfg, 1),
921                                   /* new uuid should be
922                                   the full nidlist */
923                                   mrul->target.mti_params,
924                                   /* s3 is not changed */
925                                   lustre_cfg_string(lcfg, 3),
926                                   /* s4 is not changed */
927                                   lustre_cfg_string(lcfg, 4));
928                 return rc ? rc : 1;
929         }
930
931         /* Another commands in target device block */
932         return 0;
933 }
934
935 /**
936  * Handler that called for every record in llog.
937  * Records are processed in order they placed in llog.
938  *
939  * \param[in] llh       log to be processed
940  * \param[in] rec       current record
941  * \param[in] data      mgs_replace_uuid_lookup structure
942  *
943  * \retval 0    success
944  */
945 static int mgs_replace_handler(const struct lu_env *env,
946                                struct llog_handle *llh,
947                                struct llog_rec_hdr *rec,
948                                void *data)
949 {
950         struct llog_rec_hdr local_rec = *rec;
951         struct mgs_replace_uuid_lookup *mrul;
952         struct lustre_cfg *lcfg = REC_DATA(rec);
953         int cfg_len = REC_DATA_LEN(rec);
954         int rc;
955         ENTRY;
956
957         mrul = (struct mgs_replace_uuid_lookup *)data;
958
959         if (rec->lrh_type != OBD_CFG_REC) {
960                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
961                        rec->lrh_type, lcfg->lcfg_command,
962                        lustre_cfg_string(lcfg, 0),
963                        lustre_cfg_string(lcfg, 1));
964                 RETURN(-EINVAL);
965         }
966
967         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
968         if (rc) {
969                 /* Do not copy any invalidated records */
970                 GOTO(skip_out, rc = 0);
971         }
972
973         rc = check_markers(lcfg, mrul);
974         if (rc || mrul->skip_it)
975                 GOTO(skip_out, rc = 0);
976
977         /* Write to new log all commands outside target device block */
978         if (!mrul->in_target_device)
979                 GOTO(copy_out, rc = 0);
980
981         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
982            (failover nids) for this target, assuming that if then
983            primary is changing then so is the failover */
984         if (mrul->device_nids_added &&
985             (lcfg->lcfg_command == LCFG_ADD_UUID ||
986              lcfg->lcfg_command == LCFG_ADD_CONN))
987                 GOTO(skip_out, rc = 0);
988
989         rc = process_command(env, lcfg, mrul);
990         if (rc < 0)
991                 RETURN(rc);
992
993         if (rc)
994                 RETURN(0);
995 copy_out:
996         /* Record is placed in temporary llog as is */
997         local_rec.lrh_len -= sizeof(*rec) + sizeof(struct llog_rec_tail);
998         rc = llog_write(env, mrul->temp_llh, &local_rec, NULL, 0,
999                         (void *)lcfg, -1);
1000
1001         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1002                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1003                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1004         RETURN(rc);
1005
1006 skip_out:
1007         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1008                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1009                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1010         RETURN(rc);
1011 }
1012
1013 static int mgs_backup_llog(const struct lu_env *env,
1014                            struct obd_device *mgs,
1015                            char *fsname, char *backup)
1016 {
1017         struct obd_uuid *uuid;
1018         struct llog_handle *orig_llh, *bak_llh;
1019         struct llog_ctxt *lctxt;
1020         int rc, rc2;
1021         ENTRY;
1022
1023         lctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1024         if (!lctxt) {
1025                 CERROR("%s: missing llog context\n", mgs->obd_name);
1026                 GOTO(out, rc = -EINVAL);
1027         }
1028
1029         /* Make sure there's no old backup log */
1030         rc = llog_erase(env, lctxt, NULL, backup);
1031         if (rc < 0 && rc != -ENOENT)
1032                 GOTO(out_put, rc);
1033
1034         /* open backup log */
1035         rc = llog_open_create(env, lctxt, &bak_llh, NULL, backup);
1036         if (rc) {
1037                 CERROR("%s: backup logfile open %s: rc = %d\n",
1038                        mgs->obd_name, backup, rc);
1039                 GOTO(out_put, rc);
1040         }
1041
1042         /* set the log header uuid */
1043         OBD_ALLOC_PTR(uuid);
1044         if (uuid == NULL)
1045                 GOTO(out_put, rc = -ENOMEM);
1046         obd_str2uuid(uuid, backup);
1047         rc = llog_init_handle(env, bak_llh, LLOG_F_IS_PLAIN, uuid);
1048         OBD_FREE_PTR(uuid);
1049         if (rc)
1050                 GOTO(out_close1, rc);
1051
1052         /* open original log */
1053         rc = llog_open(env, lctxt, &orig_llh, NULL, fsname,
1054                        LLOG_OPEN_EXISTS);
1055         if (rc < 0) {
1056                 if (rc == -ENOENT)
1057                         rc = 0;
1058                 GOTO(out_close1, rc);
1059         }
1060
1061         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, NULL);
1062         if (rc)
1063                 GOTO(out_close2, rc);
1064
1065         /* Copy remote log */
1066         rc = llog_process(env, orig_llh, llog_copy_handler,
1067                           (void *)bak_llh, NULL);
1068
1069 out_close2:
1070         rc2 = llog_close(env, orig_llh);
1071         if (!rc)
1072                 rc = rc2;
1073 out_close1:
1074         rc2 = llog_close(env, bak_llh);
1075         if (!rc)
1076                 rc = rc2;
1077 out_put:
1078         if (lctxt)
1079                 llog_ctxt_put(lctxt);
1080 out:
1081         if (rc)
1082                 CERROR("%s: Failed to backup log %s: rc = %d\n",
1083                        mgs->obd_name, fsname, rc);
1084         RETURN(rc);
1085 }
1086
1087 static int mgs_log_is_empty(const struct lu_env *env, struct mgs_device *mgs,
1088                             char *name);
1089
1090 static int mgs_replace_nids_log(const struct lu_env *env,
1091                                 struct obd_device *mgs, struct fs_db *fsdb,
1092                                 char *logname, char *devname, char *nids)
1093 {
1094         struct llog_handle *orig_llh, *backup_llh;
1095         struct llog_ctxt *ctxt;
1096         struct mgs_replace_uuid_lookup *mrul;
1097         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1098         char *backup;
1099         int rc, rc2;
1100         ENTRY;
1101
1102         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1103
1104         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1105         LASSERT(ctxt != NULL);
1106
1107         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1108                 /* Log is empty. Nothing to replace */
1109                 GOTO(out_put, rc = 0);
1110         }
1111
1112         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1113         if (backup == NULL)
1114                 GOTO(out_put, rc = -ENOMEM);
1115
1116         sprintf(backup, "%s.bak", logname);
1117
1118         rc = mgs_backup_llog(env, mgs, logname, backup);
1119         if (rc < 0) {
1120                 CERROR("%s: can't make backup for %s: rc = %d\n",
1121                        mgs->obd_name, logname, rc);
1122                 GOTO(out_free,rc);
1123         }
1124
1125         /* Now erase original log file. Connections are not allowed.
1126            Backup is already saved */
1127         rc = llog_erase(env, ctxt, NULL, logname);
1128         if (rc < 0 && rc != -ENOENT)
1129                 GOTO(out_free, rc);
1130
1131         /* open local log */
1132         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1133         if (rc)
1134                 GOTO(out_restore, rc);
1135
1136         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, NULL);
1137         if (rc)
1138                 GOTO(out_closel, rc);
1139
1140         /* open backup llog */
1141         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1142                        LLOG_OPEN_EXISTS);
1143         if (rc)
1144                 GOTO(out_closel, rc);
1145
1146         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1147         if (rc)
1148                 GOTO(out_close, rc);
1149
1150         if (llog_get_size(backup_llh) <= 1)
1151                 GOTO(out_close, rc = 0);
1152
1153         OBD_ALLOC_PTR(mrul);
1154         if (!mrul)
1155                 GOTO(out_close, rc = -ENOMEM);
1156         /* devname is only needed information to replace UUID records */
1157         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1158         /* parse nids later */
1159         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1160         /* Copy records to this temporary llog */
1161         mrul->temp_llh = orig_llh;
1162
1163         rc = llog_process(env, backup_llh, mgs_replace_handler,
1164                           (void *)mrul, NULL);
1165         OBD_FREE_PTR(mrul);
1166 out_close:
1167         rc2 = llog_close(NULL, backup_llh);
1168         if (!rc)
1169                 rc = rc2;
1170 out_closel:
1171         rc2 = llog_close(NULL, orig_llh);
1172         if (!rc)
1173                 rc = rc2;
1174
1175 out_restore:
1176         if (rc) {
1177                 CERROR("%s: llog should be restored: rc = %d\n",
1178                        mgs->obd_name, rc);
1179                 rc2 = mgs_backup_llog(env, mgs, backup, logname);
1180                 if (rc2 < 0)
1181                         CERROR("%s: can't restore backup %s: rc = %d\n",
1182                                mgs->obd_name, logname, rc2);
1183         }
1184
1185 out_free:
1186         OBD_FREE(backup, strlen(backup) + 1);
1187
1188 out_put:
1189         llog_ctxt_put(ctxt);
1190
1191         if (rc)
1192                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1193                        mgs->obd_name, logname, rc);
1194
1195         RETURN(rc);
1196 }
1197
1198 /**
1199  * Parse device name and get file system name and/or device index
1200  *
1201  * \param[in]   devname device name (ex. lustre-MDT0000)
1202  * \param[out]  fsname  file system name(optional)
1203  * \param[out]  index   device index(optional)
1204  *
1205  * \retval 0    success
1206  */
1207 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1208 {
1209         int rc;
1210         ENTRY;
1211
1212         /* Extract fsname */
1213         if (fsname) {
1214                 rc = server_name2fsname(devname, fsname, NULL);
1215                 if (rc < 0) {
1216                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1217                                devname);
1218                         RETURN(-EINVAL);
1219                 }
1220         }
1221
1222         if (index) {
1223                 rc = server_name2index(devname, index, NULL);
1224                 if (rc < 0) {
1225                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1226                                devname);
1227                         RETURN(-EINVAL);
1228                 }
1229         }
1230
1231         RETURN(0);
1232 }
1233
1234 static int only_mgs_is_running(struct obd_device *mgs_obd)
1235 {
1236         /* TDB: Is global variable with devices count exists? */
1237         int num_devices = get_devices_count();
1238         /* osd, MGS and MGC + self_export
1239            (wc -l /proc/fs/lustre/devices <= 2) && (num_exports <= 2) */
1240         return (num_devices <= 3) && (mgs_obd->obd_num_exports <= 2);
1241 }
1242
1243 static int name_create_mdt(char **logname, char *fsname, int i)
1244 {
1245         char mdt_index[9];
1246
1247         sprintf(mdt_index, "-MDT%04x", i);
1248         return name_create(logname, fsname, mdt_index);
1249 }
1250
1251 /**
1252  * Replace nids for \a device to \a nids values
1253  *
1254  * \param obd           MGS obd device
1255  * \param devname       nids need to be replaced for this device
1256  * (ex. lustre-OST0000)
1257  * \param nids          nids list (ex. nid1,nid2,nid3)
1258  *
1259  * \retval 0    success
1260  */
1261 int mgs_replace_nids(const struct lu_env *env,
1262                      struct mgs_device *mgs,
1263                      char *devname, char *nids)
1264 {
1265         /* Assume fsname is part of device name */
1266         char fsname[MTI_NAME_MAXLEN];
1267         int rc;
1268         __u32 index;
1269         char *logname;
1270         struct fs_db *fsdb;
1271         unsigned int i;
1272         int conn_state;
1273         struct obd_device *mgs_obd = mgs->mgs_obd;
1274         ENTRY;
1275
1276         /* We can only change NIDs if no other nodes are connected */
1277         spin_lock(&mgs_obd->obd_dev_lock);
1278         conn_state = mgs_obd->obd_no_conn;
1279         mgs_obd->obd_no_conn = 1;
1280         spin_unlock(&mgs_obd->obd_dev_lock);
1281
1282         /* We can not change nids if not only MGS is started */
1283         if (!only_mgs_is_running(mgs_obd)) {
1284                 CERROR("Only MGS is allowed to be started\n");
1285                 GOTO(out, rc = -EINPROGRESS);
1286         }
1287
1288         /* Get fsname and index*/
1289         rc = mgs_parse_devname(devname, fsname, &index);
1290         if (rc)
1291                 GOTO(out, rc);
1292
1293         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1294         if (rc) {
1295                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1296                 GOTO(out, rc);
1297         }
1298
1299         /* Process client llogs */
1300         name_create(&logname, fsname, "-client");
1301         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1302         name_destroy(&logname);
1303         if (rc) {
1304                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1305                        fsname, devname, rc);
1306                 GOTO(out, rc);
1307         }
1308
1309         /* Process MDT llogs */
1310         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1311                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1312                         continue;
1313                 name_create_mdt(&logname, fsname, i);
1314                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1315                 name_destroy(&logname);
1316                 if (rc)
1317                         GOTO(out, rc);
1318         }
1319
1320 out:
1321         spin_lock(&mgs_obd->obd_dev_lock);
1322         mgs_obd->obd_no_conn = conn_state;
1323         spin_unlock(&mgs_obd->obd_dev_lock);
1324
1325         RETURN(rc);
1326 }
1327
1328 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1329                             char *devname, struct lov_desc *desc)
1330 {
1331         struct mgs_thread_info *mgi = mgs_env_info(env);
1332         struct lustre_cfg *lcfg;
1333         int rc;
1334
1335         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1336         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1337         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1338         if (!lcfg)
1339                 return -ENOMEM;
1340         rc = record_lcfg(env, llh, lcfg);
1341
1342         lustre_cfg_free(lcfg);
1343         return rc;
1344 }
1345
1346 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1347                             char *devname, struct lmv_desc *desc)
1348 {
1349         struct mgs_thread_info *mgi = mgs_env_info(env);
1350         struct lustre_cfg *lcfg;
1351         int rc;
1352
1353         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1354         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1355         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1356
1357         rc = record_lcfg(env, llh, lcfg);
1358
1359         lustre_cfg_free(lcfg);
1360         return rc;
1361 }
1362
1363 static inline int record_mdc_add(const struct lu_env *env,
1364                                  struct llog_handle *llh,
1365                                  char *logname, char *mdcuuid,
1366                                  char *mdtuuid, char *index,
1367                                  char *gen)
1368 {
1369         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1370                            mdtuuid,index,gen,mdcuuid);
1371 }
1372
1373 static inline int record_lov_add(const struct lu_env *env,
1374                                  struct llog_handle *llh,
1375                                  char *lov_name, char *ost_uuid,
1376                                  char *index, char *gen)
1377 {
1378         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1379                            ost_uuid, index, gen, 0);
1380 }
1381
1382 static inline int record_mount_opt(const struct lu_env *env,
1383                                    struct llog_handle *llh,
1384                                    char *profile, char *lov_name,
1385                                    char *mdc_name)
1386 {
1387         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1388                            profile,lov_name,mdc_name,0);
1389 }
1390
1391 static int record_marker(const struct lu_env *env,
1392                          struct llog_handle *llh,
1393                          struct fs_db *fsdb, __u32 flags,
1394                          char *tgtname, char *comment)
1395 {
1396         struct mgs_thread_info *mgi = mgs_env_info(env);
1397         struct lustre_cfg *lcfg;
1398         int rc;
1399         int cplen = 0;
1400
1401         if (flags & CM_START)
1402                 fsdb->fsdb_gen++;
1403         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1404         mgi->mgi_marker.cm_flags = flags;
1405         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1406         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1407                         sizeof(mgi->mgi_marker.cm_tgtname));
1408         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1409                 return -E2BIG;
1410         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1411                         sizeof(mgi->mgi_marker.cm_comment));
1412         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1413                 return -E2BIG;
1414         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1415         mgi->mgi_marker.cm_canceltime = 0;
1416         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1417         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1418                             sizeof(mgi->mgi_marker));
1419         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1420         if (!lcfg)
1421                 return -ENOMEM;
1422         rc = record_lcfg(env, llh, lcfg);
1423
1424         lustre_cfg_free(lcfg);
1425         return rc;
1426 }
1427
1428 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1429                             struct llog_handle **llh, char *name)
1430 {
1431         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1432         struct llog_ctxt        *ctxt;
1433         int                      rc = 0;
1434
1435         if (*llh)
1436                 GOTO(out, rc = -EBUSY);
1437
1438         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1439         if (!ctxt)
1440                 GOTO(out, rc = -ENODEV);
1441         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1442
1443         rc = llog_open_create(env, ctxt, llh, NULL, name);
1444         if (rc)
1445                 GOTO(out_ctxt, rc);
1446         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1447         if (rc)
1448                 llog_close(env, *llh);
1449 out_ctxt:
1450         llog_ctxt_put(ctxt);
1451 out:
1452         if (rc) {
1453                 CERROR("%s: can't start log %s: rc = %d\n",
1454                        mgs->mgs_obd->obd_name, name, rc);
1455                 *llh = NULL;
1456         }
1457         RETURN(rc);
1458 }
1459
1460 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1461 {
1462         int rc;
1463
1464         rc = llog_close(env, *llh);
1465         *llh = NULL;
1466
1467         return rc;
1468 }
1469
1470 static int mgs_log_is_empty(const struct lu_env *env,
1471                             struct mgs_device *mgs, char *name)
1472 {
1473         struct llog_handle *llh;
1474         struct llog_ctxt *ctxt;
1475         int rc = 0;
1476
1477         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1478         LASSERT(ctxt != NULL);
1479         rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
1480         if (rc < 0) {
1481                 if (rc == -ENOENT)
1482                         rc = 0;
1483                 GOTO(out_ctxt, rc);
1484         }
1485
1486         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
1487         if (rc)
1488                 GOTO(out_close, rc);
1489         rc = llog_get_size(llh);
1490
1491 out_close:
1492         llog_close(env, llh);
1493 out_ctxt:
1494         llog_ctxt_put(ctxt);
1495         /* header is record 1 */
1496         return (rc <= 1);
1497 }
1498
1499 /******************** config "macros" *********************/
1500
1501 /* write an lcfg directly into a log (with markers) */
1502 static int mgs_write_log_direct(const struct lu_env *env,
1503                                 struct mgs_device *mgs, struct fs_db *fsdb,
1504                                 char *logname, struct lustre_cfg *lcfg,
1505                                 char *devname, char *comment)
1506 {
1507         struct llog_handle *llh = NULL;
1508         int rc;
1509         ENTRY;
1510
1511         if (!lcfg)
1512                 RETURN(-ENOMEM);
1513
1514         rc = record_start_log(env, mgs, &llh, logname);
1515         if (rc)
1516                 RETURN(rc);
1517
1518         /* FIXME These should be a single journal transaction */
1519         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1520         if (rc)
1521                 GOTO(out_end, rc);
1522         rc = record_lcfg(env, llh, lcfg);
1523         if (rc)
1524                 GOTO(out_end, rc);
1525         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1526         if (rc)
1527                 GOTO(out_end, rc);
1528 out_end:
1529         record_end_log(env, &llh);
1530         RETURN(rc);
1531 }
1532
1533 /* write the lcfg in all logs for the given fs */
1534 int mgs_write_log_direct_all(const struct lu_env *env,
1535                              struct mgs_device *mgs,
1536                              struct fs_db *fsdb,
1537                              struct mgs_target_info *mti,
1538                              struct lustre_cfg *lcfg,
1539                              char *devname, char *comment,
1540                              int server_only)
1541 {
1542         cfs_list_t list;
1543         struct mgs_direntry *dirent, *n;
1544         char *fsname = mti->mti_fsname;
1545         char *logname;
1546         int rc = 0, len = strlen(fsname);
1547         ENTRY;
1548
1549         /* We need to set params for any future logs
1550            as well. FIXME Append this file to every new log.
1551            Actually, we should store as params (text), not llogs.  Or
1552            in a database. */
1553         rc = name_create(&logname, fsname, "-params");
1554         if (rc)
1555                 RETURN(rc);
1556         if (mgs_log_is_empty(env, mgs, logname)) {
1557                 struct llog_handle *llh = NULL;
1558                 rc = record_start_log(env, mgs, &llh, logname);
1559                 record_end_log(env, &llh);
1560         }
1561         name_destroy(&logname);
1562         if (rc)
1563                 RETURN(rc);
1564
1565         /* Find all the logs in the CONFIGS directory */
1566         rc = class_dentry_readdir(env, mgs, &list);
1567         if (rc)
1568                 RETURN(rc);
1569
1570         /* Could use fsdb index maps instead of directory listing */
1571         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1572                 cfs_list_del(&dirent->list);
1573                 /* don't write to sptlrpc rule log */
1574                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1575                         goto next;
1576
1577                 /* caller wants write server logs only */
1578                 if (server_only && strstr(dirent->name, "-client") != NULL)
1579                         goto next;
1580
1581                 if (strncmp(fsname, dirent->name, len) == 0) {
1582                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1583                         /* Erase any old settings of this same parameter */
1584                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1585                                         devname, comment, CM_SKIP);
1586                         if (rc < 0)
1587                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1588                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1589                         /* Write the new one */
1590                         if (lcfg) {
1591                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1592                                                           dirent->name,
1593                                                           lcfg, devname,
1594                                                           comment);
1595                                 if (rc)
1596                                         CERROR("%s: writing log %s: rc = %d\n",
1597                                                mgs->mgs_obd->obd_name,
1598                                                dirent->name, rc);
1599                         }
1600                 }
1601 next:
1602                 mgs_direntry_free(dirent);
1603         }
1604
1605         RETURN(rc);
1606 }
1607
1608 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1609                                     struct mgs_device *mgs,
1610                                     struct fs_db *fsdb,
1611                                     struct mgs_target_info *mti,
1612                                     int index, char *logname);
1613 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1614                                     struct mgs_device *mgs,
1615                                     struct fs_db *fsdb,
1616                                     struct mgs_target_info *mti,
1617                                     char *logname, char *suffix, char *lovname,
1618                                     enum lustre_sec_part sec_part, int flags);
1619 static int name_create_mdt_and_lov(char **logname, char **lovname,
1620                                    struct fs_db *fsdb, int i);
1621
1622 static int add_param(char *params, char *key, char *val)
1623 {
1624         char *start = params + strlen(params);
1625         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1626         int keylen = 0;
1627
1628         if (key != NULL)
1629                 keylen = strlen(key);
1630         if (start + 1 + keylen + strlen(val) >= end) {
1631                 CERROR("params are too long: %s %s%s\n",
1632                        params, key != NULL ? key : "", val);
1633                 return -EINVAL;
1634         }
1635
1636         sprintf(start, " %s%s", key != NULL ? key : "", val);
1637         return 0;
1638 }
1639
1640 /**
1641  * Walk through client config log record and convert the related records
1642  * into the target.
1643  **/
1644 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1645                                          struct llog_handle *llh,
1646                                          struct llog_rec_hdr *rec, void *data)
1647 {
1648         struct mgs_device *mgs;
1649         struct obd_device *obd;
1650         struct mgs_target_info *mti, *tmti;
1651         struct fs_db *fsdb;
1652         int cfg_len = rec->lrh_len;
1653         char *cfg_buf = (char*) (rec + 1);
1654         struct lustre_cfg *lcfg;
1655         int rc = 0;
1656         struct llog_handle *mdt_llh = NULL;
1657         static int got_an_osc_or_mdc = 0;
1658         /* 0: not found any osc/mdc;
1659            1: found osc;
1660            2: found mdc;
1661         */
1662         static int last_step = -1;
1663         int cplen = 0;
1664
1665         ENTRY;
1666
1667         mti = ((struct temp_comp*)data)->comp_mti;
1668         tmti = ((struct temp_comp*)data)->comp_tmti;
1669         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1670         obd = ((struct temp_comp *)data)->comp_obd;
1671         mgs = lu2mgs_dev(obd->obd_lu_dev);
1672         LASSERT(mgs);
1673
1674         if (rec->lrh_type != OBD_CFG_REC) {
1675                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1676                 RETURN(-EINVAL);
1677         }
1678
1679         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1680         if (rc) {
1681                 CERROR("Insane cfg\n");
1682                 RETURN(rc);
1683         }
1684
1685         lcfg = (struct lustre_cfg *)cfg_buf;
1686
1687         if (lcfg->lcfg_command == LCFG_MARKER) {
1688                 struct cfg_marker *marker;
1689                 marker = lustre_cfg_buf(lcfg, 1);
1690                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1691                     (marker->cm_flags & CM_START) &&
1692                      !(marker->cm_flags & CM_SKIP)) {
1693                         got_an_osc_or_mdc = 1;
1694                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1695                                         sizeof(tmti->mti_svname));
1696                         if (cplen >= sizeof(tmti->mti_svname))
1697                                 RETURN(-E2BIG);
1698                         rc = record_start_log(env, mgs, &mdt_llh,
1699                                               mti->mti_svname);
1700                         if (rc)
1701                                 RETURN(rc);
1702                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1703                                            mti->mti_svname, "add osc(copied)");
1704                         record_end_log(env, &mdt_llh);
1705                         last_step = marker->cm_step;
1706                         RETURN(rc);
1707                 }
1708                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1709                     (marker->cm_flags & CM_END) &&
1710                      !(marker->cm_flags & CM_SKIP)) {
1711                         LASSERT(last_step == marker->cm_step);
1712                         last_step = -1;
1713                         got_an_osc_or_mdc = 0;
1714                         memset(tmti, 0, sizeof(*tmti));
1715                         rc = record_start_log(env, mgs, &mdt_llh,
1716                                               mti->mti_svname);
1717                         if (rc)
1718                                 RETURN(rc);
1719                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1720                                            mti->mti_svname, "add osc(copied)");
1721                         record_end_log(env, &mdt_llh);
1722                         RETURN(rc);
1723                 }
1724                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1725                     (marker->cm_flags & CM_START) &&
1726                      !(marker->cm_flags & CM_SKIP)) {
1727                         got_an_osc_or_mdc = 2;
1728                         last_step = marker->cm_step;
1729                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1730                                strlen(marker->cm_tgtname));
1731
1732                         RETURN(rc);
1733                 }
1734                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1735                     (marker->cm_flags & CM_END) &&
1736                      !(marker->cm_flags & CM_SKIP)) {
1737                         LASSERT(last_step == marker->cm_step);
1738                         last_step = -1;
1739                         got_an_osc_or_mdc = 0;
1740                         memset(tmti, 0, sizeof(*tmti));
1741                         RETURN(rc);
1742                 }
1743         }
1744
1745         if (got_an_osc_or_mdc == 0 || last_step < 0)
1746                 RETURN(rc);
1747
1748         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1749                 uint64_t nodenid = lcfg->lcfg_nid;
1750
1751                 if (strlen(tmti->mti_uuid) == 0) {
1752                         /* target uuid not set, this config record is before
1753                          * LCFG_SETUP, this nid is one of target node nid.
1754                          */
1755                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1756                         tmti->mti_nid_count++;
1757                 } else {
1758                         /* failover node nid */
1759                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1760                                        libcfs_nid2str(nodenid));
1761                 }
1762
1763                 RETURN(rc);
1764         }
1765
1766         if (lcfg->lcfg_command == LCFG_SETUP) {
1767                 char *target;
1768
1769                 target = lustre_cfg_string(lcfg, 1);
1770                 memcpy(tmti->mti_uuid, target, strlen(target));
1771                 RETURN(rc);
1772         }
1773
1774         /* ignore client side sptlrpc_conf_log */
1775         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1776                 RETURN(rc);
1777
1778         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1779                 int index;
1780
1781                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1782                         RETURN (-EINVAL);
1783
1784                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1785                        strlen(mti->mti_fsname));
1786                 tmti->mti_stripe_index = index;
1787
1788                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1789                                               mti->mti_stripe_index,
1790                                               mti->mti_svname);
1791                 memset(tmti, 0, sizeof(*tmti));
1792                 RETURN(rc);
1793         }
1794
1795         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1796                 int index;
1797                 char mdt_index[9];
1798                 char *logname, *lovname;
1799
1800                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1801                                              mti->mti_stripe_index);
1802                 if (rc)
1803                         RETURN(rc);
1804                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1805
1806                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1807                         name_destroy(&logname);
1808                         name_destroy(&lovname);
1809                         RETURN(-EINVAL);
1810                 }
1811
1812                 tmti->mti_stripe_index = index;
1813                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1814                                          mdt_index, lovname,
1815                                          LUSTRE_SP_MDT, 0);
1816                 name_destroy(&logname);
1817                 name_destroy(&lovname);
1818                 RETURN(rc);
1819         }
1820         RETURN(rc);
1821 }
1822
1823 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1824 /* stealed from mgs_get_fsdb_from_llog*/
1825 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1826                                               struct mgs_device *mgs,
1827                                               char *client_name,
1828                                               struct temp_comp* comp)
1829 {
1830         struct llog_handle *loghandle;
1831         struct mgs_target_info *tmti;
1832         struct llog_ctxt *ctxt;
1833         int rc;
1834
1835         ENTRY;
1836
1837         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1838         LASSERT(ctxt != NULL);
1839
1840         OBD_ALLOC_PTR(tmti);
1841         if (tmti == NULL)
1842                 GOTO(out_ctxt, rc = -ENOMEM);
1843
1844         comp->comp_tmti = tmti;
1845         comp->comp_obd = mgs->mgs_obd;
1846
1847         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1848                        LLOG_OPEN_EXISTS);
1849         if (rc < 0) {
1850                 if (rc == -ENOENT)
1851                         rc = 0;
1852                 GOTO(out_pop, rc);
1853         }
1854
1855         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1856         if (rc)
1857                 GOTO(out_close, rc);
1858
1859         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1860                                   (void *)comp, NULL, false);
1861         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1862 out_close:
1863         llog_close(env, loghandle);
1864 out_pop:
1865         OBD_FREE_PTR(tmti);
1866 out_ctxt:
1867         llog_ctxt_put(ctxt);
1868         RETURN(rc);
1869 }
1870
1871 /* lmv is the second thing for client logs */
1872 /* copied from mgs_write_log_lov. Please refer to that.  */
1873 static int mgs_write_log_lmv(const struct lu_env *env,
1874                              struct mgs_device *mgs,
1875                              struct fs_db *fsdb,
1876                              struct mgs_target_info *mti,
1877                              char *logname, char *lmvname)
1878 {
1879         struct llog_handle *llh = NULL;
1880         struct lmv_desc *lmvdesc;
1881         char *uuid;
1882         int rc = 0;
1883         ENTRY;
1884
1885         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1886
1887         OBD_ALLOC_PTR(lmvdesc);
1888         if (lmvdesc == NULL)
1889                 RETURN(-ENOMEM);
1890         lmvdesc->ld_active_tgt_count = 0;
1891         lmvdesc->ld_tgt_count = 0;
1892         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1893         uuid = (char *)lmvdesc->ld_uuid.uuid;
1894
1895         rc = record_start_log(env, mgs, &llh, logname);
1896         if (rc)
1897                 GOTO(out_free, rc);
1898         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1899         if (rc)
1900                 GOTO(out_end, rc);
1901         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1902         if (rc)
1903                 GOTO(out_end, rc);
1904         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1905         if (rc)
1906                 GOTO(out_end, rc);
1907         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1908         if (rc)
1909                 GOTO(out_end, rc);
1910 out_end:
1911         record_end_log(env, &llh);
1912 out_free:
1913         OBD_FREE_PTR(lmvdesc);
1914         RETURN(rc);
1915 }
1916
1917 /* lov is the first thing in the mdt and client logs */
1918 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1919                              struct fs_db *fsdb, struct mgs_target_info *mti,
1920                              char *logname, char *lovname)
1921 {
1922         struct llog_handle *llh = NULL;
1923         struct lov_desc *lovdesc;
1924         char *uuid;
1925         int rc = 0;
1926         ENTRY;
1927
1928         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1929
1930         /*
1931         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1932         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1933               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1934         */
1935
1936         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1937         OBD_ALLOC_PTR(lovdesc);
1938         if (lovdesc == NULL)
1939                 RETURN(-ENOMEM);
1940         lovdesc->ld_magic = LOV_DESC_MAGIC;
1941         lovdesc->ld_tgt_count = 0;
1942         /* Defaults.  Can be changed later by lcfg config_param */
1943         lovdesc->ld_default_stripe_count = 1;
1944         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1945         lovdesc->ld_default_stripe_size = 1024 * 1024;
1946         lovdesc->ld_default_stripe_offset = -1;
1947         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1948         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1949         /* can these be the same? */
1950         uuid = (char *)lovdesc->ld_uuid.uuid;
1951
1952         /* This should always be the first entry in a log.
1953         rc = mgs_clear_log(obd, logname); */
1954         rc = record_start_log(env, mgs, &llh, logname);
1955         if (rc)
1956                 GOTO(out_free, rc);
1957         /* FIXME these should be a single journal transaction */
1958         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1959         if (rc)
1960                 GOTO(out_end, rc);
1961         rc = record_attach(env, llh, lovname, "lov", uuid);
1962         if (rc)
1963                 GOTO(out_end, rc);
1964         rc = record_lov_setup(env, llh, lovname, lovdesc);
1965         if (rc)
1966                 GOTO(out_end, rc);
1967         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1968         if (rc)
1969                 GOTO(out_end, rc);
1970         EXIT;
1971 out_end:
1972         record_end_log(env, &llh);
1973 out_free:
1974         OBD_FREE_PTR(lovdesc);
1975         return rc;
1976 }
1977
1978 /* add failnids to open log */
1979 static int mgs_write_log_failnids(const struct lu_env *env,
1980                                   struct mgs_target_info *mti,
1981                                   struct llog_handle *llh,
1982                                   char *cliname)
1983 {
1984         char *failnodeuuid = NULL;
1985         char *ptr = mti->mti_params;
1986         lnet_nid_t nid;
1987         int rc = 0;
1988
1989         /*
1990         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1991         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1992         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1993         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1994         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1995         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1996         */
1997
1998         /* Pull failnid info out of params string */
1999         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
2000                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
2001                         if (failnodeuuid == NULL) {
2002                                 /* We don't know the failover node name,
2003                                    so just use the first nid as the uuid */
2004                                 rc = name_create(&failnodeuuid,
2005                                                  libcfs_nid2str(nid), "");
2006                                 if (rc)
2007                                         return rc;
2008                         }
2009                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
2010                                "client %s\n", libcfs_nid2str(nid),
2011                                failnodeuuid, cliname);
2012                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
2013                 }
2014                 if (failnodeuuid)
2015                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2016         }
2017
2018         name_destroy(&failnodeuuid);
2019         return rc;
2020 }
2021
2022 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2023                                     struct mgs_device *mgs,
2024                                     struct fs_db *fsdb,
2025                                     struct mgs_target_info *mti,
2026                                     char *logname, char *lmvname)
2027 {
2028         struct llog_handle *llh = NULL;
2029         char *mdcname = NULL;
2030         char *nodeuuid = NULL;
2031         char *mdcuuid = NULL;
2032         char *lmvuuid = NULL;
2033         char index[6];
2034         int i, rc;
2035         ENTRY;
2036
2037         if (mgs_log_is_empty(env, mgs, logname)) {
2038                 CERROR("log is empty! Logical error\n");
2039                 RETURN(-EINVAL);
2040         }
2041
2042         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2043                mti->mti_svname, logname, lmvname);
2044
2045         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2046         if (rc)
2047                 RETURN(rc);
2048         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2049         if (rc)
2050                 GOTO(out_free, rc);
2051         rc = name_create(&mdcuuid, mdcname, "_UUID");
2052         if (rc)
2053                 GOTO(out_free, rc);
2054         rc = name_create(&lmvuuid, lmvname, "_UUID");
2055         if (rc)
2056                 GOTO(out_free, rc);
2057
2058         rc = record_start_log(env, mgs, &llh, logname);
2059         if (rc)
2060                 GOTO(out_free, rc);
2061         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2062                            "add mdc");
2063         if (rc)
2064                 GOTO(out_end, rc);
2065         for (i = 0; i < mti->mti_nid_count; i++) {
2066                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2067                        libcfs_nid2str(mti->mti_nids[i]));
2068
2069                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2070                 if (rc)
2071                         GOTO(out_end, rc);
2072         }
2073
2074         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2075         if (rc)
2076                 GOTO(out_end, rc);
2077         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
2078         if (rc)
2079                 GOTO(out_end, rc);
2080         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2081         if (rc)
2082                 GOTO(out_end, rc);
2083         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2084         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2085                             index, "1");
2086         if (rc)
2087                 GOTO(out_end, rc);
2088         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2089                            "add mdc");
2090         if (rc)
2091                 GOTO(out_end, rc);
2092 out_end:
2093         record_end_log(env, &llh);
2094 out_free:
2095         name_destroy(&lmvuuid);
2096         name_destroy(&mdcuuid);
2097         name_destroy(&mdcname);
2098         name_destroy(&nodeuuid);
2099         RETURN(rc);
2100 }
2101
2102 static inline int name_create_lov(char **lovname, char *mdtname,
2103                                   struct fs_db *fsdb, int index)
2104 {
2105         /* COMPAT_180 */
2106         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2107                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2108         else
2109                 return name_create(lovname, mdtname, "-mdtlov");
2110 }
2111
2112 static int name_create_mdt_and_lov(char **logname, char **lovname,
2113                                    struct fs_db *fsdb, int i)
2114 {
2115         int rc;
2116
2117         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2118         if (rc)
2119                 return rc;
2120         /* COMPAT_180 */
2121         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2122                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2123         else
2124                 rc = name_create(lovname, *logname, "-mdtlov");
2125         if (rc) {
2126                 name_destroy(logname);
2127                 *logname = NULL;
2128         }
2129         return rc;
2130 }
2131
2132 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2133                                       struct fs_db *fsdb, int i)
2134 {
2135         char suffix[16];
2136
2137         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2138                 sprintf(suffix, "-osc");
2139         else
2140                 sprintf(suffix, "-osc-MDT%04x", i);
2141         return name_create(oscname, ostname, suffix);
2142 }
2143
2144 /* add new mdc to already existent MDS */
2145 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2146                                     struct mgs_device *mgs,
2147                                     struct fs_db *fsdb,
2148                                     struct mgs_target_info *mti,
2149                                     int mdt_index, char *logname)
2150 {
2151         struct llog_handle      *llh = NULL;
2152         char    *nodeuuid = NULL;
2153         char    *ospname = NULL;
2154         char    *lovuuid = NULL;
2155         char    *mdtuuid = NULL;
2156         char    *svname = NULL;
2157         char    *mdtname = NULL;
2158         char    *lovname = NULL;
2159         char    index_str[16];
2160         int     i, rc;
2161
2162         ENTRY;
2163         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2164                 CERROR("log is empty! Logical error\n");
2165                 RETURN (-EINVAL);
2166         }
2167
2168         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2169                logname);
2170
2171         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2172         if (rc)
2173                 RETURN(rc);
2174
2175         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2176         if (rc)
2177                 GOTO(out_destory, rc);
2178
2179         rc = name_create(&svname, mdtname, "-osp");
2180         if (rc)
2181                 GOTO(out_destory, rc);
2182
2183         sprintf(index_str, "-MDT%04x", mdt_index);
2184         rc = name_create(&ospname, svname, index_str);
2185         if (rc)
2186                 GOTO(out_destory, rc);
2187
2188         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2189         if (rc)
2190                 GOTO(out_destory, rc);
2191
2192         rc = name_create(&lovuuid, lovname, "_UUID");
2193         if (rc)
2194                 GOTO(out_destory, rc);
2195
2196         rc = name_create(&mdtuuid, mdtname, "_UUID");
2197         if (rc)
2198                 GOTO(out_destory, rc);
2199
2200         rc = record_start_log(env, mgs, &llh, logname);
2201         if (rc)
2202                 GOTO(out_destory, rc);
2203
2204         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2205                            "add osp");
2206         if (rc)
2207                 GOTO(out_destory, rc);
2208
2209         for (i = 0; i < mti->mti_nid_count; i++) {
2210                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2211                        libcfs_nid2str(mti->mti_nids[i]));
2212                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2213                 if (rc)
2214                         GOTO(out_end, rc);
2215         }
2216
2217         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2218         if (rc)
2219                 GOTO(out_end, rc);
2220
2221         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2222                           NULL, NULL);
2223         if (rc)
2224                 GOTO(out_end, rc);
2225
2226         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2227         if (rc)
2228                 GOTO(out_end, rc);
2229
2230         /* Add mdc(osp) to lod */
2231         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2232                  mti->mti_stripe_index);
2233         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2234                          index_str, "1", NULL);
2235         if (rc)
2236                 GOTO(out_end, rc);
2237
2238         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2239         if (rc)
2240                 GOTO(out_end, rc);
2241
2242 out_end:
2243         record_end_log(env, &llh);
2244
2245 out_destory:
2246         name_destroy(&mdtuuid);
2247         name_destroy(&lovuuid);
2248         name_destroy(&lovname);
2249         name_destroy(&ospname);
2250         name_destroy(&svname);
2251         name_destroy(&nodeuuid);
2252         name_destroy(&mdtname);
2253         RETURN(rc);
2254 }
2255
2256 static int mgs_write_log_mdt0(const struct lu_env *env,
2257                               struct mgs_device *mgs,
2258                               struct fs_db *fsdb,
2259                               struct mgs_target_info *mti)
2260 {
2261         char *log = mti->mti_svname;
2262         struct llog_handle *llh = NULL;
2263         char *uuid, *lovname;
2264         char mdt_index[6];
2265         char *ptr = mti->mti_params;
2266         int rc = 0, failout = 0;
2267         ENTRY;
2268
2269         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2270         if (uuid == NULL)
2271                 RETURN(-ENOMEM);
2272
2273         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2274                 failout = (strncmp(ptr, "failout", 7) == 0);
2275
2276         rc = name_create(&lovname, log, "-mdtlov");
2277         if (rc)
2278                 GOTO(out_free, rc);
2279         if (mgs_log_is_empty(env, mgs, log)) {
2280                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2281                 if (rc)
2282                         GOTO(out_lod, rc);
2283         }
2284
2285         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2286
2287         rc = record_start_log(env, mgs, &llh, log);
2288         if (rc)
2289                 GOTO(out_lod, rc);
2290
2291         /* add MDT itself */
2292
2293         /* FIXME this whole fn should be a single journal transaction */
2294         sprintf(uuid, "%s_UUID", log);
2295         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2296         if (rc)
2297                 GOTO(out_lod, rc);
2298         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2299         if (rc)
2300                 GOTO(out_end, rc);
2301         rc = record_mount_opt(env, llh, log, lovname, NULL);
2302         if (rc)
2303                 GOTO(out_end, rc);
2304         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2305                         failout ? "n" : "f");
2306         if (rc)
2307                 GOTO(out_end, rc);
2308         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2309         if (rc)
2310                 GOTO(out_end, rc);
2311 out_end:
2312         record_end_log(env, &llh);
2313 out_lod:
2314         name_destroy(&lovname);
2315 out_free:
2316         OBD_FREE(uuid, sizeof(struct obd_uuid));
2317         RETURN(rc);
2318 }
2319
2320 /* envelope method for all layers log */
2321 static int mgs_write_log_mdt(const struct lu_env *env,
2322                              struct mgs_device *mgs,
2323                              struct fs_db *fsdb,
2324                              struct mgs_target_info *mti)
2325 {
2326         struct mgs_thread_info *mgi = mgs_env_info(env);
2327         struct llog_handle *llh = NULL;
2328         char *cliname;
2329         int rc, i = 0;
2330         ENTRY;
2331
2332         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2333
2334         if (mti->mti_uuid[0] == '\0') {
2335                 /* Make up our own uuid */
2336                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2337                          "%s_UUID", mti->mti_svname);
2338         }
2339
2340         /* add mdt */
2341         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2342         if (rc)
2343                 RETURN(rc);
2344         /* Append the mdt info to the client log */
2345         rc = name_create(&cliname, mti->mti_fsname, "-client");
2346         if (rc)
2347                 RETURN(rc);
2348
2349         if (mgs_log_is_empty(env, mgs, cliname)) {
2350                 /* Start client log */
2351                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2352                                        fsdb->fsdb_clilov);
2353                 if (rc)
2354                         GOTO(out_free, rc);
2355                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2356                                        fsdb->fsdb_clilmv);
2357                 if (rc)
2358                         GOTO(out_free, rc);
2359         }
2360
2361         /*
2362         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2363         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2364         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2365         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2366         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2367         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2368         */
2369
2370                 /* copy client info about lov/lmv */
2371                 mgi->mgi_comp.comp_mti = mti;
2372                 mgi->mgi_comp.comp_fsdb = fsdb;
2373
2374                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2375                                                         &mgi->mgi_comp);
2376                 if (rc)
2377                         GOTO(out_free, rc);
2378                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2379                                               fsdb->fsdb_clilmv);
2380                 if (rc)
2381                         GOTO(out_free, rc);
2382
2383                 /* add mountopts */
2384                 rc = record_start_log(env, mgs, &llh, cliname);
2385                 if (rc)
2386                         GOTO(out_free, rc);
2387
2388                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2389                                    "mount opts");
2390                 if (rc)
2391                         GOTO(out_end, rc);
2392                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2393                                       fsdb->fsdb_clilmv);
2394                 if (rc)
2395                         GOTO(out_end, rc);
2396                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2397                                    "mount opts");
2398
2399         if (rc)
2400                 GOTO(out_end, rc);
2401
2402         /* for_all_existing_mdt except current one */
2403         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2404                 if (i !=  mti->mti_stripe_index &&
2405                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2406                         char *logname;
2407
2408                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2409                         if (rc)
2410                                 GOTO(out_end, rc);
2411
2412                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2413                                                       i, logname);
2414                         name_destroy(&logname);
2415                         if (rc)
2416                                 GOTO(out_end, rc);
2417                 }
2418         }
2419 out_end:
2420         record_end_log(env, &llh);
2421 out_free:
2422         name_destroy(&cliname);
2423         RETURN(rc);
2424 }
2425
2426 /* Add the ost info to the client/mdt lov */
2427 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2428                                     struct mgs_device *mgs, struct fs_db *fsdb,
2429                                     struct mgs_target_info *mti,
2430                                     char *logname, char *suffix, char *lovname,
2431                                     enum lustre_sec_part sec_part, int flags)
2432 {
2433         struct llog_handle *llh = NULL;
2434         char *nodeuuid = NULL;
2435         char *oscname = NULL;
2436         char *oscuuid = NULL;
2437         char *lovuuid = NULL;
2438         char *svname = NULL;
2439         char index[6];
2440         int i, rc;
2441
2442         ENTRY;
2443         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2444                mti->mti_svname, logname);
2445
2446         if (mgs_log_is_empty(env, mgs, logname)) {
2447                 CERROR("log is empty! Logical error\n");
2448                 RETURN (-EINVAL);
2449         }
2450
2451         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2452         if (rc)
2453                 RETURN(rc);
2454         rc = name_create(&svname, mti->mti_svname, "-osc");
2455         if (rc)
2456                 GOTO(out_free, rc);
2457
2458         /* for the system upgraded from old 1.8, keep using the old osc naming
2459          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2460         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2461                 rc = name_create(&oscname, svname, "");
2462         else
2463                 rc = name_create(&oscname, svname, suffix);
2464         if (rc)
2465                 GOTO(out_free, rc);
2466
2467         rc = name_create(&oscuuid, oscname, "_UUID");
2468         if (rc)
2469                 GOTO(out_free, rc);
2470         rc = name_create(&lovuuid, lovname, "_UUID");
2471         if (rc)
2472                 GOTO(out_free, rc);
2473
2474
2475         /*
2476         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2477         multihomed (#4)
2478         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2479         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2480         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2481         failover (#6,7)
2482         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2483         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2484         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2485         */
2486
2487         rc = record_start_log(env, mgs, &llh, logname);
2488         if (rc)
2489                 GOTO(out_free, rc);
2490
2491         /* FIXME these should be a single journal transaction */
2492         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2493                            "add osc");
2494         if (rc)
2495                 GOTO(out_end, rc);
2496
2497         /* NB: don't change record order, because upon MDT steal OSC config
2498          * from client, it treats all nids before LCFG_SETUP as target nids
2499          * (multiple interfaces), while nids after as failover node nids.
2500          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2501          */
2502         for (i = 0; i < mti->mti_nid_count; i++) {
2503                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2504                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2505                 if (rc)
2506                         GOTO(out_end, rc);
2507         }
2508         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2509         if (rc)
2510                 GOTO(out_end, rc);
2511         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2512         if (rc)
2513                 GOTO(out_end, rc);
2514         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2515         if (rc)
2516                 GOTO(out_end, rc);
2517
2518         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2519
2520         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2521         if (rc)
2522                 GOTO(out_end, rc);
2523         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2524                            "add osc");
2525         if (rc)
2526                 GOTO(out_end, rc);
2527 out_end:
2528         record_end_log(env, &llh);
2529 out_free:
2530         name_destroy(&lovuuid);
2531         name_destroy(&oscuuid);
2532         name_destroy(&oscname);
2533         name_destroy(&svname);
2534         name_destroy(&nodeuuid);
2535         RETURN(rc);
2536 }
2537
2538 static int mgs_write_log_ost(const struct lu_env *env,
2539                              struct mgs_device *mgs, struct fs_db *fsdb,
2540                              struct mgs_target_info *mti)
2541 {
2542         struct llog_handle *llh = NULL;
2543         char *logname, *lovname;
2544         char *ptr = mti->mti_params;
2545         int rc, flags = 0, failout = 0, i;
2546         ENTRY;
2547
2548         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2549
2550         /* The ost startup log */
2551
2552         /* If the ost log already exists, that means that someone reformatted
2553            the ost and it called target_add again. */
2554         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2555                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2556                                    "exists, yet the server claims it never "
2557                                    "registered. It may have been reformatted, "
2558                                    "or the index changed. writeconf the MDT to "
2559                                    "regenerate all logs.\n", mti->mti_svname);
2560                 RETURN(-EALREADY);
2561         }
2562
2563         /*
2564         attach obdfilter ost1 ost1_UUID
2565         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2566         */
2567         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2568                 failout = (strncmp(ptr, "failout", 7) == 0);
2569         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2570         if (rc)
2571                 RETURN(rc);
2572         /* FIXME these should be a single journal transaction */
2573         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2574         if (rc)
2575                 GOTO(out_end, rc);
2576         if (*mti->mti_uuid == '\0')
2577                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2578                          "%s_UUID", mti->mti_svname);
2579         rc = record_attach(env, llh, mti->mti_svname,
2580                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2581         if (rc)
2582                 GOTO(out_end, rc);
2583         rc = record_setup(env, llh, mti->mti_svname,
2584                           "dev"/*ignored*/, "type"/*ignored*/,
2585                           failout ? "n" : "f", 0/*options*/);
2586         if (rc)
2587                 GOTO(out_end, rc);
2588         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2589         if (rc)
2590                 GOTO(out_end, rc);
2591 out_end:
2592         record_end_log(env, &llh);
2593         if (rc)
2594                 RETURN(rc);
2595         /* We also have to update the other logs where this osc is part of
2596            the lov */
2597
2598         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2599                 /* If we're upgrading, the old mdt log already has our
2600                    entry. Let's do a fake one for fun. */
2601                 /* Note that we can't add any new failnids, since we don't
2602                    know the old osc names. */
2603                 flags = CM_SKIP | CM_UPGRADE146;
2604
2605         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2606                 /* If the update flag isn't set, don't update client/mdt
2607                    logs. */
2608                 flags |= CM_SKIP;
2609                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2610                               "the MDT first to regenerate it.\n",
2611                               mti->mti_svname);
2612         }
2613
2614         /* Add ost to all MDT lov defs */
2615         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2616                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2617                         char mdt_index[9];
2618
2619                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2620                                                      i);
2621                         if (rc)
2622                                 RETURN(rc);
2623                         sprintf(mdt_index, "-MDT%04x", i);
2624                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2625                                                       logname, mdt_index,
2626                                                       lovname, LUSTRE_SP_MDT,
2627                                                       flags);
2628                         name_destroy(&logname);
2629                         name_destroy(&lovname);
2630                         if (rc)
2631                                 RETURN(rc);
2632                 }
2633         }
2634
2635         /* Append ost info to the client log */
2636         rc = name_create(&logname, mti->mti_fsname, "-client");
2637         if (rc)
2638                 RETURN(rc);
2639         if (mgs_log_is_empty(env, mgs, logname)) {
2640                 /* Start client log */
2641                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2642                                        fsdb->fsdb_clilov);
2643                 if (rc)
2644                         GOTO(out_free, rc);
2645                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2646                                        fsdb->fsdb_clilmv);
2647                 if (rc)
2648                         GOTO(out_free, rc);
2649         }
2650         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2651                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2652 out_free:
2653         name_destroy(&logname);
2654         RETURN(rc);
2655 }
2656
2657 static __inline__ int mgs_param_empty(char *ptr)
2658 {
2659         char *tmp;
2660
2661         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2662                 return 1;
2663         return 0;
2664 }
2665
2666 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2667                                           struct mgs_device *mgs,
2668                                           struct fs_db *fsdb,
2669                                           struct mgs_target_info *mti,
2670                                           char *logname, char *cliname)
2671 {
2672         int rc;
2673         struct llog_handle *llh = NULL;
2674
2675         if (mgs_param_empty(mti->mti_params)) {
2676                 /* Remove _all_ failnids */
2677                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2678                                 mti->mti_svname, "add failnid", CM_SKIP);
2679                 return rc < 0 ? rc : 0;
2680         }
2681
2682         /* Otherwise failover nids are additive */
2683         rc = record_start_log(env, mgs, &llh, logname);
2684         if (rc)
2685                 return rc;
2686                 /* FIXME this should be a single journal transaction */
2687         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2688                            "add failnid");
2689         if (rc)
2690                 goto out_end;
2691         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2692         if (rc)
2693                 goto out_end;
2694         rc = record_marker(env, llh, fsdb, CM_END,
2695                            mti->mti_svname, "add failnid");
2696 out_end:
2697         record_end_log(env, &llh);
2698         return rc;
2699 }
2700
2701
2702 /* Add additional failnids to an existing log.
2703    The mdc/osc must have been added to logs first */
2704 /* tcp nids must be in dotted-quad ascii -
2705    we can't resolve hostnames from the kernel. */
2706 static int mgs_write_log_add_failnid(const struct lu_env *env,
2707                                      struct mgs_device *mgs,
2708                                      struct fs_db *fsdb,
2709                                      struct mgs_target_info *mti)
2710 {
2711         char *logname, *cliname;
2712         int rc;
2713         ENTRY;
2714
2715         /* FIXME we currently can't erase the failnids
2716          * given when a target first registers, since they aren't part of
2717          * an "add uuid" stanza */
2718
2719         /* Verify that we know about this target */
2720         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2721                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2722                                    "yet. It must be started before failnids "
2723                                    "can be added.\n", mti->mti_svname);
2724                 RETURN(-ENOENT);
2725         }
2726
2727         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2728         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2729                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2730         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2731                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2732         } else {
2733                 RETURN(-EINVAL);
2734         }
2735         if (rc)
2736                 RETURN(rc);
2737         /* Add failover nids to the client log */
2738         rc = name_create(&logname, mti->mti_fsname, "-client");
2739         if (rc) {
2740                 name_destroy(&cliname);
2741                 RETURN(rc);
2742         }
2743         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2744         name_destroy(&logname);
2745         name_destroy(&cliname);
2746         if (rc)
2747                 RETURN(rc);
2748
2749         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2750                 /* Add OST failover nids to the MDT logs as well */
2751                 int i;
2752
2753                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2754                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2755                                 continue;
2756                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2757                         if (rc)
2758                                 RETURN(rc);
2759                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2760                                                  fsdb, i);
2761                         if (rc) {
2762                                 name_destroy(&logname);
2763                                 RETURN(rc);
2764                         }
2765                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2766                                                             mti, logname,
2767                                                             cliname);
2768                         name_destroy(&cliname);
2769                         name_destroy(&logname);
2770                         if (rc)
2771                                 RETURN(rc);
2772                 }
2773         }
2774
2775         RETURN(rc);
2776 }
2777
2778 static int mgs_wlp_lcfg(const struct lu_env *env,
2779                         struct mgs_device *mgs, struct fs_db *fsdb,
2780                         struct mgs_target_info *mti,
2781                         char *logname, struct lustre_cfg_bufs *bufs,
2782                         char *tgtname, char *ptr)
2783 {
2784         char comment[MTI_NAME_MAXLEN];
2785         char *tmp;
2786         struct lustre_cfg *lcfg;
2787         int rc, del;
2788
2789         /* Erase any old settings of this same parameter */
2790         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2791         comment[MTI_NAME_MAXLEN - 1] = 0;
2792         /* But don't try to match the value. */
2793         if ((tmp = strchr(comment, '=')))
2794             *tmp = 0;
2795         /* FIXME we should skip settings that are the same as old values */
2796         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2797         if (rc < 0)
2798                 return rc;
2799         del = mgs_param_empty(ptr);
2800
2801         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2802                       "Sett" : "Modify", tgtname, comment, logname);
2803         if (del)
2804                 return rc;
2805
2806         lustre_cfg_bufs_reset(bufs, tgtname);
2807         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2808         lcfg = lustre_cfg_new(LCFG_PARAM, bufs);
2809         if (!lcfg)
2810                 return -ENOMEM;
2811         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2812         lustre_cfg_free(lcfg);
2813         return rc;
2814 }
2815
2816 /* write global variable settings into log */
2817 static int mgs_write_log_sys(const struct lu_env *env,
2818                              struct mgs_device *mgs, struct fs_db *fsdb,
2819                              struct mgs_target_info *mti, char *sys, char *ptr)
2820 {
2821         struct mgs_thread_info *mgi = mgs_env_info(env);
2822         struct lustre_cfg *lcfg;
2823         char *tmp, sep;
2824         int rc, cmd, convert = 1;
2825
2826         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2827                 cmd = LCFG_SET_TIMEOUT;
2828         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2829                 cmd = LCFG_SET_LDLM_TIMEOUT;
2830         /* Check for known params here so we can return error to lctl */
2831         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2832                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2833                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2834                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2835                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2836                 cmd = LCFG_PARAM;
2837         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2838                 convert = 0; /* Don't convert string value to integer */
2839                 cmd = LCFG_PARAM;
2840         } else {
2841                 return -EINVAL;
2842         }
2843
2844         if (mgs_param_empty(ptr))
2845                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2846         else
2847                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2848
2849         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2850         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2851         if (!convert && *tmp != '\0')
2852                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2853         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2854         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2855         /* truncate the comment to the parameter name */
2856         ptr = tmp - 1;
2857         sep = *ptr;
2858         *ptr = '\0';
2859         /* modify all servers and clients */
2860         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2861                                       *tmp == '\0' ? NULL : lcfg,
2862                                       mti->mti_fsname, sys, 0);
2863         if (rc == 0 && *tmp != '\0') {
2864                 switch (cmd) {
2865                 case LCFG_SET_TIMEOUT:
2866                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2867                                 class_process_config(lcfg);
2868                         break;
2869                 case LCFG_SET_LDLM_TIMEOUT:
2870                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2871                                 class_process_config(lcfg);
2872                         break;
2873                 default:
2874                         break;
2875                 }
2876         }
2877         *ptr = sep;
2878         lustre_cfg_free(lcfg);
2879         return rc;
2880 }
2881
2882 /* write quota settings into log */
2883 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2884                                struct fs_db *fsdb, struct mgs_target_info *mti,
2885                                char *quota, char *ptr)
2886 {
2887         struct mgs_thread_info  *mgi = mgs_env_info(env);
2888         struct lustre_cfg       *lcfg;
2889         char                    *tmp;
2890         char                     sep;
2891         int                      rc, cmd = LCFG_PARAM;
2892
2893         /* support only 'meta' and 'data' pools so far */
2894         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2895             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2896                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2897                        "& quota.ost are)\n", ptr);
2898                 return -EINVAL;
2899         }
2900
2901         if (*tmp == '\0') {
2902                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2903         } else {
2904                 CDEBUG(D_MGS, "global '%s'\n", quota);
2905
2906                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2907                     strcmp(tmp, "none") != 0) {
2908                         CERROR("enable option(%s) isn't supported\n", tmp);
2909                         return -EINVAL;
2910                 }
2911         }
2912
2913         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2914         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2915         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2916         /* truncate the comment to the parameter name */
2917         ptr = tmp - 1;
2918         sep = *ptr;
2919         *ptr = '\0';
2920
2921         /* XXX we duplicated quota enable information in all server
2922          *     config logs, it should be moved to a separate config
2923          *     log once we cleanup the config log for global param. */
2924         /* modify all servers */
2925         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2926                                       *tmp == '\0' ? NULL : lcfg,
2927                                       mti->mti_fsname, quota, 1);
2928         *ptr = sep;
2929         lustre_cfg_free(lcfg);
2930         return rc < 0 ? rc : 0;
2931 }
2932
2933 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2934                                    struct mgs_device *mgs,
2935                                    struct fs_db *fsdb,
2936                                    struct mgs_target_info *mti,
2937                                    char *param)
2938 {
2939         struct mgs_thread_info *mgi = mgs_env_info(env);
2940         struct llog_handle     *llh = NULL;
2941         char                   *logname;
2942         char                   *comment, *ptr;
2943         struct lustre_cfg      *lcfg;
2944         int                     rc, len;
2945         ENTRY;
2946
2947         /* get comment */
2948         ptr = strchr(param, '=');
2949         LASSERT(ptr);
2950         len = ptr - param;
2951
2952         OBD_ALLOC(comment, len + 1);
2953         if (comment == NULL)
2954                 RETURN(-ENOMEM);
2955         strncpy(comment, param, len);
2956         comment[len] = '\0';
2957
2958         /* prepare lcfg */
2959         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2960         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2961         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2962         if (lcfg == NULL)
2963                 GOTO(out_comment, rc = -ENOMEM);
2964
2965         /* construct log name */
2966         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2967         if (rc)
2968                 GOTO(out_lcfg, rc);
2969
2970         if (mgs_log_is_empty(env, mgs, logname)) {
2971                 rc = record_start_log(env, mgs, &llh, logname);
2972                 if (rc)
2973                         GOTO(out, rc);
2974                 record_end_log(env, &llh);
2975         }
2976
2977         /* obsolete old one */
2978         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2979                         comment, CM_SKIP);
2980         if (rc < 0)
2981                 GOTO(out, rc);
2982         /* write the new one */
2983         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2984                                   mti->mti_svname, comment);
2985         if (rc)
2986                 CERROR("err %d writing log %s\n", rc, logname);
2987 out:
2988         name_destroy(&logname);
2989 out_lcfg:
2990         lustre_cfg_free(lcfg);
2991 out_comment:
2992         OBD_FREE(comment, len + 1);
2993         RETURN(rc);
2994 }
2995
2996 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2997                                         char *param)
2998 {
2999         char    *ptr;
3000
3001         /* disable the adjustable udesc parameter for now, i.e. use default
3002          * setting that client always ship udesc to MDT if possible. to enable
3003          * it simply remove the following line */
3004         goto error_out;
3005
3006         ptr = strchr(param, '=');
3007         if (ptr == NULL)
3008                 goto error_out;
3009         *ptr++ = '\0';
3010
3011         if (strcmp(param, PARAM_SRPC_UDESC))
3012                 goto error_out;
3013
3014         if (strcmp(ptr, "yes") == 0) {
3015                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3016                 CWARN("Enable user descriptor shipping from client to MDT\n");
3017         } else if (strcmp(ptr, "no") == 0) {
3018                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3019                 CWARN("Disable user descriptor shipping from client to MDT\n");
3020         } else {
3021                 *(ptr - 1) = '=';
3022                 goto error_out;
3023         }
3024         return 0;
3025
3026 error_out:
3027         CERROR("Invalid param: %s\n", param);
3028         return -EINVAL;
3029 }
3030
3031 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3032                                   const char *svname,
3033                                   char *param)
3034 {
3035         struct sptlrpc_rule      rule;
3036         struct sptlrpc_rule_set *rset;
3037         int                      rc;
3038         ENTRY;
3039
3040         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3041                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3042                 RETURN(-EINVAL);
3043         }
3044
3045         if (strncmp(param, PARAM_SRPC_UDESC,
3046                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3047                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3048         }
3049
3050         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3051                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3052                 RETURN(-EINVAL);
3053         }
3054
3055         param += sizeof(PARAM_SRPC_FLVR) - 1;
3056
3057         rc = sptlrpc_parse_rule(param, &rule);
3058         if (rc)
3059                 RETURN(rc);
3060
3061         /* mgs rules implies must be mgc->mgs */
3062         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3063                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3064                      rule.sr_from != LUSTRE_SP_ANY) ||
3065                     (rule.sr_to != LUSTRE_SP_MGS &&
3066                      rule.sr_to != LUSTRE_SP_ANY))
3067                         RETURN(-EINVAL);
3068         }
3069
3070         /* preapre room for this coming rule. svcname format should be:
3071          * - fsname: general rule
3072          * - fsname-tgtname: target-specific rule
3073          */
3074         if (strchr(svname, '-')) {
3075                 struct mgs_tgt_srpc_conf *tgtconf;
3076                 int                       found = 0;
3077
3078                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3079                      tgtconf = tgtconf->mtsc_next) {
3080                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3081                                 found = 1;
3082                                 break;
3083                         }
3084                 }
3085
3086                 if (!found) {
3087                         int name_len;
3088
3089                         OBD_ALLOC_PTR(tgtconf);
3090                         if (tgtconf == NULL)
3091                                 RETURN(-ENOMEM);
3092
3093                         name_len = strlen(svname);
3094
3095                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3096                         if (tgtconf->mtsc_tgt == NULL) {
3097                                 OBD_FREE_PTR(tgtconf);
3098                                 RETURN(-ENOMEM);
3099                         }
3100                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3101
3102                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3103                         fsdb->fsdb_srpc_tgt = tgtconf;
3104                 }
3105
3106                 rset = &tgtconf->mtsc_rset;
3107         } else {
3108                 rset = &fsdb->fsdb_srpc_gen;
3109         }
3110
3111         rc = sptlrpc_rule_set_merge(rset, &rule);
3112
3113         RETURN(rc);
3114 }
3115
3116 static int mgs_srpc_set_param(const struct lu_env *env,
3117                               struct mgs_device *mgs,
3118                               struct fs_db *fsdb,
3119                               struct mgs_target_info *mti,
3120                               char *param)
3121 {
3122         char                   *copy;
3123         int                     rc, copy_size;
3124         ENTRY;
3125
3126 #ifndef HAVE_GSS
3127         RETURN(-EINVAL);
3128 #endif
3129         /* keep a copy of original param, which could be destroied
3130          * during parsing */
3131         copy_size = strlen(param) + 1;
3132         OBD_ALLOC(copy, copy_size);
3133         if (copy == NULL)
3134                 return -ENOMEM;
3135         memcpy(copy, param, copy_size);
3136
3137         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3138         if (rc)
3139                 goto out_free;
3140
3141         /* previous steps guaranteed the syntax is correct */
3142         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3143         if (rc)
3144                 goto out_free;
3145
3146         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3147                 /*
3148                  * for mgs rules, make them effective immediately.
3149                  */
3150                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3151                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3152                                                  &fsdb->fsdb_srpc_gen);
3153         }
3154
3155 out_free:
3156         OBD_FREE(copy, copy_size);
3157         RETURN(rc);
3158 }
3159
3160 struct mgs_srpc_read_data {
3161         struct fs_db   *msrd_fsdb;
3162         int             msrd_skip;
3163 };
3164
3165 static int mgs_srpc_read_handler(const struct lu_env *env,
3166                                  struct llog_handle *llh,
3167                                  struct llog_rec_hdr *rec, void *data)
3168 {
3169         struct mgs_srpc_read_data *msrd = data;
3170         struct cfg_marker         *marker;
3171         struct lustre_cfg         *lcfg = REC_DATA(rec);
3172         char                      *svname, *param;
3173         int                        cfg_len, rc;
3174         ENTRY;
3175
3176         if (rec->lrh_type != OBD_CFG_REC) {
3177                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3178                 RETURN(-EINVAL);
3179         }
3180
3181         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3182                   sizeof(struct llog_rec_tail);
3183
3184         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3185         if (rc) {
3186                 CERROR("Insane cfg\n");
3187                 RETURN(rc);
3188         }
3189
3190         if (lcfg->lcfg_command == LCFG_MARKER) {
3191                 marker = lustre_cfg_buf(lcfg, 1);
3192
3193                 if (marker->cm_flags & CM_START &&
3194                     marker->cm_flags & CM_SKIP)
3195                         msrd->msrd_skip = 1;
3196                 if (marker->cm_flags & CM_END)
3197                         msrd->msrd_skip = 0;
3198
3199                 RETURN(0);
3200         }
3201
3202         if (msrd->msrd_skip)
3203                 RETURN(0);
3204
3205         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3206                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3207                 RETURN(0);
3208         }
3209
3210         svname = lustre_cfg_string(lcfg, 0);
3211         if (svname == NULL) {
3212                 CERROR("svname is empty\n");
3213                 RETURN(0);
3214         }
3215
3216         param = lustre_cfg_string(lcfg, 1);
3217         if (param == NULL) {
3218                 CERROR("param is empty\n");
3219                 RETURN(0);
3220         }
3221
3222         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3223         if (rc)
3224                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3225
3226         RETURN(0);
3227 }
3228
3229 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3230                                 struct mgs_device *mgs,
3231                                 struct fs_db *fsdb)
3232 {
3233         struct llog_handle        *llh = NULL;
3234         struct llog_ctxt          *ctxt;
3235         char                      *logname;
3236         struct mgs_srpc_read_data  msrd;
3237         int                        rc;
3238         ENTRY;
3239
3240         /* construct log name */
3241         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3242         if (rc)
3243                 RETURN(rc);
3244
3245         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3246         LASSERT(ctxt != NULL);
3247
3248         if (mgs_log_is_empty(env, mgs, logname))
3249                 GOTO(out, rc = 0);
3250
3251         rc = llog_open(env, ctxt, &llh, NULL, logname,
3252                        LLOG_OPEN_EXISTS);
3253         if (rc < 0) {
3254                 if (rc == -ENOENT)
3255                         rc = 0;
3256                 GOTO(out, rc);
3257         }
3258
3259         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3260         if (rc)
3261                 GOTO(out_close, rc);
3262
3263         if (llog_get_size(llh) <= 1)
3264                 GOTO(out_close, rc = 0);
3265
3266         msrd.msrd_fsdb = fsdb;
3267         msrd.msrd_skip = 0;
3268
3269         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3270                           NULL);
3271
3272 out_close:
3273         llog_close(env, llh);
3274 out:
3275         llog_ctxt_put(ctxt);
3276         name_destroy(&logname);
3277
3278         if (rc)
3279                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3280         RETURN(rc);
3281 }
3282
3283 /* Permanent settings of all parameters by writing into the appropriate
3284  * configuration logs.
3285  * A parameter with null value ("<param>='\0'") means to erase it out of
3286  * the logs.
3287  */
3288 static int mgs_write_log_param(const struct lu_env *env,
3289                                struct mgs_device *mgs, struct fs_db *fsdb,
3290                                struct mgs_target_info *mti, char *ptr)
3291 {
3292         struct mgs_thread_info *mgi = mgs_env_info(env);
3293         char *logname;
3294         char *tmp;
3295         int rc = 0, rc2 = 0;
3296         ENTRY;
3297
3298         /* For various parameter settings, we have to figure out which logs
3299            care about them (e.g. both mdt and client for lov settings) */
3300         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3301
3302         /* The params are stored in MOUNT_DATA_FILE and modified via
3303            tunefs.lustre, or set using lctl conf_param */
3304
3305         /* Processed in lustre_start_mgc */
3306         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3307                 GOTO(end, rc);
3308
3309         /* Processed in ost/mdt */
3310         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3311                 GOTO(end, rc);
3312
3313         /* Processed in mgs_write_log_ost */
3314         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3315                 if (mti->mti_flags & LDD_F_PARAM) {
3316                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3317                                            "changed with tunefs.lustre"
3318                                            "and --writeconf\n", ptr);
3319                         rc = -EPERM;
3320                 }
3321                 GOTO(end, rc);
3322         }
3323
3324         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3325                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3326                 GOTO(end, rc);
3327         }
3328
3329         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3330                 /* Add a failover nidlist */
3331                 rc = 0;
3332                 /* We already processed failovers params for new
3333                    targets in mgs_write_log_target */
3334                 if (mti->mti_flags & LDD_F_PARAM) {
3335                         CDEBUG(D_MGS, "Adding failnode\n");
3336                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3337                 }
3338                 GOTO(end, rc);
3339         }
3340
3341         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3342                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3343                 GOTO(end, rc);
3344         }
3345
3346         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3347                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3348                 GOTO(end, rc);
3349         }
3350
3351         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3352                 /* active=0 means off, anything else means on */
3353                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3354                 int i;
3355
3356                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3357                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3358                                            "be (de)activated.\n",
3359                                            mti->mti_svname);
3360                         GOTO(end, rc = -EINVAL);
3361                 }
3362                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3363                               flag ? "de": "re", mti->mti_svname);
3364                 /* Modify clilov */
3365                 rc = name_create(&logname, mti->mti_fsname, "-client");
3366                 if (rc)
3367                         GOTO(end, rc);
3368                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3369                                 mti->mti_svname, "add osc", flag);
3370                 name_destroy(&logname);
3371                 if (rc)
3372                         goto active_err;
3373                 /* Modify mdtlov */
3374                 /* Add to all MDT logs for CMD */
3375                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3376                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3377                                 continue;
3378                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3379                         if (rc)
3380                                 GOTO(end, rc);
3381                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3382                                         mti->mti_svname, "add osc", flag);
3383                         name_destroy(&logname);
3384                         if (rc)
3385                                 goto active_err;
3386                 }
3387         active_err:
3388                 if (rc) {
3389                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3390                                            "log (%d). No permanent "
3391                                            "changes were made to the "
3392                                            "config log.\n",
3393                                            mti->mti_svname, rc);
3394                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3395                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3396                                                    " because the log"
3397                                                    "is in the old 1.4"
3398                                                    "style. Consider "
3399                                                    " --writeconf to "
3400                                                    "update the logs.\n");
3401                         GOTO(end, rc);
3402                 }
3403                 /* Fall through to osc proc for deactivating live OSC
3404                    on running MDT / clients. */
3405         }
3406         /* Below here, let obd's XXX_process_config methods handle it */
3407
3408         /* All lov. in proc */
3409         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3410                 char *mdtlovname;
3411
3412                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3413                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3414                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3415                                            "set on the MDT, not %s. "
3416                                            "Ignoring.\n",
3417                                            mti->mti_svname);
3418                         GOTO(end, rc = 0);
3419                 }
3420
3421                 /* Modify mdtlov */
3422                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3423                         GOTO(end, rc = -ENODEV);
3424
3425                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3426                                              mti->mti_stripe_index);
3427                 if (rc)
3428                         GOTO(end, rc);
3429                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3430                                   &mgi->mgi_bufs, mdtlovname, ptr);
3431                 name_destroy(&logname);
3432                 name_destroy(&mdtlovname);
3433                 if (rc)
3434                         GOTO(end, rc);
3435
3436                 /* Modify clilov */
3437                 rc = name_create(&logname, mti->mti_fsname, "-client");
3438                 if (rc)
3439                         GOTO(end, rc);
3440                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3441                                   fsdb->fsdb_clilov, ptr);
3442                 name_destroy(&logname);
3443                 GOTO(end, rc);
3444         }
3445
3446         /* All osc., mdc., llite. params in proc */
3447         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3448             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3449             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3450                 char *cname;
3451
3452                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3453                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3454                                            " cannot be modified. Consider"
3455                                            " updating the configuration with"
3456                                            " --writeconf\n",
3457                                            mti->mti_svname);
3458                         GOTO(end, rc = -EINVAL);
3459                 }
3460                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3461                         rc = name_create(&cname, mti->mti_fsname, "-client");
3462                         /* Add the client type to match the obdname in
3463                            class_config_llog_handler */
3464                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3465                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3466                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3467                         rc = name_create(&cname, mti->mti_svname, "-osc");
3468                 } else {
3469                         GOTO(end, rc = -EINVAL);
3470                 }
3471                 if (rc)
3472                         GOTO(end, rc);
3473
3474                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3475
3476                 /* Modify client */
3477                 rc = name_create(&logname, mti->mti_fsname, "-client");
3478                 if (rc) {
3479                         name_destroy(&cname);
3480                         GOTO(end, rc);
3481                 }
3482                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3483                                   cname, ptr);
3484
3485                 /* osc params affect the MDT as well */
3486                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3487                         int i;
3488
3489                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3490                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3491                                         continue;
3492                                 name_destroy(&cname);
3493                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3494                                                          fsdb, i);
3495                                 name_destroy(&logname);
3496                                 if (rc)
3497                                         break;
3498                                 rc = name_create_mdt(&logname,
3499                                                      mti->mti_fsname, i);
3500                                 if (rc)
3501                                         break;
3502                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3503                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3504                                                           mti, logname,
3505                                                           &mgi->mgi_bufs,
3506                                                           cname, ptr);
3507                                         if (rc)
3508                                                 break;
3509                                 }
3510                         }
3511                 }
3512                 name_destroy(&logname);
3513                 name_destroy(&cname);
3514                 GOTO(end, rc);
3515         }
3516
3517         /* All mdt. params in proc */
3518         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
3519                 int i;
3520                 __u32 idx;
3521
3522                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3523                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3524                             MTI_NAME_MAXLEN) == 0)
3525                         /* device is unspecified completely? */
3526                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3527                 else
3528                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3529                 if (rc < 0)
3530                         goto active_err;
3531                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3532                         goto active_err;
3533                 if (rc & LDD_F_SV_ALL) {
3534                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3535                                 if (!test_bit(i,
3536                                                   fsdb->fsdb_mdt_index_map))
3537                                         continue;
3538                                 rc = name_create_mdt(&logname,
3539                                                 mti->mti_fsname, i);
3540                                 if (rc)
3541                                         goto active_err;
3542                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3543                                                   logname, &mgi->mgi_bufs,
3544                                                   logname, ptr);
3545                                 name_destroy(&logname);
3546                                 if (rc)
3547                                         goto active_err;
3548                         }
3549                 } else {
3550                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3551                                           mti->mti_svname, &mgi->mgi_bufs,
3552                                           mti->mti_svname, ptr);
3553                         if (rc)
3554                                 goto active_err;
3555                 }
3556                 GOTO(end, rc);
3557         }
3558
3559         /* All mdd., ost. params in proc */
3560         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3561             (class_match_param(ptr, PARAM_OST, NULL) == 0)) {
3562                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3563                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3564                         GOTO(end, rc = -ENODEV);
3565
3566                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3567                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3568                 GOTO(end, rc);
3569         }
3570
3571         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3572         rc2 = -ENOSYS;
3573
3574 end:
3575         if (rc)
3576                 CERROR("err %d on param '%s'\n", rc, ptr);
3577
3578         RETURN(rc ?: rc2);
3579 }
3580
3581 /* Not implementing automatic failover nid addition at this time. */
3582 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3583                       struct mgs_target_info *mti)
3584 {
3585 #if 0
3586         struct fs_db *fsdb;
3587         int rc;
3588         ENTRY;
3589
3590         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3591         if (rc)
3592                 RETURN(rc);
3593
3594         if (mgs_log_is_empty(obd, mti->mti_svname))
3595                 /* should never happen */
3596                 RETURN(-ENOENT);
3597
3598         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3599
3600         /* FIXME We can just check mti->params to see if we're already in
3601            the failover list.  Modify mti->params for rewriting back at
3602            server_register_target(). */
3603
3604         mutex_lock(&fsdb->fsdb_mutex);
3605         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3606         mutex_unlock(&fsdb->fsdb_mutex);
3607
3608         RETURN(rc);
3609 #endif
3610         return 0;
3611 }
3612
3613 int mgs_write_log_target(const struct lu_env *env,
3614                          struct mgs_device *mgs,
3615                          struct mgs_target_info *mti,
3616                          struct fs_db *fsdb)
3617 {
3618         int rc = -EINVAL;
3619         char *buf, *params;
3620         ENTRY;
3621
3622         /* set/check the new target index */
3623         rc = mgs_set_index(env, mgs, mti);
3624         if (rc < 0) {
3625                 CERROR("Can't get index (%d)\n", rc);
3626                 RETURN(rc);
3627         }
3628
3629         if (rc == EALREADY) {
3630                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3631                               mti->mti_stripe_index, mti->mti_svname);
3632                 /* We would like to mark old log sections as invalid
3633                    and add new log sections in the client and mdt logs.
3634                    But if we add new sections, then live clients will
3635                    get repeat setup instructions for already running
3636                    osc's. So don't update the client/mdt logs. */
3637                 mti->mti_flags &= ~LDD_F_UPDATE;
3638                 rc = 0;
3639         }
3640
3641         mutex_lock(&fsdb->fsdb_mutex);
3642
3643         if (mti->mti_flags &
3644             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3645                 /* Generate a log from scratch */
3646                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3647                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3648                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3649                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3650                 } else {
3651                         CERROR("Unknown target type %#x, can't create log for "
3652                                "%s\n", mti->mti_flags, mti->mti_svname);
3653                 }
3654                 if (rc) {
3655                         CERROR("Can't write logs for %s (%d)\n",
3656                                mti->mti_svname, rc);
3657                         GOTO(out_up, rc);
3658                 }
3659         } else {
3660                 /* Just update the params from tunefs in mgs_write_log_params */
3661                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3662                 mti->mti_flags |= LDD_F_PARAM;
3663         }
3664
3665         /* allocate temporary buffer, where class_get_next_param will
3666            make copy of a current  parameter */
3667         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3668         if (buf == NULL)
3669                 GOTO(out_up, rc = -ENOMEM);
3670         params = mti->mti_params;
3671         while (params != NULL) {
3672                 rc = class_get_next_param(&params, buf);
3673                 if (rc) {
3674                         if (rc == 1)
3675                                 /* there is no next parameter, that is
3676                                    not an error */
3677                                 rc = 0;
3678                         break;
3679                 }
3680                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3681                        params, buf);
3682                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3683                 if (rc)
3684                         break;
3685         }
3686
3687         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3688
3689 out_up:
3690         mutex_unlock(&fsdb->fsdb_mutex);
3691         RETURN(rc);
3692 }
3693
3694 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3695 {
3696         struct llog_ctxt        *ctxt;
3697         int                      rc = 0;
3698
3699         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3700         if (ctxt == NULL) {
3701                 CERROR("%s: MGS config context doesn't exist\n",
3702                        mgs->mgs_obd->obd_name);
3703                 rc = -ENODEV;
3704         } else {
3705                 rc = llog_erase(env, ctxt, NULL, name);
3706                 /* llog may not exist */
3707                 if (rc == -ENOENT)
3708                         rc = 0;
3709                 llog_ctxt_put(ctxt);
3710         }
3711
3712         if (rc)
3713                 CERROR("%s: failed to clear log %s: %d\n",
3714                        mgs->mgs_obd->obd_name, name, rc);
3715
3716         return rc;
3717 }
3718
3719 /* erase all logs for the given fs */
3720 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3721 {
3722         struct fs_db *fsdb;
3723         cfs_list_t list;
3724         struct mgs_direntry *dirent, *n;
3725         int rc, len = strlen(fsname);
3726         char *suffix;
3727         ENTRY;
3728
3729         /* Find all the logs in the CONFIGS directory */
3730         rc = class_dentry_readdir(env, mgs, &list);
3731         if (rc)
3732                 RETURN(rc);
3733
3734         mutex_lock(&mgs->mgs_mutex);
3735
3736         /* Delete the fs db */
3737         fsdb = mgs_find_fsdb(mgs, fsname);
3738         if (fsdb)
3739                 mgs_free_fsdb(mgs, fsdb);
3740
3741         mutex_unlock(&mgs->mgs_mutex);
3742
3743         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3744                 cfs_list_del(&dirent->list);
3745                 suffix = strrchr(dirent->name, '-');
3746                 if (suffix != NULL) {
3747                         if ((len == suffix - dirent->name) &&
3748                             (strncmp(fsname, dirent->name, len) == 0)) {
3749                                 CDEBUG(D_MGS, "Removing log %s\n",
3750                                        dirent->name);
3751                                 mgs_erase_log(env, mgs, dirent->name);
3752                         }
3753                 }
3754                 mgs_direntry_free(dirent);
3755         }
3756
3757         RETURN(rc);
3758 }
3759
3760 /* from llog_swab */
3761 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3762 {
3763         int i;
3764         ENTRY;
3765
3766         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3767         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3768
3769         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3770         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3771         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3772         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3773
3774         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3775         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3776                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3777                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3778                                i, lcfg->lcfg_buflens[i],
3779                                lustre_cfg_string(lcfg, i));
3780                 }
3781         EXIT;
3782 }
3783
3784 /* Set a permanent (config log) param for a target or fs
3785  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3786  *             buf1 contains the single parameter
3787  */
3788 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3789                  struct lustre_cfg *lcfg, char *fsname)
3790 {
3791         struct fs_db *fsdb;
3792         struct mgs_target_info *mti;
3793         char *devname, *param;
3794         char *ptr;
3795         const char *tmp;
3796         __u32 index;
3797         int rc = 0;
3798         ENTRY;
3799
3800         print_lustre_cfg(lcfg);
3801
3802         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3803         devname = lustre_cfg_string(lcfg, 0);
3804         param = lustre_cfg_string(lcfg, 1);
3805         if (!devname) {
3806                 /* Assume device name embedded in param:
3807                    lustre-OST0000.osc.max_dirty_mb=32 */
3808                 ptr = strchr(param, '.');
3809                 if (ptr) {
3810                         devname = param;
3811                         *ptr = 0;
3812                         param = ptr + 1;
3813                 }
3814         }
3815         if (!devname) {
3816                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3817                 RETURN(-ENOSYS);
3818         }
3819
3820         rc = mgs_parse_devname(devname, fsname, NULL);
3821         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3822                 /* param related to llite isn't allowed to set by OST or MDT */
3823                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3824                                    sizeof(PARAM_LLITE)) == 0)
3825                         RETURN(-EINVAL);
3826         } else {
3827                 /* assume devname is the fsname */
3828                 memset(fsname, 0, MTI_NAME_MAXLEN);
3829                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3830                 fsname[MTI_NAME_MAXLEN - 1] = 0;
3831         }
3832         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3833
3834         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3835         if (rc)
3836                 RETURN(rc);
3837         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3838             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3839                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3840                        "is '%s'\n", fsname, devname);
3841                 mgs_free_fsdb(mgs, fsdb);
3842                 RETURN(-EINVAL);
3843         }
3844
3845         /* Create a fake mti to hold everything */
3846         OBD_ALLOC_PTR(mti);
3847         if (!mti)
3848                 GOTO(out, rc = -ENOMEM);
3849         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3850             >= sizeof(mti->mti_fsname))
3851                 GOTO(out, rc = -E2BIG);
3852         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3853             >= sizeof(mti->mti_svname))
3854                 GOTO(out, rc = -E2BIG);
3855         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3856             >= sizeof(mti->mti_params))
3857                 GOTO(out, rc = -E2BIG);
3858         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3859         if (rc < 0)
3860                 /* Not a valid server; may be only fsname */
3861                 rc = 0;
3862         else
3863                 /* Strip -osc or -mdc suffix from svname */
3864                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3865                                      mti->mti_svname))
3866                         GOTO(out, rc = -EINVAL);
3867
3868         mti->mti_flags = rc | LDD_F_PARAM;
3869
3870         mutex_lock(&fsdb->fsdb_mutex);
3871         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3872         mutex_unlock(&fsdb->fsdb_mutex);
3873
3874         /*
3875          * Revoke lock so everyone updates.  Should be alright if
3876          * someone was already reading while we were updating the logs,
3877          * so we don't really need to hold the lock while we're
3878          * writing (above).
3879          */
3880         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3881 out:
3882         OBD_FREE_PTR(mti);
3883         RETURN(rc);
3884 }
3885
3886 static int mgs_write_log_pool(const struct lu_env *env,
3887                               struct mgs_device *mgs, char *logname,
3888                               struct fs_db *fsdb, char *lovname,
3889                               enum lcfg_command_type cmd,
3890                               char *poolname, char *fsname,
3891                               char *ostname, char *comment)
3892 {
3893         struct llog_handle *llh = NULL;
3894         int rc;
3895
3896         rc = record_start_log(env, mgs, &llh, logname);
3897         if (rc)
3898                 return rc;
3899         rc = record_marker(env, llh, fsdb, CM_START, lovname, comment);
3900         if (rc)
3901                 goto out;
3902         rc = record_base(env, llh, lovname, 0, cmd, poolname, fsname, ostname, 0);
3903         if (rc)
3904                 goto out;
3905         rc = record_marker(env, llh, fsdb, CM_END, lovname, comment);
3906 out:
3907         record_end_log(env, &llh);
3908         return rc;
3909 }
3910
3911 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3912                  enum lcfg_command_type cmd, char *fsname,
3913                  char *poolname, char *ostname)
3914 {
3915         struct fs_db *fsdb;
3916         char *lovname;
3917         char *logname;
3918         char *label = NULL, *canceled_label = NULL;
3919         int label_sz;
3920         struct mgs_target_info *mti = NULL;
3921         int rc, i;
3922         ENTRY;
3923
3924         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3925         if (rc) {
3926                 CERROR("Can't get db for %s\n", fsname);
3927                 RETURN(rc);
3928         }
3929         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3930                 CERROR("%s is not defined\n", fsname);
3931                 mgs_free_fsdb(mgs, fsdb);
3932                 RETURN(-EINVAL);
3933         }
3934
3935         label_sz = 10 + strlen(fsname) + strlen(poolname);
3936
3937         /* check if ostname match fsname */
3938         if (ostname != NULL) {
3939                 char *ptr;
3940
3941                 ptr = strrchr(ostname, '-');
3942                 if ((ptr == NULL) ||
3943                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3944                         RETURN(-EINVAL);
3945                 label_sz += strlen(ostname);
3946         }
3947
3948         OBD_ALLOC(label, label_sz);
3949         if (label == NULL)
3950                 RETURN(-ENOMEM);
3951
3952         switch(cmd) {
3953         case LCFG_POOL_NEW:
3954                 sprintf(label,
3955                         "new %s.%s", fsname, poolname);
3956                 break;
3957         case LCFG_POOL_ADD:
3958                 sprintf(label,
3959                         "add %s.%s.%s", fsname, poolname, ostname);
3960                 break;
3961         case LCFG_POOL_REM:
3962                 OBD_ALLOC(canceled_label, label_sz);
3963                 if (canceled_label == NULL)
3964                         GOTO(out_label, rc = -ENOMEM);
3965                 sprintf(label,
3966                         "rem %s.%s.%s", fsname, poolname, ostname);
3967                 sprintf(canceled_label,
3968                         "add %s.%s.%s", fsname, poolname, ostname);
3969                 break;
3970         case LCFG_POOL_DEL:
3971                 OBD_ALLOC(canceled_label, label_sz);
3972                 if (canceled_label == NULL)
3973                         GOTO(out_label, rc = -ENOMEM);
3974                 sprintf(label,
3975                         "del %s.%s", fsname, poolname);
3976                 sprintf(canceled_label,
3977                         "new %s.%s", fsname, poolname);
3978                 break;
3979         default:
3980                 break;
3981         }
3982
3983         if (canceled_label != NULL) {
3984                 OBD_ALLOC_PTR(mti);
3985                 if (mti == NULL)
3986                         GOTO(out_cancel, rc = -ENOMEM);
3987         }
3988
3989         mutex_lock(&fsdb->fsdb_mutex);
3990         /* write pool def to all MDT logs */
3991         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3992                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
3993                         rc = name_create_mdt_and_lov(&logname, &lovname,
3994                                                      fsdb, i);
3995                         if (rc) {
3996                                 mutex_unlock(&fsdb->fsdb_mutex);
3997                                 GOTO(out_mti, rc);
3998                         }
3999                         if (canceled_label != NULL) {
4000                                 strcpy(mti->mti_svname, "lov pool");
4001                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4002                                                 lovname, canceled_label,
4003                                                 CM_SKIP);
4004                         }
4005
4006                         if (rc >= 0)
4007                                 rc = mgs_write_log_pool(env, mgs, logname,
4008                                                         fsdb, lovname, cmd,
4009                                                         fsname, poolname,
4010                                                         ostname, label);
4011                         name_destroy(&logname);
4012                         name_destroy(&lovname);
4013                         if (rc) {
4014                                 mutex_unlock(&fsdb->fsdb_mutex);
4015                                 GOTO(out_mti, rc);
4016                         }
4017                 }
4018         }
4019
4020         rc = name_create(&logname, fsname, "-client");
4021         if (rc) {
4022                 mutex_unlock(&fsdb->fsdb_mutex);
4023                 GOTO(out_mti, rc);
4024         }
4025         if (canceled_label != NULL) {
4026                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4027                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4028                 if (rc < 0) {
4029                         mutex_unlock(&fsdb->fsdb_mutex);
4030                         name_destroy(&logname);
4031                         GOTO(out_mti, rc);
4032                 }
4033         }
4034
4035         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4036                                 cmd, fsname, poolname, ostname, label);
4037         mutex_unlock(&fsdb->fsdb_mutex);
4038         name_destroy(&logname);
4039         /* request for update */
4040         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4041
4042         EXIT;
4043 out_mti:
4044         if (mti != NULL)
4045                 OBD_FREE_PTR(mti);
4046 out_cancel:
4047         if (canceled_label != NULL)
4048                 OBD_FREE(canceled_label, label_sz);
4049 out_label:
4050         OBD_FREE(label, label_sz);
4051         return rc;
4052 }
4053
4054