Whamcloud - gitweb
LU-2059 llog: MGC to use OSD API for backup logs
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         if (!dt_try_as_dir(env, dir))
71                 GOTO(out, rc = -ENOTDIR);
72
73         LASSERT(dir);
74         LASSERT(dir->do_index_ops);
75
76         iops = &dir->do_index_ops->dio_it;
77         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
78         if (IS_ERR(it))
79                 RETURN(PTR_ERR(it));
80
81         rc = iops->load(env, it, 0);
82         if (rc <= 0)
83                 GOTO(fini, rc = 0);
84
85         /* main cycle */
86         do {
87                 key = (void *)iops->key(env, it);
88                 if (IS_ERR(key)) {
89                         CERROR("%s: key failed when listing %s: rc = %d\n",
90                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
91                                (int) PTR_ERR(key));
92                         goto next;
93                 }
94                 key_sz = iops->key_size(env, it);
95                 LASSERT(key_sz > 0);
96
97                 /* filter out "." and ".." entries */
98                 if (key[0] == '.') {
99                         if (key_sz == 1)
100                                 goto next;
101                         if (key_sz == 2 && key[1] == '.')
102                                 goto next;
103                 }
104
105                 de = mgs_direntry_alloc(key_sz + 1);
106                 if (de == NULL) {
107                         rc = -ENOMEM;
108                         break;
109                 }
110
111                 memcpy(de->name, key, key_sz);
112                 de->name[key_sz] = 0;
113
114                 cfs_list_add(&de->list, list);
115
116 next:
117                 rc = iops->next(env, it);
118         } while (rc == 0);
119         rc = 0;
120
121         iops->put(env, it);
122
123 fini:
124         iops->fini(env, it);
125 out:
126         if (rc)
127                 CERROR("%s: key failed when listing %s: rc = %d\n",
128                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
129         RETURN(rc);
130 }
131
132 /******************** DB functions *********************/
133
134 static inline int name_create(char **newname, char *prefix, char *suffix)
135 {
136         LASSERT(newname);
137         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
138         if (!*newname)
139                 return -ENOMEM;
140         sprintf(*newname, "%s%s", prefix, suffix);
141         return 0;
142 }
143
144 static inline void name_destroy(char **name)
145 {
146         if (*name)
147                 OBD_FREE(*name, strlen(*name) + 1);
148         *name = NULL;
149 }
150
151 struct mgs_fsdb_handler_data
152 {
153         struct fs_db   *fsdb;
154         __u32           ver;
155 };
156
157 /* from the (client) config log, figure out:
158         1. which ost's/mdt's are configured (by index)
159         2. what the last config step is
160         3. COMPAT_18 osc name
161 */
162 /* It might be better to have a separate db file, instead of parsing the info
163    out of the client log.  This is slow and potentially error-prone. */
164 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
165                             struct llog_rec_hdr *rec, void *data)
166 {
167         struct mgs_fsdb_handler_data *d = data;
168         struct fs_db *fsdb = d->fsdb;
169         int cfg_len = rec->lrh_len;
170         char *cfg_buf = (char*) (rec + 1);
171         struct lustre_cfg *lcfg;
172         __u32 index;
173         int rc = 0;
174         ENTRY;
175
176         if (rec->lrh_type != OBD_CFG_REC) {
177                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
178                 RETURN(-EINVAL);
179         }
180
181         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
182         if (rc) {
183                 CERROR("Insane cfg\n");
184                 RETURN(rc);
185         }
186
187         lcfg = (struct lustre_cfg *)cfg_buf;
188
189         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
190                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
191
192         /* Figure out ost indicies */
193         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
194         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
195             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
196                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
197                                        NULL, 10);
198                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
199                        lustre_cfg_string(lcfg, 1), index,
200                        lustre_cfg_string(lcfg, 2));
201                 set_bit(index, fsdb->fsdb_ost_index_map);
202         }
203
204         /* Figure out mdt indicies */
205         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
206         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
207             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
208                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
209                                        &index, NULL);
210                 if (rc != LDD_F_SV_TYPE_MDT) {
211                         CWARN("Unparsable MDC name %s, assuming index 0\n",
212                               lustre_cfg_string(lcfg, 0));
213                         index = 0;
214                 }
215                 rc = 0;
216                 CDEBUG(D_MGS, "MDT index is %u\n", index);
217                 set_bit(index, fsdb->fsdb_mdt_index_map);
218                 fsdb->fsdb_mdt_count ++;
219         }
220
221         /**
222          * figure out the old config. fsdb_gen = 0 means old log
223          * It is obsoleted and not supported anymore
224          */
225         if (fsdb->fsdb_gen == 0) {
226                 CERROR("Old config format is not supported\n");
227                 RETURN(-EINVAL);
228         }
229
230         /*
231          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
232          */
233         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
234             lcfg->lcfg_command == LCFG_ATTACH &&
235             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
236                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
237                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
238                         CWARN("MDT using 1.8 OSC name scheme\n");
239                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
240                 }
241         }
242
243         if (lcfg->lcfg_command == LCFG_MARKER) {
244                 struct cfg_marker *marker;
245                 marker = lustre_cfg_buf(lcfg, 1);
246
247                 d->ver = marker->cm_vers;
248
249                 /* Keep track of the latest marker step */
250                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
251         }
252
253         RETURN(rc);
254 }
255
256 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
257 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
258                                   struct mgs_device *mgs,
259                                   struct fs_db *fsdb)
260 {
261         char                            *logname;
262         struct llog_handle              *loghandle;
263         struct llog_ctxt                *ctxt;
264         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
265         int rc;
266
267         ENTRY;
268
269         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
270         LASSERT(ctxt != NULL);
271         rc = name_create(&logname, fsdb->fsdb_name, "-client");
272         if (rc)
273                 GOTO(out_put, rc);
274         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
275         if (rc)
276                 GOTO(out_pop, rc);
277
278         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
279         if (rc)
280                 GOTO(out_close, rc);
281
282         if (llog_get_size(loghandle) <= 1)
283                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
284
285         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
286         CDEBUG(D_INFO, "get_db = %d\n", rc);
287 out_close:
288         llog_close(env, loghandle);
289 out_pop:
290         name_destroy(&logname);
291 out_put:
292         llog_ctxt_put(ctxt);
293
294         RETURN(rc);
295 }
296
297 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
298 {
299         struct mgs_tgt_srpc_conf *tgtconf;
300
301         /* free target-specific rules */
302         while (fsdb->fsdb_srpc_tgt) {
303                 tgtconf = fsdb->fsdb_srpc_tgt;
304                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
305
306                 LASSERT(tgtconf->mtsc_tgt);
307
308                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
309                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
310                 OBD_FREE_PTR(tgtconf);
311         }
312
313         /* free general rules */
314         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
315 }
316
317 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
318 {
319         struct fs_db *fsdb;
320         cfs_list_t *tmp;
321
322         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
323                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
324                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
325                         return fsdb;
326         }
327         return NULL;
328 }
329
330 /* caller must hold the mgs->mgs_fs_db_lock */
331 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
332                                   struct mgs_device *mgs, char *fsname)
333 {
334         struct fs_db *fsdb;
335         int rc;
336         ENTRY;
337
338         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
339                 CERROR("fsname %s is too long\n", fsname);
340                 RETURN(NULL);
341         }
342
343         OBD_ALLOC_PTR(fsdb);
344         if (!fsdb)
345                 RETURN(NULL);
346
347         strcpy(fsdb->fsdb_name, fsname);
348         mutex_init(&fsdb->fsdb_mutex);
349         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
350         fsdb->fsdb_gen = 1;
351
352         if (strcmp(fsname, MGSSELF_NAME) == 0) {
353                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
354         } else {
355                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
356                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
357                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
358                         CERROR("No memory for index maps\n");
359                         GOTO(err, rc = -ENOMEM);
360                 }
361
362                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
363                 if (rc)
364                         GOTO(err, rc);
365                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
366                 if (rc)
367                         GOTO(err, rc);
368
369                 /* initialise data for NID table */
370                 mgs_ir_init_fs(env, mgs, fsdb);
371
372                 lproc_mgs_add_live(mgs, fsdb);
373         }
374
375         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
376
377         RETURN(fsdb);
378 err:
379         if (fsdb->fsdb_ost_index_map)
380                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
381         if (fsdb->fsdb_mdt_index_map)
382                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
383         name_destroy(&fsdb->fsdb_clilov);
384         name_destroy(&fsdb->fsdb_clilmv);
385         OBD_FREE_PTR(fsdb);
386         RETURN(NULL);
387 }
388
389 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
390 {
391         /* wait for anyone with the sem */
392         mutex_lock(&fsdb->fsdb_mutex);
393         lproc_mgs_del_live(mgs, fsdb);
394         cfs_list_del(&fsdb->fsdb_list);
395
396         /* deinitialize fsr */
397         mgs_ir_fini_fs(mgs, fsdb);
398
399         if (fsdb->fsdb_ost_index_map)
400                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
401         if (fsdb->fsdb_mdt_index_map)
402                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
403         name_destroy(&fsdb->fsdb_clilov);
404         name_destroy(&fsdb->fsdb_clilmv);
405         mgs_free_fsdb_srpc(fsdb);
406         mutex_unlock(&fsdb->fsdb_mutex);
407         OBD_FREE_PTR(fsdb);
408 }
409
410 int mgs_init_fsdb_list(struct mgs_device *mgs)
411 {
412         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
413         return 0;
414 }
415
416 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
417 {
418         struct fs_db *fsdb;
419         cfs_list_t *tmp, *tmp2;
420         mutex_lock(&mgs->mgs_mutex);
421         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
422                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
423                 mgs_free_fsdb(mgs, fsdb);
424         }
425         mutex_unlock(&mgs->mgs_mutex);
426         return 0;
427 }
428
429 int mgs_find_or_make_fsdb(const struct lu_env *env,
430                           struct mgs_device *mgs, char *name,
431                           struct fs_db **dbh)
432 {
433         struct fs_db *fsdb;
434         int rc = 0;
435
436         ENTRY;
437         mutex_lock(&mgs->mgs_mutex);
438         fsdb = mgs_find_fsdb(mgs, name);
439         if (fsdb) {
440                 mutex_unlock(&mgs->mgs_mutex);
441                 *dbh = fsdb;
442                 RETURN(0);
443         }
444
445         CDEBUG(D_MGS, "Creating new db\n");
446         fsdb = mgs_new_fsdb(env, mgs, name);
447         /* lock fsdb_mutex until the db is loaded from llogs */
448         if (fsdb)
449                 mutex_lock(&fsdb->fsdb_mutex);
450         mutex_unlock(&mgs->mgs_mutex);
451         if (!fsdb)
452                 RETURN(-ENOMEM);
453
454         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
455                 /* populate the db from the client llog */
456                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
457                 if (rc) {
458                         CERROR("Can't get db from client log %d\n", rc);
459                         GOTO(out_free, rc);
460                 }
461         }
462
463         /* populate srpc rules from params llog */
464         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
465         if (rc) {
466                 CERROR("Can't get db from params log %d\n", rc);
467                 GOTO(out_free, rc);
468         }
469
470         mutex_unlock(&fsdb->fsdb_mutex);
471         *dbh = fsdb;
472
473         RETURN(0);
474
475 out_free:
476         mutex_unlock(&fsdb->fsdb_mutex);
477         mgs_free_fsdb(mgs, fsdb);
478         return rc;
479 }
480
481 /* 1 = index in use
482    0 = index unused
483    -1= empty client log */
484 int mgs_check_index(const struct lu_env *env,
485                     struct mgs_device *mgs,
486                     struct mgs_target_info *mti)
487 {
488         struct fs_db *fsdb;
489         void *imap;
490         int rc = 0;
491         ENTRY;
492
493         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
494
495         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
496         if (rc) {
497                 CERROR("Can't get db for %s\n", mti->mti_fsname);
498                 RETURN(rc);
499         }
500
501         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
502                 RETURN(-1);
503
504         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
505                 imap = fsdb->fsdb_ost_index_map;
506         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
507                 imap = fsdb->fsdb_mdt_index_map;
508         else
509                 RETURN(-EINVAL);
510
511         if (test_bit(mti->mti_stripe_index, imap))
512                 RETURN(1);
513         RETURN(0);
514 }
515
516 static __inline__ int next_index(void *index_map, int map_len)
517 {
518         int i;
519         for (i = 0; i < map_len * 8; i++)
520                 if (!test_bit(i, index_map)) {
521                          return i;
522                  }
523         CERROR("max index %d exceeded.\n", i);
524         return -1;
525 }
526
527 /* Return codes:
528         0  newly marked as in use
529         <0 err
530         +EALREADY for update of an old index */
531 static int mgs_set_index(const struct lu_env *env,
532                          struct mgs_device *mgs,
533                          struct mgs_target_info *mti)
534 {
535         struct fs_db *fsdb;
536         void *imap;
537         int rc = 0;
538         ENTRY;
539
540         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
541         if (rc) {
542                 CERROR("Can't get db for %s\n", mti->mti_fsname);
543                 RETURN(rc);
544         }
545
546         mutex_lock(&fsdb->fsdb_mutex);
547         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
548                 imap = fsdb->fsdb_ost_index_map;
549         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
550                 imap = fsdb->fsdb_mdt_index_map;
551         } else {
552                 GOTO(out_up, rc = -EINVAL);
553         }
554
555         if (mti->mti_flags & LDD_F_NEED_INDEX) {
556                 rc = next_index(imap, INDEX_MAP_SIZE);
557                 if (rc == -1)
558                         GOTO(out_up, rc = -ERANGE);
559                 mti->mti_stripe_index = rc;
560                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
561                         fsdb->fsdb_mdt_count ++;
562         }
563
564         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
565                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
566                                    "but the max index is %d.\n",
567                                    mti->mti_svname, mti->mti_stripe_index,
568                                    INDEX_MAP_SIZE * 8);
569                 GOTO(out_up, rc = -ERANGE);
570         }
571
572         if (test_bit(mti->mti_stripe_index, imap)) {
573                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
574                     !(mti->mti_flags & LDD_F_WRITECONF)) {
575                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
576                                            "%d, but that index is already in "
577                                            "use. Use --writeconf to force\n",
578                                            mti->mti_svname,
579                                            mti->mti_stripe_index);
580                         GOTO(out_up, rc = -EADDRINUSE);
581                 } else {
582                         CDEBUG(D_MGS, "Server %s updating index %d\n",
583                                mti->mti_svname, mti->mti_stripe_index);
584                         GOTO(out_up, rc = EALREADY);
585                 }
586         }
587
588         set_bit(mti->mti_stripe_index, imap);
589         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
590         mutex_unlock(&fsdb->fsdb_mutex);
591         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
592                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
593
594         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
595                mti->mti_stripe_index);
596
597         RETURN(0);
598 out_up:
599         mutex_unlock(&fsdb->fsdb_mutex);
600         return rc;
601 }
602
603 struct mgs_modify_lookup {
604         struct cfg_marker mml_marker;
605         int               mml_modified;
606 };
607
608 static int mgs_modify_handler(const struct lu_env *env,
609                               struct llog_handle *llh,
610                               struct llog_rec_hdr *rec, void *data)
611 {
612         struct mgs_modify_lookup *mml = data;
613         struct cfg_marker *marker;
614         struct lustre_cfg *lcfg = REC_DATA(rec);
615         int cfg_len = REC_DATA_LEN(rec);
616         int rc;
617         ENTRY;
618
619         if (rec->lrh_type != OBD_CFG_REC) {
620                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
621                 RETURN(-EINVAL);
622         }
623
624         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
625         if (rc) {
626                 CERROR("Insane cfg\n");
627                 RETURN(rc);
628         }
629
630         /* We only care about markers */
631         if (lcfg->lcfg_command != LCFG_MARKER)
632                 RETURN(0);
633
634         marker = lustre_cfg_buf(lcfg, 1);
635         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
636             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
637             !(marker->cm_flags & CM_SKIP)) {
638                 /* Found a non-skipped marker match */
639                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
640                        rec->lrh_index, marker->cm_step,
641                        marker->cm_flags, mml->mml_marker.cm_flags,
642                        marker->cm_tgtname, marker->cm_comment);
643                 /* Overwrite the old marker llog entry */
644                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
645                 marker->cm_flags |= mml->mml_marker.cm_flags;
646                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
647                 /* Header and tail are added back to lrh_len in
648                    llog_lvfs_write_rec */
649                 rec->lrh_len = cfg_len;
650                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
651                                 rec->lrh_index);
652                 if (!rc)
653                          mml->mml_modified++;
654         }
655
656         RETURN(rc);
657 }
658
659 /**
660  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
661  * Return code:
662  * 0 - modified successfully,
663  * 1 - no modification was done
664  * negative - error
665  */
666 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
667                       struct fs_db *fsdb, struct mgs_target_info *mti,
668                       char *logname, char *devname, char *comment, int flags)
669 {
670         struct llog_handle *loghandle;
671         struct llog_ctxt *ctxt;
672         struct mgs_modify_lookup *mml;
673         int rc;
674
675         ENTRY;
676
677         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
678         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
679                flags);
680
681         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
682         LASSERT(ctxt != NULL);
683         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
684         if (rc < 0) {
685                 if (rc == -ENOENT)
686                         rc = 0;
687                 GOTO(out_pop, rc);
688         }
689
690         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
691         if (rc)
692                 GOTO(out_close, rc);
693
694         if (llog_get_size(loghandle) <= 1)
695                 GOTO(out_close, rc = 0);
696
697         OBD_ALLOC_PTR(mml);
698         if (!mml)
699                 GOTO(out_close, rc = -ENOMEM);
700         strcpy(mml->mml_marker.cm_comment, comment);
701         strcpy(mml->mml_marker.cm_tgtname, devname);
702         /* Modify mostly means cancel */
703         mml->mml_marker.cm_flags = flags;
704         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
705         mml->mml_modified = 0;
706         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
707                           NULL);
708         if (!rc && !mml->mml_modified)
709                 rc = 1;
710         OBD_FREE_PTR(mml);
711
712 out_close:
713         llog_close(env, loghandle);
714 out_pop:
715         if (rc < 0)
716                 CERROR("%s: modify %s/%s failed: rc = %d\n",
717                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
718         llog_ctxt_put(ctxt);
719         RETURN(rc);
720 }
721
722 /** This structure is passed to mgs_replace_handler */
723 struct mgs_replace_uuid_lookup {
724         /* Nids are replaced for this target device */
725         struct mgs_target_info target;
726         /* Temporary modified llog */
727         struct llog_handle *temp_llh;
728         /* Flag is set if in target block*/
729         int in_target_device;
730         /* Nids already added. Just skip (multiple nids) */
731         int device_nids_added;
732         /* Flag is set if this block should not be copied */
733         int skip_it;
734 };
735
736 /**
737  * Check: a) if block should be skipped
738  * b) is it target block
739  *
740  * \param[in] lcfg
741  * \param[in] mrul
742  *
743  * \retval 0 should not to be skipped
744  * \retval 1 should to be skipped
745  */
746 static int check_markers(struct lustre_cfg *lcfg,
747                          struct mgs_replace_uuid_lookup *mrul)
748 {
749          struct cfg_marker *marker;
750
751         /* Track markers. Find given device */
752         if (lcfg->lcfg_command == LCFG_MARKER) {
753                 marker = lustre_cfg_buf(lcfg, 1);
754                 /* Clean llog from records marked as CM_EXCLUDE.
755                    CM_SKIP records are used for "active" command
756                    and can be restored if needed */
757                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
758                     (CM_EXCLUDE | CM_START)) {
759                         mrul->skip_it = 1;
760                         return 1;
761                 }
762
763                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
764                     (CM_EXCLUDE | CM_END)) {
765                         mrul->skip_it = 0;
766                         return 1;
767                 }
768
769                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
770                         LASSERT(!(marker->cm_flags & CM_START) ||
771                                 !(marker->cm_flags & CM_END));
772                         if (marker->cm_flags & CM_START) {
773                                 mrul->in_target_device = 1;
774                                 mrul->device_nids_added = 0;
775                         } else if (marker->cm_flags & CM_END)
776                                 mrul->in_target_device = 0;
777                 }
778         }
779
780         return 0;
781 }
782
783 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
784                        struct lustre_cfg *lcfg)
785 {
786         struct llog_rec_hdr      rec;
787         int                      buflen, rc;
788
789         if (!lcfg || !llh)
790                 return -ENOMEM;
791
792         LASSERT(llh->lgh_ctxt);
793
794         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
795                                 lcfg->lcfg_buflens);
796         rec.lrh_len = llog_data_len(buflen);
797         rec.lrh_type = OBD_CFG_REC;
798
799         /* idx = -1 means append */
800         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
801         if (rc)
802                 CERROR("failed %d\n", rc);
803         return rc;
804 }
805
806 static int record_base(const struct lu_env *env, struct llog_handle *llh,
807                      char *cfgname, lnet_nid_t nid, int cmd,
808                      char *s1, char *s2, char *s3, char *s4)
809 {
810         struct mgs_thread_info *mgi = mgs_env_info(env);
811         struct lustre_cfg     *lcfg;
812         int rc;
813
814         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
815                cmd, s1, s2, s3, s4);
816
817         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
818         if (s1)
819                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
820         if (s2)
821                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
822         if (s3)
823                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
824         if (s4)
825                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
826
827         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
828         if (!lcfg)
829                 return -ENOMEM;
830         lcfg->lcfg_nid = nid;
831
832         rc = record_lcfg(env, llh, lcfg);
833
834         lustre_cfg_free(lcfg);
835
836         if (rc) {
837                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
838                        cmd, s1, s2, s3, s4);
839         }
840         return rc;
841 }
842
843 static inline int record_add_uuid(const struct lu_env *env,
844                                   struct llog_handle *llh,
845                                   uint64_t nid, char *uuid)
846 {
847         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
848 }
849
850 static inline int record_add_conn(const struct lu_env *env,
851                                   struct llog_handle *llh,
852                                   char *devname, char *uuid)
853 {
854         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
855 }
856
857 static inline int record_attach(const struct lu_env *env,
858                                 struct llog_handle *llh, char *devname,
859                                 char *type, char *uuid)
860 {
861         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
862 }
863
864 static inline int record_setup(const struct lu_env *env,
865                                struct llog_handle *llh, char *devname,
866                                char *s1, char *s2, char *s3, char *s4)
867 {
868         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
869 }
870
871 /**
872  * \retval <0 record processing error
873  * \retval n record is processed. No need copy original one.
874  * \retval 0 record is not processed.
875  */
876 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
877                            struct mgs_replace_uuid_lookup *mrul)
878 {
879         int nids_added = 0;
880         lnet_nid_t nid;
881         char *ptr;
882         int rc;
883
884         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
885                 /* LCFG_ADD_UUID command found. Let's skip original command
886                    and add passed nids */
887                 ptr = mrul->target.mti_params;
888                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
889                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
890                                "device %s\n", libcfs_nid2str(nid),
891                                 mrul->target.mti_params,
892                                 mrul->target.mti_svname);
893                         rc = record_add_uuid(env,
894                                              mrul->temp_llh, nid,
895                                              mrul->target.mti_params);
896                         if (!rc)
897                                 nids_added++;
898                 }
899
900                 if (nids_added == 0) {
901                         CERROR("No new nids were added, nid %s with uuid %s, "
902                                "device %s\n", libcfs_nid2str(nid),
903                                mrul->target.mti_params,
904                                mrul->target.mti_svname);
905                         RETURN(-ENXIO);
906                 } else {
907                         mrul->device_nids_added = 1;
908                 }
909
910                 return nids_added;
911         }
912
913         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
914                 /* LCFG_SETUP command found. UUID should be changed */
915                 rc = record_setup(env,
916                                   mrul->temp_llh,
917                                   /* devname the same */
918                                   lustre_cfg_string(lcfg, 0),
919                                   /* s1 is not changed */
920                                   lustre_cfg_string(lcfg, 1),
921                                   /* new uuid should be
922                                   the full nidlist */
923                                   mrul->target.mti_params,
924                                   /* s3 is not changed */
925                                   lustre_cfg_string(lcfg, 3),
926                                   /* s4 is not changed */
927                                   lustre_cfg_string(lcfg, 4));
928                 return rc ? rc : 1;
929         }
930
931         /* Another commands in target device block */
932         return 0;
933 }
934
935 /**
936  * Handler that called for every record in llog.
937  * Records are processed in order they placed in llog.
938  *
939  * \param[in] llh       log to be processed
940  * \param[in] rec       current record
941  * \param[in] data      mgs_replace_uuid_lookup structure
942  *
943  * \retval 0    success
944  */
945 static int mgs_replace_handler(const struct lu_env *env,
946                                struct llog_handle *llh,
947                                struct llog_rec_hdr *rec,
948                                void *data)
949 {
950         struct mgs_replace_uuid_lookup *mrul;
951         struct lustre_cfg *lcfg = REC_DATA(rec);
952         int cfg_len = REC_DATA_LEN(rec);
953         int rc;
954         ENTRY;
955
956         mrul = (struct mgs_replace_uuid_lookup *)data;
957
958         if (rec->lrh_type != OBD_CFG_REC) {
959                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
960                        rec->lrh_type, lcfg->lcfg_command,
961                        lustre_cfg_string(lcfg, 0),
962                        lustre_cfg_string(lcfg, 1));
963                 RETURN(-EINVAL);
964         }
965
966         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
967         if (rc) {
968                 /* Do not copy any invalidated records */
969                 GOTO(skip_out, rc = 0);
970         }
971
972         rc = check_markers(lcfg, mrul);
973         if (rc || mrul->skip_it)
974                 GOTO(skip_out, rc = 0);
975
976         /* Write to new log all commands outside target device block */
977         if (!mrul->in_target_device)
978                 GOTO(copy_out, rc = 0);
979
980         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
981            (failover nids) for this target, assuming that if then
982            primary is changing then so is the failover */
983         if (mrul->device_nids_added &&
984             (lcfg->lcfg_command == LCFG_ADD_UUID ||
985              lcfg->lcfg_command == LCFG_ADD_CONN))
986                 GOTO(skip_out, rc = 0);
987
988         rc = process_command(env, lcfg, mrul);
989         if (rc < 0)
990                 RETURN(rc);
991
992         if (rc)
993                 RETURN(0);
994 copy_out:
995         /* Record is placed in temporary llog as is */
996         rc = llog_write(env, mrul->temp_llh, rec, NULL, 0, NULL, -1);
997
998         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
999                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1000                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1001         RETURN(rc);
1002
1003 skip_out:
1004         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1005                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1006                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1007         RETURN(rc);
1008 }
1009
1010 static int mgs_log_is_empty(const struct lu_env *env,
1011                             struct mgs_device *mgs, char *name)
1012 {
1013         struct llog_ctxt        *ctxt;
1014         int                      rc;
1015
1016         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1017         LASSERT(ctxt != NULL);
1018
1019         rc = llog_is_empty(env, ctxt, name);
1020         llog_ctxt_put(ctxt);
1021         return rc;
1022 }
1023
1024 static int mgs_replace_nids_log(const struct lu_env *env,
1025                                 struct obd_device *mgs, struct fs_db *fsdb,
1026                                 char *logname, char *devname, char *nids)
1027 {
1028         struct llog_handle *orig_llh, *backup_llh;
1029         struct llog_ctxt *ctxt;
1030         struct mgs_replace_uuid_lookup *mrul;
1031         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1032         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1033         char *backup;
1034         int rc, rc2;
1035         ENTRY;
1036
1037         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1038
1039         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1040         LASSERT(ctxt != NULL);
1041
1042         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1043                 /* Log is empty. Nothing to replace */
1044                 GOTO(out_put, rc = 0);
1045         }
1046
1047         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1048         if (backup == NULL)
1049                 GOTO(out_put, rc = -ENOMEM);
1050
1051         sprintf(backup, "%s.bak", logname);
1052
1053         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1054         if (rc == 0) {
1055                 /* Now erase original log file. Connections are not allowed.
1056                    Backup is already saved */
1057                 rc = llog_erase(env, ctxt, NULL, logname);
1058                 if (rc < 0)
1059                         GOTO(out_free, rc);
1060         } else if (rc != -ENOENT) {
1061                 CERROR("%s: can't make backup for %s: rc = %d\n",
1062                        mgs->obd_name, logname, rc);
1063                 GOTO(out_free,rc);
1064         }
1065
1066         /* open local log */
1067         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1068         if (rc)
1069                 GOTO(out_restore, rc);
1070
1071         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1072         if (rc)
1073                 GOTO(out_closel, rc);
1074
1075         /* open backup llog */
1076         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1077                        LLOG_OPEN_EXISTS);
1078         if (rc)
1079                 GOTO(out_closel, rc);
1080
1081         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1082         if (rc)
1083                 GOTO(out_close, rc);
1084
1085         if (llog_get_size(backup_llh) <= 1)
1086                 GOTO(out_close, rc = 0);
1087
1088         OBD_ALLOC_PTR(mrul);
1089         if (!mrul)
1090                 GOTO(out_close, rc = -ENOMEM);
1091         /* devname is only needed information to replace UUID records */
1092         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1093         /* parse nids later */
1094         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1095         /* Copy records to this temporary llog */
1096         mrul->temp_llh = orig_llh;
1097
1098         rc = llog_process(env, backup_llh, mgs_replace_handler,
1099                           (void *)mrul, NULL);
1100         OBD_FREE_PTR(mrul);
1101 out_close:
1102         rc2 = llog_close(NULL, backup_llh);
1103         if (!rc)
1104                 rc = rc2;
1105 out_closel:
1106         rc2 = llog_close(NULL, orig_llh);
1107         if (!rc)
1108                 rc = rc2;
1109
1110 out_restore:
1111         if (rc) {
1112                 CERROR("%s: llog should be restored: rc = %d\n",
1113                        mgs->obd_name, rc);
1114                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1115                                   logname);
1116                 if (rc2 < 0)
1117                         CERROR("%s: can't restore backup %s: rc = %d\n",
1118                                mgs->obd_name, logname, rc2);
1119         }
1120
1121 out_free:
1122         OBD_FREE(backup, strlen(backup) + 1);
1123
1124 out_put:
1125         llog_ctxt_put(ctxt);
1126
1127         if (rc)
1128                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1129                        mgs->obd_name, logname, rc);
1130
1131         RETURN(rc);
1132 }
1133
1134 /**
1135  * Parse device name and get file system name and/or device index
1136  *
1137  * \param[in]   devname device name (ex. lustre-MDT0000)
1138  * \param[out]  fsname  file system name(optional)
1139  * \param[out]  index   device index(optional)
1140  *
1141  * \retval 0    success
1142  */
1143 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1144 {
1145         int rc;
1146         ENTRY;
1147
1148         /* Extract fsname */
1149         if (fsname) {
1150                 rc = server_name2fsname(devname, fsname, NULL);
1151                 if (rc < 0) {
1152                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1153                                devname);
1154                         RETURN(-EINVAL);
1155                 }
1156         }
1157
1158         if (index) {
1159                 rc = server_name2index(devname, index, NULL);
1160                 if (rc < 0) {
1161                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1162                                devname);
1163                         RETURN(-EINVAL);
1164                 }
1165         }
1166
1167         RETURN(0);
1168 }
1169
1170 static int only_mgs_is_running(struct obd_device *mgs_obd)
1171 {
1172         /* TDB: Is global variable with devices count exists? */
1173         int num_devices = get_devices_count();
1174         /* osd, MGS and MGC + self_export
1175            (wc -l /proc/fs/lustre/devices <= 2) && (num_exports <= 2) */
1176         return (num_devices <= 3) && (mgs_obd->obd_num_exports <= 2);
1177 }
1178
1179 static int name_create_mdt(char **logname, char *fsname, int i)
1180 {
1181         char mdt_index[9];
1182
1183         sprintf(mdt_index, "-MDT%04x", i);
1184         return name_create(logname, fsname, mdt_index);
1185 }
1186
1187 /**
1188  * Replace nids for \a device to \a nids values
1189  *
1190  * \param obd           MGS obd device
1191  * \param devname       nids need to be replaced for this device
1192  * (ex. lustre-OST0000)
1193  * \param nids          nids list (ex. nid1,nid2,nid3)
1194  *
1195  * \retval 0    success
1196  */
1197 int mgs_replace_nids(const struct lu_env *env,
1198                      struct mgs_device *mgs,
1199                      char *devname, char *nids)
1200 {
1201         /* Assume fsname is part of device name */
1202         char fsname[MTI_NAME_MAXLEN];
1203         int rc;
1204         __u32 index;
1205         char *logname;
1206         struct fs_db *fsdb;
1207         unsigned int i;
1208         int conn_state;
1209         struct obd_device *mgs_obd = mgs->mgs_obd;
1210         ENTRY;
1211
1212         /* We can only change NIDs if no other nodes are connected */
1213         spin_lock(&mgs_obd->obd_dev_lock);
1214         conn_state = mgs_obd->obd_no_conn;
1215         mgs_obd->obd_no_conn = 1;
1216         spin_unlock(&mgs_obd->obd_dev_lock);
1217
1218         /* We can not change nids if not only MGS is started */
1219         if (!only_mgs_is_running(mgs_obd)) {
1220                 CERROR("Only MGS is allowed to be started\n");
1221                 GOTO(out, rc = -EINPROGRESS);
1222         }
1223
1224         /* Get fsname and index*/
1225         rc = mgs_parse_devname(devname, fsname, &index);
1226         if (rc)
1227                 GOTO(out, rc);
1228
1229         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1230         if (rc) {
1231                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1232                 GOTO(out, rc);
1233         }
1234
1235         /* Process client llogs */
1236         name_create(&logname, fsname, "-client");
1237         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1238         name_destroy(&logname);
1239         if (rc) {
1240                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1241                        fsname, devname, rc);
1242                 GOTO(out, rc);
1243         }
1244
1245         /* Process MDT llogs */
1246         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1247                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1248                         continue;
1249                 name_create_mdt(&logname, fsname, i);
1250                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1251                 name_destroy(&logname);
1252                 if (rc)
1253                         GOTO(out, rc);
1254         }
1255
1256 out:
1257         spin_lock(&mgs_obd->obd_dev_lock);
1258         mgs_obd->obd_no_conn = conn_state;
1259         spin_unlock(&mgs_obd->obd_dev_lock);
1260
1261         RETURN(rc);
1262 }
1263
1264 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1265                             char *devname, struct lov_desc *desc)
1266 {
1267         struct mgs_thread_info *mgi = mgs_env_info(env);
1268         struct lustre_cfg *lcfg;
1269         int rc;
1270
1271         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1272         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1273         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1274         if (!lcfg)
1275                 return -ENOMEM;
1276         rc = record_lcfg(env, llh, lcfg);
1277
1278         lustre_cfg_free(lcfg);
1279         return rc;
1280 }
1281
1282 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1283                             char *devname, struct lmv_desc *desc)
1284 {
1285         struct mgs_thread_info *mgi = mgs_env_info(env);
1286         struct lustre_cfg *lcfg;
1287         int rc;
1288
1289         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1290         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1291         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1292
1293         rc = record_lcfg(env, llh, lcfg);
1294
1295         lustre_cfg_free(lcfg);
1296         return rc;
1297 }
1298
1299 static inline int record_mdc_add(const struct lu_env *env,
1300                                  struct llog_handle *llh,
1301                                  char *logname, char *mdcuuid,
1302                                  char *mdtuuid, char *index,
1303                                  char *gen)
1304 {
1305         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1306                            mdtuuid,index,gen,mdcuuid);
1307 }
1308
1309 static inline int record_lov_add(const struct lu_env *env,
1310                                  struct llog_handle *llh,
1311                                  char *lov_name, char *ost_uuid,
1312                                  char *index, char *gen)
1313 {
1314         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1315                            ost_uuid, index, gen, 0);
1316 }
1317
1318 static inline int record_mount_opt(const struct lu_env *env,
1319                                    struct llog_handle *llh,
1320                                    char *profile, char *lov_name,
1321                                    char *mdc_name)
1322 {
1323         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1324                            profile,lov_name,mdc_name,0);
1325 }
1326
1327 static int record_marker(const struct lu_env *env,
1328                          struct llog_handle *llh,
1329                          struct fs_db *fsdb, __u32 flags,
1330                          char *tgtname, char *comment)
1331 {
1332         struct mgs_thread_info *mgi = mgs_env_info(env);
1333         struct lustre_cfg *lcfg;
1334         int rc;
1335         int cplen = 0;
1336
1337         if (flags & CM_START)
1338                 fsdb->fsdb_gen++;
1339         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1340         mgi->mgi_marker.cm_flags = flags;
1341         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1342         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1343                         sizeof(mgi->mgi_marker.cm_tgtname));
1344         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1345                 return -E2BIG;
1346         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1347                         sizeof(mgi->mgi_marker.cm_comment));
1348         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1349                 return -E2BIG;
1350         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1351         mgi->mgi_marker.cm_canceltime = 0;
1352         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1353         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1354                             sizeof(mgi->mgi_marker));
1355         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1356         if (!lcfg)
1357                 return -ENOMEM;
1358         rc = record_lcfg(env, llh, lcfg);
1359
1360         lustre_cfg_free(lcfg);
1361         return rc;
1362 }
1363
1364 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1365                             struct llog_handle **llh, char *name)
1366 {
1367         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1368         struct llog_ctxt        *ctxt;
1369         int                      rc = 0;
1370
1371         if (*llh)
1372                 GOTO(out, rc = -EBUSY);
1373
1374         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1375         if (!ctxt)
1376                 GOTO(out, rc = -ENODEV);
1377         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1378
1379         rc = llog_open_create(env, ctxt, llh, NULL, name);
1380         if (rc)
1381                 GOTO(out_ctxt, rc);
1382         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1383         if (rc)
1384                 llog_close(env, *llh);
1385 out_ctxt:
1386         llog_ctxt_put(ctxt);
1387 out:
1388         if (rc) {
1389                 CERROR("%s: can't start log %s: rc = %d\n",
1390                        mgs->mgs_obd->obd_name, name, rc);
1391                 *llh = NULL;
1392         }
1393         RETURN(rc);
1394 }
1395
1396 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1397 {
1398         int rc;
1399
1400         rc = llog_close(env, *llh);
1401         *llh = NULL;
1402
1403         return rc;
1404 }
1405
1406 /******************** config "macros" *********************/
1407
1408 /* write an lcfg directly into a log (with markers) */
1409 static int mgs_write_log_direct(const struct lu_env *env,
1410                                 struct mgs_device *mgs, struct fs_db *fsdb,
1411                                 char *logname, struct lustre_cfg *lcfg,
1412                                 char *devname, char *comment)
1413 {
1414         struct llog_handle *llh = NULL;
1415         int rc;
1416         ENTRY;
1417
1418         if (!lcfg)
1419                 RETURN(-ENOMEM);
1420
1421         rc = record_start_log(env, mgs, &llh, logname);
1422         if (rc)
1423                 RETURN(rc);
1424
1425         /* FIXME These should be a single journal transaction */
1426         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1427         if (rc)
1428                 GOTO(out_end, rc);
1429         rc = record_lcfg(env, llh, lcfg);
1430         if (rc)
1431                 GOTO(out_end, rc);
1432         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1433         if (rc)
1434                 GOTO(out_end, rc);
1435 out_end:
1436         record_end_log(env, &llh);
1437         RETURN(rc);
1438 }
1439
1440 /* write the lcfg in all logs for the given fs */
1441 int mgs_write_log_direct_all(const struct lu_env *env,
1442                              struct mgs_device *mgs,
1443                              struct fs_db *fsdb,
1444                              struct mgs_target_info *mti,
1445                              struct lustre_cfg *lcfg,
1446                              char *devname, char *comment,
1447                              int server_only)
1448 {
1449         cfs_list_t list;
1450         struct mgs_direntry *dirent, *n;
1451         char *fsname = mti->mti_fsname;
1452         char *logname;
1453         int rc = 0, len = strlen(fsname);
1454         ENTRY;
1455
1456         /* We need to set params for any future logs
1457            as well. FIXME Append this file to every new log.
1458            Actually, we should store as params (text), not llogs.  Or
1459            in a database. */
1460         rc = name_create(&logname, fsname, "-params");
1461         if (rc)
1462                 RETURN(rc);
1463         if (mgs_log_is_empty(env, mgs, logname)) {
1464                 struct llog_handle *llh = NULL;
1465                 rc = record_start_log(env, mgs, &llh, logname);
1466                 record_end_log(env, &llh);
1467         }
1468         name_destroy(&logname);
1469         if (rc)
1470                 RETURN(rc);
1471
1472         /* Find all the logs in the CONFIGS directory */
1473         rc = class_dentry_readdir(env, mgs, &list);
1474         if (rc)
1475                 RETURN(rc);
1476
1477         /* Could use fsdb index maps instead of directory listing */
1478         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1479                 cfs_list_del(&dirent->list);
1480                 /* don't write to sptlrpc rule log */
1481                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1482                         goto next;
1483
1484                 /* caller wants write server logs only */
1485                 if (server_only && strstr(dirent->name, "-client") != NULL)
1486                         goto next;
1487
1488                 if (strncmp(fsname, dirent->name, len) == 0) {
1489                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1490                         /* Erase any old settings of this same parameter */
1491                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1492                                         devname, comment, CM_SKIP);
1493                         if (rc < 0)
1494                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1495                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1496                         /* Write the new one */
1497                         if (lcfg) {
1498                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1499                                                           dirent->name,
1500                                                           lcfg, devname,
1501                                                           comment);
1502                                 if (rc)
1503                                         CERROR("%s: writing log %s: rc = %d\n",
1504                                                mgs->mgs_obd->obd_name,
1505                                                dirent->name, rc);
1506                         }
1507                 }
1508 next:
1509                 mgs_direntry_free(dirent);
1510         }
1511
1512         RETURN(rc);
1513 }
1514
1515 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1516                                     struct mgs_device *mgs,
1517                                     struct fs_db *fsdb,
1518                                     struct mgs_target_info *mti,
1519                                     int index, char *logname);
1520 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1521                                     struct mgs_device *mgs,
1522                                     struct fs_db *fsdb,
1523                                     struct mgs_target_info *mti,
1524                                     char *logname, char *suffix, char *lovname,
1525                                     enum lustre_sec_part sec_part, int flags);
1526 static int name_create_mdt_and_lov(char **logname, char **lovname,
1527                                    struct fs_db *fsdb, int i);
1528
1529 static int add_param(char *params, char *key, char *val)
1530 {
1531         char *start = params + strlen(params);
1532         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1533         int keylen = 0;
1534
1535         if (key != NULL)
1536                 keylen = strlen(key);
1537         if (start + 1 + keylen + strlen(val) >= end) {
1538                 CERROR("params are too long: %s %s%s\n",
1539                        params, key != NULL ? key : "", val);
1540                 return -EINVAL;
1541         }
1542
1543         sprintf(start, " %s%s", key != NULL ? key : "", val);
1544         return 0;
1545 }
1546
1547 /**
1548  * Walk through client config log record and convert the related records
1549  * into the target.
1550  **/
1551 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1552                                          struct llog_handle *llh,
1553                                          struct llog_rec_hdr *rec, void *data)
1554 {
1555         struct mgs_device *mgs;
1556         struct obd_device *obd;
1557         struct mgs_target_info *mti, *tmti;
1558         struct fs_db *fsdb;
1559         int cfg_len = rec->lrh_len;
1560         char *cfg_buf = (char*) (rec + 1);
1561         struct lustre_cfg *lcfg;
1562         int rc = 0;
1563         struct llog_handle *mdt_llh = NULL;
1564         static int got_an_osc_or_mdc = 0;
1565         /* 0: not found any osc/mdc;
1566            1: found osc;
1567            2: found mdc;
1568         */
1569         static int last_step = -1;
1570         int cplen = 0;
1571
1572         ENTRY;
1573
1574         mti = ((struct temp_comp*)data)->comp_mti;
1575         tmti = ((struct temp_comp*)data)->comp_tmti;
1576         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1577         obd = ((struct temp_comp *)data)->comp_obd;
1578         mgs = lu2mgs_dev(obd->obd_lu_dev);
1579         LASSERT(mgs);
1580
1581         if (rec->lrh_type != OBD_CFG_REC) {
1582                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1583                 RETURN(-EINVAL);
1584         }
1585
1586         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1587         if (rc) {
1588                 CERROR("Insane cfg\n");
1589                 RETURN(rc);
1590         }
1591
1592         lcfg = (struct lustre_cfg *)cfg_buf;
1593
1594         if (lcfg->lcfg_command == LCFG_MARKER) {
1595                 struct cfg_marker *marker;
1596                 marker = lustre_cfg_buf(lcfg, 1);
1597                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1598                     (marker->cm_flags & CM_START) &&
1599                      !(marker->cm_flags & CM_SKIP)) {
1600                         got_an_osc_or_mdc = 1;
1601                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1602                                         sizeof(tmti->mti_svname));
1603                         if (cplen >= sizeof(tmti->mti_svname))
1604                                 RETURN(-E2BIG);
1605                         rc = record_start_log(env, mgs, &mdt_llh,
1606                                               mti->mti_svname);
1607                         if (rc)
1608                                 RETURN(rc);
1609                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1610                                            mti->mti_svname, "add osc(copied)");
1611                         record_end_log(env, &mdt_llh);
1612                         last_step = marker->cm_step;
1613                         RETURN(rc);
1614                 }
1615                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1616                     (marker->cm_flags & CM_END) &&
1617                      !(marker->cm_flags & CM_SKIP)) {
1618                         LASSERT(last_step == marker->cm_step);
1619                         last_step = -1;
1620                         got_an_osc_or_mdc = 0;
1621                         memset(tmti, 0, sizeof(*tmti));
1622                         rc = record_start_log(env, mgs, &mdt_llh,
1623                                               mti->mti_svname);
1624                         if (rc)
1625                                 RETURN(rc);
1626                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1627                                            mti->mti_svname, "add osc(copied)");
1628                         record_end_log(env, &mdt_llh);
1629                         RETURN(rc);
1630                 }
1631                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1632                     (marker->cm_flags & CM_START) &&
1633                      !(marker->cm_flags & CM_SKIP)) {
1634                         got_an_osc_or_mdc = 2;
1635                         last_step = marker->cm_step;
1636                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1637                                strlen(marker->cm_tgtname));
1638
1639                         RETURN(rc);
1640                 }
1641                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1642                     (marker->cm_flags & CM_END) &&
1643                      !(marker->cm_flags & CM_SKIP)) {
1644                         LASSERT(last_step == marker->cm_step);
1645                         last_step = -1;
1646                         got_an_osc_or_mdc = 0;
1647                         memset(tmti, 0, sizeof(*tmti));
1648                         RETURN(rc);
1649                 }
1650         }
1651
1652         if (got_an_osc_or_mdc == 0 || last_step < 0)
1653                 RETURN(rc);
1654
1655         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1656                 uint64_t nodenid = lcfg->lcfg_nid;
1657
1658                 if (strlen(tmti->mti_uuid) == 0) {
1659                         /* target uuid not set, this config record is before
1660                          * LCFG_SETUP, this nid is one of target node nid.
1661                          */
1662                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1663                         tmti->mti_nid_count++;
1664                 } else {
1665                         /* failover node nid */
1666                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1667                                        libcfs_nid2str(nodenid));
1668                 }
1669
1670                 RETURN(rc);
1671         }
1672
1673         if (lcfg->lcfg_command == LCFG_SETUP) {
1674                 char *target;
1675
1676                 target = lustre_cfg_string(lcfg, 1);
1677                 memcpy(tmti->mti_uuid, target, strlen(target));
1678                 RETURN(rc);
1679         }
1680
1681         /* ignore client side sptlrpc_conf_log */
1682         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1683                 RETURN(rc);
1684
1685         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1686                 int index;
1687
1688                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1689                         RETURN (-EINVAL);
1690
1691                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1692                        strlen(mti->mti_fsname));
1693                 tmti->mti_stripe_index = index;
1694
1695                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1696                                               mti->mti_stripe_index,
1697                                               mti->mti_svname);
1698                 memset(tmti, 0, sizeof(*tmti));
1699                 RETURN(rc);
1700         }
1701
1702         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1703                 int index;
1704                 char mdt_index[9];
1705                 char *logname, *lovname;
1706
1707                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1708                                              mti->mti_stripe_index);
1709                 if (rc)
1710                         RETURN(rc);
1711                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1712
1713                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1714                         name_destroy(&logname);
1715                         name_destroy(&lovname);
1716                         RETURN(-EINVAL);
1717                 }
1718
1719                 tmti->mti_stripe_index = index;
1720                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1721                                          mdt_index, lovname,
1722                                          LUSTRE_SP_MDT, 0);
1723                 name_destroy(&logname);
1724                 name_destroy(&lovname);
1725                 RETURN(rc);
1726         }
1727         RETURN(rc);
1728 }
1729
1730 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1731 /* stealed from mgs_get_fsdb_from_llog*/
1732 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1733                                               struct mgs_device *mgs,
1734                                               char *client_name,
1735                                               struct temp_comp* comp)
1736 {
1737         struct llog_handle *loghandle;
1738         struct mgs_target_info *tmti;
1739         struct llog_ctxt *ctxt;
1740         int rc;
1741
1742         ENTRY;
1743
1744         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1745         LASSERT(ctxt != NULL);
1746
1747         OBD_ALLOC_PTR(tmti);
1748         if (tmti == NULL)
1749                 GOTO(out_ctxt, rc = -ENOMEM);
1750
1751         comp->comp_tmti = tmti;
1752         comp->comp_obd = mgs->mgs_obd;
1753
1754         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1755                        LLOG_OPEN_EXISTS);
1756         if (rc < 0) {
1757                 if (rc == -ENOENT)
1758                         rc = 0;
1759                 GOTO(out_pop, rc);
1760         }
1761
1762         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1763         if (rc)
1764                 GOTO(out_close, rc);
1765
1766         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1767                                   (void *)comp, NULL, false);
1768         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1769 out_close:
1770         llog_close(env, loghandle);
1771 out_pop:
1772         OBD_FREE_PTR(tmti);
1773 out_ctxt:
1774         llog_ctxt_put(ctxt);
1775         RETURN(rc);
1776 }
1777
1778 /* lmv is the second thing for client logs */
1779 /* copied from mgs_write_log_lov. Please refer to that.  */
1780 static int mgs_write_log_lmv(const struct lu_env *env,
1781                              struct mgs_device *mgs,
1782                              struct fs_db *fsdb,
1783                              struct mgs_target_info *mti,
1784                              char *logname, char *lmvname)
1785 {
1786         struct llog_handle *llh = NULL;
1787         struct lmv_desc *lmvdesc;
1788         char *uuid;
1789         int rc = 0;
1790         ENTRY;
1791
1792         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1793
1794         OBD_ALLOC_PTR(lmvdesc);
1795         if (lmvdesc == NULL)
1796                 RETURN(-ENOMEM);
1797         lmvdesc->ld_active_tgt_count = 0;
1798         lmvdesc->ld_tgt_count = 0;
1799         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1800         uuid = (char *)lmvdesc->ld_uuid.uuid;
1801
1802         rc = record_start_log(env, mgs, &llh, logname);
1803         if (rc)
1804                 GOTO(out_free, rc);
1805         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1806         if (rc)
1807                 GOTO(out_end, rc);
1808         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1809         if (rc)
1810                 GOTO(out_end, rc);
1811         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1812         if (rc)
1813                 GOTO(out_end, rc);
1814         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1815         if (rc)
1816                 GOTO(out_end, rc);
1817 out_end:
1818         record_end_log(env, &llh);
1819 out_free:
1820         OBD_FREE_PTR(lmvdesc);
1821         RETURN(rc);
1822 }
1823
1824 /* lov is the first thing in the mdt and client logs */
1825 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1826                              struct fs_db *fsdb, struct mgs_target_info *mti,
1827                              char *logname, char *lovname)
1828 {
1829         struct llog_handle *llh = NULL;
1830         struct lov_desc *lovdesc;
1831         char *uuid;
1832         int rc = 0;
1833         ENTRY;
1834
1835         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1836
1837         /*
1838         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1839         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1840               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1841         */
1842
1843         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1844         OBD_ALLOC_PTR(lovdesc);
1845         if (lovdesc == NULL)
1846                 RETURN(-ENOMEM);
1847         lovdesc->ld_magic = LOV_DESC_MAGIC;
1848         lovdesc->ld_tgt_count = 0;
1849         /* Defaults.  Can be changed later by lcfg config_param */
1850         lovdesc->ld_default_stripe_count = 1;
1851         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1852         lovdesc->ld_default_stripe_size = 1024 * 1024;
1853         lovdesc->ld_default_stripe_offset = -1;
1854         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1855         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1856         /* can these be the same? */
1857         uuid = (char *)lovdesc->ld_uuid.uuid;
1858
1859         /* This should always be the first entry in a log.
1860         rc = mgs_clear_log(obd, logname); */
1861         rc = record_start_log(env, mgs, &llh, logname);
1862         if (rc)
1863                 GOTO(out_free, rc);
1864         /* FIXME these should be a single journal transaction */
1865         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1866         if (rc)
1867                 GOTO(out_end, rc);
1868         rc = record_attach(env, llh, lovname, "lov", uuid);
1869         if (rc)
1870                 GOTO(out_end, rc);
1871         rc = record_lov_setup(env, llh, lovname, lovdesc);
1872         if (rc)
1873                 GOTO(out_end, rc);
1874         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1875         if (rc)
1876                 GOTO(out_end, rc);
1877         EXIT;
1878 out_end:
1879         record_end_log(env, &llh);
1880 out_free:
1881         OBD_FREE_PTR(lovdesc);
1882         return rc;
1883 }
1884
1885 /* add failnids to open log */
1886 static int mgs_write_log_failnids(const struct lu_env *env,
1887                                   struct mgs_target_info *mti,
1888                                   struct llog_handle *llh,
1889                                   char *cliname)
1890 {
1891         char *failnodeuuid = NULL;
1892         char *ptr = mti->mti_params;
1893         lnet_nid_t nid;
1894         int rc = 0;
1895
1896         /*
1897         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1898         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1899         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1900         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1901         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1902         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1903         */
1904
1905         /* Pull failnid info out of params string */
1906         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1907                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1908                         if (failnodeuuid == NULL) {
1909                                 /* We don't know the failover node name,
1910                                    so just use the first nid as the uuid */
1911                                 rc = name_create(&failnodeuuid,
1912                                                  libcfs_nid2str(nid), "");
1913                                 if (rc)
1914                                         return rc;
1915                         }
1916                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1917                                "client %s\n", libcfs_nid2str(nid),
1918                                failnodeuuid, cliname);
1919                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1920                 }
1921                 if (failnodeuuid)
1922                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1923         }
1924
1925         name_destroy(&failnodeuuid);
1926         return rc;
1927 }
1928
1929 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1930                                     struct mgs_device *mgs,
1931                                     struct fs_db *fsdb,
1932                                     struct mgs_target_info *mti,
1933                                     char *logname, char *lmvname)
1934 {
1935         struct llog_handle *llh = NULL;
1936         char *mdcname = NULL;
1937         char *nodeuuid = NULL;
1938         char *mdcuuid = NULL;
1939         char *lmvuuid = NULL;
1940         char index[6];
1941         int i, rc;
1942         ENTRY;
1943
1944         if (mgs_log_is_empty(env, mgs, logname)) {
1945                 CERROR("log is empty! Logical error\n");
1946                 RETURN(-EINVAL);
1947         }
1948
1949         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1950                mti->mti_svname, logname, lmvname);
1951
1952         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1953         if (rc)
1954                 RETURN(rc);
1955         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1956         if (rc)
1957                 GOTO(out_free, rc);
1958         rc = name_create(&mdcuuid, mdcname, "_UUID");
1959         if (rc)
1960                 GOTO(out_free, rc);
1961         rc = name_create(&lmvuuid, lmvname, "_UUID");
1962         if (rc)
1963                 GOTO(out_free, rc);
1964
1965         rc = record_start_log(env, mgs, &llh, logname);
1966         if (rc)
1967                 GOTO(out_free, rc);
1968         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1969                            "add mdc");
1970         if (rc)
1971                 GOTO(out_end, rc);
1972         for (i = 0; i < mti->mti_nid_count; i++) {
1973                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1974                        libcfs_nid2str(mti->mti_nids[i]));
1975
1976                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1977                 if (rc)
1978                         GOTO(out_end, rc);
1979         }
1980
1981         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1982         if (rc)
1983                 GOTO(out_end, rc);
1984         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1985         if (rc)
1986                 GOTO(out_end, rc);
1987         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1988         if (rc)
1989                 GOTO(out_end, rc);
1990         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1991         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1992                             index, "1");
1993         if (rc)
1994                 GOTO(out_end, rc);
1995         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1996                            "add mdc");
1997         if (rc)
1998                 GOTO(out_end, rc);
1999 out_end:
2000         record_end_log(env, &llh);
2001 out_free:
2002         name_destroy(&lmvuuid);
2003         name_destroy(&mdcuuid);
2004         name_destroy(&mdcname);
2005         name_destroy(&nodeuuid);
2006         RETURN(rc);
2007 }
2008
2009 static inline int name_create_lov(char **lovname, char *mdtname,
2010                                   struct fs_db *fsdb, int index)
2011 {
2012         /* COMPAT_180 */
2013         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2014                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2015         else
2016                 return name_create(lovname, mdtname, "-mdtlov");
2017 }
2018
2019 static int name_create_mdt_and_lov(char **logname, char **lovname,
2020                                    struct fs_db *fsdb, int i)
2021 {
2022         int rc;
2023
2024         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2025         if (rc)
2026                 return rc;
2027         /* COMPAT_180 */
2028         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2029                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2030         else
2031                 rc = name_create(lovname, *logname, "-mdtlov");
2032         if (rc) {
2033                 name_destroy(logname);
2034                 *logname = NULL;
2035         }
2036         return rc;
2037 }
2038
2039 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2040                                       struct fs_db *fsdb, int i)
2041 {
2042         char suffix[16];
2043
2044         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2045                 sprintf(suffix, "-osc");
2046         else
2047                 sprintf(suffix, "-osc-MDT%04x", i);
2048         return name_create(oscname, ostname, suffix);
2049 }
2050
2051 /* add new mdc to already existent MDS */
2052 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2053                                     struct mgs_device *mgs,
2054                                     struct fs_db *fsdb,
2055                                     struct mgs_target_info *mti,
2056                                     int mdt_index, char *logname)
2057 {
2058         struct llog_handle      *llh = NULL;
2059         char    *nodeuuid = NULL;
2060         char    *ospname = NULL;
2061         char    *lovuuid = NULL;
2062         char    *mdtuuid = NULL;
2063         char    *svname = NULL;
2064         char    *mdtname = NULL;
2065         char    *lovname = NULL;
2066         char    index_str[16];
2067         int     i, rc;
2068
2069         ENTRY;
2070         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2071                 CERROR("log is empty! Logical error\n");
2072                 RETURN (-EINVAL);
2073         }
2074
2075         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2076                logname);
2077
2078         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2079         if (rc)
2080                 RETURN(rc);
2081
2082         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2083         if (rc)
2084                 GOTO(out_destory, rc);
2085
2086         rc = name_create(&svname, mdtname, "-osp");
2087         if (rc)
2088                 GOTO(out_destory, rc);
2089
2090         sprintf(index_str, "-MDT%04x", mdt_index);
2091         rc = name_create(&ospname, svname, index_str);
2092         if (rc)
2093                 GOTO(out_destory, rc);
2094
2095         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2096         if (rc)
2097                 GOTO(out_destory, rc);
2098
2099         rc = name_create(&lovuuid, lovname, "_UUID");
2100         if (rc)
2101                 GOTO(out_destory, rc);
2102
2103         rc = name_create(&mdtuuid, mdtname, "_UUID");
2104         if (rc)
2105                 GOTO(out_destory, rc);
2106
2107         rc = record_start_log(env, mgs, &llh, logname);
2108         if (rc)
2109                 GOTO(out_destory, rc);
2110
2111         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2112                            "add osp");
2113         if (rc)
2114                 GOTO(out_destory, rc);
2115
2116         for (i = 0; i < mti->mti_nid_count; i++) {
2117                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2118                        libcfs_nid2str(mti->mti_nids[i]));
2119                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2120                 if (rc)
2121                         GOTO(out_end, rc);
2122         }
2123
2124         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2125         if (rc)
2126                 GOTO(out_end, rc);
2127
2128         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2129                           NULL, NULL);
2130         if (rc)
2131                 GOTO(out_end, rc);
2132
2133         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2134         if (rc)
2135                 GOTO(out_end, rc);
2136
2137         /* Add mdc(osp) to lod */
2138         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2139                  mti->mti_stripe_index);
2140         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2141                          index_str, "1", NULL);
2142         if (rc)
2143                 GOTO(out_end, rc);
2144
2145         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2146         if (rc)
2147                 GOTO(out_end, rc);
2148
2149 out_end:
2150         record_end_log(env, &llh);
2151
2152 out_destory:
2153         name_destroy(&mdtuuid);
2154         name_destroy(&lovuuid);
2155         name_destroy(&lovname);
2156         name_destroy(&ospname);
2157         name_destroy(&svname);
2158         name_destroy(&nodeuuid);
2159         name_destroy(&mdtname);
2160         RETURN(rc);
2161 }
2162
2163 static int mgs_write_log_mdt0(const struct lu_env *env,
2164                               struct mgs_device *mgs,
2165                               struct fs_db *fsdb,
2166                               struct mgs_target_info *mti)
2167 {
2168         char *log = mti->mti_svname;
2169         struct llog_handle *llh = NULL;
2170         char *uuid, *lovname;
2171         char mdt_index[6];
2172         char *ptr = mti->mti_params;
2173         int rc = 0, failout = 0;
2174         ENTRY;
2175
2176         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2177         if (uuid == NULL)
2178                 RETURN(-ENOMEM);
2179
2180         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2181                 failout = (strncmp(ptr, "failout", 7) == 0);
2182
2183         rc = name_create(&lovname, log, "-mdtlov");
2184         if (rc)
2185                 GOTO(out_free, rc);
2186         if (mgs_log_is_empty(env, mgs, log)) {
2187                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2188                 if (rc)
2189                         GOTO(out_lod, rc);
2190         }
2191
2192         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2193
2194         rc = record_start_log(env, mgs, &llh, log);
2195         if (rc)
2196                 GOTO(out_lod, rc);
2197
2198         /* add MDT itself */
2199
2200         /* FIXME this whole fn should be a single journal transaction */
2201         sprintf(uuid, "%s_UUID", log);
2202         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2203         if (rc)
2204                 GOTO(out_lod, rc);
2205         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2206         if (rc)
2207                 GOTO(out_end, rc);
2208         rc = record_mount_opt(env, llh, log, lovname, NULL);
2209         if (rc)
2210                 GOTO(out_end, rc);
2211         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2212                         failout ? "n" : "f");
2213         if (rc)
2214                 GOTO(out_end, rc);
2215         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2216         if (rc)
2217                 GOTO(out_end, rc);
2218 out_end:
2219         record_end_log(env, &llh);
2220 out_lod:
2221         name_destroy(&lovname);
2222 out_free:
2223         OBD_FREE(uuid, sizeof(struct obd_uuid));
2224         RETURN(rc);
2225 }
2226
2227 /* envelope method for all layers log */
2228 static int mgs_write_log_mdt(const struct lu_env *env,
2229                              struct mgs_device *mgs,
2230                              struct fs_db *fsdb,
2231                              struct mgs_target_info *mti)
2232 {
2233         struct mgs_thread_info *mgi = mgs_env_info(env);
2234         struct llog_handle *llh = NULL;
2235         char *cliname;
2236         int rc, i = 0;
2237         ENTRY;
2238
2239         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2240
2241         if (mti->mti_uuid[0] == '\0') {
2242                 /* Make up our own uuid */
2243                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2244                          "%s_UUID", mti->mti_svname);
2245         }
2246
2247         /* add mdt */
2248         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2249         if (rc)
2250                 RETURN(rc);
2251         /* Append the mdt info to the client log */
2252         rc = name_create(&cliname, mti->mti_fsname, "-client");
2253         if (rc)
2254                 RETURN(rc);
2255
2256         if (mgs_log_is_empty(env, mgs, cliname)) {
2257                 /* Start client log */
2258                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2259                                        fsdb->fsdb_clilov);
2260                 if (rc)
2261                         GOTO(out_free, rc);
2262                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2263                                        fsdb->fsdb_clilmv);
2264                 if (rc)
2265                         GOTO(out_free, rc);
2266         }
2267
2268         /*
2269         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2270         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2271         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2272         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2273         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2274         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2275         */
2276
2277                 /* copy client info about lov/lmv */
2278                 mgi->mgi_comp.comp_mti = mti;
2279                 mgi->mgi_comp.comp_fsdb = fsdb;
2280
2281                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2282                                                         &mgi->mgi_comp);
2283                 if (rc)
2284                         GOTO(out_free, rc);
2285                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2286                                               fsdb->fsdb_clilmv);
2287                 if (rc)
2288                         GOTO(out_free, rc);
2289
2290                 /* add mountopts */
2291                 rc = record_start_log(env, mgs, &llh, cliname);
2292                 if (rc)
2293                         GOTO(out_free, rc);
2294
2295                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2296                                    "mount opts");
2297                 if (rc)
2298                         GOTO(out_end, rc);
2299                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2300                                       fsdb->fsdb_clilmv);
2301                 if (rc)
2302                         GOTO(out_end, rc);
2303                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2304                                    "mount opts");
2305
2306         if (rc)
2307                 GOTO(out_end, rc);
2308
2309         /* for_all_existing_mdt except current one */
2310         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2311                 if (i !=  mti->mti_stripe_index &&
2312                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2313                         char *logname;
2314
2315                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2316                         if (rc)
2317                                 GOTO(out_end, rc);
2318
2319                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2320                                                       i, logname);
2321                         name_destroy(&logname);
2322                         if (rc)
2323                                 GOTO(out_end, rc);
2324                 }
2325         }
2326 out_end:
2327         record_end_log(env, &llh);
2328 out_free:
2329         name_destroy(&cliname);
2330         RETURN(rc);
2331 }
2332
2333 /* Add the ost info to the client/mdt lov */
2334 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2335                                     struct mgs_device *mgs, struct fs_db *fsdb,
2336                                     struct mgs_target_info *mti,
2337                                     char *logname, char *suffix, char *lovname,
2338                                     enum lustre_sec_part sec_part, int flags)
2339 {
2340         struct llog_handle *llh = NULL;
2341         char *nodeuuid = NULL;
2342         char *oscname = NULL;
2343         char *oscuuid = NULL;
2344         char *lovuuid = NULL;
2345         char *svname = NULL;
2346         char index[6];
2347         int i, rc;
2348
2349         ENTRY;
2350         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2351                mti->mti_svname, logname);
2352
2353         if (mgs_log_is_empty(env, mgs, logname)) {
2354                 CERROR("log is empty! Logical error\n");
2355                 RETURN (-EINVAL);
2356         }
2357
2358         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2359         if (rc)
2360                 RETURN(rc);
2361         rc = name_create(&svname, mti->mti_svname, "-osc");
2362         if (rc)
2363                 GOTO(out_free, rc);
2364
2365         /* for the system upgraded from old 1.8, keep using the old osc naming
2366          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2367         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2368                 rc = name_create(&oscname, svname, "");
2369         else
2370                 rc = name_create(&oscname, svname, suffix);
2371         if (rc)
2372                 GOTO(out_free, rc);
2373
2374         rc = name_create(&oscuuid, oscname, "_UUID");
2375         if (rc)
2376                 GOTO(out_free, rc);
2377         rc = name_create(&lovuuid, lovname, "_UUID");
2378         if (rc)
2379                 GOTO(out_free, rc);
2380
2381
2382         /*
2383         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2384         multihomed (#4)
2385         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2386         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2387         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2388         failover (#6,7)
2389         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2390         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2391         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2392         */
2393
2394         rc = record_start_log(env, mgs, &llh, logname);
2395         if (rc)
2396                 GOTO(out_free, rc);
2397
2398         /* FIXME these should be a single journal transaction */
2399         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2400                            "add osc");
2401         if (rc)
2402                 GOTO(out_end, rc);
2403
2404         /* NB: don't change record order, because upon MDT steal OSC config
2405          * from client, it treats all nids before LCFG_SETUP as target nids
2406          * (multiple interfaces), while nids after as failover node nids.
2407          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2408          */
2409         for (i = 0; i < mti->mti_nid_count; i++) {
2410                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2411                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2412                 if (rc)
2413                         GOTO(out_end, rc);
2414         }
2415         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2416         if (rc)
2417                 GOTO(out_end, rc);
2418         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2419         if (rc)
2420                 GOTO(out_end, rc);
2421         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2422         if (rc)
2423                 GOTO(out_end, rc);
2424
2425         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2426
2427         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2428         if (rc)
2429                 GOTO(out_end, rc);
2430         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2431                            "add osc");
2432         if (rc)
2433                 GOTO(out_end, rc);
2434 out_end:
2435         record_end_log(env, &llh);
2436 out_free:
2437         name_destroy(&lovuuid);
2438         name_destroy(&oscuuid);
2439         name_destroy(&oscname);
2440         name_destroy(&svname);
2441         name_destroy(&nodeuuid);
2442         RETURN(rc);
2443 }
2444
2445 static int mgs_write_log_ost(const struct lu_env *env,
2446                              struct mgs_device *mgs, struct fs_db *fsdb,
2447                              struct mgs_target_info *mti)
2448 {
2449         struct llog_handle *llh = NULL;
2450         char *logname, *lovname;
2451         char *ptr = mti->mti_params;
2452         int rc, flags = 0, failout = 0, i;
2453         ENTRY;
2454
2455         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2456
2457         /* The ost startup log */
2458
2459         /* If the ost log already exists, that means that someone reformatted
2460            the ost and it called target_add again. */
2461         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2462                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2463                                    "exists, yet the server claims it never "
2464                                    "registered. It may have been reformatted, "
2465                                    "or the index changed. writeconf the MDT to "
2466                                    "regenerate all logs.\n", mti->mti_svname);
2467                 RETURN(-EALREADY);
2468         }
2469
2470         /*
2471         attach obdfilter ost1 ost1_UUID
2472         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2473         */
2474         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2475                 failout = (strncmp(ptr, "failout", 7) == 0);
2476         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2477         if (rc)
2478                 RETURN(rc);
2479         /* FIXME these should be a single journal transaction */
2480         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2481         if (rc)
2482                 GOTO(out_end, rc);
2483         if (*mti->mti_uuid == '\0')
2484                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2485                          "%s_UUID", mti->mti_svname);
2486         rc = record_attach(env, llh, mti->mti_svname,
2487                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2488         if (rc)
2489                 GOTO(out_end, rc);
2490         rc = record_setup(env, llh, mti->mti_svname,
2491                           "dev"/*ignored*/, "type"/*ignored*/,
2492                           failout ? "n" : "f", 0/*options*/);
2493         if (rc)
2494                 GOTO(out_end, rc);
2495         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2496         if (rc)
2497                 GOTO(out_end, rc);
2498 out_end:
2499         record_end_log(env, &llh);
2500         if (rc)
2501                 RETURN(rc);
2502         /* We also have to update the other logs where this osc is part of
2503            the lov */
2504
2505         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2506                 /* If we're upgrading, the old mdt log already has our
2507                    entry. Let's do a fake one for fun. */
2508                 /* Note that we can't add any new failnids, since we don't
2509                    know the old osc names. */
2510                 flags = CM_SKIP | CM_UPGRADE146;
2511
2512         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2513                 /* If the update flag isn't set, don't update client/mdt
2514                    logs. */
2515                 flags |= CM_SKIP;
2516                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2517                               "the MDT first to regenerate it.\n",
2518                               mti->mti_svname);
2519         }
2520
2521         /* Add ost to all MDT lov defs */
2522         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2523                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2524                         char mdt_index[9];
2525
2526                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2527                                                      i);
2528                         if (rc)
2529                                 RETURN(rc);
2530                         sprintf(mdt_index, "-MDT%04x", i);
2531                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2532                                                       logname, mdt_index,
2533                                                       lovname, LUSTRE_SP_MDT,
2534                                                       flags);
2535                         name_destroy(&logname);
2536                         name_destroy(&lovname);
2537                         if (rc)
2538                                 RETURN(rc);
2539                 }
2540         }
2541
2542         /* Append ost info to the client log */
2543         rc = name_create(&logname, mti->mti_fsname, "-client");
2544         if (rc)
2545                 RETURN(rc);
2546         if (mgs_log_is_empty(env, mgs, logname)) {
2547                 /* Start client log */
2548                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2549                                        fsdb->fsdb_clilov);
2550                 if (rc)
2551                         GOTO(out_free, rc);
2552                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2553                                        fsdb->fsdb_clilmv);
2554                 if (rc)
2555                         GOTO(out_free, rc);
2556         }
2557         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2558                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2559 out_free:
2560         name_destroy(&logname);
2561         RETURN(rc);
2562 }
2563
2564 static __inline__ int mgs_param_empty(char *ptr)
2565 {
2566         char *tmp;
2567
2568         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2569                 return 1;
2570         return 0;
2571 }
2572
2573 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2574                                           struct mgs_device *mgs,
2575                                           struct fs_db *fsdb,
2576                                           struct mgs_target_info *mti,
2577                                           char *logname, char *cliname)
2578 {
2579         int rc;
2580         struct llog_handle *llh = NULL;
2581
2582         if (mgs_param_empty(mti->mti_params)) {
2583                 /* Remove _all_ failnids */
2584                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2585                                 mti->mti_svname, "add failnid", CM_SKIP);
2586                 return rc < 0 ? rc : 0;
2587         }
2588
2589         /* Otherwise failover nids are additive */
2590         rc = record_start_log(env, mgs, &llh, logname);
2591         if (rc)
2592                 return rc;
2593                 /* FIXME this should be a single journal transaction */
2594         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2595                            "add failnid");
2596         if (rc)
2597                 goto out_end;
2598         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2599         if (rc)
2600                 goto out_end;
2601         rc = record_marker(env, llh, fsdb, CM_END,
2602                            mti->mti_svname, "add failnid");
2603 out_end:
2604         record_end_log(env, &llh);
2605         return rc;
2606 }
2607
2608
2609 /* Add additional failnids to an existing log.
2610    The mdc/osc must have been added to logs first */
2611 /* tcp nids must be in dotted-quad ascii -
2612    we can't resolve hostnames from the kernel. */
2613 static int mgs_write_log_add_failnid(const struct lu_env *env,
2614                                      struct mgs_device *mgs,
2615                                      struct fs_db *fsdb,
2616                                      struct mgs_target_info *mti)
2617 {
2618         char *logname, *cliname;
2619         int rc;
2620         ENTRY;
2621
2622         /* FIXME we currently can't erase the failnids
2623          * given when a target first registers, since they aren't part of
2624          * an "add uuid" stanza */
2625
2626         /* Verify that we know about this target */
2627         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2628                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2629                                    "yet. It must be started before failnids "
2630                                    "can be added.\n", mti->mti_svname);
2631                 RETURN(-ENOENT);
2632         }
2633
2634         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2635         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2636                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2637         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2638                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2639         } else {
2640                 RETURN(-EINVAL);
2641         }
2642         if (rc)
2643                 RETURN(rc);
2644         /* Add failover nids to the client log */
2645         rc = name_create(&logname, mti->mti_fsname, "-client");
2646         if (rc) {
2647                 name_destroy(&cliname);
2648                 RETURN(rc);
2649         }
2650         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2651         name_destroy(&logname);
2652         name_destroy(&cliname);
2653         if (rc)
2654                 RETURN(rc);
2655
2656         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2657                 /* Add OST failover nids to the MDT logs as well */
2658                 int i;
2659
2660                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2661                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2662                                 continue;
2663                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2664                         if (rc)
2665                                 RETURN(rc);
2666                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2667                                                  fsdb, i);
2668                         if (rc) {
2669                                 name_destroy(&logname);
2670                                 RETURN(rc);
2671                         }
2672                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2673                                                             mti, logname,
2674                                                             cliname);
2675                         name_destroy(&cliname);
2676                         name_destroy(&logname);
2677                         if (rc)
2678                                 RETURN(rc);
2679                 }
2680         }
2681
2682         RETURN(rc);
2683 }
2684
2685 static int mgs_wlp_lcfg(const struct lu_env *env,
2686                         struct mgs_device *mgs, struct fs_db *fsdb,
2687                         struct mgs_target_info *mti,
2688                         char *logname, struct lustre_cfg_bufs *bufs,
2689                         char *tgtname, char *ptr)
2690 {
2691         char comment[MTI_NAME_MAXLEN];
2692         char *tmp;
2693         struct lustre_cfg *lcfg;
2694         int rc, del;
2695
2696         /* Erase any old settings of this same parameter */
2697         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2698         comment[MTI_NAME_MAXLEN - 1] = 0;
2699         /* But don't try to match the value. */
2700         if ((tmp = strchr(comment, '=')))
2701             *tmp = 0;
2702         /* FIXME we should skip settings that are the same as old values */
2703         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2704         if (rc < 0)
2705                 return rc;
2706         del = mgs_param_empty(ptr);
2707
2708         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2709                       "Sett" : "Modify", tgtname, comment, logname);
2710         if (del)
2711                 return rc;
2712
2713         lustre_cfg_bufs_reset(bufs, tgtname);
2714         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2715         lcfg = lustre_cfg_new(LCFG_PARAM, bufs);
2716         if (!lcfg)
2717                 return -ENOMEM;
2718         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2719         lustre_cfg_free(lcfg);
2720         return rc;
2721 }
2722
2723 /* write global variable settings into log */
2724 static int mgs_write_log_sys(const struct lu_env *env,
2725                              struct mgs_device *mgs, struct fs_db *fsdb,
2726                              struct mgs_target_info *mti, char *sys, char *ptr)
2727 {
2728         struct mgs_thread_info *mgi = mgs_env_info(env);
2729         struct lustre_cfg *lcfg;
2730         char *tmp, sep;
2731         int rc, cmd, convert = 1;
2732
2733         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2734                 cmd = LCFG_SET_TIMEOUT;
2735         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2736                 cmd = LCFG_SET_LDLM_TIMEOUT;
2737         /* Check for known params here so we can return error to lctl */
2738         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2739                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2740                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2741                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2742                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2743                 cmd = LCFG_PARAM;
2744         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2745                 convert = 0; /* Don't convert string value to integer */
2746                 cmd = LCFG_PARAM;
2747         } else {
2748                 return -EINVAL;
2749         }
2750
2751         if (mgs_param_empty(ptr))
2752                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2753         else
2754                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2755
2756         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2757         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2758         if (!convert && *tmp != '\0')
2759                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2760         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2761         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2762         /* truncate the comment to the parameter name */
2763         ptr = tmp - 1;
2764         sep = *ptr;
2765         *ptr = '\0';
2766         /* modify all servers and clients */
2767         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2768                                       *tmp == '\0' ? NULL : lcfg,
2769                                       mti->mti_fsname, sys, 0);
2770         if (rc == 0 && *tmp != '\0') {
2771                 switch (cmd) {
2772                 case LCFG_SET_TIMEOUT:
2773                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2774                                 class_process_config(lcfg);
2775                         break;
2776                 case LCFG_SET_LDLM_TIMEOUT:
2777                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2778                                 class_process_config(lcfg);
2779                         break;
2780                 default:
2781                         break;
2782                 }
2783         }
2784         *ptr = sep;
2785         lustre_cfg_free(lcfg);
2786         return rc;
2787 }
2788
2789 /* write quota settings into log */
2790 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2791                                struct fs_db *fsdb, struct mgs_target_info *mti,
2792                                char *quota, char *ptr)
2793 {
2794         struct mgs_thread_info  *mgi = mgs_env_info(env);
2795         struct lustre_cfg       *lcfg;
2796         char                    *tmp;
2797         char                     sep;
2798         int                      rc, cmd = LCFG_PARAM;
2799
2800         /* support only 'meta' and 'data' pools so far */
2801         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2802             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2803                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2804                        "& quota.ost are)\n", ptr);
2805                 return -EINVAL;
2806         }
2807
2808         if (*tmp == '\0') {
2809                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2810         } else {
2811                 CDEBUG(D_MGS, "global '%s'\n", quota);
2812
2813                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2814                     strcmp(tmp, "none") != 0) {
2815                         CERROR("enable option(%s) isn't supported\n", tmp);
2816                         return -EINVAL;
2817                 }
2818         }
2819
2820         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2821         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2822         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2823         /* truncate the comment to the parameter name */
2824         ptr = tmp - 1;
2825         sep = *ptr;
2826         *ptr = '\0';
2827
2828         /* XXX we duplicated quota enable information in all server
2829          *     config logs, it should be moved to a separate config
2830          *     log once we cleanup the config log for global param. */
2831         /* modify all servers */
2832         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2833                                       *tmp == '\0' ? NULL : lcfg,
2834                                       mti->mti_fsname, quota, 1);
2835         *ptr = sep;
2836         lustre_cfg_free(lcfg);
2837         return rc < 0 ? rc : 0;
2838 }
2839
2840 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2841                                    struct mgs_device *mgs,
2842                                    struct fs_db *fsdb,
2843                                    struct mgs_target_info *mti,
2844                                    char *param)
2845 {
2846         struct mgs_thread_info *mgi = mgs_env_info(env);
2847         struct llog_handle     *llh = NULL;
2848         char                   *logname;
2849         char                   *comment, *ptr;
2850         struct lustre_cfg      *lcfg;
2851         int                     rc, len;
2852         ENTRY;
2853
2854         /* get comment */
2855         ptr = strchr(param, '=');
2856         LASSERT(ptr);
2857         len = ptr - param;
2858
2859         OBD_ALLOC(comment, len + 1);
2860         if (comment == NULL)
2861                 RETURN(-ENOMEM);
2862         strncpy(comment, param, len);
2863         comment[len] = '\0';
2864
2865         /* prepare lcfg */
2866         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2867         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2868         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2869         if (lcfg == NULL)
2870                 GOTO(out_comment, rc = -ENOMEM);
2871
2872         /* construct log name */
2873         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2874         if (rc)
2875                 GOTO(out_lcfg, rc);
2876
2877         if (mgs_log_is_empty(env, mgs, logname)) {
2878                 rc = record_start_log(env, mgs, &llh, logname);
2879                 if (rc)
2880                         GOTO(out, rc);
2881                 record_end_log(env, &llh);
2882         }
2883
2884         /* obsolete old one */
2885         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2886                         comment, CM_SKIP);
2887         if (rc < 0)
2888                 GOTO(out, rc);
2889         /* write the new one */
2890         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2891                                   mti->mti_svname, comment);
2892         if (rc)
2893                 CERROR("err %d writing log %s\n", rc, logname);
2894 out:
2895         name_destroy(&logname);
2896 out_lcfg:
2897         lustre_cfg_free(lcfg);
2898 out_comment:
2899         OBD_FREE(comment, len + 1);
2900         RETURN(rc);
2901 }
2902
2903 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2904                                         char *param)
2905 {
2906         char    *ptr;
2907
2908         /* disable the adjustable udesc parameter for now, i.e. use default
2909          * setting that client always ship udesc to MDT if possible. to enable
2910          * it simply remove the following line */
2911         goto error_out;
2912
2913         ptr = strchr(param, '=');
2914         if (ptr == NULL)
2915                 goto error_out;
2916         *ptr++ = '\0';
2917
2918         if (strcmp(param, PARAM_SRPC_UDESC))
2919                 goto error_out;
2920
2921         if (strcmp(ptr, "yes") == 0) {
2922                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2923                 CWARN("Enable user descriptor shipping from client to MDT\n");
2924         } else if (strcmp(ptr, "no") == 0) {
2925                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2926                 CWARN("Disable user descriptor shipping from client to MDT\n");
2927         } else {
2928                 *(ptr - 1) = '=';
2929                 goto error_out;
2930         }
2931         return 0;
2932
2933 error_out:
2934         CERROR("Invalid param: %s\n", param);
2935         return -EINVAL;
2936 }
2937
2938 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2939                                   const char *svname,
2940                                   char *param)
2941 {
2942         struct sptlrpc_rule      rule;
2943         struct sptlrpc_rule_set *rset;
2944         int                      rc;
2945         ENTRY;
2946
2947         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2948                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2949                 RETURN(-EINVAL);
2950         }
2951
2952         if (strncmp(param, PARAM_SRPC_UDESC,
2953                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2954                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2955         }
2956
2957         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2958                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2959                 RETURN(-EINVAL);
2960         }
2961
2962         param += sizeof(PARAM_SRPC_FLVR) - 1;
2963
2964         rc = sptlrpc_parse_rule(param, &rule);
2965         if (rc)
2966                 RETURN(rc);
2967
2968         /* mgs rules implies must be mgc->mgs */
2969         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2970                 if ((rule.sr_from != LUSTRE_SP_MGC &&
2971                      rule.sr_from != LUSTRE_SP_ANY) ||
2972                     (rule.sr_to != LUSTRE_SP_MGS &&
2973                      rule.sr_to != LUSTRE_SP_ANY))
2974                         RETURN(-EINVAL);
2975         }
2976
2977         /* preapre room for this coming rule. svcname format should be:
2978          * - fsname: general rule
2979          * - fsname-tgtname: target-specific rule
2980          */
2981         if (strchr(svname, '-')) {
2982                 struct mgs_tgt_srpc_conf *tgtconf;
2983                 int                       found = 0;
2984
2985                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
2986                      tgtconf = tgtconf->mtsc_next) {
2987                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
2988                                 found = 1;
2989                                 break;
2990                         }
2991                 }
2992
2993                 if (!found) {
2994                         int name_len;
2995
2996                         OBD_ALLOC_PTR(tgtconf);
2997                         if (tgtconf == NULL)
2998                                 RETURN(-ENOMEM);
2999
3000                         name_len = strlen(svname);
3001
3002                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3003                         if (tgtconf->mtsc_tgt == NULL) {
3004                                 OBD_FREE_PTR(tgtconf);
3005                                 RETURN(-ENOMEM);
3006                         }
3007                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3008
3009                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3010                         fsdb->fsdb_srpc_tgt = tgtconf;
3011                 }
3012
3013                 rset = &tgtconf->mtsc_rset;
3014         } else {
3015                 rset = &fsdb->fsdb_srpc_gen;
3016         }
3017
3018         rc = sptlrpc_rule_set_merge(rset, &rule);
3019
3020         RETURN(rc);
3021 }
3022
3023 static int mgs_srpc_set_param(const struct lu_env *env,
3024                               struct mgs_device *mgs,
3025                               struct fs_db *fsdb,
3026                               struct mgs_target_info *mti,
3027                               char *param)
3028 {
3029         char                   *copy;
3030         int                     rc, copy_size;
3031         ENTRY;
3032
3033 #ifndef HAVE_GSS
3034         RETURN(-EINVAL);
3035 #endif
3036         /* keep a copy of original param, which could be destroied
3037          * during parsing */
3038         copy_size = strlen(param) + 1;
3039         OBD_ALLOC(copy, copy_size);
3040         if (copy == NULL)
3041                 return -ENOMEM;
3042         memcpy(copy, param, copy_size);
3043
3044         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3045         if (rc)
3046                 goto out_free;
3047
3048         /* previous steps guaranteed the syntax is correct */
3049         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3050         if (rc)
3051                 goto out_free;
3052
3053         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3054                 /*
3055                  * for mgs rules, make them effective immediately.
3056                  */
3057                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3058                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3059                                                  &fsdb->fsdb_srpc_gen);
3060         }
3061
3062 out_free:
3063         OBD_FREE(copy, copy_size);
3064         RETURN(rc);
3065 }
3066
3067 struct mgs_srpc_read_data {
3068         struct fs_db   *msrd_fsdb;
3069         int             msrd_skip;
3070 };
3071
3072 static int mgs_srpc_read_handler(const struct lu_env *env,
3073                                  struct llog_handle *llh,
3074                                  struct llog_rec_hdr *rec, void *data)
3075 {
3076         struct mgs_srpc_read_data *msrd = data;
3077         struct cfg_marker         *marker;
3078         struct lustre_cfg         *lcfg = REC_DATA(rec);
3079         char                      *svname, *param;
3080         int                        cfg_len, rc;
3081         ENTRY;
3082
3083         if (rec->lrh_type != OBD_CFG_REC) {
3084                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3085                 RETURN(-EINVAL);
3086         }
3087
3088         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3089                   sizeof(struct llog_rec_tail);
3090
3091         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3092         if (rc) {
3093                 CERROR("Insane cfg\n");
3094                 RETURN(rc);
3095         }
3096
3097         if (lcfg->lcfg_command == LCFG_MARKER) {
3098                 marker = lustre_cfg_buf(lcfg, 1);
3099
3100                 if (marker->cm_flags & CM_START &&
3101                     marker->cm_flags & CM_SKIP)
3102                         msrd->msrd_skip = 1;
3103                 if (marker->cm_flags & CM_END)
3104                         msrd->msrd_skip = 0;
3105
3106                 RETURN(0);
3107         }
3108
3109         if (msrd->msrd_skip)
3110                 RETURN(0);
3111
3112         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3113                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3114                 RETURN(0);
3115         }
3116
3117         svname = lustre_cfg_string(lcfg, 0);
3118         if (svname == NULL) {
3119                 CERROR("svname is empty\n");
3120                 RETURN(0);
3121         }
3122
3123         param = lustre_cfg_string(lcfg, 1);
3124         if (param == NULL) {
3125                 CERROR("param is empty\n");
3126                 RETURN(0);
3127         }
3128
3129         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3130         if (rc)
3131                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3132
3133         RETURN(0);
3134 }
3135
3136 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3137                                 struct mgs_device *mgs,
3138                                 struct fs_db *fsdb)
3139 {
3140         struct llog_handle        *llh = NULL;
3141         struct llog_ctxt          *ctxt;
3142         char                      *logname;
3143         struct mgs_srpc_read_data  msrd;
3144         int                        rc;
3145         ENTRY;
3146
3147         /* construct log name */
3148         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3149         if (rc)
3150                 RETURN(rc);
3151
3152         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3153         LASSERT(ctxt != NULL);
3154
3155         if (mgs_log_is_empty(env, mgs, logname))
3156                 GOTO(out, rc = 0);
3157
3158         rc = llog_open(env, ctxt, &llh, NULL, logname,
3159                        LLOG_OPEN_EXISTS);
3160         if (rc < 0) {
3161                 if (rc == -ENOENT)
3162                         rc = 0;
3163                 GOTO(out, rc);
3164         }
3165
3166         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3167         if (rc)
3168                 GOTO(out_close, rc);
3169
3170         if (llog_get_size(llh) <= 1)
3171                 GOTO(out_close, rc = 0);
3172
3173         msrd.msrd_fsdb = fsdb;
3174         msrd.msrd_skip = 0;
3175
3176         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3177                           NULL);
3178
3179 out_close:
3180         llog_close(env, llh);
3181 out:
3182         llog_ctxt_put(ctxt);
3183         name_destroy(&logname);
3184
3185         if (rc)
3186                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3187         RETURN(rc);
3188 }
3189
3190 /* Permanent settings of all parameters by writing into the appropriate
3191  * configuration logs.
3192  * A parameter with null value ("<param>='\0'") means to erase it out of
3193  * the logs.
3194  */
3195 static int mgs_write_log_param(const struct lu_env *env,
3196                                struct mgs_device *mgs, struct fs_db *fsdb,
3197                                struct mgs_target_info *mti, char *ptr)
3198 {
3199         struct mgs_thread_info *mgi = mgs_env_info(env);
3200         char *logname;
3201         char *tmp;
3202         int rc = 0, rc2 = 0;
3203         ENTRY;
3204
3205         /* For various parameter settings, we have to figure out which logs
3206            care about them (e.g. both mdt and client for lov settings) */
3207         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3208
3209         /* The params are stored in MOUNT_DATA_FILE and modified via
3210            tunefs.lustre, or set using lctl conf_param */
3211
3212         /* Processed in lustre_start_mgc */
3213         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3214                 GOTO(end, rc);
3215
3216         /* Processed in ost/mdt */
3217         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3218                 GOTO(end, rc);
3219
3220         /* Processed in mgs_write_log_ost */
3221         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3222                 if (mti->mti_flags & LDD_F_PARAM) {
3223                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3224                                            "changed with tunefs.lustre"
3225                                            "and --writeconf\n", ptr);
3226                         rc = -EPERM;
3227                 }
3228                 GOTO(end, rc);
3229         }
3230
3231         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3232                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3233                 GOTO(end, rc);
3234         }
3235
3236         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3237                 /* Add a failover nidlist */
3238                 rc = 0;
3239                 /* We already processed failovers params for new
3240                    targets in mgs_write_log_target */
3241                 if (mti->mti_flags & LDD_F_PARAM) {
3242                         CDEBUG(D_MGS, "Adding failnode\n");
3243                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3244                 }
3245                 GOTO(end, rc);
3246         }
3247
3248         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3249                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3250                 GOTO(end, rc);
3251         }
3252
3253         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3254                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3255                 GOTO(end, rc);