Whamcloud - gitweb
LU-2988 mgs: Fix two "lctl replace_nids" resource leaks
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         if (!dt_try_as_dir(env, dir))
71                 GOTO(out, rc = -ENOTDIR);
72
73         LASSERT(dir);
74         LASSERT(dir->do_index_ops);
75
76         iops = &dir->do_index_ops->dio_it;
77         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
78         if (IS_ERR(it))
79                 RETURN(PTR_ERR(it));
80
81         rc = iops->load(env, it, 0);
82         if (rc <= 0)
83                 GOTO(fini, rc = 0);
84
85         /* main cycle */
86         do {
87                 key = (void *)iops->key(env, it);
88                 if (IS_ERR(key)) {
89                         CERROR("%s: key failed when listing %s: rc = %d\n",
90                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
91                                (int) PTR_ERR(key));
92                         goto next;
93                 }
94                 key_sz = iops->key_size(env, it);
95                 LASSERT(key_sz > 0);
96
97                 /* filter out "." and ".." entries */
98                 if (key[0] == '.') {
99                         if (key_sz == 1)
100                                 goto next;
101                         if (key_sz == 2 && key[1] == '.')
102                                 goto next;
103                 }
104
105                 de = mgs_direntry_alloc(key_sz + 1);
106                 if (de == NULL) {
107                         rc = -ENOMEM;
108                         break;
109                 }
110
111                 memcpy(de->name, key, key_sz);
112                 de->name[key_sz] = 0;
113
114                 cfs_list_add(&de->list, list);
115
116 next:
117                 rc = iops->next(env, it);
118         } while (rc == 0);
119         rc = 0;
120
121         iops->put(env, it);
122
123 fini:
124         iops->fini(env, it);
125 out:
126         if (rc)
127                 CERROR("%s: key failed when listing %s: rc = %d\n",
128                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
129         RETURN(rc);
130 }
131
132 /******************** DB functions *********************/
133
134 static inline int name_create(char **newname, char *prefix, char *suffix)
135 {
136         LASSERT(newname);
137         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
138         if (!*newname)
139                 return -ENOMEM;
140         sprintf(*newname, "%s%s", prefix, suffix);
141         return 0;
142 }
143
144 static inline void name_destroy(char **name)
145 {
146         if (*name)
147                 OBD_FREE(*name, strlen(*name) + 1);
148         *name = NULL;
149 }
150
151 struct mgs_fsdb_handler_data
152 {
153         struct fs_db   *fsdb;
154         __u32           ver;
155 };
156
157 /* from the (client) config log, figure out:
158         1. which ost's/mdt's are configured (by index)
159         2. what the last config step is
160         3. COMPAT_18 osc name
161 */
162 /* It might be better to have a separate db file, instead of parsing the info
163    out of the client log.  This is slow and potentially error-prone. */
164 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
165                             struct llog_rec_hdr *rec, void *data)
166 {
167         struct mgs_fsdb_handler_data *d = data;
168         struct fs_db *fsdb = d->fsdb;
169         int cfg_len = rec->lrh_len;
170         char *cfg_buf = (char*) (rec + 1);
171         struct lustre_cfg *lcfg;
172         __u32 index;
173         int rc = 0;
174         ENTRY;
175
176         if (rec->lrh_type != OBD_CFG_REC) {
177                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
178                 RETURN(-EINVAL);
179         }
180
181         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
182         if (rc) {
183                 CERROR("Insane cfg\n");
184                 RETURN(rc);
185         }
186
187         lcfg = (struct lustre_cfg *)cfg_buf;
188
189         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
190                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
191
192         /* Figure out ost indicies */
193         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
194         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
195             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
196                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
197                                        NULL, 10);
198                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
199                        lustre_cfg_string(lcfg, 1), index,
200                        lustre_cfg_string(lcfg, 2));
201                 set_bit(index, fsdb->fsdb_ost_index_map);
202         }
203
204         /* Figure out mdt indicies */
205         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
206         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
207             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
208                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
209                                        &index, NULL);
210                 if (rc != LDD_F_SV_TYPE_MDT) {
211                         CWARN("Unparsable MDC name %s, assuming index 0\n",
212                               lustre_cfg_string(lcfg, 0));
213                         index = 0;
214                 }
215                 rc = 0;
216                 CDEBUG(D_MGS, "MDT index is %u\n", index);
217                 set_bit(index, fsdb->fsdb_mdt_index_map);
218                 fsdb->fsdb_mdt_count ++;
219         }
220
221         /**
222          * figure out the old config. fsdb_gen = 0 means old log
223          * It is obsoleted and not supported anymore
224          */
225         if (fsdb->fsdb_gen == 0) {
226                 CERROR("Old config format is not supported\n");
227                 RETURN(-EINVAL);
228         }
229
230         /*
231          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
232          */
233         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
234             lcfg->lcfg_command == LCFG_ATTACH &&
235             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
236                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
237                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
238                         CWARN("MDT using 1.8 OSC name scheme\n");
239                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
240                 }
241         }
242
243         if (lcfg->lcfg_command == LCFG_MARKER) {
244                 struct cfg_marker *marker;
245                 marker = lustre_cfg_buf(lcfg, 1);
246
247                 d->ver = marker->cm_vers;
248
249                 /* Keep track of the latest marker step */
250                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
251         }
252
253         RETURN(rc);
254 }
255
256 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
257 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
258                                   struct mgs_device *mgs,
259                                   struct fs_db *fsdb)
260 {
261         char                            *logname;
262         struct llog_handle              *loghandle;
263         struct llog_ctxt                *ctxt;
264         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
265         int rc;
266
267         ENTRY;
268
269         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
270         LASSERT(ctxt != NULL);
271         rc = name_create(&logname, fsdb->fsdb_name, "-client");
272         if (rc)
273                 GOTO(out_put, rc);
274         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
275         if (rc)
276                 GOTO(out_pop, rc);
277
278         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
279         if (rc)
280                 GOTO(out_close, rc);
281
282         if (llog_get_size(loghandle) <= 1)
283                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
284
285         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
286         CDEBUG(D_INFO, "get_db = %d\n", rc);
287 out_close:
288         llog_close(env, loghandle);
289 out_pop:
290         name_destroy(&logname);
291 out_put:
292         llog_ctxt_put(ctxt);
293
294         RETURN(rc);
295 }
296
297 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
298 {
299         struct mgs_tgt_srpc_conf *tgtconf;
300
301         /* free target-specific rules */
302         while (fsdb->fsdb_srpc_tgt) {
303                 tgtconf = fsdb->fsdb_srpc_tgt;
304                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
305
306                 LASSERT(tgtconf->mtsc_tgt);
307
308                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
309                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
310                 OBD_FREE_PTR(tgtconf);
311         }
312
313         /* free general rules */
314         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
315 }
316
317 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
318 {
319         struct fs_db *fsdb;
320         cfs_list_t *tmp;
321
322         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
323                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
324                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
325                         return fsdb;
326         }
327         return NULL;
328 }
329
330 /* caller must hold the mgs->mgs_fs_db_lock */
331 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
332                                   struct mgs_device *mgs, char *fsname)
333 {
334         struct fs_db *fsdb;
335         int rc;
336         ENTRY;
337
338         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
339                 CERROR("fsname %s is too long\n", fsname);
340                 RETURN(NULL);
341         }
342
343         OBD_ALLOC_PTR(fsdb);
344         if (!fsdb)
345                 RETURN(NULL);
346
347         strcpy(fsdb->fsdb_name, fsname);
348         mutex_init(&fsdb->fsdb_mutex);
349         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
350         fsdb->fsdb_gen = 1;
351
352         if (strcmp(fsname, MGSSELF_NAME) == 0) {
353                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
354         } else {
355                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
356                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
357                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
358                         CERROR("No memory for index maps\n");
359                         GOTO(err, rc = -ENOMEM);
360                 }
361
362                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
363                 if (rc)
364                         GOTO(err, rc);
365                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
366                 if (rc)
367                         GOTO(err, rc);
368
369                 /* initialise data for NID table */
370                 mgs_ir_init_fs(env, mgs, fsdb);
371
372                 lproc_mgs_add_live(mgs, fsdb);
373         }
374
375         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
376
377         RETURN(fsdb);
378 err:
379         if (fsdb->fsdb_ost_index_map)
380                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
381         if (fsdb->fsdb_mdt_index_map)
382                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
383         name_destroy(&fsdb->fsdb_clilov);
384         name_destroy(&fsdb->fsdb_clilmv);
385         OBD_FREE_PTR(fsdb);
386         RETURN(NULL);
387 }
388
389 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
390 {
391         /* wait for anyone with the sem */
392         mutex_lock(&fsdb->fsdb_mutex);
393         lproc_mgs_del_live(mgs, fsdb);
394         cfs_list_del(&fsdb->fsdb_list);
395
396         /* deinitialize fsr */
397         mgs_ir_fini_fs(mgs, fsdb);
398
399         if (fsdb->fsdb_ost_index_map)
400                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
401         if (fsdb->fsdb_mdt_index_map)
402                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
403         name_destroy(&fsdb->fsdb_clilov);
404         name_destroy(&fsdb->fsdb_clilmv);
405         mgs_free_fsdb_srpc(fsdb);
406         mutex_unlock(&fsdb->fsdb_mutex);
407         OBD_FREE_PTR(fsdb);
408 }
409
410 int mgs_init_fsdb_list(struct mgs_device *mgs)
411 {
412         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
413         return 0;
414 }
415
416 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
417 {
418         struct fs_db *fsdb;
419         cfs_list_t *tmp, *tmp2;
420         mutex_lock(&mgs->mgs_mutex);
421         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
422                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
423                 mgs_free_fsdb(mgs, fsdb);
424         }
425         mutex_unlock(&mgs->mgs_mutex);
426         return 0;
427 }
428
429 int mgs_find_or_make_fsdb(const struct lu_env *env,
430                           struct mgs_device *mgs, char *name,
431                           struct fs_db **dbh)
432 {
433         struct fs_db *fsdb;
434         int rc = 0;
435
436         ENTRY;
437         mutex_lock(&mgs->mgs_mutex);
438         fsdb = mgs_find_fsdb(mgs, name);
439         if (fsdb) {
440                 mutex_unlock(&mgs->mgs_mutex);
441                 *dbh = fsdb;
442                 RETURN(0);
443         }
444
445         CDEBUG(D_MGS, "Creating new db\n");
446         fsdb = mgs_new_fsdb(env, mgs, name);
447         /* lock fsdb_mutex until the db is loaded from llogs */
448         if (fsdb)
449                 mutex_lock(&fsdb->fsdb_mutex);
450         mutex_unlock(&mgs->mgs_mutex);
451         if (!fsdb)
452                 RETURN(-ENOMEM);
453
454         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
455                 /* populate the db from the client llog */
456                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
457                 if (rc) {
458                         CERROR("Can't get db from client log %d\n", rc);
459                         GOTO(out_free, rc);
460                 }
461         }
462
463         /* populate srpc rules from params llog */
464         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
465         if (rc) {
466                 CERROR("Can't get db from params log %d\n", rc);
467                 GOTO(out_free, rc);
468         }
469
470         mutex_unlock(&fsdb->fsdb_mutex);
471         *dbh = fsdb;
472
473         RETURN(0);
474
475 out_free:
476         mutex_unlock(&fsdb->fsdb_mutex);
477         mgs_free_fsdb(mgs, fsdb);
478         return rc;
479 }
480
481 /* 1 = index in use
482    0 = index unused
483    -1= empty client log */
484 int mgs_check_index(const struct lu_env *env,
485                     struct mgs_device *mgs,
486                     struct mgs_target_info *mti)
487 {
488         struct fs_db *fsdb;
489         void *imap;
490         int rc = 0;
491         ENTRY;
492
493         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
494
495         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
496         if (rc) {
497                 CERROR("Can't get db for %s\n", mti->mti_fsname);
498                 RETURN(rc);
499         }
500
501         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
502                 RETURN(-1);
503
504         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
505                 imap = fsdb->fsdb_ost_index_map;
506         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
507                 imap = fsdb->fsdb_mdt_index_map;
508         else
509                 RETURN(-EINVAL);
510
511         if (test_bit(mti->mti_stripe_index, imap))
512                 RETURN(1);
513         RETURN(0);
514 }
515
516 static __inline__ int next_index(void *index_map, int map_len)
517 {
518         int i;
519         for (i = 0; i < map_len * 8; i++)
520                 if (!test_bit(i, index_map)) {
521                          return i;
522                  }
523         CERROR("max index %d exceeded.\n", i);
524         return -1;
525 }
526
527 /* Return codes:
528         0  newly marked as in use
529         <0 err
530         +EALREADY for update of an old index */
531 static int mgs_set_index(const struct lu_env *env,
532                          struct mgs_device *mgs,
533                          struct mgs_target_info *mti)
534 {
535         struct fs_db *fsdb;
536         void *imap;
537         int rc = 0;
538         ENTRY;
539
540         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
541         if (rc) {
542                 CERROR("Can't get db for %s\n", mti->mti_fsname);
543                 RETURN(rc);
544         }
545
546         mutex_lock(&fsdb->fsdb_mutex);
547         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
548                 imap = fsdb->fsdb_ost_index_map;
549         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
550                 imap = fsdb->fsdb_mdt_index_map;
551         } else {
552                 GOTO(out_up, rc = -EINVAL);
553         }
554
555         if (mti->mti_flags & LDD_F_NEED_INDEX) {
556                 rc = next_index(imap, INDEX_MAP_SIZE);
557                 if (rc == -1)
558                         GOTO(out_up, rc = -ERANGE);
559                 mti->mti_stripe_index = rc;
560                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
561                         fsdb->fsdb_mdt_count ++;
562         }
563
564         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
565                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
566                                    "but the max index is %d.\n",
567                                    mti->mti_svname, mti->mti_stripe_index,
568                                    INDEX_MAP_SIZE * 8);
569                 GOTO(out_up, rc = -ERANGE);
570         }
571
572         if (test_bit(mti->mti_stripe_index, imap)) {
573                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
574                     !(mti->mti_flags & LDD_F_WRITECONF)) {
575                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
576                                            "%d, but that index is already in "
577                                            "use. Use --writeconf to force\n",
578                                            mti->mti_svname,
579                                            mti->mti_stripe_index);
580                         GOTO(out_up, rc = -EADDRINUSE);
581                 } else {
582                         CDEBUG(D_MGS, "Server %s updating index %d\n",
583                                mti->mti_svname, mti->mti_stripe_index);
584                         GOTO(out_up, rc = EALREADY);
585                 }
586         }
587
588         set_bit(mti->mti_stripe_index, imap);
589         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
590         mutex_unlock(&fsdb->fsdb_mutex);
591         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
592                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
593
594         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
595                mti->mti_stripe_index);
596
597         RETURN(0);
598 out_up:
599         mutex_unlock(&fsdb->fsdb_mutex);
600         return rc;
601 }
602
603 struct mgs_modify_lookup {
604         struct cfg_marker mml_marker;
605         int               mml_modified;
606 };
607
608 static int mgs_modify_handler(const struct lu_env *env,
609                               struct llog_handle *llh,
610                               struct llog_rec_hdr *rec, void *data)
611 {
612         struct mgs_modify_lookup *mml = data;
613         struct cfg_marker *marker;
614         struct lustre_cfg *lcfg = REC_DATA(rec);
615         int cfg_len = REC_DATA_LEN(rec);
616         int rc;
617         ENTRY;
618
619         if (rec->lrh_type != OBD_CFG_REC) {
620                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
621                 RETURN(-EINVAL);
622         }
623
624         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
625         if (rc) {
626                 CERROR("Insane cfg\n");
627                 RETURN(rc);
628         }
629
630         /* We only care about markers */
631         if (lcfg->lcfg_command != LCFG_MARKER)
632                 RETURN(0);
633
634         marker = lustre_cfg_buf(lcfg, 1);
635         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
636             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
637             !(marker->cm_flags & CM_SKIP)) {
638                 /* Found a non-skipped marker match */
639                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
640                        rec->lrh_index, marker->cm_step,
641                        marker->cm_flags, mml->mml_marker.cm_flags,
642                        marker->cm_tgtname, marker->cm_comment);
643                 /* Overwrite the old marker llog entry */
644                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
645                 marker->cm_flags |= mml->mml_marker.cm_flags;
646                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
647                 /* Header and tail are added back to lrh_len in
648                    llog_lvfs_write_rec */
649                 rec->lrh_len = cfg_len;
650                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
651                                 rec->lrh_index);
652                 if (!rc)
653                          mml->mml_modified++;
654         }
655
656         RETURN(rc);
657 }
658
659 /**
660  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
661  * Return code:
662  * 0 - modified successfully,
663  * 1 - no modification was done
664  * negative - error
665  */
666 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
667                       struct fs_db *fsdb, struct mgs_target_info *mti,
668                       char *logname, char *devname, char *comment, int flags)
669 {
670         struct llog_handle *loghandle;
671         struct llog_ctxt *ctxt;
672         struct mgs_modify_lookup *mml;
673         int rc;
674
675         ENTRY;
676
677         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
678         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
679                flags);
680
681         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
682         LASSERT(ctxt != NULL);
683         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
684         if (rc < 0) {
685                 if (rc == -ENOENT)
686                         rc = 0;
687                 GOTO(out_pop, rc);
688         }
689
690         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
691         if (rc)
692                 GOTO(out_close, rc);
693
694         if (llog_get_size(loghandle) <= 1)
695                 GOTO(out_close, rc = 0);
696
697         OBD_ALLOC_PTR(mml);
698         if (!mml)
699                 GOTO(out_close, rc = -ENOMEM);
700         strcpy(mml->mml_marker.cm_comment, comment);
701         strcpy(mml->mml_marker.cm_tgtname, devname);
702         /* Modify mostly means cancel */
703         mml->mml_marker.cm_flags = flags;
704         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
705         mml->mml_modified = 0;
706         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
707                           NULL);
708         if (!rc && !mml->mml_modified)
709                 rc = 1;
710         OBD_FREE_PTR(mml);
711
712 out_close:
713         llog_close(env, loghandle);
714 out_pop:
715         if (rc < 0)
716                 CERROR("%s: modify %s/%s failed: rc = %d\n",
717                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
718         llog_ctxt_put(ctxt);
719         RETURN(rc);
720 }
721
722 /** This structure is passed to mgs_replace_handler */
723 struct mgs_replace_uuid_lookup {
724         /* Nids are replaced for this target device */
725         struct mgs_target_info target;
726         /* Temporary modified llog */
727         struct llog_handle *temp_llh;
728         /* Flag is set if in target block*/
729         int in_target_device;
730         /* Nids already added. Just skip (multiple nids) */
731         int device_nids_added;
732         /* Flag is set if this block should not be copied */
733         int skip_it;
734 };
735
736 /**
737  * Check: a) if block should be skipped
738  * b) is it target block
739  *
740  * \param[in] lcfg
741  * \param[in] mrul
742  *
743  * \retval 0 should not to be skipped
744  * \retval 1 should to be skipped
745  */
746 static int check_markers(struct lustre_cfg *lcfg,
747                          struct mgs_replace_uuid_lookup *mrul)
748 {
749          struct cfg_marker *marker;
750
751         /* Track markers. Find given device */
752         if (lcfg->lcfg_command == LCFG_MARKER) {
753                 marker = lustre_cfg_buf(lcfg, 1);
754                 /* Clean llog from records marked as CM_EXCLUDE.
755                    CM_SKIP records are used for "active" command
756                    and can be restored if needed */
757                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
758                     (CM_EXCLUDE | CM_START)) {
759                         mrul->skip_it = 1;
760                         return 1;
761                 }
762
763                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
764                     (CM_EXCLUDE | CM_END)) {
765                         mrul->skip_it = 0;
766                         return 1;
767                 }
768
769                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
770                         LASSERT(!(marker->cm_flags & CM_START) ||
771                                 !(marker->cm_flags & CM_END));
772                         if (marker->cm_flags & CM_START) {
773                                 mrul->in_target_device = 1;
774                                 mrul->device_nids_added = 0;
775                         } else if (marker->cm_flags & CM_END)
776                                 mrul->in_target_device = 0;
777                 }
778         }
779
780         return 0;
781 }
782
783 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
784                        struct lustre_cfg *lcfg)
785 {
786         struct llog_rec_hdr      rec;
787         int                      buflen, rc;
788
789         if (!lcfg || !llh)
790                 return -ENOMEM;
791
792         LASSERT(llh->lgh_ctxt);
793
794         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
795                                 lcfg->lcfg_buflens);
796         rec.lrh_len = llog_data_len(buflen);
797         rec.lrh_type = OBD_CFG_REC;
798
799         /* idx = -1 means append */
800         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
801         if (rc)
802                 CERROR("failed %d\n", rc);
803         return rc;
804 }
805
806 static int record_base(const struct lu_env *env, struct llog_handle *llh,
807                      char *cfgname, lnet_nid_t nid, int cmd,
808                      char *s1, char *s2, char *s3, char *s4)
809 {
810         struct mgs_thread_info *mgi = mgs_env_info(env);
811         struct lustre_cfg     *lcfg;
812         int rc;
813
814         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
815                cmd, s1, s2, s3, s4);
816
817         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
818         if (s1)
819                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
820         if (s2)
821                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
822         if (s3)
823                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
824         if (s4)
825                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
826
827         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
828         if (!lcfg)
829                 return -ENOMEM;
830         lcfg->lcfg_nid = nid;
831
832         rc = record_lcfg(env, llh, lcfg);
833
834         lustre_cfg_free(lcfg);
835
836         if (rc) {
837                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
838                        cmd, s1, s2, s3, s4);
839         }
840         return rc;
841 }
842
843 static inline int record_add_uuid(const struct lu_env *env,
844                                   struct llog_handle *llh,
845                                   uint64_t nid, char *uuid)
846 {
847         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
848 }
849
850 static inline int record_add_conn(const struct lu_env *env,
851                                   struct llog_handle *llh,
852                                   char *devname, char *uuid)
853 {
854         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
855 }
856
857 static inline int record_attach(const struct lu_env *env,
858                                 struct llog_handle *llh, char *devname,
859                                 char *type, char *uuid)
860 {
861         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
862 }
863
864 static inline int record_setup(const struct lu_env *env,
865                                struct llog_handle *llh, char *devname,
866                                char *s1, char *s2, char *s3, char *s4)
867 {
868         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
869 }
870
871 /**
872  * \retval <0 record processing error
873  * \retval n record is processed. No need copy original one.
874  * \retval 0 record is not processed.
875  */
876 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
877                            struct mgs_replace_uuid_lookup *mrul)
878 {
879         int nids_added = 0;
880         lnet_nid_t nid;
881         char *ptr;
882         int rc;
883
884         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
885                 /* LCFG_ADD_UUID command found. Let's skip original command
886                    and add passed nids */
887                 ptr = mrul->target.mti_params;
888                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
889                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
890                                "device %s\n", libcfs_nid2str(nid),
891                                 mrul->target.mti_params,
892                                 mrul->target.mti_svname);
893                         rc = record_add_uuid(env,
894                                              mrul->temp_llh, nid,
895                                              mrul->target.mti_params);
896                         if (!rc)
897                                 nids_added++;
898                 }
899
900                 if (nids_added == 0) {
901                         CERROR("No new nids were added, nid %s with uuid %s, "
902                                "device %s\n", libcfs_nid2str(nid),
903                                mrul->target.mti_params,
904                                mrul->target.mti_svname);
905                         RETURN(-ENXIO);
906                 } else {
907                         mrul->device_nids_added = 1;
908                 }
909
910                 return nids_added;
911         }
912
913         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
914                 /* LCFG_SETUP command found. UUID should be changed */
915                 rc = record_setup(env,
916                                   mrul->temp_llh,
917                                   /* devname the same */
918                                   lustre_cfg_string(lcfg, 0),
919                                   /* s1 is not changed */
920                                   lustre_cfg_string(lcfg, 1),
921                                   /* new uuid should be
922                                   the full nidlist */
923                                   mrul->target.mti_params,
924                                   /* s3 is not changed */
925                                   lustre_cfg_string(lcfg, 3),
926                                   /* s4 is not changed */
927                                   lustre_cfg_string(lcfg, 4));
928                 return rc ? rc : 1;
929         }
930
931         /* Another commands in target device block */
932         return 0;
933 }
934
935 /**
936  * Handler that called for every record in llog.
937  * Records are processed in order they placed in llog.
938  *
939  * \param[in] llh       log to be processed
940  * \param[in] rec       current record
941  * \param[in] data      mgs_replace_uuid_lookup structure
942  *
943  * \retval 0    success
944  */
945 static int mgs_replace_handler(const struct lu_env *env,
946                                struct llog_handle *llh,
947                                struct llog_rec_hdr *rec,
948                                void *data)
949 {
950         struct llog_rec_hdr local_rec = *rec;
951         struct mgs_replace_uuid_lookup *mrul;
952         struct lustre_cfg *lcfg = REC_DATA(rec);
953         int cfg_len = REC_DATA_LEN(rec);
954         int rc;
955         ENTRY;
956
957         mrul = (struct mgs_replace_uuid_lookup *)data;
958
959         if (rec->lrh_type != OBD_CFG_REC) {
960                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
961                        rec->lrh_type, lcfg->lcfg_command,
962                        lustre_cfg_string(lcfg, 0),
963                        lustre_cfg_string(lcfg, 1));
964                 RETURN(-EINVAL);
965         }
966
967         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
968         if (rc) {
969                 /* Do not copy any invalidated records */
970                 GOTO(skip_out, rc = 0);
971         }
972
973         rc = check_markers(lcfg, mrul);
974         if (rc || mrul->skip_it)
975                 GOTO(skip_out, rc = 0);
976
977         /* Write to new log all commands outside target device block */
978         if (!mrul->in_target_device)
979                 GOTO(copy_out, rc = 0);
980
981         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
982            (failover nids) for this target, assuming that if then
983            primary is changing then so is the failover */
984         if (mrul->device_nids_added &&
985             (lcfg->lcfg_command == LCFG_ADD_UUID ||
986              lcfg->lcfg_command == LCFG_ADD_CONN))
987                 GOTO(skip_out, rc = 0);
988
989         rc = process_command(env, lcfg, mrul);
990         if (rc < 0)
991                 RETURN(rc);
992
993         if (rc)
994                 RETURN(0);
995 copy_out:
996         /* Record is placed in temporary llog as is */
997         local_rec.lrh_len -= sizeof(*rec) + sizeof(struct llog_rec_tail);
998         rc = llog_write(env, mrul->temp_llh, &local_rec, NULL, 0,
999                         (void *)lcfg, -1);
1000
1001         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1002                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1003                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1004         RETURN(rc);
1005
1006 skip_out:
1007         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1008                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1009                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1010         RETURN(rc);
1011 }
1012
1013 static int mgs_backup_llog(const struct lu_env *env,
1014                            struct obd_device *mgs,
1015                            char *fsname, char *backup)
1016 {
1017         struct obd_uuid *uuid;
1018         struct llog_handle *orig_llh, *bak_llh;
1019         struct llog_ctxt *lctxt;
1020         int rc, rc2;
1021         ENTRY;
1022
1023         lctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1024         if (!lctxt) {
1025                 CERROR("%s: missing llog context\n", mgs->obd_name);
1026                 GOTO(out, rc = -EINVAL);
1027         }
1028
1029         /* Make sure there's no old backup log */
1030         rc = llog_erase(env, lctxt, NULL, backup);
1031         if (rc < 0 && rc != -ENOENT)
1032                 GOTO(out_put, rc);
1033
1034         /* open backup log */
1035         rc = llog_open_create(env, lctxt, &bak_llh, NULL, backup);
1036         if (rc) {
1037                 CERROR("%s: backup logfile open %s: rc = %d\n",
1038                        mgs->obd_name, backup, rc);
1039                 GOTO(out_put, rc);
1040         }
1041
1042         /* set the log header uuid */
1043         OBD_ALLOC_PTR(uuid);
1044         if (uuid == NULL)
1045                 GOTO(out_put, rc = -ENOMEM);
1046         obd_str2uuid(uuid, backup);
1047         rc = llog_init_handle(env, bak_llh, LLOG_F_IS_PLAIN, uuid);
1048         OBD_FREE_PTR(uuid);
1049         if (rc)
1050                 GOTO(out_close1, rc);
1051
1052         /* open original log */
1053         rc = llog_open(env, lctxt, &orig_llh, NULL, fsname,
1054                        LLOG_OPEN_EXISTS);
1055         if (rc < 0) {
1056                 if (rc == -ENOENT)
1057                         rc = 0;
1058                 GOTO(out_close1, rc);
1059         }
1060
1061         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, NULL);
1062         if (rc)
1063                 GOTO(out_close2, rc);
1064
1065         /* Copy remote log */
1066         rc = llog_process(env, orig_llh, llog_copy_handler,
1067                           (void *)bak_llh, NULL);
1068
1069 out_close2:
1070         rc2 = llog_close(env, orig_llh);
1071         if (!rc)
1072                 rc = rc2;
1073 out_close1:
1074         rc2 = llog_close(env, bak_llh);
1075         if (!rc)
1076                 rc = rc2;
1077 out_put:
1078         if (lctxt)
1079                 llog_ctxt_put(lctxt);
1080 out:
1081         if (rc)
1082                 CERROR("%s: Failed to backup log %s: rc = %d\n",
1083                        mgs->obd_name, fsname, rc);
1084         RETURN(rc);
1085 }
1086
1087 static int mgs_log_is_empty(const struct lu_env *env, struct mgs_device *mgs,
1088                             char *name);
1089
1090 static int mgs_replace_nids_log(const struct lu_env *env,
1091                                 struct obd_device *mgs, struct fs_db *fsdb,
1092                                 char *logname, char *devname, char *nids)
1093 {
1094         struct llog_handle *orig_llh, *backup_llh;
1095         struct llog_ctxt *ctxt;
1096         struct mgs_replace_uuid_lookup *mrul;
1097         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1098         char *backup;
1099         int rc, rc2;
1100         ENTRY;
1101
1102         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1103
1104         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1105         LASSERT(ctxt != NULL);
1106
1107         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1108                 /* Log is empty. Nothing to replace */
1109                 GOTO(out_put, rc = 0);
1110         }
1111
1112         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1113         if (backup == NULL)
1114                 GOTO(out_put, rc = -ENOMEM);
1115
1116         sprintf(backup, "%s.bak", logname);
1117
1118         rc = mgs_backup_llog(env, mgs, logname, backup);
1119         if (rc < 0) {
1120                 CERROR("%s: can't make backup for %s: rc = %d\n",
1121                        mgs->obd_name, logname, rc);
1122                 GOTO(out_free,rc);
1123         }
1124
1125         /* Now erase original log file. Connections are not allowed.
1126            Backup is already saved */
1127         rc = llog_erase(env, ctxt, NULL, logname);
1128         if (rc < 0 && rc != -ENOENT)
1129                 GOTO(out_free, rc);
1130
1131         /* open local log */
1132         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1133         if (rc)
1134                 GOTO(out_restore, rc);
1135
1136         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, NULL);
1137         if (rc)
1138                 GOTO(out_closel, rc);
1139
1140         /* open backup llog */
1141         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1142                        LLOG_OPEN_EXISTS);
1143         if (rc)
1144                 GOTO(out_closel, rc);
1145
1146         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1147         if (rc)
1148                 GOTO(out_close, rc);
1149
1150         if (llog_get_size(backup_llh) <= 1)
1151                 GOTO(out_close, rc = 0);
1152
1153         OBD_ALLOC_PTR(mrul);
1154         if (!mrul)
1155                 GOTO(out_close, rc = -ENOMEM);
1156         /* devname is only needed information to replace UUID records */
1157         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1158         /* parse nids later */
1159         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1160         /* Copy records to this temporary llog */
1161         mrul->temp_llh = orig_llh;
1162
1163         rc = llog_process(env, backup_llh, mgs_replace_handler,
1164                           (void *)mrul, NULL);
1165         OBD_FREE_PTR(mrul);
1166 out_close:
1167         rc2 = llog_close(NULL, backup_llh);
1168         if (!rc)
1169                 rc = rc2;
1170 out_closel:
1171         rc2 = llog_close(NULL, orig_llh);
1172         if (!rc)
1173                 rc = rc2;
1174
1175 out_restore:
1176         if (rc) {
1177                 CERROR("%s: llog should be restored: rc = %d\n",
1178                        mgs->obd_name, rc);
1179                 rc2 = mgs_backup_llog(env, mgs, backup, logname);
1180                 if (rc2 < 0)
1181                         CERROR("%s: can't restore backup %s: rc = %d\n",
1182                                mgs->obd_name, logname, rc2);
1183         }
1184
1185 out_free:
1186         OBD_FREE(backup, strlen(backup) + 1);
1187
1188 out_put:
1189         llog_ctxt_put(ctxt);
1190
1191         if (rc)
1192                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1193                        mgs->obd_name, logname, rc);
1194
1195         RETURN(rc);
1196 }
1197
1198 /**
1199  * Parse device name and get file system name and/or device index
1200  *
1201  * \param[in]   devname device name (ex. lustre-MDT0000)
1202  * \param[out]  fsname  file system name(optional)
1203  * \param[out]  index   device index(optional)
1204  *
1205  * \retval 0    success
1206  */
1207 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1208 {
1209         char *ptr;
1210         ENTRY;
1211
1212         /* Extract fsname */
1213         ptr = strrchr(devname, '-');
1214
1215         if (fsname) {
1216                 if (!ptr) {
1217                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1218                                devname);
1219                         RETURN(-EINVAL);
1220                 }
1221                 memset(fsname, 0, MTI_NAME_MAXLEN);
1222                 strncpy(fsname, devname, ptr - devname);
1223                 fsname[MTI_NAME_MAXLEN - 1] = 0;
1224         }
1225
1226         if (index) {
1227                 if (server_name2index(ptr, index, NULL) < 0) {
1228                         CDEBUG(D_MGS, "Device name with wrong index\n");
1229                         RETURN(-EINVAL);
1230                 }
1231         }
1232
1233         RETURN(0);
1234 }
1235
1236 static int only_mgs_is_running(struct obd_device *mgs_obd)
1237 {
1238         /* TDB: Is global variable with devices count exists? */
1239         int num_devices = get_devices_count();
1240         /* osd, MGS and MGC + self_export
1241            (wc -l /proc/fs/lustre/devices <= 2) && (num_exports <= 2) */
1242         return (num_devices <= 3) && (mgs_obd->obd_num_exports <= 2);
1243 }
1244
1245 static int name_create_mdt(char **logname, char *fsname, int i)
1246 {
1247         char mdt_index[9];
1248
1249         sprintf(mdt_index, "-MDT%04x", i);
1250         return name_create(logname, fsname, mdt_index);
1251 }
1252
1253 /**
1254  * Replace nids for \a device to \a nids values
1255  *
1256  * \param obd           MGS obd device
1257  * \param devname       nids need to be replaced for this device
1258  * (ex. lustre-OST0000)
1259  * \param nids          nids list (ex. nid1,nid2,nid3)
1260  *
1261  * \retval 0    success
1262  */
1263 int mgs_replace_nids(const struct lu_env *env,
1264                      struct mgs_device *mgs,
1265                      char *devname, char *nids)
1266 {
1267         /* Assume fsname is part of device name */
1268         char fsname[MTI_NAME_MAXLEN];
1269         int rc;
1270         __u32 index;
1271         char *logname;
1272         struct fs_db *fsdb;
1273         unsigned int i;
1274         int conn_state;
1275         struct obd_device *mgs_obd = mgs->mgs_obd;
1276         ENTRY;
1277
1278         /* We can only change NIDs if no other nodes are connected */
1279         spin_lock(&mgs_obd->obd_dev_lock);
1280         conn_state = mgs_obd->obd_no_conn;
1281         mgs_obd->obd_no_conn = 1;
1282         spin_unlock(&mgs_obd->obd_dev_lock);
1283
1284         /* We can not change nids if not only MGS is started */
1285         if (!only_mgs_is_running(mgs_obd)) {
1286                 CERROR("Only MGS is allowed to be started\n");
1287                 GOTO(out, rc = -EINPROGRESS);
1288         }
1289
1290         /* Get fsname and index*/
1291         rc = mgs_parse_devname(devname, fsname, &index);
1292         if (rc)
1293                 GOTO(out, rc);
1294
1295         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1296         if (rc) {
1297                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1298                 GOTO(out, rc);
1299         }
1300
1301         /* Process client llogs */
1302         name_create(&logname, fsname, "-client");
1303         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1304         name_destroy(&logname);
1305         if (rc) {
1306                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1307                        fsname, devname, rc);
1308                 GOTO(out, rc);
1309         }
1310
1311         /* Process MDT llogs */
1312         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1313                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1314                         continue;
1315                 name_create_mdt(&logname, fsname, i);
1316                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1317                 name_destroy(&logname);
1318                 if (rc)
1319                         GOTO(out, rc);
1320         }
1321
1322 out:
1323         spin_lock(&mgs_obd->obd_dev_lock);
1324         mgs_obd->obd_no_conn = conn_state;
1325         spin_unlock(&mgs_obd->obd_dev_lock);
1326
1327         RETURN(rc);
1328 }
1329
1330 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1331                             char *devname, struct lov_desc *desc)
1332 {
1333         struct mgs_thread_info *mgi = mgs_env_info(env);
1334         struct lustre_cfg *lcfg;
1335         int rc;
1336
1337         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1338         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1339         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1340         if (!lcfg)
1341                 return -ENOMEM;
1342         rc = record_lcfg(env, llh, lcfg);
1343
1344         lustre_cfg_free(lcfg);
1345         return rc;
1346 }
1347
1348 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1349                             char *devname, struct lmv_desc *desc)
1350 {
1351         struct mgs_thread_info *mgi = mgs_env_info(env);
1352         struct lustre_cfg *lcfg;
1353         int rc;
1354
1355         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1356         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1357         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1358
1359         rc = record_lcfg(env, llh, lcfg);
1360
1361         lustre_cfg_free(lcfg);
1362         return rc;
1363 }
1364
1365 static inline int record_mdc_add(const struct lu_env *env,
1366                                  struct llog_handle *llh,
1367                                  char *logname, char *mdcuuid,
1368                                  char *mdtuuid, char *index,
1369                                  char *gen)
1370 {
1371         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1372                            mdtuuid,index,gen,mdcuuid);
1373 }
1374
1375 static inline int record_lov_add(const struct lu_env *env,
1376                                  struct llog_handle *llh,
1377                                  char *lov_name, char *ost_uuid,
1378                                  char *index, char *gen)
1379 {
1380         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1381                            ost_uuid, index, gen, 0);
1382 }
1383
1384 static inline int record_mount_opt(const struct lu_env *env,
1385                                    struct llog_handle *llh,
1386                                    char *profile, char *lov_name,
1387                                    char *mdc_name)
1388 {
1389         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1390                            profile,lov_name,mdc_name,0);
1391 }
1392
1393 static int record_marker(const struct lu_env *env,
1394                          struct llog_handle *llh,
1395                          struct fs_db *fsdb, __u32 flags,
1396                          char *tgtname, char *comment)
1397 {
1398         struct mgs_thread_info *mgi = mgs_env_info(env);
1399         struct lustre_cfg *lcfg;
1400         int rc;
1401         int cplen = 0;
1402
1403         if (flags & CM_START)
1404                 fsdb->fsdb_gen++;
1405         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1406         mgi->mgi_marker.cm_flags = flags;
1407         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1408         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1409                         sizeof(mgi->mgi_marker.cm_tgtname));
1410         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1411                 return -E2BIG;
1412         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1413                         sizeof(mgi->mgi_marker.cm_comment));
1414         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1415                 return -E2BIG;
1416         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1417         mgi->mgi_marker.cm_canceltime = 0;
1418         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1419         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1420                             sizeof(mgi->mgi_marker));
1421         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1422         if (!lcfg)
1423                 return -ENOMEM;
1424         rc = record_lcfg(env, llh, lcfg);
1425
1426         lustre_cfg_free(lcfg);
1427         return rc;
1428 }
1429
1430 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1431                             struct llog_handle **llh, char *name)
1432 {
1433         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1434         struct llog_ctxt        *ctxt;
1435         int                      rc = 0;
1436
1437         if (*llh)
1438                 GOTO(out, rc = -EBUSY);
1439
1440         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1441         if (!ctxt)
1442                 GOTO(out, rc = -ENODEV);
1443         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1444
1445         rc = llog_open_create(env, ctxt, llh, NULL, name);
1446         if (rc)
1447                 GOTO(out_ctxt, rc);
1448         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1449         if (rc)
1450                 llog_close(env, *llh);
1451 out_ctxt:
1452         llog_ctxt_put(ctxt);
1453 out:
1454         if (rc) {
1455                 CERROR("%s: can't start log %s: rc = %d\n",
1456                        mgs->mgs_obd->obd_name, name, rc);
1457                 *llh = NULL;
1458         }
1459         RETURN(rc);
1460 }
1461
1462 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1463 {
1464         int rc;
1465
1466         rc = llog_close(env, *llh);
1467         *llh = NULL;
1468
1469         return rc;
1470 }
1471
1472 static int mgs_log_is_empty(const struct lu_env *env,
1473                             struct mgs_device *mgs, char *name)
1474 {
1475         struct llog_handle *llh;
1476         struct llog_ctxt *ctxt;
1477         int rc = 0;
1478
1479         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1480         LASSERT(ctxt != NULL);
1481         rc = llog_open(env, ctxt, &llh, NULL, name, LLOG_OPEN_EXISTS);
1482         if (rc < 0) {
1483                 if (rc == -ENOENT)
1484                         rc = 0;
1485                 GOTO(out_ctxt, rc);
1486         }
1487
1488         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
1489         if (rc)
1490                 GOTO(out_close, rc);
1491         rc = llog_get_size(llh);
1492
1493 out_close:
1494         llog_close(env, llh);
1495 out_ctxt:
1496         llog_ctxt_put(ctxt);
1497         /* header is record 1 */
1498         return (rc <= 1);
1499 }
1500
1501 /******************** config "macros" *********************/
1502
1503 /* write an lcfg directly into a log (with markers) */
1504 static int mgs_write_log_direct(const struct lu_env *env,
1505                                 struct mgs_device *mgs, struct fs_db *fsdb,
1506                                 char *logname, struct lustre_cfg *lcfg,
1507                                 char *devname, char *comment)
1508 {
1509         struct llog_handle *llh = NULL;
1510         int rc;
1511         ENTRY;
1512
1513         if (!lcfg)
1514                 RETURN(-ENOMEM);
1515
1516         rc = record_start_log(env, mgs, &llh, logname);
1517         if (rc)
1518                 RETURN(rc);
1519
1520         /* FIXME These should be a single journal transaction */
1521         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1522         if (rc)
1523                 GOTO(out_end, rc);
1524         rc = record_lcfg(env, llh, lcfg);
1525         if (rc)
1526                 GOTO(out_end, rc);
1527         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1528         if (rc)
1529                 GOTO(out_end, rc);
1530 out_end:
1531         record_end_log(env, &llh);
1532         RETURN(rc);
1533 }
1534
1535 /* write the lcfg in all logs for the given fs */
1536 int mgs_write_log_direct_all(const struct lu_env *env,
1537                              struct mgs_device *mgs,
1538                              struct fs_db *fsdb,
1539                              struct mgs_target_info *mti,
1540                              struct lustre_cfg *lcfg,
1541                              char *devname, char *comment,
1542                              int server_only)
1543 {
1544         cfs_list_t list;
1545         struct mgs_direntry *dirent, *n;
1546         char *fsname = mti->mti_fsname;
1547         char *logname;
1548         int rc = 0, len = strlen(fsname);
1549         ENTRY;
1550
1551         /* We need to set params for any future logs
1552            as well. FIXME Append this file to every new log.
1553            Actually, we should store as params (text), not llogs.  Or
1554            in a database. */
1555         rc = name_create(&logname, fsname, "-params");
1556         if (rc)
1557                 RETURN(rc);
1558         if (mgs_log_is_empty(env, mgs, logname)) {
1559                 struct llog_handle *llh = NULL;
1560                 rc = record_start_log(env, mgs, &llh, logname);
1561                 record_end_log(env, &llh);
1562         }
1563         name_destroy(&logname);
1564         if (rc)
1565                 RETURN(rc);
1566
1567         /* Find all the logs in the CONFIGS directory */
1568         rc = class_dentry_readdir(env, mgs, &list);
1569         if (rc)
1570                 RETURN(rc);
1571
1572         /* Could use fsdb index maps instead of directory listing */
1573         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1574                 cfs_list_del(&dirent->list);
1575                 /* don't write to sptlrpc rule log */
1576                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1577                         goto next;
1578
1579                 /* caller wants write server logs only */
1580                 if (server_only && strstr(dirent->name, "-client") != NULL)
1581                         goto next;
1582
1583                 if (strncmp(fsname, dirent->name, len) == 0) {
1584                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1585                         /* Erase any old settings of this same parameter */
1586                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1587                                         devname, comment, CM_SKIP);
1588                         if (rc < 0)
1589                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1590                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1591                         /* Write the new one */
1592                         if (lcfg) {
1593                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1594                                                           dirent->name,
1595                                                           lcfg, devname,
1596                                                           comment);
1597                                 if (rc)
1598                                         CERROR("%s: writing log %s: rc = %d\n",
1599                                                mgs->mgs_obd->obd_name,
1600                                                dirent->name, rc);
1601                         }
1602                 }
1603 next:
1604                 mgs_direntry_free(dirent);
1605         }
1606
1607         RETURN(rc);
1608 }
1609
1610 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1611                                     struct mgs_device *mgs,
1612                                     struct fs_db *fsdb,
1613                                     struct mgs_target_info *mti,
1614                                     int index, char *logname);
1615 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1616                                     struct mgs_device *mgs,
1617                                     struct fs_db *fsdb,
1618                                     struct mgs_target_info *mti,
1619                                     char *logname, char *suffix, char *lovname,
1620                                     enum lustre_sec_part sec_part, int flags);
1621 static int name_create_mdt_and_lov(char **logname, char **lovname,
1622                                    struct fs_db *fsdb, int i);
1623
1624 static int add_param(char *params, char *key, char *val)
1625 {
1626         char *start = params + strlen(params);
1627         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1628         int keylen = 0;
1629
1630         if (key != NULL)
1631                 keylen = strlen(key);
1632         if (start + 1 + keylen + strlen(val) >= end) {
1633                 CERROR("params are too long: %s %s%s\n",
1634                        params, key != NULL ? key : "", val);
1635                 return -EINVAL;
1636         }
1637
1638         sprintf(start, " %s%s", key != NULL ? key : "", val);
1639         return 0;
1640 }
1641
1642 /**
1643  * Walk through client config log record and convert the related records
1644  * into the target.
1645  **/
1646 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1647                                          struct llog_handle *llh,
1648                                          struct llog_rec_hdr *rec, void *data)
1649 {
1650         struct mgs_device *mgs;
1651         struct obd_device *obd;
1652         struct mgs_target_info *mti, *tmti;
1653         struct fs_db *fsdb;
1654         int cfg_len = rec->lrh_len;
1655         char *cfg_buf = (char*) (rec + 1);
1656         struct lustre_cfg *lcfg;
1657         int rc = 0;
1658         struct llog_handle *mdt_llh = NULL;
1659         static int got_an_osc_or_mdc = 0;
1660         /* 0: not found any osc/mdc;
1661            1: found osc;
1662            2: found mdc;
1663         */
1664         static int last_step = -1;
1665         int cplen = 0;
1666
1667         ENTRY;
1668
1669         mti = ((struct temp_comp*)data)->comp_mti;
1670         tmti = ((struct temp_comp*)data)->comp_tmti;
1671         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1672         obd = ((struct temp_comp *)data)->comp_obd;
1673         mgs = lu2mgs_dev(obd->obd_lu_dev);
1674         LASSERT(mgs);
1675
1676         if (rec->lrh_type != OBD_CFG_REC) {
1677                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1678                 RETURN(-EINVAL);
1679         }
1680
1681         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1682         if (rc) {
1683                 CERROR("Insane cfg\n");
1684                 RETURN(rc);
1685         }
1686
1687         lcfg = (struct lustre_cfg *)cfg_buf;
1688
1689         if (lcfg->lcfg_command == LCFG_MARKER) {
1690                 struct cfg_marker *marker;
1691                 marker = lustre_cfg_buf(lcfg, 1);
1692                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1693                     (marker->cm_flags & CM_START) &&
1694                      !(marker->cm_flags & CM_SKIP)) {
1695                         got_an_osc_or_mdc = 1;
1696                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1697                                         sizeof(tmti->mti_svname));
1698                         if (cplen >= sizeof(tmti->mti_svname))
1699                                 RETURN(-E2BIG);
1700                         rc = record_start_log(env, mgs, &mdt_llh,
1701                                               mti->mti_svname);
1702                         if (rc)
1703                                 RETURN(rc);
1704                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1705                                            mti->mti_svname, "add osc(copied)");
1706                         record_end_log(env, &mdt_llh);
1707                         last_step = marker->cm_step;
1708                         RETURN(rc);
1709                 }
1710                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1711                     (marker->cm_flags & CM_END) &&
1712                      !(marker->cm_flags & CM_SKIP)) {
1713                         LASSERT(last_step == marker->cm_step);
1714                         last_step = -1;
1715                         got_an_osc_or_mdc = 0;
1716                         memset(tmti, 0, sizeof(*tmti));
1717                         rc = record_start_log(env, mgs, &mdt_llh,
1718                                               mti->mti_svname);
1719                         if (rc)
1720                                 RETURN(rc);
1721                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1722                                            mti->mti_svname, "add osc(copied)");
1723                         record_end_log(env, &mdt_llh);
1724                         RETURN(rc);
1725                 }
1726                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1727                     (marker->cm_flags & CM_START) &&
1728                      !(marker->cm_flags & CM_SKIP)) {
1729                         got_an_osc_or_mdc = 2;
1730                         last_step = marker->cm_step;
1731                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1732                                strlen(marker->cm_tgtname));
1733
1734                         RETURN(rc);
1735                 }
1736                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1737                     (marker->cm_flags & CM_END) &&
1738                      !(marker->cm_flags & CM_SKIP)) {
1739                         LASSERT(last_step == marker->cm_step);
1740                         last_step = -1;
1741                         got_an_osc_or_mdc = 0;
1742                         memset(tmti, 0, sizeof(*tmti));
1743                         RETURN(rc);
1744                 }
1745         }
1746
1747         if (got_an_osc_or_mdc == 0 || last_step < 0)
1748                 RETURN(rc);
1749
1750         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1751                 uint64_t nodenid = lcfg->lcfg_nid;
1752
1753                 if (strlen(tmti->mti_uuid) == 0) {
1754                         /* target uuid not set, this config record is before
1755                          * LCFG_SETUP, this nid is one of target node nid.
1756                          */
1757                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1758                         tmti->mti_nid_count++;
1759                 } else {
1760                         /* failover node nid */
1761                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1762                                        libcfs_nid2str(nodenid));
1763                 }
1764
1765                 RETURN(rc);
1766         }
1767
1768         if (lcfg->lcfg_command == LCFG_SETUP) {
1769                 char *target;
1770
1771                 target = lustre_cfg_string(lcfg, 1);
1772                 memcpy(tmti->mti_uuid, target, strlen(target));
1773                 RETURN(rc);
1774         }
1775
1776         /* ignore client side sptlrpc_conf_log */
1777         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1778                 RETURN(rc);
1779
1780         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1781                 int index;
1782
1783                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1784                         RETURN (-EINVAL);
1785
1786                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1787                        strlen(mti->mti_fsname));
1788                 tmti->mti_stripe_index = index;
1789
1790                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1791                                               mti->mti_stripe_index,
1792                                               mti->mti_svname);
1793                 memset(tmti, 0, sizeof(*tmti));
1794                 RETURN(rc);
1795         }
1796
1797         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1798                 int index;
1799                 char mdt_index[9];
1800                 char *logname, *lovname;
1801
1802                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1803                                              mti->mti_stripe_index);
1804                 if (rc)
1805                         RETURN(rc);
1806                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1807
1808                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1809                         name_destroy(&logname);
1810                         name_destroy(&lovname);
1811                         RETURN(-EINVAL);
1812                 }
1813
1814                 tmti->mti_stripe_index = index;
1815                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1816                                          mdt_index, lovname,
1817                                          LUSTRE_SP_MDT, 0);
1818                 name_destroy(&logname);
1819                 name_destroy(&lovname);
1820                 RETURN(rc);
1821         }
1822         RETURN(rc);
1823 }
1824
1825 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1826 /* stealed from mgs_get_fsdb_from_llog*/
1827 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1828                                               struct mgs_device *mgs,
1829                                               char *client_name,
1830                                               struct temp_comp* comp)
1831 {
1832         struct llog_handle *loghandle;
1833         struct mgs_target_info *tmti;
1834         struct llog_ctxt *ctxt;
1835         int rc;
1836
1837         ENTRY;
1838
1839         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1840         LASSERT(ctxt != NULL);
1841
1842         OBD_ALLOC_PTR(tmti);
1843         if (tmti == NULL)
1844                 GOTO(out_ctxt, rc = -ENOMEM);
1845
1846         comp->comp_tmti = tmti;
1847         comp->comp_obd = mgs->mgs_obd;
1848
1849         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1850                        LLOG_OPEN_EXISTS);
1851         if (rc < 0) {
1852                 if (rc == -ENOENT)
1853                         rc = 0;
1854                 GOTO(out_pop, rc);
1855         }
1856
1857         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1858         if (rc)
1859                 GOTO(out_close, rc);
1860
1861         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1862                                   (void *)comp, NULL, false);
1863         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1864 out_close:
1865         llog_close(env, loghandle);
1866 out_pop:
1867         OBD_FREE_PTR(tmti);
1868 out_ctxt:
1869         llog_ctxt_put(ctxt);
1870         RETURN(rc);
1871 }
1872
1873 /* lmv is the second thing for client logs */
1874 /* copied from mgs_write_log_lov. Please refer to that.  */
1875 static int mgs_write_log_lmv(const struct lu_env *env,
1876                              struct mgs_device *mgs,
1877                              struct fs_db *fsdb,
1878                              struct mgs_target_info *mti,
1879                              char *logname, char *lmvname)
1880 {
1881         struct llog_handle *llh = NULL;
1882         struct lmv_desc *lmvdesc;
1883         char *uuid;
1884         int rc = 0;
1885         ENTRY;
1886
1887         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1888
1889         OBD_ALLOC_PTR(lmvdesc);
1890         if (lmvdesc == NULL)
1891                 RETURN(-ENOMEM);
1892         lmvdesc->ld_active_tgt_count = 0;
1893         lmvdesc->ld_tgt_count = 0;
1894         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1895         uuid = (char *)lmvdesc->ld_uuid.uuid;
1896
1897         rc = record_start_log(env, mgs, &llh, logname);
1898         if (rc)
1899                 GOTO(out_free, rc);
1900         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1901         if (rc)
1902                 GOTO(out_end, rc);
1903         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1904         if (rc)
1905                 GOTO(out_end, rc);
1906         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1907         if (rc)
1908                 GOTO(out_end, rc);
1909         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1910         if (rc)
1911                 GOTO(out_end, rc);
1912 out_end:
1913         record_end_log(env, &llh);
1914 out_free:
1915         OBD_FREE_PTR(lmvdesc);
1916         RETURN(rc);
1917 }
1918
1919 /* lov is the first thing in the mdt and client logs */
1920 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1921                              struct fs_db *fsdb, struct mgs_target_info *mti,
1922                              char *logname, char *lovname)
1923 {
1924         struct llog_handle *llh = NULL;
1925         struct lov_desc *lovdesc;
1926         char *uuid;
1927         int rc = 0;
1928         ENTRY;
1929
1930         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1931
1932         /*
1933         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1934         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1935               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1936         */
1937
1938         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1939         OBD_ALLOC_PTR(lovdesc);
1940         if (lovdesc == NULL)
1941                 RETURN(-ENOMEM);
1942         lovdesc->ld_magic = LOV_DESC_MAGIC;
1943         lovdesc->ld_tgt_count = 0;
1944         /* Defaults.  Can be changed later by lcfg config_param */
1945         lovdesc->ld_default_stripe_count = 1;
1946         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1947         lovdesc->ld_default_stripe_size = 1024 * 1024;
1948         lovdesc->ld_default_stripe_offset = -1;
1949         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1950         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1951         /* can these be the same? */
1952         uuid = (char *)lovdesc->ld_uuid.uuid;
1953
1954         /* This should always be the first entry in a log.
1955         rc = mgs_clear_log(obd, logname); */
1956         rc = record_start_log(env, mgs, &llh, logname);
1957         if (rc)
1958                 GOTO(out_free, rc);
1959         /* FIXME these should be a single journal transaction */
1960         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1961         if (rc)
1962                 GOTO(out_end, rc);
1963         rc = record_attach(env, llh, lovname, "lov", uuid);
1964         if (rc)
1965                 GOTO(out_end, rc);
1966         rc = record_lov_setup(env, llh, lovname, lovdesc);
1967         if (rc)
1968                 GOTO(out_end, rc);
1969         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1970         if (rc)
1971                 GOTO(out_end, rc);
1972         EXIT;
1973 out_end:
1974         record_end_log(env, &llh);
1975 out_free:
1976         OBD_FREE_PTR(lovdesc);
1977         return rc;
1978 }
1979
1980 /* add failnids to open log */
1981 static int mgs_write_log_failnids(const struct lu_env *env,
1982                                   struct mgs_target_info *mti,
1983                                   struct llog_handle *llh,
1984                                   char *cliname)
1985 {
1986         char *failnodeuuid = NULL;
1987         char *ptr = mti->mti_params;
1988         lnet_nid_t nid;
1989         int rc = 0;
1990
1991         /*
1992         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1993         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1994         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1995         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1996         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1997         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1998         */
1999
2000         /* Pull failnid info out of params string */
2001         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
2002                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
2003                         if (failnodeuuid == NULL) {
2004                                 /* We don't know the failover node name,
2005                                    so just use the first nid as the uuid */
2006                                 rc = name_create(&failnodeuuid,
2007                                                  libcfs_nid2str(nid), "");
2008                                 if (rc)
2009                                         return rc;
2010                         }
2011                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
2012                                "client %s\n", libcfs_nid2str(nid),
2013                                failnodeuuid, cliname);
2014                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
2015                 }
2016                 if (failnodeuuid)
2017                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2018         }
2019
2020         name_destroy(&failnodeuuid);
2021         return rc;
2022 }
2023
2024 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2025                                     struct mgs_device *mgs,
2026                                     struct fs_db *fsdb,
2027                                     struct mgs_target_info *mti,
2028                                     char *logname, char *lmvname)
2029 {
2030         struct llog_handle *llh = NULL;
2031         char *mdcname = NULL;
2032         char *nodeuuid = NULL;
2033         char *mdcuuid = NULL;
2034         char *lmvuuid = NULL;
2035         char index[6];
2036         int i, rc;
2037         ENTRY;
2038
2039         if (mgs_log_is_empty(env, mgs, logname)) {
2040                 CERROR("log is empty! Logical error\n");
2041                 RETURN(-EINVAL);
2042         }
2043
2044         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2045                mti->mti_svname, logname, lmvname);
2046
2047         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2048         if (rc)
2049                 RETURN(rc);
2050         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2051         if (rc)
2052                 GOTO(out_free, rc);
2053         rc = name_create(&mdcuuid, mdcname, "_UUID");
2054         if (rc)
2055                 GOTO(out_free, rc);
2056         rc = name_create(&lmvuuid, lmvname, "_UUID");
2057         if (rc)
2058                 GOTO(out_free, rc);
2059
2060         rc = record_start_log(env, mgs, &llh, logname);
2061         if (rc)
2062                 GOTO(out_free, rc);
2063         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2064                            "add mdc");
2065         if (rc)
2066                 GOTO(out_end, rc);
2067         for (i = 0; i < mti->mti_nid_count; i++) {
2068                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2069                        libcfs_nid2str(mti->mti_nids[i]));
2070
2071                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2072                 if (rc)
2073                         GOTO(out_end, rc);
2074         }
2075
2076         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2077         if (rc)
2078                 GOTO(out_end, rc);
2079         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
2080         if (rc)
2081                 GOTO(out_end, rc);
2082         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2083         if (rc)
2084                 GOTO(out_end, rc);
2085         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2086         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2087                             index, "1");
2088         if (rc)
2089                 GOTO(out_end, rc);
2090         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2091                            "add mdc");
2092         if (rc)
2093                 GOTO(out_end, rc);
2094 out_end:
2095         record_end_log(env, &llh);
2096 out_free:
2097         name_destroy(&lmvuuid);
2098         name_destroy(&mdcuuid);
2099         name_destroy(&mdcname);
2100         name_destroy(&nodeuuid);
2101         RETURN(rc);
2102 }
2103
2104 static inline int name_create_lov(char **lovname, char *mdtname,
2105                                   struct fs_db *fsdb, int index)
2106 {
2107         /* COMPAT_180 */
2108         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2109                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2110         else
2111                 return name_create(lovname, mdtname, "-mdtlov");
2112 }
2113
2114 static int name_create_mdt_and_lov(char **logname, char **lovname,
2115                                    struct fs_db *fsdb, int i)
2116 {
2117         int rc;
2118
2119         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2120         if (rc)
2121                 return rc;
2122         /* COMPAT_180 */
2123         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2124                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2125         else
2126                 rc = name_create(lovname, *logname, "-mdtlov");
2127         if (rc) {
2128                 name_destroy(logname);
2129                 *logname = NULL;
2130         }
2131         return rc;
2132 }
2133
2134 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2135                                       struct fs_db *fsdb, int i)
2136 {
2137         char suffix[16];
2138
2139         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2140                 sprintf(suffix, "-osc");
2141         else
2142                 sprintf(suffix, "-osc-MDT%04x", i);
2143         return name_create(oscname, ostname, suffix);
2144 }
2145
2146 /* add new mdc to already existent MDS */
2147 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2148                                     struct mgs_device *mgs,
2149                                     struct fs_db *fsdb,
2150                                     struct mgs_target_info *mti,
2151                                     int mdt_index, char *logname)
2152 {
2153         struct llog_handle      *llh = NULL;
2154         char    *nodeuuid = NULL;
2155         char    *ospname = NULL;
2156         char    *lovuuid = NULL;
2157         char    *mdtuuid = NULL;
2158         char    *svname = NULL;
2159         char    *mdtname = NULL;
2160         char    *lovname = NULL;
2161         char    index_str[16];
2162         int     i, rc;
2163
2164         ENTRY;
2165         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2166                 CERROR("log is empty! Logical error\n");
2167                 RETURN (-EINVAL);
2168         }
2169
2170         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2171                logname);
2172
2173         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2174         if (rc)
2175                 RETURN(rc);
2176
2177         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2178         if (rc)
2179                 GOTO(out_destory, rc);
2180
2181         rc = name_create(&svname, mdtname, "-osp");
2182         if (rc)
2183                 GOTO(out_destory, rc);
2184
2185         sprintf(index_str, "-MDT%04x", mdt_index);
2186         rc = name_create(&ospname, svname, index_str);
2187         if (rc)
2188                 GOTO(out_destory, rc);
2189
2190         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2191         if (rc)
2192                 GOTO(out_destory, rc);
2193
2194         rc = name_create(&lovuuid, lovname, "_UUID");
2195         if (rc)
2196                 GOTO(out_destory, rc);
2197
2198         rc = name_create(&mdtuuid, mdtname, "_UUID");
2199         if (rc)
2200                 GOTO(out_destory, rc);
2201
2202         rc = record_start_log(env, mgs, &llh, logname);
2203         if (rc)
2204                 GOTO(out_destory, rc);
2205
2206         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2207                            "add osp");
2208         if (rc)
2209                 GOTO(out_destory, rc);
2210
2211         for (i = 0; i < mti->mti_nid_count; i++) {
2212                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2213                        libcfs_nid2str(mti->mti_nids[i]));
2214                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2215                 if (rc)
2216                         GOTO(out_end, rc);
2217         }
2218
2219         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2220         if (rc)
2221                 GOTO(out_end, rc);
2222
2223         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2224                           NULL, NULL);
2225         if (rc)
2226                 GOTO(out_end, rc);
2227
2228         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2229         if (rc)
2230                 GOTO(out_end, rc);
2231
2232         /* Add mdc(osp) to lod */
2233         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2234                  mti->mti_stripe_index);
2235         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2236                          index_str, "1", NULL);
2237         if (rc)
2238                 GOTO(out_end, rc);
2239
2240         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2241         if (rc)
2242                 GOTO(out_end, rc);
2243
2244 out_end:
2245         record_end_log(env, &llh);
2246
2247 out_destory:
2248         name_destroy(&mdtuuid);
2249         name_destroy(&lovuuid);
2250         name_destroy(&lovname);
2251         name_destroy(&ospname);
2252         name_destroy(&svname);
2253         name_destroy(&nodeuuid);
2254         name_destroy(&mdtname);
2255         RETURN(rc);
2256 }
2257
2258 static int mgs_write_log_mdt0(const struct lu_env *env,
2259                               struct mgs_device *mgs,
2260                               struct fs_db *fsdb,
2261                               struct mgs_target_info *mti)
2262 {
2263         char *log = mti->mti_svname;
2264         struct llog_handle *llh = NULL;
2265         char *uuid, *lovname;
2266         char mdt_index[6];
2267         char *ptr = mti->mti_params;
2268         int rc = 0, failout = 0;
2269         ENTRY;
2270
2271         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2272         if (uuid == NULL)
2273                 RETURN(-ENOMEM);
2274
2275         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2276                 failout = (strncmp(ptr, "failout", 7) == 0);
2277
2278         rc = name_create(&lovname, log, "-mdtlov");
2279         if (rc)
2280                 GOTO(out_free, rc);
2281         if (mgs_log_is_empty(env, mgs, log)) {
2282                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2283                 if (rc)
2284                         GOTO(out_lod, rc);
2285         }
2286
2287         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2288
2289         rc = record_start_log(env, mgs, &llh, log);
2290         if (rc)
2291                 GOTO(out_lod, rc);
2292
2293         /* add MDT itself */
2294
2295         /* FIXME this whole fn should be a single journal transaction */
2296         sprintf(uuid, "%s_UUID", log);
2297         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2298         if (rc)
2299                 GOTO(out_lod, rc);
2300         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2301         if (rc)
2302                 GOTO(out_end, rc);
2303         rc = record_mount_opt(env, llh, log, lovname, NULL);
2304         if (rc)
2305                 GOTO(out_end, rc);
2306         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2307                         failout ? "n" : "f");
2308         if (rc)
2309                 GOTO(out_end, rc);
2310         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2311         if (rc)
2312                 GOTO(out_end, rc);
2313 out_end:
2314         record_end_log(env, &llh);
2315 out_lod:
2316         name_destroy(&lovname);
2317 out_free:
2318         OBD_FREE(uuid, sizeof(struct obd_uuid));
2319         RETURN(rc);
2320 }
2321
2322 /* envelope method for all layers log */
2323 static int mgs_write_log_mdt(const struct lu_env *env,
2324                              struct mgs_device *mgs,
2325                              struct fs_db *fsdb,
2326                              struct mgs_target_info *mti)
2327 {
2328         struct mgs_thread_info *mgi = mgs_env_info(env);
2329         struct llog_handle *llh = NULL;
2330         char *cliname;
2331         int rc, i = 0;
2332         ENTRY;
2333
2334         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2335
2336         if (mti->mti_uuid[0] == '\0') {
2337                 /* Make up our own uuid */
2338                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2339                          "%s_UUID", mti->mti_svname);
2340         }
2341
2342         /* add mdt */
2343         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2344         if (rc)
2345                 RETURN(rc);
2346         /* Append the mdt info to the client log */
2347         rc = name_create(&cliname, mti->mti_fsname, "-client");
2348         if (rc)
2349                 RETURN(rc);
2350
2351         if (mgs_log_is_empty(env, mgs, cliname)) {
2352                 /* Start client log */
2353                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2354                                        fsdb->fsdb_clilov);
2355                 if (rc)
2356                         GOTO(out_free, rc);
2357                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2358                                        fsdb->fsdb_clilmv);
2359                 if (rc)
2360                         GOTO(out_free, rc);
2361         }
2362
2363         /*
2364         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2365         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2366         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2367         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2368         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2369         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2370         */
2371
2372                 /* copy client info about lov/lmv */
2373                 mgi->mgi_comp.comp_mti = mti;
2374                 mgi->mgi_comp.comp_fsdb = fsdb;
2375
2376                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2377                                                         &mgi->mgi_comp);
2378                 if (rc)
2379                         GOTO(out_free, rc);
2380                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2381                                               fsdb->fsdb_clilmv);
2382                 if (rc)
2383                         GOTO(out_free, rc);
2384
2385                 /* add mountopts */
2386                 rc = record_start_log(env, mgs, &llh, cliname);
2387                 if (rc)
2388                         GOTO(out_free, rc);
2389
2390                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2391                                    "mount opts");
2392                 if (rc)
2393                         GOTO(out_end, rc);
2394                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2395                                       fsdb->fsdb_clilmv);
2396                 if (rc)
2397                         GOTO(out_end, rc);
2398                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2399                                    "mount opts");
2400
2401         if (rc)
2402                 GOTO(out_end, rc);
2403
2404         /* for_all_existing_mdt except current one */
2405         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2406                 if (i !=  mti->mti_stripe_index &&
2407                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2408                         char *logname;
2409
2410                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2411                         if (rc)
2412                                 GOTO(out_end, rc);
2413
2414                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2415                                                       i, logname);
2416                         name_destroy(&logname);
2417                         if (rc)
2418                                 GOTO(out_end, rc);
2419                 }
2420         }
2421 out_end:
2422         record_end_log(env, &llh);
2423 out_free:
2424         name_destroy(&cliname);
2425         RETURN(rc);
2426 }
2427
2428 /* Add the ost info to the client/mdt lov */
2429 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2430                                     struct mgs_device *mgs, struct fs_db *fsdb,
2431                                     struct mgs_target_info *mti,
2432                                     char *logname, char *suffix, char *lovname,
2433                                     enum lustre_sec_part sec_part, int flags)
2434 {
2435         struct llog_handle *llh = NULL;
2436         char *nodeuuid = NULL;
2437         char *oscname = NULL;
2438         char *oscuuid = NULL;
2439         char *lovuuid = NULL;
2440         char *svname = NULL;
2441         char index[6];
2442         int i, rc;
2443
2444         ENTRY;
2445         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2446                mti->mti_svname, logname);
2447
2448         if (mgs_log_is_empty(env, mgs, logname)) {
2449                 CERROR("log is empty! Logical error\n");
2450                 RETURN (-EINVAL);
2451         }
2452
2453         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2454         if (rc)
2455                 RETURN(rc);
2456         rc = name_create(&svname, mti->mti_svname, "-osc");
2457         if (rc)
2458                 GOTO(out_free, rc);
2459
2460         /* for the system upgraded from old 1.8, keep using the old osc naming
2461          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2462         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2463                 rc = name_create(&oscname, svname, "");
2464         else
2465                 rc = name_create(&oscname, svname, suffix);
2466         if (rc)
2467                 GOTO(out_free, rc);
2468
2469         rc = name_create(&oscuuid, oscname, "_UUID");
2470         if (rc)
2471                 GOTO(out_free, rc);
2472         rc = name_create(&lovuuid, lovname, "_UUID");
2473         if (rc)
2474                 GOTO(out_free, rc);
2475
2476
2477         /*
2478         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2479         multihomed (#4)
2480         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2481         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2482         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2483         failover (#6,7)
2484         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2485         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2486         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2487         */
2488
2489         rc = record_start_log(env, mgs, &llh, logname);
2490         if (rc)
2491                 GOTO(out_free, rc);
2492
2493         /* FIXME these should be a single journal transaction */
2494         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2495                            "add osc");
2496         if (rc)
2497                 GOTO(out_end, rc);
2498
2499         /* NB: don't change record order, because upon MDT steal OSC config
2500          * from client, it treats all nids before LCFG_SETUP as target nids
2501          * (multiple interfaces), while nids after as failover node nids.
2502          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2503          */
2504         for (i = 0; i < mti->mti_nid_count; i++) {
2505                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2506                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2507                 if (rc)
2508                         GOTO(out_end, rc);
2509         }
2510         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2511         if (rc)
2512                 GOTO(out_end, rc);
2513         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2514         if (rc)
2515                 GOTO(out_end, rc);
2516         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2517         if (rc)
2518                 GOTO(out_end, rc);
2519
2520         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2521
2522         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2523         if (rc)
2524                 GOTO(out_end, rc);
2525         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2526                            "add osc");
2527         if (rc)
2528                 GOTO(out_end, rc);
2529 out_end:
2530         record_end_log(env, &llh);
2531 out_free:
2532         name_destroy(&lovuuid);
2533         name_destroy(&oscuuid);
2534         name_destroy(&oscname);
2535         name_destroy(&svname);
2536         name_destroy(&nodeuuid);
2537         RETURN(rc);
2538 }
2539
2540 static int mgs_write_log_ost(const struct lu_env *env,
2541                              struct mgs_device *mgs, struct fs_db *fsdb,
2542                              struct mgs_target_info *mti)
2543 {
2544         struct llog_handle *llh = NULL;
2545         char *logname, *lovname;
2546         char *ptr = mti->mti_params;
2547         int rc, flags = 0, failout = 0, i;
2548         ENTRY;
2549
2550         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2551
2552         /* The ost startup log */
2553
2554         /* If the ost log already exists, that means that someone reformatted
2555            the ost and it called target_add again. */
2556         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2557                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2558                                    "exists, yet the server claims it never "
2559                                    "registered. It may have been reformatted, "
2560                                    "or the index changed. writeconf the MDT to "
2561                                    "regenerate all logs.\n", mti->mti_svname);
2562                 RETURN(-EALREADY);
2563         }
2564
2565         /*
2566         attach obdfilter ost1 ost1_UUID
2567         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2568         */
2569         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2570                 failout = (strncmp(ptr, "failout", 7) == 0);
2571         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2572         if (rc)
2573                 RETURN(rc);
2574         /* FIXME these should be a single journal transaction */
2575         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2576         if (rc)
2577                 GOTO(out_end, rc);
2578         if (*mti->mti_uuid == '\0')
2579                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2580                          "%s_UUID", mti->mti_svname);
2581         rc = record_attach(env, llh, mti->mti_svname,
2582                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2583         if (rc)
2584                 GOTO(out_end, rc);
2585         rc = record_setup(env, llh, mti->mti_svname,
2586                           "dev"/*ignored*/, "type"/*ignored*/,
2587                           failout ? "n" : "f", 0/*options*/);
2588         if (rc)
2589                 GOTO(out_end, rc);
2590         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2591         if (rc)
2592                 GOTO(out_end, rc);
2593 out_end:
2594         record_end_log(env, &llh);
2595         if (rc)
2596                 RETURN(rc);
2597         /* We also have to update the other logs where this osc is part of
2598            the lov */
2599
2600         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2601                 /* If we're upgrading, the old mdt log already has our
2602                    entry. Let's do a fake one for fun. */
2603                 /* Note that we can't add any new failnids, since we don't
2604                    know the old osc names. */
2605                 flags = CM_SKIP | CM_UPGRADE146;
2606
2607         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2608                 /* If the update flag isn't set, don't update client/mdt
2609                    logs. */
2610                 flags |= CM_SKIP;
2611                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2612                               "the MDT first to regenerate it.\n",
2613                               mti->mti_svname);
2614         }
2615
2616         /* Add ost to all MDT lov defs */
2617         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2618                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2619                         char mdt_index[9];
2620
2621                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2622                                                      i);
2623                         if (rc)
2624                                 RETURN(rc);
2625                         sprintf(mdt_index, "-MDT%04x", i);
2626                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2627                                                       logname, mdt_index,
2628                                                       lovname, LUSTRE_SP_MDT,
2629                                                       flags);
2630                         name_destroy(&logname);
2631                         name_destroy(&lovname);
2632                         if (rc)
2633                                 RETURN(rc);
2634                 }
2635         }
2636
2637         /* Append ost info to the client log */
2638         rc = name_create(&logname, mti->mti_fsname, "-client");
2639         if (rc)
2640                 RETURN(rc);
2641         if (mgs_log_is_empty(env, mgs, logname)) {
2642                 /* Start client log */
2643                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2644                                        fsdb->fsdb_clilov);
2645                 if (rc)
2646                         GOTO(out_free, rc);
2647                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2648                                        fsdb->fsdb_clilmv);
2649                 if (rc)
2650                         GOTO(out_free, rc);
2651         }
2652         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2653                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2654 out_free:
2655         name_destroy(&logname);
2656         RETURN(rc);
2657 }
2658
2659 static __inline__ int mgs_param_empty(char *ptr)
2660 {
2661         char *tmp;
2662
2663         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2664                 return 1;
2665         return 0;
2666 }
2667
2668 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2669                                           struct mgs_device *mgs,
2670                                           struct fs_db *fsdb,
2671                                           struct mgs_target_info *mti,
2672                                           char *logname, char *cliname)
2673 {
2674         int rc;
2675         struct llog_handle *llh = NULL;
2676
2677         if (mgs_param_empty(mti->mti_params)) {
2678                 /* Remove _all_ failnids */
2679                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2680                                 mti->mti_svname, "add failnid", CM_SKIP);
2681                 return rc < 0 ? rc : 0;
2682         }
2683
2684         /* Otherwise failover nids are additive */
2685         rc = record_start_log(env, mgs, &llh, logname);
2686         if (rc)
2687                 return rc;
2688                 /* FIXME this should be a single journal transaction */
2689         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2690                            "add failnid");
2691         if (rc)
2692                 goto out_end;
2693         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2694         if (rc)
2695                 goto out_end;
2696         rc = record_marker(env, llh, fsdb, CM_END,
2697                            mti->mti_svname, "add failnid");
2698 out_end:
2699         record_end_log(env, &llh);
2700         return rc;
2701 }
2702
2703
2704 /* Add additional failnids to an existing log.
2705    The mdc/osc must have been added to logs first */
2706 /* tcp nids must be in dotted-quad ascii -
2707    we can't resolve hostnames from the kernel. */
2708 static int mgs_write_log_add_failnid(const struct lu_env *env,
2709                                      struct mgs_device *mgs,
2710                                      struct fs_db *fsdb,
2711                                      struct mgs_target_info *mti)
2712 {
2713         char *logname, *cliname;
2714         int rc;
2715         ENTRY;
2716
2717         /* FIXME we currently can't erase the failnids
2718          * given when a target first registers, since they aren't part of
2719          * an "add uuid" stanza */
2720
2721         /* Verify that we know about this target */
2722         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2723                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2724                                    "yet. It must be started before failnids "
2725                                    "can be added.\n", mti->mti_svname);
2726                 RETURN(-ENOENT);
2727         }
2728
2729         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2730         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2731                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2732         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2733                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2734         } else {
2735                 RETURN(-EINVAL);
2736         }
2737         if (rc)
2738                 RETURN(rc);
2739         /* Add failover nids to the client log */
2740         rc = name_create(&logname, mti->mti_fsname, "-client");
2741         if (rc) {
2742                 name_destroy(&cliname);
2743                 RETURN(rc);
2744         }
2745         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2746         name_destroy(&logname);
2747         name_destroy(&cliname);
2748         if (rc)
2749                 RETURN(rc);
2750
2751         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2752                 /* Add OST failover nids to the MDT logs as well */
2753                 int i;
2754
2755                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2756                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2757                                 continue;
2758                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2759                         if (rc)
2760                                 RETURN(rc);
2761                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2762                                                  fsdb, i);
2763                         if (rc) {
2764                                 name_destroy(&logname);
2765                                 RETURN(rc);
2766                         }
2767                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2768                                                             mti, logname,
2769                                                             cliname);
2770                         name_destroy(&cliname);
2771                         name_destroy(&logname);
2772                         if (rc)
2773                                 RETURN(rc);
2774                 }
2775         }
2776
2777         RETURN(rc);
2778 }
2779
2780 static int mgs_wlp_lcfg(const struct lu_env *env,
2781                         struct mgs_device *mgs, struct fs_db *fsdb,
2782                         struct mgs_target_info *mti,
2783                         char *logname, struct lustre_cfg_bufs *bufs,
2784                         char *tgtname, char *ptr)
2785 {
2786         char comment[MTI_NAME_MAXLEN];
2787         char *tmp;
2788         struct lustre_cfg *lcfg;
2789         int rc, del;
2790
2791         /* Erase any old settings of this same parameter */
2792         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2793         comment[MTI_NAME_MAXLEN - 1] = 0;
2794         /* But don't try to match the value. */
2795         if ((tmp = strchr(comment, '=')))
2796             *tmp = 0;
2797         /* FIXME we should skip settings that are the same as old values */
2798         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2799         if (rc < 0)
2800                 return rc;
2801         del = mgs_param_empty(ptr);
2802
2803         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2804                       "Sett" : "Modify", tgtname, comment, logname);
2805         if (del)
2806                 return rc;
2807
2808         lustre_cfg_bufs_reset(bufs, tgtname);
2809         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2810         lcfg = lustre_cfg_new(LCFG_PARAM, bufs);
2811         if (!lcfg)
2812                 return -ENOMEM;
2813         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2814         lustre_cfg_free(lcfg);
2815         return rc;
2816 }
2817
2818 /* write global variable settings into log */
2819 static int mgs_write_log_sys(const struct lu_env *env,
2820                              struct mgs_device *mgs, struct fs_db *fsdb,
2821                              struct mgs_target_info *mti, char *sys, char *ptr)
2822 {
2823         struct mgs_thread_info *mgi = mgs_env_info(env);
2824         struct lustre_cfg *lcfg;
2825         char *tmp, sep;
2826         int rc, cmd, convert = 1;
2827
2828         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2829                 cmd = LCFG_SET_TIMEOUT;
2830         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2831                 cmd = LCFG_SET_LDLM_TIMEOUT;
2832         /* Check for known params here so we can return error to lctl */
2833         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2834                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2835                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2836                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2837                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2838                 cmd = LCFG_PARAM;
2839         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2840                 convert = 0; /* Don't convert string value to integer */
2841                 cmd = LCFG_PARAM;
2842         } else {
2843                 return -EINVAL;
2844         }
2845
2846         if (mgs_param_empty(ptr))
2847                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2848         else
2849                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2850
2851         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2852         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2853         if (!convert && *tmp != '\0')
2854                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2855         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2856         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2857         /* truncate the comment to the parameter name */
2858         ptr = tmp - 1;
2859         sep = *ptr;
2860         *ptr = '\0';
2861         /* modify all servers and clients */
2862         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2863                                       *tmp == '\0' ? NULL : lcfg,
2864                                       mti->mti_fsname, sys, 0);
2865         if (rc == 0 && *tmp != '\0') {
2866                 switch (cmd) {
2867                 case LCFG_SET_TIMEOUT:
2868                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2869                                 class_process_config(lcfg);
2870                         break;
2871                 case LCFG_SET_LDLM_TIMEOUT:
2872                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2873                                 class_process_config(lcfg);
2874                         break;
2875                 default:
2876                         break;
2877                 }
2878         }
2879         *ptr = sep;
2880         lustre_cfg_free(lcfg);
2881         return rc;
2882 }
2883
2884 /* write quota settings into log */
2885 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2886                                struct fs_db *fsdb, struct mgs_target_info *mti,
2887                                char *quota, char *ptr)
2888 {
2889         struct mgs_thread_info  *mgi = mgs_env_info(env);
2890         struct lustre_cfg       *lcfg;
2891         char                    *tmp;
2892         char                     sep;
2893         int                      rc, cmd = LCFG_PARAM;
2894
2895         /* support only 'meta' and 'data' pools so far */
2896         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2897             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2898                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2899                        "& quota.ost are)\n", ptr);
2900                 return -EINVAL;
2901         }
2902
2903         if (*tmp == '\0') {
2904                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2905         } else {
2906                 CDEBUG(D_MGS, "global '%s'\n", quota);
2907
2908                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2909                     strcmp(tmp, "none") != 0) {
2910                         CERROR("enable option(%s) isn't supported\n", tmp);
2911                         return -EINVAL;
2912                 }
2913         }
2914
2915         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2916         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2917         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2918         /* truncate the comment to the parameter name */
2919         ptr = tmp - 1;
2920         sep = *ptr;
2921         *ptr = '\0';
2922
2923         /* XXX we duplicated quota enable information in all server
2924          *     config logs, it should be moved to a separate config
2925          *     log once we cleanup the config log for global param. */
2926         /* modify all servers */
2927         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2928                                       *tmp == '\0' ? NULL : lcfg,
2929                                       mti->mti_fsname, quota, 1);
2930         *ptr = sep;
2931         lustre_cfg_free(lcfg);
2932         return rc < 0 ? rc : 0;
2933 }
2934
2935 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2936                                    struct mgs_device *mgs,
2937                                    struct fs_db *fsdb,
2938                                    struct mgs_target_info *mti,
2939                                    char *param)
2940 {
2941         struct mgs_thread_info *mgi = mgs_env_info(env);
2942         struct llog_handle     *llh = NULL;
2943         char                   *logname;
2944         char                   *comment, *ptr;
2945         struct lustre_cfg      *lcfg;
2946         int                     rc, len;
2947         ENTRY;
2948
2949         /* get comment */
2950         ptr = strchr(param, '=');
2951         LASSERT(ptr);
2952         len = ptr - param;
2953
2954         OBD_ALLOC(comment, len + 1);
2955         if (comment == NULL)
2956                 RETURN(-ENOMEM);
2957         strncpy(comment, param, len);
2958         comment[len] = '\0';
2959
2960         /* prepare lcfg */
2961         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2962         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2963         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2964         if (lcfg == NULL)
2965                 GOTO(out_comment, rc = -ENOMEM);
2966
2967         /* construct log name */
2968         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2969         if (rc)
2970                 GOTO(out_lcfg, rc);
2971
2972         if (mgs_log_is_empty(env, mgs, logname)) {
2973                 rc = record_start_log(env, mgs, &llh, logname);
2974                 if (rc)
2975                         GOTO(out, rc);
2976                 record_end_log(env, &llh);
2977         }
2978
2979         /* obsolete old one */
2980         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2981                         comment, CM_SKIP);
2982         if (rc < 0)
2983                 GOTO(out, rc);
2984         /* write the new one */
2985         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2986                                   mti->mti_svname, comment);
2987         if (rc)
2988                 CERROR("err %d writing log %s\n", rc, logname);
2989 out:
2990         name_destroy(&logname);
2991 out_lcfg:
2992         lustre_cfg_free(lcfg);
2993 out_comment:
2994         OBD_FREE(comment, len + 1);
2995         RETURN(rc);
2996 }
2997
2998 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2999                                         char *param)
3000 {
3001         char    *ptr;
3002
3003         /* disable the adjustable udesc parameter for now, i.e. use default
3004          * setting that client always ship udesc to MDT if possible. to enable
3005          * it simply remove the following line */
3006         goto error_out;
3007
3008         ptr = strchr(param, '=');
3009         if (ptr == NULL)
3010                 goto error_out;
3011         *ptr++ = '\0';
3012
3013         if (strcmp(param, PARAM_SRPC_UDESC))
3014                 goto error_out;
3015
3016         if (strcmp(ptr, "yes") == 0) {
3017                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3018                 CWARN("Enable user descriptor shipping from client to MDT\n");
3019         } else if (strcmp(ptr, "no") == 0) {
3020                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3021                 CWARN("Disable user descriptor shipping from client to MDT\n");
3022         } else {
3023                 *(ptr - 1) = '=';
3024                 goto error_out;
3025         }
3026         return 0;
3027
3028 error_out:
3029         CERROR("Invalid param: %s\n", param);
3030         return -EINVAL;
3031 }
3032
3033 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3034                                   const char *svname,
3035                                   char *param)
3036 {
3037         struct sptlrpc_rule      rule;
3038         struct sptlrpc_rule_set *rset;
3039         int                      rc;
3040         ENTRY;
3041
3042         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3043                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3044                 RETURN(-EINVAL);
3045         }
3046
3047         if (strncmp(param, PARAM_SRPC_UDESC,
3048                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3049                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3050         }
3051
3052         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3053                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3054                 RETURN(-EINVAL);
3055         }
3056
3057         param += sizeof(PARAM_SRPC_FLVR) - 1;
3058
3059         rc = sptlrpc_parse_rule(param, &rule);
3060         if (rc)
3061                 RETURN(rc);
3062
3063         /* mgs rules implies must be mgc->mgs */
3064         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3065                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3066                      rule.sr_from != LUSTRE_SP_ANY) ||
3067                     (rule.sr_to != LUSTRE_SP_MGS &&
3068                      rule.sr_to != LUSTRE_SP_ANY))
3069                         RETURN(-EINVAL);
3070         }
3071
3072         /* preapre room for this coming rule. svcname format should be:
3073          * - fsname: general rule
3074          * - fsname-tgtname: target-specific rule
3075          */
3076         if (strchr(svname, '-')) {
3077                 struct mgs_tgt_srpc_conf *tgtconf;
3078                 int                       found = 0;
3079
3080                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3081                      tgtconf = tgtconf->mtsc_next) {
3082                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3083                                 found = 1;
3084                                 break;
3085                         }
3086                 }
3087
3088                 if (!found) {
3089                         int name_len;
3090
3091                         OBD_ALLOC_PTR(tgtconf);
3092                         if (tgtconf == NULL)
3093                                 RETURN(-ENOMEM);
3094
3095                         name_len = strlen(svname);
3096
3097                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3098                         if (tgtconf->mtsc_tgt == NULL) {
3099                                 OBD_FREE_PTR(tgtconf);
3100                                 RETURN(-ENOMEM);
3101                         }
3102                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3103
3104                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3105                         fsdb->fsdb_srpc_tgt = tgtconf;
3106                 }
3107
3108                 rset = &tgtconf->mtsc_rset;
3109         } else {
3110                 rset = &fsdb->fsdb_srpc_gen;
3111         }
3112
3113         rc = sptlrpc_rule_set_merge(rset, &rule);
3114
3115         RETURN(rc);
3116 }
3117
3118 static int mgs_srpc_set_param(const struct lu_env *env,
3119                               struct mgs_device *mgs,
3120                               struct fs_db *fsdb,
3121                               struct mgs_target_info *mti,
3122                               char *param)
3123 {
3124         char                   *copy;
3125         int                     rc, copy_size;
3126         ENTRY;
3127
3128 #ifndef HAVE_GSS
3129         RETURN(-EINVAL);
3130 #endif
3131         /* keep a copy of original param, which could be destroied
3132          * during parsing */
3133         copy_size = strlen(param) + 1;
3134         OBD_ALLOC(copy, copy_size);
3135         if (copy == NULL)
3136                 return -ENOMEM;
3137         memcpy(copy, param, copy_size);
3138
3139         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3140         if (rc)
3141                 goto out_free;
3142
3143         /* previous steps guaranteed the syntax is correct */
3144         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3145         if (rc)
3146                 goto out_free;
3147
3148         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3149                 /*
3150                  * for mgs rules, make them effective immediately.
3151                  */
3152                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3153                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3154                                                  &fsdb->fsdb_srpc_gen);
3155         }
3156
3157 out_free:
3158         OBD_FREE(copy, copy_size);
3159         RETURN(rc);
3160 }
3161
3162 struct mgs_srpc_read_data {
3163         struct fs_db   *msrd_fsdb;
3164         int             msrd_skip;
3165 };
3166
3167 static int mgs_srpc_read_handler(const struct lu_env *env,
3168                                  struct llog_handle *llh,
3169                                  struct llog_rec_hdr *rec, void *data)
3170 {
3171         struct mgs_srpc_read_data *msrd = data;
3172         struct cfg_marker         *marker;
3173         struct lustre_cfg         *lcfg = REC_DATA(rec);
3174         char                      *svname, *param;
3175         int                        cfg_len, rc;
3176         ENTRY;
3177
3178         if (rec->lrh_type != OBD_CFG_REC) {
3179                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3180                 RETURN(-EINVAL);
3181         }
3182
3183         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3184                   sizeof(struct llog_rec_tail);
3185
3186         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3187         if (rc) {
3188                 CERROR("Insane cfg\n");
3189                 RETURN(rc);
3190         }
3191
3192         if (lcfg->lcfg_command == LCFG_MARKER) {
3193                 marker = lustre_cfg_buf(lcfg, 1);
3194
3195                 if (marker->cm_flags & CM_START &&
3196                     marker->cm_flags & CM_SKIP)
3197                         msrd->msrd_skip = 1;
3198                 if (marker->cm_flags & CM_END)
3199                         msrd->msrd_skip = 0;
3200
3201                 RETURN(0);
3202         }
3203
3204         if (msrd->msrd_skip)
3205                 RETURN(0);
3206
3207         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3208                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3209                 RETURN(0);
3210         }
3211
3212         svname = lustre_cfg_string(lcfg, 0);
3213         if (svname == NULL) {
3214                 CERROR("svname is empty\n");
3215                 RETURN(0);
3216         }
3217
3218         param = lustre_cfg_string(lcfg, 1);
3219         if (param == NULL) {
3220                 CERROR("param is empty\n");
3221                 RETURN(0);
3222         }
3223
3224         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3225         if (rc)
3226                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3227
3228         RETURN(0);
3229 }
3230
3231 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3232                                 struct mgs_device *mgs,
3233                                 struct fs_db *fsdb)
3234 {
3235         struct llog_handle        *llh = NULL;
3236         struct llog_ctxt          *ctxt;
3237         char                      *logname;
3238         struct mgs_srpc_read_data  msrd;
3239         int                        rc;
3240         ENTRY;
3241
3242         /* construct log name */
3243         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3244         if (rc)
3245                 RETURN(rc);
3246
3247         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3248         LASSERT(ctxt != NULL);
3249
3250         if (mgs_log_is_empty(env, mgs, logname))
3251                 GOTO(out, rc = 0);
3252
3253         rc = llog_open(env, ctxt, &llh, NULL, logname,
3254                        LLOG_OPEN_EXISTS);
3255         if (rc < 0) {
3256                 if (rc == -ENOENT)
3257                         rc = 0;
3258                 GOTO(out, rc);
3259         }
3260
3261         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);