Whamcloud - gitweb
85a42a95737f0caa8a5ea345efa1f7be16117149
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 /* Find all logs in CONFIG directory and link then into list */
59 int class_dentry_readdir(const struct lu_env *env,
60                          struct mgs_device *mgs, struct list_head *log_list)
61 {
62         struct dt_object    *dir = mgs->mgs_configs_dir;
63         const struct dt_it_ops *iops;
64         struct dt_it        *it;
65         struct mgs_direntry *de;
66         char                *key;
67         int                  rc, key_sz;
68
69         INIT_LIST_HEAD(log_list);
70
71         LASSERT(dir);
72         LASSERT(dir->do_index_ops);
73
74         iops = &dir->do_index_ops->dio_it;
75         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
76         if (IS_ERR(it))
77                 RETURN(PTR_ERR(it));
78
79         rc = iops->load(env, it, 0);
80         if (rc <= 0)
81                 GOTO(fini, rc = 0);
82
83         /* main cycle */
84         do {
85                 key = (void *)iops->key(env, it);
86                 if (IS_ERR(key)) {
87                         CERROR("%s: key failed when listing %s: rc = %d\n",
88                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
89                                (int) PTR_ERR(key));
90                         goto next;
91                 }
92                 key_sz = iops->key_size(env, it);
93                 LASSERT(key_sz > 0);
94
95                 /* filter out "." and ".." entries */
96                 if (key[0] == '.') {
97                         if (key_sz == 1)
98                                 goto next;
99                         if (key_sz == 2 && key[1] == '.')
100                                 goto next;
101                 }
102
103                 de = mgs_direntry_alloc(key_sz + 1);
104                 if (de == NULL) {
105                         rc = -ENOMEM;
106                         break;
107                 }
108
109                 memcpy(de->mde_name, key, key_sz);
110                 de->mde_name[key_sz] = 0;
111
112                 list_add(&de->mde_list, log_list);
113
114 next:
115                 rc = iops->next(env, it);
116         } while (rc == 0);
117         rc = 0;
118
119         iops->put(env, it);
120
121 fini:
122         iops->fini(env, it);
123         if (rc)
124                 CERROR("%s: key failed when listing %s: rc = %d\n",
125                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
126         RETURN(rc);
127 }
128
129 /******************** DB functions *********************/
130
131 static inline int name_create(char **newname, char *prefix, char *suffix)
132 {
133         LASSERT(newname);
134         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
135         if (!*newname)
136                 return -ENOMEM;
137         sprintf(*newname, "%s%s", prefix, suffix);
138         return 0;
139 }
140
141 static inline void name_destroy(char **name)
142 {
143         if (*name)
144                 OBD_FREE(*name, strlen(*name) + 1);
145         *name = NULL;
146 }
147
148 struct mgs_fsdb_handler_data
149 {
150         struct fs_db   *fsdb;
151         __u32           ver;
152 };
153
154 /* from the (client) config log, figure out:
155         1. which ost's/mdt's are configured (by index)
156         2. what the last config step is
157         3. COMPAT_18 osc name
158 */
159 /* It might be better to have a separate db file, instead of parsing the info
160    out of the client log.  This is slow and potentially error-prone. */
161 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
162                             struct llog_rec_hdr *rec, void *data)
163 {
164         struct mgs_fsdb_handler_data *d = data;
165         struct fs_db *fsdb = d->fsdb;
166         int cfg_len = rec->lrh_len;
167         char *cfg_buf = (char*) (rec + 1);
168         struct lustre_cfg *lcfg;
169         __u32 index;
170         int rc = 0;
171         ENTRY;
172
173         if (rec->lrh_type != OBD_CFG_REC) {
174                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
175                 RETURN(-EINVAL);
176         }
177
178         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
179         if (rc) {
180                 CERROR("Insane cfg\n");
181                 RETURN(rc);
182         }
183
184         lcfg = (struct lustre_cfg *)cfg_buf;
185
186         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
187                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
188
189         /* Figure out ost indicies */
190         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
191         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
192             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
193                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
194                                        NULL, 10);
195                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
196                        lustre_cfg_string(lcfg, 1), index,
197                        lustre_cfg_string(lcfg, 2));
198                 set_bit(index, fsdb->fsdb_ost_index_map);
199         }
200
201         /* Figure out mdt indicies */
202         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
203         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
204             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
205                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
206                                        &index, NULL);
207                 if (rc != LDD_F_SV_TYPE_MDT) {
208                         CWARN("Unparsable MDC name %s, assuming index 0\n",
209                               lustre_cfg_string(lcfg, 0));
210                         index = 0;
211                 }
212                 rc = 0;
213                 CDEBUG(D_MGS, "MDT index is %u\n", index);
214                 set_bit(index, fsdb->fsdb_mdt_index_map);
215                 fsdb->fsdb_mdt_count ++;
216         }
217
218         /**
219          * figure out the old config. fsdb_gen = 0 means old log
220          * It is obsoleted and not supported anymore
221          */
222         if (fsdb->fsdb_gen == 0) {
223                 CERROR("Old config format is not supported\n");
224                 RETURN(-EINVAL);
225         }
226
227         /*
228          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
229          */
230         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
231             lcfg->lcfg_command == LCFG_ATTACH &&
232             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
233                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
234                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
235                         CWARN("MDT using 1.8 OSC name scheme\n");
236                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
237                 }
238         }
239
240         if (lcfg->lcfg_command == LCFG_MARKER) {
241                 struct cfg_marker *marker;
242                 marker = lustre_cfg_buf(lcfg, 1);
243
244                 d->ver = marker->cm_vers;
245
246                 /* Keep track of the latest marker step */
247                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
248         }
249
250         RETURN(rc);
251 }
252
253 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
254 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
255                                   struct mgs_device *mgs,
256                                   struct fs_db *fsdb)
257 {
258         char                            *logname;
259         struct llog_handle              *loghandle;
260         struct llog_ctxt                *ctxt;
261         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
262         int rc;
263
264         ENTRY;
265
266         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
267         LASSERT(ctxt != NULL);
268         rc = name_create(&logname, fsdb->fsdb_name, "-client");
269         if (rc)
270                 GOTO(out_put, rc);
271         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
272         if (rc)
273                 GOTO(out_pop, rc);
274
275         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
276         if (rc)
277                 GOTO(out_close, rc);
278
279         if (llog_get_size(loghandle) <= 1)
280                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
281
282         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
283         CDEBUG(D_INFO, "get_db = %d\n", rc);
284 out_close:
285         llog_close(env, loghandle);
286 out_pop:
287         name_destroy(&logname);
288 out_put:
289         llog_ctxt_put(ctxt);
290
291         RETURN(rc);
292 }
293
294 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
295 {
296         struct mgs_tgt_srpc_conf *tgtconf;
297
298         /* free target-specific rules */
299         while (fsdb->fsdb_srpc_tgt) {
300                 tgtconf = fsdb->fsdb_srpc_tgt;
301                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
302
303                 LASSERT(tgtconf->mtsc_tgt);
304
305                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
306                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
307                 OBD_FREE_PTR(tgtconf);
308         }
309
310         /* free general rules */
311         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
312 }
313
314 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
315 {
316         struct fs_db *fsdb;
317         struct list_head *tmp;
318
319         list_for_each(tmp, &mgs->mgs_fs_db_list) {
320                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
321                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
322                         return fsdb;
323         }
324         return NULL;
325 }
326
327 /* caller must hold the mgs->mgs_fs_db_lock */
328 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
329                                   struct mgs_device *mgs, char *fsname)
330 {
331         struct fs_db *fsdb;
332         int rc;
333         ENTRY;
334
335         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
336                 CERROR("fsname %s is too long\n", fsname);
337                 RETURN(NULL);
338         }
339
340         OBD_ALLOC_PTR(fsdb);
341         if (!fsdb)
342                 RETURN(NULL);
343
344         strcpy(fsdb->fsdb_name, fsname);
345         mutex_init(&fsdb->fsdb_mutex);
346         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
347         fsdb->fsdb_gen = 1;
348
349         if (strcmp(fsname, MGSSELF_NAME) == 0) {
350                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
351         } else {
352                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
353                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
354                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
355                         CERROR("No memory for index maps\n");
356                         GOTO(err, rc = -ENOMEM);
357                 }
358
359                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
360                 if (rc)
361                         GOTO(err, rc);
362                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
363                 if (rc)
364                         GOTO(err, rc);
365
366                 /* initialise data for NID table */
367                 mgs_ir_init_fs(env, mgs, fsdb);
368
369                 lproc_mgs_add_live(mgs, fsdb);
370         }
371
372         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
373
374         RETURN(fsdb);
375 err:
376         if (fsdb->fsdb_ost_index_map)
377                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
378         if (fsdb->fsdb_mdt_index_map)
379                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
380         name_destroy(&fsdb->fsdb_clilov);
381         name_destroy(&fsdb->fsdb_clilmv);
382         OBD_FREE_PTR(fsdb);
383         RETURN(NULL);
384 }
385
386 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
387 {
388         /* wait for anyone with the sem */
389         mutex_lock(&fsdb->fsdb_mutex);
390         lproc_mgs_del_live(mgs, fsdb);
391         list_del(&fsdb->fsdb_list);
392
393         /* deinitialize fsr */
394         mgs_ir_fini_fs(mgs, fsdb);
395
396         if (fsdb->fsdb_ost_index_map)
397                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
398         if (fsdb->fsdb_mdt_index_map)
399                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
400         name_destroy(&fsdb->fsdb_clilov);
401         name_destroy(&fsdb->fsdb_clilmv);
402         mgs_free_fsdb_srpc(fsdb);
403         mutex_unlock(&fsdb->fsdb_mutex);
404         OBD_FREE_PTR(fsdb);
405 }
406
407 int mgs_init_fsdb_list(struct mgs_device *mgs)
408 {
409         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
410         return 0;
411 }
412
413 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
414 {
415         struct fs_db *fsdb;
416         struct list_head *tmp, *tmp2;
417
418         mutex_lock(&mgs->mgs_mutex);
419         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
420                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
421                 mgs_free_fsdb(mgs, fsdb);
422         }
423         mutex_unlock(&mgs->mgs_mutex);
424         return 0;
425 }
426
427 int mgs_find_or_make_fsdb(const struct lu_env *env,
428                           struct mgs_device *mgs, char *name,
429                           struct fs_db **dbh)
430 {
431         struct fs_db *fsdb;
432         int rc = 0;
433
434         ENTRY;
435         mutex_lock(&mgs->mgs_mutex);
436         fsdb = mgs_find_fsdb(mgs, name);
437         if (fsdb) {
438                 mutex_unlock(&mgs->mgs_mutex);
439                 *dbh = fsdb;
440                 RETURN(0);
441         }
442
443         CDEBUG(D_MGS, "Creating new db\n");
444         fsdb = mgs_new_fsdb(env, mgs, name);
445         /* lock fsdb_mutex until the db is loaded from llogs */
446         if (fsdb)
447                 mutex_lock(&fsdb->fsdb_mutex);
448         mutex_unlock(&mgs->mgs_mutex);
449         if (!fsdb)
450                 RETURN(-ENOMEM);
451
452         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
453                 /* populate the db from the client llog */
454                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
455                 if (rc) {
456                         CERROR("Can't get db from client log %d\n", rc);
457                         GOTO(out_free, rc);
458                 }
459         }
460
461         /* populate srpc rules from params llog */
462         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
463         if (rc) {
464                 CERROR("Can't get db from params log %d\n", rc);
465                 GOTO(out_free, rc);
466         }
467
468         mutex_unlock(&fsdb->fsdb_mutex);
469         *dbh = fsdb;
470
471         RETURN(0);
472
473 out_free:
474         mutex_unlock(&fsdb->fsdb_mutex);
475         mgs_free_fsdb(mgs, fsdb);
476         return rc;
477 }
478
479 /* 1 = index in use
480    0 = index unused
481    -1= empty client log */
482 int mgs_check_index(const struct lu_env *env,
483                     struct mgs_device *mgs,
484                     struct mgs_target_info *mti)
485 {
486         struct fs_db *fsdb;
487         void *imap;
488         int rc = 0;
489         ENTRY;
490
491         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
492
493         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
494         if (rc) {
495                 CERROR("Can't get db for %s\n", mti->mti_fsname);
496                 RETURN(rc);
497         }
498
499         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
500                 RETURN(-1);
501
502         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
503                 imap = fsdb->fsdb_ost_index_map;
504         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
505                 imap = fsdb->fsdb_mdt_index_map;
506         else
507                 RETURN(-EINVAL);
508
509         if (test_bit(mti->mti_stripe_index, imap))
510                 RETURN(1);
511         RETURN(0);
512 }
513
514 static __inline__ int next_index(void *index_map, int map_len)
515 {
516         int i;
517         for (i = 0; i < map_len * 8; i++)
518                 if (!test_bit(i, index_map)) {
519                          return i;
520                  }
521         CERROR("max index %d exceeded.\n", i);
522         return -1;
523 }
524
525 /* Return codes:
526         0  newly marked as in use
527         <0 err
528         +EALREADY for update of an old index */
529 static int mgs_set_index(const struct lu_env *env,
530                          struct mgs_device *mgs,
531                          struct mgs_target_info *mti)
532 {
533         struct fs_db *fsdb;
534         void *imap;
535         int rc = 0;
536         ENTRY;
537
538         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
539         if (rc) {
540                 CERROR("Can't get db for %s\n", mti->mti_fsname);
541                 RETURN(rc);
542         }
543
544         mutex_lock(&fsdb->fsdb_mutex);
545         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
546                 imap = fsdb->fsdb_ost_index_map;
547         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
548                 imap = fsdb->fsdb_mdt_index_map;
549         } else {
550                 GOTO(out_up, rc = -EINVAL);
551         }
552
553         if (mti->mti_flags & LDD_F_NEED_INDEX) {
554                 rc = next_index(imap, INDEX_MAP_SIZE);
555                 if (rc == -1)
556                         GOTO(out_up, rc = -ERANGE);
557                 mti->mti_stripe_index = rc;
558                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
559                         fsdb->fsdb_mdt_count ++;
560         }
561
562         /* the last index(0xffff) is reserved for default value. */
563         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
564                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
565                                    "but index must be less than %u.\n",
566                                    mti->mti_svname, mti->mti_stripe_index,
567                                    INDEX_MAP_SIZE * 8 - 1);
568                 GOTO(out_up, rc = -ERANGE);
569         }
570
571         if (test_bit(mti->mti_stripe_index, imap)) {
572                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
573                     !(mti->mti_flags & LDD_F_WRITECONF)) {
574                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
575                                            "%d, but that index is already in "
576                                            "use. Use --writeconf to force\n",
577                                            mti->mti_svname,
578                                            mti->mti_stripe_index);
579                         GOTO(out_up, rc = -EADDRINUSE);
580                 } else {
581                         CDEBUG(D_MGS, "Server %s updating index %d\n",
582                                mti->mti_svname, mti->mti_stripe_index);
583                         GOTO(out_up, rc = EALREADY);
584                 }
585         }
586
587         set_bit(mti->mti_stripe_index, imap);
588         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
589         mutex_unlock(&fsdb->fsdb_mutex);
590         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
591                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
592
593         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
594                mti->mti_stripe_index);
595
596         RETURN(0);
597 out_up:
598         mutex_unlock(&fsdb->fsdb_mutex);
599         return rc;
600 }
601
602 struct mgs_modify_lookup {
603         struct cfg_marker mml_marker;
604         int               mml_modified;
605 };
606
607 static int mgs_modify_handler(const struct lu_env *env,
608                               struct llog_handle *llh,
609                               struct llog_rec_hdr *rec, void *data)
610 {
611         struct mgs_modify_lookup *mml = data;
612         struct cfg_marker *marker;
613         struct lustre_cfg *lcfg = REC_DATA(rec);
614         int cfg_len = REC_DATA_LEN(rec);
615         int rc;
616         ENTRY;
617
618         if (rec->lrh_type != OBD_CFG_REC) {
619                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
620                 RETURN(-EINVAL);
621         }
622
623         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
624         if (rc) {
625                 CERROR("Insane cfg\n");
626                 RETURN(rc);
627         }
628
629         /* We only care about markers */
630         if (lcfg->lcfg_command != LCFG_MARKER)
631                 RETURN(0);
632
633         marker = lustre_cfg_buf(lcfg, 1);
634         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
635             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
636             !(marker->cm_flags & CM_SKIP)) {
637                 /* Found a non-skipped marker match */
638                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
639                        rec->lrh_index, marker->cm_step,
640                        marker->cm_flags, mml->mml_marker.cm_flags,
641                        marker->cm_tgtname, marker->cm_comment);
642                 /* Overwrite the old marker llog entry */
643                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
644                 marker->cm_flags |= mml->mml_marker.cm_flags;
645                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
646                 rc = llog_write(env, llh, rec, rec->lrh_index);
647                 if (!rc)
648                          mml->mml_modified++;
649         }
650
651         RETURN(rc);
652 }
653
654 /**
655  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
656  * Return code:
657  * 0 - modified successfully,
658  * 1 - no modification was done
659  * negative - error
660  */
661 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
662                       struct fs_db *fsdb, struct mgs_target_info *mti,
663                       char *logname, char *devname, char *comment, int flags)
664 {
665         struct llog_handle *loghandle;
666         struct llog_ctxt *ctxt;
667         struct mgs_modify_lookup *mml;
668         int rc;
669
670         ENTRY;
671
672         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
673         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
674                flags);
675
676         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
677         LASSERT(ctxt != NULL);
678         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
679         if (rc < 0) {
680                 if (rc == -ENOENT)
681                         rc = 0;
682                 GOTO(out_pop, rc);
683         }
684
685         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
686         if (rc)
687                 GOTO(out_close, rc);
688
689         if (llog_get_size(loghandle) <= 1)
690                 GOTO(out_close, rc = 0);
691
692         OBD_ALLOC_PTR(mml);
693         if (!mml)
694                 GOTO(out_close, rc = -ENOMEM);
695         if (strlcpy(mml->mml_marker.cm_comment, comment,
696                     sizeof(mml->mml_marker.cm_comment)) >=
697             sizeof(mml->mml_marker.cm_comment))
698                 GOTO(out_free, rc = -E2BIG);
699         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
700                     sizeof(mml->mml_marker.cm_tgtname)) >=
701             sizeof(mml->mml_marker.cm_tgtname))
702                 GOTO(out_free, rc = -E2BIG);
703         /* Modify mostly means cancel */
704         mml->mml_marker.cm_flags = flags;
705         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
706         mml->mml_modified = 0;
707         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
708                           NULL);
709         if (!rc && !mml->mml_modified)
710                 rc = 1;
711
712 out_free:
713         OBD_FREE_PTR(mml);
714
715 out_close:
716         llog_close(env, loghandle);
717 out_pop:
718         if (rc < 0)
719                 CERROR("%s: modify %s/%s failed: rc = %d\n",
720                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
721         llog_ctxt_put(ctxt);
722         RETURN(rc);
723 }
724
725 /** This structure is passed to mgs_replace_handler */
726 struct mgs_replace_uuid_lookup {
727         /* Nids are replaced for this target device */
728         struct mgs_target_info target;
729         /* Temporary modified llog */
730         struct llog_handle *temp_llh;
731         /* Flag is set if in target block*/
732         int in_target_device;
733         /* Nids already added. Just skip (multiple nids) */
734         int device_nids_added;
735         /* Flag is set if this block should not be copied */
736         int skip_it;
737 };
738
739 /**
740  * Check: a) if block should be skipped
741  * b) is it target block
742  *
743  * \param[in] lcfg
744  * \param[in] mrul
745  *
746  * \retval 0 should not to be skipped
747  * \retval 1 should to be skipped
748  */
749 static int check_markers(struct lustre_cfg *lcfg,
750                          struct mgs_replace_uuid_lookup *mrul)
751 {
752          struct cfg_marker *marker;
753
754         /* Track markers. Find given device */
755         if (lcfg->lcfg_command == LCFG_MARKER) {
756                 marker = lustre_cfg_buf(lcfg, 1);
757                 /* Clean llog from records marked as CM_EXCLUDE.
758                    CM_SKIP records are used for "active" command
759                    and can be restored if needed */
760                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
761                     (CM_EXCLUDE | CM_START)) {
762                         mrul->skip_it = 1;
763                         return 1;
764                 }
765
766                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
767                     (CM_EXCLUDE | CM_END)) {
768                         mrul->skip_it = 0;
769                         return 1;
770                 }
771
772                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
773                         LASSERT(!(marker->cm_flags & CM_START) ||
774                                 !(marker->cm_flags & CM_END));
775                         if (marker->cm_flags & CM_START) {
776                                 mrul->in_target_device = 1;
777                                 mrul->device_nids_added = 0;
778                         } else if (marker->cm_flags & CM_END)
779                                 mrul->in_target_device = 0;
780                 }
781         }
782
783         return 0;
784 }
785
786 static int record_base(const struct lu_env *env, struct llog_handle *llh,
787                      char *cfgname, lnet_nid_t nid, int cmd,
788                      char *s1, char *s2, char *s3, char *s4)
789 {
790         struct mgs_thread_info  *mgi = mgs_env_info(env);
791         struct llog_cfg_rec     *lcr;
792         int rc;
793
794         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
795                cmd, s1, s2, s3, s4);
796
797         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
798         if (s1)
799                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
800         if (s2)
801                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
802         if (s3)
803                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
804         if (s4)
805                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
806
807         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
808         if (lcr == NULL)
809                 return -ENOMEM;
810
811         lcr->lcr_cfg.lcfg_nid = nid;
812         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
813
814         lustre_cfg_rec_free(lcr);
815
816         if (rc < 0)
817                 CDEBUG(D_MGS,
818                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
819                        cfgname, cmd, s1, s2, s3, s4, rc);
820         return rc;
821 }
822
823 static inline int record_add_uuid(const struct lu_env *env,
824                                   struct llog_handle *llh,
825                                   uint64_t nid, char *uuid)
826 {
827         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
828 }
829
830 static inline int record_add_conn(const struct lu_env *env,
831                                   struct llog_handle *llh,
832                                   char *devname, char *uuid)
833 {
834         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
835 }
836
837 static inline int record_attach(const struct lu_env *env,
838                                 struct llog_handle *llh, char *devname,
839                                 char *type, char *uuid)
840 {
841         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
842 }
843
844 static inline int record_setup(const struct lu_env *env,
845                                struct llog_handle *llh, char *devname,
846                                char *s1, char *s2, char *s3, char *s4)
847 {
848         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
849 }
850
851 /**
852  * \retval <0 record processing error
853  * \retval n record is processed. No need copy original one.
854  * \retval 0 record is not processed.
855  */
856 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
857                            struct mgs_replace_uuid_lookup *mrul)
858 {
859         int nids_added = 0;
860         lnet_nid_t nid;
861         char *ptr;
862         int rc;
863
864         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
865                 /* LCFG_ADD_UUID command found. Let's skip original command
866                    and add passed nids */
867                 ptr = mrul->target.mti_params;
868                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
869                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
870                                "device %s\n", libcfs_nid2str(nid),
871                                 mrul->target.mti_params,
872                                 mrul->target.mti_svname);
873                         rc = record_add_uuid(env,
874                                              mrul->temp_llh, nid,
875                                              mrul->target.mti_params);
876                         if (!rc)
877                                 nids_added++;
878                 }
879
880                 if (nids_added == 0) {
881                         CERROR("No new nids were added, nid %s with uuid %s, "
882                                "device %s\n", libcfs_nid2str(nid),
883                                mrul->target.mti_params,
884                                mrul->target.mti_svname);
885                         RETURN(-ENXIO);
886                 } else {
887                         mrul->device_nids_added = 1;
888                 }
889
890                 return nids_added;
891         }
892
893         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
894                 /* LCFG_SETUP command found. UUID should be changed */
895                 rc = record_setup(env,
896                                   mrul->temp_llh,
897                                   /* devname the same */
898                                   lustre_cfg_string(lcfg, 0),
899                                   /* s1 is not changed */
900                                   lustre_cfg_string(lcfg, 1),
901                                   /* new uuid should be
902                                   the full nidlist */
903                                   mrul->target.mti_params,
904                                   /* s3 is not changed */
905                                   lustre_cfg_string(lcfg, 3),
906                                   /* s4 is not changed */
907                                   lustre_cfg_string(lcfg, 4));
908                 return rc ? rc : 1;
909         }
910
911         /* Another commands in target device block */
912         return 0;
913 }
914
915 /**
916  * Handler that called for every record in llog.
917  * Records are processed in order they placed in llog.
918  *
919  * \param[in] llh       log to be processed
920  * \param[in] rec       current record
921  * \param[in] data      mgs_replace_uuid_lookup structure
922  *
923  * \retval 0    success
924  */
925 static int mgs_replace_handler(const struct lu_env *env,
926                                struct llog_handle *llh,
927                                struct llog_rec_hdr *rec,
928                                void *data)
929 {
930         struct mgs_replace_uuid_lookup *mrul;
931         struct lustre_cfg *lcfg = REC_DATA(rec);
932         int cfg_len = REC_DATA_LEN(rec);
933         int rc;
934         ENTRY;
935
936         mrul = (struct mgs_replace_uuid_lookup *)data;
937
938         if (rec->lrh_type != OBD_CFG_REC) {
939                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
940                        rec->lrh_type, lcfg->lcfg_command,
941                        lustre_cfg_string(lcfg, 0),
942                        lustre_cfg_string(lcfg, 1));
943                 RETURN(-EINVAL);
944         }
945
946         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
947         if (rc) {
948                 /* Do not copy any invalidated records */
949                 GOTO(skip_out, rc = 0);
950         }
951
952         rc = check_markers(lcfg, mrul);
953         if (rc || mrul->skip_it)
954                 GOTO(skip_out, rc = 0);
955
956         /* Write to new log all commands outside target device block */
957         if (!mrul->in_target_device)
958                 GOTO(copy_out, rc = 0);
959
960         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
961            (failover nids) for this target, assuming that if then
962            primary is changing then so is the failover */
963         if (mrul->device_nids_added &&
964             (lcfg->lcfg_command == LCFG_ADD_UUID ||
965              lcfg->lcfg_command == LCFG_ADD_CONN))
966                 GOTO(skip_out, rc = 0);
967
968         rc = process_command(env, lcfg, mrul);
969         if (rc < 0)
970                 RETURN(rc);
971
972         if (rc)
973                 RETURN(0);
974 copy_out:
975         /* Record is placed in temporary llog as is */
976         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
977
978         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
979                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
980                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
981         RETURN(rc);
982
983 skip_out:
984         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
985                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
986                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
987         RETURN(rc);
988 }
989
990 static int mgs_log_is_empty(const struct lu_env *env,
991                             struct mgs_device *mgs, char *name)
992 {
993         struct llog_ctxt        *ctxt;
994         int                      rc;
995
996         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
997         LASSERT(ctxt != NULL);
998
999         rc = llog_is_empty(env, ctxt, name);
1000         llog_ctxt_put(ctxt);
1001         return rc;
1002 }
1003
1004 static int mgs_replace_nids_log(const struct lu_env *env,
1005                                 struct obd_device *mgs, struct fs_db *fsdb,
1006                                 char *logname, char *devname, char *nids)
1007 {
1008         struct llog_handle *orig_llh, *backup_llh;
1009         struct llog_ctxt *ctxt;
1010         struct mgs_replace_uuid_lookup *mrul;
1011         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1012         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1013         char *backup;
1014         int rc, rc2;
1015         ENTRY;
1016
1017         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1018
1019         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1020         LASSERT(ctxt != NULL);
1021
1022         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1023                 /* Log is empty. Nothing to replace */
1024                 GOTO(out_put, rc = 0);
1025         }
1026
1027         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1028         if (backup == NULL)
1029                 GOTO(out_put, rc = -ENOMEM);
1030
1031         sprintf(backup, "%s.bak", logname);
1032
1033         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1034         if (rc == 0) {
1035                 /* Now erase original log file. Connections are not allowed.
1036                    Backup is already saved */
1037                 rc = llog_erase(env, ctxt, NULL, logname);
1038                 if (rc < 0)
1039                         GOTO(out_free, rc);
1040         } else if (rc != -ENOENT) {
1041                 CERROR("%s: can't make backup for %s: rc = %d\n",
1042                        mgs->obd_name, logname, rc);
1043                 GOTO(out_free,rc);
1044         }
1045
1046         /* open local log */
1047         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1048         if (rc)
1049                 GOTO(out_restore, rc);
1050
1051         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1052         if (rc)
1053                 GOTO(out_closel, rc);
1054
1055         /* open backup llog */
1056         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1057                        LLOG_OPEN_EXISTS);
1058         if (rc)
1059                 GOTO(out_closel, rc);
1060
1061         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1062         if (rc)
1063                 GOTO(out_close, rc);
1064
1065         if (llog_get_size(backup_llh) <= 1)
1066                 GOTO(out_close, rc = 0);
1067
1068         OBD_ALLOC_PTR(mrul);
1069         if (!mrul)
1070                 GOTO(out_close, rc = -ENOMEM);
1071         /* devname is only needed information to replace UUID records */
1072         strlcpy(mrul->target.mti_svname, devname,
1073                 sizeof(mrul->target.mti_svname));
1074         /* parse nids later */
1075         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1076         /* Copy records to this temporary llog */
1077         mrul->temp_llh = orig_llh;
1078
1079         rc = llog_process(env, backup_llh, mgs_replace_handler,
1080                           (void *)mrul, NULL);
1081         OBD_FREE_PTR(mrul);
1082 out_close:
1083         rc2 = llog_close(NULL, backup_llh);
1084         if (!rc)
1085                 rc = rc2;
1086 out_closel:
1087         rc2 = llog_close(NULL, orig_llh);
1088         if (!rc)
1089                 rc = rc2;
1090
1091 out_restore:
1092         if (rc) {
1093                 CERROR("%s: llog should be restored: rc = %d\n",
1094                        mgs->obd_name, rc);
1095                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1096                                   logname);
1097                 if (rc2 < 0)
1098                         CERROR("%s: can't restore backup %s: rc = %d\n",
1099                                mgs->obd_name, logname, rc2);
1100         }
1101
1102 out_free:
1103         OBD_FREE(backup, strlen(backup) + 1);
1104
1105 out_put:
1106         llog_ctxt_put(ctxt);
1107
1108         if (rc)
1109                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1110                        mgs->obd_name, logname, rc);
1111
1112         RETURN(rc);
1113 }
1114
1115 /**
1116  * Parse device name and get file system name and/or device index
1117  *
1118  * \param[in]   devname device name (ex. lustre-MDT0000)
1119  * \param[out]  fsname  file system name(optional)
1120  * \param[out]  index   device index(optional)
1121  *
1122  * \retval 0    success
1123  */
1124 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1125 {
1126         int rc;
1127         ENTRY;
1128
1129         /* Extract fsname */
1130         if (fsname) {
1131                 rc = server_name2fsname(devname, fsname, NULL);
1132                 if (rc < 0) {
1133                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1134                                devname);
1135                         RETURN(-EINVAL);
1136                 }
1137         }
1138
1139         if (index) {
1140                 rc = server_name2index(devname, index, NULL);
1141                 if (rc < 0) {
1142                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1143                                devname);
1144                         RETURN(-EINVAL);
1145                 }
1146         }
1147
1148         RETURN(0);
1149 }
1150
1151 /* This is only called during replace_nids */
1152 static int only_mgs_is_running(struct obd_device *mgs_obd)
1153 {
1154         /* TDB: Is global variable with devices count exists? */
1155         int num_devices = get_devices_count();
1156         int num_exports = 0;
1157         struct obd_export *exp;
1158
1159         spin_lock(&mgs_obd->obd_dev_lock);
1160         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1161                 /* skip self export */
1162                 if (exp == mgs_obd->obd_self_export)
1163                         continue;
1164                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1165                         continue;
1166
1167                 ++num_exports;
1168
1169                 CERROR("%s: node %s still connected during replace_nids "
1170                        "connect_flags:%llx\n",
1171                        mgs_obd->obd_name,
1172                        libcfs_nid2str(exp->exp_nid_stats->nid),
1173                        exp_connect_flags(exp));
1174
1175         }
1176         spin_unlock(&mgs_obd->obd_dev_lock);
1177
1178         /* osd, MGS and MGC + self_export
1179            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1180         return (num_devices <= 3) && (num_exports == 0);
1181 }
1182
1183 static int name_create_mdt(char **logname, char *fsname, int i)
1184 {
1185         char mdt_index[9];
1186
1187         sprintf(mdt_index, "-MDT%04x", i);
1188         return name_create(logname, fsname, mdt_index);
1189 }
1190
1191 /**
1192  * Replace nids for \a device to \a nids values
1193  *
1194  * \param obd           MGS obd device
1195  * \param devname       nids need to be replaced for this device
1196  * (ex. lustre-OST0000)
1197  * \param nids          nids list (ex. nid1,nid2,nid3)
1198  *
1199  * \retval 0    success
1200  */
1201 int mgs_replace_nids(const struct lu_env *env,
1202                      struct mgs_device *mgs,
1203                      char *devname, char *nids)
1204 {
1205         /* Assume fsname is part of device name */
1206         char fsname[MTI_NAME_MAXLEN];
1207         int rc;
1208         __u32 index;
1209         char *logname;
1210         struct fs_db *fsdb;
1211         unsigned int i;
1212         int conn_state;
1213         struct obd_device *mgs_obd = mgs->mgs_obd;
1214         ENTRY;
1215
1216         /* We can only change NIDs if no other nodes are connected */
1217         spin_lock(&mgs_obd->obd_dev_lock);
1218         conn_state = mgs_obd->obd_no_conn;
1219         mgs_obd->obd_no_conn = 1;
1220         spin_unlock(&mgs_obd->obd_dev_lock);
1221
1222         /* We can not change nids if not only MGS is started */
1223         if (!only_mgs_is_running(mgs_obd)) {
1224                 CERROR("Only MGS is allowed to be started\n");
1225                 GOTO(out, rc = -EINPROGRESS);
1226         }
1227
1228         /* Get fsname and index*/
1229         rc = mgs_parse_devname(devname, fsname, &index);
1230         if (rc)
1231                 GOTO(out, rc);
1232
1233         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1234         if (rc) {
1235                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1236                 GOTO(out, rc);
1237         }
1238
1239         /* Process client llogs */
1240         name_create(&logname, fsname, "-client");
1241         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1242         name_destroy(&logname);
1243         if (rc) {
1244                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1245                        fsname, devname, rc);
1246                 GOTO(out, rc);
1247         }
1248
1249         /* Process MDT llogs */
1250         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1251                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1252                         continue;
1253                 name_create_mdt(&logname, fsname, i);
1254                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1255                 name_destroy(&logname);
1256                 if (rc)
1257                         GOTO(out, rc);
1258         }
1259
1260 out:
1261         spin_lock(&mgs_obd->obd_dev_lock);
1262         mgs_obd->obd_no_conn = conn_state;
1263         spin_unlock(&mgs_obd->obd_dev_lock);
1264
1265         RETURN(rc);
1266 }
1267
1268 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1269                             char *devname, struct lov_desc *desc)
1270 {
1271         struct mgs_thread_info  *mgi = mgs_env_info(env);
1272         struct llog_cfg_rec     *lcr;
1273         int rc;
1274
1275         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1276         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1277         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1278         if (lcr == NULL)
1279                 return -ENOMEM;
1280
1281         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1282         lustre_cfg_rec_free(lcr);
1283         return rc;
1284 }
1285
1286 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1287                             char *devname, struct lmv_desc *desc)
1288 {
1289         struct mgs_thread_info  *mgi = mgs_env_info(env);
1290         struct llog_cfg_rec     *lcr;
1291         int rc;
1292
1293         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1294         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1295         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1296         if (lcr == NULL)
1297                 return -ENOMEM;
1298
1299         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1300         lustre_cfg_rec_free(lcr);
1301         return rc;
1302 }
1303
1304 static inline int record_mdc_add(const struct lu_env *env,
1305                                  struct llog_handle *llh,
1306                                  char *logname, char *mdcuuid,
1307                                  char *mdtuuid, char *index,
1308                                  char *gen)
1309 {
1310         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1311                            mdtuuid,index,gen,mdcuuid);
1312 }
1313
1314 static inline int record_lov_add(const struct lu_env *env,
1315                                  struct llog_handle *llh,
1316                                  char *lov_name, char *ost_uuid,
1317                                  char *index, char *gen)
1318 {
1319         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1320                            ost_uuid, index, gen, 0);
1321 }
1322
1323 static inline int record_mount_opt(const struct lu_env *env,
1324                                    struct llog_handle *llh,
1325                                    char *profile, char *lov_name,
1326                                    char *mdc_name)
1327 {
1328         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1329                            profile,lov_name,mdc_name,0);
1330 }
1331
1332 static int record_marker(const struct lu_env *env,
1333                          struct llog_handle *llh,
1334                          struct fs_db *fsdb, __u32 flags,
1335                          char *tgtname, char *comment)
1336 {
1337         struct mgs_thread_info *mgi = mgs_env_info(env);
1338         struct llog_cfg_rec *lcr;
1339         int rc;
1340         int cplen = 0;
1341
1342         if (flags & CM_START)
1343                 fsdb->fsdb_gen++;
1344         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1345         mgi->mgi_marker.cm_flags = flags;
1346         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1347         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1348                         sizeof(mgi->mgi_marker.cm_tgtname));
1349         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1350                 return -E2BIG;
1351         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1352                         sizeof(mgi->mgi_marker.cm_comment));
1353         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1354                 return -E2BIG;
1355         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1356         mgi->mgi_marker.cm_canceltime = 0;
1357         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1358         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1359                             sizeof(mgi->mgi_marker));
1360         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1361         if (lcr == NULL)
1362                 return -ENOMEM;
1363
1364         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1365         lustre_cfg_rec_free(lcr);
1366         return rc;
1367 }
1368
1369 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1370                             struct llog_handle **llh, char *name)
1371 {
1372         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1373         struct llog_ctxt        *ctxt;
1374         int                      rc = 0;
1375         ENTRY;
1376
1377         if (*llh)
1378                 GOTO(out, rc = -EBUSY);
1379
1380         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1381         if (!ctxt)
1382                 GOTO(out, rc = -ENODEV);
1383         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1384
1385         rc = llog_open_create(env, ctxt, llh, NULL, name);
1386         if (rc)
1387                 GOTO(out_ctxt, rc);
1388         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1389         if (rc)
1390                 llog_close(env, *llh);
1391 out_ctxt:
1392         llog_ctxt_put(ctxt);
1393 out:
1394         if (rc) {
1395                 CERROR("%s: can't start log %s: rc = %d\n",
1396                        mgs->mgs_obd->obd_name, name, rc);
1397                 *llh = NULL;
1398         }
1399         RETURN(rc);
1400 }
1401
1402 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1403 {
1404         int rc;
1405
1406         rc = llog_close(env, *llh);
1407         *llh = NULL;
1408
1409         return rc;
1410 }
1411
1412 /******************** config "macros" *********************/
1413
1414 /* write an lcfg directly into a log (with markers) */
1415 static int mgs_write_log_direct(const struct lu_env *env,
1416                                 struct mgs_device *mgs, struct fs_db *fsdb,
1417                                 char *logname, struct llog_cfg_rec *lcr,
1418                                 char *devname, char *comment)
1419 {
1420         struct llog_handle *llh = NULL;
1421         int rc;
1422
1423         ENTRY;
1424
1425         rc = record_start_log(env, mgs, &llh, logname);
1426         if (rc)
1427                 RETURN(rc);
1428
1429         /* FIXME These should be a single journal transaction */
1430         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1431         if (rc)
1432                 GOTO(out_end, rc);
1433         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1434         if (rc)
1435                 GOTO(out_end, rc);
1436         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1437         if (rc)
1438                 GOTO(out_end, rc);
1439 out_end:
1440         record_end_log(env, &llh);
1441         RETURN(rc);
1442 }
1443
1444 /* write the lcfg in all logs for the given fs */
1445 static int mgs_write_log_direct_all(const struct lu_env *env,
1446                                     struct mgs_device *mgs,
1447                                     struct fs_db *fsdb,
1448                                     struct mgs_target_info *mti,
1449                                     struct llog_cfg_rec *lcr, char *devname,
1450                                     char *comment, int server_only)
1451 {
1452         struct list_head         log_list;
1453         struct mgs_direntry     *dirent, *n;
1454         char                    *fsname = mti->mti_fsname;
1455         int                      rc = 0, len = strlen(fsname);
1456
1457         ENTRY;
1458         /* Find all the logs in the CONFIGS directory */
1459         rc = class_dentry_readdir(env, mgs, &log_list);
1460         if (rc)
1461                 RETURN(rc);
1462
1463         /* Could use fsdb index maps instead of directory listing */
1464         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1465                 list_del_init(&dirent->mde_list);
1466                 /* don't write to sptlrpc rule log */
1467                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1468                         goto next;
1469
1470                 /* caller wants write server logs only */
1471                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1472                         goto next;
1473
1474                 if (strlen(dirent->mde_name) <= len ||
1475                     strncmp(fsname, dirent->mde_name, len) != 0 ||
1476                     dirent->mde_name[len] != '-')
1477                         goto next;
1478
1479                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1480                 /* Erase any old settings of this same parameter */
1481                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1482                                 devname, comment, CM_SKIP);
1483                 if (rc < 0)
1484                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1485                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1486                 if (lcr == NULL)
1487                         goto next;
1488                 /* Write the new one */
1489                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1490                                           lcr, devname, comment);
1491                 if (rc != 0)
1492                         CERROR("%s: writing log %s: rc = %d\n",
1493                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1494 next:
1495                 mgs_direntry_free(dirent);
1496         }
1497
1498         RETURN(rc);
1499 }
1500
1501 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1502                                     struct mgs_device *mgs,
1503                                     struct fs_db *fsdb,
1504                                     struct mgs_target_info *mti,
1505                                     int index, char *logname);
1506 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1507                                     struct mgs_device *mgs,
1508                                     struct fs_db *fsdb,
1509                                     struct mgs_target_info *mti,
1510                                     char *logname, char *suffix, char *lovname,
1511                                     enum lustre_sec_part sec_part, int flags);
1512 static int name_create_mdt_and_lov(char **logname, char **lovname,
1513                                    struct fs_db *fsdb, int i);
1514
1515 static int add_param(char *params, char *key, char *val)
1516 {
1517         char *start = params + strlen(params);
1518         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1519         int keylen = 0;
1520
1521         if (key != NULL)
1522                 keylen = strlen(key);
1523         if (start + 1 + keylen + strlen(val) >= end) {
1524                 CERROR("params are too long: %s %s%s\n",
1525                        params, key != NULL ? key : "", val);
1526                 return -EINVAL;
1527         }
1528
1529         sprintf(start, " %s%s", key != NULL ? key : "", val);
1530         return 0;
1531 }
1532
1533 /**
1534  * Walk through client config log record and convert the related records
1535  * into the target.
1536  **/
1537 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1538                                          struct llog_handle *llh,
1539                                          struct llog_rec_hdr *rec, void *data)
1540 {
1541         struct mgs_device *mgs;
1542         struct obd_device *obd;
1543         struct mgs_target_info *mti, *tmti;
1544         struct fs_db *fsdb;
1545         int cfg_len = rec->lrh_len;
1546         char *cfg_buf = (char*) (rec + 1);
1547         struct lustre_cfg *lcfg;
1548         int rc = 0;
1549         struct llog_handle *mdt_llh = NULL;
1550         static int got_an_osc_or_mdc = 0;
1551         /* 0: not found any osc/mdc;
1552            1: found osc;
1553            2: found mdc;
1554         */
1555         static int last_step = -1;
1556         int cplen = 0;
1557
1558         ENTRY;
1559
1560         mti = ((struct temp_comp*)data)->comp_mti;
1561         tmti = ((struct temp_comp*)data)->comp_tmti;
1562         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1563         obd = ((struct temp_comp *)data)->comp_obd;
1564         mgs = lu2mgs_dev(obd->obd_lu_dev);
1565         LASSERT(mgs);
1566
1567         if (rec->lrh_type != OBD_CFG_REC) {
1568                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1569                 RETURN(-EINVAL);
1570         }
1571
1572         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1573         if (rc) {
1574                 CERROR("Insane cfg\n");
1575                 RETURN(rc);
1576         }
1577
1578         lcfg = (struct lustre_cfg *)cfg_buf;
1579
1580         if (lcfg->lcfg_command == LCFG_MARKER) {
1581                 struct cfg_marker *marker;
1582                 marker = lustre_cfg_buf(lcfg, 1);
1583                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1584                     (marker->cm_flags & CM_START) &&
1585                      !(marker->cm_flags & CM_SKIP)) {
1586                         got_an_osc_or_mdc = 1;
1587                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1588                                         sizeof(tmti->mti_svname));
1589                         if (cplen >= sizeof(tmti->mti_svname))
1590                                 RETURN(-E2BIG);
1591                         rc = record_start_log(env, mgs, &mdt_llh,
1592                                               mti->mti_svname);
1593                         if (rc)
1594                                 RETURN(rc);
1595                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1596                                            mti->mti_svname, "add osc(copied)");
1597                         record_end_log(env, &mdt_llh);
1598                         last_step = marker->cm_step;
1599                         RETURN(rc);
1600                 }
1601                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1602                     (marker->cm_flags & CM_END) &&
1603                      !(marker->cm_flags & CM_SKIP)) {
1604                         LASSERT(last_step == marker->cm_step);
1605                         last_step = -1;
1606                         got_an_osc_or_mdc = 0;
1607                         memset(tmti, 0, sizeof(*tmti));
1608                         rc = record_start_log(env, mgs, &mdt_llh,
1609                                               mti->mti_svname);
1610                         if (rc)
1611                                 RETURN(rc);
1612                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1613                                            mti->mti_svname, "add osc(copied)");
1614                         record_end_log(env, &mdt_llh);
1615                         RETURN(rc);
1616                 }
1617                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1618                     (marker->cm_flags & CM_START) &&
1619                      !(marker->cm_flags & CM_SKIP)) {
1620                         got_an_osc_or_mdc = 2;
1621                         last_step = marker->cm_step;
1622                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1623                                strlen(marker->cm_tgtname));
1624
1625                         RETURN(rc);
1626                 }
1627                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1628                     (marker->cm_flags & CM_END) &&
1629                      !(marker->cm_flags & CM_SKIP)) {
1630                         LASSERT(last_step == marker->cm_step);
1631                         last_step = -1;
1632                         got_an_osc_or_mdc = 0;
1633                         memset(tmti, 0, sizeof(*tmti));
1634                         RETURN(rc);
1635                 }
1636         }
1637
1638         if (got_an_osc_or_mdc == 0 || last_step < 0)
1639                 RETURN(rc);
1640
1641         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1642                 uint64_t nodenid = lcfg->lcfg_nid;
1643
1644                 if (strlen(tmti->mti_uuid) == 0) {
1645                         /* target uuid not set, this config record is before
1646                          * LCFG_SETUP, this nid is one of target node nid.
1647                          */
1648                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1649                         tmti->mti_nid_count++;
1650                 } else {
1651                         /* failover node nid */
1652                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1653                                        libcfs_nid2str(nodenid));
1654                 }
1655
1656                 RETURN(rc);
1657         }
1658
1659         if (lcfg->lcfg_command == LCFG_SETUP) {
1660                 char *target;
1661
1662                 target = lustre_cfg_string(lcfg, 1);
1663                 memcpy(tmti->mti_uuid, target, strlen(target));
1664                 RETURN(rc);
1665         }
1666
1667         /* ignore client side sptlrpc_conf_log */
1668         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1669                 RETURN(rc);
1670
1671         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1672                 int index;
1673
1674                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1675                         RETURN (-EINVAL);
1676
1677                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1678                        strlen(mti->mti_fsname));
1679                 tmti->mti_stripe_index = index;
1680
1681                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1682                                               mti->mti_stripe_index,
1683                                               mti->mti_svname);
1684                 memset(tmti, 0, sizeof(*tmti));
1685                 RETURN(rc);
1686         }
1687
1688         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1689                 int index;
1690                 char mdt_index[9];
1691                 char *logname, *lovname;
1692
1693                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1694                                              mti->mti_stripe_index);
1695                 if (rc)
1696                         RETURN(rc);
1697                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1698
1699                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1700                         name_destroy(&logname);
1701                         name_destroy(&lovname);
1702                         RETURN(-EINVAL);
1703                 }
1704
1705                 tmti->mti_stripe_index = index;
1706                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1707                                          mdt_index, lovname,
1708                                          LUSTRE_SP_MDT, 0);
1709                 name_destroy(&logname);
1710                 name_destroy(&lovname);
1711                 RETURN(rc);
1712         }
1713         RETURN(rc);
1714 }
1715
1716 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1717 /* stealed from mgs_get_fsdb_from_llog*/
1718 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1719                                               struct mgs_device *mgs,
1720                                               char *client_name,
1721                                               struct temp_comp* comp)
1722 {
1723         struct llog_handle *loghandle;
1724         struct mgs_target_info *tmti;
1725         struct llog_ctxt *ctxt;
1726         int rc;
1727
1728         ENTRY;
1729
1730         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1731         LASSERT(ctxt != NULL);
1732
1733         OBD_ALLOC_PTR(tmti);
1734         if (tmti == NULL)
1735                 GOTO(out_ctxt, rc = -ENOMEM);
1736
1737         comp->comp_tmti = tmti;
1738         comp->comp_obd = mgs->mgs_obd;
1739
1740         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1741                        LLOG_OPEN_EXISTS);
1742         if (rc < 0) {
1743                 if (rc == -ENOENT)
1744                         rc = 0;
1745                 GOTO(out_pop, rc);
1746         }
1747
1748         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1749         if (rc)
1750                 GOTO(out_close, rc);
1751
1752         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1753                                   (void *)comp, NULL, false);
1754         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1755 out_close:
1756         llog_close(env, loghandle);
1757 out_pop:
1758         OBD_FREE_PTR(tmti);
1759 out_ctxt:
1760         llog_ctxt_put(ctxt);
1761         RETURN(rc);
1762 }
1763
1764 /* lmv is the second thing for client logs */
1765 /* copied from mgs_write_log_lov. Please refer to that.  */
1766 static int mgs_write_log_lmv(const struct lu_env *env,
1767                              struct mgs_device *mgs,
1768                              struct fs_db *fsdb,
1769                              struct mgs_target_info *mti,
1770                              char *logname, char *lmvname)
1771 {
1772         struct llog_handle *llh = NULL;
1773         struct lmv_desc *lmvdesc;
1774         char *uuid;
1775         int rc = 0;
1776         ENTRY;
1777
1778         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1779
1780         OBD_ALLOC_PTR(lmvdesc);
1781         if (lmvdesc == NULL)
1782                 RETURN(-ENOMEM);
1783         lmvdesc->ld_active_tgt_count = 0;
1784         lmvdesc->ld_tgt_count = 0;
1785         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1786         uuid = (char *)lmvdesc->ld_uuid.uuid;
1787
1788         rc = record_start_log(env, mgs, &llh, logname);
1789         if (rc)
1790                 GOTO(out_free, rc);
1791         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1792         if (rc)
1793                 GOTO(out_end, rc);
1794         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1795         if (rc)
1796                 GOTO(out_end, rc);
1797         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1798         if (rc)
1799                 GOTO(out_end, rc);
1800         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1801         if (rc)
1802                 GOTO(out_end, rc);
1803 out_end:
1804         record_end_log(env, &llh);
1805 out_free:
1806         OBD_FREE_PTR(lmvdesc);
1807         RETURN(rc);
1808 }
1809
1810 /* lov is the first thing in the mdt and client logs */
1811 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1812                              struct fs_db *fsdb, struct mgs_target_info *mti,
1813                              char *logname, char *lovname)
1814 {
1815         struct llog_handle *llh = NULL;
1816         struct lov_desc *lovdesc;
1817         char *uuid;
1818         int rc = 0;
1819         ENTRY;
1820
1821         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1822
1823         /*
1824         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1825         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1826               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1827         */
1828
1829         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1830         OBD_ALLOC_PTR(lovdesc);
1831         if (lovdesc == NULL)
1832                 RETURN(-ENOMEM);
1833         lovdesc->ld_magic = LOV_DESC_MAGIC;
1834         lovdesc->ld_tgt_count = 0;
1835         /* Defaults.  Can be changed later by lcfg config_param */
1836         lovdesc->ld_default_stripe_count = 1;
1837         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1838         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1839         lovdesc->ld_default_stripe_offset = -1;
1840         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1841         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1842         /* can these be the same? */
1843         uuid = (char *)lovdesc->ld_uuid.uuid;
1844
1845         /* This should always be the first entry in a log.
1846         rc = mgs_clear_log(obd, logname); */
1847         rc = record_start_log(env, mgs, &llh, logname);
1848         if (rc)
1849                 GOTO(out_free, rc);
1850         /* FIXME these should be a single journal transaction */
1851         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1852         if (rc)
1853                 GOTO(out_end, rc);
1854         rc = record_attach(env, llh, lovname, "lov", uuid);
1855         if (rc)
1856                 GOTO(out_end, rc);
1857         rc = record_lov_setup(env, llh, lovname, lovdesc);
1858         if (rc)
1859                 GOTO(out_end, rc);
1860         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1861         if (rc)
1862                 GOTO(out_end, rc);
1863         EXIT;
1864 out_end:
1865         record_end_log(env, &llh);
1866 out_free:
1867         OBD_FREE_PTR(lovdesc);
1868         return rc;
1869 }
1870
1871 /* add failnids to open log */
1872 static int mgs_write_log_failnids(const struct lu_env *env,
1873                                   struct mgs_target_info *mti,
1874                                   struct llog_handle *llh,
1875                                   char *cliname)
1876 {
1877         char *failnodeuuid = NULL;
1878         char *ptr = mti->mti_params;
1879         lnet_nid_t nid;
1880         int rc = 0;
1881
1882         /*
1883         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1884         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1885         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1886         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1887         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1888         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1889         */
1890
1891         /*
1892          * Pull failnid info out of params string, which may contain something
1893          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
1894          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
1895          * etc.  However, convert_hostnames() should have caught those.
1896          */
1897         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1898                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1899                         if (failnodeuuid == NULL) {
1900                                 /* We don't know the failover node name,
1901                                    so just use the first nid as the uuid */
1902                                 rc = name_create(&failnodeuuid,
1903                                                  libcfs_nid2str(nid), "");
1904                                 if (rc)
1905                                         return rc;
1906                         }
1907                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1908                                "client %s\n", libcfs_nid2str(nid),
1909                                failnodeuuid, cliname);
1910                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1911                         /*
1912                          * If *ptr is ':', we have added all NIDs for
1913                          * failnodeuuid.
1914                          */
1915                         if (*ptr == ':') {
1916                                 rc = record_add_conn(env, llh, cliname,
1917                                                      failnodeuuid);
1918                                 name_destroy(&failnodeuuid);
1919                                 failnodeuuid = NULL;
1920                         }
1921                 }
1922                 if (failnodeuuid) {
1923                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1924                         name_destroy(&failnodeuuid);
1925                         failnodeuuid = NULL;
1926                 }
1927         }
1928
1929         return rc;
1930 }
1931
1932 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1933                                     struct mgs_device *mgs,
1934                                     struct fs_db *fsdb,
1935                                     struct mgs_target_info *mti,
1936                                     char *logname, char *lmvname)
1937 {
1938         struct llog_handle *llh = NULL;
1939         char *mdcname = NULL;
1940         char *nodeuuid = NULL;
1941         char *mdcuuid = NULL;
1942         char *lmvuuid = NULL;
1943         char index[6];
1944         int i, rc;
1945         ENTRY;
1946
1947         if (mgs_log_is_empty(env, mgs, logname)) {
1948                 CERROR("log is empty! Logical error\n");
1949                 RETURN(-EINVAL);
1950         }
1951
1952         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1953                mti->mti_svname, logname, lmvname);
1954
1955         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1956         if (rc)
1957                 RETURN(rc);
1958         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1959         if (rc)
1960                 GOTO(out_free, rc);
1961         rc = name_create(&mdcuuid, mdcname, "_UUID");
1962         if (rc)
1963                 GOTO(out_free, rc);
1964         rc = name_create(&lmvuuid, lmvname, "_UUID");
1965         if (rc)
1966                 GOTO(out_free, rc);
1967
1968         rc = record_start_log(env, mgs, &llh, logname);
1969         if (rc)
1970                 GOTO(out_free, rc);
1971         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1972                            "add mdc");
1973         if (rc)
1974                 GOTO(out_end, rc);
1975         for (i = 0; i < mti->mti_nid_count; i++) {
1976                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1977                        libcfs_nid2str(mti->mti_nids[i]));
1978
1979                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1980                 if (rc)
1981                         GOTO(out_end, rc);
1982         }
1983
1984         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1985         if (rc)
1986                 GOTO(out_end, rc);
1987         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1988         if (rc)
1989                 GOTO(out_end, rc);
1990         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1991         if (rc)
1992                 GOTO(out_end, rc);
1993         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1994         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1995                             index, "1");
1996         if (rc)
1997                 GOTO(out_end, rc);
1998         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1999                            "add mdc");
2000         if (rc)
2001                 GOTO(out_end, rc);
2002 out_end:
2003         record_end_log(env, &llh);
2004 out_free:
2005         name_destroy(&lmvuuid);
2006         name_destroy(&mdcuuid);
2007         name_destroy(&mdcname);
2008         name_destroy(&nodeuuid);
2009         RETURN(rc);
2010 }
2011
2012 static inline int name_create_lov(char **lovname, char *mdtname,
2013                                   struct fs_db *fsdb, int index)
2014 {
2015         /* COMPAT_180 */
2016         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2017                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2018         else
2019                 return name_create(lovname, mdtname, "-mdtlov");
2020 }
2021
2022 static int name_create_mdt_and_lov(char **logname, char **lovname,
2023                                    struct fs_db *fsdb, int i)
2024 {
2025         int rc;
2026
2027         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2028         if (rc)
2029                 return rc;
2030         /* COMPAT_180 */
2031         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2032                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2033         else
2034                 rc = name_create(lovname, *logname, "-mdtlov");
2035         if (rc) {
2036                 name_destroy(logname);
2037                 *logname = NULL;
2038         }
2039         return rc;
2040 }
2041
2042 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2043                                       struct fs_db *fsdb, int i)
2044 {
2045         char suffix[16];
2046
2047         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2048                 sprintf(suffix, "-osc");
2049         else
2050                 sprintf(suffix, "-osc-MDT%04x", i);
2051         return name_create(oscname, ostname, suffix);
2052 }
2053
2054 /* add new mdc to already existent MDS */
2055 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2056                                     struct mgs_device *mgs,
2057                                     struct fs_db *fsdb,
2058                                     struct mgs_target_info *mti,
2059                                     int mdt_index, char *logname)
2060 {
2061         struct llog_handle      *llh = NULL;
2062         char    *nodeuuid = NULL;
2063         char    *ospname = NULL;
2064         char    *lovuuid = NULL;
2065         char    *mdtuuid = NULL;
2066         char    *svname = NULL;
2067         char    *mdtname = NULL;
2068         char    *lovname = NULL;
2069         char    index_str[16];
2070         int     i, rc;
2071
2072         ENTRY;
2073         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2074                 CERROR("log is empty! Logical error\n");
2075                 RETURN (-EINVAL);
2076         }
2077
2078         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2079                logname);
2080
2081         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2082         if (rc)
2083                 RETURN(rc);
2084
2085         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2086         if (rc)
2087                 GOTO(out_destory, rc);
2088
2089         rc = name_create(&svname, mdtname, "-osp");
2090         if (rc)
2091                 GOTO(out_destory, rc);
2092
2093         sprintf(index_str, "-MDT%04x", mdt_index);
2094         rc = name_create(&ospname, svname, index_str);
2095         if (rc)
2096                 GOTO(out_destory, rc);
2097
2098         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2099         if (rc)
2100                 GOTO(out_destory, rc);
2101
2102         rc = name_create(&lovuuid, lovname, "_UUID");
2103         if (rc)
2104                 GOTO(out_destory, rc);
2105
2106         rc = name_create(&mdtuuid, mdtname, "_UUID");
2107         if (rc)
2108                 GOTO(out_destory, rc);
2109
2110         rc = record_start_log(env, mgs, &llh, logname);
2111         if (rc)
2112                 GOTO(out_destory, rc);
2113
2114         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2115                            "add osp");
2116         if (rc)
2117                 GOTO(out_destory, rc);
2118
2119         for (i = 0; i < mti->mti_nid_count; i++) {
2120                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2121                        libcfs_nid2str(mti->mti_nids[i]));
2122                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2123                 if (rc)
2124                         GOTO(out_end, rc);
2125         }
2126
2127         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2128         if (rc)
2129                 GOTO(out_end, rc);
2130
2131         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2132                           NULL, NULL);
2133         if (rc)
2134                 GOTO(out_end, rc);
2135
2136         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2137         if (rc)
2138                 GOTO(out_end, rc);
2139
2140         /* Add mdc(osp) to lod */
2141         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2142                  mti->mti_stripe_index);
2143         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2144                          index_str, "1", NULL);
2145         if (rc)
2146                 GOTO(out_end, rc);
2147
2148         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2149         if (rc)
2150                 GOTO(out_end, rc);
2151
2152 out_end:
2153         record_end_log(env, &llh);
2154
2155 out_destory:
2156         name_destroy(&mdtuuid);
2157         name_destroy(&lovuuid);
2158         name_destroy(&lovname);
2159         name_destroy(&ospname);
2160         name_destroy(&svname);
2161         name_destroy(&nodeuuid);
2162         name_destroy(&mdtname);
2163         RETURN(rc);
2164 }
2165
2166 static int mgs_write_log_mdt0(const struct lu_env *env,
2167                               struct mgs_device *mgs,
2168                               struct fs_db *fsdb,
2169                               struct mgs_target_info *mti)
2170 {
2171         char *log = mti->mti_svname;
2172         struct llog_handle *llh = NULL;
2173         char *uuid, *lovname;
2174         char mdt_index[6];
2175         char *ptr = mti->mti_params;
2176         int rc = 0, failout = 0;
2177         ENTRY;
2178
2179         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2180         if (uuid == NULL)
2181                 RETURN(-ENOMEM);
2182
2183         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2184                 failout = (strncmp(ptr, "failout", 7) == 0);
2185
2186         rc = name_create(&lovname, log, "-mdtlov");
2187         if (rc)
2188                 GOTO(out_free, rc);
2189         if (mgs_log_is_empty(env, mgs, log)) {
2190                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2191                 if (rc)
2192                         GOTO(out_lod, rc);
2193         }
2194
2195         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2196
2197         rc = record_start_log(env, mgs, &llh, log);
2198         if (rc)
2199                 GOTO(out_lod, rc);
2200
2201         /* add MDT itself */
2202
2203         /* FIXME this whole fn should be a single journal transaction */
2204         sprintf(uuid, "%s_UUID", log);
2205         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2206         if (rc)
2207                 GOTO(out_lod, rc);
2208         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2209         if (rc)
2210                 GOTO(out_end, rc);
2211         rc = record_mount_opt(env, llh, log, lovname, NULL);
2212         if (rc)
2213                 GOTO(out_end, rc);
2214         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2215                         failout ? "n" : "f");
2216         if (rc)
2217                 GOTO(out_end, rc);
2218         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2219         if (rc)
2220                 GOTO(out_end, rc);
2221 out_end:
2222         record_end_log(env, &llh);
2223 out_lod:
2224         name_destroy(&lovname);
2225 out_free:
2226         OBD_FREE(uuid, sizeof(struct obd_uuid));
2227         RETURN(rc);
2228 }
2229
2230 /* envelope method for all layers log */
2231 static int mgs_write_log_mdt(const struct lu_env *env,
2232                              struct mgs_device *mgs,
2233                              struct fs_db *fsdb,
2234                              struct mgs_target_info *mti)
2235 {
2236         struct mgs_thread_info *mgi = mgs_env_info(env);
2237         struct llog_handle *llh = NULL;
2238         char *cliname;
2239         int rc, i = 0;
2240         ENTRY;
2241
2242         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2243
2244         if (mti->mti_uuid[0] == '\0') {
2245                 /* Make up our own uuid */
2246                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2247                          "%s_UUID", mti->mti_svname);
2248         }
2249
2250         /* add mdt */
2251         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2252         if (rc)
2253                 RETURN(rc);
2254         /* Append the mdt info to the client log */
2255         rc = name_create(&cliname, mti->mti_fsname, "-client");
2256         if (rc)
2257                 RETURN(rc);
2258
2259         if (mgs_log_is_empty(env, mgs, cliname)) {
2260                 /* Start client log */
2261                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2262                                        fsdb->fsdb_clilov);
2263                 if (rc)
2264                         GOTO(out_free, rc);
2265                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2266                                        fsdb->fsdb_clilmv);
2267                 if (rc)
2268                         GOTO(out_free, rc);
2269         }
2270
2271         /*
2272         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2273         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2274         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2275         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2276         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2277         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2278         */
2279
2280                 /* copy client info about lov/lmv */
2281                 mgi->mgi_comp.comp_mti = mti;
2282                 mgi->mgi_comp.comp_fsdb = fsdb;
2283
2284                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2285                                                         &mgi->mgi_comp);
2286                 if (rc)
2287                         GOTO(out_free, rc);
2288                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2289                                               fsdb->fsdb_clilmv);
2290                 if (rc)
2291                         GOTO(out_free, rc);
2292
2293                 /* add mountopts */
2294                 rc = record_start_log(env, mgs, &llh, cliname);
2295                 if (rc)
2296                         GOTO(out_free, rc);
2297
2298                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2299                                    "mount opts");
2300                 if (rc)
2301                         GOTO(out_end, rc);
2302                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2303                                       fsdb->fsdb_clilmv);
2304                 if (rc)
2305                         GOTO(out_end, rc);
2306                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2307                                    "mount opts");
2308
2309         if (rc)
2310                 GOTO(out_end, rc);
2311
2312         /* for_all_existing_mdt except current one */
2313         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2314                 if (i !=  mti->mti_stripe_index &&
2315                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2316                         char *logname;
2317
2318                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2319                         if (rc)
2320                                 GOTO(out_end, rc);
2321
2322                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2323                                                       i, logname);
2324                         name_destroy(&logname);
2325                         if (rc)
2326                                 GOTO(out_end, rc);
2327                 }
2328         }
2329 out_end:
2330         record_end_log(env, &llh);
2331 out_free:
2332         name_destroy(&cliname);
2333         RETURN(rc);
2334 }
2335
2336 /* Add the ost info to the client/mdt lov */
2337 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2338                                     struct mgs_device *mgs, struct fs_db *fsdb,
2339                                     struct mgs_target_info *mti,
2340                                     char *logname, char *suffix, char *lovname,
2341                                     enum lustre_sec_part sec_part, int flags)
2342 {
2343         struct llog_handle *llh = NULL;
2344         char *nodeuuid = NULL;
2345         char *oscname = NULL;
2346         char *oscuuid = NULL;
2347         char *lovuuid = NULL;
2348         char *svname = NULL;
2349         char index[6];
2350         int i, rc;
2351
2352         ENTRY;
2353         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2354                mti->mti_svname, logname);
2355
2356         if (mgs_log_is_empty(env, mgs, logname)) {
2357                 CERROR("log is empty! Logical error\n");
2358                 RETURN (-EINVAL);
2359         }
2360
2361         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2362         if (rc)
2363                 RETURN(rc);
2364         rc = name_create(&svname, mti->mti_svname, "-osc");
2365         if (rc)
2366                 GOTO(out_free, rc);
2367
2368         /* for the system upgraded from old 1.8, keep using the old osc naming
2369          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2370         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2371                 rc = name_create(&oscname, svname, "");
2372         else
2373                 rc = name_create(&oscname, svname, suffix);
2374         if (rc)
2375                 GOTO(out_free, rc);
2376
2377         rc = name_create(&oscuuid, oscname, "_UUID");
2378         if (rc)
2379                 GOTO(out_free, rc);
2380         rc = name_create(&lovuuid, lovname, "_UUID");
2381         if (rc)
2382                 GOTO(out_free, rc);
2383
2384
2385         /*
2386         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2387         multihomed (#4)
2388         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2389         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2390         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2391         failover (#6,7)
2392         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2393         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2394         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2395         */
2396
2397         rc = record_start_log(env, mgs, &llh, logname);
2398         if (rc)
2399                 GOTO(out_free, rc);
2400
2401         /* FIXME these should be a single journal transaction */
2402         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2403                            "add osc");
2404         if (rc)
2405                 GOTO(out_end, rc);
2406
2407         /* NB: don't change record order, because upon MDT steal OSC config
2408          * from client, it treats all nids before LCFG_SETUP as target nids
2409          * (multiple interfaces), while nids after as failover node nids.
2410          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2411          */
2412         for (i = 0; i < mti->mti_nid_count; i++) {
2413                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2414                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2415                 if (rc)
2416                         GOTO(out_end, rc);
2417         }
2418         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2419         if (rc)
2420                 GOTO(out_end, rc);
2421         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2422         if (rc)
2423                 GOTO(out_end, rc);
2424         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2425         if (rc)
2426                 GOTO(out_end, rc);
2427
2428         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2429
2430         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2431         if (rc)
2432                 GOTO(out_end, rc);
2433         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2434                            "add osc");
2435         if (rc)
2436                 GOTO(out_end, rc);
2437 out_end:
2438         record_end_log(env, &llh);
2439 out_free:
2440         name_destroy(&lovuuid);
2441         name_destroy(&oscuuid);
2442         name_destroy(&oscname);
2443         name_destroy(&svname);
2444         name_destroy(&nodeuuid);
2445         RETURN(rc);
2446 }
2447
2448 static int mgs_write_log_ost(const struct lu_env *env,
2449                              struct mgs_device *mgs, struct fs_db *fsdb,
2450                              struct mgs_target_info *mti)
2451 {
2452         struct llog_handle *llh = NULL;
2453         char *logname, *lovname;
2454         char *ptr = mti->mti_params;
2455         int rc, flags = 0, failout = 0, i;
2456         ENTRY;
2457
2458         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2459
2460         /* The ost startup log */
2461
2462         /* If the ost log already exists, that means that someone reformatted
2463            the ost and it called target_add again. */
2464         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2465                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2466                                    "exists, yet the server claims it never "
2467                                    "registered. It may have been reformatted, "
2468                                    "or the index changed. writeconf the MDT to "
2469                                    "regenerate all logs.\n", mti->mti_svname);
2470                 RETURN(-EALREADY);
2471         }
2472
2473         /*
2474         attach obdfilter ost1 ost1_UUID
2475         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2476         */
2477         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2478                 failout = (strncmp(ptr, "failout", 7) == 0);
2479         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2480         if (rc)
2481                 RETURN(rc);
2482         /* FIXME these should be a single journal transaction */
2483         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2484         if (rc)
2485                 GOTO(out_end, rc);
2486         if (*mti->mti_uuid == '\0')
2487                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2488                          "%s_UUID", mti->mti_svname);
2489         rc = record_attach(env, llh, mti->mti_svname,
2490                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2491         if (rc)
2492                 GOTO(out_end, rc);
2493         rc = record_setup(env, llh, mti->mti_svname,
2494                           "dev"/*ignored*/, "type"/*ignored*/,
2495                           failout ? "n" : "f", 0/*options*/);
2496         if (rc)
2497                 GOTO(out_end, rc);
2498         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2499         if (rc)
2500                 GOTO(out_end, rc);
2501 out_end:
2502         record_end_log(env, &llh);
2503         if (rc)
2504                 RETURN(rc);
2505         /* We also have to update the other logs where this osc is part of
2506            the lov */
2507
2508         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2509                 /* If we're upgrading, the old mdt log already has our
2510                    entry. Let's do a fake one for fun. */
2511                 /* Note that we can't add any new failnids, since we don't
2512                    know the old osc names. */
2513                 flags = CM_SKIP | CM_UPGRADE146;
2514
2515         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2516                 /* If the update flag isn't set, don't update client/mdt
2517                    logs. */
2518                 flags |= CM_SKIP;
2519                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2520                               "the MDT first to regenerate it.\n",
2521                               mti->mti_svname);
2522         }
2523
2524         /* Add ost to all MDT lov defs */
2525         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2526                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2527                         char mdt_index[9];
2528
2529                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2530                                                      i);
2531                         if (rc)
2532                                 RETURN(rc);
2533                         sprintf(mdt_index, "-MDT%04x", i);
2534                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2535                                                       logname, mdt_index,
2536                                                       lovname, LUSTRE_SP_MDT,
2537                                                       flags);
2538                         name_destroy(&logname);
2539                         name_destroy(&lovname);
2540                         if (rc)
2541                                 RETURN(rc);
2542                 }
2543         }
2544
2545         /* Append ost info to the client log */
2546         rc = name_create(&logname, mti->mti_fsname, "-client");
2547         if (rc)
2548                 RETURN(rc);
2549         if (mgs_log_is_empty(env, mgs, logname)) {
2550                 /* Start client log */
2551                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2552                                        fsdb->fsdb_clilov);
2553                 if (rc)
2554                         GOTO(out_free, rc);
2555                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2556                                        fsdb->fsdb_clilmv);
2557                 if (rc)
2558                         GOTO(out_free, rc);
2559         }
2560         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2561                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2562 out_free:
2563         name_destroy(&logname);
2564         RETURN(rc);
2565 }
2566
2567 static __inline__ int mgs_param_empty(char *ptr)
2568 {
2569         char *tmp;
2570
2571         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2572                 return 1;
2573         return 0;
2574 }
2575
2576 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2577                                           struct mgs_device *mgs,
2578                                           struct fs_db *fsdb,
2579                                           struct mgs_target_info *mti,
2580                                           char *logname, char *cliname)
2581 {
2582         int rc;
2583         struct llog_handle *llh = NULL;
2584
2585         if (mgs_param_empty(mti->mti_params)) {
2586                 /* Remove _all_ failnids */
2587                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2588                                 mti->mti_svname, "add failnid", CM_SKIP);
2589                 return rc < 0 ? rc : 0;
2590         }
2591
2592         /* Otherwise failover nids are additive */
2593         rc = record_start_log(env, mgs, &llh, logname);
2594         if (rc)
2595                 return rc;
2596                 /* FIXME this should be a single journal transaction */
2597         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2598                            "add failnid");
2599         if (rc)
2600                 goto out_end;
2601         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2602         if (rc)
2603                 goto out_end;
2604         rc = record_marker(env, llh, fsdb, CM_END,
2605                            mti->mti_svname, "add failnid");
2606 out_end:
2607         record_end_log(env, &llh);
2608         return rc;
2609 }
2610
2611
2612 /* Add additional failnids to an existing log.
2613    The mdc/osc must have been added to logs first */
2614 /* tcp nids must be in dotted-quad ascii -
2615    we can't resolve hostnames from the kernel. */
2616 static int mgs_write_log_add_failnid(const struct lu_env *env,
2617                                      struct mgs_device *mgs,
2618                                      struct fs_db *fsdb,
2619                                      struct mgs_target_info *mti)
2620 {
2621         char *logname, *cliname;
2622         int rc;
2623         ENTRY;
2624
2625         /* FIXME we currently can't erase the failnids
2626          * given when a target first registers, since they aren't part of
2627          * an "add uuid" stanza */
2628
2629         /* Verify that we know about this target */
2630         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2631                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2632                                    "yet. It must be started before failnids "
2633                                    "can be added.\n", mti->mti_svname);
2634                 RETURN(-ENOENT);
2635         }
2636
2637         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2638         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2639                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2640         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2641                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2642         } else {
2643                 RETURN(-EINVAL);
2644         }
2645         if (rc)
2646                 RETURN(rc);
2647         /* Add failover nids to the client log */
2648         rc = name_create(&logname, mti->mti_fsname, "-client");
2649         if (rc) {
2650                 name_destroy(&cliname);
2651                 RETURN(rc);
2652         }
2653         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2654         name_destroy(&logname);
2655         name_destroy(&cliname);
2656         if (rc)
2657                 RETURN(rc);
2658
2659         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2660                 /* Add OST failover nids to the MDT logs as well */
2661                 int i;
2662
2663                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2664                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2665                                 continue;
2666                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2667                         if (rc)
2668                                 RETURN(rc);
2669                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2670                                                  fsdb, i);
2671                         if (rc) {
2672                                 name_destroy(&logname);
2673                                 RETURN(rc);
2674                         }
2675                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2676                                                             mti, logname,
2677                                                             cliname);
2678                         name_destroy(&cliname);
2679                         name_destroy(&logname);
2680                         if (rc)
2681                                 RETURN(rc);
2682                 }
2683         }
2684
2685         RETURN(rc);
2686 }
2687
2688 static int mgs_wlp_lcfg(const struct lu_env *env,
2689                         struct mgs_device *mgs, struct fs_db *fsdb,
2690                         struct mgs_target_info *mti,
2691                         char *logname, struct lustre_cfg_bufs *bufs,
2692                         char *tgtname, char *ptr)
2693 {
2694         char comment[MTI_NAME_MAXLEN];
2695         char *tmp;
2696         struct llog_cfg_rec *lcr;
2697         int rc, del;
2698
2699         /* Erase any old settings of this same parameter */
2700         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2701         comment[MTI_NAME_MAXLEN - 1] = 0;
2702         /* But don't try to match the value. */
2703         tmp = strchr(comment, '=');
2704         if (tmp != NULL)
2705                 *tmp = 0;
2706         /* FIXME we should skip settings that are the same as old values */
2707         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2708         if (rc < 0)
2709                 return rc;
2710         del = mgs_param_empty(ptr);
2711
2712         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2713                       "Setting" : "Modifying", tgtname, comment, logname);
2714         if (del) {
2715                 /* mgs_modify() will return 1 if nothing had to be done */
2716                 if (rc == 1)
2717                         rc = 0;
2718                 return rc;
2719         }
2720
2721         lustre_cfg_bufs_reset(bufs, tgtname);
2722         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2723         if (mti->mti_flags & LDD_F_PARAM2)
2724                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2725
2726         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2727                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2728         if (lcr == NULL)
2729                 return -ENOMEM;
2730
2731         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2732                                   comment);
2733         lustre_cfg_rec_free(lcr);
2734         return rc;
2735 }
2736
2737 static int mgs_write_log_param2(const struct lu_env *env,
2738                                 struct mgs_device *mgs,
2739                                 struct fs_db *fsdb,
2740                                 struct mgs_target_info *mti, char *ptr)
2741 {
2742         struct lustre_cfg_bufs  bufs;
2743         int                     rc = 0;
2744         ENTRY;
2745
2746         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2747         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2748                           mti->mti_svname, ptr);
2749
2750         RETURN(rc);
2751 }
2752
2753 /* write global variable settings into log */
2754 static int mgs_write_log_sys(const struct lu_env *env,
2755                              struct mgs_device *mgs, struct fs_db *fsdb,
2756                              struct mgs_target_info *mti, char *sys, char *ptr)
2757 {
2758         struct mgs_thread_info  *mgi = mgs_env_info(env);
2759         struct lustre_cfg       *lcfg;
2760         struct llog_cfg_rec     *lcr;
2761         char *tmp, sep;
2762         int rc, cmd, convert = 1;
2763
2764         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2765                 cmd = LCFG_SET_TIMEOUT;
2766         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2767                 cmd = LCFG_SET_LDLM_TIMEOUT;
2768         /* Check for known params here so we can return error to lctl */
2769         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2770                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2771                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2772                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2773                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2774                 cmd = LCFG_PARAM;
2775         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2776                 convert = 0; /* Don't convert string value to integer */
2777                 cmd = LCFG_PARAM;
2778         } else {
2779                 return -EINVAL;
2780         }
2781
2782         if (mgs_param_empty(ptr))
2783                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2784         else
2785                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2786
2787         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2788         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2789         if (!convert && *tmp != '\0')
2790                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2791         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2792         if (lcr == NULL)
2793                 return -ENOMEM;
2794
2795         lcfg = &lcr->lcr_cfg;
2796         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2797         /* truncate the comment to the parameter name */
2798         ptr = tmp - 1;
2799         sep = *ptr;
2800         *ptr = '\0';
2801         /* modify all servers and clients */
2802         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2803                                       *tmp == '\0' ? NULL : lcr,
2804                                       mti->mti_fsname, sys, 0);
2805         if (rc == 0 && *tmp != '\0') {
2806                 switch (cmd) {
2807                 case LCFG_SET_TIMEOUT:
2808                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2809                                 class_process_config(lcfg);
2810                         break;
2811                 case LCFG_SET_LDLM_TIMEOUT:
2812                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2813                                 class_process_config(lcfg);
2814                         break;
2815                 default:
2816                         break;
2817                 }
2818         }
2819         *ptr = sep;
2820         lustre_cfg_rec_free(lcr);
2821         return rc;
2822 }
2823
2824 /* write quota settings into log */
2825 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2826                                struct fs_db *fsdb, struct mgs_target_info *mti,
2827                                char *quota, char *ptr)
2828 {
2829         struct mgs_thread_info  *mgi = mgs_env_info(env);
2830         struct llog_cfg_rec     *lcr;
2831         char                    *tmp;
2832         char                     sep;
2833         int                      rc, cmd = LCFG_PARAM;
2834
2835         /* support only 'meta' and 'data' pools so far */
2836         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2837             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2838                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2839                        "& quota.ost are)\n", ptr);
2840                 return -EINVAL;
2841         }
2842
2843         if (*tmp == '\0') {
2844                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2845         } else {
2846                 CDEBUG(D_MGS, "global '%s'\n", quota);
2847
2848                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2849                     strcmp(tmp, "none") != 0) {
2850                         CERROR("enable option(%s) isn't supported\n", tmp);
2851                         return -EINVAL;
2852                 }
2853         }
2854
2855         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2856         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2857         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2858         if (lcr == NULL)
2859                 return -ENOMEM;
2860
2861         /* truncate the comment to the parameter name */
2862         ptr = tmp - 1;
2863         sep = *ptr;
2864         *ptr = '\0';
2865
2866         /* XXX we duplicated quota enable information in all server
2867          *     config logs, it should be moved to a separate config
2868          *     log once we cleanup the config log for global param. */
2869         /* modify all servers */
2870         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2871                                       *tmp == '\0' ? NULL : lcr,
2872                                       mti->mti_fsname, quota, 1);
2873         *ptr = sep;
2874         lustre_cfg_rec_free(lcr);
2875         return rc < 0 ? rc : 0;
2876 }
2877
2878 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2879                                    struct mgs_device *mgs,
2880                                    struct fs_db *fsdb,
2881                                    struct mgs_target_info *mti,
2882                                    char *param)
2883 {
2884         struct mgs_thread_info  *mgi = mgs_env_info(env);
2885         struct llog_cfg_rec     *lcr;
2886         struct llog_handle      *llh = NULL;
2887         char                    *logname;
2888         char                    *comment, *ptr;
2889         int                      rc, len;
2890
2891         ENTRY;
2892
2893         /* get comment */
2894         ptr = strchr(param, '=');
2895         LASSERT(ptr != NULL);
2896         len = ptr - param;
2897
2898         OBD_ALLOC(comment, len + 1);
2899         if (comment == NULL)
2900                 RETURN(-ENOMEM);
2901         strncpy(comment, param, len);
2902         comment[len] = '\0';
2903
2904         /* prepare lcfg */
2905         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2906         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2907         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2908         if (lcr == NULL)
2909                 GOTO(out_comment, rc = -ENOMEM);
2910
2911         /* construct log name */
2912         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2913         if (rc < 0)
2914                 GOTO(out_lcfg, rc);
2915
2916         if (mgs_log_is_empty(env, mgs, logname)) {
2917                 rc = record_start_log(env, mgs, &llh, logname);
2918                 if (rc < 0)
2919                         GOTO(out, rc);
2920                 record_end_log(env, &llh);
2921         }
2922
2923         /* obsolete old one */
2924         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2925                         comment, CM_SKIP);
2926         if (rc < 0)
2927                 GOTO(out, rc);
2928         /* write the new one */
2929         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2930                                   mti->mti_svname, comment);
2931         if (rc)
2932                 CERROR("%s: error writing log %s: rc = %d\n",
2933                        mgs->mgs_obd->obd_name, logname, rc);
2934 out:
2935         name_destroy(&logname);
2936 out_lcfg:
2937         lustre_cfg_rec_free(lcr);
2938 out_comment:
2939         OBD_FREE(comment, len + 1);
2940         RETURN(rc);
2941 }
2942
2943 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2944                                         char *param)
2945 {
2946         char    *ptr;
2947
2948         /* disable the adjustable udesc parameter for now, i.e. use default
2949          * setting that client always ship udesc to MDT if possible. to enable
2950          * it simply remove the following line */
2951         goto error_out;
2952
2953         ptr = strchr(param, '=');
2954         if (ptr == NULL)
2955                 goto error_out;
2956         *ptr++ = '\0';
2957
2958         if (strcmp(param, PARAM_SRPC_UDESC))
2959                 goto error_out;
2960
2961         if (strcmp(ptr, "yes") == 0) {
2962                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2963                 CWARN("Enable user descriptor shipping from client to MDT\n");
2964         } else if (strcmp(ptr, "no") == 0) {
2965                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2966                 CWARN("Disable user descriptor shipping from client to MDT\n");
2967         } else {
2968                 *(ptr - 1) = '=';
2969                 goto error_out;
2970         }
2971         return 0;
2972
2973 error_out:
2974         CERROR("Invalid param: %s\n", param);
2975         return -EINVAL;
2976 }
2977
2978 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2979                                   const char *svname,
2980                                   char *param)
2981 {
2982         struct sptlrpc_rule      rule;
2983         struct sptlrpc_rule_set *rset;
2984         int                      rc;
2985         ENTRY;
2986
2987         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2988                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2989                 RETURN(-EINVAL);
2990         }
2991
2992         if (strncmp(param, PARAM_SRPC_UDESC,
2993                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2994                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2995         }
2996
2997         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2998                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2999                 RETURN(-EINVAL);
3000         }
3001
3002         param += sizeof(PARAM_SRPC_FLVR) - 1;
3003
3004         rc = sptlrpc_parse_rule(param, &rule);
3005         if (rc)
3006                 RETURN(rc);
3007
3008         /* mgs rules implies must be mgc->mgs */
3009         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3010                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3011                      rule.sr_from != LUSTRE_SP_ANY) ||
3012                     (rule.sr_to != LUSTRE_SP_MGS &&
3013                      rule.sr_to != LUSTRE_SP_ANY))
3014                         RETURN(-EINVAL);
3015         }
3016
3017         /* preapre room for this coming rule. svcname format should be:
3018          * - fsname: general rule
3019          * - fsname-tgtname: target-specific rule
3020          */
3021         if (strchr(svname, '-')) {
3022                 struct mgs_tgt_srpc_conf *tgtconf;
3023                 int                       found = 0;
3024
3025                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3026                      tgtconf = tgtconf->mtsc_next) {
3027                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3028                                 found = 1;
3029                                 break;
3030                         }
3031                 }
3032
3033                 if (!found) {
3034                         int name_len;
3035
3036                         OBD_ALLOC_PTR(tgtconf);
3037                         if (tgtconf == NULL)
3038                                 RETURN(-ENOMEM);
3039
3040                         name_len = strlen(svname);
3041
3042                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3043                         if (tgtconf->mtsc_tgt == NULL) {
3044                                 OBD_FREE_PTR(tgtconf);
3045                                 RETURN(-ENOMEM);
3046                         }
3047                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3048
3049                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3050                         fsdb->fsdb_srpc_tgt = tgtconf;
3051                 }
3052
3053                 rset = &tgtconf->mtsc_rset;
3054         } else {
3055                 rset = &fsdb->fsdb_srpc_gen;
3056         }
3057
3058         rc = sptlrpc_rule_set_merge(rset, &rule);
3059
3060         RETURN(rc);
3061 }
3062
3063 static int mgs_srpc_set_param(const struct lu_env *env,
3064                               struct mgs_device *mgs,
3065                               struct fs_db *fsdb,
3066                               struct mgs_target_info *mti,
3067                               char *param)
3068 {
3069         char                   *copy;
3070         int                     rc, copy_size;
3071         ENTRY;
3072
3073 #ifndef HAVE_GSS
3074         RETURN(-EINVAL);
3075 #endif
3076         /* keep a copy of original param, which could be destroied
3077          * during parsing */
3078         copy_size = strlen(param) + 1;
3079         OBD_ALLOC(copy, copy_size);
3080         if (copy == NULL)
3081                 return -ENOMEM;
3082         memcpy(copy, param, copy_size);
3083
3084         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3085         if (rc)
3086                 goto out_free;
3087
3088         /* previous steps guaranteed the syntax is correct */
3089         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3090         if (rc)
3091                 goto out_free;
3092
3093         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3094                 /*
3095                  * for mgs rules, make them effective immediately.
3096                  */
3097                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3098                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3099                                                  &fsdb->fsdb_srpc_gen);
3100         }
3101
3102 out_free:
3103         OBD_FREE(copy, copy_size);
3104         RETURN(rc);
3105 }
3106
3107 struct mgs_srpc_read_data {
3108         struct fs_db   *msrd_fsdb;
3109         int             msrd_skip;
3110 };
3111
3112 static int mgs_srpc_read_handler(const struct lu_env *env,
3113                                  struct llog_handle *llh,
3114                                  struct llog_rec_hdr *rec, void *data)
3115 {
3116         struct mgs_srpc_read_data *msrd = data;
3117         struct cfg_marker         *marker;
3118         struct lustre_cfg         *lcfg = REC_DATA(rec);
3119         char                      *svname, *param;
3120         int                        cfg_len, rc;
3121         ENTRY;
3122
3123         if (rec->lrh_type != OBD_CFG_REC) {
3124                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3125                 RETURN(-EINVAL);
3126         }
3127
3128         cfg_len = REC_DATA_LEN(rec);
3129
3130         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3131         if (rc) {
3132                 CERROR("Insane cfg\n");
3133                 RETURN(rc);
3134         }
3135
3136         if (lcfg->lcfg_command == LCFG_MARKER) {
3137                 marker = lustre_cfg_buf(lcfg, 1);
3138
3139                 if (marker->cm_flags & CM_START &&
3140                     marker->cm_flags & CM_SKIP)
3141                         msrd->msrd_skip = 1;
3142                 if (marker->cm_flags & CM_END)
3143                         msrd->msrd_skip = 0;
3144
3145                 RETURN(0);
3146         }
3147
3148         if (msrd->msrd_skip)
3149                 RETURN(0);
3150
3151         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3152                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3153                 RETURN(0);
3154         }
3155
3156         svname = lustre_cfg_string(lcfg, 0);
3157         if (svname == NULL) {
3158                 CERROR("svname is empty\n");
3159                 RETURN(0);
3160         }
3161
3162         param = lustre_cfg_string(lcfg, 1);
3163         if (param == NULL) {
3164                 CERROR("param is empty\n");
3165                 RETURN(0);
3166         }
3167
3168         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3169         if (rc)
3170                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3171
3172         RETURN(0);
3173 }
3174
3175 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3176                                 struct mgs_device *mgs,
3177                                 struct fs_db *fsdb)
3178 {
3179         struct llog_handle        *llh = NULL;
3180         struct llog_ctxt          *ctxt;
3181         char                      *logname;
3182         struct mgs_srpc_read_data  msrd;
3183         int                        rc;
3184         ENTRY;
3185
3186         /* construct log name */
3187         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3188         if (rc)
3189                 RETURN(rc);
3190
3191         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3192         LASSERT(ctxt != NULL);
3193
3194         if (mgs_log_is_empty(env, mgs, logname))
3195                 GOTO(out, rc = 0);
3196
3197         rc = llog_open(env, ctxt, &llh, NULL, logname,
3198                        LLOG_OPEN_EXISTS);
3199         if (rc < 0) {
3200                 if (rc == -ENOENT)
3201                         rc = 0;
3202                 GOTO(out, rc);
3203         }
3204
3205         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3206         if (rc)
3207                 GOTO(out_close, rc);
3208
3209         if (llog_get_size(llh) <= 1)
3210                 GOTO(out_close, rc = 0);
3211
3212         msrd.msrd_fsdb = fsdb;
3213         msrd.msrd_skip = 0;
3214
3215         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3216                           NULL);
3217
3218 out_close:
3219         llog_close(env, llh);
3220 out:
3221         llog_ctxt_put(ctxt);
3222         name_destroy(&logname);
3223
3224         if (rc)
3225                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3226         RETURN(rc);
3227 }
3228
3229 /* Permanent settings of all parameters by writing into the appropriate
3230  * configuration logs.
3231  * A parameter with null value ("<param>='\0'") means to erase it out of
3232  * the logs.
3233  */
3234 static int mgs_write_log_param(const struct lu_env *env,
3235                                struct mgs_device *mgs, struct fs_db *fsdb,
3236                                struct mgs_target_info *mti, char *ptr)
3237 {
3238         struct mgs_thread_info *mgi = mgs_env_info(env);
3239         char *logname;
3240         char *tmp;
3241         int rc = 0, rc2 = 0;
3242         ENTRY;
3243
3244         /* For various parameter settings, we have to figure out which logs
3245            care about them (e.g. both mdt and client for lov settings) */
3246         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3247
3248         /* The params are stored in MOUNT_DATA_FILE and modified via
3249            tunefs.lustre, or set using lctl conf_param */
3250
3251         /* Processed in lustre_start_mgc */
3252         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3253                 GOTO(end, rc);
3254
3255         /* Processed in ost/mdt */
3256         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3257                 GOTO(end, rc);
3258
3259         /* Processed in mgs_write_log_ost */
3260         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3261                 if (mti->mti_flags & LDD_F_PARAM) {
3262                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3263                                            "changed with tunefs.lustre"
3264                                            "and --writeconf\n", ptr);
3265                         rc = -EPERM;
3266                 }
3267                 GOTO(end, rc);
3268         }
3269
3270         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3271                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3272                 GOTO(end, rc);
3273         }
3274
3275         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3276                 /* Add a failover nidlist */
3277                 rc = 0;
3278                 /* We already processed failovers params for new
3279                    targets in mgs_write_log_target */
3280                 if (mti->mti_flags & LDD_F_PARAM) {
3281                         CDEBUG(D_MGS, "Adding failnode\n");
3282                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3283                 }
3284                 GOTO(end, rc);
3285         }
3286
3287         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3288                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3289                 GOTO(end, rc);
3290         }
3291
3292         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3293                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3294                 GOTO(end, rc);
3295         }
3296
3297         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3298                 /* active=0 means off, anything else means on */
3299                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3300                 int i;
3301
3302                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3303                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3304                                            "be (de)activated.\n",
3305                                            mti->mti_svname);
3306                         GOTO(end, rc = -EINVAL);
3307                 }
3308                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3309                               flag ? "de": "re", mti->mti_svname);
3310                 /* Modify clilov */
3311                 rc = name_create(&logname, mti->mti_fsname, "-client");
3312                 if (rc)
3313                         GOTO(end, rc);
3314                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3315                                 mti->mti_svname, "add osc", flag);
3316                 name_destroy(&logname);
3317                 if (rc)
3318                         goto active_err;
3319                 /* Modify mdtlov */
3320                 /* Add to all MDT logs for CMD */
3321                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3322                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3323                                 continue;
3324                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3325                         if (rc)
3326                                 GOTO(end, rc);
3327                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3328                                         mti->mti_svname, "add osc", flag);
3329                         name_destroy(&logname);
3330                         if (rc)
3331                                 goto active_err;
3332                 }
3333         active_err:
3334                 if (rc) {
3335                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3336                                            "log (%d). No permanent "
3337                                            "changes were made to the "
3338                                            "config log.\n",
3339                                            mti->mti_svname, rc);
3340                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3341                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3342                                                    " because the log"
3343                                                    "is in the old 1.4"
3344                                                    "style. Consider "
3345                                                    " --writeconf to "
3346                                                    "update the logs.\n");
3347                         GOTO(end, rc);
3348                 }
3349                 /* Fall through to osc proc for deactivating live OSC
3350                    on running MDT / clients. */
3351         }
3352         /* Below here, let obd's XXX_process_config methods handle it */
3353
3354         /* All lov. in proc */
3355         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3356                 char *mdtlovname;
3357
3358                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3359                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3360                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3361                                            "set on the MDT, not %s. "
3362                                            "Ignoring.\n",
3363                                            mti->mti_svname);
3364                         GOTO(end, rc = 0);
3365                 }
3366
3367                 /* Modify mdtlov */
3368                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3369                         GOTO(end, rc = -ENODEV);
3370
3371                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3372                                              mti->mti_stripe_index);
3373                 if (rc)
3374                         GOTO(end, rc);
3375                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3376                                   &mgi->mgi_bufs, mdtlovname, ptr);
3377                 name_destroy(&logname);
3378                 name_destroy(&mdtlovname);
3379                 if (rc)
3380                         GOTO(end, rc);
3381
3382                 /* Modify clilov */
3383                 rc = name_create(&logname, mti->mti_fsname, "-client");
3384                 if (rc)
3385                         GOTO(end, rc);
3386                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3387                                   fsdb->fsdb_clilov, ptr);
3388                 name_destroy(&logname);
3389                 GOTO(end, rc);
3390         }
3391
3392         /* All osc., mdc., llite. params in proc */
3393         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3394             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3395             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3396                 char *cname;
3397
3398                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3399                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3400                                            " cannot be modified. Consider"
3401                                            " updating the configuration with"
3402                                            " --writeconf\n",
3403                                            mti->mti_svname);
3404                         GOTO(end, rc = -EINVAL);
3405                 }
3406                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3407                         rc = name_create(&cname, mti->mti_fsname, "-client");
3408                         /* Add the client type to match the obdname in
3409                            class_config_llog_handler */
3410                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3411                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3412                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3413                         rc = name_create(&cname, mti->mti_svname, "-osc");
3414                 } else {
3415                         GOTO(end, rc = -EINVAL);
3416                 }
3417                 if (rc)
3418                         GOTO(end, rc);
3419
3420                 /* Forbid direct update of llite root squash parameters.
3421                  * These parameters are indirectly set via the MDT settings.
3422                  * See (LU-1778) */
3423                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
3424                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3425                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3426                         LCONSOLE_ERROR("%s: root squash parameters can only "
3427                                 "be updated through MDT component\n",
3428                                 mti->mti_fsname);
3429                         name_destroy(&cname);
3430                         GOTO(end, rc = -EINVAL);
3431                 }
3432
3433                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3434
3435                 /* Modify client */
3436                 rc = name_create(&logname, mti->mti_fsname, "-client");
3437                 if (rc) {
3438                         name_destroy(&cname);
3439                         GOTO(end, rc);
3440                 }
3441                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3442                                   cname, ptr);
3443
3444                 /* osc params affect the MDT as well */
3445                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3446                         int i;
3447
3448                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3449                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3450                                         continue;
3451                                 name_destroy(&cname);
3452                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3453                                                          fsdb, i);
3454                                 name_destroy(&logname);
3455                                 if (rc)
3456                                         break;
3457                                 rc = name_create_mdt(&logname,
3458                                                      mti->mti_fsname, i);
3459                                 if (rc)
3460                                         break;
3461                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3462                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3463                                                           mti, logname,
3464                                                           &mgi->mgi_bufs,
3465                                                           cname, ptr);
3466                                         if (rc)
3467                                                 break;
3468                                 }
3469                         }
3470                 }
3471                 name_destroy(&logname);
3472                 name_destroy(&cname);
3473                 GOTO(end, rc);
3474         }
3475
3476         /* All mdt. params in proc */
3477         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
3478                 int i;
3479                 __u32 idx;
3480
3481                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3482                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3483                             MTI_NAME_MAXLEN) == 0)
3484                         /* device is unspecified completely? */
3485                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3486                 else
3487                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3488                 if (rc < 0)
3489                         goto active_err;
3490                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3491                         goto active_err;
3492                 if (rc & LDD_F_SV_ALL) {
3493                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3494                                 if (!test_bit(i,
3495                                                   fsdb->fsdb_mdt_index_map))
3496                                         continue;
3497                                 rc = name_create_mdt(&logname,
3498                                                 mti->mti_fsname, i);
3499                                 if (rc)
3500                                         goto active_err;
3501                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3502                                                   logname, &mgi->mgi_bufs,
3503                                                   logname, ptr);
3504                                 name_destroy(&logname);
3505                                 if (rc)
3506                                         goto active_err;
3507                         }
3508                 } else {
3509                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
3510                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
3511                                 LCONSOLE_ERROR("%s: root squash parameters "
3512                                         "cannot be applied to a single MDT\n",
3513                                         mti->mti_fsname);
3514                                 GOTO(end, rc = -EINVAL);
3515                         }
3516                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3517                                           mti->mti_svname, &mgi->mgi_bufs,
3518                                           mti->mti_svname, ptr);
3519                         if (rc)
3520                                 goto active_err;
3521                 }
3522
3523                 /* root squash settings are also applied to llite
3524                  * config log (see LU-1778) */
3525                 if (rc == 0 &&
3526                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3527                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3528                         char *cname;
3529                         char *ptr2;
3530
3531                         rc = name_create(&cname, mti->mti_fsname, "-client");
3532                         if (rc)
3533                                 GOTO(end, rc);
3534                         rc = name_create(&logname, mti->mti_fsname, "-client");
3535                         if (rc) {
3536                                 name_destroy(&cname);
3537                                 GOTO(end, rc);
3538                         }
3539                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
3540                         if (rc) {
3541                                 name_destroy(&cname);
3542                                 name_destroy(&logname);
3543                                 GOTO(end, rc);
3544                         }
3545                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3546                                           &mgi->mgi_bufs, cname, ptr2);
3547                         name_destroy(&ptr2);
3548                         name_destroy(&logname);
3549                         name_destroy(&cname);
3550                 }
3551                 GOTO(end, rc);
3552         }
3553
3554         /* All mdd., ost. and osd. params in proc */
3555         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3556             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3557             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3558                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3559                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3560                         GOTO(end, rc = -ENODEV);
3561
3562                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3563                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3564                 GOTO(end, rc);
3565         }
3566
3567         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3568         rc2 = -ENOSYS;
3569
3570 end:
3571         if (rc)
3572                 CERROR("err %d on param '%s'\n", rc, ptr);
3573
3574         RETURN(rc ?: rc2);
3575 }
3576
3577 /* Not implementing automatic failover nid addition at this time. */
3578 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3579                       struct mgs_target_info *mti)
3580 {
3581 #if 0
3582         struct fs_db *fsdb;
3583         int rc;
3584         ENTRY;
3585
3586         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3587         if (rc)
3588                 RETURN(rc);
3589
3590         if (mgs_log_is_empty(obd, mti->mti_svname))
3591                 /* should never happen */
3592                 RETURN(-ENOENT);
3593
3594         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3595
3596         /* FIXME We can just check mti->params to see if we're already in
3597            the failover list.  Modify mti->params for rewriting back at
3598            server_register_target(). */
3599
3600         mutex_lock(&fsdb->fsdb_mutex);
3601         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3602         mutex_unlock(&fsdb->fsdb_mutex);
3603         char    *buf, *params;
3604         int      rc = -EINVAL;
3605
3606         RETURN(rc);
3607 #endif
3608         return 0;
3609 }
3610
3611 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
3612                          struct mgs_target_info *mti, struct fs_db *fsdb)
3613 {
3614         char    *buf, *params;
3615         int      rc = -EINVAL;
3616
3617         ENTRY;
3618
3619         /* set/check the new target index */
3620         rc = mgs_set_index(env, mgs, mti);
3621         if (rc < 0)
3622                 RETURN(rc);
3623
3624         if (rc == EALREADY) {
3625                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3626                               mti->mti_stripe_index, mti->mti_svname);
3627                 /* We would like to mark old log sections as invalid
3628                    and add new log sections in the client and mdt logs.
3629                    But if we add new sections, then live clients will
3630                    get repeat setup instructions for already running
3631                    osc's. So don't update the client/mdt logs. */
3632                 mti->mti_flags &= ~LDD_F_UPDATE;
3633                 rc = 0;
3634         }
3635
3636         mutex_lock(&fsdb->fsdb_mutex);
3637
3638         if (mti->mti_flags &
3639             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3640                 /* Generate a log from scratch */
3641                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3642                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3643                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3644                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3645                 } else {
3646                         CERROR("Unknown target type %#x, can't create log for "
3647                                "%s\n", mti->mti_flags, mti->mti_svname);
3648                 }
3649                 if (rc) {
3650                         CERROR("Can't write logs for %s (%d)\n",
3651                                mti->mti_svname, rc);
3652                         GOTO(out_up, rc);
3653                 }
3654         } else {
3655                 /* Just update the params from tunefs in mgs_write_log_params */
3656                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3657                 mti->mti_flags |= LDD_F_PARAM;
3658         }
3659
3660         /* allocate temporary buffer, where class_get_next_param will
3661            make copy of a current  parameter */
3662         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3663         if (buf == NULL)
3664                 GOTO(out_up, rc = -ENOMEM);
3665         params = mti->mti_params;
3666         while (params != NULL) {
3667                 rc = class_get_next_param(&params, buf);
3668                 if (rc) {
3669                         if (rc == 1)
3670                                 /* there is no next parameter, that is
3671                                    not an error */
3672                                 rc = 0;
3673                         break;
3674                 }
3675                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3676                        params, buf);
3677                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3678                 if (rc)
3679                         break;
3680         }
3681
3682         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3683
3684 out_up:
3685         mutex_unlock(&fsdb->fsdb_mutex);
3686         RETURN(rc);
3687 }
3688
3689 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3690 {
3691         struct llog_ctxt        *ctxt;
3692         int                      rc = 0;
3693
3694         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3695         if (ctxt == NULL) {
3696                 CERROR("%s: MGS config context doesn't exist\n",
3697                        mgs->mgs_obd->obd_name);
3698                 rc = -ENODEV;
3699         } else {
3700                 rc = llog_erase(env, ctxt, NULL, name);
3701                 /* llog may not exist */
3702                 if (rc == -ENOENT)
3703                         rc = 0;
3704                 llog_ctxt_put(ctxt);
3705         }
3706
3707         if (rc)
3708                 CERROR("%s: failed to clear log %s: %d\n",
3709                        mgs->mgs_obd->obd_name, name, rc);
3710
3711         return rc;
3712 }
3713
3714 /* erase all logs for the given fs */
3715 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3716 {
3717         struct fs_db *fsdb;
3718         struct list_head log_list;
3719         struct mgs_direntry *dirent, *n;
3720         int rc, len = strlen(fsname);
3721         char *suffix;
3722         ENTRY;
3723
3724         /* Find all the logs in the CONFIGS directory */
3725         rc = class_dentry_readdir(env, mgs, &log_list);
3726         if (rc)
3727                 RETURN(rc);
3728
3729         mutex_lock(&mgs->mgs_mutex);
3730
3731         /* Delete the fs db */
3732         fsdb = mgs_find_fsdb(mgs, fsname);
3733         if (fsdb)
3734                 mgs_free_fsdb(mgs, fsdb);
3735
3736         mutex_unlock(&mgs->mgs_mutex);
3737
3738         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3739                 list_del_init(&dirent->mde_list);
3740                 suffix = strrchr(dirent->mde_name, '-');
3741                 if (suffix != NULL) {
3742                         if ((len == suffix - dirent->mde_name) &&
3743                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
3744                                 CDEBUG(D_MGS, "Removing log %s\n",
3745                                        dirent->mde_name);
3746                                 mgs_erase_log(env, mgs, dirent->mde_name);
3747                         }
3748                 }
3749                 mgs_direntry_free(dirent);
3750         }
3751
3752         RETURN(rc);
3753 }
3754
3755 /* list all logs for the given fs */
3756 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
3757                   struct obd_ioctl_data *data)
3758 {
3759         struct list_head         log_list;
3760         struct mgs_direntry     *dirent, *n;
3761         char                    *out, *suffix;
3762         int                      l, remains, rc;
3763
3764         ENTRY;
3765
3766         /* Find all the logs in the CONFIGS directory */
3767         rc = class_dentry_readdir(env, mgs, &log_list);
3768         if (rc)
3769                 RETURN(rc);
3770
3771         out = data->ioc_bulk;
3772         remains = data->ioc_inllen1;
3773         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3774                 list_del_init(&dirent->mde_list);
3775                 suffix = strrchr(dirent->mde_name, '-');
3776                 if (suffix != NULL) {
3777                         l = snprintf(out, remains, "config log: $%s\n",
3778                                      dirent->mde_name);
3779                         out += l;
3780                         remains -= l;
3781                 }
3782                 mgs_direntry_free(dirent);
3783                 if (remains <= 0)
3784                         break;
3785         }
3786         RETURN(rc);
3787 }
3788
3789 /* from llog_swab */
3790 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3791 {
3792         int i;
3793         ENTRY;
3794
3795         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3796         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3797
3798         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3799         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3800         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3801         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3802
3803         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3804         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3805                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3806                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3807                                i, lcfg->lcfg_buflens[i],
3808                                lustre_cfg_string(lcfg, i));
3809                 }
3810         EXIT;
3811 }
3812
3813 /* Setup params fsdb and log
3814  */
3815 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3816                           struct fs_db *fsdb)
3817 {
3818         struct llog_handle      *params_llh = NULL;
3819         int                     rc;
3820         ENTRY;
3821
3822         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
3823         if (fsdb != NULL) {
3824                 mutex_lock(&fsdb->fsdb_mutex);
3825                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
3826                 if (rc == 0)
3827                         rc = record_end_log(env, &params_llh);
3828                 mutex_unlock(&fsdb->fsdb_mutex);
3829         }
3830
3831         RETURN(rc);
3832 }
3833
3834 /* Cleanup params fsdb and log
3835  */
3836 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
3837 {
3838         return mgs_erase_logs(env, mgs, PARAMS_FILENAME);
3839 }
3840
3841 /* Set a permanent (config log) param for a target or fs
3842  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3843  *             buf1 contains the single parameter
3844  */
3845 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3846                  struct lustre_cfg *lcfg, char *fsname)
3847 {
3848         struct fs_db *fsdb;
3849         struct mgs_target_info *mti;
3850         char *devname, *param;
3851         char *ptr;
3852         const char *tmp;
3853         __u32 index;
3854         int rc = 0;
3855         ENTRY;
3856
3857         print_lustre_cfg(lcfg);
3858
3859         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3860         devname = lustre_cfg_string(lcfg, 0);
3861         param = lustre_cfg_string(lcfg, 1);
3862         if (!devname) {
3863                 /* Assume device name embedded in param:
3864                    lustre-OST0000.osc.max_dirty_mb=32 */
3865                 ptr = strchr(param, '.');
3866                 if (ptr) {
3867                         devname = param;
3868                         *ptr = 0;
3869                         param = ptr + 1;
3870                 }
3871         }
3872         if (!devname) {
3873                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3874                 RETURN(-ENOSYS);
3875         }
3876
3877         rc = mgs_parse_devname(devname, fsname, NULL);
3878         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3879                 /* param related to llite isn't allowed to set by OST or MDT */
3880                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3881                                        sizeof(PARAM_LLITE) - 1) == 0)
3882                         RETURN(-EINVAL);
3883         } else {
3884                 /* assume devname is the fsname */
3885                 strlcpy(fsname, devname, MTI_NAME_MAXLEN);
3886         }
3887         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3888
3889         rc = mgs_find_or_make_fsdb(env, mgs,
3890                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
3891                                    PARAMS_FILENAME : fsname, &fsdb);
3892         if (rc)
3893                 RETURN(rc);
3894
3895         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
3896             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3897             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3898                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3899                        "is '%s'\n", fsname, devname);
3900                 mgs_free_fsdb(mgs, fsdb);
3901                 RETURN(-EINVAL);
3902         }
3903
3904         /* Create a fake mti to hold everything */
3905         OBD_ALLOC_PTR(mti);
3906         if (!mti)
3907                 GOTO(out, rc = -ENOMEM);
3908         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3909             >= sizeof(mti->mti_fsname))
3910                 GOTO(out, rc = -E2BIG);
3911         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3912             >= sizeof(mti->mti_svname))
3913                 GOTO(out, rc = -E2BIG);
3914         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3915             >= sizeof(mti->mti_params))
3916                 GOTO(out, rc = -E2BIG);
3917         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3918         if (rc < 0)
3919                 /* Not a valid server; may be only fsname */
3920                 rc = 0;
3921         else
3922                 /* Strip -osc or -mdc suffix from svname */
3923                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3924                                      mti->mti_svname))
3925                         GOTO(out, rc = -EINVAL);
3926         /*
3927          * Revoke lock so everyone updates.  Should be alright if
3928          * someone was already reading while we were updating the logs,
3929          * so we don't really need to hold the lock while we're
3930          * writing (above).
3931          */
3932         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
3933                 mti->mti_flags = rc | LDD_F_PARAM2;
3934                 mutex_lock(&fsdb->fsdb_mutex);
3935                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
3936                 mutex_unlock(&fsdb->fsdb_mutex);
3937                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
3938         } else {
3939                 mti->mti_flags = rc | LDD_F_PARAM;
3940                 mutex_lock(&fsdb->fsdb_mutex);
3941                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3942                 mutex_unlock(&fsdb->fsdb_mutex);
3943                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3944         }
3945
3946 out:
3947         OBD_FREE_PTR(mti);
3948         RETURN(rc);
3949 }
3950
3951 static int mgs_write_log_pool(const struct lu_env *env,
3952                               struct mgs_device *mgs, char *logname,
3953                               struct fs_db *fsdb, char *tgtname,
3954                               enum lcfg_command_type cmd,
3955                               char *fsname, char *poolname,
3956                               char *ostname, char *comment)
3957 {
3958         struct llog_handle *llh = NULL;
3959         int rc;
3960
3961         rc = record_start_log(env, mgs, &llh, logname);
3962         if (rc)
3963                 return rc;
3964         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
3965         if (rc)
3966                 goto out;
3967         rc = record_base(env, llh, tgtname, 0, cmd,
3968                          fsname, poolname, ostname, 0);
3969         if (rc)
3970                 goto out;
3971         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
3972 out:
3973         record_end_log(env, &llh);
3974         return rc;
3975 }
3976
3977 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
3978                     enum lcfg_command_type cmd, const char *nodemap_name,
3979                     char *param)
3980 {
3981         lnet_nid_t      nid[2];
3982         __u32           idmap[2];
3983         bool            bool_switch;
3984         __u32           int_id;
3985         int             rc = 0;
3986         ENTRY;
3987
3988         switch (cmd) {
3989         case LCFG_NODEMAP_ADD:
3990                 rc = nodemap_add(nodemap_name);
3991                 break;
3992         case LCFG_NODEMAP_DEL:
3993                 rc = nodemap_del(nodemap_name);
3994                 break;
3995         case LCFG_NODEMAP_ADD_RANGE:
3996                 rc = nodemap_parse_range(param, nid);
3997                 if (rc != 0)
3998                         break;
3999                 rc = nodemap_add_range(nodemap_name, nid);
4000                 break;
4001         case LCFG_NODEMAP_DEL_RANGE:
4002                 rc = nodemap_parse_range(param, nid);
4003                 if (rc != 0)
4004                         break;
4005                 rc = nodemap_del_range(nodemap_name, nid);
4006                 break;
4007         case LCFG_NODEMAP_ADMIN:
4008                 bool_switch = simple_strtoul(param, NULL, 10);
4009                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
4010                 break;
4011         case LCFG_NODEMAP_TRUSTED:
4012                 bool_switch = simple_strtoul(param, NULL, 10);
4013                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
4014                 break;
4015         case LCFG_NODEMAP_SQUASH_UID:
4016                 int_id = simple_strtoul(param, NULL, 10);
4017                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
4018                 break;
4019         case LCFG_NODEMAP_SQUASH_GID:
4020                 int_id = simple_strtoul(param, NULL, 10);
4021                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
4022                 break;
4023         case LCFG_NODEMAP_ADD_UIDMAP:
4024         case LCFG_NODEMAP_ADD_GIDMAP:
4025                 rc = nodemap_parse_idmap(param, idmap);
4026                 if (rc != 0)
4027                         break;
4028                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
4029                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
4030                                                idmap);
4031                 else
4032                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
4033                                                idmap);
4034                 break;
4035         case LCFG_NODEMAP_DEL_UIDMAP:
4036         case LCFG_NODEMAP_DEL_GIDMAP:
4037                 rc = nodemap_parse_idmap(param, idmap);
4038                 if (rc != 0)
4039                         break;
4040                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
4041                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
4042                                                idmap);
4043                 else
4044                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
4045                                                idmap);
4046                 break;
4047         default:
4048                 rc = -EINVAL;
4049         }
4050
4051         RETURN(rc);
4052 }
4053
4054 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
4055                  enum lcfg_command_type cmd, char *fsname,
4056                  char *poolname, char *ostname)
4057 {
4058         struct fs_db *fsdb;
4059         char *lovname;
4060         char *logname;
4061         char *label = NULL, *canceled_label = NULL;
4062         int label_sz;
4063         struct mgs_target_info *mti = NULL;
4064         int rc, i;
4065         ENTRY;
4066
4067         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
4068         if (rc) {
4069                 CERROR("Can't get db for %s\n", fsname);
4070                 RETURN(rc);
4071         }
4072         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4073                 CERROR("%s is not defined\n", fsname);
4074                 mgs_free_fsdb(mgs, fsdb);
4075                 RETURN(-EINVAL);
4076         }
4077
4078         label_sz = 10 + strlen(fsname) + strlen(poolname);
4079
4080         /* check if ostname match fsname */
4081         if (ostname != NULL) {
4082                 char *ptr;
4083
4084                 ptr = strrchr(ostname, '-');
4085                 if ((ptr == NULL) ||
4086                     (strncmp(fsname, ostname, ptr-ostname) != 0))
4087                         RETURN(-EINVAL);
4088                 label_sz += strlen(ostname);
4089         }
4090
4091         OBD_ALLOC(label, label_sz);
4092         if (label == NULL)
4093                 RETURN(-ENOMEM);
4094
4095         switch(cmd) {
4096         case LCFG_POOL_NEW:
4097                 sprintf(label,
4098                         "new %s.%s", fsname, poolname);
4099                 break;
4100         case LCFG_POOL_ADD:
4101                 sprintf(label,
4102                         "add %s.%s.%s", fsname, poolname, ostname);
4103                 break;
4104         case LCFG_POOL_REM:
4105                 OBD_ALLOC(canceled_label, label_sz);
4106                 if (canceled_label == NULL)
4107                         GOTO(out_label, rc = -ENOMEM);
4108                 sprintf(label,
4109                         "rem %s.%s.%s", fsname, poolname, ostname);
4110                 sprintf(canceled_label,
4111                         "add %s.%s.%s", fsname, poolname, ostname);
4112                 break;
4113         case LCFG_POOL_DEL:
4114                 OBD_ALLOC(canceled_label, label_sz);
4115                 if (canceled_label == NULL)
4116                         GOTO(out_label, rc = -ENOMEM);
4117                 sprintf(label,
4118                         "del %s.%s", fsname, poolname);
4119                 sprintf(canceled_label,
4120                         "new %s.%s", fsname, poolname);
4121                 break;
4122         default:
4123                 break;
4124         }
4125
4126         if (canceled_label != NULL) {
4127                 OBD_ALLOC_PTR(mti);
4128                 if (mti == NULL)
4129                         GOTO(out_cancel, rc = -ENOMEM);
4130         }
4131
4132         mutex_lock(&fsdb->fsdb_mutex);
4133         /* write pool def to all MDT logs */
4134         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4135                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4136                         rc = name_create_mdt_and_lov(&logname, &lovname,
4137                                                      fsdb, i);
4138                         if (rc) {
4139                                 mutex_unlock(&fsdb->fsdb_mutex);
4140                                 GOTO(out_mti, rc);
4141                         }
4142                         if (canceled_label != NULL) {
4143                                 strcpy(mti->mti_svname, "lov pool");
4144                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4145                                                 lovname, canceled_label,
4146                                                 CM_SKIP);
4147                         }
4148
4149                         if (rc >= 0)
4150                                 rc = mgs_write_log_pool(env, mgs, logname,
4151                                                         fsdb, lovname, cmd,
4152                                                         fsname, poolname,
4153                                                         ostname, label);
4154                         name_destroy(&logname);
4155                         name_destroy(&lovname);
4156                         if (rc) {
4157                                 mutex_unlock(&fsdb->fsdb_mutex);
4158                                 GOTO(out_mti, rc);
4159                         }
4160                 }
4161         }
4162
4163         rc = name_create(&logname, fsname, "-client");
4164         if (rc) {
4165                 mutex_unlock(&fsdb->fsdb_mutex);
4166                 GOTO(out_mti, rc);
4167         }
4168         if (canceled_label != NULL) {
4169                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4170                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4171                 if (rc < 0) {
4172                         mutex_unlock(&fsdb->fsdb_mutex);
4173                         name_destroy(&logname);
4174                         GOTO(out_mti, rc);
4175                 }
4176         }
4177
4178         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4179                                 cmd, fsname, poolname, ostname, label);
4180         mutex_unlock(&fsdb->fsdb_mutex);
4181         name_destroy(&logname);
4182         /* request for update */
4183         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4184
4185         EXIT;
4186 out_mti:
4187         if (mti != NULL)
4188                 OBD_FREE_PTR(mti);
4189 out_cancel:
4190         if (canceled_label != NULL)
4191                 OBD_FREE(canceled_label, label_sz);
4192 out_label:
4193         OBD_FREE(label, label_sz);
4194         return rc;
4195 }