Whamcloud - gitweb
LU-4961 lustre: move ioctls to lustre_ioctl.h
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         LASSERT(dir);
71         LASSERT(dir->do_index_ops);
72
73         iops = &dir->do_index_ops->dio_it;
74         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
75         if (IS_ERR(it))
76                 RETURN(PTR_ERR(it));
77
78         rc = iops->load(env, it, 0);
79         if (rc <= 0)
80                 GOTO(fini, rc = 0);
81
82         /* main cycle */
83         do {
84                 key = (void *)iops->key(env, it);
85                 if (IS_ERR(key)) {
86                         CERROR("%s: key failed when listing %s: rc = %d\n",
87                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
88                                (int) PTR_ERR(key));
89                         goto next;
90                 }
91                 key_sz = iops->key_size(env, it);
92                 LASSERT(key_sz > 0);
93
94                 /* filter out "." and ".." entries */
95                 if (key[0] == '.') {
96                         if (key_sz == 1)
97                                 goto next;
98                         if (key_sz == 2 && key[1] == '.')
99                                 goto next;
100                 }
101
102                 de = mgs_direntry_alloc(key_sz + 1);
103                 if (de == NULL) {
104                         rc = -ENOMEM;
105                         break;
106                 }
107
108                 memcpy(de->name, key, key_sz);
109                 de->name[key_sz] = 0;
110
111                 cfs_list_add(&de->list, list);
112
113 next:
114                 rc = iops->next(env, it);
115         } while (rc == 0);
116         rc = 0;
117
118         iops->put(env, it);
119
120 fini:
121         iops->fini(env, it);
122         if (rc)
123                 CERROR("%s: key failed when listing %s: rc = %d\n",
124                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
125         RETURN(rc);
126 }
127
128 /******************** DB functions *********************/
129
130 static inline int name_create(char **newname, char *prefix, char *suffix)
131 {
132         LASSERT(newname);
133         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
134         if (!*newname)
135                 return -ENOMEM;
136         sprintf(*newname, "%s%s", prefix, suffix);
137         return 0;
138 }
139
140 static inline void name_destroy(char **name)
141 {
142         if (*name)
143                 OBD_FREE(*name, strlen(*name) + 1);
144         *name = NULL;
145 }
146
147 struct mgs_fsdb_handler_data
148 {
149         struct fs_db   *fsdb;
150         __u32           ver;
151 };
152
153 /* from the (client) config log, figure out:
154         1. which ost's/mdt's are configured (by index)
155         2. what the last config step is
156         3. COMPAT_18 osc name
157 */
158 /* It might be better to have a separate db file, instead of parsing the info
159    out of the client log.  This is slow and potentially error-prone. */
160 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
161                             struct llog_rec_hdr *rec, void *data)
162 {
163         struct mgs_fsdb_handler_data *d = data;
164         struct fs_db *fsdb = d->fsdb;
165         int cfg_len = rec->lrh_len;
166         char *cfg_buf = (char*) (rec + 1);
167         struct lustre_cfg *lcfg;
168         __u32 index;
169         int rc = 0;
170         ENTRY;
171
172         if (rec->lrh_type != OBD_CFG_REC) {
173                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
174                 RETURN(-EINVAL);
175         }
176
177         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
178         if (rc) {
179                 CERROR("Insane cfg\n");
180                 RETURN(rc);
181         }
182
183         lcfg = (struct lustre_cfg *)cfg_buf;
184
185         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
186                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
187
188         /* Figure out ost indicies */
189         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
190         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
191             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
192                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
193                                        NULL, 10);
194                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
195                        lustre_cfg_string(lcfg, 1), index,
196                        lustre_cfg_string(lcfg, 2));
197                 set_bit(index, fsdb->fsdb_ost_index_map);
198         }
199
200         /* Figure out mdt indicies */
201         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
202         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
203             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
204                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
205                                        &index, NULL);
206                 if (rc != LDD_F_SV_TYPE_MDT) {
207                         CWARN("Unparsable MDC name %s, assuming index 0\n",
208                               lustre_cfg_string(lcfg, 0));
209                         index = 0;
210                 }
211                 rc = 0;
212                 CDEBUG(D_MGS, "MDT index is %u\n", index);
213                 set_bit(index, fsdb->fsdb_mdt_index_map);
214                 fsdb->fsdb_mdt_count ++;
215         }
216
217         /**
218          * figure out the old config. fsdb_gen = 0 means old log
219          * It is obsoleted and not supported anymore
220          */
221         if (fsdb->fsdb_gen == 0) {
222                 CERROR("Old config format is not supported\n");
223                 RETURN(-EINVAL);
224         }
225
226         /*
227          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
228          */
229         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
230             lcfg->lcfg_command == LCFG_ATTACH &&
231             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
232                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
233                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
234                         CWARN("MDT using 1.8 OSC name scheme\n");
235                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
236                 }
237         }
238
239         if (lcfg->lcfg_command == LCFG_MARKER) {
240                 struct cfg_marker *marker;
241                 marker = lustre_cfg_buf(lcfg, 1);
242
243                 d->ver = marker->cm_vers;
244
245                 /* Keep track of the latest marker step */
246                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
247         }
248
249         RETURN(rc);
250 }
251
252 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
253 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
254                                   struct mgs_device *mgs,
255                                   struct fs_db *fsdb)
256 {
257         char                            *logname;
258         struct llog_handle              *loghandle;
259         struct llog_ctxt                *ctxt;
260         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
261         int rc;
262
263         ENTRY;
264
265         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
266         LASSERT(ctxt != NULL);
267         rc = name_create(&logname, fsdb->fsdb_name, "-client");
268         if (rc)
269                 GOTO(out_put, rc);
270         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
271         if (rc)
272                 GOTO(out_pop, rc);
273
274         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
275         if (rc)
276                 GOTO(out_close, rc);
277
278         if (llog_get_size(loghandle) <= 1)
279                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
280
281         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
282         CDEBUG(D_INFO, "get_db = %d\n", rc);
283 out_close:
284         llog_close(env, loghandle);
285 out_pop:
286         name_destroy(&logname);
287 out_put:
288         llog_ctxt_put(ctxt);
289
290         RETURN(rc);
291 }
292
293 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
294 {
295         struct mgs_tgt_srpc_conf *tgtconf;
296
297         /* free target-specific rules */
298         while (fsdb->fsdb_srpc_tgt) {
299                 tgtconf = fsdb->fsdb_srpc_tgt;
300                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
301
302                 LASSERT(tgtconf->mtsc_tgt);
303
304                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
305                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
306                 OBD_FREE_PTR(tgtconf);
307         }
308
309         /* free general rules */
310         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
311 }
312
313 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
314 {
315         struct fs_db *fsdb;
316         cfs_list_t *tmp;
317
318         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
319                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
320                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
321                         return fsdb;
322         }
323         return NULL;
324 }
325
326 /* caller must hold the mgs->mgs_fs_db_lock */
327 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
328                                   struct mgs_device *mgs, char *fsname)
329 {
330         struct fs_db *fsdb;
331         int rc;
332         ENTRY;
333
334         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
335                 CERROR("fsname %s is too long\n", fsname);
336                 RETURN(NULL);
337         }
338
339         OBD_ALLOC_PTR(fsdb);
340         if (!fsdb)
341                 RETURN(NULL);
342
343         strcpy(fsdb->fsdb_name, fsname);
344         mutex_init(&fsdb->fsdb_mutex);
345         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
346         fsdb->fsdb_gen = 1;
347
348         if (strcmp(fsname, MGSSELF_NAME) == 0) {
349                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
350         } else {
351                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
352                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
353                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
354                         CERROR("No memory for index maps\n");
355                         GOTO(err, rc = -ENOMEM);
356                 }
357
358                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
359                 if (rc)
360                         GOTO(err, rc);
361                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
362                 if (rc)
363                         GOTO(err, rc);
364
365                 /* initialise data for NID table */
366                 mgs_ir_init_fs(env, mgs, fsdb);
367
368                 lproc_mgs_add_live(mgs, fsdb);
369         }
370
371         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
372
373         RETURN(fsdb);
374 err:
375         if (fsdb->fsdb_ost_index_map)
376                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
377         if (fsdb->fsdb_mdt_index_map)
378                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
379         name_destroy(&fsdb->fsdb_clilov);
380         name_destroy(&fsdb->fsdb_clilmv);
381         OBD_FREE_PTR(fsdb);
382         RETURN(NULL);
383 }
384
385 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
386 {
387         /* wait for anyone with the sem */
388         mutex_lock(&fsdb->fsdb_mutex);
389         lproc_mgs_del_live(mgs, fsdb);
390         cfs_list_del(&fsdb->fsdb_list);
391
392         /* deinitialize fsr */
393         mgs_ir_fini_fs(mgs, fsdb);
394
395         if (fsdb->fsdb_ost_index_map)
396                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
397         if (fsdb->fsdb_mdt_index_map)
398                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
399         name_destroy(&fsdb->fsdb_clilov);
400         name_destroy(&fsdb->fsdb_clilmv);
401         mgs_free_fsdb_srpc(fsdb);
402         mutex_unlock(&fsdb->fsdb_mutex);
403         OBD_FREE_PTR(fsdb);
404 }
405
406 int mgs_init_fsdb_list(struct mgs_device *mgs)
407 {
408         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
409         return 0;
410 }
411
412 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
413 {
414         struct fs_db *fsdb;
415         cfs_list_t *tmp, *tmp2;
416         mutex_lock(&mgs->mgs_mutex);
417         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
418                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
419                 mgs_free_fsdb(mgs, fsdb);
420         }
421         mutex_unlock(&mgs->mgs_mutex);
422         return 0;
423 }
424
425 int mgs_find_or_make_fsdb(const struct lu_env *env,
426                           struct mgs_device *mgs, char *name,
427                           struct fs_db **dbh)
428 {
429         struct fs_db *fsdb;
430         int rc = 0;
431
432         ENTRY;
433         mutex_lock(&mgs->mgs_mutex);
434         fsdb = mgs_find_fsdb(mgs, name);
435         if (fsdb) {
436                 mutex_unlock(&mgs->mgs_mutex);
437                 *dbh = fsdb;
438                 RETURN(0);
439         }
440
441         CDEBUG(D_MGS, "Creating new db\n");
442         fsdb = mgs_new_fsdb(env, mgs, name);
443         /* lock fsdb_mutex until the db is loaded from llogs */
444         if (fsdb)
445                 mutex_lock(&fsdb->fsdb_mutex);
446         mutex_unlock(&mgs->mgs_mutex);
447         if (!fsdb)
448                 RETURN(-ENOMEM);
449
450         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
451                 /* populate the db from the client llog */
452                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
453                 if (rc) {
454                         CERROR("Can't get db from client log %d\n", rc);
455                         GOTO(out_free, rc);
456                 }
457         }
458
459         /* populate srpc rules from params llog */
460         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
461         if (rc) {
462                 CERROR("Can't get db from params log %d\n", rc);
463                 GOTO(out_free, rc);
464         }
465
466         mutex_unlock(&fsdb->fsdb_mutex);
467         *dbh = fsdb;
468
469         RETURN(0);
470
471 out_free:
472         mutex_unlock(&fsdb->fsdb_mutex);
473         mgs_free_fsdb(mgs, fsdb);
474         return rc;
475 }
476
477 /* 1 = index in use
478    0 = index unused
479    -1= empty client log */
480 int mgs_check_index(const struct lu_env *env,
481                     struct mgs_device *mgs,
482                     struct mgs_target_info *mti)
483 {
484         struct fs_db *fsdb;
485         void *imap;
486         int rc = 0;
487         ENTRY;
488
489         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
490
491         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
492         if (rc) {
493                 CERROR("Can't get db for %s\n", mti->mti_fsname);
494                 RETURN(rc);
495         }
496
497         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
498                 RETURN(-1);
499
500         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
501                 imap = fsdb->fsdb_ost_index_map;
502         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
503                 imap = fsdb->fsdb_mdt_index_map;
504         else
505                 RETURN(-EINVAL);
506
507         if (test_bit(mti->mti_stripe_index, imap))
508                 RETURN(1);
509         RETURN(0);
510 }
511
512 static __inline__ int next_index(void *index_map, int map_len)
513 {
514         int i;
515         for (i = 0; i < map_len * 8; i++)
516                 if (!test_bit(i, index_map)) {
517                          return i;
518                  }
519         CERROR("max index %d exceeded.\n", i);
520         return -1;
521 }
522
523 /* Return codes:
524         0  newly marked as in use
525         <0 err
526         +EALREADY for update of an old index */
527 static int mgs_set_index(const struct lu_env *env,
528                          struct mgs_device *mgs,
529                          struct mgs_target_info *mti)
530 {
531         struct fs_db *fsdb;
532         void *imap;
533         int rc = 0;
534         ENTRY;
535
536         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
537         if (rc) {
538                 CERROR("Can't get db for %s\n", mti->mti_fsname);
539                 RETURN(rc);
540         }
541
542         mutex_lock(&fsdb->fsdb_mutex);
543         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
544                 imap = fsdb->fsdb_ost_index_map;
545         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
546                 imap = fsdb->fsdb_mdt_index_map;
547         } else {
548                 GOTO(out_up, rc = -EINVAL);
549         }
550
551         if (mti->mti_flags & LDD_F_NEED_INDEX) {
552                 rc = next_index(imap, INDEX_MAP_SIZE);
553                 if (rc == -1)
554                         GOTO(out_up, rc = -ERANGE);
555                 mti->mti_stripe_index = rc;
556                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
557                         fsdb->fsdb_mdt_count ++;
558         }
559
560         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
561                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
562                                    "but the max index is %d.\n",
563                                    mti->mti_svname, mti->mti_stripe_index,
564                                    INDEX_MAP_SIZE * 8);
565                 GOTO(out_up, rc = -ERANGE);
566         }
567
568         if (test_bit(mti->mti_stripe_index, imap)) {
569                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
570                     !(mti->mti_flags & LDD_F_WRITECONF)) {
571                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
572                                            "%d, but that index is already in "
573                                            "use. Use --writeconf to force\n",
574                                            mti->mti_svname,
575                                            mti->mti_stripe_index);
576                         GOTO(out_up, rc = -EADDRINUSE);
577                 } else {
578                         CDEBUG(D_MGS, "Server %s updating index %d\n",
579                                mti->mti_svname, mti->mti_stripe_index);
580                         GOTO(out_up, rc = EALREADY);
581                 }
582         }
583
584         set_bit(mti->mti_stripe_index, imap);
585         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
586         mutex_unlock(&fsdb->fsdb_mutex);
587         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
588                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
589
590         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
591                mti->mti_stripe_index);
592
593         RETURN(0);
594 out_up:
595         mutex_unlock(&fsdb->fsdb_mutex);
596         return rc;
597 }
598
599 struct mgs_modify_lookup {
600         struct cfg_marker mml_marker;
601         int               mml_modified;
602 };
603
604 static int mgs_modify_handler(const struct lu_env *env,
605                               struct llog_handle *llh,
606                               struct llog_rec_hdr *rec, void *data)
607 {
608         struct mgs_modify_lookup *mml = data;
609         struct cfg_marker *marker;
610         struct lustre_cfg *lcfg = REC_DATA(rec);
611         int cfg_len = REC_DATA_LEN(rec);
612         int rc;
613         ENTRY;
614
615         if (rec->lrh_type != OBD_CFG_REC) {
616                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
617                 RETURN(-EINVAL);
618         }
619
620         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
621         if (rc) {
622                 CERROR("Insane cfg\n");
623                 RETURN(rc);
624         }
625
626         /* We only care about markers */
627         if (lcfg->lcfg_command != LCFG_MARKER)
628                 RETURN(0);
629
630         marker = lustre_cfg_buf(lcfg, 1);
631         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
632             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
633             !(marker->cm_flags & CM_SKIP)) {
634                 /* Found a non-skipped marker match */
635                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
636                        rec->lrh_index, marker->cm_step,
637                        marker->cm_flags, mml->mml_marker.cm_flags,
638                        marker->cm_tgtname, marker->cm_comment);
639                 /* Overwrite the old marker llog entry */
640                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
641                 marker->cm_flags |= mml->mml_marker.cm_flags;
642                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
643                 /* Header and tail are added back to lrh_len in
644                    llog_lvfs_write_rec */
645                 rec->lrh_len = cfg_len;
646                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
647                                 rec->lrh_index);
648                 if (!rc)
649                          mml->mml_modified++;
650         }
651
652         RETURN(rc);
653 }
654
655 /**
656  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
657  * Return code:
658  * 0 - modified successfully,
659  * 1 - no modification was done
660  * negative - error
661  */
662 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
663                       struct fs_db *fsdb, struct mgs_target_info *mti,
664                       char *logname, char *devname, char *comment, int flags)
665 {
666         struct llog_handle *loghandle;
667         struct llog_ctxt *ctxt;
668         struct mgs_modify_lookup *mml;
669         int rc;
670
671         ENTRY;
672
673         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
674         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
675                flags);
676
677         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
678         LASSERT(ctxt != NULL);
679         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
680         if (rc < 0) {
681                 if (rc == -ENOENT)
682                         rc = 0;
683                 GOTO(out_pop, rc);
684         }
685
686         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
687         if (rc)
688                 GOTO(out_close, rc);
689
690         if (llog_get_size(loghandle) <= 1)
691                 GOTO(out_close, rc = 0);
692
693         OBD_ALLOC_PTR(mml);
694         if (!mml)
695                 GOTO(out_close, rc = -ENOMEM);
696         if (strlcpy(mml->mml_marker.cm_comment, comment,
697                     sizeof(mml->mml_marker.cm_comment)) >=
698             sizeof(mml->mml_marker.cm_comment))
699                 GOTO(out_free, rc = -E2BIG);
700         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
701                     sizeof(mml->mml_marker.cm_tgtname)) >=
702             sizeof(mml->mml_marker.cm_tgtname))
703                 GOTO(out_free, rc = -E2BIG);
704         /* Modify mostly means cancel */
705         mml->mml_marker.cm_flags = flags;
706         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
707         mml->mml_modified = 0;
708         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
709                           NULL);
710         if (!rc && !mml->mml_modified)
711                 rc = 1;
712
713 out_free:
714         OBD_FREE_PTR(mml);
715
716 out_close:
717         llog_close(env, loghandle);
718 out_pop:
719         if (rc < 0)
720                 CERROR("%s: modify %s/%s failed: rc = %d\n",
721                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
722         llog_ctxt_put(ctxt);
723         RETURN(rc);
724 }
725
726 /** This structure is passed to mgs_replace_handler */
727 struct mgs_replace_uuid_lookup {
728         /* Nids are replaced for this target device */
729         struct mgs_target_info target;
730         /* Temporary modified llog */
731         struct llog_handle *temp_llh;
732         /* Flag is set if in target block*/
733         int in_target_device;
734         /* Nids already added. Just skip (multiple nids) */
735         int device_nids_added;
736         /* Flag is set if this block should not be copied */
737         int skip_it;
738 };
739
740 /**
741  * Check: a) if block should be skipped
742  * b) is it target block
743  *
744  * \param[in] lcfg
745  * \param[in] mrul
746  *
747  * \retval 0 should not to be skipped
748  * \retval 1 should to be skipped
749  */
750 static int check_markers(struct lustre_cfg *lcfg,
751                          struct mgs_replace_uuid_lookup *mrul)
752 {
753          struct cfg_marker *marker;
754
755         /* Track markers. Find given device */
756         if (lcfg->lcfg_command == LCFG_MARKER) {
757                 marker = lustre_cfg_buf(lcfg, 1);
758                 /* Clean llog from records marked as CM_EXCLUDE.
759                    CM_SKIP records are used for "active" command
760                    and can be restored if needed */
761                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
762                     (CM_EXCLUDE | CM_START)) {
763                         mrul->skip_it = 1;
764                         return 1;
765                 }
766
767                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
768                     (CM_EXCLUDE | CM_END)) {
769                         mrul->skip_it = 0;
770                         return 1;
771                 }
772
773                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
774                         LASSERT(!(marker->cm_flags & CM_START) ||
775                                 !(marker->cm_flags & CM_END));
776                         if (marker->cm_flags & CM_START) {
777                                 mrul->in_target_device = 1;
778                                 mrul->device_nids_added = 0;
779                         } else if (marker->cm_flags & CM_END)
780                                 mrul->in_target_device = 0;
781                 }
782         }
783
784         return 0;
785 }
786
787 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
788                        struct lustre_cfg *lcfg)
789 {
790         struct llog_rec_hdr      rec;
791         int                      buflen, rc;
792
793         if (!lcfg || !llh)
794                 return -ENOMEM;
795
796         LASSERT(llh->lgh_ctxt);
797
798         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
799                                 lcfg->lcfg_buflens);
800         rec.lrh_len = llog_data_len(buflen);
801         rec.lrh_type = OBD_CFG_REC;
802
803         /* idx = -1 means append */
804         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
805         if (rc)
806                 CERROR("failed %d\n", rc);
807         return rc;
808 }
809
810 static int record_base(const struct lu_env *env, struct llog_handle *llh,
811                      char *cfgname, lnet_nid_t nid, int cmd,
812                      char *s1, char *s2, char *s3, char *s4)
813 {
814         struct mgs_thread_info *mgi = mgs_env_info(env);
815         struct lustre_cfg     *lcfg;
816         int rc;
817
818         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
819                cmd, s1, s2, s3, s4);
820
821         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
822         if (s1)
823                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
824         if (s2)
825                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
826         if (s3)
827                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
828         if (s4)
829                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
830
831         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
832         if (!lcfg)
833                 return -ENOMEM;
834         lcfg->lcfg_nid = nid;
835
836         rc = record_lcfg(env, llh, lcfg);
837
838         lustre_cfg_free(lcfg);
839
840         if (rc) {
841                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
842                        cmd, s1, s2, s3, s4);
843         }
844         return rc;
845 }
846
847 static inline int record_add_uuid(const struct lu_env *env,
848                                   struct llog_handle *llh,
849                                   uint64_t nid, char *uuid)
850 {
851         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
852 }
853
854 static inline int record_add_conn(const struct lu_env *env,
855                                   struct llog_handle *llh,
856                                   char *devname, char *uuid)
857 {
858         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
859 }
860
861 static inline int record_attach(const struct lu_env *env,
862                                 struct llog_handle *llh, char *devname,
863                                 char *type, char *uuid)
864 {
865         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
866 }
867
868 static inline int record_setup(const struct lu_env *env,
869                                struct llog_handle *llh, char *devname,
870                                char *s1, char *s2, char *s3, char *s4)
871 {
872         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
873 }
874
875 /**
876  * \retval <0 record processing error
877  * \retval n record is processed. No need copy original one.
878  * \retval 0 record is not processed.
879  */
880 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
881                            struct mgs_replace_uuid_lookup *mrul)
882 {
883         int nids_added = 0;
884         lnet_nid_t nid;
885         char *ptr;
886         int rc;
887
888         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
889                 /* LCFG_ADD_UUID command found. Let's skip original command
890                    and add passed nids */
891                 ptr = mrul->target.mti_params;
892                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
893                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
894                                "device %s\n", libcfs_nid2str(nid),
895                                 mrul->target.mti_params,
896                                 mrul->target.mti_svname);
897                         rc = record_add_uuid(env,
898                                              mrul->temp_llh, nid,
899                                              mrul->target.mti_params);
900                         if (!rc)
901                                 nids_added++;
902                 }
903
904                 if (nids_added == 0) {
905                         CERROR("No new nids were added, nid %s with uuid %s, "
906                                "device %s\n", libcfs_nid2str(nid),
907                                mrul->target.mti_params,
908                                mrul->target.mti_svname);
909                         RETURN(-ENXIO);
910                 } else {
911                         mrul->device_nids_added = 1;
912                 }
913
914                 return nids_added;
915         }
916
917         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
918                 /* LCFG_SETUP command found. UUID should be changed */
919                 rc = record_setup(env,
920                                   mrul->temp_llh,
921                                   /* devname the same */
922                                   lustre_cfg_string(lcfg, 0),
923                                   /* s1 is not changed */
924                                   lustre_cfg_string(lcfg, 1),
925                                   /* new uuid should be
926                                   the full nidlist */
927                                   mrul->target.mti_params,
928                                   /* s3 is not changed */
929                                   lustre_cfg_string(lcfg, 3),
930                                   /* s4 is not changed */
931                                   lustre_cfg_string(lcfg, 4));
932                 return rc ? rc : 1;
933         }
934
935         /* Another commands in target device block */
936         return 0;
937 }
938
939 /**
940  * Handler that called for every record in llog.
941  * Records are processed in order they placed in llog.
942  *
943  * \param[in] llh       log to be processed
944  * \param[in] rec       current record
945  * \param[in] data      mgs_replace_uuid_lookup structure
946  *
947  * \retval 0    success
948  */
949 static int mgs_replace_handler(const struct lu_env *env,
950                                struct llog_handle *llh,
951                                struct llog_rec_hdr *rec,
952                                void *data)
953 {
954         struct mgs_replace_uuid_lookup *mrul;
955         struct lustre_cfg *lcfg = REC_DATA(rec);
956         int cfg_len = REC_DATA_LEN(rec);
957         int rc;
958         ENTRY;
959
960         mrul = (struct mgs_replace_uuid_lookup *)data;
961
962         if (rec->lrh_type != OBD_CFG_REC) {
963                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
964                        rec->lrh_type, lcfg->lcfg_command,
965                        lustre_cfg_string(lcfg, 0),
966                        lustre_cfg_string(lcfg, 1));
967                 RETURN(-EINVAL);
968         }
969
970         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
971         if (rc) {
972                 /* Do not copy any invalidated records */
973                 GOTO(skip_out, rc = 0);
974         }
975
976         rc = check_markers(lcfg, mrul);
977         if (rc || mrul->skip_it)
978                 GOTO(skip_out, rc = 0);
979
980         /* Write to new log all commands outside target device block */
981         if (!mrul->in_target_device)
982                 GOTO(copy_out, rc = 0);
983
984         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
985            (failover nids) for this target, assuming that if then
986            primary is changing then so is the failover */
987         if (mrul->device_nids_added &&
988             (lcfg->lcfg_command == LCFG_ADD_UUID ||
989              lcfg->lcfg_command == LCFG_ADD_CONN))
990                 GOTO(skip_out, rc = 0);
991
992         rc = process_command(env, lcfg, mrul);
993         if (rc < 0)
994                 RETURN(rc);
995
996         if (rc)
997                 RETURN(0);
998 copy_out:
999         /* Record is placed in temporary llog as is */
1000         rc = llog_write(env, mrul->temp_llh, rec, NULL, 0, NULL, -1);
1001
1002         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1003                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1004                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1005         RETURN(rc);
1006
1007 skip_out:
1008         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1009                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1010                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1011         RETURN(rc);
1012 }
1013
1014 static int mgs_log_is_empty(const struct lu_env *env,
1015                             struct mgs_device *mgs, char *name)
1016 {
1017         struct llog_ctxt        *ctxt;
1018         int                      rc;
1019
1020         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1021         LASSERT(ctxt != NULL);
1022
1023         rc = llog_is_empty(env, ctxt, name);
1024         llog_ctxt_put(ctxt);
1025         return rc;
1026 }
1027
1028 static int mgs_replace_nids_log(const struct lu_env *env,
1029                                 struct obd_device *mgs, struct fs_db *fsdb,
1030                                 char *logname, char *devname, char *nids)
1031 {
1032         struct llog_handle *orig_llh, *backup_llh;
1033         struct llog_ctxt *ctxt;
1034         struct mgs_replace_uuid_lookup *mrul;
1035         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1036         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1037         char *backup;
1038         int rc, rc2;
1039         ENTRY;
1040
1041         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1042
1043         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1044         LASSERT(ctxt != NULL);
1045
1046         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1047                 /* Log is empty. Nothing to replace */
1048                 GOTO(out_put, rc = 0);
1049         }
1050
1051         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1052         if (backup == NULL)
1053                 GOTO(out_put, rc = -ENOMEM);
1054
1055         sprintf(backup, "%s.bak", logname);
1056
1057         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1058         if (rc == 0) {
1059                 /* Now erase original log file. Connections are not allowed.
1060                    Backup is already saved */
1061                 rc = llog_erase(env, ctxt, NULL, logname);
1062                 if (rc < 0)
1063                         GOTO(out_free, rc);
1064         } else if (rc != -ENOENT) {
1065                 CERROR("%s: can't make backup for %s: rc = %d\n",
1066                        mgs->obd_name, logname, rc);
1067                 GOTO(out_free,rc);
1068         }
1069
1070         /* open local log */
1071         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1072         if (rc)
1073                 GOTO(out_restore, rc);
1074
1075         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1076         if (rc)
1077                 GOTO(out_closel, rc);
1078
1079         /* open backup llog */
1080         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1081                        LLOG_OPEN_EXISTS);
1082         if (rc)
1083                 GOTO(out_closel, rc);
1084
1085         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1086         if (rc)
1087                 GOTO(out_close, rc);
1088
1089         if (llog_get_size(backup_llh) <= 1)
1090                 GOTO(out_close, rc = 0);
1091
1092         OBD_ALLOC_PTR(mrul);
1093         if (!mrul)
1094                 GOTO(out_close, rc = -ENOMEM);
1095         /* devname is only needed information to replace UUID records */
1096         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1097         /* parse nids later */
1098         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1099         /* Copy records to this temporary llog */
1100         mrul->temp_llh = orig_llh;
1101
1102         rc = llog_process(env, backup_llh, mgs_replace_handler,
1103                           (void *)mrul, NULL);
1104         OBD_FREE_PTR(mrul);
1105 out_close:
1106         rc2 = llog_close(NULL, backup_llh);
1107         if (!rc)
1108                 rc = rc2;
1109 out_closel:
1110         rc2 = llog_close(NULL, orig_llh);
1111         if (!rc)
1112                 rc = rc2;
1113
1114 out_restore:
1115         if (rc) {
1116                 CERROR("%s: llog should be restored: rc = %d\n",
1117                        mgs->obd_name, rc);
1118                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1119                                   logname);
1120                 if (rc2 < 0)
1121                         CERROR("%s: can't restore backup %s: rc = %d\n",
1122                                mgs->obd_name, logname, rc2);
1123         }
1124
1125 out_free:
1126         OBD_FREE(backup, strlen(backup) + 1);
1127
1128 out_put:
1129         llog_ctxt_put(ctxt);
1130
1131         if (rc)
1132                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1133                        mgs->obd_name, logname, rc);
1134
1135         RETURN(rc);
1136 }
1137
1138 /**
1139  * Parse device name and get file system name and/or device index
1140  *
1141  * \param[in]   devname device name (ex. lustre-MDT0000)
1142  * \param[out]  fsname  file system name(optional)
1143  * \param[out]  index   device index(optional)
1144  *
1145  * \retval 0    success
1146  */
1147 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1148 {
1149         int rc;
1150         ENTRY;
1151
1152         /* Extract fsname */
1153         if (fsname) {
1154                 rc = server_name2fsname(devname, fsname, NULL);
1155                 if (rc < 0) {
1156                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1157                                devname);
1158                         RETURN(-EINVAL);
1159                 }
1160         }
1161
1162         if (index) {
1163                 rc = server_name2index(devname, index, NULL);
1164                 if (rc < 0) {
1165                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1166                                devname);
1167                         RETURN(-EINVAL);
1168                 }
1169         }
1170
1171         RETURN(0);
1172 }
1173
1174 /* This is only called during replace_nids */
1175 static int only_mgs_is_running(struct obd_device *mgs_obd)
1176 {
1177         /* TDB: Is global variable with devices count exists? */
1178         int num_devices = get_devices_count();
1179         int num_exports = 0;
1180         struct obd_export *exp;
1181
1182         spin_lock(&mgs_obd->obd_dev_lock);
1183         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1184                 /* skip self export */
1185                 if (exp == mgs_obd->obd_self_export)
1186                         continue;
1187                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1188                         continue;
1189
1190                 ++num_exports;
1191
1192                 CERROR("%s: node %s still connected during replace_nids "
1193                        "connect_flags:%llx\n",
1194                        mgs_obd->obd_name,
1195                        libcfs_nid2str(exp->exp_nid_stats->nid),
1196                        exp_connect_flags(exp));
1197
1198         }
1199         spin_unlock(&mgs_obd->obd_dev_lock);
1200
1201         /* osd, MGS and MGC + self_export
1202            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1203         return (num_devices <= 3) && (num_exports == 0);
1204 }
1205
1206 static int name_create_mdt(char **logname, char *fsname, int i)
1207 {
1208         char mdt_index[9];
1209
1210         sprintf(mdt_index, "-MDT%04x", i);
1211         return name_create(logname, fsname, mdt_index);
1212 }
1213
1214 /**
1215  * Replace nids for \a device to \a nids values
1216  *
1217  * \param obd           MGS obd device
1218  * \param devname       nids need to be replaced for this device
1219  * (ex. lustre-OST0000)
1220  * \param nids          nids list (ex. nid1,nid2,nid3)
1221  *
1222  * \retval 0    success
1223  */
1224 int mgs_replace_nids(const struct lu_env *env,
1225                      struct mgs_device *mgs,
1226                      char *devname, char *nids)
1227 {
1228         /* Assume fsname is part of device name */
1229         char fsname[MTI_NAME_MAXLEN];
1230         int rc;
1231         __u32 index;
1232         char *logname;
1233         struct fs_db *fsdb;
1234         unsigned int i;
1235         int conn_state;
1236         struct obd_device *mgs_obd = mgs->mgs_obd;
1237         ENTRY;
1238
1239         /* We can only change NIDs if no other nodes are connected */
1240         spin_lock(&mgs_obd->obd_dev_lock);
1241         conn_state = mgs_obd->obd_no_conn;
1242         mgs_obd->obd_no_conn = 1;
1243         spin_unlock(&mgs_obd->obd_dev_lock);
1244
1245         /* We can not change nids if not only MGS is started */
1246         if (!only_mgs_is_running(mgs_obd)) {
1247                 CERROR("Only MGS is allowed to be started\n");
1248                 GOTO(out, rc = -EINPROGRESS);
1249         }
1250
1251         /* Get fsname and index*/
1252         rc = mgs_parse_devname(devname, fsname, &index);
1253         if (rc)
1254                 GOTO(out, rc);
1255
1256         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1257         if (rc) {
1258                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1259                 GOTO(out, rc);
1260         }
1261
1262         /* Process client llogs */
1263         name_create(&logname, fsname, "-client");
1264         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1265         name_destroy(&logname);
1266         if (rc) {
1267                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1268                        fsname, devname, rc);
1269                 GOTO(out, rc);
1270         }
1271
1272         /* Process MDT llogs */
1273         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1274                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1275                         continue;
1276                 name_create_mdt(&logname, fsname, i);
1277                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1278                 name_destroy(&logname);
1279                 if (rc)
1280                         GOTO(out, rc);
1281         }
1282
1283 out:
1284         spin_lock(&mgs_obd->obd_dev_lock);
1285         mgs_obd->obd_no_conn = conn_state;
1286         spin_unlock(&mgs_obd->obd_dev_lock);
1287
1288         RETURN(rc);
1289 }
1290
1291 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1292                             char *devname, struct lov_desc *desc)
1293 {
1294         struct mgs_thread_info *mgi = mgs_env_info(env);
1295         struct lustre_cfg *lcfg;
1296         int rc;
1297
1298         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1299         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1300         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1301         if (!lcfg)
1302                 return -ENOMEM;
1303         rc = record_lcfg(env, llh, lcfg);
1304
1305         lustre_cfg_free(lcfg);
1306         return rc;
1307 }
1308
1309 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1310                             char *devname, struct lmv_desc *desc)
1311 {
1312         struct mgs_thread_info *mgi = mgs_env_info(env);
1313         struct lustre_cfg *lcfg;
1314         int rc;
1315
1316         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1317         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1318         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1319
1320         rc = record_lcfg(env, llh, lcfg);
1321
1322         lustre_cfg_free(lcfg);
1323         return rc;
1324 }
1325
1326 static inline int record_mdc_add(const struct lu_env *env,
1327                                  struct llog_handle *llh,
1328                                  char *logname, char *mdcuuid,
1329                                  char *mdtuuid, char *index,
1330                                  char *gen)
1331 {
1332         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1333                            mdtuuid,index,gen,mdcuuid);
1334 }
1335
1336 static inline int record_lov_add(const struct lu_env *env,
1337                                  struct llog_handle *llh,
1338                                  char *lov_name, char *ost_uuid,
1339                                  char *index, char *gen)
1340 {
1341         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1342                            ost_uuid, index, gen, 0);
1343 }
1344
1345 static inline int record_mount_opt(const struct lu_env *env,
1346                                    struct llog_handle *llh,
1347                                    char *profile, char *lov_name,
1348                                    char *mdc_name)
1349 {
1350         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1351                            profile,lov_name,mdc_name,0);
1352 }
1353
1354 static int record_marker(const struct lu_env *env,
1355                          struct llog_handle *llh,
1356                          struct fs_db *fsdb, __u32 flags,
1357                          char *tgtname, char *comment)
1358 {
1359         struct mgs_thread_info *mgi = mgs_env_info(env);
1360         struct lustre_cfg *lcfg;
1361         int rc;
1362         int cplen = 0;
1363
1364         if (flags & CM_START)
1365                 fsdb->fsdb_gen++;
1366         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1367         mgi->mgi_marker.cm_flags = flags;
1368         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1369         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1370                         sizeof(mgi->mgi_marker.cm_tgtname));
1371         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1372                 return -E2BIG;
1373         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1374                         sizeof(mgi->mgi_marker.cm_comment));
1375         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1376                 return -E2BIG;
1377         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1378         mgi->mgi_marker.cm_canceltime = 0;
1379         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1380         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1381                             sizeof(mgi->mgi_marker));
1382         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1383         if (!lcfg)
1384                 return -ENOMEM;
1385         rc = record_lcfg(env, llh, lcfg);
1386
1387         lustre_cfg_free(lcfg);
1388         return rc;
1389 }
1390
1391 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1392                             struct llog_handle **llh, char *name)
1393 {
1394         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1395         struct llog_ctxt        *ctxt;
1396         int                      rc = 0;
1397         ENTRY;
1398
1399         if (*llh)
1400                 GOTO(out, rc = -EBUSY);
1401
1402         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1403         if (!ctxt)
1404                 GOTO(out, rc = -ENODEV);
1405         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1406
1407         rc = llog_open_create(env, ctxt, llh, NULL, name);
1408         if (rc)
1409                 GOTO(out_ctxt, rc);
1410         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1411         if (rc)
1412                 llog_close(env, *llh);
1413 out_ctxt:
1414         llog_ctxt_put(ctxt);
1415 out:
1416         if (rc) {
1417                 CERROR("%s: can't start log %s: rc = %d\n",
1418                        mgs->mgs_obd->obd_name, name, rc);
1419                 *llh = NULL;
1420         }
1421         RETURN(rc);
1422 }
1423
1424 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1425 {
1426         int rc;
1427
1428         rc = llog_close(env, *llh);
1429         *llh = NULL;
1430
1431         return rc;
1432 }
1433
1434 /******************** config "macros" *********************/
1435
1436 /* write an lcfg directly into a log (with markers) */
1437 static int mgs_write_log_direct(const struct lu_env *env,
1438                                 struct mgs_device *mgs, struct fs_db *fsdb,
1439                                 char *logname, struct lustre_cfg *lcfg,
1440                                 char *devname, char *comment)
1441 {
1442         struct llog_handle *llh = NULL;
1443         int rc;
1444         ENTRY;
1445
1446         if (!lcfg)
1447                 RETURN(-ENOMEM);
1448
1449         rc = record_start_log(env, mgs, &llh, logname);
1450         if (rc)
1451                 RETURN(rc);
1452
1453         /* FIXME These should be a single journal transaction */
1454         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1455         if (rc)
1456                 GOTO(out_end, rc);
1457         rc = record_lcfg(env, llh, lcfg);
1458         if (rc)
1459                 GOTO(out_end, rc);
1460         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1461         if (rc)
1462                 GOTO(out_end, rc);
1463 out_end:
1464         record_end_log(env, &llh);
1465         RETURN(rc);
1466 }
1467
1468 /* write the lcfg in all logs for the given fs */
1469 int mgs_write_log_direct_all(const struct lu_env *env,
1470                              struct mgs_device *mgs,
1471                              struct fs_db *fsdb,
1472                              struct mgs_target_info *mti,
1473                              struct lustre_cfg *lcfg,
1474                              char *devname, char *comment,
1475                              int server_only)
1476 {
1477         cfs_list_t list;
1478         struct mgs_direntry *dirent, *n;
1479         char *fsname = mti->mti_fsname;
1480         char *logname;
1481         int rc = 0, len = strlen(fsname);
1482         ENTRY;
1483
1484         /* We need to set params for any future logs
1485            as well. FIXME Append this file to every new log.
1486            Actually, we should store as params (text), not llogs.  Or
1487            in a database. */
1488         rc = name_create(&logname, fsname, "-params");
1489         if (rc)
1490                 RETURN(rc);
1491         if (mgs_log_is_empty(env, mgs, logname)) {
1492                 struct llog_handle *llh = NULL;
1493                 rc = record_start_log(env, mgs, &llh, logname);
1494                 if (rc == 0)
1495                         record_end_log(env, &llh);
1496         }
1497         name_destroy(&logname);
1498         if (rc)
1499                 RETURN(rc);
1500
1501         /* Find all the logs in the CONFIGS directory */
1502         rc = class_dentry_readdir(env, mgs, &list);
1503         if (rc)
1504                 RETURN(rc);
1505
1506         /* Could use fsdb index maps instead of directory listing */
1507         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1508                 cfs_list_del(&dirent->list);
1509                 /* don't write to sptlrpc rule log */
1510                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1511                         goto next;
1512
1513                 /* caller wants write server logs only */
1514                 if (server_only && strstr(dirent->name, "-client") != NULL)
1515                         goto next;
1516
1517                 if (strncmp(fsname, dirent->name, len) == 0) {
1518                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1519                         /* Erase any old settings of this same parameter */
1520                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1521                                         devname, comment, CM_SKIP);
1522                         if (rc < 0)
1523                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1524                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1525                         /* Write the new one */
1526                         if (lcfg) {
1527                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1528                                                           dirent->name,
1529                                                           lcfg, devname,
1530                                                           comment);
1531                                 if (rc)
1532                                         CERROR("%s: writing log %s: rc = %d\n",
1533                                                mgs->mgs_obd->obd_name,
1534                                                dirent->name, rc);
1535                         }
1536                 }
1537 next:
1538                 mgs_direntry_free(dirent);
1539         }
1540
1541         RETURN(rc);
1542 }
1543
1544 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1545                                     struct mgs_device *mgs,
1546                                     struct fs_db *fsdb,
1547                                     struct mgs_target_info *mti,
1548                                     int index, char *logname);
1549 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1550                                     struct mgs_device *mgs,
1551                                     struct fs_db *fsdb,
1552                                     struct mgs_target_info *mti,
1553                                     char *logname, char *suffix, char *lovname,
1554                                     enum lustre_sec_part sec_part, int flags);
1555 static int name_create_mdt_and_lov(char **logname, char **lovname,
1556                                    struct fs_db *fsdb, int i);
1557
1558 static int add_param(char *params, char *key, char *val)
1559 {
1560         char *start = params + strlen(params);
1561         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1562         int keylen = 0;
1563
1564         if (key != NULL)
1565                 keylen = strlen(key);
1566         if (start + 1 + keylen + strlen(val) >= end) {
1567                 CERROR("params are too long: %s %s%s\n",
1568                        params, key != NULL ? key : "", val);
1569                 return -EINVAL;
1570         }
1571
1572         sprintf(start, " %s%s", key != NULL ? key : "", val);
1573         return 0;
1574 }
1575
1576 /**
1577  * Walk through client config log record and convert the related records
1578  * into the target.
1579  **/
1580 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1581                                          struct llog_handle *llh,
1582                                          struct llog_rec_hdr *rec, void *data)
1583 {
1584         struct mgs_device *mgs;
1585         struct obd_device *obd;
1586         struct mgs_target_info *mti, *tmti;
1587         struct fs_db *fsdb;
1588         int cfg_len = rec->lrh_len;
1589         char *cfg_buf = (char*) (rec + 1);
1590         struct lustre_cfg *lcfg;
1591         int rc = 0;
1592         struct llog_handle *mdt_llh = NULL;
1593         static int got_an_osc_or_mdc = 0;
1594         /* 0: not found any osc/mdc;
1595            1: found osc;
1596            2: found mdc;
1597         */
1598         static int last_step = -1;
1599         int cplen = 0;
1600
1601         ENTRY;
1602
1603         mti = ((struct temp_comp*)data)->comp_mti;
1604         tmti = ((struct temp_comp*)data)->comp_tmti;
1605         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1606         obd = ((struct temp_comp *)data)->comp_obd;
1607         mgs = lu2mgs_dev(obd->obd_lu_dev);
1608         LASSERT(mgs);
1609
1610         if (rec->lrh_type != OBD_CFG_REC) {
1611                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1612                 RETURN(-EINVAL);
1613         }
1614
1615         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1616         if (rc) {
1617                 CERROR("Insane cfg\n");
1618                 RETURN(rc);
1619         }
1620
1621         lcfg = (struct lustre_cfg *)cfg_buf;
1622
1623         if (lcfg->lcfg_command == LCFG_MARKER) {
1624                 struct cfg_marker *marker;
1625                 marker = lustre_cfg_buf(lcfg, 1);
1626                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1627                     (marker->cm_flags & CM_START) &&
1628                      !(marker->cm_flags & CM_SKIP)) {
1629                         got_an_osc_or_mdc = 1;
1630                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1631                                         sizeof(tmti->mti_svname));
1632                         if (cplen >= sizeof(tmti->mti_svname))
1633                                 RETURN(-E2BIG);
1634                         rc = record_start_log(env, mgs, &mdt_llh,
1635                                               mti->mti_svname);
1636                         if (rc)
1637                                 RETURN(rc);
1638                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1639                                            mti->mti_svname, "add osc(copied)");
1640                         record_end_log(env, &mdt_llh);
1641                         last_step = marker->cm_step;
1642                         RETURN(rc);
1643                 }
1644                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1645                     (marker->cm_flags & CM_END) &&
1646                      !(marker->cm_flags & CM_SKIP)) {
1647                         LASSERT(last_step == marker->cm_step);
1648                         last_step = -1;
1649                         got_an_osc_or_mdc = 0;
1650                         memset(tmti, 0, sizeof(*tmti));
1651                         rc = record_start_log(env, mgs, &mdt_llh,
1652                                               mti->mti_svname);
1653                         if (rc)
1654                                 RETURN(rc);
1655                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1656                                            mti->mti_svname, "add osc(copied)");
1657                         record_end_log(env, &mdt_llh);
1658                         RETURN(rc);
1659                 }
1660                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1661                     (marker->cm_flags & CM_START) &&
1662                      !(marker->cm_flags & CM_SKIP)) {
1663                         got_an_osc_or_mdc = 2;
1664                         last_step = marker->cm_step;
1665                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1666                                strlen(marker->cm_tgtname));
1667
1668                         RETURN(rc);
1669                 }
1670                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1671                     (marker->cm_flags & CM_END) &&
1672                      !(marker->cm_flags & CM_SKIP)) {
1673                         LASSERT(last_step == marker->cm_step);
1674                         last_step = -1;
1675                         got_an_osc_or_mdc = 0;
1676                         memset(tmti, 0, sizeof(*tmti));
1677                         RETURN(rc);
1678                 }
1679         }
1680
1681         if (got_an_osc_or_mdc == 0 || last_step < 0)
1682                 RETURN(rc);
1683
1684         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1685                 uint64_t nodenid = lcfg->lcfg_nid;
1686
1687                 if (strlen(tmti->mti_uuid) == 0) {
1688                         /* target uuid not set, this config record is before
1689                          * LCFG_SETUP, this nid is one of target node nid.
1690                          */
1691                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1692                         tmti->mti_nid_count++;
1693                 } else {
1694                         /* failover node nid */
1695                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1696                                        libcfs_nid2str(nodenid));
1697                 }
1698
1699                 RETURN(rc);
1700         }
1701
1702         if (lcfg->lcfg_command == LCFG_SETUP) {
1703                 char *target;
1704
1705                 target = lustre_cfg_string(lcfg, 1);
1706                 memcpy(tmti->mti_uuid, target, strlen(target));
1707                 RETURN(rc);
1708         }
1709
1710         /* ignore client side sptlrpc_conf_log */
1711         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1712                 RETURN(rc);
1713
1714         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1715                 int index;
1716
1717                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1718                         RETURN (-EINVAL);
1719
1720                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1721                        strlen(mti->mti_fsname));
1722                 tmti->mti_stripe_index = index;
1723
1724                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1725                                               mti->mti_stripe_index,
1726                                               mti->mti_svname);
1727                 memset(tmti, 0, sizeof(*tmti));
1728                 RETURN(rc);
1729         }
1730
1731         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1732                 int index;
1733                 char mdt_index[9];
1734                 char *logname, *lovname;
1735
1736                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1737                                              mti->mti_stripe_index);
1738                 if (rc)
1739                         RETURN(rc);
1740                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1741
1742                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1743                         name_destroy(&logname);
1744                         name_destroy(&lovname);
1745                         RETURN(-EINVAL);
1746                 }
1747
1748                 tmti->mti_stripe_index = index;
1749                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1750                                          mdt_index, lovname,
1751                                          LUSTRE_SP_MDT, 0);
1752                 name_destroy(&logname);
1753                 name_destroy(&lovname);
1754                 RETURN(rc);
1755         }
1756         RETURN(rc);
1757 }
1758
1759 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1760 /* stealed from mgs_get_fsdb_from_llog*/
1761 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1762                                               struct mgs_device *mgs,
1763                                               char *client_name,
1764                                               struct temp_comp* comp)
1765 {
1766         struct llog_handle *loghandle;
1767         struct mgs_target_info *tmti;
1768         struct llog_ctxt *ctxt;
1769         int rc;
1770
1771         ENTRY;
1772
1773         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1774         LASSERT(ctxt != NULL);
1775
1776         OBD_ALLOC_PTR(tmti);
1777         if (tmti == NULL)
1778                 GOTO(out_ctxt, rc = -ENOMEM);
1779
1780         comp->comp_tmti = tmti;
1781         comp->comp_obd = mgs->mgs_obd;
1782
1783         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1784                        LLOG_OPEN_EXISTS);
1785         if (rc < 0) {
1786                 if (rc == -ENOENT)
1787                         rc = 0;
1788                 GOTO(out_pop, rc);
1789         }
1790
1791         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1792         if (rc)
1793                 GOTO(out_close, rc);
1794
1795         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1796                                   (void *)comp, NULL, false);
1797         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1798 out_close:
1799         llog_close(env, loghandle);
1800 out_pop:
1801         OBD_FREE_PTR(tmti);
1802 out_ctxt:
1803         llog_ctxt_put(ctxt);
1804         RETURN(rc);
1805 }
1806
1807 /* lmv is the second thing for client logs */
1808 /* copied from mgs_write_log_lov. Please refer to that.  */
1809 static int mgs_write_log_lmv(const struct lu_env *env,
1810                              struct mgs_device *mgs,
1811                              struct fs_db *fsdb,
1812                              struct mgs_target_info *mti,
1813                              char *logname, char *lmvname)
1814 {
1815         struct llog_handle *llh = NULL;
1816         struct lmv_desc *lmvdesc;
1817         char *uuid;
1818         int rc = 0;
1819         ENTRY;
1820
1821         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1822
1823         OBD_ALLOC_PTR(lmvdesc);
1824         if (lmvdesc == NULL)
1825                 RETURN(-ENOMEM);
1826         lmvdesc->ld_active_tgt_count = 0;
1827         lmvdesc->ld_tgt_count = 0;
1828         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1829         uuid = (char *)lmvdesc->ld_uuid.uuid;
1830
1831         rc = record_start_log(env, mgs, &llh, logname);
1832         if (rc)
1833                 GOTO(out_free, rc);
1834         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1835         if (rc)
1836                 GOTO(out_end, rc);
1837         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1838         if (rc)
1839                 GOTO(out_end, rc);
1840         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1841         if (rc)
1842                 GOTO(out_end, rc);
1843         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1844         if (rc)
1845                 GOTO(out_end, rc);
1846 out_end:
1847         record_end_log(env, &llh);
1848 out_free:
1849         OBD_FREE_PTR(lmvdesc);
1850         RETURN(rc);
1851 }
1852
1853 /* lov is the first thing in the mdt and client logs */
1854 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1855                              struct fs_db *fsdb, struct mgs_target_info *mti,
1856                              char *logname, char *lovname)
1857 {
1858         struct llog_handle *llh = NULL;
1859         struct lov_desc *lovdesc;
1860         char *uuid;
1861         int rc = 0;
1862         ENTRY;
1863
1864         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1865
1866         /*
1867         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1868         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1869               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1870         */
1871
1872         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1873         OBD_ALLOC_PTR(lovdesc);
1874         if (lovdesc == NULL)
1875                 RETURN(-ENOMEM);
1876         lovdesc->ld_magic = LOV_DESC_MAGIC;
1877         lovdesc->ld_tgt_count = 0;
1878         /* Defaults.  Can be changed later by lcfg config_param */
1879         lovdesc->ld_default_stripe_count = 1;
1880         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1881         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1882         lovdesc->ld_default_stripe_offset = -1;
1883         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1884         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1885         /* can these be the same? */
1886         uuid = (char *)lovdesc->ld_uuid.uuid;
1887
1888         /* This should always be the first entry in a log.
1889         rc = mgs_clear_log(obd, logname); */
1890         rc = record_start_log(env, mgs, &llh, logname);
1891         if (rc)
1892                 GOTO(out_free, rc);
1893         /* FIXME these should be a single journal transaction */
1894         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1895         if (rc)
1896                 GOTO(out_end, rc);
1897         rc = record_attach(env, llh, lovname, "lov", uuid);
1898         if (rc)
1899                 GOTO(out_end, rc);
1900         rc = record_lov_setup(env, llh, lovname, lovdesc);
1901         if (rc)
1902                 GOTO(out_end, rc);
1903         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1904         if (rc)
1905                 GOTO(out_end, rc);
1906         EXIT;
1907 out_end:
1908         record_end_log(env, &llh);
1909 out_free:
1910         OBD_FREE_PTR(lovdesc);
1911         return rc;
1912 }
1913
1914 /* add failnids to open log */
1915 static int mgs_write_log_failnids(const struct lu_env *env,
1916                                   struct mgs_target_info *mti,
1917                                   struct llog_handle *llh,
1918                                   char *cliname)
1919 {
1920         char *failnodeuuid = NULL;
1921         char *ptr = mti->mti_params;
1922         lnet_nid_t nid;
1923         int rc = 0;
1924
1925         /*
1926         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1927         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1928         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1929         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1930         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1931         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1932         */
1933
1934         /* Pull failnid info out of params string */
1935         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1936                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1937                         if (failnodeuuid == NULL) {
1938                                 /* We don't know the failover node name,
1939                                    so just use the first nid as the uuid */
1940                                 rc = name_create(&failnodeuuid,
1941                                                  libcfs_nid2str(nid), "");
1942                                 if (rc)
1943                                         return rc;
1944                         }
1945                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1946                                "client %s\n", libcfs_nid2str(nid),
1947                                failnodeuuid, cliname);
1948                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1949                 }
1950                 if (failnodeuuid) {
1951                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1952                         name_destroy(&failnodeuuid);
1953                         failnodeuuid = NULL;
1954                 }
1955         }
1956
1957         return rc;
1958 }
1959
1960 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1961                                     struct mgs_device *mgs,
1962                                     struct fs_db *fsdb,
1963                                     struct mgs_target_info *mti,
1964                                     char *logname, char *lmvname)
1965 {
1966         struct llog_handle *llh = NULL;
1967         char *mdcname = NULL;
1968         char *nodeuuid = NULL;
1969         char *mdcuuid = NULL;
1970         char *lmvuuid = NULL;
1971         char index[6];
1972         int i, rc;
1973         ENTRY;
1974
1975         if (mgs_log_is_empty(env, mgs, logname)) {
1976                 CERROR("log is empty! Logical error\n");
1977                 RETURN(-EINVAL);
1978         }
1979
1980         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1981                mti->mti_svname, logname, lmvname);
1982
1983         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1984         if (rc)
1985                 RETURN(rc);
1986         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1987         if (rc)
1988                 GOTO(out_free, rc);
1989         rc = name_create(&mdcuuid, mdcname, "_UUID");
1990         if (rc)
1991                 GOTO(out_free, rc);
1992         rc = name_create(&lmvuuid, lmvname, "_UUID");
1993         if (rc)
1994                 GOTO(out_free, rc);
1995
1996         rc = record_start_log(env, mgs, &llh, logname);
1997         if (rc)
1998                 GOTO(out_free, rc);
1999         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2000                            "add mdc");
2001         if (rc)
2002                 GOTO(out_end, rc);
2003         for (i = 0; i < mti->mti_nid_count; i++) {
2004                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2005                        libcfs_nid2str(mti->mti_nids[i]));
2006
2007                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2008                 if (rc)
2009                         GOTO(out_end, rc);
2010         }
2011
2012         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2013         if (rc)
2014                 GOTO(out_end, rc);
2015         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
2016         if (rc)
2017                 GOTO(out_end, rc);
2018         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2019         if (rc)
2020                 GOTO(out_end, rc);
2021         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2022         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2023                             index, "1");
2024         if (rc)
2025                 GOTO(out_end, rc);
2026         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2027                            "add mdc");
2028         if (rc)
2029                 GOTO(out_end, rc);
2030 out_end:
2031         record_end_log(env, &llh);
2032 out_free:
2033         name_destroy(&lmvuuid);
2034         name_destroy(&mdcuuid);
2035         name_destroy(&mdcname);
2036         name_destroy(&nodeuuid);
2037         RETURN(rc);
2038 }
2039
2040 static inline int name_create_lov(char **lovname, char *mdtname,
2041                                   struct fs_db *fsdb, int index)
2042 {
2043         /* COMPAT_180 */
2044         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2045                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2046         else
2047                 return name_create(lovname, mdtname, "-mdtlov");
2048 }
2049
2050 static int name_create_mdt_and_lov(char **logname, char **lovname,
2051                                    struct fs_db *fsdb, int i)
2052 {
2053         int rc;
2054
2055         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2056         if (rc)
2057                 return rc;
2058         /* COMPAT_180 */
2059         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2060                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2061         else
2062                 rc = name_create(lovname, *logname, "-mdtlov");
2063         if (rc) {
2064                 name_destroy(logname);
2065                 *logname = NULL;
2066         }
2067         return rc;
2068 }
2069
2070 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2071                                       struct fs_db *fsdb, int i)
2072 {
2073         char suffix[16];
2074
2075         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2076                 sprintf(suffix, "-osc");
2077         else
2078                 sprintf(suffix, "-osc-MDT%04x", i);
2079         return name_create(oscname, ostname, suffix);
2080 }
2081
2082 /* add new mdc to already existent MDS */
2083 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2084                                     struct mgs_device *mgs,
2085                                     struct fs_db *fsdb,
2086                                     struct mgs_target_info *mti,
2087                                     int mdt_index, char *logname)
2088 {
2089         struct llog_handle      *llh = NULL;
2090         char    *nodeuuid = NULL;
2091         char    *ospname = NULL;
2092         char    *lovuuid = NULL;
2093         char    *mdtuuid = NULL;
2094         char    *svname = NULL;
2095         char    *mdtname = NULL;
2096         char    *lovname = NULL;
2097         char    index_str[16];
2098         int     i, rc;
2099
2100         ENTRY;
2101         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2102                 CERROR("log is empty! Logical error\n");
2103                 RETURN (-EINVAL);
2104         }
2105
2106         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2107                logname);
2108
2109         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2110         if (rc)
2111                 RETURN(rc);
2112
2113         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2114         if (rc)
2115                 GOTO(out_destory, rc);
2116
2117         rc = name_create(&svname, mdtname, "-osp");
2118         if (rc)
2119                 GOTO(out_destory, rc);
2120
2121         sprintf(index_str, "-MDT%04x", mdt_index);
2122         rc = name_create(&ospname, svname, index_str);
2123         if (rc)
2124                 GOTO(out_destory, rc);
2125
2126         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2127         if (rc)
2128                 GOTO(out_destory, rc);
2129
2130         rc = name_create(&lovuuid, lovname, "_UUID");
2131         if (rc)
2132                 GOTO(out_destory, rc);
2133
2134         rc = name_create(&mdtuuid, mdtname, "_UUID");
2135         if (rc)
2136                 GOTO(out_destory, rc);
2137
2138         rc = record_start_log(env, mgs, &llh, logname);
2139         if (rc)
2140                 GOTO(out_destory, rc);
2141
2142         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2143                            "add osp");
2144         if (rc)
2145                 GOTO(out_destory, rc);
2146
2147         for (i = 0; i < mti->mti_nid_count; i++) {
2148                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2149                        libcfs_nid2str(mti->mti_nids[i]));
2150                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2151                 if (rc)
2152                         GOTO(out_end, rc);
2153         }
2154
2155         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2156         if (rc)
2157                 GOTO(out_end, rc);
2158
2159         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2160                           NULL, NULL);
2161         if (rc)
2162                 GOTO(out_end, rc);
2163
2164         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2165         if (rc)
2166                 GOTO(out_end, rc);
2167
2168         /* Add mdc(osp) to lod */
2169         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2170                  mti->mti_stripe_index);
2171         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2172                          index_str, "1", NULL);
2173         if (rc)
2174                 GOTO(out_end, rc);
2175
2176         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2177         if (rc)
2178                 GOTO(out_end, rc);
2179
2180 out_end:
2181         record_end_log(env, &llh);
2182
2183 out_destory:
2184         name_destroy(&mdtuuid);
2185         name_destroy(&lovuuid);
2186         name_destroy(&lovname);
2187         name_destroy(&ospname);
2188         name_destroy(&svname);
2189         name_destroy(&nodeuuid);
2190         name_destroy(&mdtname);
2191         RETURN(rc);
2192 }
2193
2194 static int mgs_write_log_mdt0(const struct lu_env *env,
2195                               struct mgs_device *mgs,
2196                               struct fs_db *fsdb,
2197                               struct mgs_target_info *mti)
2198 {
2199         char *log = mti->mti_svname;
2200         struct llog_handle *llh = NULL;
2201         char *uuid, *lovname;
2202         char mdt_index[6];
2203         char *ptr = mti->mti_params;
2204         int rc = 0, failout = 0;
2205         ENTRY;
2206
2207         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2208         if (uuid == NULL)
2209                 RETURN(-ENOMEM);
2210
2211         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2212                 failout = (strncmp(ptr, "failout", 7) == 0);
2213
2214         rc = name_create(&lovname, log, "-mdtlov");
2215         if (rc)
2216                 GOTO(out_free, rc);
2217         if (mgs_log_is_empty(env, mgs, log)) {
2218                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2219                 if (rc)
2220                         GOTO(out_lod, rc);
2221         }
2222
2223         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2224
2225         rc = record_start_log(env, mgs, &llh, log);
2226         if (rc)
2227                 GOTO(out_lod, rc);
2228
2229         /* add MDT itself */
2230
2231         /* FIXME this whole fn should be a single journal transaction */
2232         sprintf(uuid, "%s_UUID", log);
2233         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2234         if (rc)
2235                 GOTO(out_lod, rc);
2236         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2237         if (rc)
2238                 GOTO(out_end, rc);
2239         rc = record_mount_opt(env, llh, log, lovname, NULL);
2240         if (rc)
2241                 GOTO(out_end, rc);
2242         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2243                         failout ? "n" : "f");
2244         if (rc)
2245                 GOTO(out_end, rc);
2246         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2247         if (rc)
2248                 GOTO(out_end, rc);
2249 out_end:
2250         record_end_log(env, &llh);
2251 out_lod:
2252         name_destroy(&lovname);
2253 out_free:
2254         OBD_FREE(uuid, sizeof(struct obd_uuid));
2255         RETURN(rc);
2256 }
2257
2258 /* envelope method for all layers log */
2259 static int mgs_write_log_mdt(const struct lu_env *env,
2260                              struct mgs_device *mgs,
2261                              struct fs_db *fsdb,
2262                              struct mgs_target_info *mti)
2263 {
2264         struct mgs_thread_info *mgi = mgs_env_info(env);
2265         struct llog_handle *llh = NULL;
2266         char *cliname;
2267         int rc, i = 0;
2268         ENTRY;
2269
2270         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2271
2272         if (mti->mti_uuid[0] == '\0') {
2273                 /* Make up our own uuid */
2274                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2275                          "%s_UUID", mti->mti_svname);
2276         }
2277
2278         /* add mdt */
2279         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2280         if (rc)
2281                 RETURN(rc);
2282         /* Append the mdt info to the client log */
2283         rc = name_create(&cliname, mti->mti_fsname, "-client");
2284         if (rc)
2285                 RETURN(rc);
2286
2287         if (mgs_log_is_empty(env, mgs, cliname)) {
2288                 /* Start client log */
2289                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2290                                        fsdb->fsdb_clilov);
2291                 if (rc)
2292                         GOTO(out_free, rc);
2293                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2294                                        fsdb->fsdb_clilmv);
2295                 if (rc)
2296                         GOTO(out_free, rc);
2297         }
2298
2299         /*
2300         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2301         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2302         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2303         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2304         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2305         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2306         */
2307
2308                 /* copy client info about lov/lmv */
2309                 mgi->mgi_comp.comp_mti = mti;
2310                 mgi->mgi_comp.comp_fsdb = fsdb;
2311
2312                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2313                                                         &mgi->mgi_comp);
2314                 if (rc)
2315                         GOTO(out_free, rc);
2316                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2317                                               fsdb->fsdb_clilmv);
2318                 if (rc)
2319                         GOTO(out_free, rc);
2320
2321                 /* add mountopts */
2322                 rc = record_start_log(env, mgs, &llh, cliname);
2323                 if (rc)
2324                         GOTO(out_free, rc);
2325
2326                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2327                                    "mount opts");
2328                 if (rc)
2329                         GOTO(out_end, rc);
2330                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2331                                       fsdb->fsdb_clilmv);
2332                 if (rc)
2333                         GOTO(out_end, rc);
2334                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2335                                    "mount opts");
2336
2337         if (rc)
2338                 GOTO(out_end, rc);
2339
2340         /* for_all_existing_mdt except current one */
2341         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2342                 if (i !=  mti->mti_stripe_index &&
2343                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2344                         char *logname;
2345
2346                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2347                         if (rc)
2348                                 GOTO(out_end, rc);
2349
2350                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2351                                                       i, logname);
2352                         name_destroy(&logname);
2353                         if (rc)
2354                                 GOTO(out_end, rc);
2355                 }
2356         }
2357 out_end:
2358         record_end_log(env, &llh);
2359 out_free:
2360         name_destroy(&cliname);
2361         RETURN(rc);
2362 }
2363
2364 /* Add the ost info to the client/mdt lov */
2365 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2366                                     struct mgs_device *mgs, struct fs_db *fsdb,
2367                                     struct mgs_target_info *mti,
2368                                     char *logname, char *suffix, char *lovname,
2369                                     enum lustre_sec_part sec_part, int flags)
2370 {
2371         struct llog_handle *llh = NULL;
2372         char *nodeuuid = NULL;
2373         char *oscname = NULL;
2374         char *oscuuid = NULL;
2375         char *lovuuid = NULL;
2376         char *svname = NULL;
2377         char index[6];
2378         int i, rc;
2379
2380         ENTRY;
2381         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2382                mti->mti_svname, logname);
2383
2384         if (mgs_log_is_empty(env, mgs, logname)) {
2385                 CERROR("log is empty! Logical error\n");
2386                 RETURN (-EINVAL);
2387         }
2388
2389         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2390         if (rc)
2391                 RETURN(rc);
2392         rc = name_create(&svname, mti->mti_svname, "-osc");
2393         if (rc)
2394                 GOTO(out_free, rc);
2395
2396         /* for the system upgraded from old 1.8, keep using the old osc naming
2397          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2398         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2399                 rc = name_create(&oscname, svname, "");
2400         else
2401                 rc = name_create(&oscname, svname, suffix);
2402         if (rc)
2403                 GOTO(out_free, rc);
2404
2405         rc = name_create(&oscuuid, oscname, "_UUID");
2406         if (rc)
2407                 GOTO(out_free, rc);
2408         rc = name_create(&lovuuid, lovname, "_UUID");
2409         if (rc)
2410                 GOTO(out_free, rc);
2411
2412
2413         /*
2414         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2415         multihomed (#4)
2416         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2417         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2418         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2419         failover (#6,7)
2420         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2421         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2422         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2423         */
2424
2425         rc = record_start_log(env, mgs, &llh, logname);
2426         if (rc)
2427                 GOTO(out_free, rc);
2428
2429         /* FIXME these should be a single journal transaction */
2430         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2431                            "add osc");
2432         if (rc)
2433                 GOTO(out_end, rc);
2434
2435         /* NB: don't change record order, because upon MDT steal OSC config
2436          * from client, it treats all nids before LCFG_SETUP as target nids
2437          * (multiple interfaces), while nids after as failover node nids.
2438          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2439          */
2440         for (i = 0; i < mti->mti_nid_count; i++) {
2441                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2442                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2443                 if (rc)
2444                         GOTO(out_end, rc);
2445         }
2446         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2447         if (rc)
2448                 GOTO(out_end, rc);
2449         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2450         if (rc)
2451                 GOTO(out_end, rc);
2452         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2453         if (rc)
2454                 GOTO(out_end, rc);
2455
2456         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2457
2458         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2459         if (rc)
2460                 GOTO(out_end, rc);
2461         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2462                            "add osc");
2463         if (rc)
2464                 GOTO(out_end, rc);
2465 out_end:
2466         record_end_log(env, &llh);
2467 out_free:
2468         name_destroy(&lovuuid);
2469         name_destroy(&oscuuid);
2470         name_destroy(&oscname);
2471         name_destroy(&svname);
2472         name_destroy(&nodeuuid);
2473         RETURN(rc);
2474 }
2475
2476 static int mgs_write_log_ost(const struct lu_env *env,
2477                              struct mgs_device *mgs, struct fs_db *fsdb,
2478                              struct mgs_target_info *mti)
2479 {
2480         struct llog_handle *llh = NULL;
2481         char *logname, *lovname;
2482         char *ptr = mti->mti_params;
2483         int rc, flags = 0, failout = 0, i;
2484         ENTRY;
2485
2486         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2487
2488         /* The ost startup log */
2489
2490         /* If the ost log already exists, that means that someone reformatted
2491            the ost and it called target_add again. */
2492         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2493                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2494                                    "exists, yet the server claims it never "
2495                                    "registered. It may have been reformatted, "
2496                                    "or the index changed. writeconf the MDT to "
2497                                    "regenerate all logs.\n", mti->mti_svname);
2498                 RETURN(-EALREADY);
2499         }
2500
2501         /*
2502         attach obdfilter ost1 ost1_UUID
2503         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2504         */
2505         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2506                 failout = (strncmp(ptr, "failout", 7) == 0);
2507         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2508         if (rc)
2509                 RETURN(rc);
2510         /* FIXME these should be a single journal transaction */
2511         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2512         if (rc)
2513                 GOTO(out_end, rc);
2514         if (*mti->mti_uuid == '\0')
2515                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2516                          "%s_UUID", mti->mti_svname);
2517         rc = record_attach(env, llh, mti->mti_svname,
2518                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2519         if (rc)
2520                 GOTO(out_end, rc);
2521         rc = record_setup(env, llh, mti->mti_svname,
2522                           "dev"/*ignored*/, "type"/*ignored*/,
2523                           failout ? "n" : "f", 0/*options*/);
2524         if (rc)
2525                 GOTO(out_end, rc);
2526         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2527         if (rc)
2528                 GOTO(out_end, rc);
2529 out_end:
2530         record_end_log(env, &llh);
2531         if (rc)
2532                 RETURN(rc);
2533         /* We also have to update the other logs where this osc is part of
2534            the lov */
2535
2536         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2537                 /* If we're upgrading, the old mdt log already has our
2538                    entry. Let's do a fake one for fun. */
2539                 /* Note that we can't add any new failnids, since we don't
2540                    know the old osc names. */
2541                 flags = CM_SKIP | CM_UPGRADE146;
2542
2543         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2544                 /* If the update flag isn't set, don't update client/mdt
2545                    logs. */
2546                 flags |= CM_SKIP;
2547                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2548                               "the MDT first to regenerate it.\n",
2549                               mti->mti_svname);
2550         }
2551
2552         /* Add ost to all MDT lov defs */
2553         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2554                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2555                         char mdt_index[9];
2556
2557                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2558                                                      i);
2559                         if (rc)
2560                                 RETURN(rc);
2561                         sprintf(mdt_index, "-MDT%04x", i);
2562                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2563                                                       logname, mdt_index,
2564                                                       lovname, LUSTRE_SP_MDT,
2565                                                       flags);
2566                         name_destroy(&logname);
2567                         name_destroy(&lovname);
2568                         if (rc)
2569                                 RETURN(rc);
2570                 }
2571         }
2572
2573         /* Append ost info to the client log */
2574         rc = name_create(&logname, mti->mti_fsname, "-client");
2575         if (rc)
2576                 RETURN(rc);
2577         if (mgs_log_is_empty(env, mgs, logname)) {
2578                 /* Start client log */
2579                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2580                                        fsdb->fsdb_clilov);
2581                 if (rc)
2582                         GOTO(out_free, rc);
2583                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2584                                        fsdb->fsdb_clilmv);
2585                 if (rc)
2586                         GOTO(out_free, rc);
2587         }
2588         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2589                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2590 out_free:
2591         name_destroy(&logname);
2592         RETURN(rc);
2593 }
2594
2595 static __inline__ int mgs_param_empty(char *ptr)
2596 {
2597         char *tmp;
2598
2599         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2600                 return 1;
2601         return 0;
2602 }
2603
2604 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2605                                           struct mgs_device *mgs,
2606                                           struct fs_db *fsdb,
2607                                           struct mgs_target_info *mti,
2608                                           char *logname, char *cliname)
2609 {
2610         int rc;
2611         struct llog_handle *llh = NULL;
2612
2613         if (mgs_param_empty(mti->mti_params)) {
2614                 /* Remove _all_ failnids */
2615                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2616                                 mti->mti_svname, "add failnid", CM_SKIP);
2617                 return rc < 0 ? rc : 0;
2618         }
2619
2620         /* Otherwise failover nids are additive */
2621         rc = record_start_log(env, mgs, &llh, logname);
2622         if (rc)
2623                 return rc;
2624                 /* FIXME this should be a single journal transaction */
2625         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2626                            "add failnid");
2627         if (rc)
2628                 goto out_end;
2629         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2630         if (rc)
2631                 goto out_end;
2632         rc = record_marker(env, llh, fsdb, CM_END,
2633                            mti->mti_svname, "add failnid");
2634 out_end:
2635         record_end_log(env, &llh);
2636         return rc;
2637 }
2638
2639
2640 /* Add additional failnids to an existing log.
2641    The mdc/osc must have been added to logs first */
2642 /* tcp nids must be in dotted-quad ascii -
2643    we can't resolve hostnames from the kernel. */
2644 static int mgs_write_log_add_failnid(const struct lu_env *env,
2645                                      struct mgs_device *mgs,
2646                                      struct fs_db *fsdb,
2647                                      struct mgs_target_info *mti)
2648 {
2649         char *logname, *cliname;
2650         int rc;
2651         ENTRY;
2652
2653         /* FIXME we currently can't erase the failnids
2654          * given when a target first registers, since they aren't part of
2655          * an "add uuid" stanza */
2656
2657         /* Verify that we know about this target */
2658         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2659                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2660                                    "yet. It must be started before failnids "
2661                                    "can be added.\n", mti->mti_svname);
2662                 RETURN(-ENOENT);
2663         }
2664
2665         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2666         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2667                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2668         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2669                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2670         } else {
2671                 RETURN(-EINVAL);
2672         }
2673         if (rc)
2674                 RETURN(rc);
2675         /* Add failover nids to the client log */
2676         rc = name_create(&logname, mti->mti_fsname, "-client");
2677         if (rc) {
2678                 name_destroy(&cliname);
2679                 RETURN(rc);
2680         }
2681         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2682         name_destroy(&logname);
2683         name_destroy(&cliname);
2684         if (rc)
2685                 RETURN(rc);
2686
2687         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2688                 /* Add OST failover nids to the MDT logs as well */
2689                 int i;
2690
2691                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2692                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2693                                 continue;
2694                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2695                         if (rc)
2696                                 RETURN(rc);
2697                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2698                                                  fsdb, i);
2699                         if (rc) {
2700                                 name_destroy(&logname);
2701                                 RETURN(rc);
2702                         }
2703                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2704                                                             mti, logname,
2705                                                             cliname);
2706                         name_destroy(&cliname);
2707                         name_destroy(&logname);
2708                         if (rc)
2709                                 RETURN(rc);
2710                 }
2711         }
2712
2713         RETURN(rc);
2714 }
2715
2716 static int mgs_wlp_lcfg(const struct lu_env *env,
2717                         struct mgs_device *mgs, struct fs_db *fsdb,
2718                         struct mgs_target_info *mti,
2719                         char *logname, struct lustre_cfg_bufs *bufs,
2720                         char *tgtname, char *ptr)
2721 {
2722         char comment[MTI_NAME_MAXLEN];
2723         char *tmp;
2724         struct lustre_cfg *lcfg;
2725         int rc, del;
2726
2727         /* Erase any old settings of this same parameter */
2728         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2729         comment[MTI_NAME_MAXLEN - 1] = 0;
2730         /* But don't try to match the value. */
2731         tmp = strchr(comment, '=');
2732         if (tmp)
2733                 *tmp = 0;
2734         /* FIXME we should skip settings that are the same as old values */
2735         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2736         if (rc < 0)
2737                 return rc;
2738         del = mgs_param_empty(ptr);
2739
2740         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2741                       "Setting" : "Modifying", tgtname, comment, logname);
2742         if (del) {
2743                 /* mgs_modify() will return 1 if nothing had to be done */
2744                 if (rc == 1)
2745                         rc = 0;
2746                 return rc;
2747         }
2748
2749         lustre_cfg_bufs_reset(bufs, tgtname);
2750         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2751         if (mti->mti_flags & LDD_F_PARAM2)
2752                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2753
2754         lcfg = lustre_cfg_new((mti->mti_flags & LDD_F_PARAM2) ?
2755                               LCFG_SET_PARAM : LCFG_PARAM, bufs);
2756
2757         if (!lcfg)
2758                 return -ENOMEM;
2759         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2760         lustre_cfg_free(lcfg);
2761         return rc;
2762 }
2763
2764 static int mgs_write_log_param2(const struct lu_env *env,
2765                                 struct mgs_device *mgs,
2766                                 struct fs_db *fsdb,
2767                                 struct mgs_target_info *mti, char *ptr)
2768 {
2769         struct lustre_cfg_bufs  bufs;
2770         int                     rc = 0;
2771         ENTRY;
2772
2773         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2774         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2775                           mti->mti_svname, ptr);
2776
2777         RETURN(rc);
2778 }
2779
2780 /* write global variable settings into log */
2781 static int mgs_write_log_sys(const struct lu_env *env,
2782                              struct mgs_device *mgs, struct fs_db *fsdb,
2783                              struct mgs_target_info *mti, char *sys, char *ptr)
2784 {
2785         struct mgs_thread_info *mgi = mgs_env_info(env);
2786         struct lustre_cfg *lcfg;
2787         char *tmp, sep;
2788         int rc, cmd, convert = 1;
2789
2790         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2791                 cmd = LCFG_SET_TIMEOUT;
2792         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2793                 cmd = LCFG_SET_LDLM_TIMEOUT;
2794         /* Check for known params here so we can return error to lctl */
2795         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2796                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2797                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2798                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2799                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2800                 cmd = LCFG_PARAM;
2801         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2802                 convert = 0; /* Don't convert string value to integer */
2803                 cmd = LCFG_PARAM;
2804         } else {
2805                 return -EINVAL;
2806         }
2807
2808         if (mgs_param_empty(ptr))
2809                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2810         else
2811                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2812
2813         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2814         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2815         if (!convert && *tmp != '\0')
2816                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2817         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2818         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2819         /* truncate the comment to the parameter name */
2820         ptr = tmp - 1;
2821         sep = *ptr;
2822         *ptr = '\0';
2823         /* modify all servers and clients */
2824         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2825                                       *tmp == '\0' ? NULL : lcfg,
2826                                       mti->mti_fsname, sys, 0);
2827         if (rc == 0 && *tmp != '\0') {
2828                 switch (cmd) {
2829                 case LCFG_SET_TIMEOUT:
2830                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2831                                 class_process_config(lcfg);
2832                         break;
2833                 case LCFG_SET_LDLM_TIMEOUT:
2834                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2835                                 class_process_config(lcfg);
2836                         break;
2837                 default:
2838                         break;
2839                 }
2840         }
2841         *ptr = sep;
2842         lustre_cfg_free(lcfg);
2843         return rc;
2844 }
2845
2846 /* write quota settings into log */
2847 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2848                                struct fs_db *fsdb, struct mgs_target_info *mti,
2849                                char *quota, char *ptr)
2850 {
2851         struct mgs_thread_info  *mgi = mgs_env_info(env);
2852         struct lustre_cfg       *lcfg;
2853         char                    *tmp;
2854         char                     sep;
2855         int                      rc, cmd = LCFG_PARAM;
2856
2857         /* support only 'meta' and 'data' pools so far */
2858         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2859             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2860                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2861                        "& quota.ost are)\n", ptr);
2862                 return -EINVAL;
2863         }
2864
2865         if (*tmp == '\0') {
2866                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2867         } else {
2868                 CDEBUG(D_MGS, "global '%s'\n", quota);
2869
2870                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2871                     strcmp(tmp, "none") != 0) {
2872                         CERROR("enable option(%s) isn't supported\n", tmp);
2873                         return -EINVAL;
2874                 }
2875         }
2876
2877         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2878         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2879         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2880         /* truncate the comment to the parameter name */
2881         ptr = tmp - 1;
2882         sep = *ptr;
2883         *ptr = '\0';
2884
2885         /* XXX we duplicated quota enable information in all server
2886          *     config logs, it should be moved to a separate config
2887          *     log once we cleanup the config log for global param. */
2888         /* modify all servers */
2889         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2890                                       *tmp == '\0' ? NULL : lcfg,
2891                                       mti->mti_fsname, quota, 1);
2892         *ptr = sep;
2893         lustre_cfg_free(lcfg);
2894         return rc < 0 ? rc : 0;
2895 }
2896
2897 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2898                                    struct mgs_device *mgs,
2899                                    struct fs_db *fsdb,
2900                                    struct mgs_target_info *mti,
2901                                    char *param)
2902 {
2903         struct mgs_thread_info *mgi = mgs_env_info(env);
2904         struct llog_handle     *llh = NULL;
2905         char                   *logname;
2906         char                   *comment, *ptr;
2907         struct lustre_cfg      *lcfg;
2908         int                     rc, len;
2909         ENTRY;
2910
2911         /* get comment */
2912         ptr = strchr(param, '=');
2913         LASSERT(ptr);
2914         len = ptr - param;
2915
2916         OBD_ALLOC(comment, len + 1);
2917         if (comment == NULL)
2918                 RETURN(-ENOMEM);
2919         strncpy(comment, param, len);
2920         comment[len] = '\0';
2921
2922         /* prepare lcfg */
2923         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2924         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2925         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2926         if (lcfg == NULL)
2927                 GOTO(out_comment, rc = -ENOMEM);
2928
2929         /* construct log name */
2930         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2931         if (rc)
2932                 GOTO(out_lcfg, rc);
2933
2934         if (mgs_log_is_empty(env, mgs, logname)) {
2935                 rc = record_start_log(env, mgs, &llh, logname);
2936                 if (rc)
2937                         GOTO(out, rc);
2938                 record_end_log(env, &llh);
2939         }
2940
2941         /* obsolete old one */
2942         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2943                         comment, CM_SKIP);
2944         if (rc < 0)
2945                 GOTO(out, rc);
2946         /* write the new one */
2947         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2948                                   mti->mti_svname, comment);
2949         if (rc)
2950                 CERROR("err %d writing log %s\n", rc, logname);
2951 out:
2952         name_destroy(&logname);
2953 out_lcfg:
2954         lustre_cfg_free(lcfg);
2955 out_comment:
2956         OBD_FREE(comment, len + 1);
2957         RETURN(rc);
2958 }
2959
2960 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2961                                         char *param)
2962 {
2963         char    *ptr;
2964
2965         /* disable the adjustable udesc parameter for now, i.e. use default
2966          * setting that client always ship udesc to MDT if possible. to enable
2967          * it simply remove the following line */
2968         goto error_out;
2969
2970         ptr = strchr(param, '=');
2971         if (ptr == NULL)
2972                 goto error_out;
2973         *ptr++ = '\0';
2974
2975         if (strcmp(param, PARAM_SRPC_UDESC))
2976                 goto error_out;
2977
2978         if (strcmp(ptr, "yes") == 0) {
2979                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2980                 CWARN("Enable user descriptor shipping from client to MDT\n");
2981         } else if (strcmp(ptr, "no") == 0) {
2982                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2983                 CWARN("Disable user descriptor shipping from client to MDT\n");
2984         } else {
2985                 *(ptr - 1) = '=';
2986                 goto error_out;
2987         }
2988         return 0;
2989
2990 error_out:
2991         CERROR("Invalid param: %s\n", param);
2992         return -EINVAL;
2993 }
2994
2995 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2996                                   const char *svname,
2997                                   char *param)
2998 {
2999         struct sptlrpc_rule      rule;
3000         struct sptlrpc_rule_set *rset;
3001         int                      rc;
3002         ENTRY;
3003
3004         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3005                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3006                 RETURN(-EINVAL);
3007         }
3008
3009         if (strncmp(param, PARAM_SRPC_UDESC,
3010                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3011                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3012         }
3013
3014         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3015                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3016                 RETURN(-EINVAL);
3017         }
3018
3019         param += sizeof(PARAM_SRPC_FLVR) - 1;
3020
3021         rc = sptlrpc_parse_rule(param, &rule);
3022         if (rc)
3023                 RETURN(rc);
3024
3025         /* mgs rules implies must be mgc->mgs */
3026         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3027                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3028                      rule.sr_from != LUSTRE_SP_ANY) ||
3029                     (rule.sr_to != LUSTRE_SP_MGS &&
3030                      rule.sr_to != LUSTRE_SP_ANY))
3031                         RETURN(-EINVAL);
3032         }
3033
3034         /* preapre room for this coming rule. svcname format should be:
3035          * - fsname: general rule
3036          * - fsname-tgtname: target-specific rule
3037          */
3038         if (strchr(svname, '-')) {
3039                 struct mgs_tgt_srpc_conf *tgtconf;
3040                 int                       found = 0;
3041
3042                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3043                      tgtconf = tgtconf->mtsc_next) {
3044                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3045                                 found = 1;
3046                                 break;
3047                         }
3048                 }
3049
3050                 if (!found) {
3051                         int name_len;
3052
3053                         OBD_ALLOC_PTR(tgtconf);
3054                         if (tgtconf == NULL)
3055                                 RETURN(-ENOMEM);
3056
3057                         name_len = strlen(svname);
3058
3059                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3060                         if (tgtconf->mtsc_tgt == NULL) {
3061                                 OBD_FREE_PTR(tgtconf);
3062                                 RETURN(-ENOMEM);
3063                         }
3064                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3065
3066                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3067                         fsdb->fsdb_srpc_tgt = tgtconf;
3068                 }
3069
3070                 rset = &tgtconf->mtsc_rset;
3071         } else {
3072                 rset = &fsdb->fsdb_srpc_gen;
3073         }
3074
3075         rc = sptlrpc_rule_set_merge(rset, &rule);
3076
3077         RETURN(rc);
3078 }
3079
3080 static int mgs_srpc_set_param(const struct lu_env *env,
3081                               struct mgs_device *mgs,
3082                               struct fs_db *fsdb,
3083                               struct mgs_target_info *mti,
3084                               char *param)
3085 {
3086         char                   *copy;
3087         int                     rc, copy_size;
3088         ENTRY;
3089
3090 #ifndef HAVE_GSS
3091         RETURN(-EINVAL);
3092 #endif
3093         /* keep a copy of original param, which could be destroied
3094          * during parsing */
3095         copy_size = strlen(param) + 1;
3096         OBD_ALLOC(copy, copy_size);
3097         if (copy == NULL)
3098                 return -ENOMEM;
3099         memcpy(copy, param, copy_size);
3100
3101         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3102         if (rc)
3103                 goto out_free;
3104
3105         /* previous steps guaranteed the syntax is correct */
3106         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3107         if (rc)
3108                 goto out_free;
3109
3110         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3111                 /*
3112                  * for mgs rules, make them effective immediately.
3113                  */
3114                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3115                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3116                                                  &fsdb->fsdb_srpc_gen);
3117         }
3118
3119 out_free:
3120         OBD_FREE(copy, copy_size);
3121         RETURN(rc);
3122 }
3123
3124 struct mgs_srpc_read_data {
3125         struct fs_db   *msrd_fsdb;
3126         int             msrd_skip;
3127 };
3128
3129 static int mgs_srpc_read_handler(const struct lu_env *env,
3130                                  struct llog_handle *llh,
3131                                  struct llog_rec_hdr *rec, void *data)
3132 {
3133         struct mgs_srpc_read_data *msrd = data;
3134         struct cfg_marker         *marker;
3135         struct lustre_cfg         *lcfg = REC_DATA(rec);
3136         char                      *svname, *param;
3137         int                        cfg_len, rc;
3138         ENTRY;
3139
3140         if (rec->lrh_type != OBD_CFG_REC) {
3141                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3142                 RETURN(-EINVAL);
3143         }
3144
3145         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3146                   sizeof(struct llog_rec_tail);
3147
3148         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3149         if (rc) {
3150                 CERROR("Insane cfg\n");
3151                 RETURN(rc);
3152         }
3153
3154         if (lcfg->lcfg_command == LCFG_MARKER) {
3155                 marker = lustre_cfg_buf(lcfg, 1);
3156
3157                 if (marker->cm_flags & CM_START &&
3158                     marker->cm_flags & CM_SKIP)
3159                         msrd->msrd_skip = 1;
3160                 if (marker->cm_flags & CM_END)
3161                         msrd->msrd_skip = 0;
3162
3163                 RETURN(0);
3164         }
3165
3166         if (msrd->msrd_skip)
3167                 RETURN(0);
3168
3169         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3170                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3171                 RETURN(0);
3172         }
3173
3174         svname = lustre_cfg_string(lcfg, 0);
3175         if (svname == NULL) {
3176                 CERROR("svname is empty\n");
3177                 RETURN(0);
3178         }
3179
3180         param = lustre_cfg_string(lcfg, 1);
3181         if (param == NULL) {
3182                 CERROR("param is empty\n");
3183                 RETURN(0);
3184         }
3185
3186         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3187         if (rc)
3188                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3189
3190         RETURN(0);
3191 }
3192
3193 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3194                                 struct mgs_device *mgs,
3195                                 struct fs_db *fsdb)
3196 {
3197         struct llog_handle        *llh = NULL;
3198         struct llog_ctxt          *ctxt;
3199         char                      *logname;
3200         struct mgs_srpc_read_data  msrd;
3201         int                        rc;
3202         ENTRY;
3203
3204         /* construct log name */
3205         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3206         if (rc)
3207                 RETURN(rc);
3208
3209         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3210         LASSERT(ctxt != NULL);
3211
3212         if (mgs_log_is_empty(env, mgs, logname))
3213                 GOTO(out, rc = 0);
3214
3215         rc = llog_open(env, ctxt, &llh, NULL, logname,
3216                        LLOG_OPEN_EXISTS);
3217         if (rc < 0) {
3218                 if (rc == -ENOENT)
3219                         rc = 0;
3220                 GOTO(out, rc);
3221         }
3222
3223         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3224         if (rc)
3225                 GOTO(out_close, rc);
3226
3227         if (llog_get_size(llh) <= 1)
3228                 GOTO(out_close, rc = 0);
3229
3230         msrd.msrd_fsdb = fsdb;
3231         msrd.msrd_skip = 0;
3232
3233         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3234                           NULL);
3235
3236 out_close:
3237         llog_close(env, llh);
3238 out:
3239         llog_ctxt_put(ctxt);
3240         name_destroy(&logname);
3241
3242         if (rc)
3243                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3244         RETURN(rc);
3245 }
3246
3247 /* Permanent settings of all parameters by writing into the appropriate
3248  * configuration logs.
3249  * A parameter with null value ("<param>='\0'") means to erase it out of
3250  * the logs.
3251  */
3252 static int mgs_write_log_param(const struct lu_env *env,
3253                                struct mgs_device *mgs, struct fs_db *fsdb,
3254                                struct mgs_target_info *mti, char *ptr)
3255 {
3256         struct mgs_thread_info *mgi = mgs_env_info(env);
3257         char *logname;
3258         char *tmp;
3259         int rc = 0, rc2 = 0;
3260         ENTRY;
3261
3262         /* For various parameter settings, we have to figure out which logs
3263            care about them (e.g. both mdt and client for lov settings) */
3264         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3265
3266         /* The params are stored in MOUNT_DATA_FILE and modified via
3267            tunefs.lustre, or set using lctl conf_param */
3268
3269         /* Processed in lustre_start_mgc */
3270         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3271                 GOTO(end, rc);
3272
3273         /* Processed in ost/mdt */
3274         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3275                 GOTO(end, rc);
3276
3277         /* Processed in mgs_write_log_ost */
3278         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3279                 if (mti->mti_flags & LDD_F_PARAM) {
3280                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3281                                            "changed with tunefs.lustre"
3282                                            "and --writeconf\n", ptr);
3283                         rc = -EPERM;
3284                 }
3285                 GOTO(end, rc);
3286         }
3287
3288         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3289                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3290                 GOTO(end, rc);
3291         }
3292
3293         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3294                 /* Add a failover nidlist */
3295                 rc = 0;
3296                 /* We already processed failovers params for new
3297                    targets in mgs_write_log_target */
3298                 if (mti->mti_flags & LDD_F_PARAM) {
3299                         CDEBUG(D_MGS, "Adding failnode\n");
3300                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3301                 }
3302                 GOTO(end, rc);
3303         }
3304
3305         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3306                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3307                 GOTO(end, rc);
3308         }
3309
3310         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3311                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3312                 GOTO(end, rc);
3313         }
3314
3315         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3316                 /* active=0 means off, anything else means on */
3317                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3318                 int i;
3319
3320                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3321                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3322                                            "be (de)activated.\n",
3323                                            mti->mti_svname);
3324                         GOTO(end, rc = -EINVAL);
3325                 }
3326                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3327                               flag ? "de": "re", mti->mti_svname);
3328                 /* Modify clilov */
3329                 rc = name_create(&logname, mti->mti_fsname, "-client");
3330                 if (rc)
3331                         GOTO(end, rc);
3332                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3333                                 mti->mti_svname, "add osc", flag);
3334                 name_destroy(&logname);
3335                 if (rc)
3336                         goto active_err;
3337                 /* Modify mdtlov */
3338                 /* Add to all MDT logs for CMD */
3339                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3340                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3341                                 continue;
3342                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3343                         if (rc)
3344                                 GOTO(end, rc);
3345                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3346                                         mti->mti_svname, "add osc", flag);
3347                         name_destroy(&logname);
3348                         if (rc)
3349                                 goto active_err;
3350                 }
3351         active_err:
3352                 if (rc) {
3353                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3354                                            "log (%d). No permanent "
3355                                            "changes were made to the "
3356                                            "config log.\n",
3357                                            mti->mti_svname, rc);
3358                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3359                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3360                                                    " because the log"
3361                                                    "is in the old 1.4"
3362                                                    "style. Consider "
3363                                                    " --writeconf to "
3364                                                    "update the logs.\n");
3365                         GOTO(end, rc);
3366                 }
3367                 /* Fall through to osc proc for deactivating live OSC
3368                    on running MDT / clients. */
3369         }
3370         /* Below here, let obd's XXX_process_config methods handle it */
3371
3372         /* All lov. in proc */
3373         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3374                 char *mdtlovname;
3375
3376                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3377                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3378                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3379                                            "set on the MDT, not %s. "
3380                                            "Ignoring.\n",
3381                                            mti->mti_svname);
3382                         GOTO(end, rc = 0);
3383                 }
3384
3385                 /* Modify mdtlov */
3386                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3387                         GOTO(end, rc = -ENODEV);
3388
3389                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3390                                              mti->mti_stripe_index);
3391                 if (rc)
3392                         GOTO(end, rc);
3393                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3394                                   &mgi->mgi_bufs, mdtlovname, ptr);
3395                 name_destroy(&logname);
3396                 name_destroy(&mdtlovname);
3397                 if (rc)
3398                         GOTO(end, rc);
3399
3400                 /* Modify clilov */
3401                 rc = name_create(&logname, mti->mti_fsname, "-client");
3402                 if (rc)
3403                         GOTO(end, rc);
3404                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3405                                   fsdb->fsdb_clilov, ptr);
3406                 name_destroy(&logname);
3407                 GOTO(end, rc);
3408         }
3409
3410         /* All osc., mdc., llite. params in proc */
3411         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3412             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3413             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3414                 char *cname;
3415
3416                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3417                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3418                                            " cannot be modified. Consider"
3419                                            " updating the configuration with"
3420                                            " --writeconf\n",
3421                                            mti->mti_svname);
3422                         GOTO(end, rc = -EINVAL);
3423                 }
3424                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3425                         rc = name_create(&cname, mti->mti_fsname, "-client");
3426                         /* Add the client type to match the obdname in
3427                            class_config_llog_handler */
3428                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3429                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3430                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3431                         rc = name_create(&cname, mti->mti_svname, "-osc");
3432                 } else {
3433                         GOTO(end, rc = -EINVAL);
3434                 }
3435                 if (rc)
3436                         GOTO(end, rc);
3437
3438                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3439
3440                 /* Modify client */
3441                 rc = name_create(&logname, mti->mti_fsname, "-client");
3442                 if (rc) {
3443                         name_destroy(&cname);
3444                         GOTO(end, rc);
3445                 }
3446                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3447                                   cname, ptr);
3448
3449                 /* osc params affect the MDT as well */
3450                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3451                         int i;
3452
3453                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3454                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3455                                         continue;
3456                                 name_destroy(&cname);
3457                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3458                                                          fsdb, i);
3459                                 name_destroy(&logname);
3460                                 if (rc)
3461                                         break;
3462                                 rc = name_create_mdt(&logname,
3463                                                      mti->mti_fsname, i);
3464                                 if (rc)
3465                                         break;
3466                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3467                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3468                                                           mti, logname,
3469                                                           &mgi->mgi_bufs,
3470                                                           cname, ptr);
3471                                         if (rc)
3472                                                 break;
3473                                 }
3474                         }
3475                 }
3476                 name_destroy(&logname);
3477                 name_destroy(&cname);
3478                 GOTO(end, rc);
3479         }
3480
3481         /* All mdt. params in proc */
3482         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
3483                 int i;
3484                 __u32 idx;
3485
3486                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3487                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3488                             MTI_NAME_MAXLEN) == 0)
3489                         /* device is unspecified completely? */
3490                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3491                 else
3492                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3493                 if (rc < 0)
3494                         goto active_err;
3495                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3496                         goto active_err;
3497                 if (rc & LDD_F_SV_ALL) {
3498                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3499                                 if (!test_bit(i,
3500                                                   fsdb->fsdb_mdt_index_map))
3501                                         continue;
3502                                 rc = name_create_mdt(&logname,
3503                                                 mti->mti_fsname, i);
3504                                 if (rc)
3505                                         goto active_err;
3506                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3507                                                   logname, &mgi->mgi_bufs,
3508                                                   logname, ptr);
3509                                 name_destroy(&logname);
3510                                 if (rc)
3511                                         goto active_err;
3512                         }
3513                 } else {
3514                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3515                                           mti->mti_svname, &mgi->mgi_bufs,
3516                                           mti->mti_svname, ptr);
3517                         if (rc)
3518                                 goto active_err;
3519                 }
3520                 GOTO(end, rc);
3521         }
3522
3523         /* All mdd., ost. and osd. params in proc */
3524         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3525             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3526             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3527                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3528                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3529                         GOTO(end, rc = -ENODEV);
3530
3531                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3532                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3533                 GOTO(end, rc);
3534         }
3535
3536         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3537         rc2 = -ENOSYS;
3538
3539 end:
3540         if (rc)
3541                 CERROR("err %d on param '%s'\n", rc, ptr);
3542
3543         RETURN(rc ?: rc2);
3544 }
3545
3546 /* Not implementing automatic failover nid addition at this time. */
3547 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3548                       struct mgs_target_info *mti)
3549 {
3550 #if 0
3551         struct fs_db *fsdb;
3552         int rc;
3553         ENTRY;
3554
3555         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3556         if (rc)
3557                 RETURN(rc);
3558
3559         if (mgs_log_is_empty(obd, mti->mti_svname))
3560                 /* should never happen */
3561                 RETURN(-ENOENT);
3562
3563         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3564
3565         /* FIXME We can just check mti->params to see if we're already in
3566            the failover list.  Modify mti->params for rewriting back at
3567            server_register_target(). */
3568
3569         mutex_lock(&fsdb->fsdb_mutex);
3570         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3571         mutex_unlock(&fsdb->fsdb_mutex);
3572
3573         RETURN(rc);
3574 #endif
3575         return 0;
3576 }
3577
3578 int mgs_write_log_target(const struct lu_env *env,
3579                          struct mgs_device *mgs,
3580                          struct mgs_target_info *mti,
3581                          struct fs_db *fsdb)
3582 {
3583         int rc = -EINVAL;
3584         char *buf, *params;
3585         ENTRY;
3586
3587         /* set/check the new target index */
3588         rc = mgs_set_index(env, mgs, mti);
3589         if (rc < 0) {
3590                 CERROR("Can't get index (%d)\n", rc);
3591                 RETURN(rc);
3592         }
3593
3594         if (rc == EALREADY) {
3595                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3596                               mti->mti_stripe_index, mti->mti_svname);
3597                 /* We would like to mark old log sections as invalid
3598                    and add new log sections in the client and mdt logs.
3599                    But if we add new sections, then live clients will
3600                    get repeat setup instructions for already running
3601                    osc's. So don't update the client/mdt logs. */
3602                 mti->mti_flags &= ~LDD_F_UPDATE;
3603                 rc = 0;
3604         }
3605
3606         mutex_lock(&fsdb->fsdb_mutex);
3607
3608         if (mti->mti_flags &
3609             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3610                 /* Generate a log from scratch */
3611                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3612                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3613                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3614                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3615                 } else {
3616                         CERROR("Unknown target type %#x, can't create log for "
3617                                "%s\n", mti->mti_flags, mti->mti_svname);
3618                 }
3619                 if (rc) {
3620                         CERROR("Can't write logs for %s (%d)\n",
3621                                mti->mti_svname, rc);
3622                         GOTO(out_up, rc);
3623                 }
3624         } else {
3625                 /* Just update the params from tunefs in mgs_write_log_params */
3626                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3627                 mti->mti_flags |= LDD_F_PARAM;
3628         }
3629
3630         /* allocate temporary buffer, where class_get_next_param will
3631            make copy of a current  parameter */
3632         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3633         if (buf == NULL)
3634                 GOTO(out_up, rc = -ENOMEM);
3635         params = mti->mti_params;
3636         while (params != NULL) {
3637                 rc = class_get_next_param(&params, buf);
3638                 if (rc) {
3639                         if (rc == 1)
3640                                 /* there is no next parameter, that is
3641                                    not an error */
3642                                 rc = 0;
3643                         break;
3644                 }
3645                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3646                        params, buf);
3647                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3648                 if (rc)
3649                         break;
3650         }
3651
3652         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3653
3654 out_up:
3655         mutex_unlock(&fsdb->fsdb_mutex);
3656         RETURN(rc);
3657 }
3658
3659 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3660 {
3661         struct llog_ctxt        *ctxt;
3662         int                      rc = 0;
3663
3664         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3665         if (ctxt == NULL) {
3666                 CERROR("%s: MGS config context doesn't exist\n",
3667                        mgs->mgs_obd->obd_name);
3668                 rc = -ENODEV;
3669         } else {
3670                 rc = llog_erase(env, ctxt, NULL, name);
3671                 /* llog may not exist */
3672                 if (rc == -ENOENT)
3673                         rc = 0;
3674                 llog_ctxt_put(ctxt);
3675         }
3676
3677         if (rc)
3678                 CERROR("%s: failed to clear log %s: %d\n",
3679                        mgs->mgs_obd->obd_name, name, rc);
3680
3681         return rc;
3682 }
3683
3684 /* erase all logs for the given fs */
3685 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3686 {
3687         struct fs_db *fsdb;
3688         cfs_list_t list;
3689         struct mgs_direntry *dirent, *n;
3690         int rc, len = strlen(fsname);
3691         char *suffix;
3692         ENTRY;
3693
3694         /* Find all the logs in the CONFIGS directory */
3695         rc = class_dentry_readdir(env, mgs, &list);
3696         if (rc)
3697                 RETURN(rc);
3698
3699         mutex_lock(&mgs->mgs_mutex);
3700
3701         /* Delete the fs db */
3702         fsdb = mgs_find_fsdb(mgs, fsname);
3703         if (fsdb)
3704                 mgs_free_fsdb(mgs, fsdb);
3705
3706         mutex_unlock(&mgs->mgs_mutex);
3707
3708         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3709                 cfs_list_del(&dirent->list);
3710                 suffix = strrchr(dirent->name, '-');
3711                 if (suffix != NULL) {
3712                         if ((len == suffix - dirent->name) &&
3713                             (strncmp(fsname, dirent->name, len) == 0)) {
3714                                 CDEBUG(D_MGS, "Removing log %s\n",
3715                                        dirent->name);
3716                                 mgs_erase_log(env, mgs, dirent->name);
3717                         }
3718                 }
3719                 mgs_direntry_free(dirent);
3720         }
3721
3722         RETURN(rc);
3723 }
3724
3725 /* list all logs for the given fs */
3726 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
3727                   struct obd_ioctl_data *data)
3728 {
3729         cfs_list_t               list;
3730         struct mgs_direntry     *dirent, *n;
3731         char                    *out, *suffix;
3732         int                      l, remains, rc;
3733
3734         ENTRY;
3735
3736         /* Find all the logs in the CONFIGS directory */
3737         rc = class_dentry_readdir(env, mgs, &list);
3738         if (rc) {
3739                 CERROR("%s: can't read %s dir = %d\n",
3740                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
3741                 RETURN(rc);
3742         }
3743
3744         out = data->ioc_bulk;
3745         remains = data->ioc_inllen1;
3746         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3747                 cfs_list_del(&dirent->list);
3748                 suffix = strrchr(dirent->name, '-');
3749                 if (suffix != NULL) {
3750                         l = snprintf(out, remains, "config log: $%s\n",
3751                                      dirent->name);
3752                         out += l;
3753                         remains -= l;
3754                 }
3755                 mgs_direntry_free(dirent);
3756                 if (remains <= 0)
3757                         break;
3758         }
3759         RETURN(rc);
3760 }
3761
3762 /* from llog_swab */
3763 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3764 {
3765         int i;
3766         ENTRY;
3767
3768         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3769         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3770
3771         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3772         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3773         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3774         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3775
3776         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3777         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3778                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3779                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3780                                i, lcfg->lcfg_buflens[i],
3781                                lustre_cfg_string(lcfg, i));
3782                 }
3783         EXIT;
3784 }
3785
3786 /* Setup params fsdb and log
3787  */
3788 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3789                           struct fs_db *fsdb)
3790 {
3791         struct llog_handle      *params_llh = NULL;
3792         int                     rc;
3793         ENTRY;
3794
3795         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
3796         if (fsdb != NULL) {
3797                 mutex_lock(&fsdb->fsdb_mutex);
3798                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
3799                 if (rc == 0)
3800                         rc = record_end_log(env, &params_llh);
3801                 mutex_unlock(&fsdb->fsdb_mutex);
3802         }
3803
3804         RETURN(rc);
3805 }
3806
3807 /* Cleanup params fsdb and log
3808  */
3809 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
3810 {
3811         return mgs_erase_logs(env, mgs, PARAMS_FILENAME);
3812 }
3813
3814 /* Set a permanent (config log) param for a target or fs
3815  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3816  *             buf1 contains the single parameter
3817  */
3818 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3819                  struct lustre_cfg *lcfg, char *fsname)
3820 {
3821         struct fs_db *fsdb;
3822         struct mgs_target_info *mti;
3823         char *devname, *param;
3824         char *ptr;
3825         const char *tmp;
3826         __u32 index;
3827         int rc = 0;
3828         ENTRY;
3829
3830         print_lustre_cfg(lcfg);
3831
3832         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3833         devname = lustre_cfg_string(lcfg, 0);
3834         param = lustre_cfg_string(lcfg, 1);
3835         if (!devname) {
3836                 /* Assume device name embedded in param:
3837                    lustre-OST0000.osc.max_dirty_mb=32 */
3838                 ptr = strchr(param, '.');
3839                 if (ptr) {
3840                         devname = param;
3841                         *ptr = 0;
3842                         param = ptr + 1;
3843                 }
3844         }
3845         if (!devname) {
3846                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3847                 RETURN(-ENOSYS);
3848         }
3849
3850         rc = mgs_parse_devname(devname, fsname, NULL);
3851         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3852                 /* param related to llite isn't allowed to set by OST or MDT */
3853                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3854                                        sizeof(PARAM_LLITE) - 1) == 0)
3855                         RETURN(-EINVAL);
3856         } else {
3857                 /* assume devname is the fsname */
3858                 memset(fsname, 0, MTI_NAME_MAXLEN);
3859                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3860                 fsname[MTI_NAME_MAXLEN - 1] = 0;
3861         }
3862         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3863
3864         rc = mgs_find_or_make_fsdb(env, mgs,
3865                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
3866                                    PARAMS_FILENAME : fsname, &fsdb);
3867         if (rc)
3868                 RETURN(rc);
3869
3870         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
3871             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3872             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3873                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3874                        "is '%s'\n", fsname, devname);
3875                 mgs_free_fsdb(mgs, fsdb);
3876                 RETURN(-EINVAL);
3877         }
3878
3879         /* Create a fake mti to hold everything */
3880         OBD_ALLOC_PTR(mti);
3881         if (!mti)
3882                 GOTO(out, rc = -ENOMEM);
3883         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3884             >= sizeof(mti->mti_fsname))
3885                 GOTO(out, rc = -E2BIG);
3886         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3887             >= sizeof(mti->mti_svname))
3888                 GOTO(out, rc = -E2BIG);
3889         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3890             >= sizeof(mti->mti_params))
3891                 GOTO(out, rc = -E2BIG);
3892         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3893         if (rc < 0)
3894                 /* Not a valid server; may be only fsname */
3895                 rc = 0;
3896         else
3897                 /* Strip -osc or -mdc suffix from svname */
3898                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3899                                      mti->mti_svname))
3900                         GOTO(out, rc = -EINVAL);
3901         /*
3902          * Revoke lock so everyone updates.  Should be alright if
3903          * someone was already reading while we were updating the logs,
3904          * so we don't really need to hold the lock while we're
3905          * writing (above).
3906          */
3907         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
3908                 mti->mti_flags = rc | LDD_F_PARAM2;
3909                 mutex_lock(&fsdb->fsdb_mutex);
3910                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
3911                 mutex_unlock(&fsdb->fsdb_mutex);
3912                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
3913         } else {
3914                 mti->mti_flags = rc | LDD_F_PARAM;
3915                 mutex_lock(&fsdb->fsdb_mutex);
3916                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3917                 mutex_unlock(&fsdb->fsdb_mutex);
3918                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3919         }
3920
3921 out:
3922         OBD_FREE_PTR(mti);
3923         RETURN(rc);
3924 }
3925
3926 static int mgs_write_log_pool(const struct lu_env *env,
3927                               struct mgs_device *mgs, char *logname,
3928                               struct fs_db *fsdb, char *tgtname,
3929                               enum lcfg_command_type cmd,
3930                               char *fsname, char *poolname,
3931                               char *ostname, char *comment)
3932 {
3933         struct llog_handle *llh = NULL;
3934         int rc;
3935
3936         rc = record_start_log(env, mgs, &llh, logname);
3937         if (rc)
3938                 return rc;
3939         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
3940         if (rc)
3941                 goto out;
3942         rc = record_base(env, llh, tgtname, 0, cmd,
3943                          fsname, poolname, ostname, 0);
3944         if (rc)
3945                 goto out;
3946         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
3947 out:
3948         record_end_log(env, &llh);
3949         return rc;
3950 }
3951
3952 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
3953                     enum lcfg_command_type cmd, const char *nodemap_name,
3954                     const char *param)
3955 {
3956         lnet_nid_t      nid[2];
3957         __u32           idmap[2];
3958         bool            bool_switch;
3959         __u32           int_id;
3960         int             rc = 0;
3961         ENTRY;
3962
3963         switch (cmd) {
3964         case LCFG_NODEMAP_ADD:
3965                 rc = nodemap_add(nodemap_name);
3966                 break;
3967         case LCFG_NODEMAP_DEL:
3968                 rc = nodemap_del(nodemap_name);
3969                 break;
3970         case LCFG_NODEMAP_ADD_RANGE:
3971                 rc = nodemap_parse_range(param, nid);
3972                 if (rc != 0)
3973                         break;
3974                 rc = nodemap_add_range(nodemap_name, nid);
3975                 break;
3976         case LCFG_NODEMAP_DEL_RANGE:
3977                 rc = nodemap_parse_range(param, nid);
3978                 if (rc != 0)
3979                         break;
3980                 rc = nodemap_del_range(nodemap_name, nid);
3981                 break;
3982         case LCFG_NODEMAP_ADMIN:
3983                 bool_switch = simple_strtoul(param, NULL, 10);
3984                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
3985                 break;
3986         case LCFG_NODEMAP_TRUSTED:
3987                 bool_switch = simple_strtoul(param, NULL, 10);
3988                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
3989                 break;
3990         case LCFG_NODEMAP_SQUASH_UID:
3991                 int_id = simple_strtoul(param, NULL, 10);
3992                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
3993                 break;
3994         case LCFG_NODEMAP_SQUASH_GID:
3995                 int_id = simple_strtoul(param, NULL, 10);
3996                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
3997                 break;
3998         case LCFG_NODEMAP_ADD_UIDMAP:
3999         case LCFG_NODEMAP_ADD_GIDMAP:
4000                 rc = nodemap_parse_idmap(param, idmap);
4001                 if (rc != 0)
4002                         break;
4003                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
4004                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
4005                                                idmap);
4006                 else
4007                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
4008                                                idmap);
4009                 break;
4010         case LCFG_NODEMAP_DEL_UIDMAP:
4011         case LCFG_NODEMAP_DEL_GIDMAP:
4012                 rc = nodemap_parse_idmap(param, idmap);
4013                 if (rc != 0)
4014                         break;
4015                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
4016                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
4017                                                idmap);
4018                 else
4019                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
4020                                                idmap);
4021                 break;
4022         default:
4023                 rc = -EINVAL;
4024         }
4025
4026         RETURN(rc);
4027 }
4028
4029 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
4030                  enum lcfg_command_type cmd, char *fsname,
4031                  char *poolname, char *ostname)
4032 {
4033         struct fs_db *fsdb;
4034         char *lovname;
4035         char *logname;
4036         char *label = NULL, *canceled_label = NULL;
4037         int label_sz;
4038         struct mgs_target_info *mti = NULL;
4039         int rc, i;
4040         ENTRY;
4041
4042         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
4043         if (rc) {
4044                 CERROR("Can't get db for %s\n", fsname);
4045                 RETURN(rc);
4046         }
4047         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4048                 CERROR("%s is not defined\n", fsname);
4049                 mgs_free_fsdb(mgs, fsdb);
4050                 RETURN(-EINVAL);
4051         }
4052
4053         label_sz = 10 + strlen(fsname) + strlen(poolname);
4054
4055         /* check if ostname match fsname */
4056         if (ostname != NULL) {
4057                 char *ptr;
4058
4059                 ptr = strrchr(ostname, '-');
4060                 if ((ptr == NULL) ||
4061                     (strncmp(fsname, ostname, ptr-ostname) != 0))
4062                         RETURN(-EINVAL);
4063                 label_sz += strlen(ostname);
4064         }
4065
4066         OBD_ALLOC(label, label_sz);
4067         if (label == NULL)
4068                 RETURN(-ENOMEM);
4069
4070         switch(cmd) {
4071         case LCFG_POOL_NEW:
4072                 sprintf(label,
4073                         "new %s.%s", fsname, poolname);
4074                 break;
4075         case LCFG_POOL_ADD:
4076                 sprintf(label,
4077                         "add %s.%s.%s", fsname, poolname, ostname);
4078                 break;
4079         case LCFG_POOL_REM:
4080                 OBD_ALLOC(canceled_label, label_sz);
4081                 if (canceled_label == NULL)
4082                         GOTO(out_label, rc = -ENOMEM);
4083                 sprintf(label,
4084                         "rem %s.%s.%s", fsname, poolname, ostname);
4085                 sprintf(canceled_label,
4086                         "add %s.%s.%s", fsname, poolname, ostname);
4087                 break;
4088         case LCFG_POOL_DEL:
4089                 OBD_ALLOC(canceled_label, label_sz);
4090                 if (canceled_label == NULL)
4091                         GOTO(out_label, rc = -ENOMEM);
4092                 sprintf(label,
4093                         "del %s.%s", fsname, poolname);
4094                 sprintf(canceled_label,
4095                         "new %s.%s", fsname, poolname);
4096                 break;
4097         default:
4098                 break;
4099         }
4100
4101         if (canceled_label != NULL) {
4102                 OBD_ALLOC_PTR(mti);
4103                 if (mti == NULL)
4104                         GOTO(out_cancel, rc = -ENOMEM);
4105         }
4106
4107         mutex_lock(&fsdb->fsdb_mutex);
4108         /* write pool def to all MDT logs */
4109         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4110                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4111                         rc = name_create_mdt_and_lov(&logname, &lovname,
4112                                                      fsdb, i);
4113                         if (rc) {
4114                                 mutex_unlock(&fsdb->fsdb_mutex);
4115                                 GOTO(out_mti, rc);
4116                         }
4117                         if (canceled_label != NULL) {
4118                                 strcpy(mti->mti_svname, "lov pool");
4119                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4120                                                 lovname, canceled_label,
4121                                                 CM_SKIP);
4122                         }
4123
4124                         if (rc >= 0)
4125                                 rc = mgs_write_log_pool(env, mgs, logname,
4126                                                         fsdb, lovname, cmd,
4127                                                         fsname, poolname,
4128                                                         ostname, label);
4129                         name_destroy(&logname);
4130                         name_destroy(&lovname);
4131                         if (rc) {
4132                                 mutex_unlock(&fsdb->fsdb_mutex);
4133                                 GOTO(out_mti, rc);
4134                         }
4135                 }
4136         }
4137
4138         rc = name_create(&logname, fsname, "-client");
4139         if (rc) {
4140                 mutex_unlock(&fsdb->fsdb_mutex);
4141                 GOTO(out_mti, rc);
4142         }
4143         if (canceled_label != NULL) {
4144                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4145                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4146                 if (rc < 0) {
4147                         mutex_unlock(&fsdb->fsdb_mutex);
4148                         name_destroy(&logname);
4149                         GOTO(out_mti, rc);
4150                 }
4151         }
4152
4153         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4154                                 cmd, fsname, poolname, ostname, label);
4155         mutex_unlock(&fsdb->fsdb_mutex);
4156         name_destroy(&logname);
4157         /* request for update */
4158         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4159
4160         EXIT;
4161 out_mti:
4162         if (mti != NULL)
4163                 OBD_FREE_PTR(mti);
4164 out_cancel:
4165         if (canceled_label != NULL)
4166                 OBD_FREE(canceled_label, label_sz);
4167 out_label:
4168         OBD_FREE(label, label_sz);
4169         return rc;
4170 }