Whamcloud - gitweb
LU-2449 osd: osd-zfs to initialize parent attribute
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         if (!dt_try_as_dir(env, dir))
71                 GOTO(out, rc = -ENOTDIR);
72
73         LASSERT(dir);
74         LASSERT(dir->do_index_ops);
75
76         iops = &dir->do_index_ops->dio_it;
77         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
78         if (IS_ERR(it))
79                 RETURN(PTR_ERR(it));
80
81         rc = iops->load(env, it, 0);
82         if (rc <= 0)
83                 GOTO(fini, rc = 0);
84
85         /* main cycle */
86         do {
87                 key = (void *)iops->key(env, it);
88                 if (IS_ERR(key)) {
89                         CERROR("%s: key failed when listing %s: rc = %d\n",
90                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
91                                (int) PTR_ERR(key));
92                         goto next;
93                 }
94                 key_sz = iops->key_size(env, it);
95                 LASSERT(key_sz > 0);
96
97                 /* filter out "." and ".." entries */
98                 if (key[0] == '.') {
99                         if (key_sz == 1)
100                                 goto next;
101                         if (key_sz == 2 && key[1] == '.')
102                                 goto next;
103                 }
104
105                 de = mgs_direntry_alloc(key_sz + 1);
106                 if (de == NULL) {
107                         rc = -ENOMEM;
108                         break;
109                 }
110
111                 memcpy(de->name, key, key_sz);
112                 de->name[key_sz] = 0;
113
114                 cfs_list_add(&de->list, list);
115
116 next:
117                 rc = iops->next(env, it);
118         } while (rc == 0);
119         rc = 0;
120
121         iops->put(env, it);
122
123 fini:
124         iops->fini(env, it);
125 out:
126         if (rc)
127                 CERROR("%s: key failed when listing %s: rc = %d\n",
128                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
129         RETURN(rc);
130 }
131
132 /******************** DB functions *********************/
133
134 static inline int name_create(char **newname, char *prefix, char *suffix)
135 {
136         LASSERT(newname);
137         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
138         if (!*newname)
139                 return -ENOMEM;
140         sprintf(*newname, "%s%s", prefix, suffix);
141         return 0;
142 }
143
144 static inline void name_destroy(char **name)
145 {
146         if (*name)
147                 OBD_FREE(*name, strlen(*name) + 1);
148         *name = NULL;
149 }
150
151 struct mgs_fsdb_handler_data
152 {
153         struct fs_db   *fsdb;
154         __u32           ver;
155 };
156
157 /* from the (client) config log, figure out:
158         1. which ost's/mdt's are configured (by index)
159         2. what the last config step is
160         3. COMPAT_18 osc name
161 */
162 /* It might be better to have a separate db file, instead of parsing the info
163    out of the client log.  This is slow and potentially error-prone. */
164 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
165                             struct llog_rec_hdr *rec, void *data)
166 {
167         struct mgs_fsdb_handler_data *d = data;
168         struct fs_db *fsdb = d->fsdb;
169         int cfg_len = rec->lrh_len;
170         char *cfg_buf = (char*) (rec + 1);
171         struct lustre_cfg *lcfg;
172         __u32 index;
173         int rc = 0;
174         ENTRY;
175
176         if (rec->lrh_type != OBD_CFG_REC) {
177                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
178                 RETURN(-EINVAL);
179         }
180
181         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
182         if (rc) {
183                 CERROR("Insane cfg\n");
184                 RETURN(rc);
185         }
186
187         lcfg = (struct lustre_cfg *)cfg_buf;
188
189         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
190                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
191
192         /* Figure out ost indicies */
193         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
194         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
195             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
196                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
197                                        NULL, 10);
198                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
199                        lustre_cfg_string(lcfg, 1), index,
200                        lustre_cfg_string(lcfg, 2));
201                 set_bit(index, fsdb->fsdb_ost_index_map);
202         }
203
204         /* Figure out mdt indicies */
205         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
206         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
207             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
208                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
209                                        &index, NULL);
210                 if (rc != LDD_F_SV_TYPE_MDT) {
211                         CWARN("Unparsable MDC name %s, assuming index 0\n",
212                               lustre_cfg_string(lcfg, 0));
213                         index = 0;
214                 }
215                 rc = 0;
216                 CDEBUG(D_MGS, "MDT index is %u\n", index);
217                 set_bit(index, fsdb->fsdb_mdt_index_map);
218                 fsdb->fsdb_mdt_count ++;
219         }
220
221         /**
222          * figure out the old config. fsdb_gen = 0 means old log
223          * It is obsoleted and not supported anymore
224          */
225         if (fsdb->fsdb_gen == 0) {
226                 CERROR("Old config format is not supported\n");
227                 RETURN(-EINVAL);
228         }
229
230         /*
231          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
232          */
233         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
234             lcfg->lcfg_command == LCFG_ATTACH &&
235             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
236                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
237                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
238                         CWARN("MDT using 1.8 OSC name scheme\n");
239                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
240                 }
241         }
242
243         if (lcfg->lcfg_command == LCFG_MARKER) {
244                 struct cfg_marker *marker;
245                 marker = lustre_cfg_buf(lcfg, 1);
246
247                 d->ver = marker->cm_vers;
248
249                 /* Keep track of the latest marker step */
250                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
251         }
252
253         RETURN(rc);
254 }
255
256 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
257 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
258                                   struct mgs_device *mgs,
259                                   struct fs_db *fsdb)
260 {
261         char                            *logname;
262         struct llog_handle              *loghandle;
263         struct llog_ctxt                *ctxt;
264         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
265         int rc;
266
267         ENTRY;
268
269         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
270         LASSERT(ctxt != NULL);
271         rc = name_create(&logname, fsdb->fsdb_name, "-client");
272         if (rc)
273                 GOTO(out_put, rc);
274         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
275         if (rc)
276                 GOTO(out_pop, rc);
277
278         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
279         if (rc)
280                 GOTO(out_close, rc);
281
282         if (llog_get_size(loghandle) <= 1)
283                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
284
285         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
286         CDEBUG(D_INFO, "get_db = %d\n", rc);
287 out_close:
288         llog_close(env, loghandle);
289 out_pop:
290         name_destroy(&logname);
291 out_put:
292         llog_ctxt_put(ctxt);
293
294         RETURN(rc);
295 }
296
297 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
298 {
299         struct mgs_tgt_srpc_conf *tgtconf;
300
301         /* free target-specific rules */
302         while (fsdb->fsdb_srpc_tgt) {
303                 tgtconf = fsdb->fsdb_srpc_tgt;
304                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
305
306                 LASSERT(tgtconf->mtsc_tgt);
307
308                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
309                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
310                 OBD_FREE_PTR(tgtconf);
311         }
312
313         /* free general rules */
314         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
315 }
316
317 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
318 {
319         struct fs_db *fsdb;
320         cfs_list_t *tmp;
321
322         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
323                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
324                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
325                         return fsdb;
326         }
327         return NULL;
328 }
329
330 /* caller must hold the mgs->mgs_fs_db_lock */
331 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
332                                   struct mgs_device *mgs, char *fsname)
333 {
334         struct fs_db *fsdb;
335         int rc;
336         ENTRY;
337
338         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
339                 CERROR("fsname %s is too long\n", fsname);
340                 RETURN(NULL);
341         }
342
343         OBD_ALLOC_PTR(fsdb);
344         if (!fsdb)
345                 RETURN(NULL);
346
347         strcpy(fsdb->fsdb_name, fsname);
348         mutex_init(&fsdb->fsdb_mutex);
349         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
350         fsdb->fsdb_gen = 1;
351
352         if (strcmp(fsname, MGSSELF_NAME) == 0) {
353                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
354         } else {
355                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
356                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
357                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
358                         CERROR("No memory for index maps\n");
359                         GOTO(err, rc = -ENOMEM);
360                 }
361
362                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
363                 if (rc)
364                         GOTO(err, rc);
365                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
366                 if (rc)
367                         GOTO(err, rc);
368
369                 /* initialise data for NID table */
370                 mgs_ir_init_fs(env, mgs, fsdb);
371
372                 lproc_mgs_add_live(mgs, fsdb);
373         }
374
375         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
376
377         RETURN(fsdb);
378 err:
379         if (fsdb->fsdb_ost_index_map)
380                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
381         if (fsdb->fsdb_mdt_index_map)
382                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
383         name_destroy(&fsdb->fsdb_clilov);
384         name_destroy(&fsdb->fsdb_clilmv);
385         OBD_FREE_PTR(fsdb);
386         RETURN(NULL);
387 }
388
389 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
390 {
391         /* wait for anyone with the sem */
392         mutex_lock(&fsdb->fsdb_mutex);
393         lproc_mgs_del_live(mgs, fsdb);
394         cfs_list_del(&fsdb->fsdb_list);
395
396         /* deinitialize fsr */
397         mgs_ir_fini_fs(mgs, fsdb);
398
399         if (fsdb->fsdb_ost_index_map)
400                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
401         if (fsdb->fsdb_mdt_index_map)
402                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
403         name_destroy(&fsdb->fsdb_clilov);
404         name_destroy(&fsdb->fsdb_clilmv);
405         mgs_free_fsdb_srpc(fsdb);
406         mutex_unlock(&fsdb->fsdb_mutex);
407         OBD_FREE_PTR(fsdb);
408 }
409
410 int mgs_init_fsdb_list(struct mgs_device *mgs)
411 {
412         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
413         return 0;
414 }
415
416 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
417 {
418         struct fs_db *fsdb;
419         cfs_list_t *tmp, *tmp2;
420         mutex_lock(&mgs->mgs_mutex);
421         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
422                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
423                 mgs_free_fsdb(mgs, fsdb);
424         }
425         mutex_unlock(&mgs->mgs_mutex);
426         return 0;
427 }
428
429 int mgs_find_or_make_fsdb(const struct lu_env *env,
430                           struct mgs_device *mgs, char *name,
431                           struct fs_db **dbh)
432 {
433         struct fs_db *fsdb;
434         int rc = 0;
435
436         ENTRY;
437         mutex_lock(&mgs->mgs_mutex);
438         fsdb = mgs_find_fsdb(mgs, name);
439         if (fsdb) {
440                 mutex_unlock(&mgs->mgs_mutex);
441                 *dbh = fsdb;
442                 RETURN(0);
443         }
444
445         CDEBUG(D_MGS, "Creating new db\n");
446         fsdb = mgs_new_fsdb(env, mgs, name);
447         /* lock fsdb_mutex until the db is loaded from llogs */
448         if (fsdb)
449                 mutex_lock(&fsdb->fsdb_mutex);
450         mutex_unlock(&mgs->mgs_mutex);
451         if (!fsdb)
452                 RETURN(-ENOMEM);
453
454         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
455                 /* populate the db from the client llog */
456                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
457                 if (rc) {
458                         CERROR("Can't get db from client log %d\n", rc);
459                         GOTO(out_free, rc);
460                 }
461         }
462
463         /* populate srpc rules from params llog */
464         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
465         if (rc) {
466                 CERROR("Can't get db from params log %d\n", rc);
467                 GOTO(out_free, rc);
468         }
469
470         mutex_unlock(&fsdb->fsdb_mutex);
471         *dbh = fsdb;
472
473         RETURN(0);
474
475 out_free:
476         mutex_unlock(&fsdb->fsdb_mutex);
477         mgs_free_fsdb(mgs, fsdb);
478         return rc;
479 }
480
481 /* 1 = index in use
482    0 = index unused
483    -1= empty client log */
484 int mgs_check_index(const struct lu_env *env,
485                     struct mgs_device *mgs,
486                     struct mgs_target_info *mti)
487 {
488         struct fs_db *fsdb;
489         void *imap;
490         int rc = 0;
491         ENTRY;
492
493         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
494
495         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
496         if (rc) {
497                 CERROR("Can't get db for %s\n", mti->mti_fsname);
498                 RETURN(rc);
499         }
500
501         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
502                 RETURN(-1);
503
504         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
505                 imap = fsdb->fsdb_ost_index_map;
506         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
507                 imap = fsdb->fsdb_mdt_index_map;
508         else
509                 RETURN(-EINVAL);
510
511         if (test_bit(mti->mti_stripe_index, imap))
512                 RETURN(1);
513         RETURN(0);
514 }
515
516 static __inline__ int next_index(void *index_map, int map_len)
517 {
518         int i;
519         for (i = 0; i < map_len * 8; i++)
520                 if (!test_bit(i, index_map)) {
521                          return i;
522                  }
523         CERROR("max index %d exceeded.\n", i);
524         return -1;
525 }
526
527 /* Return codes:
528         0  newly marked as in use
529         <0 err
530         +EALREADY for update of an old index */
531 static int mgs_set_index(const struct lu_env *env,
532                          struct mgs_device *mgs,
533                          struct mgs_target_info *mti)
534 {
535         struct fs_db *fsdb;
536         void *imap;
537         int rc = 0;
538         ENTRY;
539
540         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
541         if (rc) {
542                 CERROR("Can't get db for %s\n", mti->mti_fsname);
543                 RETURN(rc);
544         }
545
546         mutex_lock(&fsdb->fsdb_mutex);
547         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
548                 imap = fsdb->fsdb_ost_index_map;
549         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
550                 imap = fsdb->fsdb_mdt_index_map;
551         } else {
552                 GOTO(out_up, rc = -EINVAL);
553         }
554
555         if (mti->mti_flags & LDD_F_NEED_INDEX) {
556                 rc = next_index(imap, INDEX_MAP_SIZE);
557                 if (rc == -1)
558                         GOTO(out_up, rc = -ERANGE);
559                 mti->mti_stripe_index = rc;
560                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
561                         fsdb->fsdb_mdt_count ++;
562         }
563
564         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
565                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
566                                    "but the max index is %d.\n",
567                                    mti->mti_svname, mti->mti_stripe_index,
568                                    INDEX_MAP_SIZE * 8);
569                 GOTO(out_up, rc = -ERANGE);
570         }
571
572         if (test_bit(mti->mti_stripe_index, imap)) {
573                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
574                     !(mti->mti_flags & LDD_F_WRITECONF)) {
575                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
576                                            "%d, but that index is already in "
577                                            "use. Use --writeconf to force\n",
578                                            mti->mti_svname,
579                                            mti->mti_stripe_index);
580                         GOTO(out_up, rc = -EADDRINUSE);
581                 } else {
582                         CDEBUG(D_MGS, "Server %s updating index %d\n",
583                                mti->mti_svname, mti->mti_stripe_index);
584                         GOTO(out_up, rc = EALREADY);
585                 }
586         }
587
588         set_bit(mti->mti_stripe_index, imap);
589         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
590         mutex_unlock(&fsdb->fsdb_mutex);
591         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
592                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
593
594         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
595                mti->mti_stripe_index);
596
597         RETURN(0);
598 out_up:
599         mutex_unlock(&fsdb->fsdb_mutex);
600         return rc;
601 }
602
603 struct mgs_modify_lookup {
604         struct cfg_marker mml_marker;
605         int               mml_modified;
606 };
607
608 static int mgs_modify_handler(const struct lu_env *env,
609                               struct llog_handle *llh,
610                               struct llog_rec_hdr *rec, void *data)
611 {
612         struct mgs_modify_lookup *mml = data;
613         struct cfg_marker *marker;
614         struct lustre_cfg *lcfg = REC_DATA(rec);
615         int cfg_len = REC_DATA_LEN(rec);
616         int rc;
617         ENTRY;
618
619         if (rec->lrh_type != OBD_CFG_REC) {
620                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
621                 RETURN(-EINVAL);
622         }
623
624         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
625         if (rc) {
626                 CERROR("Insane cfg\n");
627                 RETURN(rc);
628         }
629
630         /* We only care about markers */
631         if (lcfg->lcfg_command != LCFG_MARKER)
632                 RETURN(0);
633
634         marker = lustre_cfg_buf(lcfg, 1);
635         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
636             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
637             !(marker->cm_flags & CM_SKIP)) {
638                 /* Found a non-skipped marker match */
639                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
640                        rec->lrh_index, marker->cm_step,
641                        marker->cm_flags, mml->mml_marker.cm_flags,
642                        marker->cm_tgtname, marker->cm_comment);
643                 /* Overwrite the old marker llog entry */
644                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
645                 marker->cm_flags |= mml->mml_marker.cm_flags;
646                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
647                 /* Header and tail are added back to lrh_len in
648                    llog_lvfs_write_rec */
649                 rec->lrh_len = cfg_len;
650                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
651                                 rec->lrh_index);
652                 if (!rc)
653                          mml->mml_modified++;
654         }
655
656         RETURN(rc);
657 }
658
659 /**
660  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
661  * Return code:
662  * 0 - modified successfully,
663  * 1 - no modification was done
664  * negative - error
665  */
666 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
667                       struct fs_db *fsdb, struct mgs_target_info *mti,
668                       char *logname, char *devname, char *comment, int flags)
669 {
670         struct llog_handle *loghandle;
671         struct llog_ctxt *ctxt;
672         struct mgs_modify_lookup *mml;
673         int rc;
674
675         ENTRY;
676
677         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
678         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
679                flags);
680
681         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
682         LASSERT(ctxt != NULL);
683         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
684         if (rc < 0) {
685                 if (rc == -ENOENT)
686                         rc = 0;
687                 GOTO(out_pop, rc);
688         }
689
690         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
691         if (rc)
692                 GOTO(out_close, rc);
693
694         if (llog_get_size(loghandle) <= 1)
695                 GOTO(out_close, rc = 0);
696
697         OBD_ALLOC_PTR(mml);
698         if (!mml)
699                 GOTO(out_close, rc = -ENOMEM);
700         strcpy(mml->mml_marker.cm_comment, comment);
701         strcpy(mml->mml_marker.cm_tgtname, devname);
702         /* Modify mostly means cancel */
703         mml->mml_marker.cm_flags = flags;
704         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
705         mml->mml_modified = 0;
706         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
707                           NULL);
708         if (!rc && !mml->mml_modified)
709                 rc = 1;
710         OBD_FREE_PTR(mml);
711
712 out_close:
713         llog_close(env, loghandle);
714 out_pop:
715         if (rc < 0)
716                 CERROR("%s: modify %s/%s failed: rc = %d\n",
717                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
718         llog_ctxt_put(ctxt);
719         RETURN(rc);
720 }
721
722 /** This structure is passed to mgs_replace_handler */
723 struct mgs_replace_uuid_lookup {
724         /* Nids are replaced for this target device */
725         struct mgs_target_info target;
726         /* Temporary modified llog */
727         struct llog_handle *temp_llh;
728         /* Flag is set if in target block*/
729         int in_target_device;
730         /* Nids already added. Just skip (multiple nids) */
731         int device_nids_added;
732         /* Flag is set if this block should not be copied */
733         int skip_it;
734 };
735
736 /**
737  * Check: a) if block should be skipped
738  * b) is it target block
739  *
740  * \param[in] lcfg
741  * \param[in] mrul
742  *
743  * \retval 0 should not to be skipped
744  * \retval 1 should to be skipped
745  */
746 static int check_markers(struct lustre_cfg *lcfg,
747                          struct mgs_replace_uuid_lookup *mrul)
748 {
749          struct cfg_marker *marker;
750
751         /* Track markers. Find given device */
752         if (lcfg->lcfg_command == LCFG_MARKER) {
753                 marker = lustre_cfg_buf(lcfg, 1);
754                 /* Clean llog from records marked as CM_EXCLUDE.
755                    CM_SKIP records are used for "active" command
756                    and can be restored if needed */
757                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
758                     (CM_EXCLUDE | CM_START)) {
759                         mrul->skip_it = 1;
760                         return 1;
761                 }
762
763                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
764                     (CM_EXCLUDE | CM_END)) {
765                         mrul->skip_it = 0;
766                         return 1;
767                 }
768
769                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
770                         LASSERT(!(marker->cm_flags & CM_START) ||
771                                 !(marker->cm_flags & CM_END));
772                         if (marker->cm_flags & CM_START) {
773                                 mrul->in_target_device = 1;
774                                 mrul->device_nids_added = 0;
775                         } else if (marker->cm_flags & CM_END)
776                                 mrul->in_target_device = 0;
777                 }
778         }
779
780         return 0;
781 }
782
783 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
784                        struct lustre_cfg *lcfg)
785 {
786         struct llog_rec_hdr      rec;
787         int                      buflen, rc;
788
789         if (!lcfg || !llh)
790                 return -ENOMEM;
791
792         LASSERT(llh->lgh_ctxt);
793
794         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
795                                 lcfg->lcfg_buflens);
796         rec.lrh_len = llog_data_len(buflen);
797         rec.lrh_type = OBD_CFG_REC;
798
799         /* idx = -1 means append */
800         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
801         if (rc)
802                 CERROR("failed %d\n", rc);
803         return rc;
804 }
805
806 static int record_base(const struct lu_env *env, struct llog_handle *llh,
807                      char *cfgname, lnet_nid_t nid, int cmd,
808                      char *s1, char *s2, char *s3, char *s4)
809 {
810         struct mgs_thread_info *mgi = mgs_env_info(env);
811         struct lustre_cfg     *lcfg;
812         int rc;
813
814         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
815                cmd, s1, s2, s3, s4);
816
817         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
818         if (s1)
819                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
820         if (s2)
821                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
822         if (s3)
823                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
824         if (s4)
825                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
826
827         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
828         if (!lcfg)
829                 return -ENOMEM;
830         lcfg->lcfg_nid = nid;
831
832         rc = record_lcfg(env, llh, lcfg);
833
834         lustre_cfg_free(lcfg);
835
836         if (rc) {
837                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
838                        cmd, s1, s2, s3, s4);
839         }
840         return rc;
841 }
842
843 static inline int record_add_uuid(const struct lu_env *env,
844                                   struct llog_handle *llh,
845                                   uint64_t nid, char *uuid)
846 {
847         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
848 }
849
850 static inline int record_add_conn(const struct lu_env *env,
851                                   struct llog_handle *llh,
852                                   char *devname, char *uuid)
853 {
854         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
855 }
856
857 static inline int record_attach(const struct lu_env *env,
858                                 struct llog_handle *llh, char *devname,
859                                 char *type, char *uuid)
860 {
861         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
862 }
863
864 static inline int record_setup(const struct lu_env *env,
865                                struct llog_handle *llh, char *devname,
866                                char *s1, char *s2, char *s3, char *s4)
867 {
868         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
869 }
870
871 /**
872  * \retval <0 record processing error
873  * \retval n record is processed. No need copy original one.
874  * \retval 0 record is not processed.
875  */
876 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
877                            struct mgs_replace_uuid_lookup *mrul)
878 {
879         int nids_added = 0;
880         lnet_nid_t nid;
881         char *ptr;
882         int rc;
883
884         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
885                 /* LCFG_ADD_UUID command found. Let's skip original command
886                    and add passed nids */
887                 ptr = mrul->target.mti_params;
888                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
889                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
890                                "device %s\n", libcfs_nid2str(nid),
891                                 mrul->target.mti_params,
892                                 mrul->target.mti_svname);
893                         rc = record_add_uuid(env,
894                                              mrul->temp_llh, nid,
895                                              mrul->target.mti_params);
896                         if (!rc)
897                                 nids_added++;
898                 }
899
900                 if (nids_added == 0) {
901                         CERROR("No new nids were added, nid %s with uuid %s, "
902                                "device %s\n", libcfs_nid2str(nid),
903                                mrul->target.mti_params,
904                                mrul->target.mti_svname);
905                         RETURN(-ENXIO);
906                 } else {
907                         mrul->device_nids_added = 1;
908                 }
909
910                 return nids_added;
911         }
912
913         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
914                 /* LCFG_SETUP command found. UUID should be changed */
915                 rc = record_setup(env,
916                                   mrul->temp_llh,
917                                   /* devname the same */
918                                   lustre_cfg_string(lcfg, 0),
919                                   /* s1 is not changed */
920                                   lustre_cfg_string(lcfg, 1),
921                                   /* new uuid should be
922                                   the full nidlist */
923                                   mrul->target.mti_params,
924                                   /* s3 is not changed */
925                                   lustre_cfg_string(lcfg, 3),
926                                   /* s4 is not changed */
927                                   lustre_cfg_string(lcfg, 4));
928                 return rc ? rc : 1;
929         }
930
931         /* Another commands in target device block */
932         return 0;
933 }
934
935 /**
936  * Handler that called for every record in llog.
937  * Records are processed in order they placed in llog.
938  *
939  * \param[in] llh       log to be processed
940  * \param[in] rec       current record
941  * \param[in] data      mgs_replace_uuid_lookup structure
942  *
943  * \retval 0    success
944  */
945 static int mgs_replace_handler(const struct lu_env *env,
946                                struct llog_handle *llh,
947                                struct llog_rec_hdr *rec,
948                                void *data)
949 {
950         struct llog_rec_hdr local_rec = *rec;
951         struct mgs_replace_uuid_lookup *mrul;
952         struct lustre_cfg *lcfg = REC_DATA(rec);
953         int cfg_len = REC_DATA_LEN(rec);
954         int rc;
955         ENTRY;
956
957         mrul = (struct mgs_replace_uuid_lookup *)data;
958
959         if (rec->lrh_type != OBD_CFG_REC) {
960                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
961                        rec->lrh_type, lcfg->lcfg_command,
962                        lustre_cfg_string(lcfg, 0),
963                        lustre_cfg_string(lcfg, 1));
964                 RETURN(-EINVAL);
965         }
966
967         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
968         if (rc) {
969                 /* Do not copy any invalidated records */
970                 GOTO(skip_out, rc = 0);
971         }
972
973         rc = check_markers(lcfg, mrul);
974         if (rc || mrul->skip_it)
975                 GOTO(skip_out, rc = 0);
976
977         /* Write to new log all commands outside target device block */
978         if (!mrul->in_target_device)
979                 GOTO(copy_out, rc = 0);
980
981         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
982            (failover nids) for this target, assuming that if then
983            primary is changing then so is the failover */
984         if (mrul->device_nids_added &&
985             (lcfg->lcfg_command == LCFG_ADD_UUID ||
986              lcfg->lcfg_command == LCFG_ADD_CONN))
987                 GOTO(skip_out, rc = 0);
988
989         rc = process_command(env, lcfg, mrul);
990         if (rc < 0)
991                 RETURN(rc);
992
993         if (rc)
994                 RETURN(0);
995 copy_out:
996         /* Record is placed in temporary llog as is */
997         local_rec.lrh_len -= sizeof(*rec) + sizeof(struct llog_rec_tail);
998         rc = llog_write(env, mrul->temp_llh, &local_rec, NULL, 0,
999                         (void *)lcfg, -1);
1000
1001         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1002                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1003                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1004         RETURN(rc);
1005
1006 skip_out:
1007         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1008                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1009                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1010         RETURN(rc);
1011 }
1012
1013 static int mgs_backup_llog(const struct lu_env *env,
1014                            struct obd_device *mgs,
1015                            char *fsname, char *backup)
1016 {
1017         struct obd_uuid *uuid;
1018         struct llog_handle *orig_llh, *bak_llh;
1019         struct llog_ctxt *lctxt;
1020         int rc, rc2;
1021         ENTRY;
1022
1023         lctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1024         if (!lctxt) {
1025                 CERROR("%s: missing llog context\n", mgs->obd_name);
1026                 GOTO(out, rc = -EINVAL);
1027         }
1028
1029         /* Make sure there's no old backup log */
1030         rc = llog_erase(env, lctxt, NULL, backup);
1031         if (rc < 0 && rc != -ENOENT)
1032                 GOTO(out_put, rc);
1033
1034         /* open backup log */
1035         rc = llog_open_create(env, lctxt, &bak_llh, NULL, backup);
1036         if (rc) {
1037                 CERROR("%s: backup logfile open %s: rc = %d\n",
1038                        mgs->obd_name, backup, rc);
1039                 GOTO(out_put, rc);
1040         }
1041
1042         /* set the log header uuid */
1043         OBD_ALLOC_PTR(uuid);
1044         if (uuid == NULL)
1045                 GOTO(out_put, rc = -ENOMEM);
1046         obd_str2uuid(uuid, backup);
1047         rc = llog_init_handle(env, bak_llh, LLOG_F_IS_PLAIN, uuid);
1048         OBD_FREE_PTR(uuid);
1049         if (rc)
1050                 GOTO(out_close1, rc);
1051
1052         /* open original log */
1053         rc = llog_open(env, lctxt, &orig_llh, NULL, fsname,
1054                        LLOG_OPEN_EXISTS);
1055         if (rc < 0) {
1056                 if (rc == -ENOENT)
1057                         rc = 0;
1058                 GOTO(out_close1, rc);
1059         }
1060
1061         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, NULL);
1062         if (rc)
1063                 GOTO(out_close2, rc);
1064
1065         /* Copy remote log */
1066         rc = llog_process(env, orig_llh, llog_copy_handler,
1067                           (void *)bak_llh, NULL);
1068
1069 out_close2:
1070         rc2 = llog_close(env, orig_llh);
1071         if (!rc)
1072                 rc = rc2;
1073 out_close1:
1074         rc2 = llog_close(env, bak_llh);
1075         if (!rc)
1076                 rc = rc2;
1077 out_put:
1078         if (lctxt)
1079                 llog_ctxt_put(lctxt);
1080 out:
1081         if (rc)
1082                 CERROR("%s: Failed to backup log %s: rc = %d\n",
1083                        mgs->obd_name, fsname, rc);
1084         RETURN(rc);
1085 }
1086
1087 static int mgs_log_is_empty(const struct lu_env *env, struct mgs_device *mgs,
1088                             char *name);
1089
1090 static int mgs_replace_nids_log(const struct lu_env *env,
1091                                 struct obd_device *mgs, struct fs_db *fsdb,
1092                                 char *logname, char *devname, char *nids)
1093 {
1094         struct llog_handle *orig_llh, *backup_llh;
1095         struct llog_ctxt *ctxt;
1096         struct mgs_replace_uuid_lookup *mrul;
1097         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1098         char *backup;
1099         int rc, rc2;
1100         ENTRY;
1101
1102         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1103
1104         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1105         LASSERT(ctxt != NULL);
1106
1107         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1108                 /* Log is empty. Nothing to replace */
1109                 GOTO(out_put, rc = 0);
1110         }
1111
1112         OBD_ALLOC(backup, strlen(logname) + 5);
1113         if (backup == NULL)
1114                 GOTO(out_put, rc = -ENOMEM);
1115
1116         sprintf(backup, "%s.bak", logname);
1117
1118         rc = mgs_backup_llog(env, mgs, logname, backup);
1119         if (rc < 0) {
1120                 CERROR("%s: can't make backup for %s: rc = %d\n",
1121                        mgs->obd_name, logname, rc);
1122                 GOTO(out_free,rc);
1123         }
1124
1125         /* Now erase original log file. Connections are not allowed.
1126            Backup is already saved */
1127         rc = llog_erase(env, ctxt, NULL, logname);
1128         if (rc < 0 && rc != -ENOENT)
1129                 GOTO(out_free, rc);
1130
1131         /* open local log */
1132         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1133         if (rc)
1134                 GOTO(out_restore, rc);
1135
1136         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, NULL);
1137         if (rc)
1138                 GOTO(out_closel, rc);
1139
1140         /* open backup llog */
1141         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1142                        LLOG_OPEN_EXISTS);
1143         if (rc)
1144                 GOTO(out_closel, rc);
1145
1146         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1147         if (rc)
1148                 GOTO(out_close, rc);
1149
1150         if (llog_get_size(backup_llh) <= 1)
1151                 GOTO(out_close, rc = 0);
1152
1153         OBD_ALLOC_PTR(mrul);
1154         if (!mrul)
1155                 GOTO(out_close, rc = -ENOMEM);
1156         /* devname is only needed information to replace UUID records */
1157         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1158         /* parse nids later */
1159         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1160         /* Copy records to this temporary llog */
1161         mrul->temp_llh = orig_llh;
1162
1163         rc = llog_process(env, backup_llh, mgs_replace_handler,
1164                           (void *)mrul, NULL);
1165         OBD_FREE_PTR(mrul);
1166 out_close:
1167         rc2 = llog_close(NULL, backup_llh);
1168         if (!rc)
1169                 rc = rc2;
1170 out_closel:
1171         rc2 = llog_close(NULL, orig_llh);
1172         if (!rc)
1173                 rc = rc2;
1174
1175 out_restore:
1176         if (rc) {
1177                 CERROR("%s: llog should be restored: rc = %d\n",
1178                        mgs->obd_name, rc);
1179                 rc2 = mgs_backup_llog(env, mgs, backup, logname);
1180                 if (rc2 < 0)
1181                         CERROR("%s: can't restore backup %s: rc = %d\n",
1182                                mgs->obd_name, logname, rc2);
1183         }
1184
1185 out_free:
1186         OBD_FREE(backup, strlen(backup) + 5);
1187
1188 out_put:
1189         llog_ctxt_put(ctxt);
1190
1191         if (rc)
1192                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1193                        mgs->obd_name, logname, rc);
1194
1195         RETURN(rc);
1196 }
1197
1198 /**
1199  * Parse device name and get file system name and/or device index
1200  *
1201  * \param[in]   devname device name (ex. lustre-MDT0000)
1202  * \param[out]  fsname  file system name(optional)
1203  * \param[out]  index   device index(optional)
1204  *
1205  * \retval 0    success
1206  */
1207 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1208 {
1209         char *ptr;
1210         ENTRY;
1211
1212         /* Extract fsname */
1213         ptr = strrchr(devname, '-');
1214
1215         if (fsname) {
1216                 if (!ptr) {
1217                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1218                                devname);
1219                         RETURN(-EINVAL);
1220                 }
1221                 memset(fsname, 0, MTI_NAME_MAXLEN);
1222                 strncpy(fsname, devname, ptr - devname);
1223                 fsname[MTI_NAME_MAXLEN - 1] = 0;
1224         }
1225
1226         if (index) {
1227                 if (server_name2index(ptr, index, NULL) < 0) {
1228                         CDEBUG(D_MGS, "Device name with wrong index\n");
1229                         RETURN(-EINVAL);
1230                 }
1231         }
1232
1233         RETURN(0);
1234 }
1235
1236 static int only_mgs_is_running(struct obd_device *mgs_obd)
1237 {
1238         /* TDB: Is global variable with devices count exists? */
1239         int num_devices = get_devices_count();
1240         /* osd, MGS and MGC + self_export
1241            (wc -l /proc/fs/lustre/devices <= 2) && (num_exports <= 2) */
1242         return (num_devices <= 3) && (mgs_obd->obd_num_exports <= 2);
1243 }
1244
1245 static int name_create_mdt(char **logname, char *fsname, int i)
1246 {
1247         char mdt_index[9];
1248
1249         sprintf(mdt_index, "-MDT%04x", i);
1250         return name_create(logname, fsname, mdt_index);
1251 }
1252
1253 /**
1254  * Replace nids for \a device to \a nids values
1255  *
1256  * \param obd           MGS obd device
1257  * \param devname       nids need to be replaced for this device
1258  * (ex. lustre-OST0000)
1259  * \param nids          nids list (ex. nid1,nid2,nid3)
1260  *
1261  * \retval 0    success
1262  */
1263 int mgs_replace_nids(const struct lu_env *env,
1264                      struct mgs_device *mgs,
1265                      char *devname, char *nids)
1266 {
1267         /* Assume fsname is part of device name */
1268         char fsname[MTI_NAME_MAXLEN];
1269         int rc;
1270         __u32 index;
1271         char *logname;
1272         struct fs_db *fsdb;
1273         unsigned int i;
1274         int conn_state;
1275         struct obd_device *mgs_obd = mgs->mgs_obd;
1276         ENTRY;
1277
1278         /* We can only change NIDs if no other nodes are connected */
1279         spin_lock(&mgs_obd->obd_dev_lock);
1280         conn_state = mgs_obd->obd_no_conn;
1281         mgs_obd->obd_no_conn = 1;
1282         spin_unlock(&mgs_obd->obd_dev_lock);
1283
1284         /* We can not change nids if not only MGS is started */
1285         if (!only_mgs_is_running(mgs_obd)) {
1286                 CERROR("Only MGS is allowed to be started\n");
1287                 GOTO(out, rc = -EINPROGRESS);
1288         }
1289
1290         /* Get fsname and index*/
1291         rc = mgs_parse_devname(devname, fsname, &index);
1292         if (rc)
1293                 GOTO(out, rc);
1294
1295         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1296         if (rc) {
1297                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1298                 GOTO(out, rc);
1299         }
1300
1301         /* Process client llogs */
1302         name_create(&logname, fsname, "-client");
1303         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1304         name_destroy(&logname);
1305         if (rc) {
1306                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1307                        fsname, devname, rc);
1308                 GOTO(out, rc);
1309         }
1310
1311         /* Process MDT llogs */
1312         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1313                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1314                         continue;
1315                 name_create_mdt(&logname, fsname, i);
1316                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1317                 name_destroy(&logname);
1318                 if (rc)
1319                         GOTO(out, rc);
1320         }
1321
1322 out:
1323         spin_lock(&mgs_obd->obd_dev_lock);
1324         mgs_obd->obd_no_conn = conn_state;
1325         spin_unlock(&mgs_obd->obd_dev_lock);
1326
1327         RETURN(rc);
1328 }
1329
1330 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1331                             char *devname, struct lov_desc *desc)
1332 {
1333         struct mgs_thread_info *mgi = mgs_env_info(env);
1334         struct lustre_cfg *lcfg;
1335         int rc;
1336
1337         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1338         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1339         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1340         if (!lcfg)
1341                 return -ENOMEM;
1342         rc = record_lcfg(env, llh, lcfg);
1343
1344         lustre_cfg_free(lcfg);
1345         return rc;
1346 }
1347
1348 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1349                             char *devname, struct lmv_desc *desc)
1350 {
1351         struct mgs_thread_info *mgi = mgs_env_info(env);
1352         struct lustre_cfg *lcfg;
1353         int rc;
1354
1355         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1356         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1357         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1358
1359         rc = record_lcfg(env, llh, lcfg);
1360
1361         lustre_cfg_free(lcfg);
1362         return rc;
1363 }
1364
1365 static inline int record_mdc_add(const struct lu_env *env,
1366                                  struct llog_handle *llh,
1367                                  char *logname, char *mdcuuid,
1368                                  char *mdtuuid, char *index,
1369                                  char *gen)
1370 {
1371         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1372                            mdtuuid,index,gen,mdcuuid);
1373 }
1374
1375 static inline int record_lov_add(const struct lu_env *env,
1376                                  struct llog_handle *llh,
1377                                  char *lov_name, char *ost_uuid,
1378                                  char *index, char *gen)
1379 {
1380         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1381                            ost_uuid, index, gen, 0);
1382 }
1383
1384 static inline int record_mount_opt(const struct lu_env *env,
1385                                    struct llog_handle *llh,
1386                                    char *profile, char *lov_name,
1387                                    char *mdc_name)
1388 {
1389         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1390                            profile,lov_name,mdc_name,0);
1391 }
1392
1393 static int record_marker(const struct lu_env *env,
1394                          struct llog_handle *llh,
1395                          struct fs_db *fsdb, __u32 flags,
1396                          char *tgtname, char *comment)
1397 {
1398         struct mgs_thread_info *mgi = mgs_env_info(env);
1399         struct lustre_cfg *lcfg;
1400         int rc;
1401         int cplen = 0;
1402
1403         if (flags & CM_START)
1404                 fsdb->fsdb_gen++;
1405         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1406         mgi->mgi_marker.cm_flags = flags;
1407         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1408         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1409                         sizeof(mgi->mgi_marker.cm_tgtname));
1410         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1411                 return -E2BIG;
1412         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1413                         sizeof(mgi->mgi_marker.cm_comment));
1414         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1415                 return -E2BIG;
1416         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1417         mgi->mgi_marker.cm_canceltime = 0;
1418         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1419         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1420                             sizeof(mgi->mgi_marker));
1421         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1422         if (!lcfg)
1423                 return -ENOMEM;
1424         rc = record_lcfg(env, llh, lcfg);
1425
1426         lustre_cfg_free(lcfg);
1427         return rc;
1428 }
1429
1430 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1431                             struct llog_handle **llh, char *name)
1432 {
1433         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1434         struct llog_ctxt        *ctxt;
1435         int                      rc = 0;
1436
1437         if (*llh)
1438                 GOTO(out, rc = -EBUSY);
1439
1440         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1441         if (!ctxt)
1442                 GOTO(out, rc = -ENODEV);
1443         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1444
1445         rc = llog_open_create(env, ctxt, llh, NULL, name);
1446         if (rc)
1447                 GOTO(out_ctxt, rc);
1448         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1449         if (rc)
1450                 llog_close(env, *llh);
1451 out_ctxt:
1452         llog_ctxt_put(ctxt);
1453 out:
1454         if (rc) {
1455                 CERROR("%s: can't start log %s: rc = %d\n",
1456                        mgs->mgs_obd->obd_name, name, rc);
1457                 *llh = NULL;
1458         }
1459         RETURN(rc);
1460 }
1461
1462 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1463 {
1464         int rc;
1465
1466         rc = llog_close(env, *llh);
1467         *llh = NULL;
1468
1469         return rc;
1470 }
1471
1472 static int mgs_log_is_empty(const struct lu_env *env,
1473                             struct mgs_device *mgs, char *name)
1474 {
1475         struct llog_handle *llh;
1476         struct llog_ctxt *ctxt;
1477         int rc = 0;
1478
1479         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1480         LASSERT(ctxt != NULL);
1481         rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
1482         if (rc < 0) {
1483                 if (rc == -ENOENT)
1484                         rc = 0;
1485                 GOTO(out_ctxt, rc);
1486         }
1487
1488         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
1489         if (rc)
1490                 GOTO(out_close, rc);
1491         rc = llog_get_size(llh);
1492
1493 out_close:
1494         llog_close(env, llh);
1495 out_ctxt:
1496         llog_ctxt_put(ctxt);
1497         /* header is record 1 */
1498         return (rc <= 1);
1499 }
1500
1501 /******************** config "macros" *********************/
1502
1503 /* write an lcfg directly into a log (with markers) */
1504 static int mgs_write_log_direct(const struct lu_env *env,
1505                                 struct mgs_device *mgs, struct fs_db *fsdb,
1506                                 char *logname, struct lustre_cfg *lcfg,
1507                                 char *devname, char *comment)
1508 {
1509         struct llog_handle *llh = NULL;
1510         int rc;
1511         ENTRY;
1512
1513         if (!lcfg)
1514                 RETURN(-ENOMEM);
1515
1516         rc = record_start_log(env, mgs, &llh, logname);
1517         if (rc)
1518                 RETURN(rc);
1519
1520         /* FIXME These should be a single journal transaction */
1521         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1522         if (rc)
1523                 GOTO(out_end, rc);
1524         rc = record_lcfg(env, llh, lcfg);
1525         if (rc)
1526                 GOTO(out_end, rc);
1527         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1528         if (rc)
1529                 GOTO(out_end, rc);
1530 out_end:
1531         record_end_log(env, &llh);
1532         RETURN(rc);
1533 }
1534
1535 /* write the lcfg in all logs for the given fs */
1536 int mgs_write_log_direct_all(const struct lu_env *env,
1537                              struct mgs_device *mgs,
1538                              struct fs_db *fsdb,
1539                              struct mgs_target_info *mti,
1540                              struct lustre_cfg *lcfg,
1541                              char *devname, char *comment,
1542                              int server_only)
1543 {
1544         cfs_list_t list;
1545         struct mgs_direntry *dirent, *n;
1546         char *fsname = mti->mti_fsname;
1547         char *logname;
1548         int rc = 0, len = strlen(fsname);
1549         ENTRY;
1550
1551         /* We need to set params for any future logs
1552            as well. FIXME Append this file to every new log.
1553            Actually, we should store as params (text), not llogs.  Or
1554            in a database. */
1555         rc = name_create(&logname, fsname, "-params");
1556         if (rc)
1557                 RETURN(rc);
1558         if (mgs_log_is_empty(env, mgs, logname)) {
1559                 struct llog_handle *llh = NULL;
1560                 rc = record_start_log(env, mgs, &llh, logname);
1561                 record_end_log(env, &llh);
1562         }
1563         name_destroy(&logname);
1564         if (rc)
1565                 RETURN(rc);
1566
1567         /* Find all the logs in the CONFIGS directory */
1568         rc = class_dentry_readdir(env, mgs, &list);
1569         if (rc)
1570                 RETURN(rc);
1571
1572         /* Could use fsdb index maps instead of directory listing */
1573         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1574                 cfs_list_del(&dirent->list);
1575                 /* don't write to sptlrpc rule log */
1576                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1577                         goto next;
1578
1579                 /* caller wants write server logs only */
1580                 if (server_only && strstr(dirent->name, "-client") != NULL)
1581                         goto next;
1582
1583                 if (strncmp(fsname, dirent->name, len) == 0) {
1584                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1585                         /* Erase any old settings of this same parameter */
1586                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1587                                         devname, comment, CM_SKIP);
1588                         if (rc < 0)
1589                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1590                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1591                         /* Write the new one */
1592                         if (lcfg) {
1593                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1594                                                           dirent->name,
1595                                                           lcfg, devname,
1596                                                           comment);
1597                                 if (rc)
1598                                         CERROR("%s: writing log %s: rc = %d\n",
1599                                                mgs->mgs_obd->obd_name,
1600                                                dirent->name, rc);
1601                         }
1602                 }
1603 next:
1604                 mgs_direntry_free(dirent);
1605         }
1606
1607         RETURN(rc);
1608 }
1609
1610 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1611                                     struct mgs_device *mgs,
1612                                     struct fs_db *fsdb,
1613                                     struct mgs_target_info *mti,
1614                                     int index, char *logname);
1615 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1616                                     struct mgs_device *mgs,
1617                                     struct fs_db *fsdb,
1618                                     struct mgs_target_info *mti,
1619                                     char *logname, char *suffix, char *lovname,
1620                                     enum lustre_sec_part sec_part, int flags);
1621 static int name_create_mdt_and_lov(char **logname, char **lovname,
1622                                    struct fs_db *fsdb, int i);
1623
1624 static int add_param(char *params, char *key, char *val)
1625 {
1626         char *start = params + strlen(params);
1627         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1628         int keylen = 0;
1629
1630         if (key != NULL)
1631                 keylen = strlen(key);
1632         if (start + 1 + keylen + strlen(val) >= end) {
1633                 CERROR("params are too long: %s %s%s\n",
1634                        params, key != NULL ? key : "", val);
1635                 return -EINVAL;
1636         }
1637
1638         sprintf(start, " %s%s", key != NULL ? key : "", val);
1639         return 0;
1640 }
1641
1642 static int mgs_steal_llog_handler(const struct lu_env *env,
1643                                   struct llog_handle *llh,
1644                                   struct llog_rec_hdr *rec, void *data)
1645 {
1646         struct mgs_device *mgs;
1647         struct obd_device *obd;
1648         struct mgs_target_info *mti, *tmti;
1649         struct fs_db *fsdb;
1650         int cfg_len = rec->lrh_len;
1651         char *cfg_buf = (char*) (rec + 1);
1652         struct lustre_cfg *lcfg;
1653         int rc = 0;
1654         struct llog_handle *mdt_llh = NULL;
1655         static int got_an_osc_or_mdc = 0;
1656         /* 0: not found any osc/mdc;
1657            1: found osc;
1658            2: found mdc;
1659         */
1660         static int last_step = -1;
1661         int cplen = 0;
1662
1663         ENTRY;
1664
1665         mti = ((struct temp_comp*)data)->comp_mti;
1666         tmti = ((struct temp_comp*)data)->comp_tmti;
1667         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1668         obd = ((struct temp_comp *)data)->comp_obd;
1669         mgs = lu2mgs_dev(obd->obd_lu_dev);
1670         LASSERT(mgs);
1671
1672         if (rec->lrh_type != OBD_CFG_REC) {
1673                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1674                 RETURN(-EINVAL);
1675         }
1676
1677         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1678         if (rc) {
1679                 CERROR("Insane cfg\n");
1680                 RETURN(rc);
1681         }
1682
1683         lcfg = (struct lustre_cfg *)cfg_buf;
1684
1685         if (lcfg->lcfg_command == LCFG_MARKER) {
1686                 struct cfg_marker *marker;
1687                 marker = lustre_cfg_buf(lcfg, 1);
1688                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1689                     (marker->cm_flags & CM_START) &&
1690                      !(marker->cm_flags & CM_SKIP)) {
1691                         got_an_osc_or_mdc = 1;
1692                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1693                                         sizeof(tmti->mti_svname));
1694                         if (cplen >= sizeof(tmti->mti_svname))
1695                                 RETURN(-E2BIG);
1696                         rc = record_start_log(env, mgs, &mdt_llh,
1697                                               mti->mti_svname);
1698                         if (rc)
1699                                 RETURN(rc);
1700                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1701                                            mti->mti_svname,"add osc(copied)");
1702                         record_end_log(env, &mdt_llh);
1703                         last_step = marker->cm_step;
1704                         RETURN(rc);
1705                 }
1706                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1707                     (marker->cm_flags & CM_END) &&
1708                      !(marker->cm_flags & CM_SKIP)) {
1709                         LASSERT(last_step == marker->cm_step);
1710                         last_step = -1;
1711                         got_an_osc_or_mdc = 0;
1712                         rc = record_start_log(env, mgs, &mdt_llh,
1713                                               mti->mti_svname);
1714                         if (rc)
1715                                 RETURN(rc);
1716                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1717                                            mti->mti_svname,"add osc(copied)");
1718                         record_end_log(env, &mdt_llh);
1719                         RETURN(rc);
1720                 }
1721                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1722                     (marker->cm_flags & CM_START) &&
1723                      !(marker->cm_flags & CM_SKIP)) {
1724                         got_an_osc_or_mdc = 2;
1725                         last_step = marker->cm_step;
1726                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1727                                strlen(marker->cm_tgtname));
1728
1729                         RETURN(rc);
1730                 }
1731                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1732                     (marker->cm_flags & CM_END) &&
1733                      !(marker->cm_flags & CM_SKIP)) {
1734                         LASSERT(last_step == marker->cm_step);
1735                         last_step = -1;
1736                         got_an_osc_or_mdc = 0;
1737                         RETURN(rc);
1738                 }
1739         }
1740
1741         if (got_an_osc_or_mdc == 0 || last_step < 0)
1742                 RETURN(rc);
1743
1744         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1745                 uint64_t nodenid = lcfg->lcfg_nid;
1746
1747                 if (strlen(tmti->mti_uuid) == 0) {
1748                         /* target uuid not set, this config record is before
1749                          * LCFG_SETUP, this nid is one of target node nid.
1750                          */
1751                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1752                         tmti->mti_nid_count++;
1753                 } else {
1754                         /* failover node nid */
1755                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1756                                        libcfs_nid2str(nodenid));
1757                 }
1758
1759                 RETURN(rc);
1760         }
1761
1762         if (lcfg->lcfg_command == LCFG_SETUP) {
1763                 char *target;
1764
1765                 target = lustre_cfg_string(lcfg, 1);
1766                 memcpy(tmti->mti_uuid, target, strlen(target));
1767                 RETURN(rc);
1768         }
1769
1770         /* ignore client side sptlrpc_conf_log */
1771         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1772                 RETURN(rc);
1773
1774         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1775                 int index;
1776
1777                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1778                         RETURN (-EINVAL);
1779
1780                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1781                        strlen(mti->mti_fsname));
1782                 tmti->mti_stripe_index = index;
1783
1784                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1785                                               mti->mti_stripe_index,
1786                                               mti->mti_svname);
1787                 memset(tmti, 0, sizeof(*tmti));
1788                 RETURN(rc);
1789         }
1790
1791         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1792                 int index;
1793                 char mdt_index[9];
1794                 char *logname, *lovname;
1795
1796                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1797                                              mti->mti_stripe_index);
1798                 if (rc)
1799                         RETURN(rc);
1800                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1801
1802                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1803                         name_destroy(&logname);
1804                         name_destroy(&lovname);
1805                         RETURN(-EINVAL);
1806                 }
1807
1808                 tmti->mti_stripe_index = index;
1809                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1810                                          mdt_index, lovname,
1811                                          LUSTRE_SP_MDT, 0);
1812                 name_destroy(&logname);
1813                 name_destroy(&lovname);
1814                 RETURN(rc);
1815         }
1816         RETURN(rc);
1817 }
1818
1819 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1820 /* stealed from mgs_get_fsdb_from_llog*/
1821 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1822                                               struct mgs_device *mgs,
1823                                               char *client_name,
1824                                               struct temp_comp* comp)
1825 {
1826         struct llog_handle *loghandle;
1827         struct mgs_target_info *tmti;
1828         struct llog_ctxt *ctxt;
1829         int rc;
1830
1831         ENTRY;
1832
1833         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1834         LASSERT(ctxt != NULL);
1835
1836         OBD_ALLOC_PTR(tmti);
1837         if (tmti == NULL)
1838                 GOTO(out_ctxt, rc = -ENOMEM);
1839
1840         comp->comp_tmti = tmti;
1841         comp->comp_obd = mgs->mgs_obd;
1842
1843         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1844                        LLOG_OPEN_EXISTS);
1845         if (rc < 0) {
1846                 if (rc == -ENOENT)
1847                         rc = 0;
1848                 GOTO(out_pop, rc);
1849         }
1850
1851         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1852         if (rc)
1853                 GOTO(out_close, rc);
1854
1855         rc = llog_process_or_fork(env, loghandle, mgs_steal_llog_handler,
1856                                   (void *)comp, NULL, false);
1857         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1858 out_close:
1859         llog_close(env, loghandle);
1860 out_pop:
1861         OBD_FREE_PTR(tmti);
1862 out_ctxt:
1863         llog_ctxt_put(ctxt);
1864         RETURN(rc);
1865 }
1866
1867 /* lmv is the second thing for client logs */
1868 /* copied from mgs_write_log_lov. Please refer to that.  */
1869 static int mgs_write_log_lmv(const struct lu_env *env,
1870                              struct mgs_device *mgs,
1871                              struct fs_db *fsdb,
1872                              struct mgs_target_info *mti,
1873                              char *logname, char *lmvname)
1874 {
1875         struct llog_handle *llh = NULL;
1876         struct lmv_desc *lmvdesc;
1877         char *uuid;
1878         int rc = 0;
1879         ENTRY;
1880
1881         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1882
1883         OBD_ALLOC_PTR(lmvdesc);
1884         if (lmvdesc == NULL)
1885                 RETURN(-ENOMEM);
1886         lmvdesc->ld_active_tgt_count = 0;
1887         lmvdesc->ld_tgt_count = 0;
1888         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1889         uuid = (char *)lmvdesc->ld_uuid.uuid;
1890
1891         rc = record_start_log(env, mgs, &llh, logname);
1892         if (rc)
1893                 GOTO(out_free, rc);
1894         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1895         if (rc)
1896                 GOTO(out_end, rc);
1897         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1898         if (rc)
1899                 GOTO(out_end, rc);
1900         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1901         if (rc)
1902                 GOTO(out_end, rc);
1903         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1904         if (rc)
1905                 GOTO(out_end, rc);
1906 out_end:
1907         record_end_log(env, &llh);
1908 out_free:
1909         OBD_FREE_PTR(lmvdesc);
1910         RETURN(rc);
1911 }
1912
1913 /* lov is the first thing in the mdt and client logs */
1914 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1915                              struct fs_db *fsdb, struct mgs_target_info *mti,
1916                              char *logname, char *lovname)
1917 {
1918         struct llog_handle *llh = NULL;
1919         struct lov_desc *lovdesc;
1920         char *uuid;
1921         int rc = 0;
1922         ENTRY;
1923
1924         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1925
1926         /*
1927         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1928         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1929               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1930         */
1931
1932         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1933         OBD_ALLOC_PTR(lovdesc);
1934         if (lovdesc == NULL)
1935                 RETURN(-ENOMEM);
1936         lovdesc->ld_magic = LOV_DESC_MAGIC;
1937         lovdesc->ld_tgt_count = 0;
1938         /* Defaults.  Can be changed later by lcfg config_param */
1939         lovdesc->ld_default_stripe_count = 1;
1940         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1941         lovdesc->ld_default_stripe_size = 1024 * 1024;
1942         lovdesc->ld_default_stripe_offset = -1;
1943         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1944         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1945         /* can these be the same? */
1946         uuid = (char *)lovdesc->ld_uuid.uuid;
1947
1948         /* This should always be the first entry in a log.
1949         rc = mgs_clear_log(obd, logname); */
1950         rc = record_start_log(env, mgs, &llh, logname);
1951         if (rc)
1952                 GOTO(out_free, rc);
1953         /* FIXME these should be a single journal transaction */
1954         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1955         if (rc)
1956                 GOTO(out_end, rc);
1957         rc = record_attach(env, llh, lovname, "lov", uuid);
1958         if (rc)
1959                 GOTO(out_end, rc);
1960         rc = record_lov_setup(env, llh, lovname, lovdesc);
1961         if (rc)
1962                 GOTO(out_end, rc);
1963         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1964         if (rc)
1965                 GOTO(out_end, rc);
1966         EXIT;
1967 out_end:
1968         record_end_log(env, &llh);
1969 out_free:
1970         OBD_FREE_PTR(lovdesc);
1971         return rc;
1972 }
1973
1974 /* add failnids to open log */
1975 static int mgs_write_log_failnids(const struct lu_env *env,
1976                                   struct mgs_target_info *mti,
1977                                   struct llog_handle *llh,
1978                                   char *cliname)
1979 {
1980         char *failnodeuuid = NULL;
1981         char *ptr = mti->mti_params;
1982         lnet_nid_t nid;
1983         int rc = 0;
1984
1985         /*
1986         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1987         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1988         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1989         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1990         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1991         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1992         */
1993
1994         /* Pull failnid info out of params string */
1995         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1996                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1997                         if (failnodeuuid == NULL) {
1998                                 /* We don't know the failover node name,
1999                                    so just use the first nid as the uuid */
2000                                 rc = name_create(&failnodeuuid,
2001                                                  libcfs_nid2str(nid), "");
2002                                 if (rc)
2003                                         return rc;
2004                         }
2005                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
2006                                "client %s\n", libcfs_nid2str(nid),
2007                                failnodeuuid, cliname);
2008                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
2009                 }
2010                 if (failnodeuuid)
2011                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2012         }
2013
2014         name_destroy(&failnodeuuid);
2015         return rc;
2016 }
2017
2018 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2019                                     struct mgs_device *mgs,
2020                                     struct fs_db *fsdb,
2021                                     struct mgs_target_info *mti,
2022                                     char *logname, char *lmvname)
2023 {
2024         struct llog_handle *llh = NULL;
2025         char *mdcname = NULL;
2026         char *nodeuuid = NULL;
2027         char *mdcuuid = NULL;
2028         char *lmvuuid = NULL;
2029         char index[6];
2030         int i, rc;
2031         ENTRY;
2032
2033         if (mgs_log_is_empty(env, mgs, logname)) {
2034                 CERROR("log is empty! Logical error\n");
2035                 RETURN(-EINVAL);
2036         }
2037
2038         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2039                mti->mti_svname, logname, lmvname);
2040
2041         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2042         if (rc)
2043                 RETURN(rc);
2044         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2045         if (rc)
2046                 GOTO(out_free, rc);
2047         rc = name_create(&mdcuuid, mdcname, "_UUID");
2048         if (rc)
2049                 GOTO(out_free, rc);
2050         rc = name_create(&lmvuuid, lmvname, "_UUID");
2051         if (rc)
2052                 GOTO(out_free, rc);
2053
2054         rc = record_start_log(env, mgs, &llh, logname);
2055         if (rc)
2056                 GOTO(out_free, rc);
2057         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2058                            "add mdc");
2059         if (rc)
2060                 GOTO(out_end, rc);
2061         for (i = 0; i < mti->mti_nid_count; i++) {
2062                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2063                        libcfs_nid2str(mti->mti_nids[i]));
2064
2065                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2066                 if (rc)
2067                         GOTO(out_end, rc);
2068         }
2069
2070         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2071         if (rc)
2072                 GOTO(out_end, rc);
2073         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
2074         if (rc)
2075                 GOTO(out_end, rc);
2076         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2077         if (rc)
2078                 GOTO(out_end, rc);
2079         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2080         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2081                             index, "1");
2082         if (rc)
2083                 GOTO(out_end, rc);
2084         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2085                            "add mdc");
2086         if (rc)
2087                 GOTO(out_end, rc);
2088 out_end:
2089         record_end_log(env, &llh);
2090 out_free:
2091         name_destroy(&lmvuuid);
2092         name_destroy(&mdcuuid);
2093         name_destroy(&mdcname);
2094         name_destroy(&nodeuuid);
2095         RETURN(rc);
2096 }
2097
2098 static inline int name_create_lov(char **lovname, char *mdtname,
2099                                   struct fs_db *fsdb, int index)
2100 {
2101         /* COMPAT_180 */
2102         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2103                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2104         else
2105                 return name_create(lovname, mdtname, "-mdtlov");
2106 }
2107
2108 static int name_create_mdt_and_lov(char **logname, char **lovname,
2109                                    struct fs_db *fsdb, int i)
2110 {
2111         int rc;
2112
2113         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2114         if (rc)
2115                 return rc;
2116         /* COMPAT_180 */
2117         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2118                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2119         else
2120                 rc = name_create(lovname, *logname, "-mdtlov");
2121         if (rc) {
2122                 name_destroy(logname);
2123                 *logname = NULL;
2124         }
2125         return rc;
2126 }
2127
2128 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2129                                       struct fs_db *fsdb, int i)
2130 {
2131         char suffix[16];
2132
2133         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2134                 sprintf(suffix, "-osc");
2135         else
2136                 sprintf(suffix, "-osc-MDT%04x", i);
2137         return name_create(oscname, ostname, suffix);
2138 }
2139
2140 /* add new mdc to already existent MDS */
2141 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2142                                     struct mgs_device *mgs,
2143                                     struct fs_db *fsdb,
2144                                     struct mgs_target_info *mti,
2145                                     int mdt_index, char *logname)
2146 {
2147         struct llog_handle      *llh = NULL;
2148         char    *nodeuuid = NULL;
2149         char    *ospname = NULL;
2150         char    *lovuuid = NULL;
2151         char    *mdtuuid = NULL;
2152         char    *svname = NULL;
2153         char    *mdtname = NULL;
2154         char    *lovname = NULL;
2155         char    index_str[16];
2156         int     i, rc;
2157
2158         ENTRY;
2159         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2160                 CERROR("log is empty! Logical error\n");
2161                 RETURN (-EINVAL);
2162         }
2163
2164         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2165                logname);
2166
2167         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2168         if (rc)
2169                 RETURN(rc);
2170
2171         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2172         if (rc)
2173                 GOTO(out_destory, rc);
2174
2175         rc = name_create(&svname, mdtname, "-osp");
2176         if (rc)
2177                 GOTO(out_destory, rc);
2178
2179         sprintf(index_str, "-MDT%04x", mdt_index);
2180         rc = name_create(&ospname, svname, index_str);
2181         if (rc)
2182                 GOTO(out_destory, rc);
2183
2184         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2185         if (rc)
2186                 GOTO(out_destory, rc);
2187
2188         rc = name_create(&lovuuid, lovname, "_UUID");
2189         if (rc)
2190                 GOTO(out_destory, rc);
2191
2192         rc = name_create(&mdtuuid, mdtname, "_UUID");
2193         if (rc)
2194                 GOTO(out_destory, rc);
2195
2196         rc = record_start_log(env, mgs, &llh, logname);
2197         if (rc)
2198                 GOTO(out_destory, rc);
2199
2200         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2201                            "add osp");
2202         if (rc)
2203                 GOTO(out_destory, rc);
2204
2205         for (i = 0; i < mti->mti_nid_count; i++) {
2206                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2207                        libcfs_nid2str(mti->mti_nids[i]));
2208                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2209                 if (rc)
2210                         GOTO(out_end, rc);
2211         }
2212
2213         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2214         if (rc)
2215                 GOTO(out_end, rc);
2216
2217         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2218                           NULL, NULL);
2219         if (rc)
2220                 GOTO(out_end, rc);
2221
2222         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2223         if (rc)
2224                 GOTO(out_end, rc);
2225
2226         /* Add mdc(osp) to lod */
2227         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2228                  mti->mti_stripe_index);
2229         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2230                          index_str, "1", NULL);
2231         if (rc)
2232                 GOTO(out_end, rc);
2233
2234         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2235         if (rc)
2236                 GOTO(out_end, rc);
2237
2238 out_end:
2239         record_end_log(env, &llh);
2240
2241 out_destory:
2242         name_destroy(&mdtuuid);
2243         name_destroy(&lovuuid);
2244         name_destroy(&lovname);
2245         name_destroy(&ospname);
2246         name_destroy(&svname);
2247         name_destroy(&nodeuuid);
2248         name_destroy(&mdtname);
2249         RETURN(rc);
2250 }
2251
2252 static int mgs_write_log_mdt0(const struct lu_env *env,
2253                               struct mgs_device *mgs,
2254                               struct fs_db *fsdb,
2255                               struct mgs_target_info *mti)
2256 {
2257         char *log = mti->mti_svname;
2258         struct llog_handle *llh = NULL;
2259         char *uuid, *lovname;
2260         char mdt_index[6];
2261         char *ptr = mti->mti_params;
2262         int rc = 0, failout = 0;
2263         ENTRY;
2264
2265         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2266         if (uuid == NULL)
2267                 RETURN(-ENOMEM);
2268
2269         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2270                 failout = (strncmp(ptr, "failout", 7) == 0);
2271
2272         rc = name_create(&lovname, log, "-mdtlov");
2273         if (rc)
2274                 GOTO(out_free, rc);
2275         if (mgs_log_is_empty(env, mgs, log)) {
2276                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2277                 if (rc)
2278                         GOTO(out_lod, rc);
2279         }
2280
2281         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2282
2283         rc = record_start_log(env, mgs, &llh, log);
2284         if (rc)
2285                 GOTO(out_lod, rc);
2286
2287         /* add MDT itself */
2288
2289         /* FIXME this whole fn should be a single journal transaction */
2290         sprintf(uuid, "%s_UUID", log);
2291         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2292         if (rc)
2293                 GOTO(out_lod, rc);
2294         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2295         if (rc)
2296                 GOTO(out_end, rc);
2297         rc = record_mount_opt(env, llh, log, lovname, NULL);
2298         if (rc)
2299                 GOTO(out_end, rc);
2300         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2301                         failout ? "n" : "f");
2302         if (rc)
2303                 GOTO(out_end, rc);
2304         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2305         if (rc)
2306                 GOTO(out_end, rc);
2307 out_end:
2308         record_end_log(env, &llh);
2309 out_lod:
2310         name_destroy(&lovname);
2311 out_free:
2312         OBD_FREE(uuid, sizeof(struct obd_uuid));
2313         RETURN(rc);
2314 }
2315
2316 /* envelope method for all layers log */
2317 static int mgs_write_log_mdt(const struct lu_env *env,
2318                              struct mgs_device *mgs,
2319                              struct fs_db *fsdb,
2320                              struct mgs_target_info *mti)
2321 {
2322         struct mgs_thread_info *mgi = mgs_env_info(env);
2323         struct llog_handle *llh = NULL;
2324         char *cliname;
2325         int rc, i = 0;
2326         ENTRY;
2327
2328         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2329
2330         if (mti->mti_uuid[0] == '\0') {
2331                 /* Make up our own uuid */
2332                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2333                          "%s_UUID", mti->mti_svname);
2334         }
2335
2336         /* add mdt */
2337         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2338         if (rc)
2339                 RETURN(rc);
2340         /* Append the mdt info to the client log */
2341         rc = name_create(&cliname, mti->mti_fsname, "-client");
2342         if (rc)
2343                 RETURN(rc);
2344
2345         if (mgs_log_is_empty(env, mgs, cliname)) {
2346                 /* Start client log */
2347                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2348                                        fsdb->fsdb_clilov);
2349                 if (rc)
2350                         GOTO(out_free, rc);
2351                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2352                                        fsdb->fsdb_clilmv);
2353                 if (rc)
2354                         GOTO(out_free, rc);
2355         }
2356
2357         /*
2358         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2359         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2360         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2361         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2362         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2363         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2364         */
2365
2366                 /* copy client info about lov/lmv */
2367                 mgi->mgi_comp.comp_mti = mti;
2368                 mgi->mgi_comp.comp_fsdb = fsdb;
2369
2370                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2371                                                         &mgi->mgi_comp);
2372                 if (rc)
2373                         GOTO(out_free, rc);
2374                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2375                                               fsdb->fsdb_clilmv);
2376                 if (rc)
2377                         GOTO(out_free, rc);
2378
2379                 /* add mountopts */
2380                 rc = record_start_log(env, mgs, &llh, cliname);
2381                 if (rc)
2382                         GOTO(out_free, rc);
2383
2384                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2385                                    "mount opts");
2386                 if (rc)
2387                         GOTO(out_end, rc);
2388                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2389                                       fsdb->fsdb_clilmv);
2390                 if (rc)
2391                         GOTO(out_end, rc);
2392                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2393                                    "mount opts");
2394
2395         if (rc)
2396                 GOTO(out_end, rc);
2397
2398         /* for_all_existing_mdt except current one */
2399         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2400                 if (i !=  mti->mti_stripe_index &&
2401                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2402                         char *logname;
2403
2404                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2405                         if (rc)
2406                                 GOTO(out_end, rc);
2407
2408                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2409                                                       i, logname);
2410                         name_destroy(&logname);
2411                         if (rc)
2412                                 GOTO(out_end, rc);
2413                 }
2414         }
2415 out_end:
2416         record_end_log(env, &llh);
2417 out_free:
2418         name_destroy(&cliname);
2419         RETURN(rc);
2420 }
2421
2422 /* Add the ost info to the client/mdt lov */
2423 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2424                                     struct mgs_device *mgs, struct fs_db *fsdb,
2425                                     struct mgs_target_info *mti,
2426                                     char *logname, char *suffix, char *lovname,
2427                                     enum lustre_sec_part sec_part, int flags)
2428 {
2429         struct llog_handle *llh = NULL;
2430         char *nodeuuid = NULL;
2431         char *oscname = NULL;
2432         char *oscuuid = NULL;
2433         char *lovuuid = NULL;
2434         char *svname = NULL;
2435         char index[6];
2436         int i, rc;
2437
2438         ENTRY;
2439         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2440                mti->mti_svname, logname);
2441
2442         if (mgs_log_is_empty(env, mgs, logname)) {
2443                 CERROR("log is empty! Logical error\n");
2444                 RETURN (-EINVAL);
2445         }
2446
2447         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2448         if (rc)
2449                 RETURN(rc);
2450         rc = name_create(&svname, mti->mti_svname, "-osc");
2451         if (rc)
2452                 GOTO(out_free, rc);
2453
2454         /* for the system upgraded from old 1.8, keep using the old osc naming
2455          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2456         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2457                 rc = name_create(&oscname, svname, "");
2458         else
2459                 rc = name_create(&oscname, svname, suffix);
2460         if (rc)
2461                 GOTO(out_free, rc);
2462
2463         rc = name_create(&oscuuid, oscname, "_UUID");
2464         if (rc)
2465                 GOTO(out_free, rc);
2466         rc = name_create(&lovuuid, lovname, "_UUID");
2467         if (rc)
2468                 GOTO(out_free, rc);
2469
2470
2471         /*
2472         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2473         multihomed (#4)
2474         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2475         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2476         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2477         failover (#6,7)
2478         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2479         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2480         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2481         */
2482
2483         rc = record_start_log(env, mgs, &llh, logname);
2484         if (rc)
2485                 GOTO(out_free, rc);
2486
2487         /* FIXME these should be a single journal transaction */
2488         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2489                            "add osc");
2490         if (rc)
2491                 GOTO(out_end, rc);
2492
2493         /* NB: don't change record order, because upon MDT steal OSC config
2494          * from client, it treats all nids before LCFG_SETUP as target nids
2495          * (multiple interfaces), while nids after as failover node nids.
2496          * See mgs_steal_llog_handler() LCFG_ADD_UUID.
2497          */
2498         for (i = 0; i < mti->mti_nid_count; i++) {
2499                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2500                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2501                 if (rc)
2502                         GOTO(out_end, rc);
2503         }
2504         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2505         if (rc)
2506                 GOTO(out_end, rc);
2507         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2508         if (rc)
2509                 GOTO(out_end, rc);
2510         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2511         if (rc)
2512                 GOTO(out_end, rc);
2513
2514         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2515
2516         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2517         if (rc)
2518                 GOTO(out_end, rc);
2519         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2520                            "add osc");
2521         if (rc)
2522                 GOTO(out_end, rc);
2523 out_end:
2524         record_end_log(env, &llh);
2525 out_free:
2526         name_destroy(&lovuuid);
2527         name_destroy(&oscuuid);
2528         name_destroy(&oscname);
2529         name_destroy(&svname);
2530         name_destroy(&nodeuuid);
2531         RETURN(rc);
2532 }
2533
2534 static int mgs_write_log_ost(const struct lu_env *env,
2535                              struct mgs_device *mgs, struct fs_db *fsdb,
2536                              struct mgs_target_info *mti)
2537 {
2538         struct llog_handle *llh = NULL;
2539         char *logname, *lovname;
2540         char *ptr = mti->mti_params;
2541         int rc, flags = 0, failout = 0, i;
2542         ENTRY;
2543
2544         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2545
2546         /* The ost startup log */
2547
2548         /* If the ost log already exists, that means that someone reformatted
2549            the ost and it called target_add again. */
2550         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2551                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2552                                    "exists, yet the server claims it never "
2553                                    "registered. It may have been reformatted, "
2554                                    "or the index changed. writeconf the MDT to "
2555                                    "regenerate all logs.\n", mti->mti_svname);
2556                 RETURN(-EALREADY);
2557         }
2558
2559         /*
2560         attach obdfilter ost1 ost1_UUID
2561         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2562         */
2563         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2564                 failout = (strncmp(ptr, "failout", 7) == 0);
2565         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2566         if (rc)
2567                 RETURN(rc);
2568         /* FIXME these should be a single journal transaction */
2569         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2570         if (rc)
2571                 GOTO(out_end, rc);
2572         if (*mti->mti_uuid == '\0')
2573                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2574                          "%s_UUID", mti->mti_svname);
2575         rc = record_attach(env, llh, mti->mti_svname,
2576                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2577         if (rc)
2578                 GOTO(out_end, rc);
2579         rc = record_setup(env, llh, mti->mti_svname,
2580                           "dev"/*ignored*/, "type"/*ignored*/,
2581                           failout ? "n" : "f", 0/*options*/);
2582         if (rc)
2583                 GOTO(out_end, rc);
2584         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2585         if (rc)
2586                 GOTO(out_end, rc);
2587 out_end:
2588         record_end_log(env, &llh);
2589         if (rc)
2590                 RETURN(rc);
2591         /* We also have to update the other logs where this osc is part of
2592            the lov */
2593
2594         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2595                 /* If we're upgrading, the old mdt log already has our
2596                    entry. Let's do a fake one for fun. */
2597                 /* Note that we can't add any new failnids, since we don't
2598                    know the old osc names. */
2599                 flags = CM_SKIP | CM_UPGRADE146;
2600
2601         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2602                 /* If the update flag isn't set, don't update client/mdt
2603                    logs. */
2604                 flags |= CM_SKIP;
2605                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2606                               "the MDT first to regenerate it.\n",
2607                               mti->mti_svname);
2608         }
2609
2610         /* Add ost to all MDT lov defs */
2611         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2612                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2613                         char mdt_index[9];
2614
2615                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2616                                                      i);
2617                         if (rc)
2618                                 RETURN(rc);
2619                         sprintf(mdt_index, "-MDT%04x", i);
2620                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2621                                                       logname, mdt_index,
2622                                                       lovname, LUSTRE_SP_MDT,
2623                                                       flags);
2624                         name_destroy(&logname);
2625                         name_destroy(&lovname);
2626                         if (rc)
2627                                 RETURN(rc);
2628                 }
2629         }
2630
2631         /* Append ost info to the client log */
2632         rc = name_create(&logname, mti->mti_fsname, "-client");
2633         if (rc)
2634                 RETURN(rc);
2635         if (mgs_log_is_empty(env, mgs, logname)) {
2636                 /* Start client log */
2637                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2638                                        fsdb->fsdb_clilov);
2639                 if (rc)
2640                         GOTO(out_free, rc);
2641                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2642                                        fsdb->fsdb_clilmv);
2643                 if (rc)
2644                         GOTO(out_free, rc);
2645         }
2646         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2647                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2648 out_free:
2649         name_destroy(&logname);
2650         RETURN(rc);
2651 }
2652
2653 static __inline__ int mgs_param_empty(char *ptr)
2654 {
2655         char *tmp;
2656
2657         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2658                 return 1;
2659         return 0;
2660 }
2661
2662 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2663                                           struct mgs_device *mgs,
2664                                           struct fs_db *fsdb,
2665                                           struct mgs_target_info *mti,
2666                                           char *logname, char *cliname)
2667 {
2668         int rc;
2669         struct llog_handle *llh = NULL;
2670
2671         if (mgs_param_empty(mti->mti_params)) {
2672                 /* Remove _all_ failnids */
2673                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2674                                 mti->mti_svname, "add failnid", CM_SKIP);
2675                 return rc < 0 ? rc : 0;
2676         }
2677
2678         /* Otherwise failover nids are additive */
2679         rc = record_start_log(env, mgs, &llh, logname);
2680         if (rc)
2681                 return rc;
2682                 /* FIXME this should be a single journal transaction */
2683         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2684                            "add failnid");
2685         if (rc)
2686                 goto out_end;
2687         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2688         if (rc)
2689                 goto out_end;
2690         rc = record_marker(env, llh, fsdb, CM_END,
2691                            mti->mti_svname, "add failnid");
2692 out_end:
2693         record_end_log(env, &llh);
2694         return rc;
2695 }
2696
2697
2698 /* Add additional failnids to an existing log.
2699    The mdc/osc must have been added to logs first */
2700 /* tcp nids must be in dotted-quad ascii -
2701    we can't resolve hostnames from the kernel. */
2702 static int mgs_write_log_add_failnid(const struct lu_env *env,
2703                                      struct mgs_device *mgs,
2704                                      struct fs_db *fsdb,
2705                                      struct mgs_target_info *mti)
2706 {
2707         char *logname, *cliname;
2708         int rc;
2709         ENTRY;
2710
2711         /* FIXME we currently can't erase the failnids
2712          * given when a target first registers, since they aren't part of
2713          * an "add uuid" stanza */
2714
2715         /* Verify that we know about this target */
2716         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2717                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2718                                    "yet. It must be started before failnids "
2719                                    "can be added.\n", mti->mti_svname);
2720                 RETURN(-ENOENT);
2721         }
2722
2723         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2724         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2725                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2726         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2727                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2728         } else {
2729                 RETURN(-EINVAL);
2730         }
2731         if (rc)
2732                 RETURN(rc);
2733         /* Add failover nids to the client log */
2734         rc = name_create(&logname, mti->mti_fsname, "-client");
2735         if (rc) {
2736                 name_destroy(&cliname);
2737                 RETURN(rc);
2738         }
2739         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2740         name_destroy(&logname);
2741         name_destroy(&cliname);
2742         if (rc)
2743                 RETURN(rc);
2744
2745         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2746                 /* Add OST failover nids to the MDT logs as well */
2747                 int i;
2748
2749                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2750                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2751                                 continue;
2752                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2753                         if (rc)
2754                                 RETURN(rc);
2755                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2756                                                  fsdb, i);
2757                         if (rc) {
2758                                 name_destroy(&logname);
2759                                 RETURN(rc);
2760                         }
2761                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2762                                                             mti, logname,
2763                                                             cliname);
2764                         name_destroy(&cliname);
2765                         name_destroy(&logname);
2766                         if (rc)
2767                                 RETURN(rc);
2768                 }
2769         }
2770
2771         RETURN(rc);
2772 }
2773
2774 static int mgs_wlp_lcfg(const struct lu_env *env,
2775                         struct mgs_device *mgs, struct fs_db *fsdb,
2776                         struct mgs_target_info *mti,
2777                         char *logname, struct lustre_cfg_bufs *bufs,
2778                         char *tgtname, char *ptr)
2779 {
2780         char comment[MTI_NAME_MAXLEN];
2781         char *tmp;
2782         struct lustre_cfg *lcfg;
2783         int rc, del;
2784
2785         /* Erase any old settings of this same parameter */
2786         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2787         comment[MTI_NAME_MAXLEN - 1] = 0;
2788         /* But don't try to match the value. */
2789         if ((tmp = strchr(comment, '=')))
2790             *tmp = 0;
2791         /* FIXME we should skip settings that are the same as old values */
2792         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2793         if (rc < 0)
2794                 return rc;
2795         del = mgs_param_empty(ptr);
2796
2797         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2798                       "Sett" : "Modify", tgtname, comment, logname);
2799         if (del)
2800                 return rc;
2801
2802         lustre_cfg_bufs_reset(bufs, tgtname);
2803         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2804         lcfg = lustre_cfg_new(LCFG_PARAM, bufs);
2805         if (!lcfg)
2806                 return -ENOMEM;
2807         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2808         lustre_cfg_free(lcfg);
2809         return rc;
2810 }
2811
2812 /* write global variable settings into log */
2813 static int mgs_write_log_sys(const struct lu_env *env,
2814                              struct mgs_device *mgs, struct fs_db *fsdb,
2815                              struct mgs_target_info *mti, char *sys, char *ptr)
2816 {
2817         struct mgs_thread_info *mgi = mgs_env_info(env);
2818         struct lustre_cfg *lcfg;
2819         char *tmp, sep;
2820         int rc, cmd, convert = 1;
2821
2822         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2823                 cmd = LCFG_SET_TIMEOUT;
2824         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2825                 cmd = LCFG_SET_LDLM_TIMEOUT;
2826         /* Check for known params here so we can return error to lctl */
2827         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2828                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2829                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2830                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2831                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2832                 cmd = LCFG_PARAM;
2833         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2834                 convert = 0; /* Don't convert string value to integer */
2835                 cmd = LCFG_PARAM;
2836         } else {
2837                 return -EINVAL;
2838         }
2839
2840         if (mgs_param_empty(ptr))
2841                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2842         else
2843                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2844
2845         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2846         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2847         if (!convert && *tmp != '\0')
2848                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2849         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2850         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2851         /* truncate the comment to the parameter name */
2852         ptr = tmp - 1;
2853         sep = *ptr;
2854         *ptr = '\0';
2855         /* modify all servers and clients */
2856         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2857                                       *tmp == '\0' ? NULL : lcfg,
2858                                       mti->mti_fsname, sys, 0);
2859         if (rc == 0 && *tmp != '\0') {
2860                 switch (cmd) {
2861                 case LCFG_SET_TIMEOUT:
2862                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2863                                 class_process_config(lcfg);
2864                         break;
2865                 case LCFG_SET_LDLM_TIMEOUT:
2866                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2867                                 class_process_config(lcfg);
2868                         break;
2869                 default:
2870                         break;
2871                 }
2872         }
2873         *ptr = sep;
2874         lustre_cfg_free(lcfg);
2875         return rc;
2876 }
2877
2878 /* write quota settings into log */
2879 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2880                                struct fs_db *fsdb, struct mgs_target_info *mti,
2881                                char *quota, char *ptr)
2882 {
2883         struct mgs_thread_info  *mgi = mgs_env_info(env);
2884         struct lustre_cfg       *lcfg;
2885         char                    *tmp;
2886         char                     sep;
2887         int                      rc, cmd = LCFG_PARAM;
2888
2889         /* support only 'meta' and 'data' pools so far */
2890         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2891             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2892                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2893                        "& quota.ost are)\n", ptr);
2894                 return -EINVAL;
2895         }
2896
2897         if (*tmp == '\0') {
2898                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2899         } else {
2900                 CDEBUG(D_MGS, "global '%s'\n", quota);
2901
2902                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2903                     strcmp(tmp, "none") != 0) {
2904                         CERROR("enable option(%s) isn't supported\n", tmp);
2905                         return -EINVAL;
2906                 }
2907         }
2908
2909         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2910         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2911         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2912         /* truncate the comment to the parameter name */
2913         ptr = tmp - 1;
2914         sep = *ptr;
2915         *ptr = '\0';
2916
2917         /* XXX we duplicated quota enable information in all server
2918          *     config logs, it should be moved to a separate config
2919          *     log once we cleanup the config log for global param. */
2920         /* modify all servers */
2921         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2922                                       *tmp == '\0' ? NULL : lcfg,
2923                                       mti->mti_fsname, quota, 1);
2924         *ptr = sep;
2925         lustre_cfg_free(lcfg);
2926         return rc < 0 ? rc : 0;
2927 }
2928
2929 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2930                                    struct mgs_device *mgs,
2931                                    struct fs_db *fsdb,
2932                                    struct mgs_target_info *mti,
2933                                    char *param)
2934 {
2935         struct mgs_thread_info *mgi = mgs_env_info(env);
2936         struct llog_handle     *llh = NULL;
2937         char                   *logname;
2938         char                   *comment, *ptr;
2939         struct lustre_cfg      *lcfg;
2940         int                     rc, len;
2941         ENTRY;
2942
2943         /* get comment */
2944         ptr = strchr(param, '=');
2945         LASSERT(ptr);
2946         len = ptr - param;
2947
2948         OBD_ALLOC(comment, len + 1);
2949         if (comment == NULL)
2950                 RETURN(-ENOMEM);
2951         strncpy(comment, param, len);
2952         comment[len] = '\0';
2953
2954         /* prepare lcfg */
2955         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2956         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2957         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2958         if (lcfg == NULL)
2959                 GOTO(out_comment, rc = -ENOMEM);
2960
2961         /* construct log name */
2962         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2963         if (rc)
2964                 GOTO(out_lcfg, rc);
2965
2966         if (mgs_log_is_empty(env, mgs, logname)) {
2967                 rc = record_start_log(env, mgs, &llh, logname);
2968                 if (rc)
2969                         GOTO(out, rc);
2970                 record_end_log(env, &llh);
2971         }
2972
2973         /* obsolete old one */
2974         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2975                         comment, CM_SKIP);
2976         if (rc < 0)
2977                 GOTO(out, rc);
2978         /* write the new one */
2979         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2980                                   mti->mti_svname, comment);
2981         if (rc)
2982                 CERROR("err %d writing log %s\n", rc, logname);
2983 out:
2984         name_destroy(&logname);
2985 out_lcfg:
2986         lustre_cfg_free(lcfg);
2987 out_comment:
2988         OBD_FREE(comment, len + 1);
2989         RETURN(rc);
2990 }
2991
2992 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2993                                         char *param)
2994 {
2995         char    *ptr;
2996
2997         /* disable the adjustable udesc parameter for now, i.e. use default
2998          * setting that client always ship udesc to MDT if possible. to enable
2999          * it simply remove the following line */
3000         goto error_out;
3001
3002         ptr = strchr(param, '=');
3003         if (ptr == NULL)
3004                 goto error_out;
3005         *ptr++ = '\0';
3006
3007         if (strcmp(param, PARAM_SRPC_UDESC))
3008                 goto error_out;
3009
3010         if (strcmp(ptr, "yes") == 0) {
3011                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3012                 CWARN("Enable user descriptor shipping from client to MDT\n");
3013         } else if (strcmp(ptr, "no") == 0) {
3014                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3015                 CWARN("Disable user descriptor shipping from client to MDT\n");
3016         } else {
3017                 *(ptr - 1) = '=';
3018                 goto error_out;
3019         }
3020         return 0;
3021
3022 error_out:
3023         CERROR("Invalid param: %s\n", param);
3024         return -EINVAL;
3025 }
3026
3027 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3028                                   const char *svname,
3029                                   char *param)
3030 {
3031         struct sptlrpc_rule      rule;
3032         struct sptlrpc_rule_set *rset;
3033         int                      rc;
3034         ENTRY;
3035
3036         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3037                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3038                 RETURN(-EINVAL);
3039         }
3040
3041         if (strncmp(param, PARAM_SRPC_UDESC,
3042                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3043                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3044         }
3045
3046         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3047                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3048                 RETURN(-EINVAL);
3049         }
3050
3051         param += sizeof(PARAM_SRPC_FLVR) - 1;
3052
3053         rc = sptlrpc_parse_rule(param, &rule);
3054         if (rc)
3055                 RETURN(rc);
3056
3057         /* mgs rules implies must be mgc->mgs */
3058         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3059                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3060                      rule.sr_from != LUSTRE_SP_ANY) ||
3061                     (rule.sr_to != LUSTRE_SP_MGS &&
3062                      rule.sr_to != LUSTRE_SP_ANY))
3063                         RETURN(-EINVAL);
3064         }
3065
3066         /* preapre room for this coming rule. svcname format should be:
3067          * - fsname: general rule
3068          * - fsname-tgtname: target-specific rule
3069          */
3070         if (strchr(svname, '-')) {
3071                 struct mgs_tgt_srpc_conf *tgtconf;
3072                 int                       found = 0;
3073
3074                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3075                      tgtconf = tgtconf->mtsc_next) {
3076                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3077                                 found = 1;
3078                                 break;
3079                         }
3080                 }
3081
3082                 if (!found) {
3083                         int name_len;
3084
3085                         OBD_ALLOC_PTR(tgtconf);
3086                         if (tgtconf == NULL)
3087                                 RETURN(-ENOMEM);
3088
3089                         name_len = strlen(svname);
3090
3091                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3092                         if (tgtconf->mtsc_tgt == NULL) {
3093                                 OBD_FREE_PTR(tgtconf);
3094                                 RETURN(-ENOMEM);
3095                         }
3096                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3097
3098                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3099                         fsdb->fsdb_srpc_tgt = tgtconf;
3100                 }
3101
3102                 rset = &tgtconf->mtsc_rset;
3103         } else {
3104                 rset = &fsdb->fsdb_srpc_gen;
3105         }
3106
3107         rc = sptlrpc_rule_set_merge(rset, &rule);
3108
3109         RETURN(rc);
3110 }
3111
3112 static int mgs_srpc_set_param(const struct lu_env *env,
3113                               struct mgs_device *mgs,
3114                               struct fs_db *fsdb,
3115                               struct mgs_target_info *mti,
3116                               char *param)
3117 {
3118         char                   *copy;
3119         int                     rc, copy_size;
3120         ENTRY;
3121
3122 #ifndef HAVE_GSS
3123         RETURN(-EINVAL);
3124 #endif
3125         /* keep a copy of original param, which could be destroied
3126          * during parsing */
3127         copy_size = strlen(param) + 1;
3128         OBD_ALLOC(copy, copy_size);
3129         if (copy == NULL)
3130                 return -ENOMEM;
3131         memcpy(copy, param, copy_size);
3132
3133         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3134         if (rc)
3135                 goto out_free;
3136
3137         /* previous steps guaranteed the syntax is correct */
3138         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3139         if (rc)
3140                 goto out_free;
3141
3142         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3143                 /*
3144                  * for mgs rules, make them effective immediately.
3145                  */
3146                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3147                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3148                                                  &fsdb->fsdb_srpc_gen);
3149         }
3150
3151 out_free:
3152         OBD_FREE(copy, copy_size);
3153         RETURN(rc);
3154 }
3155
3156 struct mgs_srpc_read_data {
3157         struct fs_db   *msrd_fsdb;
3158         int             msrd_skip;
3159 };
3160
3161 static int mgs_srpc_read_handler(const struct lu_env *env,
3162                                  struct llog_handle *llh,
3163                                  struct llog_rec_hdr *rec, void *data)
3164 {
3165         struct mgs_srpc_read_data *msrd = data;
3166         struct cfg_marker         *marker;
3167         struct lustre_cfg         *lcfg = REC_DATA(rec);
3168         char                      *svname, *param;
3169         int                        cfg_len, rc;
3170         ENTRY;
3171
3172         if (rec->lrh_type != OBD_CFG_REC) {
3173                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3174                 RETURN(-EINVAL);
3175         }
3176
3177         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3178                   sizeof(struct llog_rec_tail);
3179
3180         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3181         if (rc) {
3182                 CERROR("Insane cfg\n");
3183                 RETURN(rc);
3184         }
3185
3186         if (lcfg->lcfg_command == LCFG_MARKER) {
3187                 marker = lustre_cfg_buf(lcfg, 1);
3188
3189                 if (marker->cm_flags & CM_START &&
3190                     marker->cm_flags & CM_SKIP)
3191                         msrd->msrd_skip = 1;
3192                 if (marker->cm_flags & CM_END)
3193                         msrd->msrd_skip = 0;
3194
3195                 RETURN(0);
3196         }
3197
3198         if (msrd->msrd_skip)
3199                 RETURN(0);
3200
3201         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3202                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3203                 RETURN(0);
3204         }
3205
3206         svname = lustre_cfg_string(lcfg, 0);
3207         if (svname == NULL) {
3208                 CERROR("svname is empty\n");
3209                 RETURN(0);
3210         }
3211
3212         param = lustre_cfg_string(lcfg, 1);
3213         if (param == NULL) {
3214                 CERROR("param is empty\n");
3215                 RETURN(0);
3216         }
3217
3218         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3219         if (rc)
3220                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3221
3222         RETURN(0);
3223 }
3224
3225 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3226                                 struct mgs_device *mgs,
3227                                 struct fs_db *fsdb)
3228 {
3229         struct llog_handle        *llh = NULL;
3230         struct llog_ctxt          *ctxt;
3231         char                      *logname;
3232         struct mgs_srpc_read_data  msrd;
3233         int                        rc;
3234         ENTRY;
3235
3236         /* construct log name */
3237         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3238         if (rc)
3239                 RETURN(rc);
3240
3241         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3242         LASSERT(ctxt != NULL);
3243
3244         if (mgs_log_is_empty(env, mgs, logname))
3245                 GOTO(out, rc = 0);
3246
3247         rc = llog_open(env, ctxt, &llh, NULL, logname,
3248                        LLOG_OPEN_EXISTS);
3249         if (rc < 0) {
3250                 if (rc == -ENOENT)
3251                         rc = 0;
3252                 GOTO(out, rc);
3253         }
3254
3255         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3256         if (rc)
3257                 GOTO(out_close, rc);
3258
3259         if (llog_get_size(llh) <= 1)
3260                 GOTO(out_close, rc = 0);
3261
3262         msrd.msrd_fsdb = fsdb;
3263         msrd.msrd_skip = 0;
3264
3265         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3266                           NULL);
3267
3268 out_close:
3269         llog_close(env, llh);
3270 out:
3271         llog_ctxt_put(ctxt);
3272         name_destroy(&logname);
3273
3274         if (rc)
3275                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3276         RETURN(rc);
3277 }
3278
3279 /* Permanent settings of all parameters by writing into the appropriate
3280  * configuration logs.
3281  * A parameter with null value ("<param>='\0'") means to erase it out of
3282  * the logs.
3283  */
3284 static int mgs_write_log_param(const struct lu_env *env,
3285                                struct mgs_device *mgs, struct fs_db *fsdb,
3286                                struct mgs_target_info *mti, char *ptr)
3287 {
3288         struct mgs_thread_info *mgi = mgs_env_info(env);
3289         char *logname;
3290         char *tmp;
3291         int rc = 0, rc2 = 0;
3292         ENTRY;
3293
3294         /* For various parameter settings, we have to figure out which logs
3295            care about them (e.g. both mdt and client for lov settings) */
3296         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3297
3298         /* The params are stored in MOUNT_DATA_FILE and modified via
3299            tunefs.lustre, or set using lctl conf_param */
3300
3301         /* Processed in lustre_start_mgc */
3302         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3303                 GOTO(end, rc);
3304
3305         /* Processed in ost/mdt */
3306         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3307                 GOTO(end, rc);
3308
3309         /* Processed in mgs_write_log_ost */
3310         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3311                 if (mti->mti_flags & LDD_F_PARAM) {
3312                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3313                                            "changed with tunefs.lustre"
3314                                            "and --writeconf\n", ptr);
3315                         rc = -EPERM;
3316                 }
3317                 GOTO(end, rc);
3318         }
3319
3320         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3321                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3322                 GOTO(end, rc);
3323         }
3324
3325         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3326                 /* Add a failover nidlist */
3327                 rc = 0;
3328                 /* We already processed failovers params for new
3329                    targets in mgs_write_log_target */
3330                 if (mti->mti_flags & LDD_F_PARAM) {
3331                         CDEBUG(D_MGS, "Adding failnode\n");
3332                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3333                 }
3334                 GOTO(end, rc);
3335         }
3336
3337         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3338                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3339                 GOTO(end, rc);
3340         }
3341
3342         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3343                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3344                 GOTO(end, rc);
3345         }
3346
3347         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3348                 /* active=0 means off, anything else means on */
3349                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3350                 int i;
3351
3352                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3353                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3354                                            "be (de)activated.\n",
3355                                            mti->mti_svname);
3356                         GOTO(end, rc = -EINVAL);
3357                 }
3358                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3359                               flag ? "de": "re", mti->mti_svname);
3360                 /* Modify clilov */
3361                 rc = name_create(&logname, mti->mti_fsname, "-client");
3362                 if (rc)
3363                         GOTO(end, rc);
3364                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3365                                 mti->mti_svname, "add osc", flag);
3366                 name_destroy(&logname);
3367                 if (rc)
3368                         goto active_err;
3369                 /* Modify mdtlov */
3370                 /* Add to all MDT logs for CMD */
3371                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3372                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3373                                 continue;
3374                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3375                         if (rc)
3376                                 GOTO(end, rc);
3377                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3378                                         mti->mti_svname, "add osc", flag);
3379                         name_destroy(&logname);
3380                         if (rc)
3381                                 goto active_err;
3382                 }
3383         active_err:
3384                 if (rc) {
3385                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3386                                            "log (%d). No permanent "
3387                                            "changes were made to the "
3388                                            "config log.\n",
3389                                            mti->mti_svname, rc);
3390                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3391                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3392                                                    " because the log"
3393                                                    "is in the old 1.4"
3394                                                    "style. Consider "
3395                                                    " --writeconf to "
3396                                                    "update the logs.\n");
3397                         GOTO(end, rc);
3398                 }
3399                 /* Fall through to osc proc for deactivating live OSC
3400                    on running MDT / clients. */
3401         }
3402         /* Below here, let obd's XXX_process_config methods handle it */
3403
3404         /* All lov. in proc */
3405         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3406                 char *mdtlovname;
3407
3408                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3409                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3410                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3411                                            "set on the MDT, not %s. "
3412                                            "Ignoring.\n",
3413                                            mti->mti_svname);
3414                         GOTO(end, rc = 0);
3415                 }
3416
3417                 /* Modify mdtlov */
3418                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3419                         GOTO(end, rc = -ENODEV);
3420
3421                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3422                                              mti->mti_stripe_index);
3423                 if (rc)
3424                         GOTO(end, rc);
3425                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3426                                   &mgi->mgi_bufs, mdtlovname, ptr);
3427                 name_destroy(&logname);
3428                 name_destroy(&mdtlovname);
3429                 if (rc)
3430                         GOTO(end, rc);
3431
3432                 /* Modify clilov */
3433                 rc = name_create(&logname, mti->mti_fsname, "-client");
3434                 if (rc)
3435                         GOTO(end, rc);
3436                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3437                                   fsdb->fsdb_clilov, ptr);
3438                 name_destroy(&logname);
3439                 GOTO(end, rc);
3440         }
3441
3442         /* All osc., mdc., llite. params in proc */
3443         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3444             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3445             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3446                 char *cname;
3447
3448                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3449                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3450                                            " cannot be modified. Consider"
3451                                            " updating the configuration with"
3452                                            " --writeconf\n",
3453                                            mti->mti_svname);
3454                         GOTO(end, rc = -EINVAL);
3455                 }
3456                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3457                         rc = name_create(&cname, mti->mti_fsname, "-client");
3458                         /* Add the client type to match the obdname in
3459                            class_config_llog_handler */
3460                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3461                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3462                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3463                         rc = name_create(&cname, mti->mti_svname, "-osc");
3464                 } else {
3465                         GOTO(end, rc = -EINVAL);
3466                 }
3467                 if (rc)
3468                         GOTO(end, rc);
3469
3470                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3471
3472                 /* Modify client */
3473                 rc = name_create(&logname, mti->mti_fsname, "-client");
3474                 if (rc) {
3475                         name_destroy(&cname);
3476                         GOTO(end, rc);
3477                 }
3478                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3479                                   cname, ptr);
3480
3481                 /* osc params affect the MDT as well */
3482                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3483                         int i;
3484
3485                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3486                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3487                                         continue;
3488                                 name_destroy(&cname);
3489                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3490                                                          fsdb, i);
3491                                 name_destroy(&logname);
3492                                 if (rc)
3493                                         break;
3494                                 rc = name_create_mdt(&logname,
3495                                                      mti->mti_fsname, i);
3496                                 if (rc)
3497                                         break;
3498                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3499                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3500                                                           mti, logname,
3501                                                           &mgi->mgi_bufs,
3502                                                           cname, ptr);
3503                                         if (rc)
3504                                                 break;
3505                                 }
3506                         }
3507                 }
3508                 name_destroy(&logname);
3509                 name_destroy(&cname);
3510                 GOTO(end, rc);
3511         }
3512
3513         /* All mdt. params in proc */
3514         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
3515                 int i;
3516                 __u32 idx;
3517
3518                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3519                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3520                             MTI_NAME_MAXLEN) == 0)
3521                         /* device is unspecified completely? */
3522                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3523                 else
3524                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3525                 if (rc < 0)
3526                         goto active_err;
3527                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3528                         goto active_err;
3529                 if (rc & LDD_F_SV_ALL) {
3530                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3531                                 if (!test_bit(i,
3532                                                   fsdb->fsdb_mdt_index_map))
3533                                         continue;
3534                                 rc = name_create_mdt(&logname,
3535                                                 mti->mti_fsname, i);
3536                                 if (rc)
3537                                         goto active_err;
3538                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3539                                                   logname, &mgi->mgi_bufs,
3540                                                   logname, ptr);
3541                                 name_destroy(&logname);
3542                                 if (rc)
3543                                         goto active_err;
3544                         }
3545                 } else {
3546                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3547                                           mti->mti_svname, &mgi->mgi_bufs,
3548                                           mti->mti_svname, ptr);
3549                         if (rc)
3550                                 goto active_err;
3551                 }
3552                 GOTO(end, rc);
3553         }
3554
3555         /* All mdd., ost. params in proc */
3556         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3557             (class_match_param(ptr, PARAM_OST, NULL) == 0)) {
3558                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3559                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3560                         GOTO(end, rc = -ENODEV);
3561
3562                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3563                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3564                 GOTO(end, rc);
3565         }
3566
3567         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3568         rc2 = -ENOSYS;
3569
3570 end:
3571         if (rc)
3572                 CERROR("err %d on param '%s'\n", rc, ptr);
3573
3574         RETURN(rc ?: rc2);
3575 }
3576
3577 /* Not implementing automatic failover nid addition at this time. */
3578 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3579                       struct mgs_target_info *mti)
3580 {
3581 #if 0
3582         struct fs_db *fsdb;
3583         int rc;
3584         ENTRY;
3585
3586         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3587         if (rc)
3588                 RETURN(rc);
3589
3590         if (mgs_log_is_empty(obd, mti->mti_svname))
3591                 /* should never happen */
3592                 RETURN(-ENOENT);
3593
3594         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3595
3596         /* FIXME We can just check mti->params to see if we're already in
3597            the failover list.  Modify mti->params for rewriting back at
3598            server_register_target(). */
3599
3600         mutex_lock(&fsdb->fsdb_mutex);
3601         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3602         mutex_unlock(&fsdb->fsdb_mutex);
3603
3604         RETURN(rc);
3605 #endif
3606         return 0;
3607 }
3608
3609 int mgs_write_log_target(const struct lu_env *env,
3610                          struct mgs_device *mgs,
3611                          struct mgs_target_info *mti,
3612                          struct fs_db *fsdb)
3613 {
3614         int rc = -EINVAL;
3615         char *buf, *params;
3616         ENTRY;
3617
3618         /* set/check the new target index */
3619         rc = mgs_set_index(env, mgs, mti);
3620         if (rc < 0) {
3621                 CERROR("Can't get index (%d)\n", rc);
3622                 RETURN(rc);
3623         }
3624
3625         if (rc == EALREADY) {
3626                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3627                               mti->mti_stripe_index, mti->mti_svname);
3628                 /* We would like to mark old log sections as invalid
3629                    and add new log sections in the client and mdt logs.
3630                    But if we add new sections, then live clients will
3631                    get repeat setup instructions for already running
3632                    osc's. So don't update the client/mdt logs. */
3633                 mti->mti_flags &= ~LDD_F_UPDATE;
3634         }
3635
3636         mutex_lock(&fsdb->fsdb_mutex);
3637
3638         if (mti->mti_flags &
3639             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3640                 /* Generate a log from scratch */
3641                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3642                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3643                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3644                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3645                 } else {
3646                         CERROR("Unknown target type %#x, can't create log for "
3647                                "%s\n", mti->mti_flags, mti->mti_svname);
3648                 }
3649                 if (rc) {
3650                         CERROR("Can't write logs for %s (%d)\n",
3651                                mti->mti_svname, rc);
3652                         GOTO(out_up, rc);
3653                 }
3654         } else {
3655                 /* Just update the params from tunefs in mgs_write_log_params */
3656                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3657                 mti->mti_flags |= LDD_F_PARAM;
3658         }
3659
3660         /* allocate temporary buffer, where class_get_next_param will
3661            make copy of a current  parameter */
3662         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3663         if (buf == NULL)
3664                 GOTO(out_up, rc = -ENOMEM);
3665         params = mti->mti_params;
3666         while (params != NULL) {
3667                 rc = class_get_next_param(&params, buf);
3668                 if (rc) {
3669                         if (rc == 1)
3670                                 /* there is no next parameter, that is
3671                                    not an error */
3672                                 rc = 0;
3673                         break;
3674                 }
3675                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3676                        params, buf);
3677                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3678                 if (rc)
3679                         break;
3680         }
3681
3682         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3683
3684 out_up:
3685         mutex_unlock(&fsdb->fsdb_mutex);
3686         RETURN(rc);
3687 }
3688
3689 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3690 {
3691         struct llog_ctxt        *ctxt;
3692         int                      rc = 0;
3693
3694         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3695         if (ctxt == NULL) {
3696                 CERROR("%s: MGS config context doesn't exist\n",
3697                        mgs->mgs_obd->obd_name);
3698                 rc = -ENODEV;
3699         } else {
3700                 rc = llog_erase(env, ctxt, NULL, name);
3701                 /* llog may not exist */
3702                 if (rc == -ENOENT)
3703                         rc = 0;
3704                 llog_ctxt_put(ctxt);
3705         }
3706
3707         if (rc)
3708                 CERROR("%s: failed to clear log %s: %d\n",
3709                        mgs->mgs_obd->obd_name, name, rc);
3710
3711         return rc;
3712 }
3713
3714 /* erase all logs for the given fs */
3715 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3716 {
3717         struct fs_db *fsdb;
3718         cfs_list_t list;
3719         struct mgs_direntry *dirent, *n;
3720         int rc, len = strlen(fsname);
3721         char *suffix;
3722         ENTRY;
3723
3724         /* Find all the logs in the CONFIGS directory */
3725         rc = class_dentry_readdir(env, mgs, &list);
3726         if (rc)
3727                 RETURN(rc);
3728
3729         mutex_lock(&mgs->mgs_mutex);
3730
3731         /* Delete the fs db */
3732         fsdb = mgs_find_fsdb(mgs, fsname);
3733         if (fsdb)
3734                 mgs_free_fsdb(mgs, fsdb);
3735
3736         mutex_unlock(&mgs->mgs_mutex);
3737
3738         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3739                 cfs_list_del(&dirent->list);
3740                 suffix = strrchr(dirent->name, '-');
3741                 if (suffix != NULL) {
3742                         if ((len == suffix - dirent->name) &&
3743                             (strncmp(fsname, dirent->name, len) == 0)) {
3744                                 CDEBUG(D_MGS, "Removing log %s\n",
3745                                        dirent->name);
3746                                 mgs_erase_log(env, mgs, dirent->name);
3747                         }
3748                 }
3749                 mgs_direntry_free(dirent);
3750         }
3751
3752         RETURN(rc);
3753 }
3754
3755 /* from llog_swab */
3756 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3757 {
3758         int i;
3759         ENTRY;
3760
3761         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3762         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3763
3764         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3765         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3766         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3767         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3768
3769         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3770         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3771                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3772                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3773                                i, lcfg->lcfg_buflens[i],
3774                                lustre_cfg_string(lcfg, i));
3775                 }
3776         EXIT;
3777 }
3778
3779 /* Set a permanent (config log) param for a target or fs
3780  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3781  *             buf1 contains the single parameter
3782  */
3783 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3784                  struct lustre_cfg *lcfg, char *fsname)
3785 {
3786         struct fs_db *fsdb;
3787         struct mgs_target_info *mti;
3788         char *devname, *param;
3789         char *ptr, *tmp;
3790         __u32 index;
3791         int rc = 0;
3792         ENTRY;
3793
3794         print_lustre_cfg(lcfg);
3795
3796         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3797         devname = lustre_cfg_string(lcfg, 0);
3798         param = lustre_cfg_string(lcfg, 1);
3799         if (!devname) {
3800                 /* Assume device name embedded in param:
3801                    lustre-OST0000.osc.max_dirty_mb=32 */
3802                 ptr = strchr(param, '.');
3803                 if (ptr) {
3804                         devname = param;
3805                         *ptr = 0;
3806                         param = ptr + 1;
3807                 }
3808         }
3809         if (!devname) {
3810                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3811                 RETURN(-ENOSYS);
3812         }
3813
3814         rc = mgs_parse_devname(devname, fsname, NULL);
3815         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3816                 /* param related to llite isn't allowed to set by OST or MDT */
3817                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3818                                    sizeof(PARAM_LLITE)) == 0)
3819                         RETURN(-EINVAL);
3820         } else {
3821                 /* assume devname is the fsname */
3822                 memset(fsname, 0, MTI_NAME_MAXLEN);
3823                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3824                 fsname[MTI_NAME_MAXLEN - 1] = 0;
3825         }
3826         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3827
3828         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3829         if (rc)
3830                 RETURN(rc);
3831         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3832             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3833                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3834                        "is '%s'\n", fsname, devname);
3835                 mgs_free_fsdb(mgs, fsdb);
3836                 RETURN(-EINVAL);
3837         }
3838
3839         /* Create a fake mti to hold everything */
3840         OBD_ALLOC_PTR(mti);
3841         if (!mti)
3842                 GOTO(out, rc = -ENOMEM);
3843         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3844             >= sizeof(mti->mti_fsname))
3845                 GOTO(out, rc = -E2BIG);
3846         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3847             >= sizeof(mti->mti_svname))
3848                 GOTO(out, rc = -E2BIG);
3849         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3850             >= sizeof(mti->mti_params))
3851                 GOTO(out, rc = -E2BIG);
3852         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3853         if (rc < 0)
3854                 /* Not a valid server; may be only fsname */
3855                 rc = 0;
3856         else
3857                 /* Strip -osc or -mdc suffix from svname */
3858                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3859                                      mti->mti_svname))
3860                         GOTO(out, rc = -EINVAL);
3861
3862         mti->mti_flags = rc | LDD_F_PARAM;
3863
3864         mutex_lock(&fsdb->fsdb_mutex);
3865         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3866         mutex_unlock(&fsdb->fsdb_mutex);
3867
3868         /*
3869          * Revoke lock so everyone updates.  Should be alright if
3870          * someone was already reading while we were updating the logs,
3871          * so we don't really need to hold the lock while we're
3872          * writing (above).
3873          */
3874         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3875 out:
3876         OBD_FREE_PTR(mti);
3877         RETURN(rc);
3878 }
3879
3880 static int mgs_write_log_pool(const struct lu_env *env,
3881                               struct mgs_device *mgs, char *logname,
3882                               struct fs_db *fsdb, char *lovname,
3883                               enum lcfg_command_type cmd,
3884                               char *poolname, char *fsname,
3885                               char *ostname, char *comment)
3886 {
3887         struct llog_handle *llh = NULL;
3888         int rc;
3889
3890         rc = record_start_log(env, mgs, &llh, logname);
3891         if (rc)
3892                 return rc;
3893         rc = record_marker(env, llh, fsdb, CM_START, lovname, comment);
3894         if (rc)
3895                 goto out;
3896         rc = record_base(env, llh, lovname, 0, cmd, poolname, fsname, ostname, 0);
3897         if (rc)
3898                 goto out;
3899         rc = record_marker(env, llh, fsdb, CM_END, lovname, comment);
3900 out:
3901         record_end_log(env, &llh);
3902         return rc;
3903 }
3904
3905 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3906                  enum lcfg_command_type cmd, char *fsname,
3907                  char *poolname, char *ostname)
3908 {
3909         struct fs_db *fsdb;
3910         char *lovname;
3911         char *logname;
3912         char *label = NULL, *canceled_label = NULL;
3913         int label_sz;
3914         struct mgs_target_info *mti = NULL;
3915         int rc, i;
3916         ENTRY;
3917
3918         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3919         if (rc) {
3920                 CERROR("Can't get db for %s\n", fsname);
3921                 RETURN(rc);
3922         }
3923         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3924                 CERROR("%s is not defined\n", fsname);
3925                 mgs_free_fsdb(mgs, fsdb);
3926                 RETURN(-EINVAL);
3927         }
3928
3929         label_sz = 10 + strlen(fsname) + strlen(poolname);
3930
3931         /* check if ostname match fsname */
3932         if (ostname != NULL) {
3933                 char *ptr;
3934
3935                 ptr = strrchr(ostname, '-');
3936                 if ((ptr == NULL) ||
3937                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3938                         RETURN(-EINVAL);
3939                 label_sz += strlen(ostname);
3940         }
3941
3942         OBD_ALLOC(label, label_sz);
3943         if (label == NULL)
3944                 RETURN(-ENOMEM);
3945
3946         switch(cmd) {
3947         case LCFG_POOL_NEW:
3948                 sprintf(label,
3949                         "new %s.%s", fsname, poolname);
3950                 break;
3951         case LCFG_POOL_ADD:
3952                 sprintf(label,
3953                         "add %s.%s.%s", fsname, poolname, ostname);
3954                 break;
3955         case LCFG_POOL_REM:
3956                 OBD_ALLOC(canceled_label, label_sz);
3957                 if (canceled_label == NULL)
3958                         GOTO(out_label, rc = -ENOMEM);
3959                 sprintf(label,
3960                         "rem %s.%s.%s", fsname, poolname, ostname);
3961                 sprintf(canceled_label,
3962                         "add %s.%s.%s", fsname, poolname, ostname);
3963                 break;
3964         case LCFG_POOL_DEL:
3965                 OBD_ALLOC(canceled_label, label_sz);
3966                 if (canceled_label == NULL)
3967                         GOTO(out_label, rc = -ENOMEM);
3968                 sprintf(label,
3969                         "del %s.%s", fsname, poolname);
3970                 sprintf(canceled_label,
3971                         "new %s.%s", fsname, poolname);
3972                 break;
3973         default:
3974                 break;
3975         }
3976
3977         mutex_lock(&fsdb->fsdb_mutex);
3978
3979         if (canceled_label != NULL) {
3980                 OBD_ALLOC_PTR(mti);
3981                 if (mti == NULL)
3982                         GOTO(out_cancel, rc = -ENOMEM);
3983         }
3984
3985         /* write pool def to all MDT logs */
3986         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3987                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
3988                         rc = name_create_mdt_and_lov(&logname, &lovname,
3989                                                      fsdb, i);
3990                         if (rc) {
3991                                 mutex_unlock(&fsdb->fsdb_mutex);
3992                                 GOTO(out_mti, rc);
3993                         }
3994                         if (canceled_label != NULL) {
3995                                 strcpy(mti->mti_svname, "lov pool");
3996                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3997                                                 lovname, canceled_label,
3998                                                 CM_SKIP);
3999                         }
4000
4001                         if (rc >= 0)
4002                                 rc = mgs_write_log_pool(env, mgs, logname,
4003                                                         fsdb, lovname, cmd,
4004                                                         fsname, poolname,
4005                                                         ostname, label);
4006                         name_destroy(&logname);
4007                         name_destroy(&lovname);
4008                         if (rc) {
4009                                 mutex_unlock(&fsdb->fsdb_mutex);
4010                                 GOTO(out_mti, rc);
4011                         }
4012                 }
4013         }
4014
4015         rc = name_create(&logname, fsname, "-client");
4016         if (rc) {
4017                 mutex_unlock(&fsdb->fsdb_mutex);
4018                 GOTO(out_mti, rc);
4019         }
4020         if (canceled_label != NULL) {
4021                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4022                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4023                 if (rc < 0) {
4024                         mutex_unlock(&fsdb->fsdb_mutex);
4025                         name_destroy(&logname);
4026                         GOTO(out_mti, rc);
4027                 }
4028         }
4029
4030         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4031                                 cmd, fsname, poolname, ostname, label);
4032         mutex_unlock(&fsdb->fsdb_mutex);
4033         name_destroy(&logname);
4034         /* request for update */
4035         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4036
4037         EXIT;
4038 out_mti:
4039         if (mti != NULL)
4040                 OBD_FREE_PTR(mti);
4041 out_cancel:
4042         if (canceled_label != NULL)
4043                 OBD_FREE(canceled_label, label_sz);
4044 out_label:
4045         OBD_FREE(label, label_sz);
4046         return rc;
4047 }
4048
4049