Whamcloud - gitweb
LU-2237 tests: new test for re-recreating last_rcvd
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         if (!dt_try_as_dir(env, dir))
71                 GOTO(out, rc = -ENOTDIR);
72
73         LASSERT(dir);
74         LASSERT(dir->do_index_ops);
75
76         iops = &dir->do_index_ops->dio_it;
77         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
78         if (IS_ERR(it))
79                 RETURN(PTR_ERR(it));
80
81         rc = iops->load(env, it, 0);
82         if (rc <= 0)
83                 GOTO(fini, rc = 0);
84
85         /* main cycle */
86         do {
87                 key = (void *)iops->key(env, it);
88                 if (IS_ERR(key)) {
89                         CERROR("%s: key failed when listing %s: rc = %d\n",
90                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
91                                (int) PTR_ERR(key));
92                         goto next;
93                 }
94                 key_sz = iops->key_size(env, it);
95                 LASSERT(key_sz > 0);
96
97                 /* filter out "." and ".." entries */
98                 if (key[0] == '.') {
99                         if (key_sz == 1)
100                                 goto next;
101                         if (key_sz == 2 && key[1] == '.')
102                                 goto next;
103                 }
104
105                 de = mgs_direntry_alloc(key_sz + 1);
106                 if (de == NULL) {
107                         rc = -ENOMEM;
108                         break;
109                 }
110
111                 memcpy(de->name, key, key_sz);
112                 de->name[key_sz] = 0;
113
114                 cfs_list_add(&de->list, list);
115
116 next:
117                 rc = iops->next(env, it);
118         } while (rc == 0);
119         rc = 0;
120
121         iops->put(env, it);
122
123 fini:
124         iops->fini(env, it);
125 out:
126         if (rc)
127                 CERROR("%s: key failed when listing %s: rc = %d\n",
128                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
129         RETURN(rc);
130 }
131
132 /******************** DB functions *********************/
133
134 static inline int name_create(char **newname, char *prefix, char *suffix)
135 {
136         LASSERT(newname);
137         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
138         if (!*newname)
139                 return -ENOMEM;
140         sprintf(*newname, "%s%s", prefix, suffix);
141         return 0;
142 }
143
144 static inline void name_destroy(char **name)
145 {
146         if (*name)
147                 OBD_FREE(*name, strlen(*name) + 1);
148         *name = NULL;
149 }
150
151 struct mgs_fsdb_handler_data
152 {
153         struct fs_db   *fsdb;
154         __u32           ver;
155 };
156
157 /* from the (client) config log, figure out:
158         1. which ost's/mdt's are configured (by index)
159         2. what the last config step is
160         3. COMPAT_18 osc name
161 */
162 /* It might be better to have a separate db file, instead of parsing the info
163    out of the client log.  This is slow and potentially error-prone. */
164 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
165                             struct llog_rec_hdr *rec, void *data)
166 {
167         struct mgs_fsdb_handler_data *d = data;
168         struct fs_db *fsdb = d->fsdb;
169         int cfg_len = rec->lrh_len;
170         char *cfg_buf = (char*) (rec + 1);
171         struct lustre_cfg *lcfg;
172         __u32 index;
173         int rc = 0;
174         ENTRY;
175
176         if (rec->lrh_type != OBD_CFG_REC) {
177                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
178                 RETURN(-EINVAL);
179         }
180
181         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
182         if (rc) {
183                 CERROR("Insane cfg\n");
184                 RETURN(rc);
185         }
186
187         lcfg = (struct lustre_cfg *)cfg_buf;
188
189         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
190                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
191
192         /* Figure out ost indicies */
193         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
194         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
195             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
196                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
197                                        NULL, 10);
198                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
199                        lustre_cfg_string(lcfg, 1), index,
200                        lustre_cfg_string(lcfg, 2));
201                 set_bit(index, fsdb->fsdb_ost_index_map);
202         }
203
204         /* Figure out mdt indicies */
205         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
206         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
207             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
208                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
209                                        &index, NULL);
210                 if (rc != LDD_F_SV_TYPE_MDT) {
211                         CWARN("Unparsable MDC name %s, assuming index 0\n",
212                               lustre_cfg_string(lcfg, 0));
213                         index = 0;
214                 }
215                 rc = 0;
216                 CDEBUG(D_MGS, "MDT index is %u\n", index);
217                 set_bit(index, fsdb->fsdb_mdt_index_map);
218                 fsdb->fsdb_mdt_count ++;
219         }
220
221         /**
222          * figure out the old config. fsdb_gen = 0 means old log
223          * It is obsoleted and not supported anymore
224          */
225         if (fsdb->fsdb_gen == 0) {
226                 CERROR("Old config format is not supported\n");
227                 RETURN(-EINVAL);
228         }
229
230         /*
231          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
232          */
233         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
234             lcfg->lcfg_command == LCFG_ATTACH &&
235             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
236                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
237                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
238                         CWARN("MDT using 1.8 OSC name scheme\n");
239                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
240                 }
241         }
242
243         if (lcfg->lcfg_command == LCFG_MARKER) {
244                 struct cfg_marker *marker;
245                 marker = lustre_cfg_buf(lcfg, 1);
246
247                 d->ver = marker->cm_vers;
248
249                 /* Keep track of the latest marker step */
250                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
251         }
252
253         RETURN(rc);
254 }
255
256 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
257 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
258                                   struct mgs_device *mgs,
259                                   struct fs_db *fsdb)
260 {
261         char                            *logname;
262         struct llog_handle              *loghandle;
263         struct llog_ctxt                *ctxt;
264         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
265         int rc;
266
267         ENTRY;
268
269         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
270         LASSERT(ctxt != NULL);
271         rc = name_create(&logname, fsdb->fsdb_name, "-client");
272         if (rc)
273                 GOTO(out_put, rc);
274         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
275         if (rc)
276                 GOTO(out_pop, rc);
277
278         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
279         if (rc)
280                 GOTO(out_close, rc);
281
282         if (llog_get_size(loghandle) <= 1)
283                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
284
285         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
286         CDEBUG(D_INFO, "get_db = %d\n", rc);
287 out_close:
288         llog_close(env, loghandle);
289 out_pop:
290         name_destroy(&logname);
291 out_put:
292         llog_ctxt_put(ctxt);
293
294         RETURN(rc);
295 }
296
297 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
298 {
299         struct mgs_tgt_srpc_conf *tgtconf;
300
301         /* free target-specific rules */
302         while (fsdb->fsdb_srpc_tgt) {
303                 tgtconf = fsdb->fsdb_srpc_tgt;
304                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
305
306                 LASSERT(tgtconf->mtsc_tgt);
307
308                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
309                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
310                 OBD_FREE_PTR(tgtconf);
311         }
312
313         /* free general rules */
314         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
315 }
316
317 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
318 {
319         struct fs_db *fsdb;
320         cfs_list_t *tmp;
321
322         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
323                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
324                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
325                         return fsdb;
326         }
327         return NULL;
328 }
329
330 /* caller must hold the mgs->mgs_fs_db_lock */
331 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
332                                   struct mgs_device *mgs, char *fsname)
333 {
334         struct fs_db *fsdb;
335         int rc;
336         ENTRY;
337
338         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
339                 CERROR("fsname %s is too long\n", fsname);
340                 RETURN(NULL);
341         }
342
343         OBD_ALLOC_PTR(fsdb);
344         if (!fsdb)
345                 RETURN(NULL);
346
347         strcpy(fsdb->fsdb_name, fsname);
348         mutex_init(&fsdb->fsdb_mutex);
349         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
350         fsdb->fsdb_gen = 1;
351
352         if (strcmp(fsname, MGSSELF_NAME) == 0) {
353                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
354         } else {
355                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
356                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
357                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
358                         CERROR("No memory for index maps\n");
359                         GOTO(err, rc = -ENOMEM);
360                 }
361
362                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
363                 if (rc)
364                         GOTO(err, rc);
365                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
366                 if (rc)
367                         GOTO(err, rc);
368
369                 /* initialise data for NID table */
370                 mgs_ir_init_fs(env, mgs, fsdb);
371
372                 lproc_mgs_add_live(mgs, fsdb);
373         }
374
375         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
376
377         RETURN(fsdb);
378 err:
379         if (fsdb->fsdb_ost_index_map)
380                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
381         if (fsdb->fsdb_mdt_index_map)
382                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
383         name_destroy(&fsdb->fsdb_clilov);
384         name_destroy(&fsdb->fsdb_clilmv);
385         OBD_FREE_PTR(fsdb);
386         RETURN(NULL);
387 }
388
389 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
390 {
391         /* wait for anyone with the sem */
392         mutex_lock(&fsdb->fsdb_mutex);
393         lproc_mgs_del_live(mgs, fsdb);
394         cfs_list_del(&fsdb->fsdb_list);
395
396         /* deinitialize fsr */
397         mgs_ir_fini_fs(mgs, fsdb);
398
399         if (fsdb->fsdb_ost_index_map)
400                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
401         if (fsdb->fsdb_mdt_index_map)
402                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
403         name_destroy(&fsdb->fsdb_clilov);
404         name_destroy(&fsdb->fsdb_clilmv);
405         mgs_free_fsdb_srpc(fsdb);
406         mutex_unlock(&fsdb->fsdb_mutex);
407         OBD_FREE_PTR(fsdb);
408 }
409
410 int mgs_init_fsdb_list(struct mgs_device *mgs)
411 {
412         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
413         return 0;
414 }
415
416 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
417 {
418         struct fs_db *fsdb;
419         cfs_list_t *tmp, *tmp2;
420         mutex_lock(&mgs->mgs_mutex);
421         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
422                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
423                 mgs_free_fsdb(mgs, fsdb);
424         }
425         mutex_unlock(&mgs->mgs_mutex);
426         return 0;
427 }
428
429 int mgs_find_or_make_fsdb(const struct lu_env *env,
430                           struct mgs_device *mgs, char *name,
431                           struct fs_db **dbh)
432 {
433         struct fs_db *fsdb;
434         int rc = 0;
435
436         ENTRY;
437         mutex_lock(&mgs->mgs_mutex);
438         fsdb = mgs_find_fsdb(mgs, name);
439         if (fsdb) {
440                 mutex_unlock(&mgs->mgs_mutex);
441                 *dbh = fsdb;
442                 RETURN(0);
443         }
444
445         CDEBUG(D_MGS, "Creating new db\n");
446         fsdb = mgs_new_fsdb(env, mgs, name);
447         /* lock fsdb_mutex until the db is loaded from llogs */
448         if (fsdb)
449                 mutex_lock(&fsdb->fsdb_mutex);
450         mutex_unlock(&mgs->mgs_mutex);
451         if (!fsdb)
452                 RETURN(-ENOMEM);
453
454         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
455                 /* populate the db from the client llog */
456                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
457                 if (rc) {
458                         CERROR("Can't get db from client log %d\n", rc);
459                         GOTO(out_free, rc);
460                 }
461         }
462
463         /* populate srpc rules from params llog */
464         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
465         if (rc) {
466                 CERROR("Can't get db from params log %d\n", rc);
467                 GOTO(out_free, rc);
468         }
469
470         mutex_unlock(&fsdb->fsdb_mutex);
471         *dbh = fsdb;
472
473         RETURN(0);
474
475 out_free:
476         mutex_unlock(&fsdb->fsdb_mutex);
477         mgs_free_fsdb(mgs, fsdb);
478         return rc;
479 }
480
481 /* 1 = index in use
482    0 = index unused
483    -1= empty client log */
484 int mgs_check_index(const struct lu_env *env,
485                     struct mgs_device *mgs,
486                     struct mgs_target_info *mti)
487 {
488         struct fs_db *fsdb;
489         void *imap;
490         int rc = 0;
491         ENTRY;
492
493         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
494
495         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
496         if (rc) {
497                 CERROR("Can't get db for %s\n", mti->mti_fsname);
498                 RETURN(rc);
499         }
500
501         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
502                 RETURN(-1);
503
504         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
505                 imap = fsdb->fsdb_ost_index_map;
506         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
507                 imap = fsdb->fsdb_mdt_index_map;
508         else
509                 RETURN(-EINVAL);
510
511         if (test_bit(mti->mti_stripe_index, imap))
512                 RETURN(1);
513         RETURN(0);
514 }
515
516 static __inline__ int next_index(void *index_map, int map_len)
517 {
518         int i;
519         for (i = 0; i < map_len * 8; i++)
520                 if (!test_bit(i, index_map)) {
521                          return i;
522                  }
523         CERROR("max index %d exceeded.\n", i);
524         return -1;
525 }
526
527 /* Return codes:
528         0  newly marked as in use
529         <0 err
530         +EALREADY for update of an old index */
531 static int mgs_set_index(const struct lu_env *env,
532                          struct mgs_device *mgs,
533                          struct mgs_target_info *mti)
534 {
535         struct fs_db *fsdb;
536         void *imap;
537         int rc = 0;
538         ENTRY;
539
540         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
541         if (rc) {
542                 CERROR("Can't get db for %s\n", mti->mti_fsname);
543                 RETURN(rc);
544         }
545
546         mutex_lock(&fsdb->fsdb_mutex);
547         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
548                 imap = fsdb->fsdb_ost_index_map;
549         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
550                 imap = fsdb->fsdb_mdt_index_map;
551                 if (fsdb->fsdb_mdt_count >= MAX_MDT_COUNT) {
552                         LCONSOLE_ERROR_MSG(0x13f, "The max mdt count"
553                                            "is %d\n", (int)MAX_MDT_COUNT);
554                         GOTO(out_up, rc = -ERANGE);
555                 }
556         } else {
557                 GOTO(out_up, rc = -EINVAL);
558         }
559
560         if (mti->mti_flags & LDD_F_NEED_INDEX) {
561                 rc = next_index(imap, INDEX_MAP_SIZE);
562                 if (rc == -1)
563                         GOTO(out_up, rc = -ERANGE);
564                 mti->mti_stripe_index = rc;
565                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
566                         fsdb->fsdb_mdt_count ++;
567         }
568
569         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
570                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
571                                    "but the max index is %d.\n",
572                                    mti->mti_svname, mti->mti_stripe_index,
573                                    INDEX_MAP_SIZE * 8);
574                 GOTO(out_up, rc = -ERANGE);
575         }
576
577         if (test_bit(mti->mti_stripe_index, imap)) {
578                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
579                     !(mti->mti_flags & LDD_F_WRITECONF)) {
580                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
581                                            "%d, but that index is already in "
582                                            "use. Use --writeconf to force\n",
583                                            mti->mti_svname,
584                                            mti->mti_stripe_index);
585                         GOTO(out_up, rc = -EADDRINUSE);
586                 } else {
587                         CDEBUG(D_MGS, "Server %s updating index %d\n",
588                                mti->mti_svname, mti->mti_stripe_index);
589                         GOTO(out_up, rc = EALREADY);
590                 }
591         }
592
593         set_bit(mti->mti_stripe_index, imap);
594         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
595         mutex_unlock(&fsdb->fsdb_mutex);
596         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
597                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
598
599         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
600                mti->mti_stripe_index);
601
602         RETURN(0);
603 out_up:
604         mutex_unlock(&fsdb->fsdb_mutex);
605         return rc;
606 }
607
608 struct mgs_modify_lookup {
609         struct cfg_marker mml_marker;
610         int               mml_modified;
611 };
612
613 static int mgs_modify_handler(const struct lu_env *env,
614                               struct llog_handle *llh,
615                               struct llog_rec_hdr *rec, void *data)
616 {
617         struct mgs_modify_lookup *mml = data;
618         struct cfg_marker *marker;
619         struct lustre_cfg *lcfg = (struct lustre_cfg *)(rec + 1);
620         int cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
621                 sizeof(struct llog_rec_tail);
622         int rc;
623         ENTRY;
624
625         if (rec->lrh_type != OBD_CFG_REC) {
626                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
627                 RETURN(-EINVAL);
628         }
629
630         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
631         if (rc) {
632                 CERROR("Insane cfg\n");
633                 RETURN(rc);
634         }
635
636         /* We only care about markers */
637         if (lcfg->lcfg_command != LCFG_MARKER)
638                 RETURN(0);
639
640         marker = lustre_cfg_buf(lcfg, 1);
641         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
642             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
643             !(marker->cm_flags & CM_SKIP)) {
644                 /* Found a non-skipped marker match */
645                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
646                        rec->lrh_index, marker->cm_step,
647                        marker->cm_flags, mml->mml_marker.cm_flags,
648                        marker->cm_tgtname, marker->cm_comment);
649                 /* Overwrite the old marker llog entry */
650                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
651                 marker->cm_flags |= mml->mml_marker.cm_flags;
652                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
653                 /* Header and tail are added back to lrh_len in
654                    llog_lvfs_write_rec */
655                 rec->lrh_len = cfg_len;
656                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
657                                 rec->lrh_index);
658                 if (!rc)
659                          mml->mml_modified++;
660         }
661
662         RETURN(rc);
663 }
664
665 /**
666  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
667  * Return code:
668  * 0 - modified successfully,
669  * 1 - no modification was done
670  * negative - error
671  */
672 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
673                       struct fs_db *fsdb, struct mgs_target_info *mti,
674                       char *logname, char *devname, char *comment, int flags)
675 {
676         struct llog_handle *loghandle;
677         struct llog_ctxt *ctxt;
678         struct mgs_modify_lookup *mml;
679         int rc;
680
681         ENTRY;
682
683         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
684         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
685                flags);
686
687         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
688         LASSERT(ctxt != NULL);
689         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
690         if (rc < 0) {
691                 if (rc == -ENOENT)
692                         rc = 0;
693                 GOTO(out_pop, rc);
694         }
695
696         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
697         if (rc)
698                 GOTO(out_close, rc);
699
700         if (llog_get_size(loghandle) <= 1)
701                 GOTO(out_close, rc = 0);
702
703         OBD_ALLOC_PTR(mml);
704         if (!mml)
705                 GOTO(out_close, rc = -ENOMEM);
706         strcpy(mml->mml_marker.cm_comment, comment);
707         strcpy(mml->mml_marker.cm_tgtname, devname);
708         /* Modify mostly means cancel */
709         mml->mml_marker.cm_flags = flags;
710         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
711         mml->mml_modified = 0;
712         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
713                           NULL);
714         if (!rc && !mml->mml_modified)
715                 rc = 1;
716         OBD_FREE_PTR(mml);
717
718 out_close:
719         llog_close(env, loghandle);
720 out_pop:
721         if (rc < 0)
722                 CERROR("%s: modify %s/%s failed: rc = %d\n",
723                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
724         llog_ctxt_put(ctxt);
725         RETURN(rc);
726 }
727
728 /******************** config log recording functions *********************/
729
730 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
731                        struct lustre_cfg *lcfg)
732 {
733         struct llog_rec_hdr      rec;
734         int                      buflen, rc;
735
736         if (!lcfg || !llh)
737                 return -ENOMEM;
738
739         LASSERT(llh->lgh_ctxt);
740
741         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
742                                 lcfg->lcfg_buflens);
743         rec.lrh_len = llog_data_len(buflen);
744         rec.lrh_type = OBD_CFG_REC;
745
746         /* idx = -1 means append */
747         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
748         if (rc)
749                 CERROR("failed %d\n", rc);
750         return rc;
751 }
752
753 static int record_base(const struct lu_env *env, struct llog_handle *llh,
754                      char *cfgname, lnet_nid_t nid, int cmd,
755                      char *s1, char *s2, char *s3, char *s4)
756 {
757         struct mgs_thread_info *mgi = mgs_env_info(env);
758         struct lustre_cfg     *lcfg;
759         int rc;
760
761         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
762                cmd, s1, s2, s3, s4);
763
764         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
765         if (s1)
766                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
767         if (s2)
768                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
769         if (s3)
770                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
771         if (s4)
772                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
773
774         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
775         if (!lcfg)
776                 return -ENOMEM;
777         lcfg->lcfg_nid = nid;
778
779         rc = record_lcfg(env, llh, lcfg);
780
781         lustre_cfg_free(lcfg);
782
783         if (rc) {
784                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
785                        cmd, s1, s2, s3, s4);
786         }
787         return rc;
788 }
789
790
791 static inline int record_add_uuid(const struct lu_env *env,
792                                   struct llog_handle *llh,
793                                   uint64_t nid, char *uuid)
794 {
795         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
796
797 }
798
799 static inline int record_add_conn(const struct lu_env *env,
800                                   struct llog_handle *llh,
801                                   char *devname, char *uuid)
802 {
803         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
804 }
805
806 static inline int record_attach(const struct lu_env *env,
807                                 struct llog_handle *llh, char *devname,
808                                 char *type, char *uuid)
809 {
810         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
811 }
812
813 static inline int record_setup(const struct lu_env *env,
814                                struct llog_handle *llh, char *devname,
815                                char *s1, char *s2, char *s3, char *s4)
816 {
817         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
818 }
819
820 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
821                             char *devname, struct lov_desc *desc)
822 {
823         struct mgs_thread_info *mgi = mgs_env_info(env);
824         struct lustre_cfg *lcfg;
825         int rc;
826
827         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
828         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
829         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
830         if (!lcfg)
831                 return -ENOMEM;
832         rc = record_lcfg(env, llh, lcfg);
833
834         lustre_cfg_free(lcfg);
835         return rc;
836 }
837
838 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
839                             char *devname, struct lmv_desc *desc)
840 {
841         struct mgs_thread_info *mgi = mgs_env_info(env);
842         struct lustre_cfg *lcfg;
843         int rc;
844
845         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
846         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
847         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
848
849         rc = record_lcfg(env, llh, lcfg);
850
851         lustre_cfg_free(lcfg);
852         return rc;
853 }
854
855 static inline int record_mdc_add(const struct lu_env *env,
856                                  struct llog_handle *llh,
857                                  char *logname, char *mdcuuid,
858                                  char *mdtuuid, char *index,
859                                  char *gen)
860 {
861         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
862                            mdtuuid,index,gen,mdcuuid);
863 }
864
865 static inline int record_lov_add(const struct lu_env *env,
866                                  struct llog_handle *llh,
867                                  char *lov_name, char *ost_uuid,
868                                  char *index, char *gen)
869 {
870         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
871                            ost_uuid,index,gen,0);
872 }
873
874 static inline int record_mount_opt(const struct lu_env *env,
875                                    struct llog_handle *llh,
876                                    char *profile, char *lov_name,
877                                    char *mdc_name)
878 {
879         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
880                            profile,lov_name,mdc_name,0);
881 }
882
883 static int record_marker(const struct lu_env *env,
884                          struct llog_handle *llh,
885                          struct fs_db *fsdb, __u32 flags,
886                          char *tgtname, char *comment)
887 {
888         struct mgs_thread_info *mgi = mgs_env_info(env);
889         struct lustre_cfg *lcfg;
890         int rc;
891
892         if (flags & CM_START)
893                 fsdb->fsdb_gen++;
894         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
895         mgi->mgi_marker.cm_flags = flags;
896         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
897         strncpy(mgi->mgi_marker.cm_tgtname, tgtname,
898                 sizeof(mgi->mgi_marker.cm_tgtname));
899         strncpy(mgi->mgi_marker.cm_comment, comment,
900                 sizeof(mgi->mgi_marker.cm_comment));
901         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
902         mgi->mgi_marker.cm_canceltime = 0;
903         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
904         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
905                             sizeof(mgi->mgi_marker));
906         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
907         if (!lcfg)
908                 return -ENOMEM;
909         rc = record_lcfg(env, llh, lcfg);
910
911         lustre_cfg_free(lcfg);
912         return rc;
913 }
914
915 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
916                             struct llog_handle **llh, char *name)
917 {
918         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
919         struct llog_ctxt        *ctxt;
920         int                      rc = 0;
921
922         if (*llh)
923                 GOTO(out, rc = -EBUSY);
924
925         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
926         if (!ctxt)
927                 GOTO(out, rc = -ENODEV);
928         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
929
930         rc = llog_open_create(env, ctxt, llh, NULL, name);
931         if (rc)
932                 GOTO(out_ctxt, rc);
933         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
934         if (rc)
935                 llog_close(env, *llh);
936 out_ctxt:
937         llog_ctxt_put(ctxt);
938 out:
939         if (rc) {
940                 CERROR("%s: can't start log %s: rc = %d\n",
941                        mgs->mgs_obd->obd_name, name, rc);
942                 *llh = NULL;
943         }
944         RETURN(rc);
945 }
946
947 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
948 {
949         int rc;
950
951         rc = llog_close(env, *llh);
952         *llh = NULL;
953
954         return rc;
955 }
956
957 static int mgs_log_is_empty(const struct lu_env *env,
958                             struct mgs_device *mgs, char *name)
959 {
960         struct llog_handle *llh;
961         struct llog_ctxt *ctxt;
962         int rc = 0;
963
964         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
965         LASSERT(ctxt != NULL);
966         rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
967         if (rc < 0) {
968                 if (rc == -ENOENT)
969                         rc = 0;
970                 GOTO(out_ctxt, rc);
971         }
972
973         llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
974         if (rc)
975                 GOTO(out_close, rc);
976         rc = llog_get_size(llh);
977
978 out_close:
979         llog_close(env, llh);
980 out_ctxt:
981         llog_ctxt_put(ctxt);
982         /* header is record 1 */
983         return (rc <= 1);
984 }
985
986 /******************** config "macros" *********************/
987
988 /* write an lcfg directly into a log (with markers) */
989 static int mgs_write_log_direct(const struct lu_env *env,
990                                 struct mgs_device *mgs, struct fs_db *fsdb,
991                                 char *logname, struct lustre_cfg *lcfg,
992                                 char *devname, char *comment)
993 {
994         struct llog_handle *llh = NULL;
995         int rc;
996         ENTRY;
997
998         if (!lcfg)
999                 RETURN(-ENOMEM);
1000
1001         rc = record_start_log(env, mgs, &llh, logname);
1002         if (rc)
1003                 RETURN(rc);
1004
1005         /* FIXME These should be a single journal transaction */
1006         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1007         if (rc)
1008                 GOTO(out_end, rc);
1009         rc = record_lcfg(env, llh, lcfg);
1010         if (rc)
1011                 GOTO(out_end, rc);
1012         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1013         if (rc)
1014                 GOTO(out_end, rc);
1015 out_end:
1016         record_end_log(env, &llh);
1017         RETURN(rc);
1018 }
1019
1020 /* write the lcfg in all logs for the given fs */
1021 int mgs_write_log_direct_all(const struct lu_env *env,
1022                              struct mgs_device *mgs,
1023                              struct fs_db *fsdb,
1024                              struct mgs_target_info *mti,
1025                              struct lustre_cfg *lcfg,
1026                              char *devname, char *comment,
1027                              int server_only)
1028 {
1029         cfs_list_t list;
1030         struct mgs_direntry *dirent, *n;
1031         char *fsname = mti->mti_fsname;
1032         char *logname;
1033         int rc = 0, len = strlen(fsname);
1034         ENTRY;
1035
1036         /* We need to set params for any future logs
1037            as well. FIXME Append this file to every new log.
1038            Actually, we should store as params (text), not llogs.  Or
1039            in a database. */
1040         rc = name_create(&logname, fsname, "-params");
1041         if (rc)
1042                 RETURN(rc);
1043         if (mgs_log_is_empty(env, mgs, logname)) {
1044                 struct llog_handle *llh = NULL;
1045                 rc = record_start_log(env, mgs, &llh, logname);
1046                 record_end_log(env, &llh);
1047         }
1048         name_destroy(&logname);
1049         if (rc)
1050                 RETURN(rc);
1051
1052         /* Find all the logs in the CONFIGS directory */
1053         rc = class_dentry_readdir(env, mgs, &list);
1054         if (rc)
1055                 RETURN(rc);
1056
1057         /* Could use fsdb index maps instead of directory listing */
1058         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1059                 cfs_list_del(&dirent->list);
1060                 /* don't write to sptlrpc rule log */
1061                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1062                         goto next;
1063
1064                 /* caller wants write server logs only */
1065                 if (server_only && strstr(dirent->name, "-client") != NULL)
1066                         goto next;
1067
1068                 if (strncmp(fsname, dirent->name, len) == 0) {
1069                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1070                         /* Erase any old settings of this same parameter */
1071                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1072                                         devname, comment, CM_SKIP);
1073                         if (rc < 0)
1074                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1075                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1076                         /* Write the new one */
1077                         if (lcfg) {
1078                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1079                                                           dirent->name,
1080                                                           lcfg, devname,
1081                                                           comment);
1082                                 if (rc)
1083                                         CERROR("%s: writing log %s: rc = %d\n",
1084                                                mgs->mgs_obd->obd_name,
1085                                                dirent->name, rc);
1086                         }
1087                 }
1088 next:
1089                 mgs_direntry_free(dirent);
1090         }
1091
1092         RETURN(rc);
1093 }
1094
1095 static int mgs_write_log_mdc_to_mdt(const struct lu_env *env,
1096                                     struct mgs_device *mgs,
1097                                     struct fs_db *fsdb,
1098                                     struct mgs_target_info *mti,
1099                                     char *logname);
1100 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1101                                     struct mgs_device *mgs,
1102                                     struct fs_db *fsdb,
1103                                     struct mgs_target_info *mti,
1104                                     char *logname, char *suffix, char *lovname,
1105                                     enum lustre_sec_part sec_part, int flags);
1106 static int name_create_mdt_and_lov(char **logname, char **lovname,
1107                                    struct fs_db *fsdb, int i);
1108
1109 static int add_param(char *params, char *key, char *val)
1110 {
1111         char *start = params + strlen(params);
1112         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1113         int keylen = 0;
1114
1115         if (key != NULL)
1116                 keylen = strlen(key);
1117         if (start + 1 + keylen + strlen(val) >= end) {
1118                 CERROR("params are too long: %s %s%s\n",
1119                        params, key != NULL ? key : "", val);
1120                 return -EINVAL;
1121         }
1122
1123         sprintf(start, " %s%s", key != NULL ? key : "", val);
1124         return 0;
1125 }
1126
1127 static int mgs_steal_llog_handler(const struct lu_env *env,
1128                                   struct llog_handle *llh,
1129                                   struct llog_rec_hdr *rec, void *data)
1130 {
1131         struct mgs_device *mgs;
1132         struct obd_device *obd;
1133         struct mgs_target_info *mti, *tmti;
1134         struct fs_db *fsdb;
1135         int cfg_len = rec->lrh_len;
1136         char *cfg_buf = (char*) (rec + 1);
1137         struct lustre_cfg *lcfg;
1138         int rc = 0;
1139         struct llog_handle *mdt_llh = NULL;
1140         static int got_an_osc_or_mdc = 0;
1141         /* 0: not found any osc/mdc;
1142            1: found osc;
1143            2: found mdc;
1144         */
1145         static int last_step = -1;
1146
1147         ENTRY;
1148
1149         mti = ((struct temp_comp*)data)->comp_mti;
1150         tmti = ((struct temp_comp*)data)->comp_tmti;
1151         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1152         obd = ((struct temp_comp *)data)->comp_obd;
1153         mgs = lu2mgs_dev(obd->obd_lu_dev);
1154         LASSERT(mgs);
1155
1156         if (rec->lrh_type != OBD_CFG_REC) {
1157                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1158                 RETURN(-EINVAL);
1159         }
1160
1161         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1162         if (rc) {
1163                 CERROR("Insane cfg\n");
1164                 RETURN(rc);
1165         }
1166
1167         lcfg = (struct lustre_cfg *)cfg_buf;
1168
1169         if (lcfg->lcfg_command == LCFG_MARKER) {
1170                 struct cfg_marker *marker;
1171                 marker = lustre_cfg_buf(lcfg, 1);
1172                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1173                     (marker->cm_flags & CM_START)){
1174                         got_an_osc_or_mdc = 1;
1175                         strncpy(tmti->mti_svname, marker->cm_tgtname,
1176                                 sizeof(tmti->mti_svname));
1177                         rc = record_start_log(env, mgs, &mdt_llh,
1178                                               mti->mti_svname);
1179                         if (rc)
1180                                 RETURN(rc);
1181                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1182                                            mti->mti_svname,"add osc(copied)");
1183                         record_end_log(env, &mdt_llh);
1184                         last_step = marker->cm_step;
1185                         RETURN(rc);
1186                 }
1187                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1188                     (marker->cm_flags & CM_END)){
1189                         LASSERT(last_step == marker->cm_step);
1190                         last_step = -1;
1191                         got_an_osc_or_mdc = 0;
1192                         rc = record_start_log(env, mgs, &mdt_llh,
1193                                               mti->mti_svname);
1194                         if (rc)
1195                                 RETURN(rc);
1196                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1197                                            mti->mti_svname,"add osc(copied)");
1198                         record_end_log(env, &mdt_llh);
1199                         RETURN(rc);
1200                 }
1201                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1202                     (marker->cm_flags & CM_START)){
1203                         got_an_osc_or_mdc = 2;
1204                         last_step = marker->cm_step;
1205                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1206                                strlen(marker->cm_tgtname));
1207
1208                         RETURN(rc);
1209                 }
1210                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1211                     (marker->cm_flags & CM_END)){
1212                         LASSERT(last_step == marker->cm_step);
1213                         last_step = -1;
1214                         got_an_osc_or_mdc = 0;
1215                         RETURN(rc);
1216                 }
1217         }
1218
1219         if (got_an_osc_or_mdc == 0 || last_step < 0)
1220                 RETURN(rc);
1221
1222         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1223                 uint64_t nodenid = lcfg->lcfg_nid;
1224
1225                 if (strlen(tmti->mti_uuid) == 0) {
1226                         /* target uuid not set, this config record is before
1227                          * LCFG_SETUP, this nid is one of target node nid.
1228                          */
1229                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1230                         tmti->mti_nid_count++;
1231                 } else {
1232                         /* failover node nid */
1233                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1234                                        libcfs_nid2str(nodenid));
1235                 }
1236
1237                 RETURN(rc);
1238         }
1239
1240         if (lcfg->lcfg_command == LCFG_SETUP) {
1241                 char *target;
1242
1243                 target = lustre_cfg_string(lcfg, 1);
1244                 memcpy(tmti->mti_uuid, target, strlen(target));
1245                 RETURN(rc);
1246         }
1247
1248         /* ignore client side sptlrpc_conf_log */
1249         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1250                 RETURN(rc);
1251
1252         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1253                 int index;
1254
1255                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1256                         RETURN (-EINVAL);
1257
1258                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1259                        strlen(mti->mti_fsname));
1260                 tmti->mti_stripe_index = index;
1261
1262                 rc = mgs_write_log_mdc_to_mdt(env, mgs, fsdb, tmti,
1263                                               mti->mti_svname);
1264                 memset(tmti, 0, sizeof(*tmti));
1265                 RETURN(rc);
1266         }
1267
1268         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1269                 int index;
1270                 char mdt_index[9];
1271                 char *logname, *lovname;
1272
1273                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1274                                              mti->mti_stripe_index);
1275                 if (rc)
1276                         RETURN(rc);
1277                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1278
1279                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1280                         name_destroy(&logname);
1281                         name_destroy(&lovname);
1282                         RETURN(-EINVAL);
1283                 }
1284
1285                 tmti->mti_stripe_index = index;
1286                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1287                                          mdt_index, lovname,
1288                                          LUSTRE_SP_MDT, 0);
1289                 name_destroy(&logname);
1290                 name_destroy(&lovname);
1291                 RETURN(rc);
1292         }
1293         RETURN(rc);
1294 }
1295
1296 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1297 /* stealed from mgs_get_fsdb_from_llog*/
1298 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1299                                               struct mgs_device *mgs,
1300                                               char *client_name,
1301                                               struct temp_comp* comp)
1302 {
1303         struct llog_handle *loghandle;
1304         struct mgs_target_info *tmti;
1305         struct llog_ctxt *ctxt;
1306         int rc;
1307
1308         ENTRY;
1309
1310         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1311         LASSERT(ctxt != NULL);
1312
1313         OBD_ALLOC_PTR(tmti);
1314         if (tmti == NULL)
1315                 GOTO(out_ctxt, rc = -ENOMEM);
1316
1317         comp->comp_tmti = tmti;
1318         comp->comp_obd = mgs->mgs_obd;
1319
1320         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1321                        LLOG_OPEN_EXISTS);
1322         if (rc < 0) {
1323                 if (rc == -ENOENT)
1324                         rc = 0;
1325                 GOTO(out_pop, rc);
1326         }
1327
1328         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1329         if (rc)
1330                 GOTO(out_close, rc);
1331
1332         rc = llog_process_or_fork(env, loghandle, mgs_steal_llog_handler,
1333                                   (void *)comp, NULL, false);
1334         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1335 out_close:
1336         llog_close(env, loghandle);
1337 out_pop:
1338         OBD_FREE_PTR(tmti);
1339 out_ctxt:
1340         llog_ctxt_put(ctxt);
1341         RETURN(rc);
1342 }
1343
1344 /* lmv is the second thing for client logs */
1345 /* copied from mgs_write_log_lov. Please refer to that.  */
1346 static int mgs_write_log_lmv(const struct lu_env *env,
1347                              struct mgs_device *mgs,
1348                              struct fs_db *fsdb,
1349                              struct mgs_target_info *mti,
1350                              char *logname, char *lmvname)
1351 {
1352         struct llog_handle *llh = NULL;
1353         struct lmv_desc *lmvdesc;
1354         char *uuid;
1355         int rc = 0;
1356         ENTRY;
1357
1358         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1359
1360         OBD_ALLOC_PTR(lmvdesc);
1361         if (lmvdesc == NULL)
1362                 RETURN(-ENOMEM);
1363         lmvdesc->ld_active_tgt_count = 0;
1364         lmvdesc->ld_tgt_count = 0;
1365         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1366         uuid = (char *)lmvdesc->ld_uuid.uuid;
1367
1368         rc = record_start_log(env, mgs, &llh, logname);
1369         if (rc)
1370                 GOTO(out_free, rc);
1371         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1372         if (rc)
1373                 GOTO(out_end, rc);
1374         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1375         if (rc)
1376                 GOTO(out_end, rc);
1377         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1378         if (rc)
1379                 GOTO(out_end, rc);
1380         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1381         if (rc)
1382                 GOTO(out_end, rc);
1383 out_end:
1384         record_end_log(env, &llh);
1385 out_free:
1386         OBD_FREE_PTR(lmvdesc);
1387         RETURN(rc);
1388 }
1389
1390 /* lov is the first thing in the mdt and client logs */
1391 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1392                              struct fs_db *fsdb, struct mgs_target_info *mti,
1393                              char *logname, char *lovname)
1394 {
1395         struct llog_handle *llh = NULL;
1396         struct lov_desc *lovdesc;
1397         char *uuid;
1398         int rc = 0;
1399         ENTRY;
1400
1401         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1402
1403         /*
1404         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1405         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1406               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1407         */
1408
1409         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1410         OBD_ALLOC_PTR(lovdesc);
1411         if (lovdesc == NULL)
1412                 RETURN(-ENOMEM);
1413         lovdesc->ld_magic = LOV_DESC_MAGIC;
1414         lovdesc->ld_tgt_count = 0;
1415         /* Defaults.  Can be changed later by lcfg config_param */
1416         lovdesc->ld_default_stripe_count = 1;
1417         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1418         lovdesc->ld_default_stripe_size = 1024 * 1024;
1419         lovdesc->ld_default_stripe_offset = -1;
1420         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1421         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1422         /* can these be the same? */
1423         uuid = (char *)lovdesc->ld_uuid.uuid;
1424
1425         /* This should always be the first entry in a log.
1426         rc = mgs_clear_log(obd, logname); */
1427         rc = record_start_log(env, mgs, &llh, logname);
1428         if (rc)
1429                 GOTO(out_free, rc);
1430         /* FIXME these should be a single journal transaction */
1431         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1432         if (rc)
1433                 GOTO(out_end, rc);
1434         rc = record_attach(env, llh, lovname, "lov", uuid);
1435         if (rc)
1436                 GOTO(out_end, rc);
1437         rc = record_lov_setup(env, llh, lovname, lovdesc);
1438         if (rc)
1439                 GOTO(out_end, rc);
1440         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1441         if (rc)
1442                 GOTO(out_end, rc);
1443         EXIT;
1444 out_end:
1445         record_end_log(env, &llh);
1446 out_free:
1447         OBD_FREE_PTR(lovdesc);
1448         return rc;
1449 }
1450
1451 /* add failnids to open log */
1452 static int mgs_write_log_failnids(const struct lu_env *env,
1453                                   struct mgs_target_info *mti,
1454                                   struct llog_handle *llh,
1455                                   char *cliname)
1456 {
1457         char *failnodeuuid = NULL;
1458         char *ptr = mti->mti_params;
1459         lnet_nid_t nid;
1460         int rc = 0;
1461
1462         /*
1463         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1464         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1465         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1466         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1467         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1468         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1469         */
1470
1471         /* Pull failnid info out of params string */
1472         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1473                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1474                         if (failnodeuuid == NULL) {
1475                                 /* We don't know the failover node name,
1476                                    so just use the first nid as the uuid */
1477                                 rc = name_create(&failnodeuuid,
1478                                                  libcfs_nid2str(nid), "");
1479                                 if (rc)
1480                                         return rc;
1481                         }
1482                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1483                                "client %s\n", libcfs_nid2str(nid),
1484                                failnodeuuid, cliname);
1485                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1486                 }
1487                 if (failnodeuuid)
1488                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1489         }
1490
1491         name_destroy(&failnodeuuid);
1492         return rc;
1493 }
1494
1495 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1496                                     struct mgs_device *mgs,
1497                                     struct fs_db *fsdb,
1498                                     struct mgs_target_info *mti,
1499                                     char *logname, char *lmvname)
1500 {
1501         struct llog_handle *llh = NULL;
1502         char *mdcname = NULL;
1503         char *nodeuuid = NULL;
1504         char *mdcuuid = NULL;
1505         char *lmvuuid = NULL;
1506         char index[6];
1507         int i, rc;
1508         ENTRY;
1509
1510         if (mgs_log_is_empty(env, mgs, logname)) {
1511                 CERROR("log is empty! Logical error\n");
1512                 RETURN(-EINVAL);
1513         }
1514
1515         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1516                mti->mti_svname, logname, lmvname);
1517
1518         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1519         if (rc)
1520                 RETURN(rc);
1521         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1522         if (rc)
1523                 GOTO(out_free, rc);
1524         rc = name_create(&mdcuuid, mdcname, "_UUID");
1525         if (rc)
1526                 GOTO(out_free, rc);
1527         rc = name_create(&lmvuuid, lmvname, "_UUID");
1528         if (rc)
1529                 GOTO(out_free, rc);
1530
1531         rc = record_start_log(env, mgs, &llh, logname);
1532         if (rc)
1533                 GOTO(out_free, rc);
1534         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1535                            "add mdc");
1536         if (rc)
1537                 GOTO(out_end, rc);
1538         for (i = 0; i < mti->mti_nid_count; i++) {
1539                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1540                        libcfs_nid2str(mti->mti_nids[i]));
1541
1542                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1543                 if (rc)
1544                         GOTO(out_end, rc);
1545         }
1546
1547         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1548         if (rc)
1549                 GOTO(out_end, rc);
1550         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1551         if (rc)
1552                 GOTO(out_end, rc);
1553         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1554         if (rc)
1555                 GOTO(out_end, rc);
1556         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1557         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1558                             index, "1");
1559         if (rc)
1560                 GOTO(out_end, rc);
1561         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1562                            "add mdc");
1563         if (rc)
1564                 GOTO(out_end, rc);
1565 out_end:
1566         record_end_log(env, &llh);
1567 out_free:
1568         name_destroy(&lmvuuid);
1569         name_destroy(&mdcuuid);
1570         name_destroy(&mdcname);
1571         name_destroy(&nodeuuid);
1572         RETURN(rc);
1573 }
1574
1575 /* add new mdc to already existent MDS */
1576 static int mgs_write_log_mdc_to_mdt(const struct lu_env *env,
1577                                     struct mgs_device *mgs,
1578                                     struct fs_db *fsdb,
1579                                     struct mgs_target_info *mti,
1580                                     char *logname)
1581 {
1582         struct llog_handle *llh = NULL;
1583         char *nodeuuid = NULL;
1584         char *mdcname = NULL;
1585         char *mdcuuid = NULL;
1586         char *mdtuuid = NULL;
1587         int idx = mti->mti_stripe_index;
1588         char index[9];
1589         int i, rc;
1590
1591         ENTRY;
1592         if (mgs_log_is_empty(env, mgs, logname)) {
1593                 CERROR("log is empty! Logical error\n");
1594                 RETURN (-EINVAL);
1595         }
1596
1597         CDEBUG(D_MGS, "adding mdc index %d to %s\n", idx, logname);
1598
1599         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1600         if (rc)
1601                 RETURN(rc);
1602         snprintf(index, sizeof(index), "-mdc%04x", idx);
1603         rc = name_create(&mdcname, logname, index);
1604         if (rc)
1605                 GOTO(out_free, rc);
1606         rc = name_create(&mdcuuid, mdcname, "_UUID");
1607         if (rc)
1608                 GOTO(out_free, rc);
1609         rc = name_create(&mdtuuid, logname, "_UUID");
1610         if (rc)
1611                 GOTO(out_free, rc);
1612
1613         rc = record_start_log(env, mgs, &llh, logname);
1614         if (rc)
1615                 GOTO(out_free, rc);
1616         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname, "add mdc");
1617         if (rc)
1618                 GOTO(out_end, rc);
1619         for (i = 0; i < mti->mti_nid_count; i++) {
1620                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1621                        libcfs_nid2str(mti->mti_nids[i]));
1622                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1623                 if (rc)
1624                         GOTO(out_end, rc);
1625         }
1626         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, mdcuuid);
1627         if (rc)
1628                 GOTO(out_end, rc);
1629         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1630         if (rc)
1631                 GOTO(out_end, rc);
1632         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1633         if (rc)
1634                 GOTO(out_end, rc);
1635         snprintf(index, sizeof(index), "%d", idx);
1636
1637         rc = record_mdc_add(env, llh, logname, mdcuuid, mti->mti_uuid,
1638                             index, "1");
1639         if (rc)
1640                 GOTO(out_end, rc);
1641         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add mdc");
1642         if (rc)
1643                 GOTO(out_end, rc);
1644 out_end:
1645         record_end_log(env, &llh);
1646 out_free:
1647         name_destroy(&mdtuuid);
1648         name_destroy(&mdcuuid);
1649         name_destroy(&mdcname);
1650         name_destroy(&nodeuuid);
1651         RETURN(rc);
1652 }
1653
1654 static int mgs_write_log_mdt0(const struct lu_env *env,
1655                               struct mgs_device *mgs,
1656                               struct fs_db *fsdb,
1657                               struct mgs_target_info *mti)
1658 {
1659         char *log = mti->mti_svname;
1660         struct llog_handle *llh = NULL;
1661         char *uuid, *lovname;
1662         char mdt_index[6];
1663         char *ptr = mti->mti_params;
1664         int rc = 0, failout = 0;
1665         ENTRY;
1666
1667         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
1668         if (uuid == NULL)
1669                 RETURN(-ENOMEM);
1670
1671         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
1672                 failout = (strncmp(ptr, "failout", 7) == 0);
1673
1674         rc = name_create(&lovname, log, "-mdtlov");
1675         if (rc)
1676                 GOTO(out_free, rc);
1677         if (mgs_log_is_empty(env, mgs, log)) {
1678                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
1679                 if (rc)
1680                         GOTO(out_lod, rc);
1681         }
1682
1683         sprintf(mdt_index, "%d", mti->mti_stripe_index);
1684
1685         rc = record_start_log(env, mgs, &llh, log);
1686         if (rc)
1687                 GOTO(out_lod, rc);
1688
1689         /* add MDT itself */
1690
1691         /* FIXME this whole fn should be a single journal transaction */
1692         sprintf(uuid, "%s_UUID", log);
1693         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
1694         if (rc)
1695                 GOTO(out_lod, rc);
1696         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
1697         if (rc)
1698                 GOTO(out_end, rc);
1699         rc = record_mount_opt(env, llh, log, lovname, NULL);
1700         if (rc)
1701                 GOTO(out_end, rc);
1702         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
1703                         failout ? "n" : "f");
1704         if (rc)
1705                 GOTO(out_end, rc);
1706         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
1707         if (rc)
1708                 GOTO(out_end, rc);
1709 out_end:
1710         record_end_log(env, &llh);
1711 out_lod:
1712         name_destroy(&lovname);
1713 out_free:
1714         OBD_FREE(uuid, sizeof(struct obd_uuid));
1715         RETURN(rc);
1716 }
1717
1718 static inline int name_create_mdt(char **logname, char *fsname, int i)
1719 {
1720         char mdt_index[9];
1721
1722         sprintf(mdt_index, "-MDT%04x", i);
1723         return name_create(logname, fsname, mdt_index);
1724 }
1725
1726 static int name_create_mdt_and_lov(char **logname, char **lovname,
1727                                     struct fs_db *fsdb, int i)
1728 {
1729         int rc;
1730
1731         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
1732         if (rc)
1733                 return rc;
1734         /* COMPAT_180 */
1735         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
1736                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
1737         else
1738                 rc = name_create(lovname, *logname, "-mdtlov");
1739         if (rc) {
1740                 name_destroy(logname);
1741                 *logname = NULL;
1742         }
1743         return rc;
1744 }
1745
1746 static inline int name_create_mdt_osc(char **oscname, char *ostname,
1747                                        struct fs_db *fsdb, int i)
1748 {
1749         char suffix[16];
1750
1751         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
1752                 sprintf(suffix, "-osc");
1753         else
1754                 sprintf(suffix, "-osc-MDT%04x", i);
1755         return name_create(oscname, ostname, suffix);
1756 }
1757
1758 /* envelope method for all layers log */
1759 static int mgs_write_log_mdt(const struct lu_env *env,
1760                              struct mgs_device *mgs,
1761                              struct fs_db *fsdb,
1762                              struct mgs_target_info *mti)
1763 {
1764         struct mgs_thread_info *mgi = mgs_env_info(env);
1765         struct llog_handle *llh = NULL;
1766         char *cliname;
1767         int rc, i = 0;
1768         ENTRY;
1769
1770         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
1771
1772         if (mti->mti_uuid[0] == '\0') {
1773                 /* Make up our own uuid */
1774                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
1775                          "%s_UUID", mti->mti_svname);
1776         }
1777
1778         /* add mdt */
1779         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
1780         if (rc)
1781                 RETURN(rc);
1782         /* Append the mdt info to the client log */
1783         rc = name_create(&cliname, mti->mti_fsname, "-client");
1784         if (rc)
1785                 RETURN(rc);
1786
1787         if (mgs_log_is_empty(env, mgs, cliname)) {
1788                 /* Start client log */
1789                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
1790                                        fsdb->fsdb_clilov);
1791                 if (rc)
1792                         GOTO(out_free, rc);
1793                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
1794                                        fsdb->fsdb_clilmv);
1795                 if (rc)
1796                         GOTO(out_free, rc);
1797         }
1798
1799         /*
1800         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
1801         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
1802         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
1803         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
1804         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
1805         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
1806         */
1807
1808                 /* copy client info about lov/lmv */
1809                 mgi->mgi_comp.comp_mti = mti;
1810                 mgi->mgi_comp.comp_fsdb = fsdb;
1811
1812                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
1813                                                         &mgi->mgi_comp);
1814                 if (rc)
1815                         GOTO(out_free, rc);
1816                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
1817                                               fsdb->fsdb_clilmv);
1818                 if (rc)
1819                         GOTO(out_free, rc);
1820
1821                 /* add mountopts */
1822                 rc = record_start_log(env, mgs, &llh, cliname);
1823                 if (rc)
1824                         GOTO(out_free, rc);
1825
1826                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
1827                                    "mount opts");
1828                 if (rc)
1829                         GOTO(out_end, rc);
1830                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
1831                                       fsdb->fsdb_clilmv);
1832                 if (rc)
1833                         GOTO(out_end, rc);
1834                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
1835                                    "mount opts");
1836
1837         if (rc)
1838                 GOTO(out_end, rc);
1839         /* for_all_existing_mdt except current one */
1840         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
1841                 char *mdtname;
1842                 if (i !=  mti->mti_stripe_index &&
1843                     test_bit(i,  fsdb->fsdb_mdt_index_map)) {
1844                         rc = name_create_mdt(&mdtname, mti->mti_fsname, i);
1845                         if (rc)
1846                                 GOTO(out_end, rc);
1847                         rc = mgs_write_log_mdc_to_mdt(env, mgs, fsdb, mti, mdtname);
1848                         name_destroy(&mdtname);
1849                         if (rc)
1850                                 GOTO(out_end, rc);
1851                 }
1852         }
1853 out_end:
1854         record_end_log(env, &llh);
1855 out_free:
1856         name_destroy(&cliname);
1857         RETURN(rc);
1858 }
1859
1860 /* Add the ost info to the client/mdt lov */
1861 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1862                                     struct mgs_device *mgs, struct fs_db *fsdb,
1863                                     struct mgs_target_info *mti,
1864                                     char *logname, char *suffix, char *lovname,
1865                                     enum lustre_sec_part sec_part, int flags)
1866 {
1867         struct llog_handle *llh = NULL;
1868         char *nodeuuid = NULL;
1869         char *oscname = NULL;
1870         char *oscuuid = NULL;
1871         char *lovuuid = NULL;
1872         char *svname = NULL;
1873         char index[6];
1874         int i, rc;
1875
1876         ENTRY;
1877         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
1878                mti->mti_svname, logname);
1879
1880         if (mgs_log_is_empty(env, mgs, logname)) {
1881                 CERROR("log is empty! Logical error\n");
1882                 RETURN (-EINVAL);
1883         }
1884
1885         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1886         if (rc)
1887                 RETURN(rc);
1888         rc = name_create(&svname, mti->mti_svname, "-osc");
1889         if (rc)
1890                 GOTO(out_free, rc);
1891         /* for the system upgraded from old 1.8, keep using the old osc naming
1892          * style for mdt, see name_create_mdt_osc(). LU-1257 */
1893         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
1894                 rc = name_create(&oscname, svname, "");
1895         else
1896                 rc = name_create(&oscname, svname, suffix);
1897         if (rc)
1898                 GOTO(out_free, rc);
1899         rc = name_create(&oscuuid, oscname, "_UUID");
1900         if (rc)
1901                 GOTO(out_free, rc);
1902         rc = name_create(&lovuuid, lovname, "_UUID");
1903         if (rc)
1904                 GOTO(out_free, rc);
1905
1906         /*
1907         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
1908         multihomed (#4)
1909         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
1910         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
1911         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
1912         failover (#6,7)
1913         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
1914         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
1915         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
1916         */
1917
1918         rc = record_start_log(env, mgs, &llh, logname);
1919         if (rc)
1920                 GOTO(out_free, rc);
1921
1922         /* FIXME these should be a single journal transaction */
1923         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
1924                            "add osc");
1925         if (rc)
1926                 GOTO(out_end, rc);
1927
1928         /* NB: don't change record order, because upon MDT steal OSC config
1929          * from client, it treats all nids before LCFG_SETUP as target nids
1930          * (multiple interfaces), while nids after as failover node nids.
1931          * See mgs_steal_llog_handler() LCFG_ADD_UUID.
1932          */
1933         for (i = 0; i < mti->mti_nid_count; i++) {
1934                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
1935                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1936                 if (rc)
1937                         GOTO(out_end, rc);
1938         }
1939         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
1940         if (rc)
1941                 GOTO(out_end, rc);
1942         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
1943         if (rc)
1944                 GOTO(out_end, rc);
1945         rc = mgs_write_log_failnids(env, mti, llh, oscname);
1946         if (rc)
1947                 GOTO(out_end, rc);
1948         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1949         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
1950         if (rc)
1951                 GOTO(out_end, rc);
1952         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
1953                            "add osc");
1954         if (rc)
1955                 GOTO(out_end, rc);
1956 out_end:
1957         record_end_log(env, &llh);
1958 out_free:
1959         name_destroy(&lovuuid);
1960         name_destroy(&oscuuid);
1961         name_destroy(&oscname);
1962         name_destroy(&svname);
1963         name_destroy(&nodeuuid);
1964         RETURN(rc);
1965 }
1966
1967 static int mgs_write_log_ost(const struct lu_env *env,
1968                              struct mgs_device *mgs, struct fs_db *fsdb,
1969                              struct mgs_target_info *mti)
1970 {
1971         struct llog_handle *llh = NULL;
1972         char *logname, *lovname;
1973         char *ptr = mti->mti_params;
1974         int rc, flags = 0, failout = 0, i;
1975         ENTRY;
1976
1977         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
1978
1979         /* The ost startup log */
1980
1981         /* If the ost log already exists, that means that someone reformatted
1982            the ost and it called target_add again. */
1983         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
1984                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
1985                                    "exists, yet the server claims it never "
1986                                    "registered. It may have been reformatted, "
1987                                    "or the index changed. writeconf the MDT to "
1988                                    "regenerate all logs.\n", mti->mti_svname);
1989                 RETURN(-EALREADY);
1990         }
1991
1992         /*
1993         attach obdfilter ost1 ost1_UUID
1994         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
1995         */
1996         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
1997                 failout = (strncmp(ptr, "failout", 7) == 0);
1998         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
1999         if (rc)
2000                 RETURN(rc);
2001         /* FIXME these should be a single journal transaction */
2002         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2003         if (rc)
2004                 GOTO(out_end, rc);
2005         if (*mti->mti_uuid == '\0')
2006                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2007                          "%s_UUID", mti->mti_svname);
2008         rc = record_attach(env, llh, mti->mti_svname,
2009                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2010         if (rc)
2011                 GOTO(out_end, rc);
2012         rc = record_setup(env, llh, mti->mti_svname,
2013                           "dev"/*ignored*/, "type"/*ignored*/,
2014                           failout ? "n" : "f", 0/*options*/);
2015         if (rc)
2016                 GOTO(out_end, rc);
2017         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2018         if (rc)
2019                 GOTO(out_end, rc);
2020 out_end:
2021         record_end_log(env, &llh);
2022         if (rc)
2023                 RETURN(rc);
2024         /* We also have to update the other logs where this osc is part of
2025            the lov */
2026
2027         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2028                 /* If we're upgrading, the old mdt log already has our
2029                    entry. Let's do a fake one for fun. */
2030                 /* Note that we can't add any new failnids, since we don't
2031                    know the old osc names. */
2032                 flags = CM_SKIP | CM_UPGRADE146;
2033
2034         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2035                 /* If the update flag isn't set, don't update client/mdt
2036                    logs. */
2037                 flags |= CM_SKIP;
2038                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2039                               "the MDT first to regenerate it.\n",
2040                               mti->mti_svname);
2041         }
2042
2043         /* Add ost to all MDT lov defs */
2044         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2045                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2046                         char mdt_index[9];
2047
2048                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2049                                                      i);
2050                         if (rc)
2051                                 RETURN(rc);
2052                         sprintf(mdt_index, "-MDT%04x", i);
2053                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2054                                                       logname, mdt_index,
2055                                                       lovname, LUSTRE_SP_MDT,
2056                                                       flags);
2057                         name_destroy(&logname);
2058                         name_destroy(&lovname);
2059                         if (rc)
2060                                 RETURN(rc);
2061                 }
2062         }
2063
2064         /* Append ost info to the client log */
2065         rc = name_create(&logname, mti->mti_fsname, "-client");
2066         if (rc)
2067                 RETURN(rc);
2068         if (mgs_log_is_empty(env, mgs, logname)) {
2069                 /* Start client log */
2070                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2071                                        fsdb->fsdb_clilov);
2072                 if (rc)
2073                         GOTO(out_free, rc);
2074                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2075                                        fsdb->fsdb_clilmv);
2076                 if (rc)
2077                         GOTO(out_free, rc);
2078         }
2079         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2080                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2081 out_free:
2082         name_destroy(&logname);
2083         RETURN(rc);
2084 }
2085
2086 static __inline__ int mgs_param_empty(char *ptr)
2087 {
2088         char *tmp;
2089
2090         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2091                 return 1;
2092         return 0;
2093 }
2094
2095 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2096                                           struct mgs_device *mgs,
2097                                           struct fs_db *fsdb,
2098                                           struct mgs_target_info *mti,
2099                                           char *logname, char *cliname)
2100 {
2101         int rc;
2102         struct llog_handle *llh = NULL;
2103
2104         if (mgs_param_empty(mti->mti_params)) {
2105                 /* Remove _all_ failnids */
2106                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2107                                 mti->mti_svname, "add failnid", CM_SKIP);
2108                 return rc < 0 ? rc : 0;
2109         }
2110
2111         /* Otherwise failover nids are additive */
2112         rc = record_start_log(env, mgs, &llh, logname);
2113         if (rc)
2114                 return rc;
2115                 /* FIXME this should be a single journal transaction */
2116         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2117                            "add failnid");
2118         if (rc)
2119                 goto out_end;
2120         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2121         if (rc)
2122                 goto out_end;
2123         rc = record_marker(env, llh, fsdb, CM_END,
2124                            mti->mti_svname, "add failnid");
2125 out_end:
2126         record_end_log(env, &llh);
2127         return rc;
2128 }
2129
2130
2131 /* Add additional failnids to an existing log.
2132    The mdc/osc must have been added to logs first */
2133 /* tcp nids must be in dotted-quad ascii -
2134    we can't resolve hostnames from the kernel. */
2135 static int mgs_write_log_add_failnid(const struct lu_env *env,
2136                                      struct mgs_device *mgs,
2137                                      struct fs_db *fsdb,
2138                                      struct mgs_target_info *mti)
2139 {
2140         char *logname, *cliname;
2141         int rc;
2142         ENTRY;
2143
2144         /* FIXME we currently can't erase the failnids
2145          * given when a target first registers, since they aren't part of
2146          * an "add uuid" stanza */
2147
2148         /* Verify that we know about this target */
2149         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2150                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2151                                    "yet. It must be started before failnids "
2152                                    "can be added.\n", mti->mti_svname);
2153                 RETURN(-ENOENT);
2154         }
2155
2156         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2157         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2158                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2159         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2160                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2161         } else {
2162                 RETURN(-EINVAL);
2163         }
2164         if (rc)
2165                 RETURN(rc);
2166         /* Add failover nids to the client log */
2167         rc = name_create(&logname, mti->mti_fsname, "-client");
2168         if (rc) {
2169                 name_destroy(&cliname);
2170                 RETURN(rc);
2171         }
2172         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2173         name_destroy(&logname);
2174         name_destroy(&cliname);
2175         if (rc)
2176                 RETURN(rc);
2177
2178         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2179                 /* Add OST failover nids to the MDT logs as well */
2180                 int i;
2181
2182                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2183                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2184                                 continue;
2185                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2186                         if (rc)
2187                                 RETURN(rc);
2188                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2189                                                  fsdb, i);
2190                         if (rc) {
2191                                 name_destroy(&logname);
2192                                 RETURN(rc);
2193                         }
2194                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2195                                                             mti, logname,
2196                                                             cliname);
2197                         name_destroy(&cliname);
2198                         name_destroy(&logname);
2199                         if (rc)
2200                                 RETURN(rc);
2201                 }
2202         }
2203
2204         RETURN(rc);
2205 }
2206
2207 static int mgs_wlp_lcfg(const struct lu_env *env,
2208                         struct mgs_device *mgs, struct fs_db *fsdb,
2209                         struct mgs_target_info *mti,
2210                         char *logname, struct lustre_cfg_bufs *bufs,
2211                         char *tgtname, char *ptr)
2212 {
2213         char comment[MTI_NAME_MAXLEN];
2214         char *tmp;
2215         struct lustre_cfg *lcfg;
2216         int rc, del;
2217
2218         /* Erase any old settings of this same parameter */
2219         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2220         comment[MTI_NAME_MAXLEN - 1] = 0;
2221         /* But don't try to match the value. */
2222         if ((tmp = strchr(comment, '=')))
2223             *tmp = 0;
2224         /* FIXME we should skip settings that are the same as old values */
2225         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2226         if (rc < 0)
2227                 return rc;
2228         del = mgs_param_empty(ptr);
2229
2230         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2231                       "Sett" : "Modify", tgtname, comment, logname);
2232         if (del)
2233                 return rc;
2234
2235         lustre_cfg_bufs_reset(bufs, tgtname);
2236         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2237         lcfg = lustre_cfg_new(LCFG_PARAM, bufs);
2238         if (!lcfg)
2239                 return -ENOMEM;
2240         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2241         lustre_cfg_free(lcfg);
2242         return rc;
2243 }
2244
2245 /* write global variable settings into log */
2246 static int mgs_write_log_sys(const struct lu_env *env,
2247                              struct mgs_device *mgs, struct fs_db *fsdb,
2248                              struct mgs_target_info *mti, char *sys, char *ptr)
2249 {
2250         struct mgs_thread_info *mgi = mgs_env_info(env);
2251         struct lustre_cfg *lcfg;
2252         char *tmp, sep;
2253         int rc, cmd, convert = 1;
2254
2255         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2256                 cmd = LCFG_SET_TIMEOUT;
2257         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2258                 cmd = LCFG_SET_LDLM_TIMEOUT;
2259         /* Check for known params here so we can return error to lctl */
2260         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2261                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2262                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2263                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2264                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2265                 cmd = LCFG_PARAM;
2266         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2267                 convert = 0; /* Don't convert string value to integer */
2268                 cmd = LCFG_PARAM;
2269         } else {
2270                 return -EINVAL;
2271         }
2272
2273         if (mgs_param_empty(ptr))
2274                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2275         else
2276                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2277
2278         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2279         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2280         if (!convert && *tmp != '\0')
2281                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2282         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2283         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2284         /* truncate the comment to the parameter name */
2285         ptr = tmp - 1;
2286         sep = *ptr;
2287         *ptr = '\0';
2288         /* modify all servers and clients */
2289         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2290                                       *tmp == '\0' ? NULL : lcfg,
2291                                       mti->mti_fsname, sys, 0);
2292         if (rc == 0 && *tmp != '\0') {
2293                 switch (cmd) {
2294                 case LCFG_SET_TIMEOUT:
2295                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2296                                 class_process_config(lcfg);
2297                         break;
2298                 case LCFG_SET_LDLM_TIMEOUT:
2299                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2300                                 class_process_config(lcfg);
2301                         break;
2302                 default:
2303                         break;
2304                 }
2305         }
2306         *ptr = sep;
2307         lustre_cfg_free(lcfg);
2308         return rc;
2309 }
2310
2311 /* write quota settings into log */
2312 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2313                                struct fs_db *fsdb, struct mgs_target_info *mti,
2314                                char *quota, char *ptr)
2315 {
2316         struct mgs_thread_info  *mgi = mgs_env_info(env);
2317         struct lustre_cfg       *lcfg;
2318         char                    *tmp;
2319         char                     sep;
2320         int                      rc, cmd = LCFG_PARAM;
2321
2322         /* support only 'meta' and 'data' pools so far */
2323         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2324             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2325                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2326                        "& quota.ost are)\n", ptr);
2327                 return -EINVAL;
2328         }
2329
2330         if (*tmp == '\0') {
2331                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2332         } else {
2333                 CDEBUG(D_MGS, "global '%s'\n", quota);
2334
2335                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2336                     strcmp(tmp, "none") != 0) {
2337                         CERROR("enable option(%s) isn't supported\n", tmp);
2338                         return -EINVAL;
2339                 }
2340         }
2341
2342         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2343         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2344         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2345         /* truncate the comment to the parameter name */
2346         ptr = tmp - 1;
2347         sep = *ptr;
2348         *ptr = '\0';
2349
2350         /* XXX we duplicated quota enable information in all server
2351          *     config logs, it should be moved to a separate config
2352          *     log once we cleanup the config log for global param. */
2353         /* modify all servers */
2354         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2355                                       *tmp == '\0' ? NULL : lcfg,
2356                                       mti->mti_fsname, quota, 1);
2357         *ptr = sep;
2358         lustre_cfg_free(lcfg);
2359         return rc;
2360 }
2361
2362 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2363                                    struct mgs_device *mgs,
2364                                    struct fs_db *fsdb,
2365                                    struct mgs_target_info *mti,
2366                                    char *param)
2367 {
2368         struct mgs_thread_info *mgi = mgs_env_info(env);
2369         struct llog_handle     *llh = NULL;
2370         char                   *logname;
2371         char                   *comment, *ptr;
2372         struct lustre_cfg      *lcfg;
2373         int                     rc, len;
2374         ENTRY;
2375
2376         /* get comment */
2377         ptr = strchr(param, '=');
2378         LASSERT(ptr);
2379         len = ptr - param;
2380
2381         OBD_ALLOC(comment, len + 1);
2382         if (comment == NULL)
2383                 RETURN(-ENOMEM);
2384         strncpy(comment, param, len);
2385         comment[len] = '\0';
2386
2387         /* prepare lcfg */
2388         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2389         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2390         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2391         if (lcfg == NULL)
2392                 GOTO(out_comment, rc = -ENOMEM);
2393
2394         /* construct log name */
2395         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2396         if (rc)
2397                 GOTO(out_lcfg, rc);
2398
2399         if (mgs_log_is_empty(env, mgs, logname)) {
2400                 rc = record_start_log(env, mgs, &llh, logname);
2401                 if (rc)
2402                         GOTO(out, rc);
2403                 record_end_log(env, &llh);
2404         }
2405
2406         /* obsolete old one */
2407         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2408                         comment, CM_SKIP);
2409         if (rc < 0)
2410                 GOTO(out, rc);
2411         /* write the new one */
2412         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2413                                   mti->mti_svname, comment);
2414         if (rc)
2415                 CERROR("err %d writing log %s\n", rc, logname);
2416 out:
2417         name_destroy(&logname);
2418 out_lcfg:
2419         lustre_cfg_free(lcfg);
2420 out_comment:
2421         OBD_FREE(comment, len + 1);
2422         RETURN(rc);
2423 }
2424
2425 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2426                                         char *param)
2427 {
2428         char    *ptr;
2429
2430         /* disable the adjustable udesc parameter for now, i.e. use default
2431          * setting that client always ship udesc to MDT if possible. to enable
2432          * it simply remove the following line */
2433         goto error_out;
2434
2435         ptr = strchr(param, '=');
2436         if (ptr == NULL)
2437                 goto error_out;
2438         *ptr++ = '\0';
2439
2440         if (strcmp(param, PARAM_SRPC_UDESC))
2441                 goto error_out;
2442
2443         if (strcmp(ptr, "yes") == 0) {
2444                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2445                 CWARN("Enable user descriptor shipping from client to MDT\n");
2446         } else if (strcmp(ptr, "no") == 0) {
2447                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2448                 CWARN("Disable user descriptor shipping from client to MDT\n");
2449         } else {
2450                 *(ptr - 1) = '=';
2451                 goto error_out;
2452         }
2453         return 0;
2454
2455 error_out:
2456         CERROR("Invalid param: %s\n", param);
2457         return -EINVAL;
2458 }
2459
2460 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2461                                   const char *svname,
2462                                   char *param)
2463 {
2464         struct sptlrpc_rule      rule;
2465         struct sptlrpc_rule_set *rset;
2466         int                      rc;
2467         ENTRY;
2468
2469         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2470                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2471                 RETURN(-EINVAL);
2472         }
2473
2474         if (strncmp(param, PARAM_SRPC_UDESC,
2475                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2476                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2477         }
2478
2479         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2480                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2481                 RETURN(-EINVAL);
2482         }
2483
2484         param += sizeof(PARAM_SRPC_FLVR) - 1;
2485
2486         rc = sptlrpc_parse_rule(param, &rule);
2487         if (rc)
2488                 RETURN(rc);
2489
2490         /* mgs rules implies must be mgc->mgs */
2491         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2492                 if ((rule.sr_from != LUSTRE_SP_MGC &&
2493                      rule.sr_from != LUSTRE_SP_ANY) ||
2494                     (rule.sr_to != LUSTRE_SP_MGS &&
2495                      rule.sr_to != LUSTRE_SP_ANY))
2496                         RETURN(-EINVAL);
2497         }
2498
2499         /* preapre room for this coming rule. svcname format should be:
2500          * - fsname: general rule
2501          * - fsname-tgtname: target-specific rule
2502          */
2503         if (strchr(svname, '-')) {
2504                 struct mgs_tgt_srpc_conf *tgtconf;
2505                 int                       found = 0;
2506
2507                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
2508                      tgtconf = tgtconf->mtsc_next) {
2509                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
2510                                 found = 1;
2511                                 break;
2512                         }
2513                 }
2514
2515                 if (!found) {
2516                         int name_len;
2517
2518                         OBD_ALLOC_PTR(tgtconf);
2519                         if (tgtconf == NULL)
2520                                 RETURN(-ENOMEM);
2521
2522                         name_len = strlen(svname);
2523
2524                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
2525                         if (tgtconf->mtsc_tgt == NULL) {
2526                                 OBD_FREE_PTR(tgtconf);
2527                                 RETURN(-ENOMEM);
2528                         }
2529                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
2530
2531                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
2532                         fsdb->fsdb_srpc_tgt = tgtconf;
2533                 }
2534
2535                 rset = &tgtconf->mtsc_rset;
2536         } else {
2537                 rset = &fsdb->fsdb_srpc_gen;
2538         }
2539
2540         rc = sptlrpc_rule_set_merge(rset, &rule);
2541
2542         RETURN(rc);
2543 }
2544
2545 static int mgs_srpc_set_param(const struct lu_env *env,
2546                               struct mgs_device *mgs,
2547                               struct fs_db *fsdb,
2548                               struct mgs_target_info *mti,
2549                               char *param)
2550 {
2551         char                   *copy;
2552         int                     rc, copy_size;
2553         ENTRY;
2554
2555 #ifndef HAVE_GSS
2556         RETURN(-EINVAL);
2557 #endif
2558         /* keep a copy of original param, which could be destroied
2559          * during parsing */
2560         copy_size = strlen(param) + 1;
2561         OBD_ALLOC(copy, copy_size);
2562         if (copy == NULL)
2563                 return -ENOMEM;
2564         memcpy(copy, param, copy_size);
2565
2566         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
2567         if (rc)
2568                 goto out_free;
2569
2570         /* previous steps guaranteed the syntax is correct */
2571         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
2572         if (rc)
2573                 goto out_free;
2574
2575         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2576                 /*
2577                  * for mgs rules, make them effective immediately.
2578                  */
2579                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
2580                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
2581                                                  &fsdb->fsdb_srpc_gen);
2582         }
2583
2584 out_free:
2585         OBD_FREE(copy, copy_size);
2586         RETURN(rc);
2587 }
2588
2589 struct mgs_srpc_read_data {
2590         struct fs_db   *msrd_fsdb;
2591         int             msrd_skip;
2592 };
2593
2594 static int mgs_srpc_read_handler(const struct lu_env *env,
2595                                  struct llog_handle *llh,
2596                                  struct llog_rec_hdr *rec, void *data)
2597 {
2598         struct mgs_srpc_read_data *msrd = data;
2599         struct cfg_marker         *marker;
2600         struct lustre_cfg         *lcfg = (struct lustre_cfg *)(rec + 1);
2601         char                      *svname, *param;
2602         int                        cfg_len, rc;
2603         ENTRY;
2604
2605         if (rec->lrh_type != OBD_CFG_REC) {
2606                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
2607                 RETURN(-EINVAL);
2608         }
2609
2610         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
2611                   sizeof(struct llog_rec_tail);
2612
2613         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
2614         if (rc) {
2615                 CERROR("Insane cfg\n");
2616                 RETURN(rc);
2617         }
2618
2619         if (lcfg->lcfg_command == LCFG_MARKER) {
2620                 marker = lustre_cfg_buf(lcfg, 1);
2621
2622                 if (marker->cm_flags & CM_START &&
2623                     marker->cm_flags & CM_SKIP)
2624                         msrd->msrd_skip = 1;
2625                 if (marker->cm_flags & CM_END)
2626                         msrd->msrd_skip = 0;
2627
2628                 RETURN(0);
2629         }
2630
2631         if (msrd->msrd_skip)
2632                 RETURN(0);
2633
2634         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
2635                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
2636                 RETURN(0);
2637         }
2638
2639         svname = lustre_cfg_string(lcfg, 0);
2640         if (svname == NULL) {
2641                 CERROR("svname is empty\n");
2642                 RETURN(0);
2643         }
2644
2645         param = lustre_cfg_string(lcfg, 1);
2646         if (param == NULL) {
2647                 CERROR("param is empty\n");
2648                 RETURN(0);
2649         }
2650
2651         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
2652         if (rc)
2653                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
2654
2655         RETURN(0);
2656 }
2657
2658 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
2659                                 struct mgs_device *mgs,
2660                                 struct fs_db *fsdb)
2661 {
2662         struct llog_handle        *llh = NULL;
2663         struct llog_ctxt          *ctxt;
2664         char                      *logname;
2665         struct mgs_srpc_read_data  msrd;
2666         int                        rc;
2667         ENTRY;
2668
2669         /* construct log name */
2670         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
2671         if (rc)
2672                 RETURN(rc);
2673
2674         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
2675         LASSERT(ctxt != NULL);
2676
2677         if (mgs_log_is_empty(env, mgs, logname))
2678                 GOTO(out, rc = 0);
2679
2680         rc = llog_open(env, ctxt, &llh, NULL, logname,
2681                        LLOG_OPEN_EXISTS);
2682         if (rc < 0) {
2683                 if (rc == -ENOENT)
2684                         rc = 0;
2685                 GOTO(out, rc);
2686         }
2687
2688         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
2689         if (rc)
2690                 GOTO(out_close, rc);
2691
2692         if (llog_get_size(llh) <= 1)
2693                 GOTO(out_close, rc = 0);
2694
2695         msrd.msrd_fsdb = fsdb;
2696         msrd.msrd_skip = 0;
2697
2698         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
2699                           NULL);
2700
2701 out_close:
2702         llog_close(env, llh);
2703 out:
2704         llog_ctxt_put(ctxt);
2705         name_destroy(&logname);
2706
2707         if (rc)
2708                 CERROR("failed to read sptlrpc config database: %d\n", rc);
2709         RETURN(rc);
2710 }
2711
2712 /* Permanent settings of all parameters by writing into the appropriate
2713  * configuration logs.
2714  * A parameter with null value ("<param>='\0'") means to erase it out of
2715  * the logs.
2716  */
2717 static int mgs_write_log_param(const struct lu_env *env,
2718                                struct mgs_device *mgs, struct fs_db *fsdb,
2719                                struct mgs_target_info *mti, char *ptr)
2720 {
2721         struct mgs_thread_info *mgi = mgs_env_info(env);
2722         char *logname;
2723         char *tmp;
2724         int rc = 0, rc2 = 0;
2725         ENTRY;
2726
2727         /* For various parameter settings, we have to figure out which logs
2728            care about them (e.g. both mdt and client for lov settings) */
2729         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2730
2731         /* The params are stored in MOUNT_DATA_FILE and modified via
2732            tunefs.lustre, or set using lctl conf_param */
2733
2734         /* Processed in lustre_start_mgc */
2735         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
2736                 GOTO(end, rc);
2737
2738         /* Processed in ost/mdt */
2739         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
2740                 GOTO(end, rc);
2741
2742         /* Processed in mgs_write_log_ost */
2743         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
2744                 if (mti->mti_flags & LDD_F_PARAM) {
2745                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
2746                                            "changed with tunefs.lustre"
2747                                            "and --writeconf\n", ptr);
2748                         rc = -EPERM;
2749                 }
2750                 GOTO(end, rc);
2751         }
2752
2753         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
2754                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
2755                 GOTO(end, rc);
2756         }
2757
2758         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
2759                 /* Add a failover nidlist */
2760                 rc = 0;
2761                 /* We already processed failovers params for new
2762                    targets in mgs_write_log_target */
2763                 if (mti->mti_flags & LDD_F_PARAM) {
2764                         CDEBUG(D_MGS, "Adding failnode\n");
2765                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
2766                 }
2767                 GOTO(end, rc);
2768         }
2769
2770         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
2771                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
2772                 GOTO(end, rc);
2773         }
2774
2775         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
2776                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
2777                 GOTO(end, rc);
2778         }
2779
2780         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
2781                 /* active=0 means off, anything else means on */
2782                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
2783                 int i;
2784
2785                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
2786                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
2787                                            "be (de)activated.\n",
2788                                            mti->mti_svname);
2789                         GOTO(end, rc = -EINVAL);
2790                 }
2791                 LCONSOLE_WARN("Permanently %sactivating %s\n",
2792                               flag ? "de": "re", mti->mti_svname);
2793                 /* Modify clilov */
2794                 rc = name_create(&logname, mti->mti_fsname, "-client");
2795                 if (rc)
2796                         GOTO(end, rc);
2797                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2798                                 mti->mti_svname, "add osc", flag);
2799                 name_destroy(&logname);
2800                 if (rc)
2801                         goto active_err;
2802                 /* Modify mdtlov */
2803                 /* Add to all MDT logs for CMD */
2804                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2805                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2806                                 continue;
2807                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2808                         if (rc)
2809                                 GOTO(end, rc);
2810                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
2811                                         mti->mti_svname, "add osc", flag);
2812                         name_destroy(&logname);
2813                         if (rc)
2814                                 goto active_err;
2815                 }
2816         active_err:
2817                 if (rc) {
2818                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
2819                                            "log (%d). No permanent "
2820                                            "changes were made to the "
2821                                            "config log.\n",
2822                                            mti->mti_svname, rc);
2823                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
2824                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
2825                                                    " because the log"
2826                                                    "is in the old 1.4"
2827                                                    "style. Consider "
2828                                                    " --writeconf to "
2829                                                    "update the logs.\n");
2830                         GOTO(end, rc);
2831                 }
2832                 /* Fall through to osc proc for deactivating live OSC
2833                    on running MDT / clients. */
2834         }
2835         /* Below here, let obd's XXX_process_config methods handle it */
2836
2837         /* All lov. in proc */
2838         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
2839                 char *mdtlovname;
2840
2841                 CDEBUG(D_MGS, "lov param %s\n", ptr);
2842                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
2843                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
2844                                            "set on the MDT, not %s. "
2845                                            "Ignoring.\n",
2846                                            mti->mti_svname);
2847                         GOTO(end, rc = 0);
2848                 }
2849
2850                 /* Modify mdtlov */
2851                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
2852                         GOTO(end, rc = -ENODEV);
2853
2854                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
2855                                              mti->mti_stripe_index);
2856                 if (rc)
2857                         GOTO(end, rc);
2858                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
2859                                   &mgi->mgi_bufs, mdtlovname, ptr);
2860                 name_destroy(&logname);
2861                 name_destroy(&mdtlovname);
2862                 if (rc)
2863                         GOTO(end, rc);
2864
2865                 /* Modify clilov */
2866                 rc = name_create(&logname, mti->mti_fsname, "-client");
2867                 if (rc)
2868                         GOTO(end, rc);
2869                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
2870                                   fsdb->fsdb_clilov, ptr);
2871                 name_destroy(&logname);
2872                 GOTO(end, rc);
2873         }
2874
2875         /* All osc., mdc., llite. params in proc */
2876         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
2877             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
2878             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
2879                 char *cname;
2880
2881                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2882                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
2883                                            " cannot be modified. Consider"
2884                                            " updating the configuration with"
2885                                            " --writeconf\n",
2886                                            mti->mti_svname);
2887                         GOTO(end, rc = -EINVAL);
2888                 }
2889                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
2890                         rc = name_create(&cname, mti->mti_fsname, "-client");
2891                         /* Add the client type to match the obdname in
2892                            class_config_llog_handler */
2893                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2894                         rc = name_create(&cname, mti->mti_svname, "-mdc");
2895                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2896                         rc = name_create(&cname, mti->mti_svname, "-osc");
2897                 } else {
2898                         GOTO(end, rc = -EINVAL);
2899                 }
2900                 if (rc)
2901                         GOTO(end, rc);
2902
2903                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2904
2905                 /* Modify client */
2906                 rc = name_create(&logname, mti->mti_fsname, "-client");
2907                 if (rc) {
2908                         name_destroy(&cname);
2909                         GOTO(end, rc);
2910                 }
2911                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
2912                                   cname, ptr);
2913
2914                 /* osc params affect the MDT as well */
2915                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
2916                         int i;
2917
2918                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2919                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2920                                         continue;
2921                                 name_destroy(&cname);
2922                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
2923                                                          fsdb, i);
2924                                 name_destroy(&logname);
2925                                 if (rc)
2926                                         break;
2927                                 rc = name_create_mdt(&logname,
2928                                                      mti->mti_fsname, i);
2929                                 if (rc)
2930                                         break;
2931                                 if (!mgs_log_is_empty(env, mgs, logname)) {
2932                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
2933                                                           mti, logname,
2934                                                           &mgi->mgi_bufs,
2935                                                           cname, ptr);
2936                                         if (rc)
2937                                                 break;
2938                                 }
2939                         }
2940                 }
2941                 name_destroy(&logname);
2942                 name_destroy(&cname);
2943                 GOTO(end, rc);
2944         }
2945
2946         /* All mdt. params in proc */
2947         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
2948                 int i;
2949                 __u32 idx;
2950
2951                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2952                 if (strncmp(mti->mti_svname, mti->mti_fsname,
2953                             MTI_NAME_MAXLEN) == 0)
2954                         /* device is unspecified completely? */
2955                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
2956                 else
2957                         rc = server_name2index(mti->mti_svname, &idx, NULL);
2958                 if (rc < 0)
2959                         goto active_err;
2960                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
2961                         goto active_err;
2962                 if (rc & LDD_F_SV_ALL) {
2963                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2964                                 if (!test_bit(i,
2965                                                   fsdb->fsdb_mdt_index_map))
2966                                         continue;
2967                                 rc = name_create_mdt(&logname,
2968                                                 mti->mti_fsname, i);
2969                                 if (rc)
2970                                         goto active_err;
2971                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
2972                                                   logname, &mgi->mgi_bufs,
2973                                                   logname, ptr);
2974                                 name_destroy(&logname);
2975                                 if (rc)
2976                                         goto active_err;
2977                         }
2978                 } else {
2979                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
2980                                           mti->mti_svname, &mgi->mgi_bufs,
2981                                           mti->mti_svname, ptr);
2982                         if (rc)
2983                                 goto active_err;
2984                 }
2985                 GOTO(end, rc);
2986         }
2987
2988         /* All mdd., ost. params in proc */
2989         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
2990             (class_match_param(ptr, PARAM_OST, NULL) == 0)) {
2991                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2992                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
2993                         GOTO(end, rc = -ENODEV);
2994
2995                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
2996                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
2997                 GOTO(end, rc);
2998         }
2999
3000         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3001         rc2 = -ENOSYS;
3002
3003 end:
3004         if (rc)
3005                 CERROR("err %d on param '%s'\n", rc, ptr);
3006
3007         RETURN(rc ?: rc2);
3008 }
3009
3010 /* Not implementing automatic failover nid addition at this time. */
3011 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3012                       struct mgs_target_info *mti)
3013 {
3014 #if 0
3015         struct fs_db *fsdb;
3016         int rc;
3017         ENTRY;
3018
3019         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3020         if (rc)
3021                 RETURN(rc);
3022
3023         if (mgs_log_is_empty(obd, mti->mti_svname))
3024                 /* should never happen */
3025                 RETURN(-ENOENT);
3026
3027         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3028
3029         /* FIXME We can just check mti->params to see if we're already in
3030            the failover list.  Modify mti->params for rewriting back at
3031            server_register_target(). */
3032
3033         mutex_lock(&fsdb->fsdb_mutex);
3034         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3035         mutex_unlock(&fsdb->fsdb_mutex);
3036
3037         RETURN(rc);
3038 #endif
3039         return 0;
3040 }
3041
3042 int mgs_write_log_target(const struct lu_env *env,
3043                          struct mgs_device *mgs,
3044                          struct mgs_target_info *mti,
3045                          struct fs_db *fsdb)
3046 {
3047         int rc = -EINVAL;
3048         char *buf, *params;
3049         ENTRY;
3050
3051         /* set/check the new target index */
3052         rc = mgs_set_index(env, mgs, mti);
3053         if (rc < 0) {
3054                 CERROR("Can't get index (%d)\n", rc);
3055                 RETURN(rc);
3056         }
3057
3058         if (rc == EALREADY) {
3059                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3060                               mti->mti_stripe_index, mti->mti_svname);
3061                 /* We would like to mark old log sections as invalid
3062                    and add new log sections in the client and mdt logs.
3063                    But if we add new sections, then live clients will
3064                    get repeat setup instructions for already running
3065                    osc's. So don't update the client/mdt logs. */
3066                 mti->mti_flags &= ~LDD_F_UPDATE;
3067         }
3068
3069         mutex_lock(&fsdb->fsdb_mutex);
3070
3071         if (mti->mti_flags &
3072             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3073                 /* Generate a log from scratch */
3074                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3075                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3076                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3077                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3078                 } else {
3079                         CERROR("Unknown target type %#x, can't create log for "
3080                                "%s\n", mti->mti_flags, mti->mti_svname);
3081                 }
3082                 if (rc) {
3083                         CERROR("Can't write logs for %s (%d)\n",
3084                                mti->mti_svname, rc);
3085                         GOTO(out_up, rc);
3086                 }
3087         } else {
3088                 /* Just update the params from tunefs in mgs_write_log_params */
3089                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3090                 mti->mti_flags |= LDD_F_PARAM;
3091         }
3092
3093         /* allocate temporary buffer, where class_get_next_param will
3094            make copy of a current  parameter */
3095         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3096         if (buf == NULL)
3097                 GOTO(out_up, rc = -ENOMEM);
3098         params = mti->mti_params;
3099         while (params != NULL) {
3100                 rc = class_get_next_param(&params, buf);
3101                 if (rc) {
3102                         if (rc == 1)
3103                                 /* there is no next parameter, that is
3104                                    not an error */
3105                                 rc = 0;
3106                         break;
3107                 }
3108                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3109                        params, buf);
3110                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3111                 if (rc)
3112                         break;
3113         }
3114
3115         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3116
3117 out_up:
3118         mutex_unlock(&fsdb->fsdb_mutex);
3119         RETURN(rc);
3120 }
3121
3122 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3123 {
3124         struct llog_ctxt        *ctxt;
3125         int                      rc = 0;
3126
3127         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3128         if (ctxt == NULL) {
3129                 CERROR("%s: MGS config context doesn't exist\n",
3130                        mgs->mgs_obd->obd_name);
3131                 rc = -ENODEV;
3132         } else {
3133                 rc = llog_erase(env, ctxt, NULL, name);
3134                 /* llog may not exist */
3135                 if (rc == -ENOENT)
3136                         rc = 0;
3137                 llog_ctxt_put(ctxt);
3138         }
3139
3140         if (rc)
3141                 CERROR("%s: failed to clear log %s: %d\n",
3142                        mgs->mgs_obd->obd_name, name, rc);
3143
3144         return rc;
3145 }
3146
3147 /* erase all logs for the given fs */
3148 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3149 {
3150         struct fs_db *fsdb;
3151         cfs_list_t list;
3152         struct mgs_direntry *dirent, *n;
3153         int rc, len = strlen(fsname);
3154         char *suffix;
3155         ENTRY;
3156
3157         /* Find all the logs in the CONFIGS directory */
3158         rc = class_dentry_readdir(env, mgs, &list);
3159         if (rc)
3160                 RETURN(rc);
3161
3162         mutex_lock(&mgs->mgs_mutex);
3163
3164         /* Delete the fs db */
3165         fsdb = mgs_find_fsdb(mgs, fsname);
3166         if (fsdb)
3167                 mgs_free_fsdb(mgs, fsdb);
3168
3169         mutex_unlock(&mgs->mgs_mutex);
3170
3171         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3172                 cfs_list_del(&dirent->list);
3173                 suffix = strrchr(dirent->name, '-');
3174                 if (suffix != NULL) {
3175                         if ((len == suffix - dirent->name) &&
3176                             (strncmp(fsname, dirent->name, len) == 0)) {
3177                                 CDEBUG(D_MGS, "Removing log %s\n",
3178                                        dirent->name);
3179                                 mgs_erase_log(env, mgs, dirent->name);
3180                         }
3181                 }
3182                 mgs_direntry_free(dirent);
3183         }
3184
3185         RETURN(rc);
3186 }
3187
3188 /* from llog_swab */
3189 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3190 {
3191         int i;
3192         ENTRY;
3193
3194         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3195         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3196
3197         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3198         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3199         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3200         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3201
3202         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3203         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3204                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3205                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3206                                i, lcfg->lcfg_buflens[i],
3207                                lustre_cfg_string(lcfg, i));
3208                 }
3209         EXIT;
3210 }
3211
3212 /* Set a permanent (config log) param for a target or fs
3213  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3214  *             buf1 contains the single parameter
3215  */
3216 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3217                  struct lustre_cfg *lcfg, char *fsname)
3218 {
3219         struct fs_db *fsdb;
3220         struct mgs_target_info *mti;
3221         char *devname, *param;
3222         char *ptr, *tmp;
3223         __u32 index;
3224         int rc = 0;
3225         ENTRY;
3226
3227         print_lustre_cfg(lcfg);
3228
3229         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3230         devname = lustre_cfg_string(lcfg, 0);
3231         param = lustre_cfg_string(lcfg, 1);
3232         if (!devname) {
3233                 /* Assume device name embedded in param:
3234                    lustre-OST0000.osc.max_dirty_mb=32 */
3235                 ptr = strchr(param, '.');
3236                 if (ptr) {
3237                         devname = param;
3238                         *ptr = 0;
3239                         param = ptr + 1;
3240                 }
3241         }
3242         if (!devname) {
3243                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3244                 RETURN(-ENOSYS);
3245         }
3246
3247         /* Extract fsname */
3248         ptr = strrchr(devname, '-');
3249         memset(fsname, 0, MTI_NAME_MAXLEN);
3250         if (ptr && (server_name2index(ptr, &index, NULL) >= 0)) {
3251                 /* param related to llite isn't allowed to set by OST or MDT */
3252                 if (strncmp(param, PARAM_LLITE, sizeof(PARAM_LLITE)) == 0)
3253                         RETURN(-EINVAL);
3254
3255                 strncpy(fsname, devname, ptr - devname);
3256         } else {
3257                 /* assume devname is the fsname */
3258                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3259         }
3260         fsname[MTI_NAME_MAXLEN - 1] = 0;
3261         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3262
3263         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3264         if (rc)
3265                 RETURN(rc);
3266         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3267             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3268                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3269                        "is '%s'\n", fsname, devname);
3270                 mgs_free_fsdb(mgs, fsdb);
3271                 RETURN(-EINVAL);
3272         }
3273
3274         /* Create a fake mti to hold everything */
3275         OBD_ALLOC_PTR(mti);
3276         if (!mti)
3277                 GOTO(out, rc = -ENOMEM);
3278         strncpy(mti->mti_fsname, fsname, MTI_NAME_MAXLEN);
3279         strncpy(mti->mti_svname, devname, MTI_NAME_MAXLEN);
3280         strncpy(mti->mti_params, param, sizeof(mti->mti_params));
3281         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3282         if (rc < 0)
3283                 /* Not a valid server; may be only fsname */
3284                 rc = 0;
3285         else
3286                 /* Strip -osc or -mdc suffix from svname */
3287                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3288                                      mti->mti_svname))
3289                         GOTO(out, rc = -EINVAL);
3290
3291         mti->mti_flags = rc | LDD_F_PARAM;
3292
3293         mutex_lock(&fsdb->fsdb_mutex);
3294         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3295         mutex_unlock(&fsdb->fsdb_mutex);
3296
3297         /*
3298          * Revoke lock so everyone updates.  Should be alright if
3299          * someone was already reading while we were updating the logs,
3300          * so we don't really need to hold the lock while we're
3301          * writing (above).
3302          */
3303         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3304 out:
3305         OBD_FREE_PTR(mti);
3306         RETURN(rc);
3307 }
3308
3309 static int mgs_write_log_pool(const struct lu_env *env,
3310                               struct mgs_device *mgs, char *logname,
3311                               struct fs_db *fsdb, char *lovname,
3312                               enum lcfg_command_type cmd,
3313                               char *poolname, char *fsname,
3314                               char *ostname, char *comment)
3315 {
3316         struct llog_handle *llh = NULL;
3317         int rc;
3318
3319         rc = record_start_log(env, mgs, &llh, logname);
3320         if (rc)
3321                 return rc;
3322         rc = record_marker(env, llh, fsdb, CM_START, lovname, comment);
3323         if (rc)
3324                 goto out;
3325         rc = record_base(env, llh, lovname, 0, cmd, poolname, fsname, ostname, 0);
3326         if (rc)
3327                 goto out;
3328         rc = record_marker(env, llh, fsdb, CM_END, lovname, comment);
3329 out:
3330         record_end_log(env, &llh);
3331         return rc;
3332 }
3333
3334 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3335                  enum lcfg_command_type cmd, char *fsname,
3336                  char *poolname, char *ostname)
3337 {
3338         struct fs_db *fsdb;
3339         char *lovname;
3340         char *logname;
3341         char *label = NULL, *canceled_label = NULL;
3342         int label_sz;
3343         struct mgs_target_info *mti = NULL;
3344         int rc, i;
3345         ENTRY;
3346
3347         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3348         if (rc) {
3349                 CERROR("Can't get db for %s\n", fsname);
3350                 RETURN(rc);
3351         }
3352         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3353                 CERROR("%s is not defined\n", fsname);
3354                 mgs_free_fsdb(mgs, fsdb);
3355                 RETURN(-EINVAL);
3356         }
3357
3358         label_sz = 10 + strlen(fsname) + strlen(poolname);
3359
3360         /* check if ostname match fsname */
3361         if (ostname != NULL) {
3362                 char *ptr;
3363
3364                 ptr = strrchr(ostname, '-');
3365                 if ((ptr == NULL) ||
3366                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3367                         RETURN(-EINVAL);
3368                 label_sz += strlen(ostname);
3369         }
3370
3371         OBD_ALLOC(label, label_sz);
3372         if (label == NULL)
3373                 RETURN(-ENOMEM);
3374
3375         switch(cmd) {
3376         case LCFG_POOL_NEW:
3377                 sprintf(label,
3378                         "new %s.%s", fsname, poolname);
3379                 break;
3380         case LCFG_POOL_ADD:
3381                 sprintf(label,
3382                         "add %s.%s.%s", fsname, poolname, ostname);
3383                 break;
3384         case LCFG_POOL_REM:
3385                 OBD_ALLOC(canceled_label, label_sz);
3386                 if (canceled_label == NULL)
3387                         GOTO(out_label, rc = -ENOMEM);
3388                 sprintf(label,
3389                         "rem %s.%s.%s", fsname, poolname, ostname);
3390                 sprintf(canceled_label,
3391                         "add %s.%s.%s", fsname, poolname, ostname);
3392                 break;
3393         case LCFG_POOL_DEL:
3394                 OBD_ALLOC(canceled_label, label_sz);
3395                 if (canceled_label == NULL)
3396                         GOTO(out_label, rc = -ENOMEM);
3397                 sprintf(label,
3398                         "del %s.%s", fsname, poolname);
3399                 sprintf(canceled_label,
3400                         "new %s.%s", fsname, poolname);
3401                 break;
3402         default:
3403                 break;
3404         }
3405
3406         mutex_lock(&fsdb->fsdb_mutex);
3407
3408         if (canceled_label != NULL) {
3409                 OBD_ALLOC_PTR(mti);
3410                 if (mti == NULL)
3411                         GOTO(out_cancel, rc = -ENOMEM);
3412         }
3413
3414         /* write pool def to all MDT logs */
3415         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3416                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
3417                         rc = name_create_mdt_and_lov(&logname, &lovname,
3418                                                      fsdb, i);
3419                         if (rc) {
3420                                 mutex_unlock(&fsdb->fsdb_mutex);
3421                                 GOTO(out_mti, rc);
3422                         }
3423                         if (canceled_label != NULL) {
3424                                 strcpy(mti->mti_svname, "lov pool");
3425                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3426                                                 lovname, canceled_label,
3427                                                 CM_SKIP);
3428                         }
3429
3430                         if (rc >= 0)
3431                                 rc = mgs_write_log_pool(env, mgs, logname,
3432                                                         fsdb, lovname, cmd,
3433                                                         fsname, poolname,
3434                                                         ostname, label);
3435                         name_destroy(&logname);
3436                         name_destroy(&lovname);
3437                         if (rc) {
3438                                 mutex_unlock(&fsdb->fsdb_mutex);
3439                                 GOTO(out_mti, rc);
3440                         }
3441                 }
3442         }
3443
3444         rc = name_create(&logname, fsname, "-client");
3445         if (rc) {
3446                 mutex_unlock(&fsdb->fsdb_mutex);
3447                 GOTO(out_mti, rc);
3448         }
3449         if (canceled_label != NULL) {
3450                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3451                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
3452                 if (rc < 0) {
3453                         mutex_unlock(&fsdb->fsdb_mutex);
3454                         name_destroy(&logname);
3455                         GOTO(out_mti, rc);
3456                 }
3457         }
3458
3459         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
3460                                 cmd, fsname, poolname, ostname, label);
3461         mutex_unlock(&fsdb->fsdb_mutex);
3462         name_destroy(&logname);
3463         /* request for update */
3464         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3465
3466         EXIT;
3467 out_mti:
3468         if (mti != NULL)
3469                 OBD_FREE_PTR(mti);
3470 out_cancel:
3471         if (canceled_label != NULL)
3472                 OBD_FREE(canceled_label, label_sz);
3473 out_label:
3474         OBD_FREE(label, label_sz);
3475         return rc;
3476 }
3477
3478 #if 0
3479 /******************** unused *********************/
3480 static int mgs_backup_llog(struct obd_device *obd, char* fsname)
3481 {
3482         struct file *filp, *bak_filp;
3483         struct lvfs_run_ctxt saved;
3484         char *logname, *buf;
3485         loff_t soff = 0 , doff = 0;
3486         int count = 4096, len;
3487         int rc = 0;
3488
3489         OBD_ALLOC(logname, PATH_MAX);
3490         if (logname == NULL)
3491                 return -ENOMEM;
3492
3493         OBD_ALLOC(buf, count);
3494         if (!buf)
3495                 GOTO(out , rc = -ENOMEM);
3496
3497         len = snprintf(logname, PATH_MAX, "%s/%s.bak",
3498                        MOUNT_CONFIGS_DIR, fsname);
3499
3500         if (len >= PATH_MAX - 1) {
3501                 GOTO(out, -ENAMETOOLONG);
3502         }
3503
3504         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3505
3506         bak_filp = l_filp_open(logname, O_RDWR|O_CREAT|O_TRUNC, 0660);
3507         if (IS_ERR(bak_filp)) {
3508                 rc = PTR_ERR(bak_filp);
3509                 CERROR("backup logfile open %s: %d\n", logname, rc);
3510                 GOTO(pop, rc);
3511         }
3512         sprintf(logname, "%s/%s", MOUNT_CONFIGS_DIR, fsname);
3513         filp = l_filp_open(logname, O_RDONLY, 0);
3514         if (IS_ERR(filp)) {
3515                 rc = PTR_ERR(filp);
3516                 CERROR("logfile open %s: %d\n", logname, rc);
3517                 GOTO(close1f, rc);
3518         }
3519
3520         while ((rc = lustre_fread(filp, buf, count, &soff)) > 0) {
3521                 rc = lustre_fwrite(bak_filp, buf, count, &doff);
3522                 break;
3523         }
3524
3525         filp_close(filp, 0);
3526 close1f:
3527         filp_close(bak_filp, 0);
3528 pop:
3529         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3530 out:
3531         if (buf)
3532                 OBD_FREE(buf, count);
3533         OBD_FREE(logname, PATH_MAX);
3534         return rc;
3535 }
3536
3537 #endif