Whamcloud - gitweb
LU-3996 mgs: Don't close llog that we failed to open
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         LASSERT(dir);
71         LASSERT(dir->do_index_ops);
72
73         iops = &dir->do_index_ops->dio_it;
74         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
75         if (IS_ERR(it))
76                 RETURN(PTR_ERR(it));
77
78         rc = iops->load(env, it, 0);
79         if (rc <= 0)
80                 GOTO(fini, rc = 0);
81
82         /* main cycle */
83         do {
84                 key = (void *)iops->key(env, it);
85                 if (IS_ERR(key)) {
86                         CERROR("%s: key failed when listing %s: rc = %d\n",
87                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
88                                (int) PTR_ERR(key));
89                         goto next;
90                 }
91                 key_sz = iops->key_size(env, it);
92                 LASSERT(key_sz > 0);
93
94                 /* filter out "." and ".." entries */
95                 if (key[0] == '.') {
96                         if (key_sz == 1)
97                                 goto next;
98                         if (key_sz == 2 && key[1] == '.')
99                                 goto next;
100                 }
101
102                 de = mgs_direntry_alloc(key_sz + 1);
103                 if (de == NULL) {
104                         rc = -ENOMEM;
105                         break;
106                 }
107
108                 memcpy(de->name, key, key_sz);
109                 de->name[key_sz] = 0;
110
111                 cfs_list_add(&de->list, list);
112
113 next:
114                 rc = iops->next(env, it);
115         } while (rc == 0);
116         rc = 0;
117
118         iops->put(env, it);
119
120 fini:
121         iops->fini(env, it);
122         if (rc)
123                 CERROR("%s: key failed when listing %s: rc = %d\n",
124                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
125         RETURN(rc);
126 }
127
128 /******************** DB functions *********************/
129
130 static inline int name_create(char **newname, char *prefix, char *suffix)
131 {
132         LASSERT(newname);
133         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
134         if (!*newname)
135                 return -ENOMEM;
136         sprintf(*newname, "%s%s", prefix, suffix);
137         return 0;
138 }
139
140 static inline void name_destroy(char **name)
141 {
142         if (*name)
143                 OBD_FREE(*name, strlen(*name) + 1);
144         *name = NULL;
145 }
146
147 struct mgs_fsdb_handler_data
148 {
149         struct fs_db   *fsdb;
150         __u32           ver;
151 };
152
153 /* from the (client) config log, figure out:
154         1. which ost's/mdt's are configured (by index)
155         2. what the last config step is
156         3. COMPAT_18 osc name
157 */
158 /* It might be better to have a separate db file, instead of parsing the info
159    out of the client log.  This is slow and potentially error-prone. */
160 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
161                             struct llog_rec_hdr *rec, void *data)
162 {
163         struct mgs_fsdb_handler_data *d = data;
164         struct fs_db *fsdb = d->fsdb;
165         int cfg_len = rec->lrh_len;
166         char *cfg_buf = (char*) (rec + 1);
167         struct lustre_cfg *lcfg;
168         __u32 index;
169         int rc = 0;
170         ENTRY;
171
172         if (rec->lrh_type != OBD_CFG_REC) {
173                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
174                 RETURN(-EINVAL);
175         }
176
177         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
178         if (rc) {
179                 CERROR("Insane cfg\n");
180                 RETURN(rc);
181         }
182
183         lcfg = (struct lustre_cfg *)cfg_buf;
184
185         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
186                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
187
188         /* Figure out ost indicies */
189         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
190         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
191             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
192                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
193                                        NULL, 10);
194                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
195                        lustre_cfg_string(lcfg, 1), index,
196                        lustre_cfg_string(lcfg, 2));
197                 set_bit(index, fsdb->fsdb_ost_index_map);
198         }
199
200         /* Figure out mdt indicies */
201         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
202         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
203             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
204                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
205                                        &index, NULL);
206                 if (rc != LDD_F_SV_TYPE_MDT) {
207                         CWARN("Unparsable MDC name %s, assuming index 0\n",
208                               lustre_cfg_string(lcfg, 0));
209                         index = 0;
210                 }
211                 rc = 0;
212                 CDEBUG(D_MGS, "MDT index is %u\n", index);
213                 set_bit(index, fsdb->fsdb_mdt_index_map);
214                 fsdb->fsdb_mdt_count ++;
215         }
216
217         /**
218          * figure out the old config. fsdb_gen = 0 means old log
219          * It is obsoleted and not supported anymore
220          */
221         if (fsdb->fsdb_gen == 0) {
222                 CERROR("Old config format is not supported\n");
223                 RETURN(-EINVAL);
224         }
225
226         /*
227          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
228          */
229         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
230             lcfg->lcfg_command == LCFG_ATTACH &&
231             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
232                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
233                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
234                         CWARN("MDT using 1.8 OSC name scheme\n");
235                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
236                 }
237         }
238
239         if (lcfg->lcfg_command == LCFG_MARKER) {
240                 struct cfg_marker *marker;
241                 marker = lustre_cfg_buf(lcfg, 1);
242
243                 d->ver = marker->cm_vers;
244
245                 /* Keep track of the latest marker step */
246                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
247         }
248
249         RETURN(rc);
250 }
251
252 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
253 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
254                                   struct mgs_device *mgs,
255                                   struct fs_db *fsdb)
256 {
257         char                            *logname;
258         struct llog_handle              *loghandle;
259         struct llog_ctxt                *ctxt;
260         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
261         int rc;
262
263         ENTRY;
264
265         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
266         LASSERT(ctxt != NULL);
267         rc = name_create(&logname, fsdb->fsdb_name, "-client");
268         if (rc)
269                 GOTO(out_put, rc);
270         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
271         if (rc)
272                 GOTO(out_pop, rc);
273
274         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
275         if (rc)
276                 GOTO(out_close, rc);
277
278         if (llog_get_size(loghandle) <= 1)
279                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
280
281         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
282         CDEBUG(D_INFO, "get_db = %d\n", rc);
283 out_close:
284         llog_close(env, loghandle);
285 out_pop:
286         name_destroy(&logname);
287 out_put:
288         llog_ctxt_put(ctxt);
289
290         RETURN(rc);
291 }
292
293 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
294 {
295         struct mgs_tgt_srpc_conf *tgtconf;
296
297         /* free target-specific rules */
298         while (fsdb->fsdb_srpc_tgt) {
299                 tgtconf = fsdb->fsdb_srpc_tgt;
300                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
301
302                 LASSERT(tgtconf->mtsc_tgt);
303
304                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
305                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
306                 OBD_FREE_PTR(tgtconf);
307         }
308
309         /* free general rules */
310         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
311 }
312
313 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
314 {
315         struct fs_db *fsdb;
316         cfs_list_t *tmp;
317
318         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
319                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
320                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
321                         return fsdb;
322         }
323         return NULL;
324 }
325
326 /* caller must hold the mgs->mgs_fs_db_lock */
327 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
328                                   struct mgs_device *mgs, char *fsname)
329 {
330         struct fs_db *fsdb;
331         int rc;
332         ENTRY;
333
334         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
335                 CERROR("fsname %s is too long\n", fsname);
336                 RETURN(NULL);
337         }
338
339         OBD_ALLOC_PTR(fsdb);
340         if (!fsdb)
341                 RETURN(NULL);
342
343         strcpy(fsdb->fsdb_name, fsname);
344         mutex_init(&fsdb->fsdb_mutex);
345         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
346         fsdb->fsdb_gen = 1;
347
348         if (strcmp(fsname, MGSSELF_NAME) == 0) {
349                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
350         } else {
351                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
352                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
353                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
354                         CERROR("No memory for index maps\n");
355                         GOTO(err, rc = -ENOMEM);
356                 }
357
358                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
359                 if (rc)
360                         GOTO(err, rc);
361                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
362                 if (rc)
363                         GOTO(err, rc);
364
365                 /* initialise data for NID table */
366                 mgs_ir_init_fs(env, mgs, fsdb);
367
368                 lproc_mgs_add_live(mgs, fsdb);
369         }
370
371         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
372
373         RETURN(fsdb);
374 err:
375         if (fsdb->fsdb_ost_index_map)
376                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
377         if (fsdb->fsdb_mdt_index_map)
378                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
379         name_destroy(&fsdb->fsdb_clilov);
380         name_destroy(&fsdb->fsdb_clilmv);
381         OBD_FREE_PTR(fsdb);
382         RETURN(NULL);
383 }
384
385 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
386 {
387         /* wait for anyone with the sem */
388         mutex_lock(&fsdb->fsdb_mutex);
389         lproc_mgs_del_live(mgs, fsdb);
390         cfs_list_del(&fsdb->fsdb_list);
391
392         /* deinitialize fsr */
393         mgs_ir_fini_fs(mgs, fsdb);
394
395         if (fsdb->fsdb_ost_index_map)
396                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
397         if (fsdb->fsdb_mdt_index_map)
398                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
399         name_destroy(&fsdb->fsdb_clilov);
400         name_destroy(&fsdb->fsdb_clilmv);
401         mgs_free_fsdb_srpc(fsdb);
402         mutex_unlock(&fsdb->fsdb_mutex);
403         OBD_FREE_PTR(fsdb);
404 }
405
406 int mgs_init_fsdb_list(struct mgs_device *mgs)
407 {
408         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
409         return 0;
410 }
411
412 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
413 {
414         struct fs_db *fsdb;
415         cfs_list_t *tmp, *tmp2;
416         mutex_lock(&mgs->mgs_mutex);
417         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
418                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
419                 mgs_free_fsdb(mgs, fsdb);
420         }
421         mutex_unlock(&mgs->mgs_mutex);
422         return 0;
423 }
424
425 int mgs_find_or_make_fsdb(const struct lu_env *env,
426                           struct mgs_device *mgs, char *name,
427                           struct fs_db **dbh)
428 {
429         struct fs_db *fsdb;
430         int rc = 0;
431
432         ENTRY;
433         mutex_lock(&mgs->mgs_mutex);
434         fsdb = mgs_find_fsdb(mgs, name);
435         if (fsdb) {
436                 mutex_unlock(&mgs->mgs_mutex);
437                 *dbh = fsdb;
438                 RETURN(0);
439         }
440
441         CDEBUG(D_MGS, "Creating new db\n");
442         fsdb = mgs_new_fsdb(env, mgs, name);
443         /* lock fsdb_mutex until the db is loaded from llogs */
444         if (fsdb)
445                 mutex_lock(&fsdb->fsdb_mutex);
446         mutex_unlock(&mgs->mgs_mutex);
447         if (!fsdb)
448                 RETURN(-ENOMEM);
449
450         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
451                 /* populate the db from the client llog */
452                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
453                 if (rc) {
454                         CERROR("Can't get db from client log %d\n", rc);
455                         GOTO(out_free, rc);
456                 }
457         }
458
459         /* populate srpc rules from params llog */
460         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
461         if (rc) {
462                 CERROR("Can't get db from params log %d\n", rc);
463                 GOTO(out_free, rc);
464         }
465
466         mutex_unlock(&fsdb->fsdb_mutex);
467         *dbh = fsdb;
468
469         RETURN(0);
470
471 out_free:
472         mutex_unlock(&fsdb->fsdb_mutex);
473         mgs_free_fsdb(mgs, fsdb);
474         return rc;
475 }
476
477 /* 1 = index in use
478    0 = index unused
479    -1= empty client log */
480 int mgs_check_index(const struct lu_env *env,
481                     struct mgs_device *mgs,
482                     struct mgs_target_info *mti)
483 {
484         struct fs_db *fsdb;
485         void *imap;
486         int rc = 0;
487         ENTRY;
488
489         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
490
491         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
492         if (rc) {
493                 CERROR("Can't get db for %s\n", mti->mti_fsname);
494                 RETURN(rc);
495         }
496
497         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
498                 RETURN(-1);
499
500         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
501                 imap = fsdb->fsdb_ost_index_map;
502         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
503                 imap = fsdb->fsdb_mdt_index_map;
504         else
505                 RETURN(-EINVAL);
506
507         if (test_bit(mti->mti_stripe_index, imap))
508                 RETURN(1);
509         RETURN(0);
510 }
511
512 static __inline__ int next_index(void *index_map, int map_len)
513 {
514         int i;
515         for (i = 0; i < map_len * 8; i++)
516                 if (!test_bit(i, index_map)) {
517                          return i;
518                  }
519         CERROR("max index %d exceeded.\n", i);
520         return -1;
521 }
522
523 /* Return codes:
524         0  newly marked as in use
525         <0 err
526         +EALREADY for update of an old index */
527 static int mgs_set_index(const struct lu_env *env,
528                          struct mgs_device *mgs,
529                          struct mgs_target_info *mti)
530 {
531         struct fs_db *fsdb;
532         void *imap;
533         int rc = 0;
534         ENTRY;
535
536         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
537         if (rc) {
538                 CERROR("Can't get db for %s\n", mti->mti_fsname);
539                 RETURN(rc);
540         }
541
542         mutex_lock(&fsdb->fsdb_mutex);
543         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
544                 imap = fsdb->fsdb_ost_index_map;
545         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
546                 imap = fsdb->fsdb_mdt_index_map;
547         } else {
548                 GOTO(out_up, rc = -EINVAL);
549         }
550
551         if (mti->mti_flags & LDD_F_NEED_INDEX) {
552                 rc = next_index(imap, INDEX_MAP_SIZE);
553                 if (rc == -1)
554                         GOTO(out_up, rc = -ERANGE);
555                 mti->mti_stripe_index = rc;
556                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
557                         fsdb->fsdb_mdt_count ++;
558         }
559
560         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
561                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
562                                    "but the max index is %d.\n",
563                                    mti->mti_svname, mti->mti_stripe_index,
564                                    INDEX_MAP_SIZE * 8);
565                 GOTO(out_up, rc = -ERANGE);
566         }
567
568         if (test_bit(mti->mti_stripe_index, imap)) {
569                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
570                     !(mti->mti_flags & LDD_F_WRITECONF)) {
571                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
572                                            "%d, but that index is already in "
573                                            "use. Use --writeconf to force\n",
574                                            mti->mti_svname,
575                                            mti->mti_stripe_index);
576                         GOTO(out_up, rc = -EADDRINUSE);
577                 } else {
578                         CDEBUG(D_MGS, "Server %s updating index %d\n",
579                                mti->mti_svname, mti->mti_stripe_index);
580                         GOTO(out_up, rc = EALREADY);
581                 }
582         }
583
584         set_bit(mti->mti_stripe_index, imap);
585         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
586         mutex_unlock(&fsdb->fsdb_mutex);
587         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
588                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
589
590         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
591                mti->mti_stripe_index);
592
593         RETURN(0);
594 out_up:
595         mutex_unlock(&fsdb->fsdb_mutex);
596         return rc;
597 }
598
599 struct mgs_modify_lookup {
600         struct cfg_marker mml_marker;
601         int               mml_modified;
602 };
603
604 static int mgs_modify_handler(const struct lu_env *env,
605                               struct llog_handle *llh,
606                               struct llog_rec_hdr *rec, void *data)
607 {
608         struct mgs_modify_lookup *mml = data;
609         struct cfg_marker *marker;
610         struct lustre_cfg *lcfg = REC_DATA(rec);
611         int cfg_len = REC_DATA_LEN(rec);
612         int rc;
613         ENTRY;
614
615         if (rec->lrh_type != OBD_CFG_REC) {
616                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
617                 RETURN(-EINVAL);
618         }
619
620         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
621         if (rc) {
622                 CERROR("Insane cfg\n");
623                 RETURN(rc);
624         }
625
626         /* We only care about markers */
627         if (lcfg->lcfg_command != LCFG_MARKER)
628                 RETURN(0);
629
630         marker = lustre_cfg_buf(lcfg, 1);
631         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
632             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
633             !(marker->cm_flags & CM_SKIP)) {
634                 /* Found a non-skipped marker match */
635                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
636                        rec->lrh_index, marker->cm_step,
637                        marker->cm_flags, mml->mml_marker.cm_flags,
638                        marker->cm_tgtname, marker->cm_comment);
639                 /* Overwrite the old marker llog entry */
640                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
641                 marker->cm_flags |= mml->mml_marker.cm_flags;
642                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
643                 /* Header and tail are added back to lrh_len in
644                    llog_lvfs_write_rec */
645                 rec->lrh_len = cfg_len;
646                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
647                                 rec->lrh_index);
648                 if (!rc)
649                          mml->mml_modified++;
650         }
651
652         RETURN(rc);
653 }
654
655 /**
656  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
657  * Return code:
658  * 0 - modified successfully,
659  * 1 - no modification was done
660  * negative - error
661  */
662 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
663                       struct fs_db *fsdb, struct mgs_target_info *mti,
664                       char *logname, char *devname, char *comment, int flags)
665 {
666         struct llog_handle *loghandle;
667         struct llog_ctxt *ctxt;
668         struct mgs_modify_lookup *mml;
669         int rc;
670
671         ENTRY;
672
673         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
674         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
675                flags);
676
677         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
678         LASSERT(ctxt != NULL);
679         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
680         if (rc < 0) {
681                 if (rc == -ENOENT)
682                         rc = 0;
683                 GOTO(out_pop, rc);
684         }
685
686         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
687         if (rc)
688                 GOTO(out_close, rc);
689
690         if (llog_get_size(loghandle) <= 1)
691                 GOTO(out_close, rc = 0);
692
693         OBD_ALLOC_PTR(mml);
694         if (!mml)
695                 GOTO(out_close, rc = -ENOMEM);
696         if (strlcpy(mml->mml_marker.cm_comment, comment,
697                     sizeof(mml->mml_marker.cm_comment)) >=
698             sizeof(mml->mml_marker.cm_comment))
699                 GOTO(out_close, rc = -E2BIG);
700         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
701                     sizeof(mml->mml_marker.cm_tgtname)) >=
702             sizeof(mml->mml_marker.cm_tgtname))
703                 GOTO(out_close, rc = -E2BIG);
704         /* Modify mostly means cancel */
705         mml->mml_marker.cm_flags = flags;
706         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
707         mml->mml_modified = 0;
708         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
709                           NULL);
710         if (!rc && !mml->mml_modified)
711                 rc = 1;
712         OBD_FREE_PTR(mml);
713
714 out_close:
715         llog_close(env, loghandle);
716 out_pop:
717         if (rc < 0)
718                 CERROR("%s: modify %s/%s failed: rc = %d\n",
719                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
720         llog_ctxt_put(ctxt);
721         RETURN(rc);
722 }
723
724 /** This structure is passed to mgs_replace_handler */
725 struct mgs_replace_uuid_lookup {
726         /* Nids are replaced for this target device */
727         struct mgs_target_info target;
728         /* Temporary modified llog */
729         struct llog_handle *temp_llh;
730         /* Flag is set if in target block*/
731         int in_target_device;
732         /* Nids already added. Just skip (multiple nids) */
733         int device_nids_added;
734         /* Flag is set if this block should not be copied */
735         int skip_it;
736 };
737
738 /**
739  * Check: a) if block should be skipped
740  * b) is it target block
741  *
742  * \param[in] lcfg
743  * \param[in] mrul
744  *
745  * \retval 0 should not to be skipped
746  * \retval 1 should to be skipped
747  */
748 static int check_markers(struct lustre_cfg *lcfg,
749                          struct mgs_replace_uuid_lookup *mrul)
750 {
751          struct cfg_marker *marker;
752
753         /* Track markers. Find given device */
754         if (lcfg->lcfg_command == LCFG_MARKER) {
755                 marker = lustre_cfg_buf(lcfg, 1);
756                 /* Clean llog from records marked as CM_EXCLUDE.
757                    CM_SKIP records are used for "active" command
758                    and can be restored if needed */
759                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
760                     (CM_EXCLUDE | CM_START)) {
761                         mrul->skip_it = 1;
762                         return 1;
763                 }
764
765                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
766                     (CM_EXCLUDE | CM_END)) {
767                         mrul->skip_it = 0;
768                         return 1;
769                 }
770
771                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
772                         LASSERT(!(marker->cm_flags & CM_START) ||
773                                 !(marker->cm_flags & CM_END));
774                         if (marker->cm_flags & CM_START) {
775                                 mrul->in_target_device = 1;
776                                 mrul->device_nids_added = 0;
777                         } else if (marker->cm_flags & CM_END)
778                                 mrul->in_target_device = 0;
779                 }
780         }
781
782         return 0;
783 }
784
785 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
786                        struct lustre_cfg *lcfg)
787 {
788         struct llog_rec_hdr      rec;
789         int                      buflen, rc;
790
791         if (!lcfg || !llh)
792                 return -ENOMEM;
793
794         LASSERT(llh->lgh_ctxt);
795
796         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
797                                 lcfg->lcfg_buflens);
798         rec.lrh_len = llog_data_len(buflen);
799         rec.lrh_type = OBD_CFG_REC;
800
801         /* idx = -1 means append */
802         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
803         if (rc)
804                 CERROR("failed %d\n", rc);
805         return rc;
806 }
807
808 static int record_base(const struct lu_env *env, struct llog_handle *llh,
809                      char *cfgname, lnet_nid_t nid, int cmd,
810                      char *s1, char *s2, char *s3, char *s4)
811 {
812         struct mgs_thread_info *mgi = mgs_env_info(env);
813         struct lustre_cfg     *lcfg;
814         int rc;
815
816         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
817                cmd, s1, s2, s3, s4);
818
819         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
820         if (s1)
821                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
822         if (s2)
823                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
824         if (s3)
825                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
826         if (s4)
827                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
828
829         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
830         if (!lcfg)
831                 return -ENOMEM;
832         lcfg->lcfg_nid = nid;
833
834         rc = record_lcfg(env, llh, lcfg);
835
836         lustre_cfg_free(lcfg);
837
838         if (rc) {
839                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
840                        cmd, s1, s2, s3, s4);
841         }
842         return rc;
843 }
844
845 static inline int record_add_uuid(const struct lu_env *env,
846                                   struct llog_handle *llh,
847                                   uint64_t nid, char *uuid)
848 {
849         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
850 }
851
852 static inline int record_add_conn(const struct lu_env *env,
853                                   struct llog_handle *llh,
854                                   char *devname, char *uuid)
855 {
856         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
857 }
858
859 static inline int record_attach(const struct lu_env *env,
860                                 struct llog_handle *llh, char *devname,
861                                 char *type, char *uuid)
862 {
863         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
864 }
865
866 static inline int record_setup(const struct lu_env *env,
867                                struct llog_handle *llh, char *devname,
868                                char *s1, char *s2, char *s3, char *s4)
869 {
870         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
871 }
872
873 /**
874  * \retval <0 record processing error
875  * \retval n record is processed. No need copy original one.
876  * \retval 0 record is not processed.
877  */
878 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
879                            struct mgs_replace_uuid_lookup *mrul)
880 {
881         int nids_added = 0;
882         lnet_nid_t nid;
883         char *ptr;
884         int rc;
885
886         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
887                 /* LCFG_ADD_UUID command found. Let's skip original command
888                    and add passed nids */
889                 ptr = mrul->target.mti_params;
890                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
891                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
892                                "device %s\n", libcfs_nid2str(nid),
893                                 mrul->target.mti_params,
894                                 mrul->target.mti_svname);
895                         rc = record_add_uuid(env,
896                                              mrul->temp_llh, nid,
897                                              mrul->target.mti_params);
898                         if (!rc)
899                                 nids_added++;
900                 }
901
902                 if (nids_added == 0) {
903                         CERROR("No new nids were added, nid %s with uuid %s, "
904                                "device %s\n", libcfs_nid2str(nid),
905                                mrul->target.mti_params,
906                                mrul->target.mti_svname);
907                         RETURN(-ENXIO);
908                 } else {
909                         mrul->device_nids_added = 1;
910                 }
911
912                 return nids_added;
913         }
914
915         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
916                 /* LCFG_SETUP command found. UUID should be changed */
917                 rc = record_setup(env,
918                                   mrul->temp_llh,
919                                   /* devname the same */
920                                   lustre_cfg_string(lcfg, 0),
921                                   /* s1 is not changed */
922                                   lustre_cfg_string(lcfg, 1),
923                                   /* new uuid should be
924                                   the full nidlist */
925                                   mrul->target.mti_params,
926                                   /* s3 is not changed */
927                                   lustre_cfg_string(lcfg, 3),
928                                   /* s4 is not changed */
929                                   lustre_cfg_string(lcfg, 4));
930                 return rc ? rc : 1;
931         }
932
933         /* Another commands in target device block */
934         return 0;
935 }
936
937 /**
938  * Handler that called for every record in llog.
939  * Records are processed in order they placed in llog.
940  *
941  * \param[in] llh       log to be processed
942  * \param[in] rec       current record
943  * \param[in] data      mgs_replace_uuid_lookup structure
944  *
945  * \retval 0    success
946  */
947 static int mgs_replace_handler(const struct lu_env *env,
948                                struct llog_handle *llh,
949                                struct llog_rec_hdr *rec,
950                                void *data)
951 {
952         struct mgs_replace_uuid_lookup *mrul;
953         struct lustre_cfg *lcfg = REC_DATA(rec);
954         int cfg_len = REC_DATA_LEN(rec);
955         int rc;
956         ENTRY;
957
958         mrul = (struct mgs_replace_uuid_lookup *)data;
959
960         if (rec->lrh_type != OBD_CFG_REC) {
961                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
962                        rec->lrh_type, lcfg->lcfg_command,
963                        lustre_cfg_string(lcfg, 0),
964                        lustre_cfg_string(lcfg, 1));
965                 RETURN(-EINVAL);
966         }
967
968         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
969         if (rc) {
970                 /* Do not copy any invalidated records */
971                 GOTO(skip_out, rc = 0);
972         }
973
974         rc = check_markers(lcfg, mrul);
975         if (rc || mrul->skip_it)
976                 GOTO(skip_out, rc = 0);
977
978         /* Write to new log all commands outside target device block */
979         if (!mrul->in_target_device)
980                 GOTO(copy_out, rc = 0);
981
982         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
983            (failover nids) for this target, assuming that if then
984            primary is changing then so is the failover */
985         if (mrul->device_nids_added &&
986             (lcfg->lcfg_command == LCFG_ADD_UUID ||
987              lcfg->lcfg_command == LCFG_ADD_CONN))
988                 GOTO(skip_out, rc = 0);
989
990         rc = process_command(env, lcfg, mrul);
991         if (rc < 0)
992                 RETURN(rc);
993
994         if (rc)
995                 RETURN(0);
996 copy_out:
997         /* Record is placed in temporary llog as is */
998         rc = llog_write(env, mrul->temp_llh, rec, NULL, 0, NULL, -1);
999
1000         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1001                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1002                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1003         RETURN(rc);
1004
1005 skip_out:
1006         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1007                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1008                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1009         RETURN(rc);
1010 }
1011
1012 static int mgs_log_is_empty(const struct lu_env *env,
1013                             struct mgs_device *mgs, char *name)
1014 {
1015         struct llog_ctxt        *ctxt;
1016         int                      rc;
1017
1018         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1019         LASSERT(ctxt != NULL);
1020
1021         rc = llog_is_empty(env, ctxt, name);
1022         llog_ctxt_put(ctxt);
1023         return rc;
1024 }
1025
1026 static int mgs_replace_nids_log(const struct lu_env *env,
1027                                 struct obd_device *mgs, struct fs_db *fsdb,
1028                                 char *logname, char *devname, char *nids)
1029 {
1030         struct llog_handle *orig_llh, *backup_llh;
1031         struct llog_ctxt *ctxt;
1032         struct mgs_replace_uuid_lookup *mrul;
1033         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1034         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1035         char *backup;
1036         int rc, rc2;
1037         ENTRY;
1038
1039         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1040
1041         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1042         LASSERT(ctxt != NULL);
1043
1044         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1045                 /* Log is empty. Nothing to replace */
1046                 GOTO(out_put, rc = 0);
1047         }
1048
1049         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1050         if (backup == NULL)
1051                 GOTO(out_put, rc = -ENOMEM);
1052
1053         sprintf(backup, "%s.bak", logname);
1054
1055         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1056         if (rc == 0) {
1057                 /* Now erase original log file. Connections are not allowed.
1058                    Backup is already saved */
1059                 rc = llog_erase(env, ctxt, NULL, logname);
1060                 if (rc < 0)
1061                         GOTO(out_free, rc);
1062         } else if (rc != -ENOENT) {
1063                 CERROR("%s: can't make backup for %s: rc = %d\n",
1064                        mgs->obd_name, logname, rc);
1065                 GOTO(out_free,rc);
1066         }
1067
1068         /* open local log */
1069         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1070         if (rc)
1071                 GOTO(out_restore, rc);
1072
1073         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1074         if (rc)
1075                 GOTO(out_closel, rc);
1076
1077         /* open backup llog */
1078         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1079                        LLOG_OPEN_EXISTS);
1080         if (rc)
1081                 GOTO(out_closel, rc);
1082
1083         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1084         if (rc)
1085                 GOTO(out_close, rc);
1086
1087         if (llog_get_size(backup_llh) <= 1)
1088                 GOTO(out_close, rc = 0);
1089
1090         OBD_ALLOC_PTR(mrul);
1091         if (!mrul)
1092                 GOTO(out_close, rc = -ENOMEM);
1093         /* devname is only needed information to replace UUID records */
1094         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1095         /* parse nids later */
1096         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1097         /* Copy records to this temporary llog */
1098         mrul->temp_llh = orig_llh;
1099
1100         rc = llog_process(env, backup_llh, mgs_replace_handler,
1101                           (void *)mrul, NULL);
1102         OBD_FREE_PTR(mrul);
1103 out_close:
1104         rc2 = llog_close(NULL, backup_llh);
1105         if (!rc)
1106                 rc = rc2;
1107 out_closel:
1108         rc2 = llog_close(NULL, orig_llh);
1109         if (!rc)
1110                 rc = rc2;
1111
1112 out_restore:
1113         if (rc) {
1114                 CERROR("%s: llog should be restored: rc = %d\n",
1115                        mgs->obd_name, rc);
1116                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1117                                   logname);
1118                 if (rc2 < 0)
1119                         CERROR("%s: can't restore backup %s: rc = %d\n",
1120                                mgs->obd_name, logname, rc2);
1121         }
1122
1123 out_free:
1124         OBD_FREE(backup, strlen(backup) + 1);
1125
1126 out_put:
1127         llog_ctxt_put(ctxt);
1128
1129         if (rc)
1130                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1131                        mgs->obd_name, logname, rc);
1132
1133         RETURN(rc);
1134 }
1135
1136 /**
1137  * Parse device name and get file system name and/or device index
1138  *
1139  * \param[in]   devname device name (ex. lustre-MDT0000)
1140  * \param[out]  fsname  file system name(optional)
1141  * \param[out]  index   device index(optional)
1142  *
1143  * \retval 0    success
1144  */
1145 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1146 {
1147         int rc;
1148         ENTRY;
1149
1150         /* Extract fsname */
1151         if (fsname) {
1152                 rc = server_name2fsname(devname, fsname, NULL);
1153                 if (rc < 0) {
1154                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1155                                devname);
1156                         RETURN(-EINVAL);
1157                 }
1158         }
1159
1160         if (index) {
1161                 rc = server_name2index(devname, index, NULL);
1162                 if (rc < 0) {
1163                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1164                                devname);
1165                         RETURN(-EINVAL);
1166                 }
1167         }
1168
1169         RETURN(0);
1170 }
1171
1172 static int only_mgs_is_running(struct obd_device *mgs_obd)
1173 {
1174         /* TDB: Is global variable with devices count exists? */
1175         int num_devices = get_devices_count();
1176         /* osd, MGS and MGC + self_export
1177            (wc -l /proc/fs/lustre/devices <= 2) && (num_exports <= 2) */
1178         return (num_devices <= 3) && (mgs_obd->obd_num_exports <= 2);
1179 }
1180
1181 static int name_create_mdt(char **logname, char *fsname, int i)
1182 {
1183         char mdt_index[9];
1184
1185         sprintf(mdt_index, "-MDT%04x", i);
1186         return name_create(logname, fsname, mdt_index);
1187 }
1188
1189 /**
1190  * Replace nids for \a device to \a nids values
1191  *
1192  * \param obd           MGS obd device
1193  * \param devname       nids need to be replaced for this device
1194  * (ex. lustre-OST0000)
1195  * \param nids          nids list (ex. nid1,nid2,nid3)
1196  *
1197  * \retval 0    success
1198  */
1199 int mgs_replace_nids(const struct lu_env *env,
1200                      struct mgs_device *mgs,
1201                      char *devname, char *nids)
1202 {
1203         /* Assume fsname is part of device name */
1204         char fsname[MTI_NAME_MAXLEN];
1205         int rc;
1206         __u32 index;
1207         char *logname;
1208         struct fs_db *fsdb;
1209         unsigned int i;
1210         int conn_state;
1211         struct obd_device *mgs_obd = mgs->mgs_obd;
1212         ENTRY;
1213
1214         /* We can only change NIDs if no other nodes are connected */
1215         spin_lock(&mgs_obd->obd_dev_lock);
1216         conn_state = mgs_obd->obd_no_conn;
1217         mgs_obd->obd_no_conn = 1;
1218         spin_unlock(&mgs_obd->obd_dev_lock);
1219
1220         /* We can not change nids if not only MGS is started */
1221         if (!only_mgs_is_running(mgs_obd)) {
1222                 CERROR("Only MGS is allowed to be started\n");
1223                 GOTO(out, rc = -EINPROGRESS);
1224         }
1225
1226         /* Get fsname and index*/
1227         rc = mgs_parse_devname(devname, fsname, &index);
1228         if (rc)
1229                 GOTO(out, rc);
1230
1231         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1232         if (rc) {
1233                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1234                 GOTO(out, rc);
1235         }
1236
1237         /* Process client llogs */
1238         name_create(&logname, fsname, "-client");
1239         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1240         name_destroy(&logname);
1241         if (rc) {
1242                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1243                        fsname, devname, rc);
1244                 GOTO(out, rc);
1245         }
1246
1247         /* Process MDT llogs */
1248         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1249                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1250                         continue;
1251                 name_create_mdt(&logname, fsname, i);
1252                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1253                 name_destroy(&logname);
1254                 if (rc)
1255                         GOTO(out, rc);
1256         }
1257
1258 out:
1259         spin_lock(&mgs_obd->obd_dev_lock);
1260         mgs_obd->obd_no_conn = conn_state;
1261         spin_unlock(&mgs_obd->obd_dev_lock);
1262
1263         RETURN(rc);
1264 }
1265
1266 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1267                             char *devname, struct lov_desc *desc)
1268 {
1269         struct mgs_thread_info *mgi = mgs_env_info(env);
1270         struct lustre_cfg *lcfg;
1271         int rc;
1272
1273         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1274         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1275         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1276         if (!lcfg)
1277                 return -ENOMEM;
1278         rc = record_lcfg(env, llh, lcfg);
1279
1280         lustre_cfg_free(lcfg);
1281         return rc;
1282 }
1283
1284 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1285                             char *devname, struct lmv_desc *desc)
1286 {
1287         struct mgs_thread_info *mgi = mgs_env_info(env);
1288         struct lustre_cfg *lcfg;
1289         int rc;
1290
1291         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1292         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1293         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1294
1295         rc = record_lcfg(env, llh, lcfg);
1296
1297         lustre_cfg_free(lcfg);
1298         return rc;
1299 }
1300
1301 static inline int record_mdc_add(const struct lu_env *env,
1302                                  struct llog_handle *llh,
1303                                  char *logname, char *mdcuuid,
1304                                  char *mdtuuid, char *index,
1305                                  char *gen)
1306 {
1307         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1308                            mdtuuid,index,gen,mdcuuid);
1309 }
1310
1311 static inline int record_lov_add(const struct lu_env *env,
1312                                  struct llog_handle *llh,
1313                                  char *lov_name, char *ost_uuid,
1314                                  char *index, char *gen)
1315 {
1316         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1317                            ost_uuid, index, gen, 0);
1318 }
1319
1320 static inline int record_mount_opt(const struct lu_env *env,
1321                                    struct llog_handle *llh,
1322                                    char *profile, char *lov_name,
1323                                    char *mdc_name)
1324 {
1325         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1326                            profile,lov_name,mdc_name,0);
1327 }
1328
1329 static int record_marker(const struct lu_env *env,
1330                          struct llog_handle *llh,
1331                          struct fs_db *fsdb, __u32 flags,
1332                          char *tgtname, char *comment)
1333 {
1334         struct mgs_thread_info *mgi = mgs_env_info(env);
1335         struct lustre_cfg *lcfg;
1336         int rc;
1337         int cplen = 0;
1338
1339         if (flags & CM_START)
1340                 fsdb->fsdb_gen++;
1341         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1342         mgi->mgi_marker.cm_flags = flags;
1343         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1344         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1345                         sizeof(mgi->mgi_marker.cm_tgtname));
1346         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1347                 return -E2BIG;
1348         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1349                         sizeof(mgi->mgi_marker.cm_comment));
1350         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1351                 return -E2BIG;
1352         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1353         mgi->mgi_marker.cm_canceltime = 0;
1354         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1355         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1356                             sizeof(mgi->mgi_marker));
1357         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1358         if (!lcfg)
1359                 return -ENOMEM;
1360         rc = record_lcfg(env, llh, lcfg);
1361
1362         lustre_cfg_free(lcfg);
1363         return rc;
1364 }
1365
1366 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1367                             struct llog_handle **llh, char *name)
1368 {
1369         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1370         struct llog_ctxt        *ctxt;
1371         int                      rc = 0;
1372
1373         if (*llh)
1374                 GOTO(out, rc = -EBUSY);
1375
1376         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1377         if (!ctxt)
1378                 GOTO(out, rc = -ENODEV);
1379         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1380
1381         rc = llog_open_create(env, ctxt, llh, NULL, name);
1382         if (rc)
1383                 GOTO(out_ctxt, rc);
1384         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1385         if (rc)
1386                 llog_close(env, *llh);
1387 out_ctxt:
1388         llog_ctxt_put(ctxt);
1389 out:
1390         if (rc) {
1391                 CERROR("%s: can't start log %s: rc = %d\n",
1392                        mgs->mgs_obd->obd_name, name, rc);
1393                 *llh = NULL;
1394         }
1395         RETURN(rc);
1396 }
1397
1398 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1399 {
1400         int rc;
1401
1402         rc = llog_close(env, *llh);
1403         *llh = NULL;
1404
1405         return rc;
1406 }
1407
1408 /******************** config "macros" *********************/
1409
1410 /* write an lcfg directly into a log (with markers) */
1411 static int mgs_write_log_direct(const struct lu_env *env,
1412                                 struct mgs_device *mgs, struct fs_db *fsdb,
1413                                 char *logname, struct lustre_cfg *lcfg,
1414                                 char *devname, char *comment)
1415 {
1416         struct llog_handle *llh = NULL;
1417         int rc;
1418         ENTRY;
1419
1420         if (!lcfg)
1421                 RETURN(-ENOMEM);
1422
1423         rc = record_start_log(env, mgs, &llh, logname);
1424         if (rc)
1425                 RETURN(rc);
1426
1427         /* FIXME These should be a single journal transaction */
1428         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1429         if (rc)
1430                 GOTO(out_end, rc);
1431         rc = record_lcfg(env, llh, lcfg);
1432         if (rc)
1433                 GOTO(out_end, rc);
1434         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1435         if (rc)
1436                 GOTO(out_end, rc);
1437 out_end:
1438         record_end_log(env, &llh);
1439         RETURN(rc);
1440 }
1441
1442 /* write the lcfg in all logs for the given fs */
1443 int mgs_write_log_direct_all(const struct lu_env *env,
1444                              struct mgs_device *mgs,
1445                              struct fs_db *fsdb,
1446                              struct mgs_target_info *mti,
1447                              struct lustre_cfg *lcfg,
1448                              char *devname, char *comment,
1449                              int server_only)
1450 {
1451         cfs_list_t list;
1452         struct mgs_direntry *dirent, *n;
1453         char *fsname = mti->mti_fsname;
1454         char *logname;
1455         int rc = 0, len = strlen(fsname);
1456         ENTRY;
1457
1458         /* We need to set params for any future logs
1459            as well. FIXME Append this file to every new log.
1460            Actually, we should store as params (text), not llogs.  Or
1461            in a database. */
1462         rc = name_create(&logname, fsname, "-params");
1463         if (rc)
1464                 RETURN(rc);
1465         if (mgs_log_is_empty(env, mgs, logname)) {
1466                 struct llog_handle *llh = NULL;
1467                 rc = record_start_log(env, mgs, &llh, logname);
1468                 if (rc == 0)
1469                         record_end_log(env, &llh);
1470         }
1471         name_destroy(&logname);
1472         if (rc)
1473                 RETURN(rc);
1474
1475         /* Find all the logs in the CONFIGS directory */
1476         rc = class_dentry_readdir(env, mgs, &list);
1477         if (rc)
1478                 RETURN(rc);
1479
1480         /* Could use fsdb index maps instead of directory listing */
1481         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1482                 cfs_list_del(&dirent->list);
1483                 /* don't write to sptlrpc rule log */
1484                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1485                         goto next;
1486
1487                 /* caller wants write server logs only */
1488                 if (server_only && strstr(dirent->name, "-client") != NULL)
1489                         goto next;
1490
1491                 if (strncmp(fsname, dirent->name, len) == 0) {
1492                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1493                         /* Erase any old settings of this same parameter */
1494                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1495                                         devname, comment, CM_SKIP);
1496                         if (rc < 0)
1497                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1498                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1499                         /* Write the new one */
1500                         if (lcfg) {
1501                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1502                                                           dirent->name,
1503                                                           lcfg, devname,
1504                                                           comment);
1505                                 if (rc)
1506                                         CERROR("%s: writing log %s: rc = %d\n",
1507                                                mgs->mgs_obd->obd_name,
1508                                                dirent->name, rc);
1509                         }
1510                 }
1511 next:
1512                 mgs_direntry_free(dirent);
1513         }
1514
1515         RETURN(rc);
1516 }
1517
1518 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1519                                     struct mgs_device *mgs,
1520                                     struct fs_db *fsdb,
1521                                     struct mgs_target_info *mti,
1522                                     int index, char *logname);
1523 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1524                                     struct mgs_device *mgs,
1525                                     struct fs_db *fsdb,
1526                                     struct mgs_target_info *mti,
1527                                     char *logname, char *suffix, char *lovname,
1528                                     enum lustre_sec_part sec_part, int flags);
1529 static int name_create_mdt_and_lov(char **logname, char **lovname,
1530                                    struct fs_db *fsdb, int i);
1531
1532 static int add_param(char *params, char *key, char *val)
1533 {
1534         char *start = params + strlen(params);
1535         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1536         int keylen = 0;
1537
1538         if (key != NULL)
1539                 keylen = strlen(key);
1540         if (start + 1 + keylen + strlen(val) >= end) {
1541                 CERROR("params are too long: %s %s%s\n",
1542                        params, key != NULL ? key : "", val);
1543                 return -EINVAL;
1544         }
1545
1546         sprintf(start, " %s%s", key != NULL ? key : "", val);
1547         return 0;
1548 }
1549
1550 /**
1551  * Walk through client config log record and convert the related records
1552  * into the target.
1553  **/
1554 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1555                                          struct llog_handle *llh,
1556                                          struct llog_rec_hdr *rec, void *data)
1557 {
1558         struct mgs_device *mgs;
1559         struct obd_device *obd;
1560         struct mgs_target_info *mti, *tmti;
1561         struct fs_db *fsdb;
1562         int cfg_len = rec->lrh_len;
1563         char *cfg_buf = (char*) (rec + 1);
1564         struct lustre_cfg *lcfg;
1565         int rc = 0;
1566         struct llog_handle *mdt_llh = NULL;
1567         static int got_an_osc_or_mdc = 0;
1568         /* 0: not found any osc/mdc;
1569            1: found osc;
1570            2: found mdc;
1571         */
1572         static int last_step = -1;
1573         int cplen = 0;
1574
1575         ENTRY;
1576
1577         mti = ((struct temp_comp*)data)->comp_mti;
1578         tmti = ((struct temp_comp*)data)->comp_tmti;
1579         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1580         obd = ((struct temp_comp *)data)->comp_obd;
1581         mgs = lu2mgs_dev(obd->obd_lu_dev);
1582         LASSERT(mgs);
1583
1584         if (rec->lrh_type != OBD_CFG_REC) {
1585                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1586                 RETURN(-EINVAL);
1587         }
1588
1589         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1590         if (rc) {
1591                 CERROR("Insane cfg\n");
1592                 RETURN(rc);
1593         }
1594
1595         lcfg = (struct lustre_cfg *)cfg_buf;
1596
1597         if (lcfg->lcfg_command == LCFG_MARKER) {
1598                 struct cfg_marker *marker;
1599                 marker = lustre_cfg_buf(lcfg, 1);
1600                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1601                     (marker->cm_flags & CM_START) &&
1602                      !(marker->cm_flags & CM_SKIP)) {
1603                         got_an_osc_or_mdc = 1;
1604                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1605                                         sizeof(tmti->mti_svname));
1606                         if (cplen >= sizeof(tmti->mti_svname))
1607                                 RETURN(-E2BIG);
1608                         rc = record_start_log(env, mgs, &mdt_llh,
1609                                               mti->mti_svname);
1610                         if (rc)
1611                                 RETURN(rc);
1612                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1613                                            mti->mti_svname, "add osc(copied)");
1614                         record_end_log(env, &mdt_llh);
1615                         last_step = marker->cm_step;
1616                         RETURN(rc);
1617                 }
1618                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1619                     (marker->cm_flags & CM_END) &&
1620                      !(marker->cm_flags & CM_SKIP)) {
1621                         LASSERT(last_step == marker->cm_step);
1622                         last_step = -1;
1623                         got_an_osc_or_mdc = 0;
1624                         memset(tmti, 0, sizeof(*tmti));
1625                         rc = record_start_log(env, mgs, &mdt_llh,
1626                                               mti->mti_svname);
1627                         if (rc)
1628                                 RETURN(rc);
1629                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1630                                            mti->mti_svname, "add osc(copied)");
1631                         record_end_log(env, &mdt_llh);
1632                         RETURN(rc);
1633                 }
1634                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1635                     (marker->cm_flags & CM_START) &&
1636                      !(marker->cm_flags & CM_SKIP)) {
1637                         got_an_osc_or_mdc = 2;
1638                         last_step = marker->cm_step;
1639                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1640                                strlen(marker->cm_tgtname));
1641
1642                         RETURN(rc);
1643                 }
1644                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1645                     (marker->cm_flags & CM_END) &&
1646                      !(marker->cm_flags & CM_SKIP)) {
1647                         LASSERT(last_step == marker->cm_step);
1648                         last_step = -1;
1649                         got_an_osc_or_mdc = 0;
1650                         memset(tmti, 0, sizeof(*tmti));
1651                         RETURN(rc);
1652                 }
1653         }
1654
1655         if (got_an_osc_or_mdc == 0 || last_step < 0)
1656                 RETURN(rc);
1657
1658         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1659                 uint64_t nodenid = lcfg->lcfg_nid;
1660
1661                 if (strlen(tmti->mti_uuid) == 0) {
1662                         /* target uuid not set, this config record is before
1663                          * LCFG_SETUP, this nid is one of target node nid.
1664                          */
1665                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1666                         tmti->mti_nid_count++;
1667                 } else {
1668                         /* failover node nid */
1669                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1670                                        libcfs_nid2str(nodenid));
1671                 }
1672
1673                 RETURN(rc);
1674         }
1675
1676         if (lcfg->lcfg_command == LCFG_SETUP) {
1677                 char *target;
1678
1679                 target = lustre_cfg_string(lcfg, 1);
1680                 memcpy(tmti->mti_uuid, target, strlen(target));
1681                 RETURN(rc);
1682         }
1683
1684         /* ignore client side sptlrpc_conf_log */
1685         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1686                 RETURN(rc);
1687
1688         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1689                 int index;
1690
1691                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1692                         RETURN (-EINVAL);
1693
1694                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1695                        strlen(mti->mti_fsname));
1696                 tmti->mti_stripe_index = index;
1697
1698                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1699                                               mti->mti_stripe_index,
1700                                               mti->mti_svname);
1701                 memset(tmti, 0, sizeof(*tmti));
1702                 RETURN(rc);
1703         }
1704
1705         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1706                 int index;
1707                 char mdt_index[9];
1708                 char *logname, *lovname;
1709
1710                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1711                                              mti->mti_stripe_index);
1712                 if (rc)
1713                         RETURN(rc);
1714                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1715
1716                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1717                         name_destroy(&logname);
1718                         name_destroy(&lovname);
1719                         RETURN(-EINVAL);
1720                 }
1721
1722                 tmti->mti_stripe_index = index;
1723                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1724                                          mdt_index, lovname,
1725                                          LUSTRE_SP_MDT, 0);
1726                 name_destroy(&logname);
1727                 name_destroy(&lovname);
1728                 RETURN(rc);
1729         }
1730         RETURN(rc);
1731 }
1732
1733 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1734 /* stealed from mgs_get_fsdb_from_llog*/
1735 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1736                                               struct mgs_device *mgs,
1737                                               char *client_name,
1738                                               struct temp_comp* comp)
1739 {
1740         struct llog_handle *loghandle;
1741         struct mgs_target_info *tmti;
1742         struct llog_ctxt *ctxt;
1743         int rc;
1744
1745         ENTRY;
1746
1747         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1748         LASSERT(ctxt != NULL);
1749
1750         OBD_ALLOC_PTR(tmti);
1751         if (tmti == NULL)
1752                 GOTO(out_ctxt, rc = -ENOMEM);
1753
1754         comp->comp_tmti = tmti;
1755         comp->comp_obd = mgs->mgs_obd;
1756
1757         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1758                        LLOG_OPEN_EXISTS);
1759         if (rc < 0) {
1760                 if (rc == -ENOENT)
1761                         rc = 0;
1762                 GOTO(out_pop, rc);
1763         }
1764
1765         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1766         if (rc)
1767                 GOTO(out_close, rc);
1768
1769         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1770                                   (void *)comp, NULL, false);
1771         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1772 out_close:
1773         llog_close(env, loghandle);
1774 out_pop:
1775         OBD_FREE_PTR(tmti);
1776 out_ctxt:
1777         llog_ctxt_put(ctxt);
1778         RETURN(rc);
1779 }
1780
1781 /* lmv is the second thing for client logs */
1782 /* copied from mgs_write_log_lov. Please refer to that.  */
1783 static int mgs_write_log_lmv(const struct lu_env *env,
1784                              struct mgs_device *mgs,
1785                              struct fs_db *fsdb,
1786                              struct mgs_target_info *mti,
1787                              char *logname, char *lmvname)
1788 {
1789         struct llog_handle *llh = NULL;
1790         struct lmv_desc *lmvdesc;
1791         char *uuid;
1792         int rc = 0;
1793         ENTRY;
1794
1795         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1796
1797         OBD_ALLOC_PTR(lmvdesc);
1798         if (lmvdesc == NULL)
1799                 RETURN(-ENOMEM);
1800         lmvdesc->ld_active_tgt_count = 0;
1801         lmvdesc->ld_tgt_count = 0;
1802         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1803         uuid = (char *)lmvdesc->ld_uuid.uuid;
1804
1805         rc = record_start_log(env, mgs, &llh, logname);
1806         if (rc)
1807                 GOTO(out_free, rc);
1808         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1809         if (rc)
1810                 GOTO(out_end, rc);
1811         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1812         if (rc)
1813                 GOTO(out_end, rc);
1814         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1815         if (rc)
1816                 GOTO(out_end, rc);
1817         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1818         if (rc)
1819                 GOTO(out_end, rc);
1820 out_end:
1821         record_end_log(env, &llh);
1822 out_free:
1823         OBD_FREE_PTR(lmvdesc);
1824         RETURN(rc);
1825 }
1826
1827 /* lov is the first thing in the mdt and client logs */
1828 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1829                              struct fs_db *fsdb, struct mgs_target_info *mti,
1830                              char *logname, char *lovname)
1831 {
1832         struct llog_handle *llh = NULL;
1833         struct lov_desc *lovdesc;
1834         char *uuid;
1835         int rc = 0;
1836         ENTRY;
1837
1838         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1839
1840         /*
1841         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1842         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1843               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1844         */
1845
1846         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1847         OBD_ALLOC_PTR(lovdesc);
1848         if (lovdesc == NULL)
1849                 RETURN(-ENOMEM);
1850         lovdesc->ld_magic = LOV_DESC_MAGIC;
1851         lovdesc->ld_tgt_count = 0;
1852         /* Defaults.  Can be changed later by lcfg config_param */
1853         lovdesc->ld_default_stripe_count = 1;
1854         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1855         lovdesc->ld_default_stripe_size = 1024 * 1024;
1856         lovdesc->ld_default_stripe_offset = -1;
1857         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1858         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1859         /* can these be the same? */
1860         uuid = (char *)lovdesc->ld_uuid.uuid;
1861
1862         /* This should always be the first entry in a log.
1863         rc = mgs_clear_log(obd, logname); */
1864         rc = record_start_log(env, mgs, &llh, logname);
1865         if (rc)
1866                 GOTO(out_free, rc);
1867         /* FIXME these should be a single journal transaction */
1868         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1869         if (rc)
1870                 GOTO(out_end, rc);
1871         rc = record_attach(env, llh, lovname, "lov", uuid);
1872         if (rc)
1873                 GOTO(out_end, rc);
1874         rc = record_lov_setup(env, llh, lovname, lovdesc);
1875         if (rc)
1876                 GOTO(out_end, rc);
1877         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1878         if (rc)
1879                 GOTO(out_end, rc);
1880         EXIT;
1881 out_end:
1882         record_end_log(env, &llh);
1883 out_free:
1884         OBD_FREE_PTR(lovdesc);
1885         return rc;
1886 }
1887
1888 /* add failnids to open log */
1889 static int mgs_write_log_failnids(const struct lu_env *env,
1890                                   struct mgs_target_info *mti,
1891                                   struct llog_handle *llh,
1892                                   char *cliname)
1893 {
1894         char *failnodeuuid = NULL;
1895         char *ptr = mti->mti_params;
1896         lnet_nid_t nid;
1897         int rc = 0;
1898
1899         /*
1900         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1901         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1902         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1903         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1904         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1905         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1906         */
1907
1908         /* Pull failnid info out of params string */
1909         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1910                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1911                         if (failnodeuuid == NULL) {
1912                                 /* We don't know the failover node name,
1913                                    so just use the first nid as the uuid */
1914                                 rc = name_create(&failnodeuuid,
1915                                                  libcfs_nid2str(nid), "");
1916                                 if (rc)
1917                                         return rc;
1918                         }
1919                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1920                                "client %s\n", libcfs_nid2str(nid),
1921                                failnodeuuid, cliname);
1922                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1923                 }
1924                 if (failnodeuuid)
1925                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1926         }
1927
1928         name_destroy(&failnodeuuid);
1929         return rc;
1930 }
1931
1932 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1933                                     struct mgs_device *mgs,
1934                                     struct fs_db *fsdb,
1935                                     struct mgs_target_info *mti,
1936                                     char *logname, char *lmvname)
1937 {
1938         struct llog_handle *llh = NULL;
1939         char *mdcname = NULL;
1940         char *nodeuuid = NULL;
1941         char *mdcuuid = NULL;
1942         char *lmvuuid = NULL;
1943         char index[6];
1944         int i, rc;
1945         ENTRY;
1946
1947         if (mgs_log_is_empty(env, mgs, logname)) {
1948                 CERROR("log is empty! Logical error\n");
1949                 RETURN(-EINVAL);
1950         }
1951
1952         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1953                mti->mti_svname, logname, lmvname);
1954
1955         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1956         if (rc)
1957                 RETURN(rc);
1958         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1959         if (rc)
1960                 GOTO(out_free, rc);
1961         rc = name_create(&mdcuuid, mdcname, "_UUID");
1962         if (rc)
1963                 GOTO(out_free, rc);
1964         rc = name_create(&lmvuuid, lmvname, "_UUID");
1965         if (rc)
1966                 GOTO(out_free, rc);
1967
1968         rc = record_start_log(env, mgs, &llh, logname);
1969         if (rc)
1970                 GOTO(out_free, rc);
1971         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1972                            "add mdc");
1973         if (rc)
1974                 GOTO(out_end, rc);
1975         for (i = 0; i < mti->mti_nid_count; i++) {
1976                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1977                        libcfs_nid2str(mti->mti_nids[i]));
1978
1979                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1980                 if (rc)
1981                         GOTO(out_end, rc);
1982         }
1983
1984         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1985         if (rc)
1986                 GOTO(out_end, rc);
1987         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1988         if (rc)
1989                 GOTO(out_end, rc);
1990         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1991         if (rc)
1992                 GOTO(out_end, rc);
1993         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1994         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1995                             index, "1");
1996         if (rc)
1997                 GOTO(out_end, rc);
1998         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1999                            "add mdc");
2000         if (rc)
2001                 GOTO(out_end, rc);
2002 out_end:
2003         record_end_log(env, &llh);
2004 out_free:
2005         name_destroy(&lmvuuid);
2006         name_destroy(&mdcuuid);
2007         name_destroy(&mdcname);
2008         name_destroy(&nodeuuid);
2009         RETURN(rc);
2010 }
2011
2012 static inline int name_create_lov(char **lovname, char *mdtname,
2013                                   struct fs_db *fsdb, int index)
2014 {
2015         /* COMPAT_180 */
2016         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2017                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2018         else
2019                 return name_create(lovname, mdtname, "-mdtlov");
2020 }
2021
2022 static int name_create_mdt_and_lov(char **logname, char **lovname,
2023                                    struct fs_db *fsdb, int i)
2024 {
2025         int rc;
2026
2027         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2028         if (rc)
2029                 return rc;
2030         /* COMPAT_180 */
2031         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2032                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2033         else
2034                 rc = name_create(lovname, *logname, "-mdtlov");
2035         if (rc) {
2036                 name_destroy(logname);
2037                 *logname = NULL;
2038         }
2039         return rc;
2040 }
2041
2042 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2043                                       struct fs_db *fsdb, int i)
2044 {
2045         char suffix[16];
2046
2047         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2048                 sprintf(suffix, "-osc");
2049         else
2050                 sprintf(suffix, "-osc-MDT%04x", i);
2051         return name_create(oscname, ostname, suffix);
2052 }
2053
2054 /* add new mdc to already existent MDS */
2055 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2056                                     struct mgs_device *mgs,
2057                                     struct fs_db *fsdb,
2058                                     struct mgs_target_info *mti,
2059                                     int mdt_index, char *logname)
2060 {
2061         struct llog_handle      *llh = NULL;
2062         char    *nodeuuid = NULL;
2063         char    *ospname = NULL;
2064         char    *lovuuid = NULL;
2065         char    *mdtuuid = NULL;
2066         char    *svname = NULL;
2067         char    *mdtname = NULL;
2068         char    *lovname = NULL;
2069         char    index_str[16];
2070         int     i, rc;
2071
2072         ENTRY;
2073         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2074                 CERROR("log is empty! Logical error\n");
2075                 RETURN (-EINVAL);
2076         }
2077
2078         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2079                logname);
2080
2081         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2082         if (rc)
2083                 RETURN(rc);
2084
2085         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2086         if (rc)
2087                 GOTO(out_destory, rc);
2088
2089         rc = name_create(&svname, mdtname, "-osp");
2090         if (rc)
2091                 GOTO(out_destory, rc);
2092
2093         sprintf(index_str, "-MDT%04x", mdt_index);
2094         rc = name_create(&ospname, svname, index_str);
2095         if (rc)
2096                 GOTO(out_destory, rc);
2097
2098         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2099         if (rc)
2100                 GOTO(out_destory, rc);
2101
2102         rc = name_create(&lovuuid, lovname, "_UUID");
2103         if (rc)
2104                 GOTO(out_destory, rc);
2105
2106         rc = name_create(&mdtuuid, mdtname, "_UUID");
2107         if (rc)
2108                 GOTO(out_destory, rc);
2109
2110         rc = record_start_log(env, mgs, &llh, logname);
2111         if (rc)
2112                 GOTO(out_destory, rc);
2113
2114         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2115                            "add osp");
2116         if (rc)
2117                 GOTO(out_destory, rc);
2118
2119         for (i = 0; i < mti->mti_nid_count; i++) {
2120                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2121                        libcfs_nid2str(mti->mti_nids[i]));
2122                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2123                 if (rc)
2124                         GOTO(out_end, rc);
2125         }
2126
2127         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2128         if (rc)
2129                 GOTO(out_end, rc);
2130
2131         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2132                           NULL, NULL);
2133         if (rc)
2134                 GOTO(out_end, rc);
2135
2136         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2137         if (rc)
2138                 GOTO(out_end, rc);
2139
2140         /* Add mdc(osp) to lod */
2141         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2142                  mti->mti_stripe_index);
2143         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2144                          index_str, "1", NULL);
2145         if (rc)
2146                 GOTO(out_end, rc);
2147
2148         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2149         if (rc)
2150                 GOTO(out_end, rc);
2151
2152 out_end:
2153         record_end_log(env, &llh);
2154
2155 out_destory:
2156         name_destroy(&mdtuuid);
2157         name_destroy(&lovuuid);
2158         name_destroy(&lovname);
2159         name_destroy(&ospname);
2160         name_destroy(&svname);
2161         name_destroy(&nodeuuid);
2162         name_destroy(&mdtname);
2163         RETURN(rc);
2164 }
2165
2166 static int mgs_write_log_mdt0(const struct lu_env *env,
2167                               struct mgs_device *mgs,
2168                               struct fs_db *fsdb,
2169                               struct mgs_target_info *mti)
2170 {
2171         char *log = mti->mti_svname;
2172         struct llog_handle *llh = NULL;
2173         char *uuid, *lovname;
2174         char mdt_index[6];
2175         char *ptr = mti->mti_params;
2176         int rc = 0, failout = 0;
2177         ENTRY;
2178
2179         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2180         if (uuid == NULL)
2181                 RETURN(-ENOMEM);
2182
2183         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2184                 failout = (strncmp(ptr, "failout", 7) == 0);
2185
2186         rc = name_create(&lovname, log, "-mdtlov");
2187         if (rc)
2188                 GOTO(out_free, rc);
2189         if (mgs_log_is_empty(env, mgs, log)) {
2190                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2191                 if (rc)
2192                         GOTO(out_lod, rc);
2193         }
2194
2195         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2196
2197         rc = record_start_log(env, mgs, &llh, log);
2198         if (rc)
2199                 GOTO(out_lod, rc);
2200
2201         /* add MDT itself */
2202
2203         /* FIXME this whole fn should be a single journal transaction */
2204         sprintf(uuid, "%s_UUID", log);
2205         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2206         if (rc)
2207                 GOTO(out_lod, rc);
2208         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2209         if (rc)
2210                 GOTO(out_end, rc);
2211         rc = record_mount_opt(env, llh, log, lovname, NULL);
2212         if (rc)
2213                 GOTO(out_end, rc);
2214         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2215                         failout ? "n" : "f");
2216         if (rc)
2217                 GOTO(out_end, rc);
2218         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2219         if (rc)
2220                 GOTO(out_end, rc);
2221 out_end:
2222         record_end_log(env, &llh);
2223 out_lod:
2224         name_destroy(&lovname);
2225 out_free:
2226         OBD_FREE(uuid, sizeof(struct obd_uuid));
2227         RETURN(rc);
2228 }
2229
2230 /* envelope method for all layers log */
2231 static int mgs_write_log_mdt(const struct lu_env *env,
2232                              struct mgs_device *mgs,
2233                              struct fs_db *fsdb,
2234                              struct mgs_target_info *mti)
2235 {
2236         struct mgs_thread_info *mgi = mgs_env_info(env);
2237         struct llog_handle *llh = NULL;
2238         char *cliname;
2239         int rc, i = 0;
2240         ENTRY;
2241
2242         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2243
2244         if (mti->mti_uuid[0] == '\0') {
2245                 /* Make up our own uuid */
2246                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2247                          "%s_UUID", mti->mti_svname);
2248         }
2249
2250         /* add mdt */
2251         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2252         if (rc)
2253                 RETURN(rc);
2254         /* Append the mdt info to the client log */
2255         rc = name_create(&cliname, mti->mti_fsname, "-client");
2256         if (rc)
2257                 RETURN(rc);
2258
2259         if (mgs_log_is_empty(env, mgs, cliname)) {
2260                 /* Start client log */
2261                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2262                                        fsdb->fsdb_clilov);
2263                 if (rc)
2264                         GOTO(out_free, rc);
2265                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2266                                        fsdb->fsdb_clilmv);
2267                 if (rc)
2268                         GOTO(out_free, rc);
2269         }
2270
2271         /*
2272         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2273         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2274         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2275         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2276         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2277         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2278         */
2279
2280                 /* copy client info about lov/lmv */
2281                 mgi->mgi_comp.comp_mti = mti;
2282                 mgi->mgi_comp.comp_fsdb = fsdb;
2283
2284                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2285                                                         &mgi->mgi_comp);
2286                 if (rc)
2287                         GOTO(out_free, rc);
2288                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2289                                               fsdb->fsdb_clilmv);
2290                 if (rc)
2291                         GOTO(out_free, rc);
2292
2293                 /* add mountopts */
2294                 rc = record_start_log(env, mgs, &llh, cliname);
2295                 if (rc)
2296                         GOTO(out_free, rc);
2297
2298                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2299                                    "mount opts");
2300                 if (rc)
2301                         GOTO(out_end, rc);
2302                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2303                                       fsdb->fsdb_clilmv);
2304                 if (rc)
2305                         GOTO(out_end, rc);
2306                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2307                                    "mount opts");
2308
2309         if (rc)
2310                 GOTO(out_end, rc);
2311
2312         /* for_all_existing_mdt except current one */
2313         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2314                 if (i !=  mti->mti_stripe_index &&
2315                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2316                         char *logname;
2317
2318                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2319                         if (rc)
2320                                 GOTO(out_end, rc);
2321
2322                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2323                                                       i, logname);
2324                         name_destroy(&logname);
2325                         if (rc)
2326                                 GOTO(out_end, rc);
2327                 }
2328         }
2329 out_end:
2330         record_end_log(env, &llh);
2331 out_free:
2332         name_destroy(&cliname);
2333         RETURN(rc);
2334 }
2335
2336 /* Add the ost info to the client/mdt lov */
2337 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2338                                     struct mgs_device *mgs, struct fs_db *fsdb,
2339                                     struct mgs_target_info *mti,
2340                                     char *logname, char *suffix, char *lovname,
2341                                     enum lustre_sec_part sec_part, int flags)
2342 {
2343         struct llog_handle *llh = NULL;
2344         char *nodeuuid = NULL;
2345         char *oscname = NULL;
2346         char *oscuuid = NULL;
2347         char *lovuuid = NULL;
2348         char *svname = NULL;
2349         char index[6];
2350         int i, rc;
2351
2352         ENTRY;
2353         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2354                mti->mti_svname, logname);
2355
2356         if (mgs_log_is_empty(env, mgs, logname)) {
2357                 CERROR("log is empty! Logical error\n");
2358                 RETURN (-EINVAL);
2359         }
2360
2361         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2362         if (rc)
2363                 RETURN(rc);
2364         rc = name_create(&svname, mti->mti_svname, "-osc");
2365         if (rc)
2366                 GOTO(out_free, rc);
2367
2368         /* for the system upgraded from old 1.8, keep using the old osc naming
2369          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2370         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2371                 rc = name_create(&oscname, svname, "");
2372         else
2373                 rc = name_create(&oscname, svname, suffix);
2374         if (rc)
2375                 GOTO(out_free, rc);
2376
2377         rc = name_create(&oscuuid, oscname, "_UUID");
2378         if (rc)
2379                 GOTO(out_free, rc);
2380         rc = name_create(&lovuuid, lovname, "_UUID");
2381         if (rc)
2382                 GOTO(out_free, rc);
2383
2384
2385         /*
2386         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2387         multihomed (#4)
2388         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2389         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2390         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2391         failover (#6,7)
2392         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2393         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2394         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2395         */
2396
2397         rc = record_start_log(env, mgs, &llh, logname);
2398         if (rc)
2399                 GOTO(out_free, rc);
2400
2401         /* FIXME these should be a single journal transaction */
2402         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2403                            "add osc");
2404         if (rc)
2405                 GOTO(out_end, rc);
2406
2407         /* NB: don't change record order, because upon MDT steal OSC config
2408          * from client, it treats all nids before LCFG_SETUP as target nids
2409          * (multiple interfaces), while nids after as failover node nids.
2410          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2411          */
2412         for (i = 0; i < mti->mti_nid_count; i++) {
2413                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2414                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2415                 if (rc)
2416                         GOTO(out_end, rc);
2417         }
2418         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2419         if (rc)
2420                 GOTO(out_end, rc);
2421         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2422         if (rc)
2423                 GOTO(out_end, rc);
2424         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2425         if (rc)
2426                 GOTO(out_end, rc);
2427
2428         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2429
2430         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2431         if (rc)
2432                 GOTO(out_end, rc);
2433         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2434                            "add osc");
2435         if (rc)
2436                 GOTO(out_end, rc);
2437 out_end:
2438         record_end_log(env, &llh);
2439 out_free:
2440         name_destroy(&lovuuid);
2441         name_destroy(&oscuuid);
2442         name_destroy(&oscname);
2443         name_destroy(&svname);
2444         name_destroy(&nodeuuid);
2445         RETURN(rc);
2446 }
2447
2448 static int mgs_write_log_ost(const struct lu_env *env,
2449                              struct mgs_device *mgs, struct fs_db *fsdb,
2450                              struct mgs_target_info *mti)
2451 {
2452         struct llog_handle *llh = NULL;
2453         char *logname, *lovname;
2454         char *ptr = mti->mti_params;
2455         int rc, flags = 0, failout = 0, i;
2456         ENTRY;
2457
2458         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2459
2460         /* The ost startup log */
2461
2462         /* If the ost log already exists, that means that someone reformatted
2463            the ost and it called target_add again. */
2464         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2465                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2466                                    "exists, yet the server claims it never "
2467                                    "registered. It may have been reformatted, "
2468                                    "or the index changed. writeconf the MDT to "
2469                                    "regenerate all logs.\n", mti->mti_svname);
2470                 RETURN(-EALREADY);
2471         }
2472
2473         /*
2474         attach obdfilter ost1 ost1_UUID
2475         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2476         */
2477         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2478                 failout = (strncmp(ptr, "failout", 7) == 0);
2479         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2480         if (rc)
2481                 RETURN(rc);
2482         /* FIXME these should be a single journal transaction */
2483         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2484         if (rc)
2485                 GOTO(out_end, rc);
2486         if (*mti->mti_uuid == '\0')
2487                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2488                          "%s_UUID", mti->mti_svname);
2489         rc = record_attach(env, llh, mti->mti_svname,
2490                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2491         if (rc)
2492                 GOTO(out_end, rc);
2493         rc = record_setup(env, llh, mti->mti_svname,
2494                           "dev"/*ignored*/, "type"/*ignored*/,
2495                           failout ? "n" : "f", 0/*options*/);
2496         if (rc)
2497                 GOTO(out_end, rc);
2498         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2499         if (rc)
2500                 GOTO(out_end, rc);
2501 out_end:
2502         record_end_log(env, &llh);
2503         if (rc)
2504                 RETURN(rc);
2505         /* We also have to update the other logs where this osc is part of
2506            the lov */
2507
2508         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2509                 /* If we're upgrading, the old mdt log already has our
2510                    entry. Let's do a fake one for fun. */
2511                 /* Note that we can't add any new failnids, since we don't
2512                    know the old osc names. */
2513                 flags = CM_SKIP | CM_UPGRADE146;
2514
2515         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2516                 /* If the update flag isn't set, don't update client/mdt
2517                    logs. */
2518                 flags |= CM_SKIP;
2519                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2520                               "the MDT first to regenerate it.\n",
2521                               mti->mti_svname);
2522         }
2523
2524         /* Add ost to all MDT lov defs */
2525         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2526                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2527                         char mdt_index[9];
2528
2529                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2530                                                      i);
2531                         if (rc)
2532                                 RETURN(rc);
2533                         sprintf(mdt_index, "-MDT%04x", i);
2534                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2535                                                       logname, mdt_index,
2536                                                       lovname, LUSTRE_SP_MDT,
2537                                                       flags);
2538                         name_destroy(&logname);
2539                         name_destroy(&lovname);
2540                         if (rc)
2541                                 RETURN(rc);
2542                 }
2543         }
2544
2545         /* Append ost info to the client log */
2546         rc = name_create(&logname, mti->mti_fsname, "-client");
2547         if (rc)
2548                 RETURN(rc);
2549         if (mgs_log_is_empty(env, mgs, logname)) {
2550                 /* Start client log */
2551                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2552                                        fsdb->fsdb_clilov);
2553                 if (rc)
2554                         GOTO(out_free, rc);
2555                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2556                                        fsdb->fsdb_clilmv);
2557                 if (rc)
2558                         GOTO(out_free, rc);
2559         }
2560         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2561                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2562 out_free:
2563         name_destroy(&logname);
2564         RETURN(rc);
2565 }
2566
2567 static __inline__ int mgs_param_empty(char *ptr)
2568 {
2569         char *tmp;
2570
2571         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2572                 return 1;
2573         return 0;
2574 }
2575
2576 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2577                                           struct mgs_device *mgs,
2578                                           struct fs_db *fsdb,
2579                                           struct mgs_target_info *mti,
2580                                           char *logname, char *cliname)
2581 {
2582         int rc;
2583         struct llog_handle *llh = NULL;
2584
2585         if (mgs_param_empty(mti->mti_params)) {
2586                 /* Remove _all_ failnids */
2587                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2588                                 mti->mti_svname, "add failnid", CM_SKIP);
2589                 return rc < 0 ? rc : 0;
2590         }
2591
2592         /* Otherwise failover nids are additive */
2593         rc = record_start_log(env, mgs, &llh, logname);
2594         if (rc)
2595                 return rc;
2596                 /* FIXME this should be a single journal transaction */
2597         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2598                            "add failnid");
2599         if (rc)
2600                 goto out_end;
2601         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2602         if (rc)
2603                 goto out_end;
2604         rc = record_marker(env, llh, fsdb, CM_END,
2605                            mti->mti_svname, "add failnid");
2606 out_end:
2607         record_end_log(env, &llh);
2608         return rc;
2609 }
2610
2611
2612 /* Add additional failnids to an existing log.
2613    The mdc/osc must have been added to logs first */
2614 /* tcp nids must be in dotted-quad ascii -
2615    we can't resolve hostnames from the kernel. */
2616 static int mgs_write_log_add_failnid(const struct lu_env *env,
2617                                      struct mgs_device *mgs,
2618                                      struct fs_db *fsdb,
2619                                      struct mgs_target_info *mti)
2620 {
2621         char *logname, *cliname;
2622         int rc;
2623         ENTRY;
2624
2625         /* FIXME we currently can't erase the failnids
2626          * given when a target first registers, since they aren't part of
2627          * an "add uuid" stanza */
2628
2629         /* Verify that we know about this target */
2630         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2631                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2632                                    "yet. It must be started before failnids "
2633                                    "can be added.\n", mti->mti_svname);
2634                 RETURN(-ENOENT);
2635         }
2636
2637         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2638         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2639                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2640         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2641                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2642         } else {
2643                 RETURN(-EINVAL);
2644         }
2645         if (rc)
2646                 RETURN(rc);
2647         /* Add failover nids to the client log */
2648         rc = name_create(&logname, mti->mti_fsname, "-client");
2649         if (rc) {
2650                 name_destroy(&cliname);
2651                 RETURN(rc);
2652         }
2653         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2654         name_destroy(&logname);
2655         name_destroy(&cliname);
2656         if (rc)
2657                 RETURN(rc);
2658
2659         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2660                 /* Add OST failover nids to the MDT logs as well */
2661                 int i;
2662
2663                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2664                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2665                                 continue;
2666                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2667                         if (rc)
2668                                 RETURN(rc);
2669                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2670                                                  fsdb, i);
2671                         if (rc) {
2672                                 name_destroy(&logname);
2673                                 RETURN(rc);
2674                         }
2675                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2676                                                             mti, logname,
2677                                                             cliname);
2678                         name_destroy(&cliname);
2679                         name_destroy(&logname);
2680                         if (rc)
2681                                 RETURN(rc);
2682                 }
2683         }
2684
2685         RETURN(rc);
2686 }
2687
2688 static int mgs_wlp_lcfg(const struct lu_env *env,
2689                         struct mgs_device *mgs, struct fs_db *fsdb,
2690                         struct mgs_target_info *mti,
2691                         char *logname, struct lustre_cfg_bufs *bufs,
2692                         char *tgtname, char *ptr)
2693 {
2694         char comment[MTI_NAME_MAXLEN];
2695         char *tmp;
2696         struct lustre_cfg *lcfg;
2697         int rc, del;
2698
2699         /* Erase any old settings of this same parameter */
2700         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2701         comment[MTI_NAME_MAXLEN - 1] = 0;
2702         /* But don't try to match the value. */
2703         if ((tmp = strchr(comment, '=')))
2704             *tmp = 0;
2705         /* FIXME we should skip settings that are the same as old values */
2706         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2707         if (rc < 0)
2708                 return rc;
2709         del = mgs_param_empty(ptr);
2710
2711         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2712                       "Sett" : "Modify", tgtname, comment, logname);
2713         if (del)
2714                 return rc;
2715
2716         lustre_cfg_bufs_reset(bufs, tgtname);
2717         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2718         if (mti->mti_flags & LDD_F_PARAM2)
2719                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2720
2721         lcfg = lustre_cfg_new((mti->mti_flags & LDD_F_PARAM2) ?
2722                               LCFG_SET_PARAM : LCFG_PARAM, bufs);
2723
2724         if (!lcfg)
2725                 return -ENOMEM;
2726         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2727         lustre_cfg_free(lcfg);
2728         return rc;
2729 }
2730
2731 static int mgs_write_log_param2(const struct lu_env *env,
2732                                 struct mgs_device *mgs,
2733                                 struct fs_db *fsdb,
2734                                 struct mgs_target_info *mti, char *ptr)
2735 {
2736         struct lustre_cfg_bufs  bufs;
2737         int                     rc = 0;
2738         ENTRY;
2739
2740         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2741         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2742                           mti->mti_svname, ptr);
2743
2744         RETURN(rc);
2745 }
2746
2747 /* write global variable settings into log */
2748 static int mgs_write_log_sys(const struct lu_env *env,
2749                              struct mgs_device *mgs, struct fs_db *fsdb,
2750                              struct mgs_target_info *mti, char *sys, char *ptr)
2751 {
2752         struct mgs_thread_info *mgi = mgs_env_info(env);
2753         struct lustre_cfg *lcfg;
2754         char *tmp, sep;
2755         int rc, cmd, convert = 1;
2756
2757         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2758                 cmd = LCFG_SET_TIMEOUT;
2759         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2760                 cmd = LCFG_SET_LDLM_TIMEOUT;
2761         /* Check for known params here so we can return error to lctl */
2762         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2763                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2764                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2765                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2766                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2767                 cmd = LCFG_PARAM;
2768         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2769                 convert = 0; /* Don't convert string value to integer */
2770                 cmd = LCFG_PARAM;
2771         } else {
2772                 return -EINVAL;
2773         }
2774
2775         if (mgs_param_empty(ptr))
2776                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2777         else
2778                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2779
2780         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2781         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2782         if (!convert && *tmp != '\0')
2783                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2784         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2785         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2786         /* truncate the comment to the parameter name */
2787         ptr = tmp - 1;
2788         sep = *ptr;
2789         *ptr = '\0';
2790         /* modify all servers and clients */
2791         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2792                                       *tmp == '\0' ? NULL : lcfg,
2793                                       mti->mti_fsname, sys, 0);
2794         if (rc == 0 && *tmp != '\0') {
2795                 switch (cmd) {
2796                 case LCFG_SET_TIMEOUT:
2797                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2798                                 class_process_config(lcfg);
2799                         break;
2800                 case LCFG_SET_LDLM_TIMEOUT:
2801                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2802                                 class_process_config(lcfg);
2803                         break;
2804                 default:
2805                         break;
2806                 }
2807         }
2808         *ptr = sep;
2809         lustre_cfg_free(lcfg);
2810         return rc;
2811 }
2812
2813 /* write quota settings into log */
2814 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2815                                struct fs_db *fsdb, struct mgs_target_info *mti,
2816                                char *quota, char *ptr)
2817 {
2818         struct mgs_thread_info  *mgi = mgs_env_info(env);
2819         struct lustre_cfg       *lcfg;
2820         char                    *tmp;
2821         char                     sep;
2822         int                      rc, cmd = LCFG_PARAM;
2823
2824         /* support only 'meta' and 'data' pools so far */
2825         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2826             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2827                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2828                        "& quota.ost are)\n", ptr);
2829                 return -EINVAL;
2830         }
2831
2832         if (*tmp == '\0') {
2833                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2834         } else {
2835                 CDEBUG(D_MGS, "global '%s'\n", quota);
2836
2837                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2838                     strcmp(tmp, "none") != 0) {
2839                         CERROR("enable option(%s) isn't supported\n", tmp);
2840                         return -EINVAL;
2841                 }
2842         }
2843
2844         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2845         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2846         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2847         /* truncate the comment to the parameter name */
2848         ptr = tmp - 1;
2849         sep = *ptr;
2850         *ptr = '\0';
2851
2852         /* XXX we duplicated quota enable information in all server
2853          *     config logs, it should be moved to a separate config
2854          *     log once we cleanup the config log for global param. */
2855         /* modify all servers */
2856         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2857                                       *tmp == '\0' ? NULL : lcfg,
2858                                       mti->mti_fsname, quota, 1);
2859         *ptr = sep;
2860         lustre_cfg_free(lcfg);
2861         return rc < 0 ? rc : 0;
2862 }
2863
2864 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2865                                    struct mgs_device *mgs,
2866                                    struct fs_db *fsdb,
2867                                    struct mgs_target_info *mti,
2868                                    char *param)
2869 {
2870         struct mgs_thread_info *mgi = mgs_env_info(env);
2871         struct llog_handle     *llh = NULL;
2872         char                   *logname;
2873         char                   *comment, *ptr;
2874         struct lustre_cfg      *lcfg;
2875         int                     rc, len;
2876         ENTRY;
2877
2878         /* get comment */
2879         ptr = strchr(param, '=');
2880         LASSERT(ptr);
2881         len = ptr - param;
2882
2883         OBD_ALLOC(comment, len + 1);
2884         if (comment == NULL)
2885                 RETURN(-ENOMEM);
2886         strncpy(comment, param, len);
2887         comment[len] = '\0';
2888
2889         /* prepare lcfg */
2890         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2891         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2892         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2893         if (lcfg == NULL)
2894                 GOTO(out_comment, rc = -ENOMEM);
2895
2896         /* construct log name */
2897         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2898         if (rc)
2899                 GOTO(out_lcfg, rc);
2900
2901         if (mgs_log_is_empty(env, mgs, logname)) {
2902                 rc = record_start_log(env, mgs, &llh, logname);
2903                 if (rc)
2904                         GOTO(out, rc);
2905                 record_end_log(env, &llh);
2906         }
2907
2908         /* obsolete old one */
2909         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2910                         comment, CM_SKIP);
2911         if (rc < 0)
2912                 GOTO(out, rc);
2913         /* write the new one */
2914         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2915                                   mti->mti_svname, comment);
2916         if (rc)
2917                 CERROR("err %d writing log %s\n", rc, logname);
2918 out:
2919         name_destroy(&logname);
2920 out_lcfg:
2921         lustre_cfg_free(lcfg);
2922 out_comment:
2923         OBD_FREE(comment, len + 1);
2924         RETURN(rc);
2925 }
2926
2927 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2928                                         char *param)
2929 {
2930         char    *ptr;
2931
2932         /* disable the adjustable udesc parameter for now, i.e. use default
2933          * setting that client always ship udesc to MDT if possible. to enable
2934          * it simply remove the following line */
2935         goto error_out;
2936
2937         ptr = strchr(param, '=');
2938         if (ptr == NULL)
2939                 goto error_out;
2940         *ptr++ = '\0';
2941
2942         if (strcmp(param, PARAM_SRPC_UDESC))
2943                 goto error_out;
2944
2945         if (strcmp(ptr, "yes") == 0) {
2946                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2947                 CWARN("Enable user descriptor shipping from client to MDT\n");
2948         } else if (strcmp(ptr, "no") == 0) {
2949                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2950                 CWARN("Disable user descriptor shipping from client to MDT\n");
2951         } else {
2952                 *(ptr - 1) = '=';
2953                 goto error_out;
2954         }
2955         return 0;
2956
2957 error_out:
2958         CERROR("Invalid param: %s\n", param);
2959         return -EINVAL;
2960 }
2961
2962 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2963                                   const char *svname,
2964                                   char *param)
2965 {
2966         struct sptlrpc_rule      rule;
2967         struct sptlrpc_rule_set *rset;
2968         int                      rc;
2969         ENTRY;
2970
2971         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2972                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2973                 RETURN(-EINVAL);
2974         }
2975
2976         if (strncmp(param, PARAM_SRPC_UDESC,
2977                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2978                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2979         }
2980
2981         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2982                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2983                 RETURN(-EINVAL);
2984         }
2985
2986         param += sizeof(PARAM_SRPC_FLVR) - 1;
2987
2988         rc = sptlrpc_parse_rule(param, &rule);
2989         if (rc)
2990                 RETURN(rc);
2991
2992         /* mgs rules implies must be mgc->mgs */
2993         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2994                 if ((rule.sr_from != LUSTRE_SP_MGC &&
2995                      rule.sr_from != LUSTRE_SP_ANY) ||
2996                     (rule.sr_to != LUSTRE_SP_MGS &&
2997                      rule.sr_to != LUSTRE_SP_ANY))
2998                         RETURN(-EINVAL);
2999         }
3000
3001         /* preapre room for this coming rule. svcname format should be:
3002          * - fsname: general rule
3003          * - fsname-tgtname: target-specific rule
3004          */
3005         if (strchr(svname, '-')) {
3006                 struct mgs_tgt_srpc_conf *tgtconf;
3007                 int                       found = 0;
3008
3009                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3010                      tgtconf = tgtconf->mtsc_next) {
3011                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3012                                 found = 1;
3013                                 break;
3014                         }
3015                 }
3016
3017                 if (!found) {
3018                         int name_len;
3019
3020                         OBD_ALLOC_PTR(tgtconf);
3021                         if (tgtconf == NULL)
3022                                 RETURN(-ENOMEM);
3023
3024                         name_len = strlen(svname);
3025
3026                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3027                         if (tgtconf->mtsc_tgt == NULL) {
3028                                 OBD_FREE_PTR(tgtconf);
3029                                 RETURN(-ENOMEM);
3030                         }
3031                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3032
3033                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3034                         fsdb->fsdb_srpc_tgt = tgtconf;
3035                 }
3036
3037                 rset = &tgtconf->mtsc_rset;
3038         } else {
3039                 rset = &fsdb->fsdb_srpc_gen;
3040         }
3041
3042         rc = sptlrpc_rule_set_merge(rset, &rule);
3043
3044         RETURN(rc);
3045 }
3046
3047 static int mgs_srpc_set_param(const struct lu_env *env,
3048                               struct mgs_device *mgs,
3049                               struct fs_db *fsdb,
3050                               struct mgs_target_info *mti,
3051                               char *param)
3052 {
3053         char                   *copy;
3054         int                     rc, copy_size;
3055         ENTRY;
3056
3057 #ifndef HAVE_GSS
3058         RETURN(-EINVAL);
3059 #endif
3060         /* keep a copy of original param, which could be destroied
3061          * during parsing */
3062         copy_size = strlen(param) + 1;
3063         OBD_ALLOC(copy, copy_size);
3064         if (copy == NULL)
3065                 return -ENOMEM;
3066         memcpy(copy, param, copy_size);
3067
3068         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3069         if (rc)
3070                 goto out_free;
3071
3072         /* previous steps guaranteed the syntax is correct */
3073         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3074         if (rc)
3075                 goto out_free;
3076
3077         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3078                 /*
3079                  * for mgs rules, make them effective immediately.
3080                  */
3081                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3082                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3083                                                  &fsdb->fsdb_srpc_gen);
3084         }
3085
3086 out_free:
3087         OBD_FREE(copy, copy_size);
3088         RETURN(rc);
3089 }
3090
3091 struct mgs_srpc_read_data {
3092         struct fs_db   *msrd_fsdb;
3093         int             msrd_skip;
3094 };
3095
3096 static int mgs_srpc_read_handler(const struct lu_env *env,
3097                                  struct llog_handle *llh,
3098                                  struct llog_rec_hdr *rec, void *data)
3099 {
3100         struct mgs_srpc_read_data *msrd = data;
3101         struct cfg_marker         *marker;
3102         struct lustre_cfg         *lcfg = REC_DATA(rec);
3103         char                      *svname, *param;
3104         int                        cfg_len, rc;
3105         ENTRY;
3106
3107         if (rec->lrh_type != OBD_CFG_REC) {
3108                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3109                 RETURN(-EINVAL);
3110         }
3111
3112         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3113                   sizeof(struct llog_rec_tail);
3114
3115         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3116         if (rc) {
3117                 CERROR("Insane cfg\n");
3118                 RETURN(rc);
3119         }
3120
3121         if (lcfg->lcfg_command == LCFG_MARKER) {
3122                 marker = lustre_cfg_buf(lcfg, 1);
3123
3124                 if (marker->cm_flags & CM_START &&
3125                     marker->cm_flags & CM_SKIP)
3126                         msrd->msrd_skip = 1;
3127                 if (marker->cm_flags & CM_END)
3128                         msrd->msrd_skip = 0;
3129
3130                 RETURN(0);
3131         }
3132
3133         if (msrd->msrd_skip)
3134                 RETURN(0);
3135
3136         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3137                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3138                 RETURN(0);
3139         }
3140
3141         svname = lustre_cfg_string(lcfg, 0);
3142         if (svname == NULL) {
3143                 CERROR("svname is empty\n");
3144                 RETURN(0);
3145         }
3146
3147         param = lustre_cfg_string(lcfg, 1);
3148         if (param == NULL) {
3149                 CERROR("param is empty\n");
3150                 RETURN(0);
3151         }
3152
3153         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3154         if (rc)
3155                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3156
3157         RETURN(0);
3158 }
3159
3160 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3161                                 struct mgs_device *mgs,
3162                                 struct fs_db *fsdb)
3163 {
3164         struct llog_handle        *llh = NULL;
3165         struct llog_ctxt          *ctxt;
3166         char                      *logname;
3167         struct mgs_srpc_read_data  msrd;
3168         int                        rc;
3169         ENTRY;
3170
3171         /* construct log name */
3172         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3173         if (rc)
3174                 RETURN(rc);
3175
3176         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3177         LASSERT(ctxt != NULL);
3178
3179         if (mgs_log_is_empty(env, mgs, logname))
3180                 GOTO(out, rc = 0);
3181
3182         rc = llog_open(env, ctxt, &llh, NULL, logname,
3183                        LLOG_OPEN_EXISTS);
3184         if (rc < 0) {
3185                 if (rc == -ENOENT)
3186                         rc = 0;
3187                 GOTO(out, rc);
3188         }
3189
3190         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3191         if (rc)
3192                 GOTO(out_close, rc);
3193
3194         if (llog_get_size(llh) <= 1)
3195                 GOTO(out_close, rc = 0);
3196
3197         msrd.msrd_fsdb = fsdb;
3198         msrd.msrd_skip = 0;
3199
3200         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3201                           NULL);
3202
3203 out_close:
3204         llog_close(env, llh);
3205 out:
3206         llog_ctxt_put(ctxt);
3207         name_destroy(&logname);
3208
3209         if (rc)
3210                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3211         RETURN(rc);
3212 }
3213
3214 /* Permanent settings of all parameters by writing into the appropriate
3215  * configuration logs.
3216  * A parameter with null value ("<param>='\0'") means to erase it out of
3217  * the logs.
3218  */
3219 static int mgs_write_log_param(const struct lu_env *env,
3220                                struct mgs_device *mgs, struct fs_db *fsdb,
3221                                struct mgs_target_info *mti, char *ptr)
3222 {
3223         struct mgs_thread_info *mgi = mgs_env_info(env);
3224         char *logname;
3225         char *tmp;
3226         int rc = 0, rc2 = 0;
3227         ENTRY;
3228
3229         /* For various parameter settings, we have to figure out which logs
3230            care about them (e.g. both mdt and client for lov settings) */
3231         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3232
3233         /* The params are stored in MOUNT_DATA_FILE and modified via
3234            tunefs.lustre, or set using lctl conf_param */
3235
3236         /* Processed in lustre_start_mgc */
3237         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3238                 GOTO(end, rc);
3239
3240         /* Processed in ost/mdt */
3241         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3242                 GOTO(end, rc);
3243
3244         /* Processed in mgs_write_log_ost */
3245         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3246                 if (mti->mti_flags & LDD_F_PARAM) {
3247                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3248                                            "changed with tunefs.lustre"
3249                                            "and --writeconf\n", ptr);
3250                         rc = -EPERM;
3251                 }
3252                 GOTO(end, rc);
3253         }
3254
3255         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {