Whamcloud - gitweb
LU-1302 llog: modify llog_write/llog_add to support OSD
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lquota.h>
53 #include <lustre_log.h>
54
55 #include "mgs_internal.h"
56
57 /********************** Class functions ********************/
58
59 /* Caller must list_del and mgs_dirent_free() each dentry from the list */
60 int class_dentry_readdir(const struct lu_env *env,
61                          struct mgs_device *mgs, cfs_list_t *list)
62 {
63         struct dt_object    *dir = mgs->mgs_configs_dir;
64         const struct dt_it_ops *iops;
65         struct dt_it        *it;
66         struct mgs_direntry *de;
67         char                *key;
68         int                  rc, key_sz;
69
70         CFS_INIT_LIST_HEAD(list);
71
72         if (!dt_try_as_dir(env, dir))
73                 GOTO(out, rc = -ENOTDIR);
74
75         LASSERT(dir);
76         LASSERT(dir->do_index_ops);
77
78         iops = &dir->do_index_ops->dio_it;
79         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
80         if (IS_ERR(it))
81                 RETURN(PTR_ERR(it));
82
83         rc = iops->load(env, it, 0);
84         if (rc <= 0)
85                 GOTO(fini, rc = 0);
86
87         /* main cycle */
88         do {
89                 key = (void *)iops->key(env, it);
90                 if (IS_ERR(key)) {
91                         CERROR("%s: key failed when listing %s: rc = %d\n",
92                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
93                                (int) PTR_ERR(key));
94                         goto next;
95                 }
96                 key_sz = iops->key_size(env, it);
97                 LASSERT(key_sz > 0);
98
99                 /* filter out "." and ".." entries */
100                 if (key[0] == '.') {
101                         if (key_sz == 1)
102                                 goto next;
103                         if (key_sz == 2 && key[1] == '.')
104                                 goto next;
105                 }
106
107                 de = mgs_direntry_alloc(key_sz + 1);
108                 if (de == NULL) {
109                         rc = -ENOMEM;
110                         break;
111                 }
112
113                 memcpy(de->name, key, key_sz);
114                 de->name[key_sz] = 0;
115
116                 cfs_list_add(&de->list, list);
117
118 next:
119                 rc = iops->next(env, it);
120         } while (rc == 0);
121         rc = 0;
122
123         iops->put(env, it);
124
125 fini:
126         iops->fini(env, it);
127 out:
128         if (rc)
129                 CERROR("%s: key failed when listing %s: rc = %d\n",
130                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
131         RETURN(rc);
132 }
133
134 /******************** DB functions *********************/
135
136 static inline int name_create(char **newname, char *prefix, char *suffix)
137 {
138         LASSERT(newname);
139         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
140         if (!*newname)
141                 return -ENOMEM;
142         sprintf(*newname, "%s%s", prefix, suffix);
143         return 0;
144 }
145
146 static inline void name_destroy(char **name)
147 {
148         if (*name)
149                 OBD_FREE(*name, strlen(*name) + 1);
150         *name = NULL;
151 }
152
153 struct mgs_fsdb_handler_data
154 {
155         struct fs_db   *fsdb;
156         __u32           ver;
157 };
158
159 /* from the (client) config log, figure out:
160         1. which ost's/mdt's are configured (by index)
161         2. what the last config step is
162         3. COMPAT_18 osc name
163 */
164 /* It might be better to have a separate db file, instead of parsing the info
165    out of the client log.  This is slow and potentially error-prone. */
166 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
167                             struct llog_rec_hdr *rec, void *data)
168 {
169         struct mgs_fsdb_handler_data *d = data;
170         struct fs_db *fsdb = d->fsdb;
171         int cfg_len = rec->lrh_len;
172         char *cfg_buf = (char*) (rec + 1);
173         struct lustre_cfg *lcfg;
174         __u32 index;
175         int rc = 0;
176         ENTRY;
177
178         if (rec->lrh_type != OBD_CFG_REC) {
179                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
180                 RETURN(-EINVAL);
181         }
182
183         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
184         if (rc) {
185                 CERROR("Insane cfg\n");
186                 RETURN(rc);
187         }
188
189         lcfg = (struct lustre_cfg *)cfg_buf;
190
191         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
192                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
193
194         /* Figure out ost indicies */
195         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
196         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
197             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
198                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
199                                        NULL, 10);
200                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
201                        lustre_cfg_string(lcfg, 1), index,
202                        lustre_cfg_string(lcfg, 2));
203                 cfs_set_bit(index, fsdb->fsdb_ost_index_map);
204         }
205
206         /* Figure out mdt indicies */
207         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
208         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
209             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
210                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
211                                        &index, NULL);
212                 if (rc != LDD_F_SV_TYPE_MDT) {
213                         CWARN("Unparsable MDC name %s, assuming index 0\n",
214                               lustre_cfg_string(lcfg, 0));
215                         index = 0;
216                 }
217                 rc = 0;
218                 CDEBUG(D_MGS, "MDT index is %u\n", index);
219                 cfs_set_bit(index, fsdb->fsdb_mdt_index_map);
220                 fsdb->fsdb_mdt_count ++;
221         }
222
223         /**
224          * figure out the old config. fsdb_gen = 0 means old log
225          * It is obsoleted and not supported anymore
226          */
227         if (fsdb->fsdb_gen == 0) {
228                 CERROR("Old config format is not supported\n");
229                 RETURN(-EINVAL);
230         }
231
232         /*
233          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
234          */
235         if (!cfs_test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
236             lcfg->lcfg_command == LCFG_ATTACH &&
237             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
238                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
239                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
240                         CWARN("MDT using 1.8 OSC name scheme\n");
241                         cfs_set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
242                 }
243         }
244
245         if (lcfg->lcfg_command == LCFG_MARKER) {
246                 struct cfg_marker *marker;
247                 marker = lustre_cfg_buf(lcfg, 1);
248
249                 d->ver = marker->cm_vers;
250
251                 /* Keep track of the latest marker step */
252                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
253         }
254
255         RETURN(rc);
256 }
257
258 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
259 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
260                                   struct mgs_device *mgs,
261                                   struct fs_db *fsdb)
262 {
263         char *logname;
264         struct llog_handle *loghandle;
265         struct lvfs_run_ctxt saved;
266         struct llog_ctxt *ctxt;
267         struct mgs_fsdb_handler_data d = { fsdb, 0 };
268         int rc, rc2;
269         ENTRY;
270
271         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
272         LASSERT(ctxt != NULL);
273         rc = name_create(&logname, fsdb->fsdb_name, "-client");
274         if (rc)
275                 GOTO(out_put, rc);
276         cfs_mutex_lock(&fsdb->fsdb_mutex);
277         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
278         rc = llog_open_create(NULL, ctxt, &loghandle, NULL, logname);
279         if (rc)
280                 GOTO(out_pop, rc);
281
282         rc = llog_init_handle(NULL, loghandle, LLOG_F_IS_PLAIN, NULL);
283         if (rc)
284                 GOTO(out_close, rc);
285
286         if (llog_get_size(loghandle) <= 1)
287                 cfs_set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
288
289         rc = llog_process_or_fork(env, loghandle, mgs_fsdb_handler, (void *)&d,
290                                   NULL, false);
291         CDEBUG(D_INFO, "get_db = %d\n", rc);
292 out_close:
293         rc2 = llog_close(NULL, loghandle);
294         if (!rc)
295                 rc = rc2;
296 out_pop:
297         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
298         cfs_mutex_unlock(&fsdb->fsdb_mutex);
299         name_destroy(&logname);
300 out_put:
301         llog_ctxt_put(ctxt);
302
303         RETURN(rc);
304 }
305
306 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
307 {
308         struct mgs_tgt_srpc_conf *tgtconf;
309
310         /* free target-specific rules */
311         while (fsdb->fsdb_srpc_tgt) {
312                 tgtconf = fsdb->fsdb_srpc_tgt;
313                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
314
315                 LASSERT(tgtconf->mtsc_tgt);
316
317                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
318                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
319                 OBD_FREE_PTR(tgtconf);
320         }
321
322         /* free general rules */
323         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
324 }
325
326 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
327 {
328         struct fs_db *fsdb;
329         cfs_list_t *tmp;
330
331         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
332                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
333                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
334                         return fsdb;
335         }
336         return NULL;
337 }
338
339 /* caller must hold the mgs->mgs_fs_db_lock */
340 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
341                                   struct mgs_device *mgs, char *fsname)
342 {
343         struct fs_db *fsdb;
344         int rc;
345         ENTRY;
346
347         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
348                 CERROR("fsname %s is too long\n", fsname);
349                 RETURN(NULL);
350         }
351
352         OBD_ALLOC_PTR(fsdb);
353         if (!fsdb)
354                 RETURN(NULL);
355
356         strcpy(fsdb->fsdb_name, fsname);
357         cfs_mutex_init(&fsdb->fsdb_mutex);
358         cfs_set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
359         fsdb->fsdb_gen = 1;
360
361         if (strcmp(fsname, MGSSELF_NAME) == 0) {
362                 cfs_set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
363         } else {
364                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
365                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
366                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
367                         CERROR("No memory for index maps\n");
368                         GOTO(err, rc = -ENOMEM);
369                 }
370
371                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
372                 if (rc)
373                         GOTO(err, rc);
374                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
375                 if (rc)
376                         GOTO(err, rc);
377
378                 /* initialise data for NID table */
379                 mgs_ir_init_fs(env, mgs, fsdb);
380
381                 lproc_mgs_add_live(mgs, fsdb);
382         }
383
384         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
385
386         RETURN(fsdb);
387 err:
388         if (fsdb->fsdb_ost_index_map)
389                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
390         if (fsdb->fsdb_mdt_index_map)
391                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
392         name_destroy(&fsdb->fsdb_clilov);
393         name_destroy(&fsdb->fsdb_clilmv);
394         OBD_FREE_PTR(fsdb);
395         RETURN(NULL);
396 }
397
398 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
399 {
400         /* wait for anyone with the sem */
401         cfs_mutex_lock(&fsdb->fsdb_mutex);
402         lproc_mgs_del_live(mgs, fsdb);
403         cfs_list_del(&fsdb->fsdb_list);
404
405         /* deinitialize fsr */
406         mgs_ir_fini_fs(mgs, fsdb);
407
408         if (fsdb->fsdb_ost_index_map)
409                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
410         if (fsdb->fsdb_mdt_index_map)
411                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
412         name_destroy(&fsdb->fsdb_clilov);
413         name_destroy(&fsdb->fsdb_clilmv);
414         mgs_free_fsdb_srpc(fsdb);
415         cfs_mutex_unlock(&fsdb->fsdb_mutex);
416         OBD_FREE_PTR(fsdb);
417 }
418
419 int mgs_init_fsdb_list(struct mgs_device *mgs)
420 {
421         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
422         return 0;
423 }
424
425 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
426 {
427         struct fs_db *fsdb;
428         cfs_list_t *tmp, *tmp2;
429         cfs_mutex_lock(&mgs->mgs_mutex);
430         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
431                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
432                 mgs_free_fsdb(mgs, fsdb);
433         }
434         cfs_mutex_unlock(&mgs->mgs_mutex);
435         return 0;
436 }
437
438 int mgs_find_or_make_fsdb(const struct lu_env *env,
439                           struct mgs_device *mgs, char *name,
440                           struct fs_db **dbh)
441 {
442         struct fs_db *fsdb;
443         int rc = 0;
444
445         cfs_mutex_lock(&mgs->mgs_mutex);
446         fsdb = mgs_find_fsdb(mgs, name);
447         if (fsdb) {
448                 cfs_mutex_unlock(&mgs->mgs_mutex);
449                 *dbh = fsdb;
450                 return 0;
451         }
452
453         CDEBUG(D_MGS, "Creating new db\n");
454         fsdb = mgs_new_fsdb(env, mgs, name);
455         cfs_mutex_unlock(&mgs->mgs_mutex);
456         if (!fsdb)
457                 return -ENOMEM;
458
459         if (!cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
460                 /* populate the db from the client llog */
461                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
462                 if (rc) {
463                         CERROR("Can't get db from client log %d\n", rc);
464                         mgs_free_fsdb(mgs, fsdb);
465                         return rc;
466                 }
467         }
468
469         /* populate srpc rules from params llog */
470         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
471         if (rc) {
472                 CERROR("Can't get db from params log %d\n", rc);
473                 mgs_free_fsdb(mgs, fsdb);
474                 return rc;
475         }
476
477         *dbh = fsdb;
478
479         return 0;
480 }
481
482 /* 1 = index in use
483    0 = index unused
484    -1= empty client log */
485 int mgs_check_index(const struct lu_env *env,
486                     struct mgs_device *mgs,
487                     struct mgs_target_info *mti)
488 {
489         struct fs_db *fsdb;
490         void *imap;
491         int rc = 0;
492         ENTRY;
493
494         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
495
496         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
497         if (rc) {
498                 CERROR("Can't get db for %s\n", mti->mti_fsname);
499                 RETURN(rc);
500         }
501
502         if (cfs_test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
503                 RETURN(-1);
504
505         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
506                 imap = fsdb->fsdb_ost_index_map;
507         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
508                 imap = fsdb->fsdb_mdt_index_map;
509         else
510                 RETURN(-EINVAL);
511
512         if (cfs_test_bit(mti->mti_stripe_index, imap))
513                 RETURN(1);
514         RETURN(0);
515 }
516
517 static __inline__ int next_index(void *index_map, int map_len)
518 {
519         int i;
520         for (i = 0; i < map_len * 8; i++)
521                  if (!cfs_test_bit(i, index_map)) {
522                          return i;
523                  }
524         CERROR("max index %d exceeded.\n", i);
525         return -1;
526 }
527
528 /* Return codes:
529         0  newly marked as in use
530         <0 err
531         +EALREADY for update of an old index */
532 static int mgs_set_index(const struct lu_env *env,
533                          struct mgs_device *mgs,
534                          struct mgs_target_info *mti)
535 {
536         struct fs_db *fsdb;
537         void *imap;
538         int rc = 0;
539         ENTRY;
540
541         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
542         if (rc) {
543                 CERROR("Can't get db for %s\n", mti->mti_fsname);
544                 RETURN(rc);
545         }
546
547         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
548                 imap = fsdb->fsdb_ost_index_map;
549         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
550                 imap = fsdb->fsdb_mdt_index_map;
551                 if (fsdb->fsdb_mdt_count >= MAX_MDT_COUNT) {
552                         LCONSOLE_ERROR_MSG(0x13f, "The max mdt count"
553                                            "is %d\n", (int)MAX_MDT_COUNT);
554                         RETURN(-ERANGE);
555                 }
556         } else {
557                 RETURN(-EINVAL);
558         }
559
560         if (mti->mti_flags & LDD_F_NEED_INDEX) {
561                 rc = next_index(imap, INDEX_MAP_SIZE);
562                 if (rc == -1)
563                         RETURN(-ERANGE);
564                 mti->mti_stripe_index = rc;
565                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
566                         fsdb->fsdb_mdt_count ++;
567         }
568
569         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
570                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
571                                    "but the max index is %d.\n",
572                                    mti->mti_svname, mti->mti_stripe_index,
573                                    INDEX_MAP_SIZE * 8);
574                 RETURN(-ERANGE);
575         }
576
577         if (cfs_test_bit(mti->mti_stripe_index, imap)) {
578                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
579                     !(mti->mti_flags & LDD_F_WRITECONF)) {
580                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
581                                            "%d, but that index is already in "
582                                            "use. Use --writeconf to force\n",
583                                            mti->mti_svname,
584                                            mti->mti_stripe_index);
585                         RETURN(-EADDRINUSE);
586                 } else {
587                         CDEBUG(D_MGS, "Server %s updating index %d\n",
588                                mti->mti_svname, mti->mti_stripe_index);
589                         RETURN(EALREADY);
590                 }
591         }
592
593         cfs_set_bit(mti->mti_stripe_index, imap);
594         cfs_clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
595         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
596                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
597
598         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
599                mti->mti_stripe_index);
600
601         RETURN(0);
602 }
603
604 struct mgs_modify_lookup {
605         struct cfg_marker mml_marker;
606         int               mml_modified;
607 };
608
609 static int mgs_modify_handler(const struct lu_env *env,
610                               struct llog_handle *llh,
611                               struct llog_rec_hdr *rec, void *data)
612 {
613         struct mgs_modify_lookup *mml = data;
614         struct cfg_marker *marker;
615         struct lustre_cfg *lcfg = (struct lustre_cfg *)(rec + 1);
616         int cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
617                 sizeof(struct llog_rec_tail);
618         int rc;
619         ENTRY;
620
621         if (rec->lrh_type != OBD_CFG_REC) {
622                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
623                 RETURN(-EINVAL);
624         }
625
626         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
627         if (rc) {
628                 CERROR("Insane cfg\n");
629                 RETURN(rc);
630         }
631
632         /* We only care about markers */
633         if (lcfg->lcfg_command != LCFG_MARKER)
634                 RETURN(0);
635
636         marker = lustre_cfg_buf(lcfg, 1);
637         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
638             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
639             !(marker->cm_flags & CM_SKIP)) {
640                 /* Found a non-skipped marker match */
641                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
642                        rec->lrh_index, marker->cm_step,
643                        marker->cm_flags, mml->mml_marker.cm_flags,
644                        marker->cm_tgtname, marker->cm_comment);
645                 /* Overwrite the old marker llog entry */
646                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
647                 marker->cm_flags |= mml->mml_marker.cm_flags;
648                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
649                 /* Header and tail are added back to lrh_len in
650                    llog_lvfs_write_rec */
651                 rec->lrh_len = cfg_len;
652                 rc = llog_write(NULL, llh, rec, NULL, 0, (void *)lcfg,
653                                 rec->lrh_index);
654                 if (!rc)
655                          mml->mml_modified++;
656         }
657
658         RETURN(rc);
659 }
660
661 /* Modify an existing config log record (for CM_SKIP or CM_EXCLUDE) */
662 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
663                       struct fs_db *fsdb, struct mgs_target_info *mti,
664                       char *logname, char *devname, char *comment, int flags)
665 {
666         struct llog_handle *loghandle;
667         struct lvfs_run_ctxt saved;
668         struct llog_ctxt *ctxt;
669         struct mgs_modify_lookup *mml;
670         int rc, rc2;
671         ENTRY;
672
673         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
674                flags);
675
676         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
677         LASSERT(ctxt != NULL);
678         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
679         rc = llog_open(NULL, ctxt, &loghandle, NULL, logname,
680                        LLOG_OPEN_EXISTS);
681         if (rc < 0) {
682                 if (rc == -ENOENT)
683                         rc = 0;
684                 GOTO(out_pop, rc);
685         }
686
687         rc = llog_init_handle(NULL, loghandle, LLOG_F_IS_PLAIN, NULL);
688         if (rc)
689                 GOTO(out_close, rc);
690
691         if (llog_get_size(loghandle) <= 1)
692                 GOTO(out_close, rc = 0);
693
694         OBD_ALLOC_PTR(mml);
695         if (!mml)
696                 GOTO(out_close, rc = -ENOMEM);
697         strcpy(mml->mml_marker.cm_comment, comment);
698         strcpy(mml->mml_marker.cm_tgtname, devname);
699         /* Modify mostly means cancel */
700         mml->mml_marker.cm_flags = flags;
701         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
702         mml->mml_modified = 0;
703         rc = llog_process_or_fork(env, loghandle, mgs_modify_handler,
704                                   (void *)mml, NULL, false);
705         if (!rc && !mml->mml_modified)
706                 rc = 1;
707         OBD_FREE_PTR(mml);
708
709 out_close:
710         rc2 = llog_close(NULL, loghandle);
711         if (!rc)
712                 rc = rc2;
713 out_pop:
714         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
715         if (rc < 0)
716                 CERROR("modify %s/%s failed %d\n",
717                        mti->mti_svname, comment, rc);
718         llog_ctxt_put(ctxt);
719         RETURN(rc);
720 }
721
722 /******************** config log recording functions *********************/
723
724 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
725                          struct lustre_cfg *lcfg)
726 {
727         struct lvfs_run_ctxt   saved;
728         struct llog_rec_hdr    rec;
729         int buflen, rc;
730         struct obd_device *obd = llh->lgh_ctxt->loc_obd;
731
732         if (!lcfg || !llh)
733                 return -ENOMEM;
734
735         LASSERT(llh->lgh_ctxt);
736
737         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
738                                 lcfg->lcfg_buflens);
739         rec.lrh_len = llog_data_len(buflen);
740         rec.lrh_type = OBD_CFG_REC;
741
742         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
743         /* idx = -1 means append */
744         rc = llog_write(NULL, llh, &rec, NULL, 0, (void *)lcfg, -1);
745         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
746         if (rc)
747                 CERROR("failed %d\n", rc);
748         return rc;
749 }
750
751 static int record_base(const struct lu_env *env, struct llog_handle *llh,
752                      char *cfgname, lnet_nid_t nid, int cmd,
753                      char *s1, char *s2, char *s3, char *s4)
754 {
755         struct mgs_thread_info *mgi = mgs_env_info(env);
756         struct lustre_cfg     *lcfg;
757         int rc;
758
759         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
760                cmd, s1, s2, s3, s4);
761
762         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
763         if (s1)
764                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
765         if (s2)
766                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
767         if (s3)
768                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
769         if (s4)
770                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
771
772         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
773         if (!lcfg)
774                 return -ENOMEM;
775         lcfg->lcfg_nid = nid;
776
777         rc = record_lcfg(env, llh, lcfg);
778
779         lustre_cfg_free(lcfg);
780
781         if (rc) {
782                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
783                        cmd, s1, s2, s3, s4);
784         }
785         return rc;
786 }
787
788
789 static inline int record_add_uuid(const struct lu_env *env,
790                                   struct llog_handle *llh,
791                                   uint64_t nid, char *uuid)
792 {
793         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
794
795 }
796
797 static inline int record_add_conn(const struct lu_env *env,
798                                   struct llog_handle *llh,
799                                   char *devname, char *uuid)
800 {
801         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
802 }
803
804 static inline int record_attach(const struct lu_env *env,
805                                 struct llog_handle *llh, char *devname,
806                                 char *type, char *uuid)
807 {
808         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
809 }
810
811 static inline int record_setup(const struct lu_env *env,
812                                struct llog_handle *llh, char *devname,
813                                char *s1, char *s2, char *s3, char *s4)
814 {
815         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
816 }
817
818 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
819                             char *devname, struct lov_desc *desc)
820 {
821         struct mgs_thread_info *mgi = mgs_env_info(env);
822         struct lustre_cfg *lcfg;
823         int rc;
824
825         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
826         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
827         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
828         if (!lcfg)
829                 return -ENOMEM;
830         rc = record_lcfg(env, llh, lcfg);
831
832         lustre_cfg_free(lcfg);
833         return rc;
834 }
835
836 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
837                             char *devname, struct lmv_desc *desc)
838 {
839         struct mgs_thread_info *mgi = mgs_env_info(env);
840         struct lustre_cfg *lcfg;
841         int rc;
842
843         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
844         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
845         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
846
847         rc = record_lcfg(env, llh, lcfg);
848
849         lustre_cfg_free(lcfg);
850         return rc;
851 }
852
853 static inline int record_mdc_add(const struct lu_env *env,
854                                  struct llog_handle *llh,
855                                  char *logname, char *mdcuuid,
856                                  char *mdtuuid, char *index,
857                                  char *gen)
858 {
859         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
860                            mdtuuid,index,gen,mdcuuid);
861 }
862
863 static inline int record_lov_add(const struct lu_env *env,
864                                  struct llog_handle *llh,
865                                  char *lov_name, char *ost_uuid,
866                                  char *index, char *gen)
867 {
868         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
869                            ost_uuid,index,gen,0);
870 }
871
872 static inline int record_mount_opt(const struct lu_env *env,
873                                    struct llog_handle *llh,
874                                    char *profile, char *lov_name,
875                                    char *mdc_name)
876 {
877         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
878                            profile,lov_name,mdc_name,0);
879 }
880
881 static int record_marker(const struct lu_env *env,
882                          struct llog_handle *llh,
883                          struct fs_db *fsdb, __u32 flags,
884                          char *tgtname, char *comment)
885 {
886         struct mgs_thread_info *mgi = mgs_env_info(env);
887         struct lustre_cfg *lcfg;
888         int rc;
889
890         if (flags & CM_START)
891                 fsdb->fsdb_gen++;
892         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
893         mgi->mgi_marker.cm_flags = flags;
894         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
895         strncpy(mgi->mgi_marker.cm_tgtname, tgtname,
896                 sizeof(mgi->mgi_marker.cm_tgtname));
897         strncpy(mgi->mgi_marker.cm_comment, comment,
898                 sizeof(mgi->mgi_marker.cm_comment));
899         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
900         mgi->mgi_marker.cm_canceltime = 0;
901         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
902         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
903                             sizeof(mgi->mgi_marker));
904         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
905         if (!lcfg)
906                 return -ENOMEM;
907         rc = record_lcfg(env, llh, lcfg);
908
909         lustre_cfg_free(lcfg);
910         return rc;
911 }
912
913 static int record_start_log(const struct lu_env *env,
914                             struct mgs_device *mgs,
915                             struct llog_handle **llh, char *name)
916 {
917         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
918         struct lvfs_run_ctxt saved;
919         struct llog_ctxt *ctxt;
920         int rc = 0;
921
922         if (*llh)
923                 GOTO(out, rc = -EBUSY);
924
925         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
926         if (!ctxt)
927                 GOTO(out, rc = -ENODEV);
928         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
929
930         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
931         rc = llog_open_create(NULL, ctxt, llh, NULL, name);
932         if (rc)
933                 GOTO(out_ctxt, rc);
934         rc = llog_init_handle(NULL, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
935         if (rc) {
936                 llog_close(NULL, *llh);
937                 *llh = NULL;
938         }
939 out_ctxt:
940         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
941         llog_ctxt_put(ctxt);
942 out:
943         if (rc)
944                 CERROR("Can't start log %s: %d\n", name, rc);
945         RETURN(rc);
946 }
947
948 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
949 {
950         struct lvfs_run_ctxt saved;
951         struct obd_device *obd = (*llh)->lgh_ctxt->loc_obd;
952         int rc = 0;
953
954         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
955
956         rc = llog_close(NULL, *llh);
957         *llh = NULL;
958
959         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
960         RETURN(rc);
961 }
962
963 static int mgs_log_is_empty(const struct lu_env *env,
964                             struct mgs_device *mgs, char *name)
965 {
966         struct lvfs_run_ctxt saved;
967         struct llog_handle *llh;
968         struct llog_ctxt *ctxt;
969         int rc = 0;
970
971         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
972         LASSERT(ctxt != NULL);
973         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
974         rc = llog_open(NULL, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
975         if (rc < 0) {
976                 if (rc == -ENOENT)
977                         rc = 0;
978                 GOTO(out_ctxt, rc);
979         }
980
981         llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
982         if (rc)
983                 GOTO(out_close, rc);
984         rc = llog_get_size(llh);
985
986 out_close:
987         llog_close(NULL, llh);
988 out_ctxt:
989         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
990         llog_ctxt_put(ctxt);
991         /* header is record 1 */
992         return (rc <= 1);
993 }
994
995 /******************** config "macros" *********************/
996
997 /* write an lcfg directly into a log (with markers) */
998 static int mgs_write_log_direct(const struct lu_env *env,
999                                 struct mgs_device *mgs, struct fs_db *fsdb,
1000                                 char *logname, struct lustre_cfg *lcfg,
1001                                 char *devname, char *comment)
1002 {
1003         struct llog_handle *llh = NULL;
1004         int rc;
1005         ENTRY;
1006
1007         if (!lcfg)
1008                 RETURN(-ENOMEM);
1009
1010         rc = record_start_log(env, mgs, &llh, logname);
1011         if (rc)
1012                 RETURN(rc);
1013
1014         /* FIXME These should be a single journal transaction */
1015         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1016         if (rc)
1017                 GOTO(out_end, rc);
1018         rc = record_lcfg(env, llh, lcfg);
1019         if (rc)
1020                 GOTO(out_end, rc);
1021         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1022         if (rc)
1023                 GOTO(out_end, rc);
1024 out_end:
1025         record_end_log(env, &llh);
1026         RETURN(rc);
1027 }
1028
1029 /* write the lcfg in all logs for the given fs */
1030 int mgs_write_log_direct_all(const struct lu_env *env,
1031                              struct mgs_device *mgs,
1032                              struct fs_db *fsdb,
1033                              struct mgs_target_info *mti,
1034                              struct lustre_cfg *lcfg,
1035                              char *devname, char *comment,
1036                              int server_only)
1037 {
1038         cfs_list_t list;
1039         struct mgs_direntry *dirent, *n;
1040         char *fsname = mti->mti_fsname;
1041         char *logname;
1042         int rc = 0, len = strlen(fsname);
1043         ENTRY;
1044
1045         /* We need to set params for any future logs
1046            as well. FIXME Append this file to every new log.
1047            Actually, we should store as params (text), not llogs.  Or
1048            in a database. */
1049         rc = name_create(&logname, fsname, "-params");
1050         if (rc)
1051                 RETURN(rc);
1052         if (mgs_log_is_empty(env, mgs, logname)) {
1053                 struct llog_handle *llh = NULL;
1054                 rc = record_start_log(env, mgs, &llh, logname);
1055                 record_end_log(env, &llh);
1056         }
1057         name_destroy(&logname);
1058         if (rc)
1059                 RETURN(rc);
1060
1061         /* Find all the logs in the CONFIGS directory */
1062         rc = class_dentry_readdir(env, mgs, &list);
1063         if (rc)
1064                 RETURN(rc);
1065
1066         /* Could use fsdb index maps instead of directory listing */
1067         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1068                 cfs_list_del(&dirent->list);
1069                 /* don't write to sptlrpc rule log */
1070                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1071                         goto next;
1072
1073                 /* caller wants write server logs only */
1074                 if (server_only && strstr(dirent->name, "-client") != NULL)
1075                         goto next;
1076
1077                 if (strncmp(fsname, dirent->name, len) == 0) {
1078                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1079                         /* Erase any old settings of this same parameter */
1080                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1081                                         devname, comment, CM_SKIP);
1082                         if (rc < 0)
1083                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1084                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1085                         /* Write the new one */
1086                         if (lcfg) {
1087                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1088                                                           dirent->name,
1089                                                           lcfg, devname,
1090                                                           comment);
1091                                 if (rc)
1092                                         CERROR("%s: writing log %s: rc = %d\n",
1093                                                mgs->mgs_obd->obd_name,
1094                                                dirent->name, rc);
1095                         }
1096                 }
1097 next:
1098                 mgs_direntry_free(dirent);
1099         }
1100
1101         RETURN(rc);
1102 }
1103
1104 static int mgs_write_log_mdc_to_mdt(const struct lu_env *env,
1105                                     struct mgs_device *mgs,
1106                                     struct fs_db *fsdb,
1107                                     struct mgs_target_info *mti,
1108                                     char *logname);
1109 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1110                                     struct mgs_device *mgs,
1111                                     struct fs_db *fsdb,
1112                                     struct mgs_target_info *mti,
1113                                     char *logname, char *suffix, char *lovname,
1114                                     enum lustre_sec_part sec_part, int flags);
1115 static int name_create_mdt_and_lov(char **logname, char **lovname,
1116                                    struct fs_db *fsdb, int i);
1117
1118 static int mgs_steal_llog_handler(const struct lu_env *env,
1119                                   struct llog_handle *llh,
1120                                   struct llog_rec_hdr *rec, void *data)
1121 {
1122         struct mgs_device *mgs;
1123         struct mgs_target_info *mti, *tmti;
1124         struct fs_db *fsdb;
1125         int cfg_len = rec->lrh_len;
1126         char *cfg_buf = (char*) (rec + 1);
1127         struct lustre_cfg *lcfg;
1128         int rc = 0;
1129         struct llog_handle *mdt_llh = NULL;
1130         static int got_an_osc_or_mdc = 0;
1131         /* 0: not found any osc/mdc;
1132            1: found osc;
1133            2: found mdc;
1134         */
1135         static int last_step = -1;
1136
1137         ENTRY;
1138
1139         mti = ((struct temp_comp*)data)->comp_mti;
1140         tmti = ((struct temp_comp*)data)->comp_tmti;
1141         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1142         mgs = ((struct temp_comp*)data)->comp_mgs;
1143
1144         if (rec->lrh_type != OBD_CFG_REC) {
1145                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1146                 RETURN(-EINVAL);
1147         }
1148
1149         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1150         if (rc) {
1151                 CERROR("Insane cfg\n");
1152                 RETURN(rc);
1153         }
1154
1155         lcfg = (struct lustre_cfg *)cfg_buf;
1156
1157         if (lcfg->lcfg_command == LCFG_MARKER) {
1158                 struct cfg_marker *marker;
1159                 marker = lustre_cfg_buf(lcfg, 1);
1160                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1161                     (marker->cm_flags & CM_START)){
1162                         got_an_osc_or_mdc = 1;
1163                         strncpy(tmti->mti_svname, marker->cm_tgtname,
1164                                 sizeof(tmti->mti_svname));
1165                         rc = record_start_log(env, mgs, &mdt_llh,
1166                                               mti->mti_svname);
1167                         if (rc)
1168                                 RETURN(rc);
1169                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1170                                            mti->mti_svname,"add osc(copied)");
1171                         record_end_log(env, &mdt_llh);
1172                         last_step = marker->cm_step;
1173                         RETURN(rc);
1174                 }
1175                 if (!strncmp(marker->cm_comment,"add osc",7) &&
1176                     (marker->cm_flags & CM_END)){
1177                         LASSERT(last_step == marker->cm_step);
1178                         last_step = -1;
1179                         got_an_osc_or_mdc = 0;
1180                         rc = record_start_log(env, mgs, &mdt_llh,
1181                                               mti->mti_svname);
1182                         if (rc)
1183                                 RETURN(rc);
1184                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1185                                            mti->mti_svname,"add osc(copied)");
1186                         record_end_log(env, &mdt_llh);
1187                         RETURN(rc);
1188                 }
1189                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1190                     (marker->cm_flags & CM_START)){
1191                         got_an_osc_or_mdc = 2;
1192                         last_step = marker->cm_step;
1193                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1194                                strlen(marker->cm_tgtname));
1195
1196                         RETURN(rc);
1197                 }
1198                 if (!strncmp(marker->cm_comment,"add mdc",7) &&
1199                     (marker->cm_flags & CM_END)){
1200                         LASSERT(last_step == marker->cm_step);
1201                         last_step = -1;
1202                         got_an_osc_or_mdc = 0;
1203                         RETURN(rc);
1204                 }
1205         }
1206
1207         if (got_an_osc_or_mdc == 0 || last_step < 0)
1208                 RETURN(rc);
1209
1210         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1211                 uint64_t nodenid;
1212                 nodenid = lcfg->lcfg_nid;
1213
1214                 tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1215                 tmti->mti_nid_count++;
1216
1217                 RETURN(rc);
1218         }
1219
1220         if (lcfg->lcfg_command == LCFG_SETUP) {
1221                 char *target;
1222
1223                 target = lustre_cfg_string(lcfg, 1);
1224                 memcpy(tmti->mti_uuid, target, strlen(target));
1225                 RETURN(rc);
1226         }
1227
1228         /* ignore client side sptlrpc_conf_log */
1229         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1230                 RETURN(rc);
1231
1232         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1233                 int index;
1234
1235                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1236                         RETURN (-EINVAL);
1237
1238                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1239                        strlen(mti->mti_fsname));
1240                 tmti->mti_stripe_index = index;
1241
1242                 rc = mgs_write_log_mdc_to_mdt(env, mgs, fsdb, tmti,
1243                                               mti->mti_svname);
1244                 memset(tmti, 0, sizeof(*tmti));
1245                 RETURN(rc);
1246         }
1247
1248         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1249                 int index;
1250                 char mdt_index[9];
1251                 char *logname, *lovname;
1252
1253                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1254                                              mti->mti_stripe_index);
1255                 if (rc)
1256                         RETURN(rc);
1257                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1258
1259                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1260                         name_destroy(&logname);
1261                         name_destroy(&lovname);
1262                         RETURN(-EINVAL);
1263                 }
1264
1265                 tmti->mti_stripe_index = index;
1266                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1267                                          mdt_index, lovname,
1268                                          LUSTRE_SP_MDT, 0);
1269                 name_destroy(&logname);
1270                 name_destroy(&lovname);
1271                 RETURN(rc);
1272         }
1273         RETURN(rc);
1274 }
1275
1276 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1277 /* stealed from mgs_get_fsdb_from_llog*/
1278 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1279                                               struct mgs_device *mgs,
1280                                               char *client_name,
1281                                               struct temp_comp* comp)
1282 {
1283         struct llog_handle *loghandle;
1284         struct lvfs_run_ctxt saved;
1285         struct mgs_target_info *tmti;
1286         struct llog_ctxt *ctxt;
1287         int rc;
1288
1289         ENTRY;
1290
1291         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1292         LASSERT(ctxt != NULL);
1293
1294         OBD_ALLOC_PTR(tmti);
1295         if (tmti == NULL)
1296                 GOTO(out_ctxt, rc = -ENOMEM);
1297
1298         comp->comp_tmti = tmti;
1299         comp->comp_mgs = mgs;
1300
1301         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
1302
1303         rc = llog_open(NULL, ctxt, &loghandle, NULL, client_name,
1304                        LLOG_OPEN_EXISTS);
1305         if (rc < 0) {
1306                 if (rc == -ENOENT)
1307                         rc = 0;
1308                 GOTO(out_pop, rc);
1309         }
1310
1311         rc = llog_init_handle(NULL, loghandle, LLOG_F_IS_PLAIN, NULL);
1312         if (rc)
1313                 GOTO(out_close, rc);
1314
1315         rc = llog_process_or_fork(env, loghandle, mgs_steal_llog_handler,
1316                                   (void *)comp, NULL, false);
1317         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1318 out_close:
1319         llog_close(NULL, loghandle);
1320 out_pop:
1321         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
1322         OBD_FREE_PTR(tmti);
1323 out_ctxt:
1324         llog_ctxt_put(ctxt);
1325         RETURN(rc);
1326 }
1327
1328 /* lmv is the second thing for client logs */
1329 /* copied from mgs_write_log_lov. Please refer to that.  */
1330 static int mgs_write_log_lmv(const struct lu_env *env,
1331                              struct mgs_device *mgs,
1332                              struct fs_db *fsdb,
1333                              struct mgs_target_info *mti,
1334                              char *logname, char *lmvname)
1335 {
1336         struct llog_handle *llh = NULL;
1337         struct lmv_desc *lmvdesc;
1338         char *uuid;
1339         int rc = 0;
1340         ENTRY;
1341
1342         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1343
1344         OBD_ALLOC_PTR(lmvdesc);
1345         if (lmvdesc == NULL)
1346                 RETURN(-ENOMEM);
1347         lmvdesc->ld_active_tgt_count = 0;
1348         lmvdesc->ld_tgt_count = 0;
1349         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1350         uuid = (char *)lmvdesc->ld_uuid.uuid;
1351
1352         rc = record_start_log(env, mgs, &llh, logname);
1353         if (rc)
1354                 GOTO(out_free, rc);
1355         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1356         if (rc)
1357                 GOTO(out_end, rc);
1358         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1359         if (rc)
1360                 GOTO(out_end, rc);
1361         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1362         if (rc)
1363                 GOTO(out_end, rc);
1364         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1365         if (rc)
1366                 GOTO(out_end, rc);
1367 out_end:
1368         record_end_log(env, &llh);
1369 out_free:
1370         OBD_FREE_PTR(lmvdesc);
1371         RETURN(rc);
1372 }
1373
1374 /* lov is the first thing in the mdt and client logs */
1375 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1376                              struct fs_db *fsdb, struct mgs_target_info *mti,
1377                              char *logname, char *lovname)
1378 {
1379         struct llog_handle *llh = NULL;
1380         struct lov_desc *lovdesc;
1381         char *uuid;
1382         int rc = 0;
1383         ENTRY;
1384
1385         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1386
1387         /*
1388         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1389         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1390               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1391         */
1392
1393         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1394         OBD_ALLOC_PTR(lovdesc);
1395         if (lovdesc == NULL)
1396                 RETURN(-ENOMEM);
1397         lovdesc->ld_magic = LOV_DESC_MAGIC;
1398         lovdesc->ld_tgt_count = 0;
1399         /* Defaults.  Can be changed later by lcfg config_param */
1400         lovdesc->ld_default_stripe_count = 1;
1401         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1402         lovdesc->ld_default_stripe_size = 1024 * 1024;
1403         lovdesc->ld_default_stripe_offset = -1;
1404         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1405         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1406         /* can these be the same? */
1407         uuid = (char *)lovdesc->ld_uuid.uuid;
1408
1409         /* This should always be the first entry in a log.
1410         rc = mgs_clear_log(obd, logname); */
1411         rc = record_start_log(env, mgs, &llh, logname);
1412         if (rc)
1413                 GOTO(out_free, rc);
1414         /* FIXME these should be a single journal transaction */
1415         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1416         if (rc)
1417                 GOTO(out_end, rc);
1418         rc = record_attach(env, llh, lovname, "lov", uuid);
1419         if (rc)
1420                 GOTO(out_end, rc);
1421         rc = record_lov_setup(env, llh, lovname, lovdesc);
1422         if (rc)
1423                 GOTO(out_end, rc);
1424         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1425         if (rc)
1426                 GOTO(out_end, rc);
1427         EXIT;
1428 out_end:
1429         record_end_log(env, &llh);
1430 out_free:
1431         OBD_FREE_PTR(lovdesc);
1432         return rc;
1433 }
1434
1435 /* add failnids to open log */
1436 static int mgs_write_log_failnids(const struct lu_env *env,
1437                                   struct mgs_target_info *mti,
1438                                   struct llog_handle *llh,
1439                                   char *cliname)
1440 {
1441         char *failnodeuuid = NULL;
1442         char *ptr = mti->mti_params;
1443         lnet_nid_t nid;
1444         int rc = 0;
1445
1446         /*
1447         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1448         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1449         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1450         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1451         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1452         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1453         */
1454
1455         /* Pull failnid info out of params string */
1456         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1457                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1458                         if (failnodeuuid == NULL) {
1459                                 /* We don't know the failover node name,
1460                                    so just use the first nid as the uuid */
1461                                 rc = name_create(&failnodeuuid,
1462                                                  libcfs_nid2str(nid), "");
1463                                 if (rc)
1464                                         return rc;
1465                         }
1466                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1467                                "client %s\n", libcfs_nid2str(nid),
1468                                failnodeuuid, cliname);
1469                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1470                 }
1471                 if (failnodeuuid)
1472                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1473         }
1474
1475         name_destroy(&failnodeuuid);
1476         return rc;
1477 }
1478
1479 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1480                                     struct mgs_device *mgs,
1481                                     struct fs_db *fsdb,
1482                                     struct mgs_target_info *mti,
1483                                     char *logname, char *lmvname)
1484 {
1485         struct llog_handle *llh = NULL;
1486         char *mdcname = NULL;
1487         char *nodeuuid = NULL;
1488         char *mdcuuid = NULL;
1489         char *lmvuuid = NULL;
1490         char index[6];
1491         int i, rc;
1492         ENTRY;
1493
1494         if (mgs_log_is_empty(env, mgs, logname)) {
1495                 CERROR("log is empty! Logical error\n");
1496                 RETURN(-EINVAL);
1497         }
1498
1499         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1500                mti->mti_svname, logname, lmvname);
1501
1502         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1503         if (rc)
1504                 RETURN(rc);
1505         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1506         if (rc)
1507                 GOTO(out_free, rc);
1508         rc = name_create(&mdcuuid, mdcname, "_UUID");
1509         if (rc)
1510                 GOTO(out_free, rc);
1511         rc = name_create(&lmvuuid, lmvname, "_UUID");
1512         if (rc)
1513                 GOTO(out_free, rc);
1514
1515         rc = record_start_log(env, mgs, &llh, logname);
1516         if (rc)
1517                 GOTO(out_free, rc);
1518         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1519                            "add mdc");
1520         if (rc)
1521                 GOTO(out_end, rc);
1522         for (i = 0; i < mti->mti_nid_count; i++) {
1523                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1524                        libcfs_nid2str(mti->mti_nids[i]));
1525
1526                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1527                 if (rc)
1528                         GOTO(out_end, rc);
1529         }
1530
1531         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1532         if (rc)
1533                 GOTO(out_end, rc);
1534         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1535         if (rc)
1536                 GOTO(out_end, rc);
1537         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1538         if (rc)
1539                 GOTO(out_end, rc);
1540         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1541         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1542                             index, "1");
1543         if (rc)
1544                 GOTO(out_end, rc);
1545         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1546                            "add mdc");
1547         if (rc)
1548                 GOTO(out_end, rc);
1549 out_end:
1550         record_end_log(env, &llh);
1551 out_free:
1552         name_destroy(&lmvuuid);
1553         name_destroy(&mdcuuid);
1554         name_destroy(&mdcname);
1555         name_destroy(&nodeuuid);
1556         RETURN(rc);
1557 }
1558
1559 /* add new mdc to already existent MDS */
1560 static int mgs_write_log_mdc_to_mdt(const struct lu_env *env,
1561                                     struct mgs_device *mgs,
1562                                     struct fs_db *fsdb,
1563                                     struct mgs_target_info *mti,
1564                                     char *logname)
1565 {
1566         struct llog_handle *llh = NULL;
1567         char *nodeuuid = NULL;
1568         char *mdcname = NULL;
1569         char *mdcuuid = NULL;
1570         char *mdtuuid = NULL;
1571         int idx = mti->mti_stripe_index;
1572         char index[9];
1573         int i, rc;
1574
1575         ENTRY;
1576         if (mgs_log_is_empty(env, mgs, logname)) {
1577                 CERROR("log is empty! Logical error\n");
1578                 RETURN (-EINVAL);
1579         }
1580
1581         CDEBUG(D_MGS, "adding mdc index %d to %s\n", idx, logname);
1582
1583         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1584         if (rc)
1585                 RETURN(rc);
1586         snprintf(index, sizeof(index), "-mdc%04x", idx);
1587         rc = name_create(&mdcname, logname, index);
1588         if (rc)
1589                 GOTO(out_free, rc);
1590         rc = name_create(&mdcuuid, mdcname, "_UUID");
1591         if (rc)
1592                 GOTO(out_free, rc);
1593         rc = name_create(&mdtuuid, logname, "_UUID");
1594         if (rc)
1595                 GOTO(out_free, rc);
1596
1597         rc = record_start_log(env, mgs, &llh, logname);
1598         if (rc)
1599                 GOTO(out_free, rc);
1600         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname, "add mdc");
1601         if (rc)
1602                 GOTO(out_end, rc);
1603         for (i = 0; i < mti->mti_nid_count; i++) {
1604                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1605                        libcfs_nid2str(mti->mti_nids[i]));
1606                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1607                 if (rc)
1608                         GOTO(out_end, rc);
1609         }
1610         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, mdcuuid);
1611         if (rc)
1612                 GOTO(out_end, rc);
1613         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1614         if (rc)
1615                 GOTO(out_end, rc);
1616         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1617         if (rc)
1618                 GOTO(out_end, rc);
1619         snprintf(index, sizeof(index), "%d", idx);
1620
1621         rc = record_mdc_add(env, llh, logname, mdcuuid, mti->mti_uuid,
1622                             index, "1");
1623         if (rc)
1624                 GOTO(out_end, rc);
1625         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add mdc");
1626         if (rc)
1627                 GOTO(out_end, rc);
1628 out_end:
1629         record_end_log(env, &llh);
1630 out_free:
1631         name_destroy(&mdtuuid);
1632         name_destroy(&mdcuuid);
1633         name_destroy(&mdcname);
1634         name_destroy(&nodeuuid);
1635         RETURN(rc);
1636 }
1637
1638 static int mgs_write_log_mdt0(const struct lu_env *env,
1639                               struct mgs_device *mgs,
1640                               struct fs_db *fsdb,
1641                               struct mgs_target_info *mti)
1642 {
1643         char *log = mti->mti_svname;
1644         struct llog_handle *llh = NULL;
1645         char *uuid, *lovname;
1646         char mdt_index[6];
1647         char *ptr = mti->mti_params;
1648         int rc = 0, failout = 0;
1649         ENTRY;
1650
1651         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
1652         if (uuid == NULL)
1653                 RETURN(-ENOMEM);
1654
1655         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
1656                 failout = (strncmp(ptr, "failout", 7) == 0);
1657
1658         rc = name_create(&lovname, log, "-mdtlov");
1659         if (rc)
1660                 GOTO(out_free, rc);
1661         if (mgs_log_is_empty(env, mgs, log)) {
1662                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
1663                 if (rc)
1664                         GOTO(out_lod, rc);
1665         }
1666
1667         sprintf(mdt_index, "%d", mti->mti_stripe_index);
1668
1669         rc = record_start_log(env, mgs, &llh, log);
1670         if (rc)
1671                 GOTO(out_lod, rc);
1672
1673         /* add MDT itself */
1674
1675         /* FIXME this whole fn should be a single journal transaction */
1676         sprintf(uuid, "%s_UUID", log);
1677         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
1678         if (rc)
1679                 GOTO(out_lod, rc);
1680         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
1681         if (rc)
1682                 GOTO(out_end, rc);
1683         rc = record_mount_opt(env, llh, log, lovname, NULL);
1684         if (rc)
1685                 GOTO(out_end, rc);
1686         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
1687                         failout ? "n" : "f");
1688         if (rc)
1689                 GOTO(out_end, rc);
1690         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
1691         if (rc)
1692                 GOTO(out_end, rc);
1693 out_end:
1694         record_end_log(env, &llh);
1695 out_lod:
1696         name_destroy(&lovname);
1697 out_free:
1698         OBD_FREE(uuid, sizeof(struct obd_uuid));
1699         RETURN(rc);
1700 }
1701
1702 static inline int name_create_mdt(char **logname, char *fsname, int i)
1703 {
1704         char mdt_index[9];
1705
1706         sprintf(mdt_index, "-MDT%04x", i);
1707         return name_create(logname, fsname, mdt_index);
1708 }
1709
1710 static int name_create_mdt_and_lov(char **logname, char **lovname,
1711                                     struct fs_db *fsdb, int i)
1712 {
1713         int rc;
1714
1715         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
1716         if (rc)
1717                 return rc;
1718         /* COMPAT_180 */
1719         if (i == 0 && cfs_test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
1720                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
1721         else
1722                 rc = name_create(lovname, *logname, "-mdtlov");
1723         if (rc) {
1724                 name_destroy(logname);
1725                 *logname = NULL;
1726         }
1727         return rc;
1728 }
1729
1730 static inline int name_create_mdt_osc(char **oscname, char *ostname,
1731                                        struct fs_db *fsdb, int i)
1732 {
1733         char suffix[16];
1734
1735         if (i == 0 && cfs_test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
1736                 sprintf(suffix, "-osc");
1737         else
1738                 sprintf(suffix, "-osc-MDT%04x", i);
1739         return name_create(oscname, ostname, suffix);
1740 }
1741
1742 /* envelope method for all layers log */
1743 static int mgs_write_log_mdt(const struct lu_env *env,
1744                              struct mgs_device *mgs,
1745                              struct fs_db *fsdb,
1746                              struct mgs_target_info *mti)
1747 {
1748         struct mgs_thread_info *mgi = mgs_env_info(env);
1749         struct llog_handle *llh = NULL;
1750         char *cliname;
1751         int rc, i = 0;
1752         ENTRY;
1753
1754         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
1755
1756         if (mti->mti_uuid[0] == '\0') {
1757                 /* Make up our own uuid */
1758                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
1759                          "%s_UUID", mti->mti_svname);
1760         }
1761
1762         /* add mdt */
1763         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
1764         if (rc)
1765                 RETURN(rc);
1766         /* Append the mdt info to the client log */
1767         rc = name_create(&cliname, mti->mti_fsname, "-client");
1768         if (rc)
1769                 RETURN(rc);
1770
1771         if (mgs_log_is_empty(env, mgs, cliname)) {
1772                 /* Start client log */
1773                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
1774                                        fsdb->fsdb_clilov);
1775                 if (rc)
1776                         GOTO(out_free, rc);
1777                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
1778                                        fsdb->fsdb_clilmv);
1779                 if (rc)
1780                         GOTO(out_free, rc);
1781         }
1782
1783         /*
1784         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
1785         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
1786         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
1787         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
1788         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
1789         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
1790         */
1791
1792                 /* copy client info about lov/lmv */
1793                 mgi->mgi_comp.comp_mti = mti;
1794                 mgi->mgi_comp.comp_fsdb = fsdb;
1795
1796                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
1797                                                         &mgi->mgi_comp);
1798                 if (rc)
1799                         GOTO(out_free, rc);
1800                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
1801                                               fsdb->fsdb_clilmv);
1802                 if (rc)
1803                         GOTO(out_free, rc);
1804
1805                 /* add mountopts */
1806                 rc = record_start_log(env, mgs, &llh, cliname);
1807                 if (rc)
1808                         GOTO(out_free, rc);
1809
1810                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
1811                                    "mount opts");
1812                 if (rc)
1813                         GOTO(out_end, rc);
1814                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
1815                                       fsdb->fsdb_clilmv);
1816                 if (rc)
1817                         GOTO(out_end, rc);
1818                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
1819                                    "mount opts");
1820
1821         if (rc)
1822                 GOTO(out_end, rc);
1823         /* for_all_existing_mdt except current one */
1824         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
1825                 char *mdtname;
1826                 if (i !=  mti->mti_stripe_index &&
1827                     cfs_test_bit(i,  fsdb->fsdb_mdt_index_map)) {
1828                         rc = name_create_mdt(&mdtname, mti->mti_fsname, i);
1829                         if (rc)
1830                                 GOTO(out_end, rc);
1831                         rc = mgs_write_log_mdc_to_mdt(env, mgs, fsdb, mti, mdtname);
1832                         name_destroy(&mdtname);
1833                         if (rc)
1834                                 GOTO(out_end, rc);
1835                 }
1836         }
1837 out_end:
1838         record_end_log(env, &llh);
1839 out_free:
1840         name_destroy(&cliname);
1841         RETURN(rc);
1842 }
1843
1844 /* Add the ost info to the client/mdt lov */
1845 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1846                                     struct mgs_device *mgs, struct fs_db *fsdb,
1847                                     struct mgs_target_info *mti,
1848                                     char *logname, char *suffix, char *lovname,
1849                                     enum lustre_sec_part sec_part, int flags)
1850 {
1851         struct llog_handle *llh = NULL;
1852         char *nodeuuid = NULL;
1853         char *oscname = NULL;
1854         char *oscuuid = NULL;
1855         char *lovuuid = NULL;
1856         char *svname = NULL;
1857         char index[6];
1858         int i, rc;
1859
1860         ENTRY;
1861         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
1862                mti->mti_svname, logname);
1863
1864         if (mgs_log_is_empty(env, mgs, logname)) {
1865                 CERROR("log is empty! Logical error\n");
1866                 RETURN (-EINVAL);
1867         }
1868
1869         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1870         if (rc)
1871                 RETURN(rc);
1872         rc = name_create(&svname, mti->mti_svname, "-osc");
1873         if (rc)
1874                 GOTO(out_free, rc);
1875         rc = name_create(&oscname, svname, suffix);
1876         if (rc)
1877                 GOTO(out_free, rc);
1878         rc = name_create(&oscuuid, oscname, "_UUID");
1879         if (rc)
1880                 GOTO(out_free, rc);
1881         rc = name_create(&lovuuid, lovname, "_UUID");
1882         if (rc)
1883                 GOTO(out_free, rc);
1884
1885         /*
1886         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
1887         multihomed (#4)
1888         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
1889         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
1890         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
1891         failover (#6,7)
1892         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
1893         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
1894         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
1895         */
1896
1897         rc = record_start_log(env, mgs, &llh, logname);
1898         if (rc)
1899                 GOTO(out_free, rc);
1900         /* FIXME these should be a single journal transaction */
1901         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
1902                            "add osc");
1903         if (rc)
1904                 GOTO(out_end, rc);
1905         for (i = 0; i < mti->mti_nid_count; i++) {
1906                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
1907                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1908                 if (rc)
1909                         GOTO(out_end, rc);
1910         }
1911         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
1912         if (rc)
1913                 GOTO(out_end, rc);
1914         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
1915         if (rc)
1916                 GOTO(out_end, rc);
1917         rc = mgs_write_log_failnids(env, mti, llh, oscname);
1918         if (rc)
1919                 GOTO(out_end, rc);
1920         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1921         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
1922         if (rc)
1923                 GOTO(out_end, rc);
1924         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
1925                            "add osc");
1926         if (rc)
1927                 GOTO(out_end, rc);
1928 out_end:
1929         record_end_log(env, &llh);
1930 out_free:
1931         name_destroy(&lovuuid);
1932         name_destroy(&oscuuid);
1933         name_destroy(&oscname);
1934         name_destroy(&svname);
1935         name_destroy(&nodeuuid);
1936         RETURN(rc);
1937 }
1938
1939 static int mgs_write_log_ost(const struct lu_env *env,
1940                              struct mgs_device *mgs, struct fs_db *fsdb,
1941                              struct mgs_target_info *mti)
1942 {
1943         struct llog_handle *llh = NULL;
1944         char *logname, *lovname;
1945         char *ptr = mti->mti_params;
1946         int rc, flags = 0, failout = 0, i;
1947         ENTRY;
1948
1949         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
1950
1951         /* The ost startup log */
1952
1953         /* If the ost log already exists, that means that someone reformatted
1954            the ost and it called target_add again. */
1955         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
1956                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
1957                                    "exists, yet the server claims it never "
1958                                    "registered. It may have been reformatted, "
1959                                    "or the index changed. writeconf the MDT to "
1960                                    "regenerate all logs.\n", mti->mti_svname);
1961                 RETURN(-EALREADY);
1962         }
1963
1964         /*
1965         attach obdfilter ost1 ost1_UUID
1966         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
1967         */
1968         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
1969                 failout = (strncmp(ptr, "failout", 7) == 0);
1970         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
1971         if (rc)
1972                 RETURN(rc);
1973         /* FIXME these should be a single journal transaction */
1974         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
1975         if (rc)
1976                 GOTO(out_end, rc);
1977         if (*mti->mti_uuid == '\0')
1978                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
1979                          "%s_UUID", mti->mti_svname);
1980         rc = record_attach(env, llh, mti->mti_svname,
1981                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
1982         if (rc)
1983                 GOTO(out_end, rc);
1984         rc = record_setup(env, llh, mti->mti_svname,
1985                           "dev"/*ignored*/, "type"/*ignored*/,
1986                           failout ? "n" : "f", 0/*options*/);
1987         if (rc)
1988                 GOTO(out_end, rc);
1989         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
1990         if (rc)
1991                 GOTO(out_end, rc);
1992 out_end:
1993         record_end_log(env, &llh);
1994         if (rc)
1995                 RETURN(rc);
1996         /* We also have to update the other logs where this osc is part of
1997            the lov */
1998
1999         if (cfs_test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2000                 /* If we're upgrading, the old mdt log already has our
2001                    entry. Let's do a fake one for fun. */
2002                 /* Note that we can't add any new failnids, since we don't
2003                    know the old osc names. */
2004                 flags = CM_SKIP | CM_UPGRADE146;
2005
2006         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2007                 /* If the update flag isn't set, don't update client/mdt
2008                    logs. */
2009                 flags |= CM_SKIP;
2010                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2011                               "the MDT first to regenerate it.\n",
2012                               mti->mti_svname);
2013         }
2014
2015         /* Add ost to all MDT lov defs */
2016         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2017                 if (cfs_test_bit(i, fsdb->fsdb_mdt_index_map)) {
2018                         char mdt_index[9];
2019
2020                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2021                                                      i);
2022                         if (rc)
2023                                 RETURN(rc);
2024                         sprintf(mdt_index, "-MDT%04x", i);
2025                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2026                                                       logname, mdt_index,
2027                                                       lovname, LUSTRE_SP_MDT,
2028                                                       flags);
2029                         name_destroy(&logname);
2030                         name_destroy(&lovname);
2031                         if (rc)
2032                                 RETURN(rc);
2033                 }
2034         }
2035
2036         /* Append ost info to the client log */
2037         rc = name_create(&logname, mti->mti_fsname, "-client");
2038         if (rc)
2039                 RETURN(rc);
2040         if (mgs_log_is_empty(env, mgs, logname)) {
2041                 /* Start client log */
2042                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2043                                        fsdb->fsdb_clilov);
2044                 if (rc)
2045                         GOTO(out_free, rc);
2046                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2047                                        fsdb->fsdb_clilmv);
2048                 if (rc)
2049                         GOTO(out_free, rc);
2050         }
2051         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2052                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2053 out_free:
2054         name_destroy(&logname);
2055         RETURN(rc);
2056 }
2057
2058 static __inline__ int mgs_param_empty(char *ptr)
2059 {
2060         char *tmp;
2061
2062         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2063                 return 1;
2064         return 0;
2065 }
2066
2067 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2068                                           struct mgs_device *mgs,
2069                                           struct fs_db *fsdb,
2070                                           struct mgs_target_info *mti,
2071                                           char *logname, char *cliname)
2072 {
2073         int rc;
2074         struct llog_handle *llh = NULL;
2075
2076         if (mgs_param_empty(mti->mti_params)) {
2077                 /* Remove _all_ failnids */
2078                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2079                                 mti->mti_svname, "add failnid", CM_SKIP);
2080                 return rc < 0 ? rc : 0;
2081         }
2082
2083         /* Otherwise failover nids are additive */
2084         rc = record_start_log(env, mgs, &llh, logname);
2085         if (rc)
2086                 return rc;
2087                 /* FIXME this should be a single journal transaction */
2088         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2089                            "add failnid");
2090         if (rc)
2091                 goto out_end;
2092         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2093         if (rc)
2094                 goto out_end;
2095         rc = record_marker(env, llh, fsdb, CM_END,
2096                            mti->mti_svname, "add failnid");
2097 out_end:
2098         record_end_log(env, &llh);
2099         return rc;
2100 }
2101
2102
2103 /* Add additional failnids to an existing log.
2104    The mdc/osc must have been added to logs first */
2105 /* tcp nids must be in dotted-quad ascii -
2106    we can't resolve hostnames from the kernel. */
2107 static int mgs_write_log_add_failnid(const struct lu_env *env,
2108                                      struct mgs_device *mgs,
2109                                      struct fs_db *fsdb,
2110                                      struct mgs_target_info *mti)
2111 {
2112         char *logname, *cliname;
2113         int rc;
2114         ENTRY;
2115
2116         /* FIXME we currently can't erase the failnids
2117          * given when a target first registers, since they aren't part of
2118          * an "add uuid" stanza */
2119
2120         /* Verify that we know about this target */
2121         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2122                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2123                                    "yet. It must be started before failnids "
2124                                    "can be added.\n", mti->mti_svname);
2125                 RETURN(-ENOENT);
2126         }
2127
2128         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2129         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2130                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2131         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2132                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2133         } else {
2134                 RETURN(-EINVAL);
2135         }
2136         if (rc)
2137                 RETURN(rc);
2138         /* Add failover nids to the client log */
2139         rc = name_create(&logname, mti->mti_fsname, "-client");
2140         if (rc) {
2141                 name_destroy(&cliname);
2142                 RETURN(rc);
2143         }
2144         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2145         name_destroy(&logname);
2146         name_destroy(&cliname);
2147         if (rc)
2148                 RETURN(rc);
2149
2150         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2151                 /* Add OST failover nids to the MDT logs as well */
2152                 int i;
2153
2154                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2155                         if (!cfs_test_bit(i, fsdb->fsdb_mdt_index_map))
2156                                 continue;
2157                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2158                         if (rc)
2159                                 RETURN(rc);
2160                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2161                                                  fsdb, i);
2162                         if (rc) {
2163                                 name_destroy(&logname);
2164                                 RETURN(rc);
2165                         }
2166                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2167                                                             mti, logname,
2168                                                             cliname);
2169                         name_destroy(&cliname);
2170                         name_destroy(&logname);
2171                         if (rc)
2172                                 RETURN(rc);
2173                 }
2174         }
2175
2176         RETURN(rc);
2177 }
2178
2179 static int mgs_wlp_lcfg(const struct lu_env *env,
2180                         struct mgs_device *mgs, struct fs_db *fsdb,
2181                         struct mgs_target_info *mti,
2182                         char *logname, struct lustre_cfg_bufs *bufs,
2183                         char *tgtname, char *ptr)
2184 {
2185         char comment[MTI_NAME_MAXLEN];
2186         char *tmp;
2187         struct lustre_cfg *lcfg;
2188         int rc, del;
2189
2190         /* Erase any old settings of this same parameter */
2191         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2192         comment[MTI_NAME_MAXLEN - 1] = 0;
2193         /* But don't try to match the value. */
2194         if ((tmp = strchr(comment, '=')))
2195             *tmp = 0;
2196         /* FIXME we should skip settings that are the same as old values */
2197         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2198         if (rc < 0)
2199                 return rc;
2200         del = mgs_param_empty(ptr);
2201
2202         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2203                       "Sett" : "Modify", tgtname, comment, logname);
2204         if (del)
2205                 return rc;
2206
2207         lustre_cfg_bufs_reset(bufs, tgtname);
2208         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2209         lcfg = lustre_cfg_new(LCFG_PARAM, bufs);
2210         if (!lcfg)
2211                 return -ENOMEM;
2212         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2213         lustre_cfg_free(lcfg);
2214         return rc;
2215 }
2216
2217 /* write global variable settings into log */
2218 static int mgs_write_log_sys(const struct lu_env *env,
2219                              struct mgs_device *mgs, struct fs_db *fsdb,
2220                              struct mgs_target_info *mti, char *sys, char *ptr)
2221 {
2222         struct mgs_thread_info *mgi = mgs_env_info(env);
2223         struct lustre_cfg *lcfg;
2224         char *tmp, sep;
2225         int rc, cmd, convert = 1;
2226
2227         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2228                 cmd = LCFG_SET_TIMEOUT;
2229         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2230                 cmd = LCFG_SET_LDLM_TIMEOUT;
2231         /* Check for known params here so we can return error to lctl */
2232         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2233                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2234                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2235                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2236                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2237                 cmd = LCFG_PARAM;
2238         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2239                 convert = 0; /* Don't convert string value to integer */
2240                 cmd = LCFG_PARAM;
2241         } else {
2242                 return -EINVAL;
2243         }
2244
2245         if (mgs_param_empty(ptr))
2246                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2247         else
2248                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2249
2250         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2251         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2252         if (!convert && *tmp != '\0')
2253                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2254         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2255         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2256         /* truncate the comment to the parameter name */
2257         ptr = tmp - 1;
2258         sep = *ptr;
2259         *ptr = '\0';
2260         /* modify all servers and clients */
2261         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2262                                       *tmp == '\0' ? NULL : lcfg,
2263                                       mti->mti_fsname, sys, 0);
2264         if (rc == 0 && *tmp != '\0') {
2265                 switch (cmd) {
2266                 case LCFG_SET_TIMEOUT:
2267                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2268                                 class_process_config(lcfg);
2269                         break;
2270                 case LCFG_SET_LDLM_TIMEOUT:
2271                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2272                                 class_process_config(lcfg);
2273                         break;
2274                 default:
2275                         break;
2276                 }
2277         }
2278         *ptr = sep;
2279         lustre_cfg_free(lcfg);
2280         return rc;
2281 }
2282
2283 /* write quota settings into log */
2284 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2285                                struct fs_db *fsdb, struct mgs_target_info *mti,
2286                                char *quota, char *ptr)
2287 {
2288         struct lustre_cfg_bufs bufs;
2289         struct lustre_cfg *lcfg;
2290         char *tmp;
2291         char sep;
2292         int cmd = LCFG_PARAM;
2293         int rc;
2294
2295         /* support only 'meta' and 'data' pools so far */
2296         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2297             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2298                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2299                        "& quota.ost are)\n", ptr);
2300                 return -EINVAL;
2301         }
2302
2303         if (*tmp == '\0') {
2304                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2305         } else {
2306                 CDEBUG(D_MGS, "global '%s'\n", quota);
2307
2308                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2309                     strcmp(tmp, "none") != 0) {
2310                         CERROR("enable option(%s) isn't supported\n", tmp);
2311                         return -EINVAL;
2312                 }
2313         }
2314
2315         lustre_cfg_bufs_reset(&bufs, NULL);
2316         lustre_cfg_bufs_set_string(&bufs, 1, quota);
2317         lcfg = lustre_cfg_new(cmd, &bufs);
2318         /* truncate the comment to the parameter name */
2319         ptr = tmp - 1;
2320         sep = *ptr;
2321         *ptr = '\0';
2322
2323         /* XXX we duplicated quota enable information in all server
2324          *     config logs, it should be moved to a separate config
2325          *     log once we cleanup the config log for global param. */
2326         /* modify all servers */
2327         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2328                                       *tmp == '\0' ? NULL : lcfg,
2329                                       mti->mti_fsname, quota, 1);
2330         *ptr = sep;
2331         lustre_cfg_free(lcfg);
2332         return rc;
2333 }
2334
2335 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2336                                    struct mgs_device *mgs,
2337                                    struct fs_db *fsdb,
2338                                    struct mgs_target_info *mti,
2339                                    char *param)
2340 {
2341         struct mgs_thread_info *mgi = mgs_env_info(env);
2342         struct llog_handle     *llh = NULL;
2343         char                   *logname;
2344         char                   *comment, *ptr;
2345         struct lustre_cfg      *lcfg;
2346         int                     rc, len;
2347         ENTRY;
2348
2349         /* get comment */
2350         ptr = strchr(param, '=');
2351         LASSERT(ptr);
2352         len = ptr - param;
2353
2354         OBD_ALLOC(comment, len + 1);
2355         if (comment == NULL)
2356                 RETURN(-ENOMEM);
2357         strncpy(comment, param, len);
2358         comment[len] = '\0';
2359
2360         /* prepare lcfg */
2361         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2362         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2363         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2364         if (lcfg == NULL)
2365                 GOTO(out_comment, rc = -ENOMEM);
2366
2367         /* construct log name */
2368         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2369         if (rc)
2370                 GOTO(out_lcfg, rc);
2371
2372         if (mgs_log_is_empty(env, mgs, logname)) {
2373                 rc = record_start_log(env, mgs, &llh, logname);
2374                 if (rc)
2375                         GOTO(out, rc);
2376                 record_end_log(env, &llh);
2377         }
2378
2379         /* obsolete old one */
2380         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2381                         comment, CM_SKIP);
2382         if (rc < 0)
2383                 GOTO(out, rc);
2384         /* write the new one */
2385         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2386                                   mti->mti_svname, comment);
2387         if (rc)
2388                 CERROR("err %d writing log %s\n", rc, logname);
2389 out:
2390         name_destroy(&logname);
2391 out_lcfg:
2392         lustre_cfg_free(lcfg);
2393 out_comment:
2394         OBD_FREE(comment, len + 1);
2395         RETURN(rc);
2396 }
2397
2398 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2399                                         char *param)
2400 {
2401         char    *ptr;
2402
2403         /* disable the adjustable udesc parameter for now, i.e. use default
2404          * setting that client always ship udesc to MDT if possible. to enable
2405          * it simply remove the following line */
2406         goto error_out;
2407
2408         ptr = strchr(param, '=');
2409         if (ptr == NULL)
2410                 goto error_out;
2411         *ptr++ = '\0';
2412
2413         if (strcmp(param, PARAM_SRPC_UDESC))
2414                 goto error_out;
2415
2416         if (strcmp(ptr, "yes") == 0) {
2417                 cfs_set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2418                 CWARN("Enable user descriptor shipping from client to MDT\n");
2419         } else if (strcmp(ptr, "no") == 0) {
2420                 cfs_clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2421                 CWARN("Disable user descriptor shipping from client to MDT\n");
2422         } else {
2423                 *(ptr - 1) = '=';
2424                 goto error_out;
2425         }
2426         return 0;
2427
2428 error_out:
2429         CERROR("Invalid param: %s\n", param);
2430         return -EINVAL;
2431 }
2432
2433 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2434                                   const char *svname,
2435                                   char *param)
2436 {
2437         struct sptlrpc_rule      rule;
2438         struct sptlrpc_rule_set *rset;
2439         int                      rc;
2440         ENTRY;
2441
2442         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2443                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2444                 RETURN(-EINVAL);
2445         }
2446
2447         if (strncmp(param, PARAM_SRPC_UDESC,
2448                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2449                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2450         }
2451
2452         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2453                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2454                 RETURN(-EINVAL);
2455         }
2456
2457         param += sizeof(PARAM_SRPC_FLVR) - 1;
2458
2459         rc = sptlrpc_parse_rule(param, &rule);
2460         if (rc)
2461                 RETURN(rc);
2462
2463         /* mgs rules implies must be mgc->mgs */
2464         if (cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2465                 if ((rule.sr_from != LUSTRE_SP_MGC &&
2466                      rule.sr_from != LUSTRE_SP_ANY) ||
2467                     (rule.sr_to != LUSTRE_SP_MGS &&
2468                      rule.sr_to != LUSTRE_SP_ANY))
2469                         RETURN(-EINVAL);
2470         }
2471
2472         /* preapre room for this coming rule. svcname format should be:
2473          * - fsname: general rule
2474          * - fsname-tgtname: target-specific rule
2475          */
2476         if (strchr(svname, '-')) {
2477                 struct mgs_tgt_srpc_conf *tgtconf;
2478                 int                       found = 0;
2479
2480                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
2481                      tgtconf = tgtconf->mtsc_next) {
2482                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
2483                                 found = 1;
2484                                 break;
2485                         }
2486                 }
2487
2488                 if (!found) {
2489                         int name_len;
2490
2491                         OBD_ALLOC_PTR(tgtconf);
2492                         if (tgtconf == NULL)
2493                                 RETURN(-ENOMEM);
2494
2495                         name_len = strlen(svname);
2496
2497                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
2498                         if (tgtconf->mtsc_tgt == NULL) {
2499                                 OBD_FREE_PTR(tgtconf);
2500                                 RETURN(-ENOMEM);
2501                         }
2502                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
2503
2504                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
2505                         fsdb->fsdb_srpc_tgt = tgtconf;
2506                 }
2507
2508                 rset = &tgtconf->mtsc_rset;
2509         } else {
2510                 rset = &fsdb->fsdb_srpc_gen;
2511         }
2512
2513         rc = sptlrpc_rule_set_merge(rset, &rule);
2514
2515         RETURN(rc);
2516 }
2517
2518 static int mgs_srpc_set_param(const struct lu_env *env,
2519                               struct mgs_device *mgs,
2520                               struct fs_db *fsdb,
2521                               struct mgs_target_info *mti,
2522                               char *param)
2523 {
2524         char                   *copy;
2525         int                     rc, copy_size;
2526         ENTRY;
2527
2528 #ifndef HAVE_GSS
2529         RETURN(-EINVAL);
2530 #endif
2531         /* keep a copy of original param, which could be destroied
2532          * during parsing */
2533         copy_size = strlen(param) + 1;
2534         OBD_ALLOC(copy, copy_size);
2535         if (copy == NULL)
2536                 return -ENOMEM;
2537         memcpy(copy, param, copy_size);
2538
2539         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
2540         if (rc)
2541                 goto out_free;
2542
2543         /* previous steps guaranteed the syntax is correct */
2544         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
2545         if (rc)
2546                 goto out_free;
2547
2548         if (cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2549                 /*
2550                  * for mgs rules, make them effective immediately.
2551                  */
2552                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
2553                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
2554                                                  &fsdb->fsdb_srpc_gen);
2555         }
2556
2557 out_free:
2558         OBD_FREE(copy, copy_size);
2559         RETURN(rc);
2560 }
2561
2562 struct mgs_srpc_read_data {
2563         struct fs_db   *msrd_fsdb;
2564         int             msrd_skip;
2565 };
2566
2567 static int mgs_srpc_read_handler(const struct lu_env *env,
2568                                  struct llog_handle *llh,
2569                                  struct llog_rec_hdr *rec, void *data)
2570 {
2571         struct mgs_srpc_read_data *msrd = data;
2572         struct cfg_marker         *marker;
2573         struct lustre_cfg         *lcfg = (struct lustre_cfg *)(rec + 1);
2574         char                      *svname, *param;
2575         int                        cfg_len, rc;
2576         ENTRY;
2577
2578         if (rec->lrh_type != OBD_CFG_REC) {
2579                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
2580                 RETURN(-EINVAL);
2581         }
2582
2583         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
2584                   sizeof(struct llog_rec_tail);
2585
2586         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
2587         if (rc) {
2588                 CERROR("Insane cfg\n");
2589                 RETURN(rc);
2590         }
2591
2592         if (lcfg->lcfg_command == LCFG_MARKER) {
2593                 marker = lustre_cfg_buf(lcfg, 1);
2594
2595                 if (marker->cm_flags & CM_START &&
2596                     marker->cm_flags & CM_SKIP)
2597                         msrd->msrd_skip = 1;
2598                 if (marker->cm_flags & CM_END)
2599                         msrd->msrd_skip = 0;
2600
2601                 RETURN(0);
2602         }
2603
2604         if (msrd->msrd_skip)
2605                 RETURN(0);
2606
2607         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
2608                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
2609                 RETURN(0);
2610         }
2611
2612         svname = lustre_cfg_string(lcfg, 0);
2613         if (svname == NULL) {
2614                 CERROR("svname is empty\n");
2615                 RETURN(0);
2616         }
2617
2618         param = lustre_cfg_string(lcfg, 1);
2619         if (param == NULL) {
2620                 CERROR("param is empty\n");
2621                 RETURN(0);
2622         }
2623
2624         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
2625         if (rc)
2626                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
2627
2628         RETURN(0);
2629 }
2630
2631 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
2632                                 struct mgs_device *mgs,
2633                                 struct fs_db *fsdb)
2634 {
2635         struct llog_handle        *llh = NULL;
2636         struct lvfs_run_ctxt       saved;
2637         struct llog_ctxt          *ctxt;
2638         char                      *logname;
2639         struct mgs_srpc_read_data  msrd;
2640         int                        rc;
2641         ENTRY;
2642
2643         /* construct log name */
2644         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
2645         if (rc)
2646                 RETURN(rc);
2647
2648         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
2649         LASSERT(ctxt != NULL);
2650
2651         if (mgs_log_is_empty(env, mgs, logname))
2652                 GOTO(out, rc = 0);
2653
2654         push_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
2655
2656         rc = llog_open(NULL, ctxt, &llh, NULL, logname,
2657                        LLOG_OPEN_EXISTS);
2658         if (rc < 0) {
2659                 if (rc == -ENOENT)
2660                         rc = 0;
2661                 GOTO(out_pop, rc);
2662         }
2663
2664         rc = llog_init_handle(NULL, llh, LLOG_F_IS_PLAIN, NULL);
2665         if (rc)
2666                 GOTO(out_close, rc);
2667
2668         if (llog_get_size(llh) <= 1)
2669                 GOTO(out_close, rc = 0);
2670
2671         msrd.msrd_fsdb = fsdb;
2672         msrd.msrd_skip = 0;
2673
2674         rc = llog_process_or_fork(env, llh, mgs_srpc_read_handler,
2675                                   (void *)&msrd, NULL, false);
2676
2677 out_close:
2678         llog_close(NULL, llh);
2679 out_pop:
2680         pop_ctxt(&saved, &mgs->mgs_obd->obd_lvfs_ctxt, NULL);
2681 out:
2682         llog_ctxt_put(ctxt);
2683         name_destroy(&logname);
2684
2685         if (rc)
2686                 CERROR("failed to read sptlrpc config database: %d\n", rc);
2687         RETURN(rc);
2688 }
2689
2690 /* Permanent settings of all parameters by writing into the appropriate
2691  * configuration logs.
2692  * A parameter with null value ("<param>='\0'") means to erase it out of
2693  * the logs.
2694  */
2695 static int mgs_write_log_param(const struct lu_env *env,
2696                                struct mgs_device *mgs, struct fs_db *fsdb,
2697                                struct mgs_target_info *mti, char *ptr)
2698 {
2699         struct mgs_thread_info *mgi = mgs_env_info(env);
2700         char *logname;
2701         char *tmp;
2702         int rc = 0, rc2 = 0;
2703         ENTRY;
2704
2705         /* For various parameter settings, we have to figure out which logs
2706            care about them (e.g. both mdt and client for lov settings) */
2707         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2708
2709         /* The params are stored in MOUNT_DATA_FILE and modified via
2710            tunefs.lustre, or set using lctl conf_param */
2711
2712         /* Processed in lustre_start_mgc */
2713         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
2714                 GOTO(end, rc);
2715
2716         /* Processed in ost/mdt */
2717         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
2718                 GOTO(end, rc);
2719
2720         /* Processed in mgs_write_log_ost */
2721         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
2722                 if (mti->mti_flags & LDD_F_PARAM) {
2723                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
2724                                            "changed with tunefs.lustre"
2725                                            "and --writeconf\n", ptr);
2726                         rc = -EPERM;
2727                 }
2728                 GOTO(end, rc);
2729         }
2730
2731         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
2732                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
2733                 GOTO(end, rc);
2734         }
2735
2736         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
2737                 /* Add a failover nidlist */
2738                 rc = 0;
2739                 /* We already processed failovers params for new
2740                    targets in mgs_write_log_target */
2741                 if (mti->mti_flags & LDD_F_PARAM) {
2742                         CDEBUG(D_MGS, "Adding failnode\n");
2743                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
2744                 }
2745                 GOTO(end, rc);
2746         }
2747
2748         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
2749                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
2750                 GOTO(end, rc);
2751         }
2752
2753         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
2754                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
2755                 GOTO(end, rc);
2756         }
2757
2758         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
2759                 /* active=0 means off, anything else means on */
2760                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
2761                 int i;
2762
2763                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
2764                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
2765                                            "be (de)activated.\n",
2766                                            mti->mti_svname);
2767                         GOTO(end, rc = -EINVAL);
2768                 }
2769                 LCONSOLE_WARN("Permanently %sactivating %s\n",
2770                               flag ? "de": "re", mti->mti_svname);
2771                 /* Modify clilov */
2772                 rc = name_create(&logname, mti->mti_fsname, "-client");
2773                 if (rc)
2774                         GOTO(end, rc);
2775                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2776                                 mti->mti_svname, "add osc", flag);
2777                 name_destroy(&logname);
2778                 if (rc)
2779                         goto active_err;
2780                 /* Modify mdtlov */
2781                 /* Add to all MDT logs for CMD */
2782                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2783                         if (!cfs_test_bit(i, fsdb->fsdb_mdt_index_map))
2784                                 continue;
2785                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2786                         if (rc)
2787                                 GOTO(end, rc);
2788                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
2789                                         mti->mti_svname, "add osc", flag);
2790                         name_destroy(&logname);
2791                         if (rc)
2792                                 goto active_err;
2793                 }
2794         active_err:
2795                 if (rc) {
2796                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
2797                                            "log (%d). No permanent "
2798                                            "changes were made to the "
2799                                            "config log.\n",
2800                                            mti->mti_svname, rc);
2801                         if (cfs_test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
2802                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
2803                                                    " because the log"
2804                                                    "is in the old 1.4"
2805                                                    "style. Consider "
2806                                                    " --writeconf to "
2807                                                    "update the logs.\n");
2808                         GOTO(end, rc);
2809                 }
2810                 /* Fall through to osc proc for deactivating live OSC
2811                    on running MDT / clients. */
2812         }
2813         /* Below here, let obd's XXX_process_config methods handle it */
2814
2815         /* All lov. in proc */
2816         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
2817                 char *mdtlovname;
2818
2819                 CDEBUG(D_MGS, "lov param %s\n", ptr);
2820                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
2821                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
2822                                            "set on the MDT, not %s. "
2823                                            "Ignoring.\n",
2824                                            mti->mti_svname);
2825                         GOTO(end, rc = 0);
2826                 }
2827
2828                 /* Modify mdtlov */
2829                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
2830                         GOTO(end, rc = -ENODEV);
2831
2832                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
2833                                              mti->mti_stripe_index);
2834                 if (rc)
2835                         GOTO(end, rc);
2836                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
2837                                   &mgi->mgi_bufs, mdtlovname, ptr);
2838                 name_destroy(&logname);
2839                 name_destroy(&mdtlovname);
2840                 if (rc)
2841                         GOTO(end, rc);
2842
2843                 /* Modify clilov */
2844                 rc = name_create(&logname, mti->mti_fsname, "-client");
2845                 if (rc)
2846                         GOTO(end, rc);
2847                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
2848                                   fsdb->fsdb_clilov, ptr);
2849                 name_destroy(&logname);
2850                 GOTO(end, rc);
2851         }
2852
2853         /* All osc., mdc., llite. params in proc */
2854         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
2855             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
2856             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
2857                 char *cname;
2858
2859                 if (cfs_test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2860                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
2861                                            " cannot be modified. Consider"
2862                                            " updating the configuration with"
2863                                            " --writeconf\n",
2864                                            mti->mti_svname);
2865                         GOTO(end, rc = -EINVAL);
2866                 }
2867                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
2868                         rc = name_create(&cname, mti->mti_fsname, "-client");
2869                         /* Add the client type to match the obdname in
2870                            class_config_llog_handler */
2871                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2872                         rc = name_create(&cname, mti->mti_svname, "-mdc");
2873                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2874                         rc = name_create(&cname, mti->mti_svname, "-osc");
2875                 } else {
2876                         GOTO(end, rc = -EINVAL);
2877                 }
2878                 if (rc)
2879                         GOTO(end, rc);
2880
2881                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2882
2883                 /* Modify client */
2884                 rc = name_create(&logname, mti->mti_fsname, "-client");
2885                 if (rc) {
2886                         name_destroy(&cname);
2887                         GOTO(end, rc);
2888                 }
2889                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
2890                                   cname, ptr);
2891
2892                 /* osc params affect the MDT as well */
2893                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
2894                         int i;
2895
2896                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2897                                 if (!cfs_test_bit(i, fsdb->fsdb_mdt_index_map))
2898                                         continue;
2899                                 name_destroy(&cname);
2900                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
2901                                                          fsdb, i);
2902                                 name_destroy(&logname);
2903                                 if (rc)
2904                                         break;
2905                                 rc = name_create_mdt(&logname,
2906                                                      mti->mti_fsname, i);
2907                                 if (rc)
2908                                         break;
2909                                 if (!mgs_log_is_empty(env, mgs, logname)) {
2910                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
2911                                                           mti, logname,
2912                                                           &mgi->mgi_bufs,
2913                                                           cname, ptr);
2914                                         if (rc)
2915                                                 break;
2916                                 }
2917                         }
2918                 }
2919                 name_destroy(&logname);
2920                 name_destroy(&cname);
2921                 GOTO(end, rc);
2922         }
2923
2924         /* All mdt. params in proc */
2925         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
2926                 int i;
2927                 __u32 idx;
2928
2929                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2930                 if (strncmp(mti->mti_svname, mti->mti_fsname,
2931                             MTI_NAME_MAXLEN) == 0)
2932                         /* device is unspecified completely? */
2933                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
2934                 else
2935                         rc = server_name2index(mti->mti_svname, &idx, NULL);
2936                 if (rc < 0)
2937                         goto active_err;
2938                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
2939                         goto active_err;
2940                 if (rc & LDD_F_SV_ALL) {
2941                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2942                                 if (!cfs_test_bit(i,
2943                                                   fsdb->fsdb_mdt_index_map))
2944                                         continue;
2945                                 rc = name_create_mdt(&logname,
2946                                                 mti->mti_fsname, i);
2947                                 if (rc)
2948                                         goto active_err;
2949                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
2950                                                   logname, &mgi->mgi_bufs,
2951                                                   logname, ptr);
2952                                 name_destroy(&logname);
2953                                 if (rc)
2954                                         goto active_err;
2955                         }
2956                 } else {
2957                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
2958                                           mti->mti_svname, &mgi->mgi_bufs,
2959                                           mti->mti_svname, ptr);
2960                         if (rc)
2961                                 goto active_err;
2962                 }
2963                 GOTO(end, rc);
2964         }
2965
2966         /* All mdd., ost. params in proc */
2967         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
2968             (class_match_param(ptr, PARAM_OST, NULL) == 0)) {
2969                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
2970                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
2971                         GOTO(end, rc = -ENODEV);
2972
2973                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
2974                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
2975                 GOTO(end, rc);
2976         }
2977
2978         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
2979         rc2 = -ENOSYS;
2980
2981 end:
2982         if (rc)
2983                 CERROR("err %d on param '%s'\n", rc, ptr);
2984
2985         RETURN(rc ?: rc2);
2986 }
2987
2988 /* Not implementing automatic failover nid addition at this time. */
2989 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
2990                       struct mgs_target_info *mti)
2991 {
2992 #if 0
2993         struct fs_db *fsdb;
2994         int rc;
2995         ENTRY;
2996
2997         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
2998         if (rc)
2999                 RETURN(rc);
3000
3001         if (mgs_log_is_empty(obd, mti->mti_svname))
3002                 /* should never happen */
3003                 RETURN(-ENOENT);
3004
3005         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3006
3007         /* FIXME We can just check mti->params to see if we're already in
3008            the failover list.  Modify mti->params for rewriting back at
3009            server_register_target(). */
3010
3011         cfs_mutex_lock(&fsdb->fsdb_mutex);
3012         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3013         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3014
3015         RETURN(rc);
3016 #endif
3017         return 0;
3018 }
3019
3020 int mgs_write_log_target(const struct lu_env *env,
3021                          struct mgs_device *mgs,
3022                          struct mgs_target_info *mti,
3023                          struct fs_db *fsdb)
3024 {
3025         int rc = -EINVAL;
3026         char *buf, *params;
3027         ENTRY;
3028
3029         /* set/check the new target index */
3030         rc = mgs_set_index(env, mgs, mti);
3031         if (rc < 0) {
3032                 CERROR("Can't get index (%d)\n", rc);
3033                 RETURN(rc);
3034         }
3035
3036         if (rc == EALREADY) {
3037                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3038                               mti->mti_stripe_index, mti->mti_svname);
3039                 /* We would like to mark old log sections as invalid
3040                    and add new log sections in the client and mdt logs.
3041                    But if we add new sections, then live clients will
3042                    get repeat setup instructions for already running
3043                    osc's. So don't update the client/mdt logs. */
3044                 mti->mti_flags &= ~LDD_F_UPDATE;
3045         }
3046
3047         cfs_mutex_lock(&fsdb->fsdb_mutex);
3048
3049         if (mti->mti_flags &
3050             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3051                 /* Generate a log from scratch */
3052                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3053                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3054                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3055                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3056                 } else {
3057                         CERROR("Unknown target type %#x, can't create log for "
3058                                "%s\n", mti->mti_flags, mti->mti_svname);
3059                 }
3060                 if (rc) {
3061                         CERROR("Can't write logs for %s (%d)\n",
3062                                mti->mti_svname, rc);
3063                         GOTO(out_up, rc);
3064                 }
3065         } else {
3066                 /* Just update the params from tunefs in mgs_write_log_params */
3067                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3068                 mti->mti_flags |= LDD_F_PARAM;
3069         }
3070
3071         /* allocate temporary buffer, where class_get_next_param will
3072            make copy of a current  parameter */
3073         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3074         if (buf == NULL)
3075                 GOTO(out_up, rc = -ENOMEM);
3076         params = mti->mti_params;
3077         while (params != NULL) {
3078                 rc = class_get_next_param(&params, buf);
3079                 if (rc) {
3080                         if (rc == 1)
3081                                 /* there is no next parameter, that is
3082                                    not an error */
3083                                 rc = 0;
3084                         break;
3085                 }
3086                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3087                        params, buf);
3088                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3089                 if (rc)
3090                         break;
3091         }
3092
3093         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3094
3095 out_up:
3096         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3097         RETURN(rc);
3098 }
3099
3100 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3101 {
3102         struct lvfs_run_ctxt     saved;
3103         struct llog_ctxt        *ctxt;
3104         int                      rc = 0;
3105         struct obd_device *obd = mgs->mgs_obd;
3106
3107         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
3108         if (ctxt == NULL) {
3109                 CERROR("%s: MGS config context doesn't exist\n",
3110                        obd->obd_name);
3111                 rc = -ENODEV;
3112         } else {
3113                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3114                 rc = llog_erase(NULL, ctxt, NULL, name);
3115                 /* llog may not exist */
3116                 if (rc == -ENOENT)
3117                         rc = 0;
3118                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3119                 llog_ctxt_put(ctxt);
3120         }
3121
3122         if (rc)
3123                 CERROR("%s: failed to clear log %s: %d\n", obd->obd_name,
3124                        name, rc);
3125
3126         return rc;
3127 }
3128
3129 /* erase all logs for the given fs */
3130 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3131 {
3132         struct fs_db *fsdb;
3133         cfs_list_t list;
3134         struct mgs_direntry *dirent, *n;
3135         int rc, len = strlen(fsname);
3136         char *suffix;
3137         ENTRY;
3138
3139         /* Find all the logs in the CONFIGS directory */
3140         rc = class_dentry_readdir(env, mgs, &list);
3141         if (rc)
3142                 RETURN(rc);
3143
3144         cfs_mutex_lock(&mgs->mgs_mutex);
3145
3146         /* Delete the fs db */
3147         fsdb = mgs_find_fsdb(mgs, fsname);
3148         if (fsdb)
3149                 mgs_free_fsdb(mgs, fsdb);
3150
3151         cfs_mutex_unlock(&mgs->mgs_mutex);
3152
3153         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3154                 cfs_list_del(&dirent->list);
3155                 suffix = strrchr(dirent->name, '-');
3156                 if (suffix != NULL) {
3157                         if ((len == suffix - dirent->name) &&
3158                             (strncmp(fsname, dirent->name, len) == 0)) {
3159                                 CDEBUG(D_MGS, "Removing log %s\n",
3160                                        dirent->name);
3161                                 mgs_erase_log(env, mgs, dirent->name);
3162                         }
3163                 }
3164                 mgs_direntry_free(dirent);
3165         }
3166
3167         RETURN(rc);
3168 }
3169
3170 /* from llog_swab */
3171 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3172 {
3173         int i;
3174         ENTRY;
3175
3176         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3177         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3178
3179         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3180         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3181         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3182         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3183
3184         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3185         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3186                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3187                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3188                                i, lcfg->lcfg_buflens[i],
3189                                lustre_cfg_string(lcfg, i));
3190                 }
3191         EXIT;
3192 }
3193
3194 /* Set a permanent (config log) param for a target or fs
3195  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3196  *             buf1 contains the single parameter
3197  */
3198 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3199                  struct lustre_cfg *lcfg, char *fsname)
3200 {
3201         struct fs_db *fsdb;
3202         struct mgs_target_info *mti;
3203         char *devname, *param;
3204         char *ptr, *tmp;
3205         __u32 index;
3206         int rc = 0;
3207         ENTRY;
3208
3209         print_lustre_cfg(lcfg);
3210
3211         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3212         devname = lustre_cfg_string(lcfg, 0);
3213         param = lustre_cfg_string(lcfg, 1);
3214         if (!devname) {
3215                 /* Assume device name embedded in param:
3216                    lustre-OST0000.osc.max_dirty_mb=32 */
3217                 ptr = strchr(param, '.');
3218                 if (ptr) {
3219                         devname = param;
3220                         *ptr = 0;
3221                         param = ptr + 1;
3222                 }
3223         }
3224         if (!devname) {
3225                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3226                 RETURN(-ENOSYS);
3227         }
3228
3229         /* Extract fsname */
3230         ptr = strrchr(devname, '-');
3231         memset(fsname, 0, MTI_NAME_MAXLEN);
3232         if (ptr && (server_name2index(ptr, &index, NULL) >= 0)) {
3233                 /* param related to llite isn't allowed to set by OST or MDT */
3234                 if (strncmp(param, PARAM_LLITE, sizeof(PARAM_LLITE)) == 0)
3235                         RETURN(-EINVAL);
3236
3237                 strncpy(fsname, devname, ptr - devname);
3238         } else {
3239                 /* assume devname is the fsname */
3240                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3241         }
3242         fsname[MTI_NAME_MAXLEN - 1] = 0;
3243         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3244
3245         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3246         if (rc)
3247                 RETURN(rc);
3248         if (!cfs_test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3249             cfs_test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3250                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3251                        "is '%s'\n", fsname, devname);
3252                 mgs_free_fsdb(mgs, fsdb);
3253                 RETURN(-EINVAL);
3254         }
3255
3256         /* Create a fake mti to hold everything */
3257         OBD_ALLOC_PTR(mti);
3258         if (!mti)
3259                 GOTO(out, rc = -ENOMEM);
3260         strncpy(mti->mti_fsname, fsname, MTI_NAME_MAXLEN);
3261         strncpy(mti->mti_svname, devname, MTI_NAME_MAXLEN);
3262         strncpy(mti->mti_params, param, sizeof(mti->mti_params));
3263         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3264         if (rc < 0)
3265                 /* Not a valid server; may be only fsname */
3266                 rc = 0;
3267         else
3268                 /* Strip -osc or -mdc suffix from svname */
3269                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3270                                      mti->mti_svname))
3271                         GOTO(out, rc = -EINVAL);
3272
3273         mti->mti_flags = rc | LDD_F_PARAM;
3274
3275         cfs_mutex_lock(&fsdb->fsdb_mutex);
3276         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3277         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3278
3279         /*
3280          * Revoke lock so everyone updates.  Should be alright if
3281          * someone was already reading while we were updating the logs,
3282          * so we don't really need to hold the lock while we're
3283          * writing (above).
3284          */
3285         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3286 out:
3287         OBD_FREE_PTR(mti);
3288         RETURN(rc);
3289 }
3290
3291 static int mgs_write_log_pool(const struct lu_env *env,
3292                               struct mgs_device *mgs, char *logname,
3293                               struct fs_db *fsdb, char *lovname,
3294                               enum lcfg_command_type cmd,
3295                               char *poolname, char *fsname,
3296                               char *ostname, char *comment)
3297 {
3298         struct llog_handle *llh = NULL;
3299         int rc;
3300
3301         rc = record_start_log(env, mgs, &llh, logname);
3302         if (rc)
3303                 return rc;
3304         rc = record_marker(env, llh, fsdb, CM_START, lovname, comment);
3305         if (rc)
3306                 goto out;
3307         rc = record_base(env, llh, lovname, 0, cmd, poolname, fsname, ostname, 0);
3308         if (rc)
3309                 goto out;
3310         rc = record_marker(env, llh, fsdb, CM_END, lovname, comment);
3311 out:
3312         record_end_log(env, &llh);
3313         return rc;
3314 }
3315
3316 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3317                  enum lcfg_command_type cmd, char *fsname,
3318                  char *poolname, char *ostname)
3319 {
3320         struct fs_db *fsdb;
3321         char *lovname;
3322         char *logname;
3323         char *label = NULL, *canceled_label = NULL;
3324         int label_sz;
3325         struct mgs_target_info *mti = NULL;
3326         int rc, i;
3327         ENTRY;
3328
3329         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3330         if (rc) {
3331                 CERROR("Can't get db for %s\n", fsname);
3332                 RETURN(rc);
3333         }
3334         if (cfs_test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3335                 CERROR("%s is not defined\n", fsname);
3336                 mgs_free_fsdb(mgs, fsdb);
3337                 RETURN(-EINVAL);
3338         }
3339
3340         label_sz = 10 + strlen(fsname) + strlen(poolname);
3341
3342         /* check if ostname match fsname */
3343         if (ostname != NULL) {
3344                 char *ptr;
3345
3346                 ptr = strrchr(ostname, '-');
3347                 if ((ptr == NULL) ||
3348                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3349                         RETURN(-EINVAL);
3350                 label_sz += strlen(ostname);
3351         }
3352
3353         OBD_ALLOC(label, label_sz);
3354         if (label == NULL)
3355                 RETURN(-ENOMEM);
3356
3357         switch(cmd) {
3358         case LCFG_POOL_NEW:
3359                 sprintf(label,
3360                         "new %s.%s", fsname, poolname);
3361                 break;
3362         case LCFG_POOL_ADD:
3363                 sprintf(label,
3364                         "add %s.%s.%s", fsname, poolname, ostname);
3365                 break;
3366         case LCFG_POOL_REM:
3367                 OBD_ALLOC(canceled_label, label_sz);
3368                 if (canceled_label == NULL)
3369                         GOTO(out_label, rc = -ENOMEM);
3370                 sprintf(label,
3371                         "rem %s.%s.%s", fsname, poolname, ostname);
3372                 sprintf(canceled_label,
3373                         "add %s.%s.%s", fsname, poolname, ostname);
3374                 break;
3375         case LCFG_POOL_DEL:
3376                 OBD_ALLOC(canceled_label, label_sz);
3377                 if (canceled_label == NULL)
3378                         GOTO(out_label, rc = -ENOMEM);
3379                 sprintf(label,
3380                         "del %s.%s", fsname, poolname);
3381                 sprintf(canceled_label,
3382                         "new %s.%s", fsname, poolname);
3383                 break;
3384         default:
3385                 break;
3386         }
3387
3388         cfs_mutex_lock(&fsdb->fsdb_mutex);
3389
3390         if (canceled_label != NULL) {
3391                 OBD_ALLOC_PTR(mti);
3392                 if (mti == NULL)
3393                         GOTO(out_cancel, rc = -ENOMEM);
3394         }
3395
3396         /* write pool def to all MDT logs */
3397         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3398                  if (cfs_test_bit(i,  fsdb->fsdb_mdt_index_map)) {
3399                         rc = name_create_mdt_and_lov(&logname, &lovname,
3400                                                      fsdb, i);
3401                         if (rc) {
3402                                 cfs_mutex_unlock(&fsdb->fsdb_mutex);
3403                                 GOTO(out_mti, rc);
3404                         }
3405                         if (canceled_label != NULL) {
3406                                 strcpy(mti->mti_svname, "lov pool");
3407                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3408                                                 lovname, canceled_label,
3409                                                 CM_SKIP);
3410                         }
3411
3412                         if (rc >= 0)
3413                                 rc = mgs_write_log_pool(env, mgs, logname,
3414                                                         fsdb, lovname, cmd,
3415                                                         fsname, poolname,
3416                                                         ostname, label);
3417                         name_destroy(&logname);
3418                         name_destroy(&lovname);
3419                         if (rc) {
3420                                 cfs_mutex_unlock(&fsdb->fsdb_mutex);
3421                                 GOTO(out_mti, rc);
3422                         }
3423                 }
3424         }
3425
3426         rc = name_create(&logname, fsname, "-client");
3427         if (rc) {
3428                 cfs_mutex_unlock(&fsdb->fsdb_mutex);
3429                 GOTO(out_mti, rc);
3430         }
3431         if (canceled_label != NULL) {
3432                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3433                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
3434                 if (rc < 0) {
3435                         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3436                         name_destroy(&logname);
3437                         GOTO(out_mti, rc);
3438                 }
3439         }
3440
3441         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
3442                                 cmd, fsname, poolname, ostname, label);
3443         cfs_mutex_unlock(&fsdb->fsdb_mutex);
3444         name_destroy(&logname);
3445         /* request for update */
3446         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3447
3448         EXIT;
3449 out_mti:
3450         if (mti != NULL)
3451                 OBD_FREE_PTR(mti);
3452 out_cancel:
3453         if (canceled_label != NULL)
3454                 OBD_FREE(canceled_label, label_sz);
3455 out_label:
3456         OBD_FREE(label, label_sz);
3457         return rc;
3458 }
3459
3460 #if 0
3461 /******************** unused *********************/
3462 static int mgs_backup_llog(struct obd_device *obd, char* fsname)
3463 {
3464         struct file *filp, *bak_filp;
3465         struct lvfs_run_ctxt saved;
3466         char *logname, *buf;
3467         loff_t soff = 0 , doff = 0;
3468         int count = 4096, len;
3469         int rc = 0;
3470
3471         OBD_ALLOC(logname, PATH_MAX);
3472         if (logname == NULL)
3473                 return -ENOMEM;
3474
3475         OBD_ALLOC(buf, count);
3476         if (!buf)
3477                 GOTO(out , rc = -ENOMEM);
3478
3479         len = snprintf(logname, PATH_MAX, "%s/%s.bak",
3480                        MOUNT_CONFIGS_DIR, fsname);
3481
3482         if (len >= PATH_MAX - 1) {
3483                 GOTO(out, -ENAMETOOLONG);
3484         }
3485
3486         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3487
3488         bak_filp = l_filp_open(logname, O_RDWR|O_CREAT|O_TRUNC, 0660);
3489         if (IS_ERR(bak_filp)) {
3490                 rc = PTR_ERR(bak_filp);
3491                 CERROR("backup logfile open %s: %d\n", logname, rc);
3492                 GOTO(pop, rc);
3493         }
3494         sprintf(logname, "%s/%s", MOUNT_CONFIGS_DIR, fsname);
3495         filp = l_filp_open(logname, O_RDONLY, 0);
3496         if (IS_ERR(filp)) {
3497                 rc = PTR_ERR(filp);
3498                 CERROR("logfile open %s: %d\n", logname, rc);
3499                 GOTO(close1f, rc);
3500         }
3501
3502         while ((rc = lustre_fread(filp, buf, count, &soff)) > 0) {
3503                 rc = lustre_fwrite(bak_filp, buf, count, &doff);
3504                 break;
3505         }
3506
3507         filp_close(filp, 0);
3508 close1f:
3509         filp_close(bak_filp, 0);
3510 pop:
3511         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3512 out:
3513         if (buf)
3514                 OBD_FREE(buf, count);
3515         OBD_FREE(logname, PATH_MAX);
3516         return rc;
3517 }
3518
3519 #endif