Whamcloud - gitweb
LU-3610 ptlrpc: Fix argument names of mgs_write_log_pool()
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         LASSERT(dir);
71         LASSERT(dir->do_index_ops);
72
73         iops = &dir->do_index_ops->dio_it;
74         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
75         if (IS_ERR(it))
76                 RETURN(PTR_ERR(it));
77
78         rc = iops->load(env, it, 0);
79         if (rc <= 0)
80                 GOTO(fini, rc = 0);
81
82         /* main cycle */
83         do {
84                 key = (void *)iops->key(env, it);
85                 if (IS_ERR(key)) {
86                         CERROR("%s: key failed when listing %s: rc = %d\n",
87                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
88                                (int) PTR_ERR(key));
89                         goto next;
90                 }
91                 key_sz = iops->key_size(env, it);
92                 LASSERT(key_sz > 0);
93
94                 /* filter out "." and ".." entries */
95                 if (key[0] == '.') {
96                         if (key_sz == 1)
97                                 goto next;
98                         if (key_sz == 2 && key[1] == '.')
99                                 goto next;
100                 }
101
102                 de = mgs_direntry_alloc(key_sz + 1);
103                 if (de == NULL) {
104                         rc = -ENOMEM;
105                         break;
106                 }
107
108                 memcpy(de->name, key, key_sz);
109                 de->name[key_sz] = 0;
110
111                 cfs_list_add(&de->list, list);
112
113 next:
114                 rc = iops->next(env, it);
115         } while (rc == 0);
116         rc = 0;
117
118         iops->put(env, it);
119
120 fini:
121         iops->fini(env, it);
122         if (rc)
123                 CERROR("%s: key failed when listing %s: rc = %d\n",
124                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
125         RETURN(rc);
126 }
127
128 /******************** DB functions *********************/
129
130 static inline int name_create(char **newname, char *prefix, char *suffix)
131 {
132         LASSERT(newname);
133         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
134         if (!*newname)
135                 return -ENOMEM;
136         sprintf(*newname, "%s%s", prefix, suffix);
137         return 0;
138 }
139
140 static inline void name_destroy(char **name)
141 {
142         if (*name)
143                 OBD_FREE(*name, strlen(*name) + 1);
144         *name = NULL;
145 }
146
147 struct mgs_fsdb_handler_data
148 {
149         struct fs_db   *fsdb;
150         __u32           ver;
151 };
152
153 /* from the (client) config log, figure out:
154         1. which ost's/mdt's are configured (by index)
155         2. what the last config step is
156         3. COMPAT_18 osc name
157 */
158 /* It might be better to have a separate db file, instead of parsing the info
159    out of the client log.  This is slow and potentially error-prone. */
160 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
161                             struct llog_rec_hdr *rec, void *data)
162 {
163         struct mgs_fsdb_handler_data *d = data;
164         struct fs_db *fsdb = d->fsdb;
165         int cfg_len = rec->lrh_len;
166         char *cfg_buf = (char*) (rec + 1);
167         struct lustre_cfg *lcfg;
168         __u32 index;
169         int rc = 0;
170         ENTRY;
171
172         if (rec->lrh_type != OBD_CFG_REC) {
173                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
174                 RETURN(-EINVAL);
175         }
176
177         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
178         if (rc) {
179                 CERROR("Insane cfg\n");
180                 RETURN(rc);
181         }
182
183         lcfg = (struct lustre_cfg *)cfg_buf;
184
185         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
186                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
187
188         /* Figure out ost indicies */
189         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
190         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
191             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
192                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
193                                        NULL, 10);
194                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
195                        lustre_cfg_string(lcfg, 1), index,
196                        lustre_cfg_string(lcfg, 2));
197                 set_bit(index, fsdb->fsdb_ost_index_map);
198         }
199
200         /* Figure out mdt indicies */
201         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
202         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
203             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
204                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
205                                        &index, NULL);
206                 if (rc != LDD_F_SV_TYPE_MDT) {
207                         CWARN("Unparsable MDC name %s, assuming index 0\n",
208                               lustre_cfg_string(lcfg, 0));
209                         index = 0;
210                 }
211                 rc = 0;
212                 CDEBUG(D_MGS, "MDT index is %u\n", index);
213                 set_bit(index, fsdb->fsdb_mdt_index_map);
214                 fsdb->fsdb_mdt_count ++;
215         }
216
217         /**
218          * figure out the old config. fsdb_gen = 0 means old log
219          * It is obsoleted and not supported anymore
220          */
221         if (fsdb->fsdb_gen == 0) {
222                 CERROR("Old config format is not supported\n");
223                 RETURN(-EINVAL);
224         }
225
226         /*
227          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
228          */
229         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
230             lcfg->lcfg_command == LCFG_ATTACH &&
231             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
232                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
233                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
234                         CWARN("MDT using 1.8 OSC name scheme\n");
235                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
236                 }
237         }
238
239         if (lcfg->lcfg_command == LCFG_MARKER) {
240                 struct cfg_marker *marker;
241                 marker = lustre_cfg_buf(lcfg, 1);
242
243                 d->ver = marker->cm_vers;
244
245                 /* Keep track of the latest marker step */
246                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
247         }
248
249         RETURN(rc);
250 }
251
252 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
253 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
254                                   struct mgs_device *mgs,
255                                   struct fs_db *fsdb)
256 {
257         char                            *logname;
258         struct llog_handle              *loghandle;
259         struct llog_ctxt                *ctxt;
260         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
261         int rc;
262
263         ENTRY;
264
265         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
266         LASSERT(ctxt != NULL);
267         rc = name_create(&logname, fsdb->fsdb_name, "-client");
268         if (rc)
269                 GOTO(out_put, rc);
270         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
271         if (rc)
272                 GOTO(out_pop, rc);
273
274         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
275         if (rc)
276                 GOTO(out_close, rc);
277
278         if (llog_get_size(loghandle) <= 1)
279                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
280
281         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
282         CDEBUG(D_INFO, "get_db = %d\n", rc);
283 out_close:
284         llog_close(env, loghandle);
285 out_pop:
286         name_destroy(&logname);
287 out_put:
288         llog_ctxt_put(ctxt);
289
290         RETURN(rc);
291 }
292
293 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
294 {
295         struct mgs_tgt_srpc_conf *tgtconf;
296
297         /* free target-specific rules */
298         while (fsdb->fsdb_srpc_tgt) {
299                 tgtconf = fsdb->fsdb_srpc_tgt;
300                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
301
302                 LASSERT(tgtconf->mtsc_tgt);
303
304                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
305                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
306                 OBD_FREE_PTR(tgtconf);
307         }
308
309         /* free general rules */
310         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
311 }
312
313 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
314 {
315         struct fs_db *fsdb;
316         cfs_list_t *tmp;
317
318         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
319                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
320                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
321                         return fsdb;
322         }
323         return NULL;
324 }
325
326 /* caller must hold the mgs->mgs_fs_db_lock */
327 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
328                                   struct mgs_device *mgs, char *fsname)
329 {
330         struct fs_db *fsdb;
331         int rc;
332         ENTRY;
333
334         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
335                 CERROR("fsname %s is too long\n", fsname);
336                 RETURN(NULL);
337         }
338
339         OBD_ALLOC_PTR(fsdb);
340         if (!fsdb)
341                 RETURN(NULL);
342
343         strcpy(fsdb->fsdb_name, fsname);
344         mutex_init(&fsdb->fsdb_mutex);
345         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
346         fsdb->fsdb_gen = 1;
347
348         if (strcmp(fsname, MGSSELF_NAME) == 0) {
349                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
350         } else {
351                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
352                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
353                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
354                         CERROR("No memory for index maps\n");
355                         GOTO(err, rc = -ENOMEM);
356                 }
357
358                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
359                 if (rc)
360                         GOTO(err, rc);
361                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
362                 if (rc)
363                         GOTO(err, rc);
364
365                 /* initialise data for NID table */
366                 mgs_ir_init_fs(env, mgs, fsdb);
367
368                 lproc_mgs_add_live(mgs, fsdb);
369         }
370
371         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
372
373         RETURN(fsdb);
374 err:
375         if (fsdb->fsdb_ost_index_map)
376                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
377         if (fsdb->fsdb_mdt_index_map)
378                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
379         name_destroy(&fsdb->fsdb_clilov);
380         name_destroy(&fsdb->fsdb_clilmv);
381         OBD_FREE_PTR(fsdb);
382         RETURN(NULL);
383 }
384
385 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
386 {
387         /* wait for anyone with the sem */
388         mutex_lock(&fsdb->fsdb_mutex);
389         lproc_mgs_del_live(mgs, fsdb);
390         cfs_list_del(&fsdb->fsdb_list);
391
392         /* deinitialize fsr */
393         mgs_ir_fini_fs(mgs, fsdb);
394
395         if (fsdb->fsdb_ost_index_map)
396                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
397         if (fsdb->fsdb_mdt_index_map)
398                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
399         name_destroy(&fsdb->fsdb_clilov);
400         name_destroy(&fsdb->fsdb_clilmv);
401         mgs_free_fsdb_srpc(fsdb);
402         mutex_unlock(&fsdb->fsdb_mutex);
403         OBD_FREE_PTR(fsdb);
404 }
405
406 int mgs_init_fsdb_list(struct mgs_device *mgs)
407 {
408         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
409         return 0;
410 }
411
412 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
413 {
414         struct fs_db *fsdb;
415         cfs_list_t *tmp, *tmp2;
416         mutex_lock(&mgs->mgs_mutex);
417         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
418                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
419                 mgs_free_fsdb(mgs, fsdb);
420         }
421         mutex_unlock(&mgs->mgs_mutex);
422         return 0;
423 }
424
425 int mgs_find_or_make_fsdb(const struct lu_env *env,
426                           struct mgs_device *mgs, char *name,
427                           struct fs_db **dbh)
428 {
429         struct fs_db *fsdb;
430         int rc = 0;
431
432         ENTRY;
433         mutex_lock(&mgs->mgs_mutex);
434         fsdb = mgs_find_fsdb(mgs, name);
435         if (fsdb) {
436                 mutex_unlock(&mgs->mgs_mutex);
437                 *dbh = fsdb;
438                 RETURN(0);
439         }
440
441         CDEBUG(D_MGS, "Creating new db\n");
442         fsdb = mgs_new_fsdb(env, mgs, name);
443         /* lock fsdb_mutex until the db is loaded from llogs */
444         if (fsdb)
445                 mutex_lock(&fsdb->fsdb_mutex);
446         mutex_unlock(&mgs->mgs_mutex);
447         if (!fsdb)
448                 RETURN(-ENOMEM);
449
450         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
451                 /* populate the db from the client llog */
452                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
453                 if (rc) {
454                         CERROR("Can't get db from client log %d\n", rc);
455                         GOTO(out_free, rc);
456                 }
457         }
458
459         /* populate srpc rules from params llog */
460         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
461         if (rc) {
462                 CERROR("Can't get db from params log %d\n", rc);
463                 GOTO(out_free, rc);
464         }
465
466         mutex_unlock(&fsdb->fsdb_mutex);
467         *dbh = fsdb;
468
469         RETURN(0);
470
471 out_free:
472         mutex_unlock(&fsdb->fsdb_mutex);
473         mgs_free_fsdb(mgs, fsdb);
474         return rc;
475 }
476
477 /* 1 = index in use
478    0 = index unused
479    -1= empty client log */
480 int mgs_check_index(const struct lu_env *env,
481                     struct mgs_device *mgs,
482                     struct mgs_target_info *mti)
483 {
484         struct fs_db *fsdb;
485         void *imap;
486         int rc = 0;
487         ENTRY;
488
489         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
490
491         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
492         if (rc) {
493                 CERROR("Can't get db for %s\n", mti->mti_fsname);
494                 RETURN(rc);
495         }
496
497         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
498                 RETURN(-1);
499
500         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
501                 imap = fsdb->fsdb_ost_index_map;
502         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
503                 imap = fsdb->fsdb_mdt_index_map;
504         else
505                 RETURN(-EINVAL);
506
507         if (test_bit(mti->mti_stripe_index, imap))
508                 RETURN(1);
509         RETURN(0);
510 }
511
512 static __inline__ int next_index(void *index_map, int map_len)
513 {
514         int i;
515         for (i = 0; i < map_len * 8; i++)
516                 if (!test_bit(i, index_map)) {
517                          return i;
518                  }
519         CERROR("max index %d exceeded.\n", i);
520         return -1;
521 }
522
523 /* Return codes:
524         0  newly marked as in use
525         <0 err
526         +EALREADY for update of an old index */
527 static int mgs_set_index(const struct lu_env *env,
528                          struct mgs_device *mgs,
529                          struct mgs_target_info *mti)
530 {
531         struct fs_db *fsdb;
532         void *imap;
533         int rc = 0;
534         ENTRY;
535
536         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
537         if (rc) {
538                 CERROR("Can't get db for %s\n", mti->mti_fsname);
539                 RETURN(rc);
540         }
541
542         mutex_lock(&fsdb->fsdb_mutex);
543         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
544                 imap = fsdb->fsdb_ost_index_map;
545         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
546                 imap = fsdb->fsdb_mdt_index_map;
547         } else {
548                 GOTO(out_up, rc = -EINVAL);
549         }
550
551         if (mti->mti_flags & LDD_F_NEED_INDEX) {
552                 rc = next_index(imap, INDEX_MAP_SIZE);
553                 if (rc == -1)
554                         GOTO(out_up, rc = -ERANGE);
555                 mti->mti_stripe_index = rc;
556                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
557                         fsdb->fsdb_mdt_count ++;
558         }
559
560         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
561                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
562                                    "but the max index is %d.\n",
563                                    mti->mti_svname, mti->mti_stripe_index,
564                                    INDEX_MAP_SIZE * 8);
565                 GOTO(out_up, rc = -ERANGE);
566         }
567
568         if (test_bit(mti->mti_stripe_index, imap)) {
569                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
570                     !(mti->mti_flags & LDD_F_WRITECONF)) {
571                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
572                                            "%d, but that index is already in "
573                                            "use. Use --writeconf to force\n",
574                                            mti->mti_svname,
575                                            mti->mti_stripe_index);
576                         GOTO(out_up, rc = -EADDRINUSE);
577                 } else {
578                         CDEBUG(D_MGS, "Server %s updating index %d\n",
579                                mti->mti_svname, mti->mti_stripe_index);
580                         GOTO(out_up, rc = EALREADY);
581                 }
582         }
583
584         set_bit(mti->mti_stripe_index, imap);
585         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
586         mutex_unlock(&fsdb->fsdb_mutex);
587         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
588                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
589
590         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
591                mti->mti_stripe_index);
592
593         RETURN(0);
594 out_up:
595         mutex_unlock(&fsdb->fsdb_mutex);
596         return rc;
597 }
598
599 struct mgs_modify_lookup {
600         struct cfg_marker mml_marker;
601         int               mml_modified;
602 };
603
604 static int mgs_modify_handler(const struct lu_env *env,
605                               struct llog_handle *llh,
606                               struct llog_rec_hdr *rec, void *data)
607 {
608         struct mgs_modify_lookup *mml = data;
609         struct cfg_marker *marker;
610         struct lustre_cfg *lcfg = REC_DATA(rec);
611         int cfg_len = REC_DATA_LEN(rec);
612         int rc;
613         ENTRY;
614
615         if (rec->lrh_type != OBD_CFG_REC) {
616                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
617                 RETURN(-EINVAL);
618         }
619
620         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
621         if (rc) {
622                 CERROR("Insane cfg\n");
623                 RETURN(rc);
624         }
625
626         /* We only care about markers */
627         if (lcfg->lcfg_command != LCFG_MARKER)
628                 RETURN(0);
629
630         marker = lustre_cfg_buf(lcfg, 1);
631         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
632             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
633             !(marker->cm_flags & CM_SKIP)) {
634                 /* Found a non-skipped marker match */
635                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
636                        rec->lrh_index, marker->cm_step,
637                        marker->cm_flags, mml->mml_marker.cm_flags,
638                        marker->cm_tgtname, marker->cm_comment);
639                 /* Overwrite the old marker llog entry */
640                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
641                 marker->cm_flags |= mml->mml_marker.cm_flags;
642                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
643                 /* Header and tail are added back to lrh_len in
644                    llog_lvfs_write_rec */
645                 rec->lrh_len = cfg_len;
646                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
647                                 rec->lrh_index);
648                 if (!rc)
649                          mml->mml_modified++;
650         }
651
652         RETURN(rc);
653 }
654
655 /**
656  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
657  * Return code:
658  * 0 - modified successfully,
659  * 1 - no modification was done
660  * negative - error
661  */
662 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
663                       struct fs_db *fsdb, struct mgs_target_info *mti,
664                       char *logname, char *devname, char *comment, int flags)
665 {
666         struct llog_handle *loghandle;
667         struct llog_ctxt *ctxt;
668         struct mgs_modify_lookup *mml;
669         int rc;
670
671         ENTRY;
672
673         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
674         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
675                flags);
676
677         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
678         LASSERT(ctxt != NULL);
679         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
680         if (rc < 0) {
681                 if (rc == -ENOENT)
682                         rc = 0;
683                 GOTO(out_pop, rc);
684         }
685
686         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
687         if (rc)
688                 GOTO(out_close, rc);
689
690         if (llog_get_size(loghandle) <= 1)
691                 GOTO(out_close, rc = 0);
692
693         OBD_ALLOC_PTR(mml);
694         if (!mml)
695                 GOTO(out_close, rc = -ENOMEM);
696         strcpy(mml->mml_marker.cm_comment, comment);
697         strcpy(mml->mml_marker.cm_tgtname, devname);
698         /* Modify mostly means cancel */
699         mml->mml_marker.cm_flags = flags;
700         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
701         mml->mml_modified = 0;
702         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
703                           NULL);
704         if (!rc && !mml->mml_modified)
705                 rc = 1;
706         OBD_FREE_PTR(mml);
707
708 out_close:
709         llog_close(env, loghandle);
710 out_pop:
711         if (rc < 0)
712                 CERROR("%s: modify %s/%s failed: rc = %d\n",
713                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
714         llog_ctxt_put(ctxt);
715         RETURN(rc);
716 }
717
718 /** This structure is passed to mgs_replace_handler */
719 struct mgs_replace_uuid_lookup {
720         /* Nids are replaced for this target device */
721         struct mgs_target_info target;
722         /* Temporary modified llog */
723         struct llog_handle *temp_llh;
724         /* Flag is set if in target block*/
725         int in_target_device;
726         /* Nids already added. Just skip (multiple nids) */
727         int device_nids_added;
728         /* Flag is set if this block should not be copied */
729         int skip_it;
730 };
731
732 /**
733  * Check: a) if block should be skipped
734  * b) is it target block
735  *
736  * \param[in] lcfg
737  * \param[in] mrul
738  *
739  * \retval 0 should not to be skipped
740  * \retval 1 should to be skipped
741  */
742 static int check_markers(struct lustre_cfg *lcfg,
743                          struct mgs_replace_uuid_lookup *mrul)
744 {
745          struct cfg_marker *marker;
746
747         /* Track markers. Find given device */
748         if (lcfg->lcfg_command == LCFG_MARKER) {
749                 marker = lustre_cfg_buf(lcfg, 1);
750                 /* Clean llog from records marked as CM_EXCLUDE.
751                    CM_SKIP records are used for "active" command
752                    and can be restored if needed */
753                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
754                     (CM_EXCLUDE | CM_START)) {
755                         mrul->skip_it = 1;
756                         return 1;
757                 }
758
759                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
760                     (CM_EXCLUDE | CM_END)) {
761                         mrul->skip_it = 0;
762                         return 1;
763                 }
764
765                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
766                         LASSERT(!(marker->cm_flags & CM_START) ||
767                                 !(marker->cm_flags & CM_END));
768                         if (marker->cm_flags & CM_START) {
769                                 mrul->in_target_device = 1;
770                                 mrul->device_nids_added = 0;
771                         } else if (marker->cm_flags & CM_END)
772                                 mrul->in_target_device = 0;
773                 }
774         }
775
776         return 0;
777 }
778
779 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
780                        struct lustre_cfg *lcfg)
781 {
782         struct llog_rec_hdr      rec;
783         int                      buflen, rc;
784
785         if (!lcfg || !llh)
786                 return -ENOMEM;
787
788         LASSERT(llh->lgh_ctxt);
789
790         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
791                                 lcfg->lcfg_buflens);
792         rec.lrh_len = llog_data_len(buflen);
793         rec.lrh_type = OBD_CFG_REC;
794
795         /* idx = -1 means append */
796         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
797         if (rc)
798                 CERROR("failed %d\n", rc);
799         return rc;
800 }
801
802 static int record_base(const struct lu_env *env, struct llog_handle *llh,
803                      char *cfgname, lnet_nid_t nid, int cmd,
804                      char *s1, char *s2, char *s3, char *s4)
805 {
806         struct mgs_thread_info *mgi = mgs_env_info(env);
807         struct lustre_cfg     *lcfg;
808         int rc;
809
810         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
811                cmd, s1, s2, s3, s4);
812
813         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
814         if (s1)
815                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
816         if (s2)
817                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
818         if (s3)
819                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
820         if (s4)
821                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
822
823         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
824         if (!lcfg)
825                 return -ENOMEM;
826         lcfg->lcfg_nid = nid;
827
828         rc = record_lcfg(env, llh, lcfg);
829
830         lustre_cfg_free(lcfg);
831
832         if (rc) {
833                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
834                        cmd, s1, s2, s3, s4);
835         }
836         return rc;
837 }
838
839 static inline int record_add_uuid(const struct lu_env *env,
840                                   struct llog_handle *llh,
841                                   uint64_t nid, char *uuid)
842 {
843         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
844 }
845
846 static inline int record_add_conn(const struct lu_env *env,
847                                   struct llog_handle *llh,
848                                   char *devname, char *uuid)
849 {
850         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
851 }
852
853 static inline int record_attach(const struct lu_env *env,
854                                 struct llog_handle *llh, char *devname,
855                                 char *type, char *uuid)
856 {
857         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
858 }
859
860 static inline int record_setup(const struct lu_env *env,
861                                struct llog_handle *llh, char *devname,
862                                char *s1, char *s2, char *s3, char *s4)
863 {
864         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
865 }
866
867 /**
868  * \retval <0 record processing error
869  * \retval n record is processed. No need copy original one.
870  * \retval 0 record is not processed.
871  */
872 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
873                            struct mgs_replace_uuid_lookup *mrul)
874 {
875         int nids_added = 0;
876         lnet_nid_t nid;
877         char *ptr;
878         int rc;
879
880         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
881                 /* LCFG_ADD_UUID command found. Let's skip original command
882                    and add passed nids */
883                 ptr = mrul->target.mti_params;
884                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
885                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
886                                "device %s\n", libcfs_nid2str(nid),
887                                 mrul->target.mti_params,
888                                 mrul->target.mti_svname);
889                         rc = record_add_uuid(env,
890                                              mrul->temp_llh, nid,
891                                              mrul->target.mti_params);
892                         if (!rc)
893                                 nids_added++;
894                 }
895
896                 if (nids_added == 0) {
897                         CERROR("No new nids were added, nid %s with uuid %s, "
898                                "device %s\n", libcfs_nid2str(nid),
899                                mrul->target.mti_params,
900                                mrul->target.mti_svname);
901                         RETURN(-ENXIO);
902                 } else {
903                         mrul->device_nids_added = 1;
904                 }
905
906                 return nids_added;
907         }
908
909         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
910                 /* LCFG_SETUP command found. UUID should be changed */
911                 rc = record_setup(env,
912                                   mrul->temp_llh,
913                                   /* devname the same */
914                                   lustre_cfg_string(lcfg, 0),
915                                   /* s1 is not changed */
916                                   lustre_cfg_string(lcfg, 1),
917                                   /* new uuid should be
918                                   the full nidlist */
919                                   mrul->target.mti_params,
920                                   /* s3 is not changed */
921                                   lustre_cfg_string(lcfg, 3),
922                                   /* s4 is not changed */
923                                   lustre_cfg_string(lcfg, 4));
924                 return rc ? rc : 1;
925         }
926
927         /* Another commands in target device block */
928         return 0;
929 }
930
931 /**
932  * Handler that called for every record in llog.
933  * Records are processed in order they placed in llog.
934  *
935  * \param[in] llh       log to be processed
936  * \param[in] rec       current record
937  * \param[in] data      mgs_replace_uuid_lookup structure
938  *
939  * \retval 0    success
940  */
941 static int mgs_replace_handler(const struct lu_env *env,
942                                struct llog_handle *llh,
943                                struct llog_rec_hdr *rec,
944                                void *data)
945 {
946         struct mgs_replace_uuid_lookup *mrul;
947         struct lustre_cfg *lcfg = REC_DATA(rec);
948         int cfg_len = REC_DATA_LEN(rec);
949         int rc;
950         ENTRY;
951
952         mrul = (struct mgs_replace_uuid_lookup *)data;
953
954         if (rec->lrh_type != OBD_CFG_REC) {
955                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
956                        rec->lrh_type, lcfg->lcfg_command,
957                        lustre_cfg_string(lcfg, 0),
958                        lustre_cfg_string(lcfg, 1));
959                 RETURN(-EINVAL);
960         }
961
962         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
963         if (rc) {
964                 /* Do not copy any invalidated records */
965                 GOTO(skip_out, rc = 0);
966         }
967
968         rc = check_markers(lcfg, mrul);
969         if (rc || mrul->skip_it)
970                 GOTO(skip_out, rc = 0);
971
972         /* Write to new log all commands outside target device block */
973         if (!mrul->in_target_device)
974                 GOTO(copy_out, rc = 0);
975
976         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
977            (failover nids) for this target, assuming that if then
978            primary is changing then so is the failover */
979         if (mrul->device_nids_added &&
980             (lcfg->lcfg_command == LCFG_ADD_UUID ||
981              lcfg->lcfg_command == LCFG_ADD_CONN))
982                 GOTO(skip_out, rc = 0);
983
984         rc = process_command(env, lcfg, mrul);
985         if (rc < 0)
986                 RETURN(rc);
987
988         if (rc)
989                 RETURN(0);
990 copy_out:
991         /* Record is placed in temporary llog as is */
992         rc = llog_write(env, mrul->temp_llh, rec, NULL, 0, NULL, -1);
993
994         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
995                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
996                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
997         RETURN(rc);
998
999 skip_out:
1000         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1001                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1002                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1003         RETURN(rc);
1004 }
1005
1006 static int mgs_log_is_empty(const struct lu_env *env,
1007                             struct mgs_device *mgs, char *name)
1008 {
1009         struct llog_ctxt        *ctxt;
1010         int                      rc;
1011
1012         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1013         LASSERT(ctxt != NULL);
1014
1015         rc = llog_is_empty(env, ctxt, name);
1016         llog_ctxt_put(ctxt);
1017         return rc;
1018 }
1019
1020 static int mgs_replace_nids_log(const struct lu_env *env,
1021                                 struct obd_device *mgs, struct fs_db *fsdb,
1022                                 char *logname, char *devname, char *nids)
1023 {
1024         struct llog_handle *orig_llh, *backup_llh;
1025         struct llog_ctxt *ctxt;
1026         struct mgs_replace_uuid_lookup *mrul;
1027         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1028         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1029         char *backup;
1030         int rc, rc2;
1031         ENTRY;
1032
1033         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1034
1035         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1036         LASSERT(ctxt != NULL);
1037
1038         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1039                 /* Log is empty. Nothing to replace */
1040                 GOTO(out_put, rc = 0);
1041         }
1042
1043         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1044         if (backup == NULL)
1045                 GOTO(out_put, rc = -ENOMEM);
1046
1047         sprintf(backup, "%s.bak", logname);
1048
1049         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1050         if (rc == 0) {
1051                 /* Now erase original log file. Connections are not allowed.
1052                    Backup is already saved */
1053                 rc = llog_erase(env, ctxt, NULL, logname);
1054                 if (rc < 0)
1055                         GOTO(out_free, rc);
1056         } else if (rc != -ENOENT) {
1057                 CERROR("%s: can't make backup for %s: rc = %d\n",
1058                        mgs->obd_name, logname, rc);
1059                 GOTO(out_free,rc);
1060         }
1061
1062         /* open local log */
1063         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1064         if (rc)
1065                 GOTO(out_restore, rc);
1066
1067         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1068         if (rc)
1069                 GOTO(out_closel, rc);
1070
1071         /* open backup llog */
1072         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1073                        LLOG_OPEN_EXISTS);
1074         if (rc)
1075                 GOTO(out_closel, rc);
1076
1077         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1078         if (rc)
1079                 GOTO(out_close, rc);
1080
1081         if (llog_get_size(backup_llh) <= 1)
1082                 GOTO(out_close, rc = 0);
1083
1084         OBD_ALLOC_PTR(mrul);
1085         if (!mrul)
1086                 GOTO(out_close, rc = -ENOMEM);
1087         /* devname is only needed information to replace UUID records */
1088         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1089         /* parse nids later */
1090         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1091         /* Copy records to this temporary llog */
1092         mrul->temp_llh = orig_llh;
1093
1094         rc = llog_process(env, backup_llh, mgs_replace_handler,
1095                           (void *)mrul, NULL);
1096         OBD_FREE_PTR(mrul);
1097 out_close:
1098         rc2 = llog_close(NULL, backup_llh);
1099         if (!rc)
1100                 rc = rc2;
1101 out_closel:
1102         rc2 = llog_close(NULL, orig_llh);
1103         if (!rc)
1104                 rc = rc2;
1105
1106 out_restore:
1107         if (rc) {
1108                 CERROR("%s: llog should be restored: rc = %d\n",
1109                        mgs->obd_name, rc);
1110                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1111                                   logname);
1112                 if (rc2 < 0)
1113                         CERROR("%s: can't restore backup %s: rc = %d\n",
1114                                mgs->obd_name, logname, rc2);
1115         }
1116
1117 out_free:
1118         OBD_FREE(backup, strlen(backup) + 1);
1119
1120 out_put:
1121         llog_ctxt_put(ctxt);
1122
1123         if (rc)
1124                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1125                        mgs->obd_name, logname, rc);
1126
1127         RETURN(rc);
1128 }
1129
1130 /**
1131  * Parse device name and get file system name and/or device index
1132  *
1133  * \param[in]   devname device name (ex. lustre-MDT0000)
1134  * \param[out]  fsname  file system name(optional)
1135  * \param[out]  index   device index(optional)
1136  *
1137  * \retval 0    success
1138  */
1139 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1140 {
1141         int rc;
1142         ENTRY;
1143
1144         /* Extract fsname */
1145         if (fsname) {
1146                 rc = server_name2fsname(devname, fsname, NULL);
1147                 if (rc < 0) {
1148                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1149                                devname);
1150                         RETURN(-EINVAL);
1151                 }
1152         }
1153
1154         if (index) {
1155                 rc = server_name2index(devname, index, NULL);
1156                 if (rc < 0) {
1157                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1158                                devname);
1159                         RETURN(-EINVAL);
1160                 }
1161         }
1162
1163         RETURN(0);
1164 }
1165
1166 static int only_mgs_is_running(struct obd_device *mgs_obd)
1167 {
1168         /* TDB: Is global variable with devices count exists? */
1169         int num_devices = get_devices_count();
1170         /* osd, MGS and MGC + self_export
1171            (wc -l /proc/fs/lustre/devices <= 2) && (num_exports <= 2) */
1172         return (num_devices <= 3) && (mgs_obd->obd_num_exports <= 2);
1173 }
1174
1175 static int name_create_mdt(char **logname, char *fsname, int i)
1176 {
1177         char mdt_index[9];
1178
1179         sprintf(mdt_index, "-MDT%04x", i);
1180         return name_create(logname, fsname, mdt_index);
1181 }
1182
1183 /**
1184  * Replace nids for \a device to \a nids values
1185  *
1186  * \param obd           MGS obd device
1187  * \param devname       nids need to be replaced for this device
1188  * (ex. lustre-OST0000)
1189  * \param nids          nids list (ex. nid1,nid2,nid3)
1190  *
1191  * \retval 0    success
1192  */
1193 int mgs_replace_nids(const struct lu_env *env,
1194                      struct mgs_device *mgs,
1195                      char *devname, char *nids)
1196 {
1197         /* Assume fsname is part of device name */
1198         char fsname[MTI_NAME_MAXLEN];
1199         int rc;
1200         __u32 index;
1201         char *logname;
1202         struct fs_db *fsdb;
1203         unsigned int i;
1204         int conn_state;
1205         struct obd_device *mgs_obd = mgs->mgs_obd;
1206         ENTRY;
1207
1208         /* We can only change NIDs if no other nodes are connected */
1209         spin_lock(&mgs_obd->obd_dev_lock);
1210         conn_state = mgs_obd->obd_no_conn;
1211         mgs_obd->obd_no_conn = 1;
1212         spin_unlock(&mgs_obd->obd_dev_lock);
1213
1214         /* We can not change nids if not only MGS is started */
1215         if (!only_mgs_is_running(mgs_obd)) {
1216                 CERROR("Only MGS is allowed to be started\n");
1217                 GOTO(out, rc = -EINPROGRESS);
1218         }
1219
1220         /* Get fsname and index*/
1221         rc = mgs_parse_devname(devname, fsname, &index);
1222         if (rc)
1223                 GOTO(out, rc);
1224
1225         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1226         if (rc) {
1227                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1228                 GOTO(out, rc);
1229         }
1230
1231         /* Process client llogs */
1232         name_create(&logname, fsname, "-client");
1233         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1234         name_destroy(&logname);
1235         if (rc) {
1236                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1237                        fsname, devname, rc);
1238                 GOTO(out, rc);
1239         }
1240
1241         /* Process MDT llogs */
1242         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1243                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1244                         continue;
1245                 name_create_mdt(&logname, fsname, i);
1246                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1247                 name_destroy(&logname);
1248                 if (rc)
1249                         GOTO(out, rc);
1250         }
1251
1252 out:
1253         spin_lock(&mgs_obd->obd_dev_lock);
1254         mgs_obd->obd_no_conn = conn_state;
1255         spin_unlock(&mgs_obd->obd_dev_lock);
1256
1257         RETURN(rc);
1258 }
1259
1260 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1261                             char *devname, struct lov_desc *desc)
1262 {
1263         struct mgs_thread_info *mgi = mgs_env_info(env);
1264         struct lustre_cfg *lcfg;
1265         int rc;
1266
1267         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1268         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1269         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1270         if (!lcfg)
1271                 return -ENOMEM;
1272         rc = record_lcfg(env, llh, lcfg);
1273
1274         lustre_cfg_free(lcfg);
1275         return rc;
1276 }
1277
1278 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1279                             char *devname, struct lmv_desc *desc)
1280 {
1281         struct mgs_thread_info *mgi = mgs_env_info(env);
1282         struct lustre_cfg *lcfg;
1283         int rc;
1284
1285         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1286         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1287         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1288
1289         rc = record_lcfg(env, llh, lcfg);
1290
1291         lustre_cfg_free(lcfg);
1292         return rc;
1293 }
1294
1295 static inline int record_mdc_add(const struct lu_env *env,
1296                                  struct llog_handle *llh,
1297                                  char *logname, char *mdcuuid,
1298                                  char *mdtuuid, char *index,
1299                                  char *gen)
1300 {
1301         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1302                            mdtuuid,index,gen,mdcuuid);
1303 }
1304
1305 static inline int record_lov_add(const struct lu_env *env,
1306                                  struct llog_handle *llh,
1307                                  char *lov_name, char *ost_uuid,
1308                                  char *index, char *gen)
1309 {
1310         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1311                            ost_uuid, index, gen, 0);
1312 }
1313
1314 static inline int record_mount_opt(const struct lu_env *env,
1315                                    struct llog_handle *llh,
1316                                    char *profile, char *lov_name,
1317                                    char *mdc_name)
1318 {
1319         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1320                            profile,lov_name,mdc_name,0);
1321 }
1322
1323 static int record_marker(const struct lu_env *env,
1324                          struct llog_handle *llh,
1325                          struct fs_db *fsdb, __u32 flags,
1326                          char *tgtname, char *comment)
1327 {
1328         struct mgs_thread_info *mgi = mgs_env_info(env);
1329         struct lustre_cfg *lcfg;
1330         int rc;
1331         int cplen = 0;
1332
1333         if (flags & CM_START)
1334                 fsdb->fsdb_gen++;
1335         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1336         mgi->mgi_marker.cm_flags = flags;
1337         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1338         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1339                         sizeof(mgi->mgi_marker.cm_tgtname));
1340         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1341                 return -E2BIG;
1342         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1343                         sizeof(mgi->mgi_marker.cm_comment));
1344         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1345                 return -E2BIG;
1346         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1347         mgi->mgi_marker.cm_canceltime = 0;
1348         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1349         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1350                             sizeof(mgi->mgi_marker));
1351         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1352         if (!lcfg)
1353                 return -ENOMEM;
1354         rc = record_lcfg(env, llh, lcfg);
1355
1356         lustre_cfg_free(lcfg);
1357         return rc;
1358 }
1359
1360 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1361                             struct llog_handle **llh, char *name)
1362 {
1363         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1364         struct llog_ctxt        *ctxt;
1365         int                      rc = 0;
1366
1367         if (*llh)
1368                 GOTO(out, rc = -EBUSY);
1369
1370         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1371         if (!ctxt)
1372                 GOTO(out, rc = -ENODEV);
1373         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1374
1375         rc = llog_open_create(env, ctxt, llh, NULL, name);
1376         if (rc)
1377                 GOTO(out_ctxt, rc);
1378         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1379         if (rc)
1380                 llog_close(env, *llh);
1381 out_ctxt:
1382         llog_ctxt_put(ctxt);
1383 out:
1384         if (rc) {
1385                 CERROR("%s: can't start log %s: rc = %d\n",
1386                        mgs->mgs_obd->obd_name, name, rc);
1387                 *llh = NULL;
1388         }
1389         RETURN(rc);
1390 }
1391
1392 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1393 {
1394         int rc;
1395
1396         rc = llog_close(env, *llh);
1397         *llh = NULL;
1398
1399         return rc;
1400 }
1401
1402 /******************** config "macros" *********************/
1403
1404 /* write an lcfg directly into a log (with markers) */
1405 static int mgs_write_log_direct(const struct lu_env *env,
1406                                 struct mgs_device *mgs, struct fs_db *fsdb,
1407                                 char *logname, struct lustre_cfg *lcfg,
1408                                 char *devname, char *comment)
1409 {
1410         struct llog_handle *llh = NULL;
1411         int rc;
1412         ENTRY;
1413
1414         if (!lcfg)
1415                 RETURN(-ENOMEM);
1416
1417         rc = record_start_log(env, mgs, &llh, logname);
1418         if (rc)
1419                 RETURN(rc);
1420
1421         /* FIXME These should be a single journal transaction */
1422         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1423         if (rc)
1424                 GOTO(out_end, rc);
1425         rc = record_lcfg(env, llh, lcfg);
1426         if (rc)
1427                 GOTO(out_end, rc);
1428         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1429         if (rc)
1430                 GOTO(out_end, rc);
1431 out_end:
1432         record_end_log(env, &llh);
1433         RETURN(rc);
1434 }
1435
1436 /* write the lcfg in all logs for the given fs */
1437 int mgs_write_log_direct_all(const struct lu_env *env,
1438                              struct mgs_device *mgs,
1439                              struct fs_db *fsdb,
1440                              struct mgs_target_info *mti,
1441                              struct lustre_cfg *lcfg,
1442                              char *devname, char *comment,
1443                              int server_only)
1444 {
1445         cfs_list_t list;
1446         struct mgs_direntry *dirent, *n;
1447         char *fsname = mti->mti_fsname;
1448         char *logname;
1449         int rc = 0, len = strlen(fsname);
1450         ENTRY;
1451
1452         /* We need to set params for any future logs
1453            as well. FIXME Append this file to every new log.
1454            Actually, we should store as params (text), not llogs.  Or
1455            in a database. */
1456         rc = name_create(&logname, fsname, "-params");
1457         if (rc)
1458                 RETURN(rc);
1459         if (mgs_log_is_empty(env, mgs, logname)) {
1460                 struct llog_handle *llh = NULL;
1461                 rc = record_start_log(env, mgs, &llh, logname);
1462                 record_end_log(env, &llh);
1463         }
1464         name_destroy(&logname);
1465         if (rc)
1466                 RETURN(rc);
1467
1468         /* Find all the logs in the CONFIGS directory */
1469         rc = class_dentry_readdir(env, mgs, &list);
1470         if (rc)
1471                 RETURN(rc);
1472
1473         /* Could use fsdb index maps instead of directory listing */
1474         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1475                 cfs_list_del(&dirent->list);
1476                 /* don't write to sptlrpc rule log */
1477                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1478                         goto next;
1479
1480                 /* caller wants write server logs only */
1481                 if (server_only && strstr(dirent->name, "-client") != NULL)
1482                         goto next;
1483
1484                 if (strncmp(fsname, dirent->name, len) == 0) {
1485                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1486                         /* Erase any old settings of this same parameter */
1487                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1488                                         devname, comment, CM_SKIP);
1489                         if (rc < 0)
1490                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1491                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1492                         /* Write the new one */
1493                         if (lcfg) {
1494                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1495                                                           dirent->name,
1496                                                           lcfg, devname,
1497                                                           comment);
1498                                 if (rc)
1499                                         CERROR("%s: writing log %s: rc = %d\n",
1500                                                mgs->mgs_obd->obd_name,
1501                                                dirent->name, rc);
1502                         }
1503                 }
1504 next:
1505                 mgs_direntry_free(dirent);
1506         }
1507
1508         RETURN(rc);
1509 }
1510
1511 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1512                                     struct mgs_device *mgs,
1513                                     struct fs_db *fsdb,
1514                                     struct mgs_target_info *mti,
1515                                     int index, char *logname);
1516 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1517                                     struct mgs_device *mgs,
1518                                     struct fs_db *fsdb,
1519                                     struct mgs_target_info *mti,
1520                                     char *logname, char *suffix, char *lovname,
1521                                     enum lustre_sec_part sec_part, int flags);
1522 static int name_create_mdt_and_lov(char **logname, char **lovname,
1523                                    struct fs_db *fsdb, int i);
1524
1525 static int add_param(char *params, char *key, char *val)
1526 {
1527         char *start = params + strlen(params);
1528         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1529         int keylen = 0;
1530
1531         if (key != NULL)
1532                 keylen = strlen(key);
1533         if (start + 1 + keylen + strlen(val) >= end) {
1534                 CERROR("params are too long: %s %s%s\n",
1535                        params, key != NULL ? key : "", val);
1536                 return -EINVAL;
1537         }
1538
1539         sprintf(start, " %s%s", key != NULL ? key : "", val);
1540         return 0;
1541 }
1542
1543 /**
1544  * Walk through client config log record and convert the related records
1545  * into the target.
1546  **/
1547 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1548                                          struct llog_handle *llh,
1549                                          struct llog_rec_hdr *rec, void *data)
1550 {
1551         struct mgs_device *mgs;
1552         struct obd_device *obd;
1553         struct mgs_target_info *mti, *tmti;
1554         struct fs_db *fsdb;
1555         int cfg_len = rec->lrh_len;
1556         char *cfg_buf = (char*) (rec + 1);
1557         struct lustre_cfg *lcfg;
1558         int rc = 0;
1559         struct llog_handle *mdt_llh = NULL;
1560         static int got_an_osc_or_mdc = 0;
1561         /* 0: not found any osc/mdc;
1562            1: found osc;
1563            2: found mdc;
1564         */
1565         static int last_step = -1;
1566         int cplen = 0;
1567
1568         ENTRY;
1569
1570         mti = ((struct temp_comp*)data)->comp_mti;
1571         tmti = ((struct temp_comp*)data)->comp_tmti;
1572         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1573         obd = ((struct temp_comp *)data)->comp_obd;
1574         mgs = lu2mgs_dev(obd->obd_lu_dev);
1575         LASSERT(mgs);
1576
1577         if (rec->lrh_type != OBD_CFG_REC) {
1578                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1579                 RETURN(-EINVAL);
1580         }
1581
1582         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1583         if (rc) {
1584                 CERROR("Insane cfg\n");
1585                 RETURN(rc);
1586         }
1587
1588         lcfg = (struct lustre_cfg *)cfg_buf;
1589
1590         if (lcfg->lcfg_command == LCFG_MARKER) {
1591                 struct cfg_marker *marker;
1592                 marker = lustre_cfg_buf(lcfg, 1);
1593                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1594                     (marker->cm_flags & CM_START) &&
1595                      !(marker->cm_flags & CM_SKIP)) {
1596                         got_an_osc_or_mdc = 1;
1597                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1598                                         sizeof(tmti->mti_svname));
1599                         if (cplen >= sizeof(tmti->mti_svname))
1600                                 RETURN(-E2BIG);
1601                         rc = record_start_log(env, mgs, &mdt_llh,
1602                                               mti->mti_svname);
1603                         if (rc)
1604                                 RETURN(rc);
1605                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1606                                            mti->mti_svname, "add osc(copied)");
1607                         record_end_log(env, &mdt_llh);
1608                         last_step = marker->cm_step;
1609                         RETURN(rc);
1610                 }
1611                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1612                     (marker->cm_flags & CM_END) &&
1613                      !(marker->cm_flags & CM_SKIP)) {
1614                         LASSERT(last_step == marker->cm_step);
1615                         last_step = -1;
1616                         got_an_osc_or_mdc = 0;
1617                         memset(tmti, 0, sizeof(*tmti));
1618                         rc = record_start_log(env, mgs, &mdt_llh,
1619                                               mti->mti_svname);
1620                         if (rc)
1621                                 RETURN(rc);
1622                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1623                                            mti->mti_svname, "add osc(copied)");
1624                         record_end_log(env, &mdt_llh);
1625                         RETURN(rc);
1626                 }
1627                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1628                     (marker->cm_flags & CM_START) &&
1629                      !(marker->cm_flags & CM_SKIP)) {
1630                         got_an_osc_or_mdc = 2;
1631                         last_step = marker->cm_step;
1632                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1633                                strlen(marker->cm_tgtname));
1634
1635                         RETURN(rc);
1636                 }
1637                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1638                     (marker->cm_flags & CM_END) &&
1639                      !(marker->cm_flags & CM_SKIP)) {
1640                         LASSERT(last_step == marker->cm_step);
1641                         last_step = -1;
1642                         got_an_osc_or_mdc = 0;
1643                         memset(tmti, 0, sizeof(*tmti));
1644                         RETURN(rc);
1645                 }
1646         }
1647
1648         if (got_an_osc_or_mdc == 0 || last_step < 0)
1649                 RETURN(rc);
1650
1651         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1652                 uint64_t nodenid = lcfg->lcfg_nid;
1653
1654                 if (strlen(tmti->mti_uuid) == 0) {
1655                         /* target uuid not set, this config record is before
1656                          * LCFG_SETUP, this nid is one of target node nid.
1657                          */
1658                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1659                         tmti->mti_nid_count++;
1660                 } else {
1661                         /* failover node nid */
1662                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1663                                        libcfs_nid2str(nodenid));
1664                 }
1665
1666                 RETURN(rc);
1667         }
1668
1669         if (lcfg->lcfg_command == LCFG_SETUP) {
1670                 char *target;
1671
1672                 target = lustre_cfg_string(lcfg, 1);
1673                 memcpy(tmti->mti_uuid, target, strlen(target));
1674                 RETURN(rc);
1675         }
1676
1677         /* ignore client side sptlrpc_conf_log */
1678         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1679                 RETURN(rc);
1680
1681         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1682                 int index;
1683
1684                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1685                         RETURN (-EINVAL);
1686
1687                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1688                        strlen(mti->mti_fsname));
1689                 tmti->mti_stripe_index = index;
1690
1691                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1692                                               mti->mti_stripe_index,
1693                                               mti->mti_svname);
1694                 memset(tmti, 0, sizeof(*tmti));
1695                 RETURN(rc);
1696         }
1697
1698         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1699                 int index;
1700                 char mdt_index[9];
1701                 char *logname, *lovname;
1702
1703                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1704                                              mti->mti_stripe_index);
1705                 if (rc)
1706                         RETURN(rc);
1707                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1708
1709                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1710                         name_destroy(&logname);
1711                         name_destroy(&lovname);
1712                         RETURN(-EINVAL);
1713                 }
1714
1715                 tmti->mti_stripe_index = index;
1716                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1717                                          mdt_index, lovname,
1718                                          LUSTRE_SP_MDT, 0);
1719                 name_destroy(&logname);
1720                 name_destroy(&lovname);
1721                 RETURN(rc);
1722         }
1723         RETURN(rc);
1724 }
1725
1726 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1727 /* stealed from mgs_get_fsdb_from_llog*/
1728 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1729                                               struct mgs_device *mgs,
1730                                               char *client_name,
1731                                               struct temp_comp* comp)
1732 {
1733         struct llog_handle *loghandle;
1734         struct mgs_target_info *tmti;
1735         struct llog_ctxt *ctxt;
1736         int rc;
1737
1738         ENTRY;
1739
1740         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1741         LASSERT(ctxt != NULL);
1742
1743         OBD_ALLOC_PTR(tmti);
1744         if (tmti == NULL)
1745                 GOTO(out_ctxt, rc = -ENOMEM);
1746
1747         comp->comp_tmti = tmti;
1748         comp->comp_obd = mgs->mgs_obd;
1749
1750         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1751                        LLOG_OPEN_EXISTS);
1752         if (rc < 0) {
1753                 if (rc == -ENOENT)
1754                         rc = 0;
1755                 GOTO(out_pop, rc);
1756         }
1757
1758         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1759         if (rc)
1760                 GOTO(out_close, rc);
1761
1762         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1763                                   (void *)comp, NULL, false);
1764         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1765 out_close:
1766         llog_close(env, loghandle);
1767 out_pop:
1768         OBD_FREE_PTR(tmti);
1769 out_ctxt:
1770         llog_ctxt_put(ctxt);
1771         RETURN(rc);
1772 }
1773
1774 /* lmv is the second thing for client logs */
1775 /* copied from mgs_write_log_lov. Please refer to that.  */
1776 static int mgs_write_log_lmv(const struct lu_env *env,
1777                              struct mgs_device *mgs,
1778                              struct fs_db *fsdb,
1779                              struct mgs_target_info *mti,
1780                              char *logname, char *lmvname)
1781 {
1782         struct llog_handle *llh = NULL;
1783         struct lmv_desc *lmvdesc;
1784         char *uuid;
1785         int rc = 0;
1786         ENTRY;
1787
1788         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1789
1790         OBD_ALLOC_PTR(lmvdesc);
1791         if (lmvdesc == NULL)
1792                 RETURN(-ENOMEM);
1793         lmvdesc->ld_active_tgt_count = 0;
1794         lmvdesc->ld_tgt_count = 0;
1795         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1796         uuid = (char *)lmvdesc->ld_uuid.uuid;
1797
1798         rc = record_start_log(env, mgs, &llh, logname);
1799         if (rc)
1800                 GOTO(out_free, rc);
1801         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1802         if (rc)
1803                 GOTO(out_end, rc);
1804         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1805         if (rc)
1806                 GOTO(out_end, rc);
1807         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1808         if (rc)
1809                 GOTO(out_end, rc);
1810         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1811         if (rc)
1812                 GOTO(out_end, rc);
1813 out_end:
1814         record_end_log(env, &llh);
1815 out_free:
1816         OBD_FREE_PTR(lmvdesc);
1817         RETURN(rc);
1818 }
1819
1820 /* lov is the first thing in the mdt and client logs */
1821 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1822                              struct fs_db *fsdb, struct mgs_target_info *mti,
1823                              char *logname, char *lovname)
1824 {
1825         struct llog_handle *llh = NULL;
1826         struct lov_desc *lovdesc;
1827         char *uuid;
1828         int rc = 0;
1829         ENTRY;
1830
1831         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1832
1833         /*
1834         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1835         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1836               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1837         */
1838
1839         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1840         OBD_ALLOC_PTR(lovdesc);
1841         if (lovdesc == NULL)
1842                 RETURN(-ENOMEM);
1843         lovdesc->ld_magic = LOV_DESC_MAGIC;
1844         lovdesc->ld_tgt_count = 0;
1845         /* Defaults.  Can be changed later by lcfg config_param */
1846         lovdesc->ld_default_stripe_count = 1;
1847         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1848         lovdesc->ld_default_stripe_size = 1024 * 1024;
1849         lovdesc->ld_default_stripe_offset = -1;
1850         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1851         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1852         /* can these be the same? */
1853         uuid = (char *)lovdesc->ld_uuid.uuid;
1854
1855         /* This should always be the first entry in a log.
1856         rc = mgs_clear_log(obd, logname); */
1857         rc = record_start_log(env, mgs, &llh, logname);
1858         if (rc)
1859                 GOTO(out_free, rc);
1860         /* FIXME these should be a single journal transaction */
1861         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1862         if (rc)
1863                 GOTO(out_end, rc);
1864         rc = record_attach(env, llh, lovname, "lov", uuid);
1865         if (rc)
1866                 GOTO(out_end, rc);
1867         rc = record_lov_setup(env, llh, lovname, lovdesc);
1868         if (rc)
1869                 GOTO(out_end, rc);
1870         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1871         if (rc)
1872                 GOTO(out_end, rc);
1873         EXIT;
1874 out_end:
1875         record_end_log(env, &llh);
1876 out_free:
1877         OBD_FREE_PTR(lovdesc);
1878         return rc;
1879 }
1880
1881 /* add failnids to open log */
1882 static int mgs_write_log_failnids(const struct lu_env *env,
1883                                   struct mgs_target_info *mti,
1884                                   struct llog_handle *llh,
1885                                   char *cliname)
1886 {
1887         char *failnodeuuid = NULL;
1888         char *ptr = mti->mti_params;
1889         lnet_nid_t nid;
1890         int rc = 0;
1891
1892         /*
1893         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1894         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1895         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1896         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1897         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1898         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1899         */
1900
1901         /* Pull failnid info out of params string */
1902         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1903                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1904                         if (failnodeuuid == NULL) {
1905                                 /* We don't know the failover node name,
1906                                    so just use the first nid as the uuid */
1907                                 rc = name_create(&failnodeuuid,
1908                                                  libcfs_nid2str(nid), "");
1909                                 if (rc)
1910                                         return rc;
1911                         }
1912                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1913                                "client %s\n", libcfs_nid2str(nid),
1914                                failnodeuuid, cliname);
1915                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1916                 }
1917                 if (failnodeuuid)
1918                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1919         }
1920
1921         name_destroy(&failnodeuuid);
1922         return rc;
1923 }
1924
1925 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1926                                     struct mgs_device *mgs,
1927                                     struct fs_db *fsdb,
1928                                     struct mgs_target_info *mti,
1929                                     char *logname, char *lmvname)
1930 {
1931         struct llog_handle *llh = NULL;
1932         char *mdcname = NULL;
1933         char *nodeuuid = NULL;
1934         char *mdcuuid = NULL;
1935         char *lmvuuid = NULL;
1936         char index[6];
1937         int i, rc;
1938         ENTRY;
1939
1940         if (mgs_log_is_empty(env, mgs, logname)) {
1941                 CERROR("log is empty! Logical error\n");
1942                 RETURN(-EINVAL);
1943         }
1944
1945         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1946                mti->mti_svname, logname, lmvname);
1947
1948         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1949         if (rc)
1950                 RETURN(rc);
1951         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1952         if (rc)
1953                 GOTO(out_free, rc);
1954         rc = name_create(&mdcuuid, mdcname, "_UUID");
1955         if (rc)
1956                 GOTO(out_free, rc);
1957         rc = name_create(&lmvuuid, lmvname, "_UUID");
1958         if (rc)
1959                 GOTO(out_free, rc);
1960
1961         rc = record_start_log(env, mgs, &llh, logname);
1962         if (rc)
1963                 GOTO(out_free, rc);
1964         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1965                            "add mdc");
1966         if (rc)
1967                 GOTO(out_end, rc);
1968         for (i = 0; i < mti->mti_nid_count; i++) {
1969                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1970                        libcfs_nid2str(mti->mti_nids[i]));
1971
1972                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1973                 if (rc)
1974                         GOTO(out_end, rc);
1975         }
1976
1977         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1978         if (rc)
1979                 GOTO(out_end, rc);
1980         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1981         if (rc)
1982                 GOTO(out_end, rc);
1983         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1984         if (rc)
1985                 GOTO(out_end, rc);
1986         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1987         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1988                             index, "1");
1989         if (rc)
1990                 GOTO(out_end, rc);
1991         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1992                            "add mdc");
1993         if (rc)
1994                 GOTO(out_end, rc);
1995 out_end:
1996         record_end_log(env, &llh);
1997 out_free:
1998         name_destroy(&lmvuuid);
1999         name_destroy(&mdcuuid);
2000         name_destroy(&mdcname);
2001         name_destroy(&nodeuuid);
2002         RETURN(rc);
2003 }
2004
2005 static inline int name_create_lov(char **lovname, char *mdtname,
2006                                   struct fs_db *fsdb, int index)
2007 {
2008         /* COMPAT_180 */
2009         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2010                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2011         else
2012                 return name_create(lovname, mdtname, "-mdtlov");
2013 }
2014
2015 static int name_create_mdt_and_lov(char **logname, char **lovname,
2016                                    struct fs_db *fsdb, int i)
2017 {
2018         int rc;
2019
2020         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2021         if (rc)
2022                 return rc;
2023         /* COMPAT_180 */
2024         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2025                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2026         else
2027                 rc = name_create(lovname, *logname, "-mdtlov");
2028         if (rc) {
2029                 name_destroy(logname);
2030                 *logname = NULL;
2031         }
2032         return rc;
2033 }
2034
2035 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2036                                       struct fs_db *fsdb, int i)
2037 {
2038         char suffix[16];
2039
2040         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2041                 sprintf(suffix, "-osc");
2042         else
2043                 sprintf(suffix, "-osc-MDT%04x", i);
2044         return name_create(oscname, ostname, suffix);
2045 }
2046
2047 /* add new mdc to already existent MDS */
2048 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2049                                     struct mgs_device *mgs,
2050                                     struct fs_db *fsdb,
2051                                     struct mgs_target_info *mti,
2052                                     int mdt_index, char *logname)
2053 {
2054         struct llog_handle      *llh = NULL;
2055         char    *nodeuuid = NULL;
2056         char    *ospname = NULL;
2057         char    *lovuuid = NULL;
2058         char    *mdtuuid = NULL;
2059         char    *svname = NULL;
2060         char    *mdtname = NULL;
2061         char    *lovname = NULL;
2062         char    index_str[16];
2063         int     i, rc;
2064
2065         ENTRY;
2066         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2067                 CERROR("log is empty! Logical error\n");
2068                 RETURN (-EINVAL);
2069         }
2070
2071         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2072                logname);
2073
2074         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2075         if (rc)
2076                 RETURN(rc);
2077
2078         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2079         if (rc)
2080                 GOTO(out_destory, rc);
2081
2082         rc = name_create(&svname, mdtname, "-osp");
2083         if (rc)
2084                 GOTO(out_destory, rc);
2085
2086         sprintf(index_str, "-MDT%04x", mdt_index);
2087         rc = name_create(&ospname, svname, index_str);
2088         if (rc)
2089                 GOTO(out_destory, rc);
2090
2091         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2092         if (rc)
2093                 GOTO(out_destory, rc);
2094
2095         rc = name_create(&lovuuid, lovname, "_UUID");
2096         if (rc)
2097                 GOTO(out_destory, rc);
2098
2099         rc = name_create(&mdtuuid, mdtname, "_UUID");
2100         if (rc)
2101                 GOTO(out_destory, rc);
2102
2103         rc = record_start_log(env, mgs, &llh, logname);
2104         if (rc)
2105                 GOTO(out_destory, rc);
2106
2107         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2108                            "add osp");
2109         if (rc)
2110                 GOTO(out_destory, rc);
2111
2112         for (i = 0; i < mti->mti_nid_count; i++) {
2113                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2114                        libcfs_nid2str(mti->mti_nids[i]));
2115                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2116                 if (rc)
2117                         GOTO(out_end, rc);
2118         }
2119
2120         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2121         if (rc)
2122                 GOTO(out_end, rc);
2123
2124         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2125                           NULL, NULL);
2126         if (rc)
2127                 GOTO(out_end, rc);
2128
2129         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2130         if (rc)
2131                 GOTO(out_end, rc);
2132
2133         /* Add mdc(osp) to lod */
2134         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2135                  mti->mti_stripe_index);
2136         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2137                          index_str, "1", NULL);
2138         if (rc)
2139                 GOTO(out_end, rc);
2140
2141         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2142         if (rc)
2143                 GOTO(out_end, rc);
2144
2145 out_end:
2146         record_end_log(env, &llh);
2147
2148 out_destory:
2149         name_destroy(&mdtuuid);
2150         name_destroy(&lovuuid);
2151         name_destroy(&lovname);
2152         name_destroy(&ospname);
2153         name_destroy(&svname);
2154         name_destroy(&nodeuuid);
2155         name_destroy(&mdtname);
2156         RETURN(rc);
2157 }
2158
2159 static int mgs_write_log_mdt0(const struct lu_env *env,
2160                               struct mgs_device *mgs,
2161                               struct fs_db *fsdb,
2162                               struct mgs_target_info *mti)
2163 {
2164         char *log = mti->mti_svname;
2165         struct llog_handle *llh = NULL;
2166         char *uuid, *lovname;
2167         char mdt_index[6];
2168         char *ptr = mti->mti_params;
2169         int rc = 0, failout = 0;
2170         ENTRY;
2171
2172         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2173         if (uuid == NULL)
2174                 RETURN(-ENOMEM);
2175
2176         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2177                 failout = (strncmp(ptr, "failout", 7) == 0);
2178
2179         rc = name_create(&lovname, log, "-mdtlov");
2180         if (rc)
2181                 GOTO(out_free, rc);
2182         if (mgs_log_is_empty(env, mgs, log)) {
2183                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2184                 if (rc)
2185                         GOTO(out_lod, rc);
2186         }
2187
2188         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2189
2190         rc = record_start_log(env, mgs, &llh, log);
2191         if (rc)
2192                 GOTO(out_lod, rc);
2193
2194         /* add MDT itself */
2195
2196         /* FIXME this whole fn should be a single journal transaction */
2197         sprintf(uuid, "%s_UUID", log);
2198         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2199         if (rc)
2200                 GOTO(out_lod, rc);
2201         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2202         if (rc)
2203                 GOTO(out_end, rc);
2204         rc = record_mount_opt(env, llh, log, lovname, NULL);
2205         if (rc)
2206                 GOTO(out_end, rc);
2207         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2208                         failout ? "n" : "f");
2209         if (rc)
2210                 GOTO(out_end, rc);
2211         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2212         if (rc)
2213                 GOTO(out_end, rc);
2214 out_end:
2215         record_end_log(env, &llh);
2216 out_lod:
2217         name_destroy(&lovname);
2218 out_free:
2219         OBD_FREE(uuid, sizeof(struct obd_uuid));
2220         RETURN(rc);
2221 }
2222
2223 /* envelope method for all layers log */
2224 static int mgs_write_log_mdt(const struct lu_env *env,
2225                              struct mgs_device *mgs,
2226                              struct fs_db *fsdb,
2227                              struct mgs_target_info *mti)
2228 {
2229         struct mgs_thread_info *mgi = mgs_env_info(env);
2230         struct llog_handle *llh = NULL;
2231         char *cliname;
2232         int rc, i = 0;
2233         ENTRY;
2234
2235         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2236
2237         if (mti->mti_uuid[0] == '\0') {
2238                 /* Make up our own uuid */
2239                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2240                          "%s_UUID", mti->mti_svname);
2241         }
2242
2243         /* add mdt */
2244         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2245         if (rc)
2246                 RETURN(rc);
2247         /* Append the mdt info to the client log */
2248         rc = name_create(&cliname, mti->mti_fsname, "-client");
2249         if (rc)
2250                 RETURN(rc);
2251
2252         if (mgs_log_is_empty(env, mgs, cliname)) {
2253                 /* Start client log */
2254                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2255                                        fsdb->fsdb_clilov);
2256                 if (rc)
2257                         GOTO(out_free, rc);
2258                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2259                                        fsdb->fsdb_clilmv);
2260                 if (rc)
2261                         GOTO(out_free, rc);
2262         }
2263
2264         /*
2265         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2266         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2267         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2268         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2269         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2270         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2271         */
2272
2273                 /* copy client info about lov/lmv */
2274                 mgi->mgi_comp.comp_mti = mti;
2275                 mgi->mgi_comp.comp_fsdb = fsdb;
2276
2277                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2278                                                         &mgi->mgi_comp);
2279                 if (rc)
2280                         GOTO(out_free, rc);
2281                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2282                                               fsdb->fsdb_clilmv);
2283                 if (rc)
2284                         GOTO(out_free, rc);
2285
2286                 /* add mountopts */
2287                 rc = record_start_log(env, mgs, &llh, cliname);
2288                 if (rc)
2289                         GOTO(out_free, rc);
2290
2291                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2292                                    "mount opts");
2293                 if (rc)
2294                         GOTO(out_end, rc);
2295                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2296                                       fsdb->fsdb_clilmv);
2297                 if (rc)
2298                         GOTO(out_end, rc);
2299                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2300                                    "mount opts");
2301
2302         if (rc)
2303                 GOTO(out_end, rc);
2304
2305         /* for_all_existing_mdt except current one */
2306         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2307                 if (i !=  mti->mti_stripe_index &&
2308                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2309                         char *logname;
2310
2311                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2312                         if (rc)
2313                                 GOTO(out_end, rc);
2314
2315                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2316                                                       i, logname);
2317                         name_destroy(&logname);
2318                         if (rc)
2319                                 GOTO(out_end, rc);
2320                 }
2321         }
2322 out_end:
2323         record_end_log(env, &llh);
2324 out_free:
2325         name_destroy(&cliname);
2326         RETURN(rc);
2327 }
2328
2329 /* Add the ost info to the client/mdt lov */
2330 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2331                                     struct mgs_device *mgs, struct fs_db *fsdb,
2332                                     struct mgs_target_info *mti,
2333                                     char *logname, char *suffix, char *lovname,
2334                                     enum lustre_sec_part sec_part, int flags)
2335 {
2336         struct llog_handle *llh = NULL;
2337         char *nodeuuid = NULL;
2338         char *oscname = NULL;
2339         char *oscuuid = NULL;
2340         char *lovuuid = NULL;
2341         char *svname = NULL;
2342         char index[6];
2343         int i, rc;
2344
2345         ENTRY;
2346         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2347                mti->mti_svname, logname);
2348
2349         if (mgs_log_is_empty(env, mgs, logname)) {
2350                 CERROR("log is empty! Logical error\n");
2351                 RETURN (-EINVAL);
2352         }
2353
2354         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2355         if (rc)
2356                 RETURN(rc);
2357         rc = name_create(&svname, mti->mti_svname, "-osc");
2358         if (rc)
2359                 GOTO(out_free, rc);
2360
2361         /* for the system upgraded from old 1.8, keep using the old osc naming
2362          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2363         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2364                 rc = name_create(&oscname, svname, "");
2365         else
2366                 rc = name_create(&oscname, svname, suffix);
2367         if (rc)
2368                 GOTO(out_free, rc);
2369
2370         rc = name_create(&oscuuid, oscname, "_UUID");
2371         if (rc)
2372                 GOTO(out_free, rc);
2373         rc = name_create(&lovuuid, lovname, "_UUID");
2374         if (rc)
2375                 GOTO(out_free, rc);
2376
2377
2378         /*
2379         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2380         multihomed (#4)
2381         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2382         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2383         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2384         failover (#6,7)
2385         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2386         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2387         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2388         */
2389
2390         rc = record_start_log(env, mgs, &llh, logname);
2391         if (rc)
2392                 GOTO(out_free, rc);
2393
2394         /* FIXME these should be a single journal transaction */
2395         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2396                            "add osc");
2397         if (rc)
2398                 GOTO(out_end, rc);
2399
2400         /* NB: don't change record order, because upon MDT steal OSC config
2401          * from client, it treats all nids before LCFG_SETUP as target nids
2402          * (multiple interfaces), while nids after as failover node nids.
2403          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2404          */
2405         for (i = 0; i < mti->mti_nid_count; i++) {
2406                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2407                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2408                 if (rc)
2409                         GOTO(out_end, rc);
2410         }
2411         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2412         if (rc)
2413                 GOTO(out_end, rc);
2414         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2415         if (rc)
2416                 GOTO(out_end, rc);
2417         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2418         if (rc)
2419                 GOTO(out_end, rc);
2420
2421         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2422
2423         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2424         if (rc)
2425                 GOTO(out_end, rc);
2426         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2427                            "add osc");
2428         if (rc)
2429                 GOTO(out_end, rc);
2430 out_end:
2431         record_end_log(env, &llh);
2432 out_free:
2433         name_destroy(&lovuuid);
2434         name_destroy(&oscuuid);
2435         name_destroy(&oscname);
2436         name_destroy(&svname);
2437         name_destroy(&nodeuuid);
2438         RETURN(rc);
2439 }
2440
2441 static int mgs_write_log_ost(const struct lu_env *env,
2442                              struct mgs_device *mgs, struct fs_db *fsdb,
2443                              struct mgs_target_info *mti)
2444 {
2445         struct llog_handle *llh = NULL;
2446         char *logname, *lovname;
2447         char *ptr = mti->mti_params;
2448         int rc, flags = 0, failout = 0, i;
2449         ENTRY;
2450
2451         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2452
2453         /* The ost startup log */
2454
2455         /* If the ost log already exists, that means that someone reformatted
2456            the ost and it called target_add again. */
2457         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2458                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2459                                    "exists, yet the server claims it never "
2460                                    "registered. It may have been reformatted, "
2461                                    "or the index changed. writeconf the MDT to "
2462                                    "regenerate all logs.\n", mti->mti_svname);
2463                 RETURN(-EALREADY);
2464         }
2465
2466         /*
2467         attach obdfilter ost1 ost1_UUID
2468         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2469         */
2470         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2471                 failout = (strncmp(ptr, "failout", 7) == 0);
2472         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2473         if (rc)
2474                 RETURN(rc);
2475         /* FIXME these should be a single journal transaction */
2476         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2477         if (rc)
2478                 GOTO(out_end, rc);
2479         if (*mti->mti_uuid == '\0')
2480                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2481                          "%s_UUID", mti->mti_svname);
2482         rc = record_attach(env, llh, mti->mti_svname,
2483                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2484         if (rc)
2485                 GOTO(out_end, rc);
2486         rc = record_setup(env, llh, mti->mti_svname,
2487                           "dev"/*ignored*/, "type"/*ignored*/,
2488                           failout ? "n" : "f", 0/*options*/);
2489         if (rc)
2490                 GOTO(out_end, rc);
2491         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2492         if (rc)
2493                 GOTO(out_end, rc);
2494 out_end:
2495         record_end_log(env, &llh);
2496         if (rc)
2497                 RETURN(rc);
2498         /* We also have to update the other logs where this osc is part of
2499            the lov */
2500
2501         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2502                 /* If we're upgrading, the old mdt log already has our
2503                    entry. Let's do a fake one for fun. */
2504                 /* Note that we can't add any new failnids, since we don't
2505                    know the old osc names. */
2506                 flags = CM_SKIP | CM_UPGRADE146;
2507
2508         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2509                 /* If the update flag isn't set, don't update client/mdt
2510                    logs. */
2511                 flags |= CM_SKIP;
2512                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2513                               "the MDT first to regenerate it.\n",
2514                               mti->mti_svname);
2515         }
2516
2517         /* Add ost to all MDT lov defs */
2518         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2519                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2520                         char mdt_index[9];
2521
2522                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2523                                                      i);
2524                         if (rc)
2525                                 RETURN(rc);
2526                         sprintf(mdt_index, "-MDT%04x", i);
2527                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2528                                                       logname, mdt_index,
2529                                                       lovname, LUSTRE_SP_MDT,
2530                                                       flags);
2531                         name_destroy(&logname);
2532                         name_destroy(&lovname);
2533                         if (rc)
2534                                 RETURN(rc);
2535                 }
2536         }
2537
2538         /* Append ost info to the client log */
2539         rc = name_create(&logname, mti->mti_fsname, "-client");
2540         if (rc)
2541                 RETURN(rc);
2542         if (mgs_log_is_empty(env, mgs, logname)) {
2543                 /* Start client log */
2544                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2545                                        fsdb->fsdb_clilov);
2546                 if (rc)
2547                         GOTO(out_free, rc);
2548                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2549                                        fsdb->fsdb_clilmv);
2550                 if (rc)
2551                         GOTO(out_free, rc);
2552         }
2553         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2554                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2555 out_free:
2556         name_destroy(&logname);
2557         RETURN(rc);
2558 }
2559
2560 static __inline__ int mgs_param_empty(char *ptr)
2561 {
2562         char *tmp;
2563
2564         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2565                 return 1;
2566         return 0;
2567 }
2568
2569 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2570                                           struct mgs_device *mgs,
2571                                           struct fs_db *fsdb,
2572                                           struct mgs_target_info *mti,
2573                                           char *logname, char *cliname)
2574 {
2575         int rc;
2576         struct llog_handle *llh = NULL;
2577
2578         if (mgs_param_empty(mti->mti_params)) {
2579                 /* Remove _all_ failnids */
2580                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2581                                 mti->mti_svname, "add failnid", CM_SKIP);
2582                 return rc < 0 ? rc : 0;
2583         }
2584
2585         /* Otherwise failover nids are additive */
2586         rc = record_start_log(env, mgs, &llh, logname);
2587         if (rc)
2588                 return rc;
2589                 /* FIXME this should be a single journal transaction */
2590         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2591                            "add failnid");
2592         if (rc)
2593                 goto out_end;
2594         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2595         if (rc)
2596                 goto out_end;
2597         rc = record_marker(env, llh, fsdb, CM_END,
2598                            mti->mti_svname, "add failnid");
2599 out_end:
2600         record_end_log(env, &llh);
2601         return rc;
2602 }
2603
2604
2605 /* Add additional failnids to an existing log.
2606    The mdc/osc must have been added to logs first */
2607 /* tcp nids must be in dotted-quad ascii -
2608    we can't resolve hostnames from the kernel. */
2609 static int mgs_write_log_add_failnid(const struct lu_env *env,
2610                                      struct mgs_device *mgs,
2611                                      struct fs_db *fsdb,
2612                                      struct mgs_target_info *mti)
2613 {
2614         char *logname, *cliname;
2615         int rc;
2616         ENTRY;
2617
2618         /* FIXME we currently can't erase the failnids
2619          * given when a target first registers, since they aren't part of
2620          * an "add uuid" stanza */
2621
2622         /* Verify that we know about this target */
2623         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2624                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2625                                    "yet. It must be started before failnids "
2626                                    "can be added.\n", mti->mti_svname);
2627                 RETURN(-ENOENT);
2628         }
2629
2630         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2631         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2632                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2633         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2634                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2635         } else {
2636                 RETURN(-EINVAL);
2637         }
2638         if (rc)
2639                 RETURN(rc);
2640         /* Add failover nids to the client log */
2641         rc = name_create(&logname, mti->mti_fsname, "-client");
2642         if (rc) {
2643                 name_destroy(&cliname);
2644                 RETURN(rc);
2645         }
2646         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2647         name_destroy(&logname);
2648         name_destroy(&cliname);
2649         if (rc)
2650                 RETURN(rc);
2651
2652         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2653                 /* Add OST failover nids to the MDT logs as well */
2654                 int i;
2655
2656                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2657                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2658                                 continue;
2659                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2660                         if (rc)
2661                                 RETURN(rc);
2662                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2663                                                  fsdb, i);
2664                         if (rc) {
2665                                 name_destroy(&logname);
2666                                 RETURN(rc);
2667                         }
2668                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2669                                                             mti, logname,
2670                                                             cliname);
2671                         name_destroy(&cliname);
2672                         name_destroy(&logname);
2673                         if (rc)
2674                                 RETURN(rc);
2675                 }
2676         }
2677
2678         RETURN(rc);
2679 }
2680
2681 static int mgs_wlp_lcfg(const struct lu_env *env,
2682                         struct mgs_device *mgs, struct fs_db *fsdb,
2683                         struct mgs_target_info *mti,
2684                         char *logname, struct lustre_cfg_bufs *bufs,
2685                         char *tgtname, char *ptr)
2686 {
2687         char comment[MTI_NAME_MAXLEN];
2688         char *tmp;
2689         struct lustre_cfg *lcfg;
2690         int rc, del;
2691
2692         /* Erase any old settings of this same parameter */
2693         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2694         comment[MTI_NAME_MAXLEN - 1] = 0;
2695         /* But don't try to match the value. */
2696         if ((tmp = strchr(comment, '=')))
2697             *tmp = 0;
2698         /* FIXME we should skip settings that are the same as old values */
2699         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2700         if (rc < 0)
2701                 return rc;
2702         del = mgs_param_empty(ptr);
2703
2704         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2705                       "Sett" : "Modify", tgtname, comment, logname);
2706         if (del)
2707                 return rc;
2708
2709         lustre_cfg_bufs_reset(bufs, tgtname);
2710         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2711         if (mti->mti_flags & LDD_F_PARAM2)
2712                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2713
2714         lcfg = lustre_cfg_new((mti->mti_flags & LDD_F_PARAM2) ?
2715                               LCFG_SET_PARAM : LCFG_PARAM, bufs);
2716
2717         if (!lcfg)
2718                 return -ENOMEM;
2719         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2720         lustre_cfg_free(lcfg);
2721         return rc;
2722 }
2723
2724 static int mgs_write_log_param2(const struct lu_env *env,
2725                                 struct mgs_device *mgs,
2726                                 struct fs_db *fsdb,
2727                                 struct mgs_target_info *mti, char *ptr)
2728 {
2729         struct lustre_cfg_bufs  bufs;
2730         int                     rc = 0;
2731         ENTRY;
2732
2733         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2734         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2735                           mti->mti_svname, ptr);
2736
2737         RETURN(rc);
2738 }
2739
2740 /* write global variable settings into log */
2741 static int mgs_write_log_sys(const struct lu_env *env,
2742                              struct mgs_device *mgs, struct fs_db *fsdb,
2743                              struct mgs_target_info *mti, char *sys, char *ptr)
2744 {
2745         struct mgs_thread_info *mgi = mgs_env_info(env);
2746         struct lustre_cfg *lcfg;
2747         char *tmp, sep;
2748         int rc, cmd, convert = 1;
2749
2750         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2751                 cmd = LCFG_SET_TIMEOUT;
2752         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2753                 cmd = LCFG_SET_LDLM_TIMEOUT;
2754         /* Check for known params here so we can return error to lctl */
2755         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2756                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2757                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2758                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2759                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2760                 cmd = LCFG_PARAM;
2761         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2762                 convert = 0; /* Don't convert string value to integer */
2763                 cmd = LCFG_PARAM;
2764         } else {
2765                 return -EINVAL;
2766         }
2767
2768         if (mgs_param_empty(ptr))
2769                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2770         else
2771                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2772
2773         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2774         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2775         if (!convert && *tmp != '\0')
2776                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2777         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2778         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2779         /* truncate the comment to the parameter name */
2780         ptr = tmp - 1;
2781         sep = *ptr;
2782         *ptr = '\0';
2783         /* modify all servers and clients */
2784         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2785                                       *tmp == '\0' ? NULL : lcfg,
2786                                       mti->mti_fsname, sys, 0);
2787         if (rc == 0 && *tmp != '\0') {
2788                 switch (cmd) {
2789                 case LCFG_SET_TIMEOUT:
2790                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2791                                 class_process_config(lcfg);
2792                         break;
2793                 case LCFG_SET_LDLM_TIMEOUT:
2794                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2795                                 class_process_config(lcfg);
2796                         break;
2797                 default:
2798                         break;
2799                 }
2800         }
2801         *ptr = sep;
2802         lustre_cfg_free(lcfg);
2803         return rc;
2804 }
2805
2806 /* write quota settings into log */
2807 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2808                                struct fs_db *fsdb, struct mgs_target_info *mti,
2809                                char *quota, char *ptr)
2810 {
2811         struct mgs_thread_info  *mgi = mgs_env_info(env);
2812         struct lustre_cfg       *lcfg;
2813         char                    *tmp;
2814         char                     sep;
2815         int                      rc, cmd = LCFG_PARAM;
2816
2817         /* support only 'meta' and 'data' pools so far */
2818         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2819             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2820                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2821                        "& quota.ost are)\n", ptr);
2822                 return -EINVAL;
2823         }
2824
2825         if (*tmp == '\0') {
2826                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2827         } else {
2828                 CDEBUG(D_MGS, "global '%s'\n", quota);
2829
2830                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2831                     strcmp(tmp, "none") != 0) {
2832                         CERROR("enable option(%s) isn't supported\n", tmp);
2833                         return -EINVAL;
2834                 }
2835         }
2836
2837         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2838         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2839         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2840         /* truncate the comment to the parameter name */
2841         ptr = tmp - 1;
2842         sep = *ptr;
2843         *ptr = '\0';
2844
2845         /* XXX we duplicated quota enable information in all server
2846          *     config logs, it should be moved to a separate config
2847          *     log once we cleanup the config log for global param. */
2848         /* modify all servers */
2849         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2850                                       *tmp == '\0' ? NULL : lcfg,
2851                                       mti->mti_fsname, quota, 1);
2852         *ptr = sep;
2853         lustre_cfg_free(lcfg);
2854         return rc < 0 ? rc : 0;
2855 }
2856
2857 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2858                                    struct mgs_device *mgs,
2859                                    struct fs_db *fsdb,
2860                                    struct mgs_target_info *mti,
2861                                    char *param)
2862 {
2863         struct mgs_thread_info *mgi = mgs_env_info(env);
2864         struct llog_handle     *llh = NULL;
2865         char                   *logname;
2866         char                   *comment, *ptr;
2867         struct lustre_cfg      *lcfg;
2868         int                     rc, len;
2869         ENTRY;
2870
2871         /* get comment */
2872         ptr = strchr(param, '=');
2873         LASSERT(ptr);
2874         len = ptr - param;
2875
2876         OBD_ALLOC(comment, len + 1);
2877         if (comment == NULL)
2878                 RETURN(-ENOMEM);
2879         strncpy(comment, param, len);
2880         comment[len] = '\0';
2881
2882         /* prepare lcfg */
2883         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2884         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2885         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2886         if (lcfg == NULL)
2887                 GOTO(out_comment, rc = -ENOMEM);
2888
2889         /* construct log name */
2890         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2891         if (rc)
2892                 GOTO(out_lcfg, rc);
2893
2894         if (mgs_log_is_empty(env, mgs, logname)) {
2895                 rc = record_start_log(env, mgs, &llh, logname);
2896                 if (rc)
2897                         GOTO(out, rc);
2898                 record_end_log(env, &llh);
2899         }
2900
2901         /* obsolete old one */
2902         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2903                         comment, CM_SKIP);
2904         if (rc < 0)
2905                 GOTO(out, rc);
2906         /* write the new one */
2907         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2908                                   mti->mti_svname, comment);
2909         if (rc)
2910                 CERROR("err %d writing log %s\n", rc, logname);
2911 out:
2912         name_destroy(&logname);
2913 out_lcfg:
2914         lustre_cfg_free(lcfg);
2915 out_comment:
2916         OBD_FREE(comment, len + 1);
2917         RETURN(rc);
2918 }
2919
2920 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2921                                         char *param)
2922 {
2923         char    *ptr;
2924
2925         /* disable the adjustable udesc parameter for now, i.e. use default
2926          * setting that client always ship udesc to MDT if possible. to enable
2927          * it simply remove the following line */
2928         goto error_out;
2929
2930         ptr = strchr(param, '=');
2931         if (ptr == NULL)
2932                 goto error_out;
2933         *ptr++ = '\0';
2934
2935         if (strcmp(param, PARAM_SRPC_UDESC))
2936                 goto error_out;
2937
2938         if (strcmp(ptr, "yes") == 0) {
2939                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2940                 CWARN("Enable user descriptor shipping from client to MDT\n");
2941         } else if (strcmp(ptr, "no") == 0) {
2942                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2943                 CWARN("Disable user descriptor shipping from client to MDT\n");
2944         } else {
2945                 *(ptr - 1) = '=';
2946                 goto error_out;
2947         }
2948         return 0;
2949
2950 error_out:
2951         CERROR("Invalid param: %s\n", param);
2952         return -EINVAL;
2953 }
2954
2955 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2956                                   const char *svname,
2957                                   char *param)
2958 {
2959         struct sptlrpc_rule      rule;
2960         struct sptlrpc_rule_set *rset;
2961         int                      rc;
2962         ENTRY;
2963
2964         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2965                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2966                 RETURN(-EINVAL);
2967         }
2968
2969         if (strncmp(param, PARAM_SRPC_UDESC,
2970                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2971                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2972         }
2973
2974         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2975                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2976                 RETURN(-EINVAL);
2977         }
2978
2979         param += sizeof(PARAM_SRPC_FLVR) - 1;
2980
2981         rc = sptlrpc_parse_rule(param, &rule);
2982         if (rc)
2983                 RETURN(rc);
2984
2985         /* mgs rules implies must be mgc->mgs */
2986         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2987                 if ((rule.sr_from != LUSTRE_SP_MGC &&
2988                      rule.sr_from != LUSTRE_SP_ANY) ||
2989                     (rule.sr_to != LUSTRE_SP_MGS &&
2990                      rule.sr_to != LUSTRE_SP_ANY))
2991                         RETURN(-EINVAL);
2992         }
2993
2994         /* preapre room for this coming rule. svcname format should be:
2995          * - fsname: general rule
2996          * - fsname-tgtname: target-specific rule
2997          */
2998         if (strchr(svname, '-')) {
2999                 struct mgs_tgt_srpc_conf *tgtconf;
3000                 int                       found = 0;
3001
3002                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3003                      tgtconf = tgtconf->mtsc_next) {
3004                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3005                                 found = 1;
3006                                 break;
3007                         }
3008                 }
3009
3010                 if (!found) {
3011                         int name_len;
3012
3013                         OBD_ALLOC_PTR(tgtconf);
3014                         if (tgtconf == NULL)
3015                                 RETURN(-ENOMEM);
3016
3017                         name_len = strlen(svname);
3018
3019                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3020                         if (tgtconf->mtsc_tgt == NULL) {
3021                                 OBD_FREE_PTR(tgtconf);
3022                                 RETURN(-ENOMEM);
3023                         }
3024                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3025
3026                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3027                         fsdb->fsdb_srpc_tgt = tgtconf;
3028                 }
3029
3030                 rset = &tgtconf->mtsc_rset;
3031         } else {
3032                 rset = &fsdb->fsdb_srpc_gen;
3033         }
3034
3035         rc = sptlrpc_rule_set_merge(rset, &rule);
3036
3037         RETURN(rc);
3038 }
3039
3040 static int mgs_srpc_set_param(const struct lu_env *env,
3041                               struct mgs_device *mgs,
3042                               struct fs_db *fsdb,
3043                               struct mgs_target_info *mti,
3044                               char *param)
3045 {
3046         char                   *copy;
3047         int                     rc, copy_size;
3048         ENTRY;
3049
3050 #ifndef HAVE_GSS
3051         RETURN(-EINVAL);
3052 #endif
3053         /* keep a copy of original param, which could be destroied
3054          * during parsing */
3055         copy_size = strlen(param) + 1;
3056         OBD_ALLOC(copy, copy_size);
3057         if (copy == NULL)
3058                 return -ENOMEM;
3059         memcpy(copy, param, copy_size);
3060
3061         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3062         if (rc)
3063                 goto out_free;
3064
3065         /* previous steps guaranteed the syntax is correct */
3066         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3067         if (rc)
3068                 goto out_free;
3069
3070         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3071                 /*
3072                  * for mgs rules, make them effective immediately.
3073                  */
3074                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3075                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3076                                                  &fsdb->fsdb_srpc_gen);
3077         }
3078
3079 out_free:
3080         OBD_FREE(copy, copy_size);
3081         RETURN(rc);
3082 }
3083
3084 struct mgs_srpc_read_data {
3085         struct fs_db   *msrd_fsdb;
3086         int             msrd_skip;
3087 };
3088
3089 static int mgs_srpc_read_handler(const struct lu_env *env,
3090                                  struct llog_handle *llh,
3091                                  struct llog_rec_hdr *rec, void *data)
3092 {
3093         struct mgs_srpc_read_data *msrd = data;
3094         struct cfg_marker         *marker;
3095         struct lustre_cfg         *lcfg = REC_DATA(rec);
3096         char                      *svname, *param;
3097         int                        cfg_len, rc;
3098         ENTRY;
3099
3100         if (rec->lrh_type != OBD_CFG_REC) {
3101                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3102                 RETURN(-EINVAL);
3103         }
3104
3105         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3106                   sizeof(struct llog_rec_tail);
3107
3108         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3109         if (rc) {
3110                 CERROR("Insane cfg\n");
3111                 RETURN(rc);
3112         }
3113
3114         if (lcfg->lcfg_command == LCFG_MARKER) {
3115                 marker = lustre_cfg_buf(lcfg, 1);
3116
3117                 if (marker->cm_flags & CM_START &&
3118                     marker->cm_flags & CM_SKIP)
3119                         msrd->msrd_skip = 1;
3120                 if (marker->cm_flags & CM_END)
3121                         msrd->msrd_skip = 0;
3122
3123                 RETURN(0);
3124         }
3125
3126         if (msrd->msrd_skip)
3127                 RETURN(0);
3128
3129         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3130                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3131                 RETURN(0);
3132         }
3133
3134         svname = lustre_cfg_string(lcfg, 0);
3135         if (svname == NULL) {
3136                 CERROR("svname is empty\n");
3137                 RETURN(0);
3138         }
3139
3140         param = lustre_cfg_string(lcfg, 1);
3141         if (param == NULL) {
3142                 CERROR("param is empty\n");
3143                 RETURN(0);
3144         }
3145
3146         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3147         if (rc)
3148                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3149
3150         RETURN(0);
3151 }
3152
3153 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3154                                 struct mgs_device *mgs,
3155                                 struct fs_db *fsdb)
3156 {
3157         struct llog_handle        *llh = NULL;
3158         struct llog_ctxt          *ctxt;
3159         char                      *logname;
3160         struct mgs_srpc_read_data  msrd;
3161         int                        rc;
3162         ENTRY;
3163
3164         /* construct log name */
3165         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3166         if (rc)
3167                 RETURN(rc);
3168
3169         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3170         LASSERT(ctxt != NULL);
3171
3172         if (mgs_log_is_empty(env, mgs, logname))
3173                 GOTO(out, rc = 0);
3174
3175         rc = llog_open(env, ctxt, &llh, NULL, logname,
3176                        LLOG_OPEN_EXISTS);
3177         if (rc < 0) {
3178                 if (rc == -ENOENT)
3179                         rc = 0;
3180                 GOTO(out, rc);
3181         }
3182
3183         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3184         if (rc)
3185                 GOTO(out_close, rc);
3186
3187         if (llog_get_size(llh) <= 1)
3188                 GOTO(out_close, rc = 0);
3189
3190         msrd.msrd_fsdb = fsdb;
3191         msrd.msrd_skip = 0;
3192
3193         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3194                           NULL);
3195
3196 out_close:
3197         llog_close(env, llh);
3198 out:
3199         llog_ctxt_put(ctxt);
3200         name_destroy(&logname);
3201
3202         if (rc)
3203                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3204         RETURN(rc);
3205 }
3206
3207 /* Permanent settings of all parameters by writing into the appropriate
3208  * configuration logs.
3209  * A parameter with null value ("<param>='\0'") means to erase it out of
3210  * the logs.
3211  */
3212 static int mgs_write_log_param(const struct lu_env *env,
3213                                struct mgs_device *mgs, struct fs_db *fsdb,
3214                                struct mgs_target_info *mti, char *ptr)
3215 {
3216         struct mgs_thread_info *mgi = mgs_env_info(env);
3217         char *logname;
3218         char *tmp;
3219         int rc = 0, rc2 = 0;
3220         ENTRY;
3221
3222         /* For various parameter settings, we have to figure out which logs
3223            care about them (e.g. both mdt and client for lov settings) */
3224         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3225
3226         /* The params are stored in MOUNT_DATA_FILE and modified via
3227            tunefs.lustre, or set using lctl conf_param */
3228
3229         /* Processed in lustre_start_mgc */
3230         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3231                 GOTO(end, rc);
3232
3233         /* Processed in ost/mdt */
3234         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3235                 GOTO(end, rc);
3236
3237         /* Processed in mgs_write_log_ost */
3238         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3239                 if (mti->mti_flags & LDD_F_PARAM) {
3240                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3241                                            "changed with tunefs.lustre"
3242                                            "and --writeconf\n", ptr);
3243                         rc = -EPERM;
3244                 }
3245                 GOTO(end, rc);
3246         }
3247
3248         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3249                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3250                 GOTO(end, rc);
3251         }
3252
3253         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3254                 /* Add a failover nidlist */
3255                 rc = 0;
3256                 /* We al