Whamcloud - gitweb
LU-5458: libcfs: protect kkuc_groups from write access
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 /* Find all logs in CONFIG directory and link then into list */
59 int class_dentry_readdir(const struct lu_env *env,
60                          struct mgs_device *mgs, struct list_head *log_list)
61 {
62         struct dt_object    *dir = mgs->mgs_configs_dir;
63         const struct dt_it_ops *iops;
64         struct dt_it        *it;
65         struct mgs_direntry *de;
66         char                *key;
67         int                  rc, key_sz;
68
69         INIT_LIST_HEAD(log_list);
70
71         LASSERT(dir);
72         LASSERT(dir->do_index_ops);
73
74         iops = &dir->do_index_ops->dio_it;
75         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
76         if (IS_ERR(it))
77                 RETURN(PTR_ERR(it));
78
79         rc = iops->load(env, it, 0);
80         if (rc <= 0)
81                 GOTO(fini, rc = 0);
82
83         /* main cycle */
84         do {
85                 key = (void *)iops->key(env, it);
86                 if (IS_ERR(key)) {
87                         CERROR("%s: key failed when listing %s: rc = %d\n",
88                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
89                                (int) PTR_ERR(key));
90                         goto next;
91                 }
92                 key_sz = iops->key_size(env, it);
93                 LASSERT(key_sz > 0);
94
95                 /* filter out "." and ".." entries */
96                 if (key[0] == '.') {
97                         if (key_sz == 1)
98                                 goto next;
99                         if (key_sz == 2 && key[1] == '.')
100                                 goto next;
101                 }
102
103                 de = mgs_direntry_alloc(key_sz + 1);
104                 if (de == NULL) {
105                         rc = -ENOMEM;
106                         break;
107                 }
108
109                 memcpy(de->mde_name, key, key_sz);
110                 de->mde_name[key_sz] = 0;
111
112                 list_add(&de->mde_list, log_list);
113
114 next:
115                 rc = iops->next(env, it);
116         } while (rc == 0);
117         rc = 0;
118
119         iops->put(env, it);
120
121 fini:
122         iops->fini(env, it);
123         if (rc)
124                 CERROR("%s: key failed when listing %s: rc = %d\n",
125                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
126         RETURN(rc);
127 }
128
129 /******************** DB functions *********************/
130
131 static inline int name_create(char **newname, char *prefix, char *suffix)
132 {
133         LASSERT(newname);
134         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
135         if (!*newname)
136                 return -ENOMEM;
137         sprintf(*newname, "%s%s", prefix, suffix);
138         return 0;
139 }
140
141 static inline void name_destroy(char **name)
142 {
143         if (*name)
144                 OBD_FREE(*name, strlen(*name) + 1);
145         *name = NULL;
146 }
147
148 struct mgs_fsdb_handler_data
149 {
150         struct fs_db   *fsdb;
151         __u32           ver;
152 };
153
154 /* from the (client) config log, figure out:
155         1. which ost's/mdt's are configured (by index)
156         2. what the last config step is
157         3. COMPAT_18 osc name
158 */
159 /* It might be better to have a separate db file, instead of parsing the info
160    out of the client log.  This is slow and potentially error-prone. */
161 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
162                             struct llog_rec_hdr *rec, void *data)
163 {
164         struct mgs_fsdb_handler_data *d = data;
165         struct fs_db *fsdb = d->fsdb;
166         int cfg_len = rec->lrh_len;
167         char *cfg_buf = (char*) (rec + 1);
168         struct lustre_cfg *lcfg;
169         __u32 index;
170         int rc = 0;
171         ENTRY;
172
173         if (rec->lrh_type != OBD_CFG_REC) {
174                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
175                 RETURN(-EINVAL);
176         }
177
178         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
179         if (rc) {
180                 CERROR("Insane cfg\n");
181                 RETURN(rc);
182         }
183
184         lcfg = (struct lustre_cfg *)cfg_buf;
185
186         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
187                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
188
189         /* Figure out ost indicies */
190         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
191         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
192             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
193                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
194                                        NULL, 10);
195                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
196                        lustre_cfg_string(lcfg, 1), index,
197                        lustre_cfg_string(lcfg, 2));
198                 set_bit(index, fsdb->fsdb_ost_index_map);
199         }
200
201         /* Figure out mdt indicies */
202         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
203         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
204             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
205                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
206                                        &index, NULL);
207                 if (rc != LDD_F_SV_TYPE_MDT) {
208                         CWARN("Unparsable MDC name %s, assuming index 0\n",
209                               lustre_cfg_string(lcfg, 0));
210                         index = 0;
211                 }
212                 rc = 0;
213                 CDEBUG(D_MGS, "MDT index is %u\n", index);
214                 set_bit(index, fsdb->fsdb_mdt_index_map);
215                 fsdb->fsdb_mdt_count ++;
216         }
217
218         /**
219          * figure out the old config. fsdb_gen = 0 means old log
220          * It is obsoleted and not supported anymore
221          */
222         if (fsdb->fsdb_gen == 0) {
223                 CERROR("Old config format is not supported\n");
224                 RETURN(-EINVAL);
225         }
226
227         /*
228          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
229          */
230         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
231             lcfg->lcfg_command == LCFG_ATTACH &&
232             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
233                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
234                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
235                         CWARN("MDT using 1.8 OSC name scheme\n");
236                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
237                 }
238         }
239
240         if (lcfg->lcfg_command == LCFG_MARKER) {
241                 struct cfg_marker *marker;
242                 marker = lustre_cfg_buf(lcfg, 1);
243
244                 d->ver = marker->cm_vers;
245
246                 /* Keep track of the latest marker step */
247                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
248         }
249
250         RETURN(rc);
251 }
252
253 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
254 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
255                                   struct mgs_device *mgs,
256                                   struct fs_db *fsdb)
257 {
258         char                            *logname;
259         struct llog_handle              *loghandle;
260         struct llog_ctxt                *ctxt;
261         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
262         int rc;
263
264         ENTRY;
265
266         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
267         LASSERT(ctxt != NULL);
268         rc = name_create(&logname, fsdb->fsdb_name, "-client");
269         if (rc)
270                 GOTO(out_put, rc);
271         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
272         if (rc)
273                 GOTO(out_pop, rc);
274
275         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
276         if (rc)
277                 GOTO(out_close, rc);
278
279         if (llog_get_size(loghandle) <= 1)
280                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
281
282         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
283         CDEBUG(D_INFO, "get_db = %d\n", rc);
284 out_close:
285         llog_close(env, loghandle);
286 out_pop:
287         name_destroy(&logname);
288 out_put:
289         llog_ctxt_put(ctxt);
290
291         RETURN(rc);
292 }
293
294 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
295 {
296         struct mgs_tgt_srpc_conf *tgtconf;
297
298         /* free target-specific rules */
299         while (fsdb->fsdb_srpc_tgt) {
300                 tgtconf = fsdb->fsdb_srpc_tgt;
301                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
302
303                 LASSERT(tgtconf->mtsc_tgt);
304
305                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
306                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
307                 OBD_FREE_PTR(tgtconf);
308         }
309
310         /* free general rules */
311         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
312 }
313
314 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
315 {
316         struct fs_db *fsdb;
317         struct list_head *tmp;
318
319         list_for_each(tmp, &mgs->mgs_fs_db_list) {
320                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
321                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
322                         return fsdb;
323         }
324         return NULL;
325 }
326
327 /* caller must hold the mgs->mgs_fs_db_lock */
328 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
329                                   struct mgs_device *mgs, char *fsname)
330 {
331         struct fs_db *fsdb;
332         int rc;
333         ENTRY;
334
335         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
336                 CERROR("fsname %s is too long\n", fsname);
337                 RETURN(NULL);
338         }
339
340         OBD_ALLOC_PTR(fsdb);
341         if (!fsdb)
342                 RETURN(NULL);
343
344         strcpy(fsdb->fsdb_name, fsname);
345         mutex_init(&fsdb->fsdb_mutex);
346         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
347         fsdb->fsdb_gen = 1;
348
349         if (strcmp(fsname, MGSSELF_NAME) == 0) {
350                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
351         } else {
352                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
353                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
354                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
355                         CERROR("No memory for index maps\n");
356                         GOTO(err, rc = -ENOMEM);
357                 }
358
359                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
360                 if (rc)
361                         GOTO(err, rc);
362                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
363                 if (rc)
364                         GOTO(err, rc);
365
366                 /* initialise data for NID table */
367                 mgs_ir_init_fs(env, mgs, fsdb);
368
369                 lproc_mgs_add_live(mgs, fsdb);
370         }
371
372         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
373
374         RETURN(fsdb);
375 err:
376         if (fsdb->fsdb_ost_index_map)
377                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
378         if (fsdb->fsdb_mdt_index_map)
379                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
380         name_destroy(&fsdb->fsdb_clilov);
381         name_destroy(&fsdb->fsdb_clilmv);
382         OBD_FREE_PTR(fsdb);
383         RETURN(NULL);
384 }
385
386 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
387 {
388         /* wait for anyone with the sem */
389         mutex_lock(&fsdb->fsdb_mutex);
390         lproc_mgs_del_live(mgs, fsdb);
391         list_del(&fsdb->fsdb_list);
392
393         /* deinitialize fsr */
394         mgs_ir_fini_fs(mgs, fsdb);
395
396         if (fsdb->fsdb_ost_index_map)
397                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
398         if (fsdb->fsdb_mdt_index_map)
399                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
400         name_destroy(&fsdb->fsdb_clilov);
401         name_destroy(&fsdb->fsdb_clilmv);
402         mgs_free_fsdb_srpc(fsdb);
403         mutex_unlock(&fsdb->fsdb_mutex);
404         OBD_FREE_PTR(fsdb);
405 }
406
407 int mgs_init_fsdb_list(struct mgs_device *mgs)
408 {
409         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
410         return 0;
411 }
412
413 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
414 {
415         struct fs_db *fsdb;
416         struct list_head *tmp, *tmp2;
417
418         mutex_lock(&mgs->mgs_mutex);
419         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
420                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
421                 mgs_free_fsdb(mgs, fsdb);
422         }
423         mutex_unlock(&mgs->mgs_mutex);
424         return 0;
425 }
426
427 int mgs_find_or_make_fsdb(const struct lu_env *env,
428                           struct mgs_device *mgs, char *name,
429                           struct fs_db **dbh)
430 {
431         struct fs_db *fsdb;
432         int rc = 0;
433
434         ENTRY;
435         mutex_lock(&mgs->mgs_mutex);
436         fsdb = mgs_find_fsdb(mgs, name);
437         if (fsdb) {
438                 mutex_unlock(&mgs->mgs_mutex);
439                 *dbh = fsdb;
440                 RETURN(0);
441         }
442
443         CDEBUG(D_MGS, "Creating new db\n");
444         fsdb = mgs_new_fsdb(env, mgs, name);
445         /* lock fsdb_mutex until the db is loaded from llogs */
446         if (fsdb)
447                 mutex_lock(&fsdb->fsdb_mutex);
448         mutex_unlock(&mgs->mgs_mutex);
449         if (!fsdb)
450                 RETURN(-ENOMEM);
451
452         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
453                 /* populate the db from the client llog */
454                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
455                 if (rc) {
456                         CERROR("Can't get db from client log %d\n", rc);
457                         GOTO(out_free, rc);
458                 }
459         }
460
461         /* populate srpc rules from params llog */
462         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
463         if (rc) {
464                 CERROR("Can't get db from params log %d\n", rc);
465                 GOTO(out_free, rc);
466         }
467
468         mutex_unlock(&fsdb->fsdb_mutex);
469         *dbh = fsdb;
470
471         RETURN(0);
472
473 out_free:
474         mutex_unlock(&fsdb->fsdb_mutex);
475         mgs_free_fsdb(mgs, fsdb);
476         return rc;
477 }
478
479 /* 1 = index in use
480    0 = index unused
481    -1= empty client log */
482 int mgs_check_index(const struct lu_env *env,
483                     struct mgs_device *mgs,
484                     struct mgs_target_info *mti)
485 {
486         struct fs_db *fsdb;
487         void *imap;
488         int rc = 0;
489         ENTRY;
490
491         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
492
493         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
494         if (rc) {
495                 CERROR("Can't get db for %s\n", mti->mti_fsname);
496                 RETURN(rc);
497         }
498
499         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
500                 RETURN(-1);
501
502         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
503                 imap = fsdb->fsdb_ost_index_map;
504         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
505                 imap = fsdb->fsdb_mdt_index_map;
506         else
507                 RETURN(-EINVAL);
508
509         if (test_bit(mti->mti_stripe_index, imap))
510                 RETURN(1);
511         RETURN(0);
512 }
513
514 static __inline__ int next_index(void *index_map, int map_len)
515 {
516         int i;
517         for (i = 0; i < map_len * 8; i++)
518                 if (!test_bit(i, index_map)) {
519                          return i;
520                  }
521         CERROR("max index %d exceeded.\n", i);
522         return -1;
523 }
524
525 /* Return codes:
526         0  newly marked as in use
527         <0 err
528         +EALREADY for update of an old index */
529 static int mgs_set_index(const struct lu_env *env,
530                          struct mgs_device *mgs,
531                          struct mgs_target_info *mti)
532 {
533         struct fs_db *fsdb;
534         void *imap;
535         int rc = 0;
536         ENTRY;
537
538         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
539         if (rc) {
540                 CERROR("Can't get db for %s\n", mti->mti_fsname);
541                 RETURN(rc);
542         }
543
544         mutex_lock(&fsdb->fsdb_mutex);
545         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
546                 imap = fsdb->fsdb_ost_index_map;
547         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
548                 imap = fsdb->fsdb_mdt_index_map;
549         } else {
550                 GOTO(out_up, rc = -EINVAL);
551         }
552
553         if (mti->mti_flags & LDD_F_NEED_INDEX) {
554                 rc = next_index(imap, INDEX_MAP_SIZE);
555                 if (rc == -1)
556                         GOTO(out_up, rc = -ERANGE);
557                 mti->mti_stripe_index = rc;
558                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
559                         fsdb->fsdb_mdt_count ++;
560         }
561
562         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
563                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
564                                    "but the max index is %d.\n",
565                                    mti->mti_svname, mti->mti_stripe_index,
566                                    INDEX_MAP_SIZE * 8);
567                 GOTO(out_up, rc = -ERANGE);
568         }
569
570         if (test_bit(mti->mti_stripe_index, imap)) {
571                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
572                     !(mti->mti_flags & LDD_F_WRITECONF)) {
573                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
574                                            "%d, but that index is already in "
575                                            "use. Use --writeconf to force\n",
576                                            mti->mti_svname,
577                                            mti->mti_stripe_index);
578                         GOTO(out_up, rc = -EADDRINUSE);
579                 } else {
580                         CDEBUG(D_MGS, "Server %s updating index %d\n",
581                                mti->mti_svname, mti->mti_stripe_index);
582                         GOTO(out_up, rc = EALREADY);
583                 }
584         }
585
586         set_bit(mti->mti_stripe_index, imap);
587         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
588         mutex_unlock(&fsdb->fsdb_mutex);
589         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
590                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
591
592         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
593                mti->mti_stripe_index);
594
595         RETURN(0);
596 out_up:
597         mutex_unlock(&fsdb->fsdb_mutex);
598         return rc;
599 }
600
601 struct mgs_modify_lookup {
602         struct cfg_marker mml_marker;
603         int               mml_modified;
604 };
605
606 static int mgs_modify_handler(const struct lu_env *env,
607                               struct llog_handle *llh,
608                               struct llog_rec_hdr *rec, void *data)
609 {
610         struct mgs_modify_lookup *mml = data;
611         struct cfg_marker *marker;
612         struct lustre_cfg *lcfg = REC_DATA(rec);
613         int cfg_len = REC_DATA_LEN(rec);
614         int rc;
615         ENTRY;
616
617         if (rec->lrh_type != OBD_CFG_REC) {
618                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
619                 RETURN(-EINVAL);
620         }
621
622         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
623         if (rc) {
624                 CERROR("Insane cfg\n");
625                 RETURN(rc);
626         }
627
628         /* We only care about markers */
629         if (lcfg->lcfg_command != LCFG_MARKER)
630                 RETURN(0);
631
632         marker = lustre_cfg_buf(lcfg, 1);
633         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
634             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
635             !(marker->cm_flags & CM_SKIP)) {
636                 /* Found a non-skipped marker match */
637                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
638                        rec->lrh_index, marker->cm_step,
639                        marker->cm_flags, mml->mml_marker.cm_flags,
640                        marker->cm_tgtname, marker->cm_comment);
641                 /* Overwrite the old marker llog entry */
642                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
643                 marker->cm_flags |= mml->mml_marker.cm_flags;
644                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
645                 rc = llog_write(env, llh, rec, rec->lrh_index);
646                 if (!rc)
647                          mml->mml_modified++;
648         }
649
650         RETURN(rc);
651 }
652
653 /**
654  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
655  * Return code:
656  * 0 - modified successfully,
657  * 1 - no modification was done
658  * negative - error
659  */
660 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
661                       struct fs_db *fsdb, struct mgs_target_info *mti,
662                       char *logname, char *devname, char *comment, int flags)
663 {
664         struct llog_handle *loghandle;
665         struct llog_ctxt *ctxt;
666         struct mgs_modify_lookup *mml;
667         int rc;
668
669         ENTRY;
670
671         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
672         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
673                flags);
674
675         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
676         LASSERT(ctxt != NULL);
677         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
678         if (rc < 0) {
679                 if (rc == -ENOENT)
680                         rc = 0;
681                 GOTO(out_pop, rc);
682         }
683
684         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
685         if (rc)
686                 GOTO(out_close, rc);
687
688         if (llog_get_size(loghandle) <= 1)
689                 GOTO(out_close, rc = 0);
690
691         OBD_ALLOC_PTR(mml);
692         if (!mml)
693                 GOTO(out_close, rc = -ENOMEM);
694         if (strlcpy(mml->mml_marker.cm_comment, comment,
695                     sizeof(mml->mml_marker.cm_comment)) >=
696             sizeof(mml->mml_marker.cm_comment))
697                 GOTO(out_free, rc = -E2BIG);
698         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
699                     sizeof(mml->mml_marker.cm_tgtname)) >=
700             sizeof(mml->mml_marker.cm_tgtname))
701                 GOTO(out_free, rc = -E2BIG);
702         /* Modify mostly means cancel */
703         mml->mml_marker.cm_flags = flags;
704         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
705         mml->mml_modified = 0;
706         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
707                           NULL);
708         if (!rc && !mml->mml_modified)
709                 rc = 1;
710
711 out_free:
712         OBD_FREE_PTR(mml);
713
714 out_close:
715         llog_close(env, loghandle);
716 out_pop:
717         if (rc < 0)
718                 CERROR("%s: modify %s/%s failed: rc = %d\n",
719                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
720         llog_ctxt_put(ctxt);
721         RETURN(rc);
722 }
723
724 /** This structure is passed to mgs_replace_handler */
725 struct mgs_replace_uuid_lookup {
726         /* Nids are replaced for this target device */
727         struct mgs_target_info target;
728         /* Temporary modified llog */
729         struct llog_handle *temp_llh;
730         /* Flag is set if in target block*/
731         int in_target_device;
732         /* Nids already added. Just skip (multiple nids) */
733         int device_nids_added;
734         /* Flag is set if this block should not be copied */
735         int skip_it;
736 };
737
738 /**
739  * Check: a) if block should be skipped
740  * b) is it target block
741  *
742  * \param[in] lcfg
743  * \param[in] mrul
744  *
745  * \retval 0 should not to be skipped
746  * \retval 1 should to be skipped
747  */
748 static int check_markers(struct lustre_cfg *lcfg,
749                          struct mgs_replace_uuid_lookup *mrul)
750 {
751          struct cfg_marker *marker;
752
753         /* Track markers. Find given device */
754         if (lcfg->lcfg_command == LCFG_MARKER) {
755                 marker = lustre_cfg_buf(lcfg, 1);
756                 /* Clean llog from records marked as CM_EXCLUDE.
757                    CM_SKIP records are used for "active" command
758                    and can be restored if needed */
759                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
760                     (CM_EXCLUDE | CM_START)) {
761                         mrul->skip_it = 1;
762                         return 1;
763                 }
764
765                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
766                     (CM_EXCLUDE | CM_END)) {
767                         mrul->skip_it = 0;
768                         return 1;
769                 }
770
771                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
772                         LASSERT(!(marker->cm_flags & CM_START) ||
773                                 !(marker->cm_flags & CM_END));
774                         if (marker->cm_flags & CM_START) {
775                                 mrul->in_target_device = 1;
776                                 mrul->device_nids_added = 0;
777                         } else if (marker->cm_flags & CM_END)
778                                 mrul->in_target_device = 0;
779                 }
780         }
781
782         return 0;
783 }
784
785 static int record_base(const struct lu_env *env, struct llog_handle *llh,
786                      char *cfgname, lnet_nid_t nid, int cmd,
787                      char *s1, char *s2, char *s3, char *s4)
788 {
789         struct mgs_thread_info  *mgi = mgs_env_info(env);
790         struct llog_cfg_rec     *lcr;
791         int rc;
792
793         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
794                cmd, s1, s2, s3, s4);
795
796         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
797         if (s1)
798                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
799         if (s2)
800                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
801         if (s3)
802                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
803         if (s4)
804                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
805
806         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
807         if (lcr == NULL)
808                 return -ENOMEM;
809
810         lcr->lcr_cfg.lcfg_nid = nid;
811         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
812
813         lustre_cfg_rec_free(lcr);
814
815         if (rc < 0)
816                 CDEBUG(D_MGS,
817                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
818                        cfgname, cmd, s1, s2, s3, s4, rc);
819         return rc;
820 }
821
822 static inline int record_add_uuid(const struct lu_env *env,
823                                   struct llog_handle *llh,
824                                   uint64_t nid, char *uuid)
825 {
826         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
827 }
828
829 static inline int record_add_conn(const struct lu_env *env,
830                                   struct llog_handle *llh,
831                                   char *devname, char *uuid)
832 {
833         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
834 }
835
836 static inline int record_attach(const struct lu_env *env,
837                                 struct llog_handle *llh, char *devname,
838                                 char *type, char *uuid)
839 {
840         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
841 }
842
843 static inline int record_setup(const struct lu_env *env,
844                                struct llog_handle *llh, char *devname,
845                                char *s1, char *s2, char *s3, char *s4)
846 {
847         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
848 }
849
850 /**
851  * \retval <0 record processing error
852  * \retval n record is processed. No need copy original one.
853  * \retval 0 record is not processed.
854  */
855 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
856                            struct mgs_replace_uuid_lookup *mrul)
857 {
858         int nids_added = 0;
859         lnet_nid_t nid;
860         char *ptr;
861         int rc;
862
863         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
864                 /* LCFG_ADD_UUID command found. Let's skip original command
865                    and add passed nids */
866                 ptr = mrul->target.mti_params;
867                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
868                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
869                                "device %s\n", libcfs_nid2str(nid),
870                                 mrul->target.mti_params,
871                                 mrul->target.mti_svname);
872                         rc = record_add_uuid(env,
873                                              mrul->temp_llh, nid,
874                                              mrul->target.mti_params);
875                         if (!rc)
876                                 nids_added++;
877                 }
878
879                 if (nids_added == 0) {
880                         CERROR("No new nids were added, nid %s with uuid %s, "
881                                "device %s\n", libcfs_nid2str(nid),
882                                mrul->target.mti_params,
883                                mrul->target.mti_svname);
884                         RETURN(-ENXIO);
885                 } else {
886                         mrul->device_nids_added = 1;
887                 }
888
889                 return nids_added;
890         }
891
892         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
893                 /* LCFG_SETUP command found. UUID should be changed */
894                 rc = record_setup(env,
895                                   mrul->temp_llh,
896                                   /* devname the same */
897                                   lustre_cfg_string(lcfg, 0),
898                                   /* s1 is not changed */
899                                   lustre_cfg_string(lcfg, 1),
900                                   /* new uuid should be
901                                   the full nidlist */
902                                   mrul->target.mti_params,
903                                   /* s3 is not changed */
904                                   lustre_cfg_string(lcfg, 3),
905                                   /* s4 is not changed */
906                                   lustre_cfg_string(lcfg, 4));
907                 return rc ? rc : 1;
908         }
909
910         /* Another commands in target device block */
911         return 0;
912 }
913
914 /**
915  * Handler that called for every record in llog.
916  * Records are processed in order they placed in llog.
917  *
918  * \param[in] llh       log to be processed
919  * \param[in] rec       current record
920  * \param[in] data      mgs_replace_uuid_lookup structure
921  *
922  * \retval 0    success
923  */
924 static int mgs_replace_handler(const struct lu_env *env,
925                                struct llog_handle *llh,
926                                struct llog_rec_hdr *rec,
927                                void *data)
928 {
929         struct mgs_replace_uuid_lookup *mrul;
930         struct lustre_cfg *lcfg = REC_DATA(rec);
931         int cfg_len = REC_DATA_LEN(rec);
932         int rc;
933         ENTRY;
934
935         mrul = (struct mgs_replace_uuid_lookup *)data;
936
937         if (rec->lrh_type != OBD_CFG_REC) {
938                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
939                        rec->lrh_type, lcfg->lcfg_command,
940                        lustre_cfg_string(lcfg, 0),
941                        lustre_cfg_string(lcfg, 1));
942                 RETURN(-EINVAL);
943         }
944
945         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
946         if (rc) {
947                 /* Do not copy any invalidated records */
948                 GOTO(skip_out, rc = 0);
949         }
950
951         rc = check_markers(lcfg, mrul);
952         if (rc || mrul->skip_it)
953                 GOTO(skip_out, rc = 0);
954
955         /* Write to new log all commands outside target device block */
956         if (!mrul->in_target_device)
957                 GOTO(copy_out, rc = 0);
958
959         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
960            (failover nids) for this target, assuming that if then
961            primary is changing then so is the failover */
962         if (mrul->device_nids_added &&
963             (lcfg->lcfg_command == LCFG_ADD_UUID ||
964              lcfg->lcfg_command == LCFG_ADD_CONN))
965                 GOTO(skip_out, rc = 0);
966
967         rc = process_command(env, lcfg, mrul);
968         if (rc < 0)
969                 RETURN(rc);
970
971         if (rc)
972                 RETURN(0);
973 copy_out:
974         /* Record is placed in temporary llog as is */
975         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
976
977         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
978                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
979                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
980         RETURN(rc);
981
982 skip_out:
983         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
984                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
985                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
986         RETURN(rc);
987 }
988
989 static int mgs_log_is_empty(const struct lu_env *env,
990                             struct mgs_device *mgs, char *name)
991 {
992         struct llog_ctxt        *ctxt;
993         int                      rc;
994
995         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
996         LASSERT(ctxt != NULL);
997
998         rc = llog_is_empty(env, ctxt, name);
999         llog_ctxt_put(ctxt);
1000         return rc;
1001 }
1002
1003 static int mgs_replace_nids_log(const struct lu_env *env,
1004                                 struct obd_device *mgs, struct fs_db *fsdb,
1005                                 char *logname, char *devname, char *nids)
1006 {
1007         struct llog_handle *orig_llh, *backup_llh;
1008         struct llog_ctxt *ctxt;
1009         struct mgs_replace_uuid_lookup *mrul;
1010         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1011         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1012         char *backup;
1013         int rc, rc2;
1014         ENTRY;
1015
1016         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1017
1018         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1019         LASSERT(ctxt != NULL);
1020
1021         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1022                 /* Log is empty. Nothing to replace */
1023                 GOTO(out_put, rc = 0);
1024         }
1025
1026         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1027         if (backup == NULL)
1028                 GOTO(out_put, rc = -ENOMEM);
1029
1030         sprintf(backup, "%s.bak", logname);
1031
1032         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1033         if (rc == 0) {
1034                 /* Now erase original log file. Connections are not allowed.
1035                    Backup is already saved */
1036                 rc = llog_erase(env, ctxt, NULL, logname);
1037                 if (rc < 0)
1038                         GOTO(out_free, rc);
1039         } else if (rc != -ENOENT) {
1040                 CERROR("%s: can't make backup for %s: rc = %d\n",
1041                        mgs->obd_name, logname, rc);
1042                 GOTO(out_free,rc);
1043         }
1044
1045         /* open local log */
1046         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1047         if (rc)
1048                 GOTO(out_restore, rc);
1049
1050         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1051         if (rc)
1052                 GOTO(out_closel, rc);
1053
1054         /* open backup llog */
1055         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1056                        LLOG_OPEN_EXISTS);
1057         if (rc)
1058                 GOTO(out_closel, rc);
1059
1060         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1061         if (rc)
1062                 GOTO(out_close, rc);
1063
1064         if (llog_get_size(backup_llh) <= 1)
1065                 GOTO(out_close, rc = 0);
1066
1067         OBD_ALLOC_PTR(mrul);
1068         if (!mrul)
1069                 GOTO(out_close, rc = -ENOMEM);
1070         /* devname is only needed information to replace UUID records */
1071         strlcpy(mrul->target.mti_svname, devname,
1072                 sizeof(mrul->target.mti_svname));
1073         /* parse nids later */
1074         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1075         /* Copy records to this temporary llog */
1076         mrul->temp_llh = orig_llh;
1077
1078         rc = llog_process(env, backup_llh, mgs_replace_handler,
1079                           (void *)mrul, NULL);
1080         OBD_FREE_PTR(mrul);
1081 out_close:
1082         rc2 = llog_close(NULL, backup_llh);
1083         if (!rc)
1084                 rc = rc2;
1085 out_closel:
1086         rc2 = llog_close(NULL, orig_llh);
1087         if (!rc)
1088                 rc = rc2;
1089
1090 out_restore:
1091         if (rc) {
1092                 CERROR("%s: llog should be restored: rc = %d\n",
1093                        mgs->obd_name, rc);
1094                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1095                                   logname);
1096                 if (rc2 < 0)
1097                         CERROR("%s: can't restore backup %s: rc = %d\n",
1098                                mgs->obd_name, logname, rc2);
1099         }
1100
1101 out_free:
1102         OBD_FREE(backup, strlen(backup) + 1);
1103
1104 out_put:
1105         llog_ctxt_put(ctxt);
1106
1107         if (rc)
1108                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1109                        mgs->obd_name, logname, rc);
1110
1111         RETURN(rc);
1112 }
1113
1114 /**
1115  * Parse device name and get file system name and/or device index
1116  *
1117  * \param[in]   devname device name (ex. lustre-MDT0000)
1118  * \param[out]  fsname  file system name(optional)
1119  * \param[out]  index   device index(optional)
1120  *
1121  * \retval 0    success
1122  */
1123 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1124 {
1125         int rc;
1126         ENTRY;
1127
1128         /* Extract fsname */
1129         if (fsname) {
1130                 rc = server_name2fsname(devname, fsname, NULL);
1131                 if (rc < 0) {
1132                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1133                                devname);
1134                         RETURN(-EINVAL);
1135                 }
1136         }
1137
1138         if (index) {
1139                 rc = server_name2index(devname, index, NULL);
1140                 if (rc < 0) {
1141                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1142                                devname);
1143                         RETURN(-EINVAL);
1144                 }
1145         }
1146
1147         RETURN(0);
1148 }
1149
1150 /* This is only called during replace_nids */
1151 static int only_mgs_is_running(struct obd_device *mgs_obd)
1152 {
1153         /* TDB: Is global variable with devices count exists? */
1154         int num_devices = get_devices_count();
1155         int num_exports = 0;
1156         struct obd_export *exp;
1157
1158         spin_lock(&mgs_obd->obd_dev_lock);
1159         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1160                 /* skip self export */
1161                 if (exp == mgs_obd->obd_self_export)
1162                         continue;
1163                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1164                         continue;
1165
1166                 ++num_exports;
1167
1168                 CERROR("%s: node %s still connected during replace_nids "
1169                        "connect_flags:%llx\n",
1170                        mgs_obd->obd_name,
1171                        libcfs_nid2str(exp->exp_nid_stats->nid),
1172                        exp_connect_flags(exp));
1173
1174         }
1175         spin_unlock(&mgs_obd->obd_dev_lock);
1176
1177         /* osd, MGS and MGC + self_export
1178            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1179         return (num_devices <= 3) && (num_exports == 0);
1180 }
1181
1182 static int name_create_mdt(char **logname, char *fsname, int i)
1183 {
1184         char mdt_index[9];
1185
1186         sprintf(mdt_index, "-MDT%04x", i);
1187         return name_create(logname, fsname, mdt_index);
1188 }
1189
1190 /**
1191  * Replace nids for \a device to \a nids values
1192  *
1193  * \param obd           MGS obd device
1194  * \param devname       nids need to be replaced for this device
1195  * (ex. lustre-OST0000)
1196  * \param nids          nids list (ex. nid1,nid2,nid3)
1197  *
1198  * \retval 0    success
1199  */
1200 int mgs_replace_nids(const struct lu_env *env,
1201                      struct mgs_device *mgs,
1202                      char *devname, char *nids)
1203 {
1204         /* Assume fsname is part of device name */
1205         char fsname[MTI_NAME_MAXLEN];
1206         int rc;
1207         __u32 index;
1208         char *logname;
1209         struct fs_db *fsdb;
1210         unsigned int i;
1211         int conn_state;
1212         struct obd_device *mgs_obd = mgs->mgs_obd;
1213         ENTRY;
1214
1215         /* We can only change NIDs if no other nodes are connected */
1216         spin_lock(&mgs_obd->obd_dev_lock);
1217         conn_state = mgs_obd->obd_no_conn;
1218         mgs_obd->obd_no_conn = 1;
1219         spin_unlock(&mgs_obd->obd_dev_lock);
1220
1221         /* We can not change nids if not only MGS is started */
1222         if (!only_mgs_is_running(mgs_obd)) {
1223                 CERROR("Only MGS is allowed to be started\n");
1224                 GOTO(out, rc = -EINPROGRESS);
1225         }
1226
1227         /* Get fsname and index*/
1228         rc = mgs_parse_devname(devname, fsname, &index);
1229         if (rc)
1230                 GOTO(out, rc);
1231
1232         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1233         if (rc) {
1234                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1235                 GOTO(out, rc);
1236         }
1237
1238         /* Process client llogs */
1239         name_create(&logname, fsname, "-client");
1240         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1241         name_destroy(&logname);
1242         if (rc) {
1243                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1244                        fsname, devname, rc);
1245                 GOTO(out, rc);
1246         }
1247
1248         /* Process MDT llogs */
1249         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1250                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1251                         continue;
1252                 name_create_mdt(&logname, fsname, i);
1253                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1254                 name_destroy(&logname);
1255                 if (rc)
1256                         GOTO(out, rc);
1257         }
1258
1259 out:
1260         spin_lock(&mgs_obd->obd_dev_lock);
1261         mgs_obd->obd_no_conn = conn_state;
1262         spin_unlock(&mgs_obd->obd_dev_lock);
1263
1264         RETURN(rc);
1265 }
1266
1267 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1268                             char *devname, struct lov_desc *desc)
1269 {
1270         struct mgs_thread_info  *mgi = mgs_env_info(env);
1271         struct llog_cfg_rec     *lcr;
1272         int rc;
1273
1274         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1275         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1276         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1277         if (lcr == NULL)
1278                 return -ENOMEM;
1279
1280         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1281         lustre_cfg_rec_free(lcr);
1282         return rc;
1283 }
1284
1285 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1286                             char *devname, struct lmv_desc *desc)
1287 {
1288         struct mgs_thread_info  *mgi = mgs_env_info(env);
1289         struct llog_cfg_rec     *lcr;
1290         int rc;
1291
1292         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1293         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1294         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1295         if (lcr == NULL)
1296                 return -ENOMEM;
1297
1298         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1299         lustre_cfg_rec_free(lcr);
1300         return rc;
1301 }
1302
1303 static inline int record_mdc_add(const struct lu_env *env,
1304                                  struct llog_handle *llh,
1305                                  char *logname, char *mdcuuid,
1306                                  char *mdtuuid, char *index,
1307                                  char *gen)
1308 {
1309         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1310                            mdtuuid,index,gen,mdcuuid);
1311 }
1312
1313 static inline int record_lov_add(const struct lu_env *env,
1314                                  struct llog_handle *llh,
1315                                  char *lov_name, char *ost_uuid,
1316                                  char *index, char *gen)
1317 {
1318         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1319                            ost_uuid, index, gen, 0);
1320 }
1321
1322 static inline int record_mount_opt(const struct lu_env *env,
1323                                    struct llog_handle *llh,
1324                                    char *profile, char *lov_name,
1325                                    char *mdc_name)
1326 {
1327         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1328                            profile,lov_name,mdc_name,0);
1329 }
1330
1331 static int record_marker(const struct lu_env *env,
1332                          struct llog_handle *llh,
1333                          struct fs_db *fsdb, __u32 flags,
1334                          char *tgtname, char *comment)
1335 {
1336         struct mgs_thread_info *mgi = mgs_env_info(env);
1337         struct llog_cfg_rec *lcr;
1338         int rc;
1339         int cplen = 0;
1340
1341         if (flags & CM_START)
1342                 fsdb->fsdb_gen++;
1343         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1344         mgi->mgi_marker.cm_flags = flags;
1345         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1346         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1347                         sizeof(mgi->mgi_marker.cm_tgtname));
1348         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1349                 return -E2BIG;
1350         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1351                         sizeof(mgi->mgi_marker.cm_comment));
1352         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1353                 return -E2BIG;
1354         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1355         mgi->mgi_marker.cm_canceltime = 0;
1356         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1357         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1358                             sizeof(mgi->mgi_marker));
1359         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1360         if (lcr == NULL)
1361                 return -ENOMEM;
1362
1363         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1364         lustre_cfg_rec_free(lcr);
1365         return rc;
1366 }
1367
1368 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1369                             struct llog_handle **llh, char *name)
1370 {
1371         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1372         struct llog_ctxt        *ctxt;
1373         int                      rc = 0;
1374         ENTRY;
1375
1376         if (*llh)
1377                 GOTO(out, rc = -EBUSY);
1378
1379         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1380         if (!ctxt)
1381                 GOTO(out, rc = -ENODEV);
1382         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1383
1384         rc = llog_open_create(env, ctxt, llh, NULL, name);
1385         if (rc)
1386                 GOTO(out_ctxt, rc);
1387         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1388         if (rc)
1389                 llog_close(env, *llh);
1390 out_ctxt:
1391         llog_ctxt_put(ctxt);
1392 out:
1393         if (rc) {
1394                 CERROR("%s: can't start log %s: rc = %d\n",
1395                        mgs->mgs_obd->obd_name, name, rc);
1396                 *llh = NULL;
1397         }
1398         RETURN(rc);
1399 }
1400
1401 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1402 {
1403         int rc;
1404
1405         rc = llog_close(env, *llh);
1406         *llh = NULL;
1407
1408         return rc;
1409 }
1410
1411 /******************** config "macros" *********************/
1412
1413 /* write an lcfg directly into a log (with markers) */
1414 static int mgs_write_log_direct(const struct lu_env *env,
1415                                 struct mgs_device *mgs, struct fs_db *fsdb,
1416                                 char *logname, struct llog_cfg_rec *lcr,
1417                                 char *devname, char *comment)
1418 {
1419         struct llog_handle *llh = NULL;
1420         int rc;
1421
1422         ENTRY;
1423
1424         rc = record_start_log(env, mgs, &llh, logname);
1425         if (rc)
1426                 RETURN(rc);
1427
1428         /* FIXME These should be a single journal transaction */
1429         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1430         if (rc)
1431                 GOTO(out_end, rc);
1432         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1433         if (rc)
1434                 GOTO(out_end, rc);
1435         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1436         if (rc)
1437                 GOTO(out_end, rc);
1438 out_end:
1439         record_end_log(env, &llh);
1440         RETURN(rc);
1441 }
1442
1443 /* write the lcfg in all logs for the given fs */
1444 int mgs_write_log_direct_all(const struct lu_env *env, struct mgs_device *mgs,
1445                              struct fs_db *fsdb, struct mgs_target_info *mti,
1446                              struct llog_cfg_rec *lcr, char *devname,
1447                              char *comment, int server_only)
1448 {
1449         struct list_head         log_list;
1450         struct mgs_direntry     *dirent, *n;
1451         char                    *fsname = mti->mti_fsname;
1452         int                      rc = 0, len = strlen(fsname);
1453
1454         ENTRY;
1455         /* Find all the logs in the CONFIGS directory */
1456         rc = class_dentry_readdir(env, mgs, &log_list);
1457         if (rc)
1458                 RETURN(rc);
1459
1460         /* Could use fsdb index maps instead of directory listing */
1461         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1462                 list_del_init(&dirent->mde_list);
1463                 /* don't write to sptlrpc rule log */
1464                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1465                         goto next;
1466
1467                 /* caller wants write server logs only */
1468                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1469                         goto next;
1470
1471                 if (strncmp(fsname, dirent->mde_name, len) != 0)
1472                         goto next;
1473
1474                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1475                 /* Erase any old settings of this same parameter */
1476                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1477                                 devname, comment, CM_SKIP);
1478                 if (rc < 0)
1479                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1480                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1481                 if (lcr == NULL)
1482                         goto next;
1483                 /* Write the new one */
1484                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1485                                           lcr, devname, comment);
1486                 if (rc != 0)
1487                         CERROR("%s: writing log %s: rc = %d\n",
1488                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1489 next:
1490                 mgs_direntry_free(dirent);
1491         }
1492
1493         RETURN(rc);
1494 }
1495
1496 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1497                                     struct mgs_device *mgs,
1498                                     struct fs_db *fsdb,
1499                                     struct mgs_target_info *mti,
1500                                     int index, char *logname);
1501 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1502                                     struct mgs_device *mgs,
1503                                     struct fs_db *fsdb,
1504                                     struct mgs_target_info *mti,
1505                                     char *logname, char *suffix, char *lovname,
1506                                     enum lustre_sec_part sec_part, int flags);
1507 static int name_create_mdt_and_lov(char **logname, char **lovname,
1508                                    struct fs_db *fsdb, int i);
1509
1510 static int add_param(char *params, char *key, char *val)
1511 {
1512         char *start = params + strlen(params);
1513         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1514         int keylen = 0;
1515
1516         if (key != NULL)
1517                 keylen = strlen(key);
1518         if (start + 1 + keylen + strlen(val) >= end) {
1519                 CERROR("params are too long: %s %s%s\n",
1520                        params, key != NULL ? key : "", val);
1521                 return -EINVAL;
1522         }
1523
1524         sprintf(start, " %s%s", key != NULL ? key : "", val);
1525         return 0;
1526 }
1527
1528 /**
1529  * Walk through client config log record and convert the related records
1530  * into the target.
1531  **/
1532 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1533                                          struct llog_handle *llh,
1534                                          struct llog_rec_hdr *rec, void *data)
1535 {
1536         struct mgs_device *mgs;
1537         struct obd_device *obd;
1538         struct mgs_target_info *mti, *tmti;
1539         struct fs_db *fsdb;
1540         int cfg_len = rec->lrh_len;
1541         char *cfg_buf = (char*) (rec + 1);
1542         struct lustre_cfg *lcfg;
1543         int rc = 0;
1544         struct llog_handle *mdt_llh = NULL;
1545         static int got_an_osc_or_mdc = 0;
1546         /* 0: not found any osc/mdc;
1547            1: found osc;
1548            2: found mdc;
1549         */
1550         static int last_step = -1;
1551         int cplen = 0;
1552
1553         ENTRY;
1554
1555         mti = ((struct temp_comp*)data)->comp_mti;
1556         tmti = ((struct temp_comp*)data)->comp_tmti;
1557         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1558         obd = ((struct temp_comp *)data)->comp_obd;
1559         mgs = lu2mgs_dev(obd->obd_lu_dev);
1560         LASSERT(mgs);
1561
1562         if (rec->lrh_type != OBD_CFG_REC) {
1563                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1564                 RETURN(-EINVAL);
1565         }
1566
1567         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1568         if (rc) {
1569                 CERROR("Insane cfg\n");
1570                 RETURN(rc);
1571         }
1572
1573         lcfg = (struct lustre_cfg *)cfg_buf;
1574
1575         if (lcfg->lcfg_command == LCFG_MARKER) {
1576                 struct cfg_marker *marker;
1577                 marker = lustre_cfg_buf(lcfg, 1);
1578                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1579                     (marker->cm_flags & CM_START) &&
1580                      !(marker->cm_flags & CM_SKIP)) {
1581                         got_an_osc_or_mdc = 1;
1582                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1583                                         sizeof(tmti->mti_svname));
1584                         if (cplen >= sizeof(tmti->mti_svname))
1585                                 RETURN(-E2BIG);
1586                         rc = record_start_log(env, mgs, &mdt_llh,
1587                                               mti->mti_svname);
1588                         if (rc)
1589                                 RETURN(rc);
1590                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1591                                            mti->mti_svname, "add osc(copied)");
1592                         record_end_log(env, &mdt_llh);
1593                         last_step = marker->cm_step;
1594                         RETURN(rc);
1595                 }
1596                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1597                     (marker->cm_flags & CM_END) &&
1598                      !(marker->cm_flags & CM_SKIP)) {
1599                         LASSERT(last_step == marker->cm_step);
1600                         last_step = -1;
1601                         got_an_osc_or_mdc = 0;
1602                         memset(tmti, 0, sizeof(*tmti));
1603                         rc = record_start_log(env, mgs, &mdt_llh,
1604                                               mti->mti_svname);
1605                         if (rc)
1606                                 RETURN(rc);
1607                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1608                                            mti->mti_svname, "add osc(copied)");
1609                         record_end_log(env, &mdt_llh);
1610                         RETURN(rc);
1611                 }
1612                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1613                     (marker->cm_flags & CM_START) &&
1614                      !(marker->cm_flags & CM_SKIP)) {
1615                         got_an_osc_or_mdc = 2;
1616                         last_step = marker->cm_step;
1617                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1618                                strlen(marker->cm_tgtname));
1619
1620                         RETURN(rc);
1621                 }
1622                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1623                     (marker->cm_flags & CM_END) &&
1624                      !(marker->cm_flags & CM_SKIP)) {
1625                         LASSERT(last_step == marker->cm_step);
1626                         last_step = -1;
1627                         got_an_osc_or_mdc = 0;
1628                         memset(tmti, 0, sizeof(*tmti));
1629                         RETURN(rc);
1630                 }
1631         }
1632
1633         if (got_an_osc_or_mdc == 0 || last_step < 0)
1634                 RETURN(rc);
1635
1636         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1637                 uint64_t nodenid = lcfg->lcfg_nid;
1638
1639                 if (strlen(tmti->mti_uuid) == 0) {
1640                         /* target uuid not set, this config record is before
1641                          * LCFG_SETUP, this nid is one of target node nid.
1642                          */
1643                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1644                         tmti->mti_nid_count++;
1645                 } else {
1646                         /* failover node nid */
1647                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1648                                        libcfs_nid2str(nodenid));
1649                 }
1650
1651                 RETURN(rc);
1652         }
1653
1654         if (lcfg->lcfg_command == LCFG_SETUP) {
1655                 char *target;
1656
1657                 target = lustre_cfg_string(lcfg, 1);
1658                 memcpy(tmti->mti_uuid, target, strlen(target));
1659                 RETURN(rc);
1660         }
1661
1662         /* ignore client side sptlrpc_conf_log */
1663         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1664                 RETURN(rc);
1665
1666         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1667                 int index;
1668
1669                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1670                         RETURN (-EINVAL);
1671
1672                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1673                        strlen(mti->mti_fsname));
1674                 tmti->mti_stripe_index = index;
1675
1676                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1677                                               mti->mti_stripe_index,
1678                                               mti->mti_svname);
1679                 memset(tmti, 0, sizeof(*tmti));
1680                 RETURN(rc);
1681         }
1682
1683         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1684                 int index;
1685                 char mdt_index[9];
1686                 char *logname, *lovname;
1687
1688                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1689                                              mti->mti_stripe_index);
1690                 if (rc)
1691                         RETURN(rc);
1692                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1693
1694                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1695                         name_destroy(&logname);
1696                         name_destroy(&lovname);
1697                         RETURN(-EINVAL);
1698                 }
1699
1700                 tmti->mti_stripe_index = index;
1701                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1702                                          mdt_index, lovname,
1703                                          LUSTRE_SP_MDT, 0);
1704                 name_destroy(&logname);
1705                 name_destroy(&lovname);
1706                 RETURN(rc);
1707         }
1708         RETURN(rc);
1709 }
1710
1711 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1712 /* stealed from mgs_get_fsdb_from_llog*/
1713 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1714                                               struct mgs_device *mgs,
1715                                               char *client_name,
1716                                               struct temp_comp* comp)
1717 {
1718         struct llog_handle *loghandle;
1719         struct mgs_target_info *tmti;
1720         struct llog_ctxt *ctxt;
1721         int rc;
1722
1723         ENTRY;
1724
1725         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1726         LASSERT(ctxt != NULL);
1727
1728         OBD_ALLOC_PTR(tmti);
1729         if (tmti == NULL)
1730                 GOTO(out_ctxt, rc = -ENOMEM);
1731
1732         comp->comp_tmti = tmti;
1733         comp->comp_obd = mgs->mgs_obd;
1734
1735         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1736                        LLOG_OPEN_EXISTS);
1737         if (rc < 0) {
1738                 if (rc == -ENOENT)
1739                         rc = 0;
1740                 GOTO(out_pop, rc);
1741         }
1742
1743         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1744         if (rc)
1745                 GOTO(out_close, rc);
1746
1747         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1748                                   (void *)comp, NULL, false);
1749         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1750 out_close:
1751         llog_close(env, loghandle);
1752 out_pop:
1753         OBD_FREE_PTR(tmti);
1754 out_ctxt:
1755         llog_ctxt_put(ctxt);
1756         RETURN(rc);
1757 }
1758
1759 /* lmv is the second thing for client logs */
1760 /* copied from mgs_write_log_lov. Please refer to that.  */
1761 static int mgs_write_log_lmv(const struct lu_env *env,
1762                              struct mgs_device *mgs,
1763                              struct fs_db *fsdb,
1764                              struct mgs_target_info *mti,
1765                              char *logname, char *lmvname)
1766 {
1767         struct llog_handle *llh = NULL;
1768         struct lmv_desc *lmvdesc;
1769         char *uuid;
1770         int rc = 0;
1771         ENTRY;
1772
1773         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1774
1775         OBD_ALLOC_PTR(lmvdesc);
1776         if (lmvdesc == NULL)
1777                 RETURN(-ENOMEM);
1778         lmvdesc->ld_active_tgt_count = 0;
1779         lmvdesc->ld_tgt_count = 0;
1780         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1781         uuid = (char *)lmvdesc->ld_uuid.uuid;
1782
1783         rc = record_start_log(env, mgs, &llh, logname);
1784         if (rc)
1785                 GOTO(out_free, rc);
1786         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1787         if (rc)
1788                 GOTO(out_end, rc);
1789         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1790         if (rc)
1791                 GOTO(out_end, rc);
1792         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1793         if (rc)
1794                 GOTO(out_end, rc);
1795         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1796         if (rc)
1797                 GOTO(out_end, rc);
1798 out_end:
1799         record_end_log(env, &llh);
1800 out_free:
1801         OBD_FREE_PTR(lmvdesc);
1802         RETURN(rc);
1803 }
1804
1805 /* lov is the first thing in the mdt and client logs */
1806 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1807                              struct fs_db *fsdb, struct mgs_target_info *mti,
1808                              char *logname, char *lovname)
1809 {
1810         struct llog_handle *llh = NULL;
1811         struct lov_desc *lovdesc;
1812         char *uuid;
1813         int rc = 0;
1814         ENTRY;
1815
1816         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1817
1818         /*
1819         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1820         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1821               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1822         */
1823
1824         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1825         OBD_ALLOC_PTR(lovdesc);
1826         if (lovdesc == NULL)
1827                 RETURN(-ENOMEM);
1828         lovdesc->ld_magic = LOV_DESC_MAGIC;
1829         lovdesc->ld_tgt_count = 0;
1830         /* Defaults.  Can be changed later by lcfg config_param */
1831         lovdesc->ld_default_stripe_count = 1;
1832         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1833         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1834         lovdesc->ld_default_stripe_offset = -1;
1835         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1836         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1837         /* can these be the same? */
1838         uuid = (char *)lovdesc->ld_uuid.uuid;
1839
1840         /* This should always be the first entry in a log.
1841         rc = mgs_clear_log(obd, logname); */
1842         rc = record_start_log(env, mgs, &llh, logname);
1843         if (rc)
1844                 GOTO(out_free, rc);
1845         /* FIXME these should be a single journal transaction */
1846         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1847         if (rc)
1848                 GOTO(out_end, rc);
1849         rc = record_attach(env, llh, lovname, "lov", uuid);
1850         if (rc)
1851                 GOTO(out_end, rc);
1852         rc = record_lov_setup(env, llh, lovname, lovdesc);
1853         if (rc)
1854                 GOTO(out_end, rc);
1855         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1856         if (rc)
1857                 GOTO(out_end, rc);
1858         EXIT;
1859 out_end:
1860         record_end_log(env, &llh);
1861 out_free:
1862         OBD_FREE_PTR(lovdesc);
1863         return rc;
1864 }
1865
1866 /* add failnids to open log */
1867 static int mgs_write_log_failnids(const struct lu_env *env,
1868                                   struct mgs_target_info *mti,
1869                                   struct llog_handle *llh,
1870                                   char *cliname)
1871 {
1872         char *failnodeuuid = NULL;
1873         char *ptr = mti->mti_params;
1874         lnet_nid_t nid;
1875         int rc = 0;
1876
1877         /*
1878         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1879         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1880         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1881         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1882         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1883         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1884         */
1885
1886         /*
1887          * Pull failnid info out of params string, which may contain something
1888          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
1889          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
1890          * etc.  However, convert_hostnames() should have caught those.
1891          */
1892         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1893                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1894                         if (failnodeuuid == NULL) {
1895                                 /* We don't know the failover node name,
1896                                    so just use the first nid as the uuid */
1897                                 rc = name_create(&failnodeuuid,
1898                                                  libcfs_nid2str(nid), "");
1899                                 if (rc)
1900                                         return rc;
1901                         }
1902                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1903                                "client %s\n", libcfs_nid2str(nid),
1904                                failnodeuuid, cliname);
1905                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1906                         /*
1907                          * If *ptr is ':', we have added all NIDs for
1908                          * failnodeuuid.
1909                          */
1910                         if (*ptr == ':') {
1911                                 rc = record_add_conn(env, llh, cliname,
1912                                                      failnodeuuid);
1913                                 name_destroy(&failnodeuuid);
1914                                 failnodeuuid = NULL;
1915                         }
1916                 }
1917                 if (failnodeuuid) {
1918                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1919                         name_destroy(&failnodeuuid);
1920                         failnodeuuid = NULL;
1921                 }
1922         }
1923
1924         return rc;
1925 }
1926
1927 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1928                                     struct mgs_device *mgs,
1929                                     struct fs_db *fsdb,
1930                                     struct mgs_target_info *mti,
1931                                     char *logname, char *lmvname)
1932 {
1933         struct llog_handle *llh = NULL;
1934         char *mdcname = NULL;
1935         char *nodeuuid = NULL;
1936         char *mdcuuid = NULL;
1937         char *lmvuuid = NULL;
1938         char index[6];
1939         int i, rc;
1940         ENTRY;
1941
1942         if (mgs_log_is_empty(env, mgs, logname)) {
1943                 CERROR("log is empty! Logical error\n");
1944                 RETURN(-EINVAL);
1945         }
1946
1947         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1948                mti->mti_svname, logname, lmvname);
1949
1950         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1951         if (rc)
1952                 RETURN(rc);
1953         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1954         if (rc)
1955                 GOTO(out_free, rc);
1956         rc = name_create(&mdcuuid, mdcname, "_UUID");
1957         if (rc)
1958                 GOTO(out_free, rc);
1959         rc = name_create(&lmvuuid, lmvname, "_UUID");
1960         if (rc)
1961                 GOTO(out_free, rc);
1962
1963         rc = record_start_log(env, mgs, &llh, logname);
1964         if (rc)
1965                 GOTO(out_free, rc);
1966         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1967                            "add mdc");
1968         if (rc)
1969                 GOTO(out_end, rc);
1970         for (i = 0; i < mti->mti_nid_count; i++) {
1971                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1972                        libcfs_nid2str(mti->mti_nids[i]));
1973
1974                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1975                 if (rc)
1976                         GOTO(out_end, rc);
1977         }
1978
1979         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1980         if (rc)
1981                 GOTO(out_end, rc);
1982         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1983         if (rc)
1984                 GOTO(out_end, rc);
1985         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1986         if (rc)
1987                 GOTO(out_end, rc);
1988         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1989         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1990                             index, "1");
1991         if (rc)
1992                 GOTO(out_end, rc);
1993         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1994                            "add mdc");
1995         if (rc)
1996                 GOTO(out_end, rc);
1997 out_end:
1998         record_end_log(env, &llh);
1999 out_free:
2000         name_destroy(&lmvuuid);
2001         name_destroy(&mdcuuid);
2002         name_destroy(&mdcname);
2003         name_destroy(&nodeuuid);
2004         RETURN(rc);
2005 }
2006
2007 static inline int name_create_lov(char **lovname, char *mdtname,
2008                                   struct fs_db *fsdb, int index)
2009 {
2010         /* COMPAT_180 */
2011         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2012                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2013         else
2014                 return name_create(lovname, mdtname, "-mdtlov");
2015 }
2016
2017 static int name_create_mdt_and_lov(char **logname, char **lovname,
2018                                    struct fs_db *fsdb, int i)
2019 {
2020         int rc;
2021
2022         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2023         if (rc)
2024                 return rc;
2025         /* COMPAT_180 */
2026         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2027                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2028         else
2029                 rc = name_create(lovname, *logname, "-mdtlov");
2030         if (rc) {
2031                 name_destroy(logname);
2032                 *logname = NULL;
2033         }
2034         return rc;
2035 }
2036
2037 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2038                                       struct fs_db *fsdb, int i)
2039 {
2040         char suffix[16];
2041
2042         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2043                 sprintf(suffix, "-osc");
2044         else
2045                 sprintf(suffix, "-osc-MDT%04x", i);
2046         return name_create(oscname, ostname, suffix);
2047 }
2048
2049 /* add new mdc to already existent MDS */
2050 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2051                                     struct mgs_device *mgs,
2052                                     struct fs_db *fsdb,
2053                                     struct mgs_target_info *mti,
2054                                     int mdt_index, char *logname)
2055 {
2056         struct llog_handle      *llh = NULL;
2057         char    *nodeuuid = NULL;
2058         char    *ospname = NULL;
2059         char    *lovuuid = NULL;
2060         char    *mdtuuid = NULL;
2061         char    *svname = NULL;
2062         char    *mdtname = NULL;
2063         char    *lovname = NULL;
2064         char    index_str[16];
2065         int     i, rc;
2066
2067         ENTRY;
2068         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2069                 CERROR("log is empty! Logical error\n");
2070                 RETURN (-EINVAL);
2071         }
2072
2073         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2074                logname);
2075
2076         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2077         if (rc)
2078                 RETURN(rc);
2079
2080         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2081         if (rc)
2082                 GOTO(out_destory, rc);
2083
2084         rc = name_create(&svname, mdtname, "-osp");
2085         if (rc)
2086                 GOTO(out_destory, rc);
2087
2088         sprintf(index_str, "-MDT%04x", mdt_index);
2089         rc = name_create(&ospname, svname, index_str);
2090         if (rc)
2091                 GOTO(out_destory, rc);
2092
2093         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2094         if (rc)
2095                 GOTO(out_destory, rc);
2096
2097         rc = name_create(&lovuuid, lovname, "_UUID");
2098         if (rc)
2099                 GOTO(out_destory, rc);
2100
2101         rc = name_create(&mdtuuid, mdtname, "_UUID");
2102         if (rc)
2103                 GOTO(out_destory, rc);
2104
2105         rc = record_start_log(env, mgs, &llh, logname);
2106         if (rc)
2107                 GOTO(out_destory, rc);
2108
2109         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2110                            "add osp");
2111         if (rc)
2112                 GOTO(out_destory, rc);
2113
2114         for (i = 0; i < mti->mti_nid_count; i++) {
2115                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2116                        libcfs_nid2str(mti->mti_nids[i]));
2117                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2118                 if (rc)
2119                         GOTO(out_end, rc);
2120         }
2121
2122         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2123         if (rc)
2124                 GOTO(out_end, rc);
2125
2126         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2127                           NULL, NULL);
2128         if (rc)
2129                 GOTO(out_end, rc);
2130
2131         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2132         if (rc)
2133                 GOTO(out_end, rc);
2134
2135         /* Add mdc(osp) to lod */
2136         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2137                  mti->mti_stripe_index);
2138         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2139                          index_str, "1", NULL);
2140         if (rc)
2141                 GOTO(out_end, rc);
2142
2143         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2144         if (rc)
2145                 GOTO(out_end, rc);
2146
2147 out_end:
2148         record_end_log(env, &llh);
2149
2150 out_destory:
2151         name_destroy(&mdtuuid);
2152         name_destroy(&lovuuid);
2153         name_destroy(&lovname);
2154         name_destroy(&ospname);
2155         name_destroy(&svname);
2156         name_destroy(&nodeuuid);
2157         name_destroy(&mdtname);
2158         RETURN(rc);
2159 }
2160
2161 static int mgs_write_log_mdt0(const struct lu_env *env,
2162                               struct mgs_device *mgs,
2163                               struct fs_db *fsdb,
2164                               struct mgs_target_info *mti)
2165 {
2166         char *log = mti->mti_svname;
2167         struct llog_handle *llh = NULL;
2168         char *uuid, *lovname;
2169         char mdt_index[6];
2170         char *ptr = mti->mti_params;
2171         int rc = 0, failout = 0;
2172         ENTRY;
2173
2174         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2175         if (uuid == NULL)
2176                 RETURN(-ENOMEM);
2177
2178         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2179                 failout = (strncmp(ptr, "failout", 7) == 0);
2180
2181         rc = name_create(&lovname, log, "-mdtlov");
2182         if (rc)
2183                 GOTO(out_free, rc);
2184         if (mgs_log_is_empty(env, mgs, log)) {
2185                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2186                 if (rc)
2187                         GOTO(out_lod, rc);
2188         }
2189
2190         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2191
2192         rc = record_start_log(env, mgs, &llh, log);
2193         if (rc)
2194                 GOTO(out_lod, rc);
2195
2196         /* add MDT itself */
2197
2198         /* FIXME this whole fn should be a single journal transaction */
2199         sprintf(uuid, "%s_UUID", log);
2200         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2201         if (rc)
2202                 GOTO(out_lod, rc);
2203         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2204         if (rc)
2205                 GOTO(out_end, rc);
2206         rc = record_mount_opt(env, llh, log, lovname, NULL);
2207         if (rc)
2208                 GOTO(out_end, rc);
2209         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2210                         failout ? "n" : "f");
2211         if (rc)
2212                 GOTO(out_end, rc);
2213         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2214         if (rc)
2215                 GOTO(out_end, rc);
2216 out_end:
2217         record_end_log(env, &llh);
2218 out_lod:
2219         name_destroy(&lovname);
2220 out_free:
2221         OBD_FREE(uuid, sizeof(struct obd_uuid));
2222         RETURN(rc);
2223 }
2224
2225 /* envelope method for all layers log */
2226 static int mgs_write_log_mdt(const struct lu_env *env,
2227                              struct mgs_device *mgs,
2228                              struct fs_db *fsdb,
2229                              struct mgs_target_info *mti)
2230 {
2231         struct mgs_thread_info *mgi = mgs_env_info(env);
2232         struct llog_handle *llh = NULL;
2233         char *cliname;
2234         int rc, i = 0;
2235         ENTRY;
2236
2237         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2238
2239         if (mti->mti_uuid[0] == '\0') {
2240                 /* Make up our own uuid */
2241                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2242                          "%s_UUID", mti->mti_svname);
2243         }
2244
2245         /* add mdt */
2246         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2247         if (rc)
2248                 RETURN(rc);
2249         /* Append the mdt info to the client log */
2250         rc = name_create(&cliname, mti->mti_fsname, "-client");
2251         if (rc)
2252                 RETURN(rc);
2253
2254         if (mgs_log_is_empty(env, mgs, cliname)) {
2255                 /* Start client log */
2256                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2257                                        fsdb->fsdb_clilov);
2258                 if (rc)
2259                         GOTO(out_free, rc);
2260                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2261                                        fsdb->fsdb_clilmv);
2262                 if (rc)
2263                         GOTO(out_free, rc);
2264         }
2265
2266         /*
2267         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2268         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2269         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2270         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2271         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2272         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2273         */
2274
2275                 /* copy client info about lov/lmv */
2276                 mgi->mgi_comp.comp_mti = mti;
2277                 mgi->mgi_comp.comp_fsdb = fsdb;
2278
2279                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2280                                                         &mgi->mgi_comp);
2281                 if (rc)
2282                         GOTO(out_free, rc);
2283                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2284                                               fsdb->fsdb_clilmv);
2285                 if (rc)
2286                         GOTO(out_free, rc);
2287
2288                 /* add mountopts */
2289                 rc = record_start_log(env, mgs, &llh, cliname);
2290                 if (rc)
2291                         GOTO(out_free, rc);
2292
2293                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2294                                    "mount opts");
2295                 if (rc)
2296                         GOTO(out_end, rc);
2297                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2298                                       fsdb->fsdb_clilmv);
2299                 if (rc)
2300                         GOTO(out_end, rc);
2301                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2302                                    "mount opts");
2303
2304         if (rc)
2305                 GOTO(out_end, rc);
2306
2307         /* for_all_existing_mdt except current one */
2308         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2309                 if (i !=  mti->mti_stripe_index &&
2310                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2311                         char *logname;
2312
2313                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2314                         if (rc)
2315                                 GOTO(out_end, rc);
2316
2317                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2318                                                       i, logname);
2319                         name_destroy(&logname);
2320                         if (rc)
2321                                 GOTO(out_end, rc);
2322                 }
2323         }
2324 out_end:
2325         record_end_log(env, &llh);
2326 out_free:
2327         name_destroy(&cliname);
2328         RETURN(rc);
2329 }
2330
2331 /* Add the ost info to the client/mdt lov */
2332 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2333                                     struct mgs_device *mgs, struct fs_db *fsdb,
2334                                     struct mgs_target_info *mti,
2335                                     char *logname, char *suffix, char *lovname,
2336                                     enum lustre_sec_part sec_part, int flags)
2337 {
2338         struct llog_handle *llh = NULL;
2339         char *nodeuuid = NULL;
2340         char *oscname = NULL;
2341         char *oscuuid = NULL;
2342         char *lovuuid = NULL;
2343         char *svname = NULL;
2344         char index[6];
2345         int i, rc;
2346
2347         ENTRY;
2348         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2349                mti->mti_svname, logname);
2350
2351         if (mgs_log_is_empty(env, mgs, logname)) {
2352                 CERROR("log is empty! Logical error\n");
2353                 RETURN (-EINVAL);
2354         }
2355
2356         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2357         if (rc)
2358                 RETURN(rc);
2359         rc = name_create(&svname, mti->mti_svname, "-osc");
2360         if (rc)
2361                 GOTO(out_free, rc);
2362
2363         /* for the system upgraded from old 1.8, keep using the old osc naming
2364          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2365         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2366                 rc = name_create(&oscname, svname, "");
2367         else
2368                 rc = name_create(&oscname, svname, suffix);
2369         if (rc)
2370                 GOTO(out_free, rc);
2371
2372         rc = name_create(&oscuuid, oscname, "_UUID");
2373         if (rc)
2374                 GOTO(out_free, rc);
2375         rc = name_create(&lovuuid, lovname, "_UUID");
2376         if (rc)
2377                 GOTO(out_free, rc);
2378
2379
2380         /*
2381         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2382         multihomed (#4)
2383         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2384         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2385         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2386         failover (#6,7)
2387         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2388         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2389         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2390         */
2391
2392         rc = record_start_log(env, mgs, &llh, logname);
2393         if (rc)
2394                 GOTO(out_free, rc);
2395
2396         /* FIXME these should be a single journal transaction */
2397         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2398                            "add osc");
2399         if (rc)
2400                 GOTO(out_end, rc);
2401
2402         /* NB: don't change record order, because upon MDT steal OSC config
2403          * from client, it treats all nids before LCFG_SETUP as target nids
2404          * (multiple interfaces), while nids after as failover node nids.
2405          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2406          */
2407         for (i = 0; i < mti->mti_nid_count; i++) {
2408                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2409                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2410                 if (rc)
2411                         GOTO(out_end, rc);
2412         }
2413         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2414         if (rc)
2415                 GOTO(out_end, rc);
2416         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2417         if (rc)
2418                 GOTO(out_end, rc);
2419         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2420         if (rc)
2421                 GOTO(out_end, rc);
2422
2423         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2424
2425         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2426         if (rc)
2427                 GOTO(out_end, rc);
2428         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2429                            "add osc");
2430         if (rc)
2431                 GOTO(out_end, rc);
2432 out_end:
2433         record_end_log(env, &llh);
2434 out_free:
2435         name_destroy(&lovuuid);
2436         name_destroy(&oscuuid);
2437         name_destroy(&oscname);
2438         name_destroy(&svname);
2439         name_destroy(&nodeuuid);
2440         RETURN(rc);
2441 }
2442
2443 static int mgs_write_log_ost(const struct lu_env *env,
2444                              struct mgs_device *mgs, struct fs_db *fsdb,
2445                              struct mgs_target_info *mti)
2446 {
2447         struct llog_handle *llh = NULL;
2448         char *logname, *lovname;
2449         char *ptr = mti->mti_params;
2450         int rc, flags = 0, failout = 0, i;
2451         ENTRY;
2452
2453         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2454
2455         /* The ost startup log */
2456
2457         /* If the ost log already exists, that means that someone reformatted
2458            the ost and it called target_add again. */
2459         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2460                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2461                                    "exists, yet the server claims it never "
2462                                    "registered. It may have been reformatted, "
2463                                    "or the index changed. writeconf the MDT to "
2464                                    "regenerate all logs.\n", mti->mti_svname);
2465                 RETURN(-EALREADY);
2466         }
2467
2468         /*
2469         attach obdfilter ost1 ost1_UUID
2470         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2471         */
2472         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2473                 failout = (strncmp(ptr, "failout", 7) == 0);
2474         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2475         if (rc)
2476                 RETURN(rc);
2477         /* FIXME these should be a single journal transaction */
2478         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2479         if (rc)
2480                 GOTO(out_end, rc);
2481         if (*mti->mti_uuid == '\0')
2482                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2483                          "%s_UUID", mti->mti_svname);
2484         rc = record_attach(env, llh, mti->mti_svname,
2485                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2486         if (rc)
2487                 GOTO(out_end, rc);
2488         rc = record_setup(env, llh, mti->mti_svname,
2489                           "dev"/*ignored*/, "type"/*ignored*/,
2490                           failout ? "n" : "f", 0/*options*/);
2491         if (rc)
2492                 GOTO(out_end, rc);
2493         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2494         if (rc)
2495                 GOTO(out_end, rc);
2496 out_end:
2497         record_end_log(env, &llh);
2498         if (rc)
2499                 RETURN(rc);
2500         /* We also have to update the other logs where this osc is part of
2501            the lov */
2502
2503         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2504                 /* If we're upgrading, the old mdt log already has our
2505                    entry. Let's do a fake one for fun. */
2506                 /* Note that we can't add any new failnids, since we don't
2507                    know the old osc names. */
2508                 flags = CM_SKIP | CM_UPGRADE146;
2509
2510         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2511                 /* If the update flag isn't set, don't update client/mdt
2512                    logs. */
2513                 flags |= CM_SKIP;
2514                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2515                               "the MDT first to regenerate it.\n",
2516                               mti->mti_svname);
2517         }
2518
2519         /* Add ost to all MDT lov defs */
2520         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2521                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2522                         char mdt_index[9];
2523
2524                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2525                                                      i);
2526                         if (rc)
2527                                 RETURN(rc);
2528                         sprintf(mdt_index, "-MDT%04x", i);
2529                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2530                                                       logname, mdt_index,
2531                                                       lovname, LUSTRE_SP_MDT,
2532                                                       flags);
2533                         name_destroy(&logname);
2534                         name_destroy(&lovname);
2535                         if (rc)
2536                                 RETURN(rc);
2537                 }
2538         }
2539
2540         /* Append ost info to the client log */
2541         rc = name_create(&logname, mti->mti_fsname, "-client");
2542         if (rc)
2543                 RETURN(rc);
2544         if (mgs_log_is_empty(env, mgs, logname)) {
2545                 /* Start client log */
2546                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2547                                        fsdb->fsdb_clilov);
2548                 if (rc)
2549                         GOTO(out_free, rc);
2550                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2551                                        fsdb->fsdb_clilmv);
2552                 if (rc)
2553                         GOTO(out_free, rc);
2554         }
2555         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2556                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2557 out_free:
2558         name_destroy(&logname);
2559         RETURN(rc);
2560 }
2561
2562 static __inline__ int mgs_param_empty(char *ptr)
2563 {
2564         char *tmp;
2565
2566         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2567                 return 1;
2568         return 0;
2569 }
2570
2571 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2572                                           struct mgs_device *mgs,
2573                                           struct fs_db *fsdb,
2574                                           struct mgs_target_info *mti,
2575                                           char *logname, char *cliname)
2576 {
2577         int rc;
2578         struct llog_handle *llh = NULL;
2579
2580         if (mgs_param_empty(mti->mti_params)) {
2581                 /* Remove _all_ failnids */
2582                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2583                                 mti->mti_svname, "add failnid", CM_SKIP);
2584                 return rc < 0 ? rc : 0;
2585         }
2586
2587         /* Otherwise failover nids are additive */
2588         rc = record_start_log(env, mgs, &llh, logname);
2589         if (rc)
2590                 return rc;
2591                 /* FIXME this should be a single journal transaction */
2592         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2593                            "add failnid");
2594         if (rc)
2595                 goto out_end;
2596         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2597         if (rc)
2598                 goto out_end;
2599         rc = record_marker(env, llh, fsdb, CM_END,
2600                            mti->mti_svname, "add failnid");
2601 out_end:
2602         record_end_log(env, &llh);
2603         return rc;
2604 }
2605
2606
2607 /* Add additional failnids to an existing log.
2608    The mdc/osc must have been added to logs first */
2609 /* tcp nids must be in dotted-quad ascii -
2610    we can't resolve hostnames from the kernel. */
2611 static int mgs_write_log_add_failnid(const struct lu_env *env,
2612                                      struct mgs_device *mgs,
2613                                      struct fs_db *fsdb,
2614                                      struct mgs_target_info *mti)
2615 {
2616         char *logname, *cliname;
2617         int rc;
2618         ENTRY;
2619
2620         /* FIXME we currently can't erase the failnids
2621          * given when a target first registers, since they aren't part of
2622          * an "add uuid" stanza */
2623
2624         /* Verify that we know about this target */
2625         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2626                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2627                                    "yet. It must be started before failnids "
2628                                    "can be added.\n", mti->mti_svname);
2629                 RETURN(-ENOENT);
2630         }
2631
2632         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2633         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2634                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2635         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2636                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2637         } else {
2638                 RETURN(-EINVAL);
2639         }
2640         if (rc)
2641                 RETURN(rc);
2642         /* Add failover nids to the client log */
2643         rc = name_create(&logname, mti->mti_fsname, "-client");
2644         if (rc) {
2645                 name_destroy(&cliname);
2646                 RETURN(rc);
2647         }
2648         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2649         name_destroy(&logname);
2650         name_destroy(&cliname);
2651         if (rc)
2652                 RETURN(rc);
2653
2654         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2655                 /* Add OST failover nids to the MDT logs as well */
2656                 int i;
2657
2658                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2659                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2660                                 continue;
2661                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2662                         if (rc)
2663                                 RETURN(rc);
2664                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2665                                                  fsdb, i);
2666                         if (rc) {
2667                                 name_destroy(&logname);
2668                                 RETURN(rc);
2669                         }
2670                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2671                                                             mti, logname,
2672                                                             cliname);
2673                         name_destroy(&cliname);
2674                         name_destroy(&logname);
2675                         if (rc)
2676                                 RETURN(rc);
2677                 }
2678         }
2679
2680         RETURN(rc);
2681 }
2682
2683 static int mgs_wlp_lcfg(const struct lu_env *env,
2684                         struct mgs_device *mgs, struct fs_db *fsdb,
2685                         struct mgs_target_info *mti,
2686                         char *logname, struct lustre_cfg_bufs *bufs,
2687                         char *tgtname, char *ptr)
2688 {
2689         char comment[MTI_NAME_MAXLEN];
2690         char *tmp;
2691         struct llog_cfg_rec *lcr;
2692         int rc, del;
2693
2694         /* Erase any old settings of this same parameter */
2695         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2696         comment[MTI_NAME_MAXLEN - 1] = 0;
2697         /* But don't try to match the value. */
2698         tmp = strchr(comment, '=');
2699         if (tmp != NULL)
2700                 *tmp = 0;
2701         /* FIXME we should skip settings that are the same as old values */
2702         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2703         if (rc < 0)
2704                 return rc;
2705         del = mgs_param_empty(ptr);
2706
2707         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2708                       "Setting" : "Modifying", tgtname, comment, logname);
2709         if (del) {
2710                 /* mgs_modify() will return 1 if nothing had to be done */
2711                 if (rc == 1)
2712                         rc = 0;
2713                 return rc;
2714         }
2715
2716         lustre_cfg_bufs_reset(bufs, tgtname);
2717         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2718         if (mti->mti_flags & LDD_F_PARAM2)
2719                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2720
2721         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2722                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2723         if (lcr == NULL)
2724                 return -ENOMEM;
2725
2726         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2727                                   comment);
2728         lustre_cfg_rec_free(lcr);
2729         return rc;
2730 }
2731
2732 static int mgs_write_log_param2(const struct lu_env *env,
2733                                 struct mgs_device *mgs,
2734                                 struct fs_db *fsdb,
2735                                 struct mgs_target_info *mti, char *ptr)
2736 {
2737         struct lustre_cfg_bufs  bufs;
2738         int                     rc = 0;
2739         ENTRY;
2740
2741         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2742         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2743                           mti->mti_svname, ptr);
2744
2745         RETURN(rc);
2746 }
2747
2748 /* write global variable settings into log */
2749 static int mgs_write_log_sys(const struct lu_env *env,
2750                              struct mgs_device *mgs, struct fs_db *fsdb,
2751                              struct mgs_target_info *mti, char *sys, char *ptr)
2752 {
2753         struct mgs_thread_info  *mgi = mgs_env_info(env);
2754         struct lustre_cfg       *lcfg;
2755         struct llog_cfg_rec     *lcr;
2756         char *tmp, sep;
2757         int rc, cmd, convert = 1;
2758
2759         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2760                 cmd = LCFG_SET_TIMEOUT;
2761         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2762                 cmd = LCFG_SET_LDLM_TIMEOUT;
2763         /* Check for known params here so we can return error to lctl */
2764         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2765                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2766                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2767                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2768                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2769                 cmd = LCFG_PARAM;
2770         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2771                 convert = 0; /* Don't convert string value to integer */
2772                 cmd = LCFG_PARAM;
2773         } else {
2774                 return -EINVAL;
2775         }
2776
2777         if (mgs_param_empty(ptr))
2778                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2779         else
2780                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2781
2782         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2783         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2784         if (!convert && *tmp != '\0')
2785                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2786         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2787         if (lcr == NULL)
2788                 return -ENOMEM;
2789
2790         lcfg = &lcr->lcr_cfg;
2791         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2792         /* truncate the comment to the parameter name */
2793         ptr = tmp - 1;
2794         sep = *ptr;
2795         *ptr = '\0';
2796         /* modify all servers and clients */
2797         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2798                                       *tmp == '\0' ? NULL : lcr,
2799                                       mti->mti_fsname, sys, 0);
2800         if (rc == 0 && *tmp != '\0') {
2801                 switch (cmd) {
2802                 case LCFG_SET_TIMEOUT:
2803                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2804                                 class_process_config(lcfg);
2805                         break;
2806                 case LCFG_SET_LDLM_TIMEOUT:
2807                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2808                                 class_process_config(lcfg);
2809                         break;
2810                 default:
2811                         break;
2812                 }
2813         }
2814         *ptr = sep;
2815         lustre_cfg_rec_free(lcr);
2816         return rc;
2817 }
2818
2819 /* write quota settings into log */
2820 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2821                                struct fs_db *fsdb, struct mgs_target_info *mti,
2822                                char *quota, char *ptr)
2823 {
2824         struct mgs_thread_info  *mgi = mgs_env_info(env);
2825         struct llog_cfg_rec     *lcr;
2826         char                    *tmp;
2827         char                     sep;
2828         int                      rc, cmd = LCFG_PARAM;
2829
2830         /* support only 'meta' and 'data' pools so far */
2831         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2832             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2833                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2834                        "& quota.ost are)\n", ptr);
2835                 return -EINVAL;
2836         }
2837
2838         if (*tmp == '\0') {
2839                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2840         } else {
2841                 CDEBUG(D_MGS, "global '%s'\n", quota);
2842
2843                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2844                     strcmp(tmp, "none") != 0) {
2845                         CERROR("enable option(%s) isn't supported\n", tmp);
2846                         return -EINVAL;
2847                 }
2848         }
2849
2850         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2851         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2852         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2853         if (lcr == NULL)
2854                 return -ENOMEM;
2855
2856         /* truncate the comment to the parameter name */
2857         ptr = tmp - 1;
2858         sep = *ptr;
2859         *ptr = '\0';
2860
2861         /* XXX we duplicated quota enable information in all server
2862          *     config logs, it should be moved to a separate config
2863          *     log once we cleanup the config log for global param. */
2864         /* modify all servers */
2865         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2866                                       *tmp == '\0' ? NULL : lcr,
2867                                       mti->mti_fsname, quota, 1);
2868         *ptr = sep;
2869         lustre_cfg_rec_free(lcr);
2870         return rc < 0 ? rc : 0;
2871 }
2872
2873 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2874                                    struct mgs_device *mgs,
2875                                    struct fs_db *fsdb,
2876                                    struct mgs_target_info *mti,
2877                                    char *param)
2878 {
2879         struct mgs_thread_info  *mgi = mgs_env_info(env);
2880         struct llog_cfg_rec     *lcr;
2881         struct llog_handle      *llh = NULL;
2882         char                    *logname;
2883         char                    *comment, *ptr;
2884         int                      rc, len;
2885
2886         ENTRY;
2887
2888         /* get comment */
2889         ptr = strchr(param, '=');
2890         LASSERT(ptr != NULL);
2891         len = ptr - param;
2892
2893         OBD_ALLOC(comment, len + 1);
2894         if (comment == NULL)
2895                 RETURN(-ENOMEM);
2896         strncpy(comment, param, len);
2897         comment[len] = '\0';
2898
2899         /* prepare lcfg */
2900         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2901         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2902         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2903         if (lcr == NULL)
2904                 GOTO(out_comment, rc = -ENOMEM);
2905
2906         /* construct log name */
2907         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2908         if (rc < 0)
2909                 GOTO(out_lcfg, rc);
2910
2911         if (mgs_log_is_empty(env, mgs, logname)) {
2912                 rc = record_start_log(env, mgs, &llh, logname);
2913                 if (rc < 0)
2914                         GOTO(out, rc);
2915                 record_end_log(env, &llh);
2916         }
2917
2918         /* obsolete old one */
2919         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2920                         comment, CM_SKIP);
2921         if (rc < 0)
2922                 GOTO(out, rc);
2923         /* write the new one */
2924         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2925                                   mti->mti_svname, comment);
2926         if (rc)
2927                 CERROR("%s: error writing log %s: rc = %d\n",
2928                        mgs->mgs_obd->obd_name, logname, rc);
2929 out:
2930         name_destroy(&logname);
2931 out_lcfg:
2932         lustre_cfg_rec_free(lcr);
2933 out_comment:
2934         OBD_FREE(comment, len + 1);
2935         RETURN(rc);
2936 }
2937
2938 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2939                                         char *param)
2940 {
2941         char    *ptr;
2942
2943         /* disable the adjustable udesc parameter for now, i.e. use default
2944          * setting that client always ship udesc to MDT if possible. to enable
2945          * it simply remove the following line */
2946         goto error_out;
2947
2948         ptr = strchr(param, '=');
2949         if (ptr == NULL)
2950                 goto error_out;
2951         *ptr++ = '\0';
2952
2953         if (strcmp(param, PARAM_SRPC_UDESC))
2954                 goto error_out;
2955
2956         if (strcmp(ptr, "yes") == 0) {
2957                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2958                 CWARN("Enable user descriptor shipping from client to MDT\n");
2959         } else if (strcmp(ptr, "no") == 0) {
2960                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2961                 CWARN("Disable user descriptor shipping from client to MDT\n");
2962         } else {
2963                 *(ptr - 1) = '=';
2964                 goto error_out;
2965         }
2966         return 0;
2967
2968 error_out:
2969         CERROR("Invalid param: %s\n", param);
2970         return -EINVAL;
2971 }
2972
2973 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2974                                   const char *svname,
2975                                   char *param)
2976 {
2977         struct sptlrpc_rule      rule;
2978         struct sptlrpc_rule_set *rset;
2979         int                      rc;
2980         ENTRY;
2981
2982         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2983                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2984                 RETURN(-EINVAL);
2985         }
2986
2987         if (strncmp(param, PARAM_SRPC_UDESC,
2988                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2989                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2990         }
2991
2992         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2993                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2994                 RETURN(-EINVAL);
2995         }
2996
2997         param += sizeof(PARAM_SRPC_FLVR) - 1;
2998
2999         rc = sptlrpc_parse_rule(param, &rule);
3000         if (rc)
3001                 RETURN(rc);
3002
3003         /* mgs rules implies must be mgc->mgs */
3004         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3005                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3006                      rule.sr_from != LUSTRE_SP_ANY) ||
3007                     (rule.sr_to != LUSTRE_SP_MGS &&
3008                      rule.sr_to != LUSTRE_SP_ANY))
3009                         RETURN(-EINVAL);
3010         }
3011
3012         /* preapre room for this coming rule. svcname format should be:
3013          * - fsname: general rule
3014          * - fsname-tgtname: target-specific rule
3015          */
3016         if (strchr(svname, '-')) {
3017                 struct mgs_tgt_srpc_conf *tgtconf;
3018                 int                       found = 0;
3019
3020                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3021                      tgtconf = tgtconf->mtsc_next) {
3022                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3023                                 found = 1;
3024                                 break;
3025                         }
3026                 }
3027
3028                 if (!found) {
3029                         int name_len;
3030
3031                         OBD_ALLOC_PTR(tgtconf);
3032                         if (tgtconf == NULL)
3033                                 RETURN(-ENOMEM);
3034
3035                         name_len = strlen(svname);
3036
3037                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3038                         if (tgtconf->mtsc_tgt == NULL) {
3039                                 OBD_FREE_PTR(tgtconf);
3040                                 RETURN(-ENOMEM);
3041                         }
3042                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3043
3044                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3045                         fsdb->fsdb_srpc_tgt = tgtconf;
3046                 }
3047
3048                 rset = &tgtconf->mtsc_rset;
3049         } else {
3050                 rset = &fsdb->fsdb_srpc_gen;
3051         }
3052
3053         rc = sptlrpc_rule_set_merge(rset, &rule);
3054
3055         RETURN(rc);
3056 }
3057
3058 static int mgs_srpc_set_param(const struct lu_env *env,
3059                               struct mgs_device *mgs,
3060                               struct fs_db *fsdb,
3061                               struct mgs_target_info *mti,
3062                               char *param)
3063 {
3064         char                   *copy;
3065         int                     rc, copy_size;
3066         ENTRY;
3067
3068 #ifndef HAVE_GSS
3069         RETURN(-EINVAL);
3070 #endif
3071         /* keep a copy of original param, which could be destroied
3072          * during parsing */
3073         copy_size = strlen(param) + 1;
3074         OBD_ALLOC(copy, copy_size);
3075         if (copy == NULL)
3076                 return -ENOMEM;
3077         memcpy(copy, param, copy_size);
3078
3079         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3080         if (rc)
3081                 goto out_free;
3082
3083         /* previous steps guaranteed the syntax is correct */
3084         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3085         if (rc)
3086                 goto out_free;
3087
3088         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3089                 /*
3090                  * for mgs rules, make them effective immediately.
3091                  */
3092                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3093                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3094                                                  &fsdb->fsdb_srpc_gen);
3095         }
3096
3097 out_free:
3098         OBD_FREE(copy, copy_size);
3099         RETURN(rc);
3100 }
3101
3102 struct mgs_srpc_read_data {
3103         struct fs_db   *msrd_fsdb;
3104         int             msrd_skip;
3105 };
3106
3107 static int mgs_srpc_read_handler(const struct lu_env *env,
3108                                  struct llog_handle *llh,
3109                                  struct llog_rec_hdr *rec, void *data)
3110 {
3111         struct mgs_srpc_read_data *msrd = data;
3112         struct cfg_marker         *marker;
3113         struct lustre_cfg         *lcfg = REC_DATA(rec);
3114         char                      *svname, *param;
3115         int                        cfg_len, rc;
3116         ENTRY;
3117
3118         if (rec->lrh_type != OBD_CFG_REC) {
3119                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3120                 RETURN(-EINVAL);
3121         }
3122
3123         cfg_len = REC_DATA_LEN(rec);
3124
3125         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3126         if (rc) {
3127                 CERROR("Insane cfg\n");
3128                 RETURN(rc);
3129         }
3130
3131         if (lcfg->lcfg_command == LCFG_MARKER) {
3132                 marker = lustre_cfg_buf(lcfg, 1);
3133
3134                 if (marker->cm_flags & CM_START &&
3135                     marker->cm_flags & CM_SKIP)
3136                         msrd->msrd_skip = 1;
3137                 if (marker->cm_flags & CM_END)
3138                         msrd->msrd_skip = 0;
3139
3140                 RETURN(0);
3141         }
3142
3143         if (msrd->msrd_skip)
3144                 RETURN(0);
3145
3146         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3147                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3148                 RETURN(0);
3149         }
3150
3151         svname = lustre_cfg_string(lcfg, 0);
3152         if (svname == NULL) {
3153                 CERROR("svname is empty\n");
3154                 RETURN(0);
3155         }
3156
3157         param = lustre_cfg_string(lcfg, 1);
3158         if (param == NULL) {
3159                 CERROR("param is empty\n");
3160                 RETURN(0);
3161         }
3162
3163         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3164         if (rc)
3165                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3166
3167         RETURN(0);
3168 }
3169
3170 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3171                                 struct mgs_device *mgs,
3172                                 struct fs_db *fsdb)
3173 {
3174         struct llog_handle        *llh = NULL;
3175         struct llog_ctxt          *ctxt;
3176         char                      *logname;
3177         struct mgs_srpc_read_data  msrd;
3178         int                        rc;
3179         ENTRY;
3180
3181         /* construct log name */
3182         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3183         if (rc)
3184                 RETURN(rc);
3185
3186         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3187         LASSERT(ctxt != NULL);
3188
3189         if (mgs_log_is_empty(env, mgs, logname))
3190                 GOTO(out, rc = 0);
3191
3192         rc = llog_open(env, ctxt, &llh, NULL, logname,
3193                        LLOG_OPEN_EXISTS);
3194         if (rc < 0) {
3195                 if (rc == -ENOENT)
3196                         rc = 0;
3197                 GOTO(out, rc);
3198         }
3199
3200         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3201         if (rc)
3202                 GOTO(out_close, rc);
3203
3204         if (llog_get_size(llh) <= 1)
3205                 GOTO(out_close, rc = 0);
3206
3207         msrd.msrd_fsdb = fsdb;
3208         msrd.msrd_skip = 0;
3209
3210         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3211                           NULL);
3212
3213 out_close:
3214         llog_close(env, llh);
3215 out:
3216         llog_ctxt_put(ctxt);
3217         name_destroy(&logname);
3218
3219         if (rc)
3220                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3221         RETURN(rc);
3222 }
3223
3224 /* Permanent settings of all parameters by writing into the appropriate
3225  * configuration logs.
3226  * A parameter with null value ("<param>='\0'") means to erase it out of
3227  * the logs.
3228  */
3229 static int mgs_write_log_param(const struct lu_env *env,
3230                                struct mgs_device *mgs, struct fs_db *fsdb,
3231                                struct mgs_target_info *mti, char *ptr)
3232 {
3233         struct mgs_thread_info *mgi = mgs_env_info(env);
3234         char *logname;
3235         char *tmp;
3236         int rc = 0, rc2 = 0;
3237         ENTRY;
3238
3239         /* For various parameter settings, we have to figure out which logs
3240            care about them (e.g. both mdt and client for lov settings) */
3241         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3242
3243         /* The params are stored in MOUNT_DATA_FILE and modified via
3244            tunefs.lustre, or set using lctl conf_param */
3245
3246         /* Processed in lustre_start_mgc */
3247         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3248                 GOTO(end, rc);
3249
3250         /* Processed in ost/mdt */
3251         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3252                 GOTO(end, rc);
3253
3254         /* Processed in mgs_write_log_ost */
3255         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3256                 if (mti->mti_flags & LDD_F_PARAM) {
3257                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3258                                            "changed with tunefs.lustre"
3259                                            "and --writeconf\n", ptr);
3260                         rc = -EPERM;
3261                 }
3262                 GOTO(end, rc);
3263         }
3264
3265         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3266                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3267                 GOTO(end, rc);
3268         }
3269
3270         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3271                 /* Add a failover nidlist */
3272                 rc = 0;
3273                 /* We already processed failovers params for new
3274                    targets in mgs_write_log_target */
3275                 if (mti->mti_flags & LDD_F_PARAM) {
3276                         CDEBUG(D_MGS, "Adding failnode\n");
3277                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3278                 }
3279                 GOTO(end, rc);
3280         }
3281
3282         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3283                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3284                 GOTO(end, rc);
3285         }
3286
3287         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3288                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3289                 GOTO(end, rc);
3290         }
3291
3292         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3293                 /* active=0 means off, anything else means on */
3294                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3295                 int i;
3296
3297                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3298                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3299                                            "be (de)activated.\n",
3300                                            mti->mti_svname);
3301                         GOTO(end, rc = -EINVAL);
3302                 }
3303                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3304                               flag ? "de": "re", mti->mti_svname);
3305                 /* Modify clilov */
3306                 rc = name_create(&logname, mti->mti_fsname, "-client");
3307                 if (rc)
3308                         GOTO(end, rc);
3309                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3310                                 mti->mti_svname, "add osc", flag);
3311                 name_destroy(&logname);
3312                 if (rc)
3313                         goto active_err;
3314                 /* Modify mdtlov */
3315                 /* Add to all MDT logs for CMD */
3316                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3317                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3318                                 continue;
3319                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3320                         if (rc)
3321                                 GOTO(end, rc);
3322                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3323                                         mti->mti_svname, "add osc", flag);
3324                         name_destroy(&logname);
3325                         if (rc)
3326                                 goto active_err;
3327                 }
3328         active_err:
3329                 if (rc) {
3330                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3331                                            "log (%d). No permanent "
3332                                            "changes were made to the "
3333                                            "config log.\n",
3334                                            mti->mti_svname, rc);
3335                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3336                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3337                                                    " because the log"
3338                                                    "is in the old 1.4"
3339                                                    "style. Consider "
3340                                                    " --writeconf to "
3341                                                    "update the logs.\n");
3342                         GOTO(end, rc);
3343                 }
3344                 /* Fall through to osc proc for deactivating live OSC
3345                    on running MDT / clients. */
3346         }
3347         /* Below here, let obd's XXX_process_config methods handle it */
3348
3349         /* All lov. in proc */
3350         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3351                 char *mdtlovname;
3352
3353                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3354                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3355                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3356                                            "set on the MDT, not %s. "
3357                                            "Ignoring.\n",
3358                                            mti->mti_svname);
3359                         GOTO(end, rc = 0);
3360                 }
3361
3362                 /* Modify mdtlov */
3363                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3364                         GOTO(end, rc = -ENODEV);
3365
3366                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3367                                              mti->mti_stripe_index);
3368                 if (rc)
3369                         GOTO(end, rc);
3370                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3371                                   &mgi->mgi_bufs, mdtlovname, ptr);
3372                 name_destroy(&logname);
3373                 name_destroy(&mdtlovname);
3374                 if (rc)
3375                         GOTO(end, rc);
3376
3377                 /* Modify clilov */
3378                 rc = name_create(&logname, mti->mti_fsname, "-client");
3379                 if (rc)
3380                         GOTO(end, rc);
3381                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3382                                   fsdb->fsdb_clilov, ptr);
3383                 name_destroy(&logname);
3384                 GOTO(end, rc);
3385         }
3386
3387         /* All osc., mdc., llite. params in proc */
3388         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3389             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3390             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3391                 char *cname;
3392
3393                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3394                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3395                                            " cannot be modified. Consider"
3396                                            " updating the configuration with"
3397                                            " --writeconf\n",
3398                                            mti->mti_svname);
3399                         GOTO(end, rc = -EINVAL);
3400                 }
3401                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3402                         rc = name_create(&cname, mti->mti_fsname, "-client");
3403                         /* Add the client type to match the obdname in
3404                            class_config_llog_handler */
3405                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3406                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3407                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3408                         rc = name_create(&cname, mti->mti_svname, "-osc");
3409                 } else {
3410                         GOTO(end, rc = -EINVAL);
3411                 }
3412                 if (rc)
3413                         GOTO(end, rc);
3414
3415                 /* Forbid direct update of llite root squash parameters.
3416                  * These parameters are indirectly set via the MDT settings.
3417                  * See (LU-1778) */
3418                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
3419                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3420                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3421                         LCONSOLE_ERROR("%s: root squash parameters can only "
3422                                 "be updated through MDT component\n",
3423                                 mti->mti_fsname);
3424                         name_destroy(&cname);
3425                         GOTO(end, rc = -EINVAL);
3426                 }
3427
3428                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3429
3430                 /* Modify client */
3431                 rc = name_create(&logname, mti->mti_fsname, "-client");
3432                 if (rc) {
3433                         name_destroy(&cname);
3434                         GOTO(end, rc);
3435                 }
3436                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3437                                   cname, ptr);
3438
3439                 /* osc params affect the MDT as well */
3440                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3441                         int i;
3442
3443                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3444                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3445                                         continue;
3446                                 name_destroy(&cname);
3447                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3448                                                          fsdb, i);
3449                                 name_destroy(&logname);
3450                                 if (rc)
3451                                         break;
3452                                 rc = name_create_mdt(&logname,
3453                                                      mti->mti_fsname, i);
3454                                 if (rc)
3455                                         break;
3456                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3457                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3458                                                           mti, logname,
3459                                                           &mgi->mgi_bufs,
3460                                                           cname, ptr);
3461                                         if (rc)
3462                                                 break;
3463                                 }
3464                         }
3465                 }
3466                 name_destroy(&logname);
3467                 name_destroy(&cname);
3468                 GOTO(end, rc);
3469         }
3470
3471         /* All mdt. params in proc */
3472         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
3473                 int i;
3474                 __u32 idx;
3475
3476                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3477                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3478                             MTI_NAME_MAXLEN) == 0)
3479                         /* device is unspecified completely? */
3480                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3481                 else
3482                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3483                 if (rc < 0)
3484                         goto active_err;
3485                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3486                         goto active_err;
3487                 if (rc & LDD_F_SV_ALL) {
3488                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3489                                 if (!test_bit(i,
3490                                                   fsdb->fsdb_mdt_index_map))
3491                                         continue;
3492                                 rc = name_create_mdt(&logname,
3493                                                 mti->mti_fsname, i);
3494                                 if (rc)
3495                                         goto active_err;
3496                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3497                                                   logname, &mgi->mgi_bufs,
3498                                                   logname, ptr);
3499                                 name_destroy(&logname);
3500                                 if (rc)
3501                                         goto active_err;
3502                         }
3503                 } else {
3504                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
3505                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
3506                                 LCONSOLE_ERROR("%s: root squash parameters "
3507                                         "cannot be applied to a single MDT\n",
3508                                         mti->mti_fsname);
3509                                 GOTO(end, rc = -EINVAL);
3510                         }
3511                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3512                                           mti->mti_svname, &mgi->mgi_bufs,
3513                                           mti->mti_svname, ptr);
3514                         if (rc)
3515                                 goto active_err;
3516                 }
3517
3518                 /* root squash settings are also applied to llite
3519                  * config log (see LU-1778) */
3520                 if (rc == 0 &&
3521                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3522                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3523                         char *cname;
3524                         char *ptr2;
3525
3526                         rc = name_create(&cname, mti->mti_fsname, "-client");
3527                         if (rc)
3528                                 GOTO(end, rc);
3529                         rc = name_create(&logname, mti->mti_fsname, "-client");
3530                         if (rc) {
3531                                 name_destroy(&cname);
3532                                 GOTO(end, rc);
3533                         }
3534                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
3535                         if (rc) {
3536                                 name_destroy(&cname);
3537                                 name_destroy(&logname);
3538                                 GOTO(end, rc);
3539                         }
3540                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3541                                           &mgi->mgi_bufs, cname, ptr2);
3542                         name_destroy(&ptr2);
3543                         name_destroy(&logname);
3544                         name_destroy(&cname);
3545                 }
3546                 GOTO(end, rc);
3547         }
3548
3549         /* All mdd., ost. and osd. params in proc */
3550         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3551             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3552             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3553                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3554                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3555                         GOTO(end, rc = -ENODEV);
3556
3557                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3558                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3559                 GOTO(end, rc);
3560         }
3561
3562         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3563         rc2 = -ENOSYS;
3564
3565 end:
3566         if (rc)
3567                 CERROR("err %d on param '%s'\n", rc, ptr);
3568
3569         RETURN(rc ?: rc2);
3570 }
3571
3572 /* Not implementing automatic failover nid addition at this time. */
3573 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3574                       struct mgs_target_info *mti)
3575 {
3576 #if 0
3577         struct fs_db *fsdb;
3578         int rc;
3579         ENTRY;
3580
3581         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3582         if (rc)
3583                 RETURN(rc);
3584
3585         if (mgs_log_is_empty(obd, mti->mti_svname))
3586                 /* should never happen */
3587                 RETURN(-ENOENT);
3588
3589         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3590
3591         /* FIXME We can just check mti->params to see if we're already in
3592            the failover list.  Modify mti->params for rewriting back at
3593            server_register_target(). */
3594
3595         mutex_lock(&fsdb->fsdb_mutex);
3596         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3597         mutex_unlock(&fsdb->fsdb_mutex);
3598         char    *buf, *params;
3599         int      rc = -EINVAL;
3600
3601         RETURN(rc);
3602 #endif
3603         return 0;
3604 }
3605
3606 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
3607                          struct mgs_target_info *mti, struct fs_db *fsdb)
3608 {
3609         char    *buf, *params;
3610         int      rc = -EINVAL;
3611
3612         ENTRY;
3613
3614         /* set/check the new target index */
3615         rc = mgs_set_index(env, mgs, mti);
3616         if (rc < 0)
3617                 RETURN(rc);
3618
3619         if (rc == EALREADY) {
3620                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3621                               mti->mti_stripe_index, mti->mti_svname);
3622                 /* We would like to mark old log sections as invalid
3623                    and add new log sections in the client and mdt logs.
3624                    But if we add new sections, then live clients will
3625                    get repeat setup instructions for already running
3626                    osc's. So don't update the client/mdt logs. */
3627                 mti->mti_flags &= ~LDD_F_UPDATE;
3628                 rc = 0;
3629         }
3630
3631         mutex_lock(&fsdb->fsdb_mutex);
3632
3633         if (mti->mti_flags &
3634             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3635                 /* Generate a log from scratch */
3636                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3637                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3638                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3639                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3640                 } else {
3641                         CERROR("Unknown target type %#x, can't create log for "
3642                                "%s\n", mti->mti_flags, mti->mti_svname);
3643                 }
3644                 if (rc) {
3645                         CERROR("Can't write logs for %s (%d)\n",
3646                                mti->mti_svname, rc);
3647                         GOTO(out_up, rc);
3648                 }
3649         } else {
3650                 /* Just update the params from tunefs in mgs_write_log_params */
3651                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3652                 mti->mti_flags |= LDD_F_PARAM;
3653         }
3654
3655         /* allocate temporary buffer, where class_get_next_param will
3656            make copy of a current  parameter */
3657         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3658         if (buf == NULL)
3659                 GOTO(out_up, rc = -ENOMEM);
3660         params = mti->mti_params;
3661         while (params != NULL) {
3662                 rc = class_get_next_param(&params, buf);
3663                 if (rc) {
3664                         if (rc == 1)
3665                                 /* there is no next parameter, that is
3666                                    not an error */
3667                                 rc = 0;
3668                         break;
3669                 }
3670                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3671                        params, buf);
3672                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3673                 if (rc)
3674                         break;
3675         }
3676
3677         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3678
3679 out_up:
3680         mutex_unlock(&fsdb->fsdb_mutex);
3681         RETURN(rc);
3682 }
3683
3684 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3685 {
3686         struct llog_ctxt        *ctxt;
3687         int                      rc = 0;
3688
3689         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3690         if (ctxt == NULL) {
3691                 CERROR("%s: MGS config context doesn't exist\n",
3692                        mgs->mgs_obd->obd_name);
3693                 rc = -ENODEV;
3694         } else {
3695                 rc = llog_erase(env, ctxt, NULL, name);
3696                 /* llog may not exist */
3697                 if (rc == -ENOENT)
3698                         rc = 0;
3699                 llog_ctxt_put(ctxt);
3700         }
3701
3702         if (rc)
3703                 CERROR("%s: failed to clear log %s: %d\n",
3704                        mgs->mgs_obd->obd_name, name, rc);
3705
3706         return rc;
3707 }
3708
3709 /* erase all logs for the given fs */
3710 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3711 {
3712         struct fs_db *fsdb;
3713         struct list_head log_list;
3714         struct mgs_direntry *dirent, *n;
3715         int rc, len = strlen(fsname);
3716         char *suffix;
3717         ENTRY;
3718
3719         /* Find all the logs in the CONFIGS directory */
3720         rc = class_dentry_readdir(env, mgs, &log_list);
3721         if (rc)
3722                 RETURN(rc);
3723
3724         mutex_lock(&mgs->mgs_mutex);
3725
3726         /* Delete the fs db */
3727         fsdb = mgs_find_fsdb(mgs, fsname);
3728         if (fsdb)
3729                 mgs_free_fsdb(mgs, fsdb);
3730
3731         mutex_unlock(&mgs->mgs_mutex);
3732
3733         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3734                 list_del_init(&dirent->mde_list);
3735                 suffix = strrchr(dirent->mde_name, '-');
3736                 if (suffix != NULL) {
3737                         if ((len == suffix - dirent->mde_name) &&
3738                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
3739                                 CDEBUG(D_MGS, "Removing log %s\n",
3740                                        dirent->mde_name);
3741                                 mgs_erase_log(env, mgs, dirent->mde_name);
3742                         }
3743                 }
3744                 mgs_direntry_free(dirent);
3745         }
3746
3747         RETURN(rc);
3748 }
3749
3750 /* list all logs for the given fs */
3751 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
3752                   struct obd_ioctl_data *data)
3753 {
3754         struct list_head         log_list;
3755         struct mgs_direntry     *dirent, *n;
3756         char                    *out, *suffix;
3757         int                      l, remains, rc;
3758
3759         ENTRY;
3760
3761         /* Find all the logs in the CONFIGS directory */
3762         rc = class_dentry_readdir(env, mgs, &log_list);
3763         if (rc)
3764                 RETURN(rc);
3765
3766         out = data->ioc_bulk;
3767         remains = data->ioc_inllen1;
3768         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3769                 list_del_init(&dirent->mde_list);
3770                 suffix = strrchr(dirent->mde_name, '-');
3771                 if (suffix != NULL) {
3772                         l = snprintf(out, remains, "config log: $%s\n",
3773                                      dirent->mde_name);
3774                         out += l;
3775                         remains -= l;
3776                 }
3777                 mgs_direntry_free(dirent);
3778                 if (remains <= 0)
3779                         break;
3780         }
3781         RETURN(rc);
3782 }
3783
3784 /* from llog_swab */
3785 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3786 {
3787         int i;
3788         ENTRY;
3789
3790         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3791         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3792
3793         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3794         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3795         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3796         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3797
3798         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3799         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3800                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3801                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3802                                i, lcfg->lcfg_buflens[i],
3803                                lustre_cfg_string(lcfg, i));
3804                 }
3805         EXIT;
3806 }
3807
3808 /* Setup params fsdb and log
3809  */
3810 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3811                           struct fs_db *fsdb)
3812 {
3813         struct llog_handle      *params_llh = NULL;
3814         int                     rc;
3815         ENTRY;
3816
3817         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
3818         if (fsdb != NULL) {
3819                 mutex_lock(&fsdb->fsdb_mutex);
3820                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
3821                 if (rc == 0)
3822                         rc = record_end_log(env, &params_llh);
3823                 mutex_unlock(&fsdb->fsdb_mutex);
3824         }
3825
3826         RETURN(rc);
3827 }
3828
3829 /* Cleanup params fsdb and log
3830  */
3831 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
3832 {
3833         return mgs_erase_logs(env, mgs, PARAMS_FILENAME);
3834 }
3835
3836 /* Set a permanent (config log) param for a target or fs
3837  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3838  *             buf1 contains the single parameter
3839  */
3840 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3841                  struct lustre_cfg *lcfg, char *fsname)
3842 {
3843         struct fs_db *fsdb;
3844         struct mgs_target_info *mti;
3845         char *devname, *param;
3846         char *ptr;
3847         const char *tmp;
3848         __u32 index;
3849         int rc = 0;
3850         ENTRY;
3851
3852         print_lustre_cfg(lcfg);
3853
3854         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3855         devname = lustre_cfg_string(lcfg, 0);
3856         param = lustre_cfg_string(lcfg, 1);
3857         if (!devname) {
3858                 /* Assume device name embedded in param:
3859                    lustre-OST0000.osc.max_dirty_mb=32 */
3860                 ptr = strchr(param, '.');
3861                 if (ptr) {
3862                         devname = param;
3863                         *ptr = 0;
3864                         param = ptr + 1;
3865                 }
3866         }
3867         if (!devname) {
3868                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3869                 RETURN(-ENOSYS);
3870         }
3871
3872         rc = mgs_parse_devname(devname, fsname, NULL);
3873         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3874                 /* param related to llite isn't allowed to set by OST or MDT */
3875                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3876                                        sizeof(PARAM_LLITE) - 1) == 0)
3877                         RETURN(-EINVAL);
3878         } else {
3879                 /* assume devname is the fsname */
3880                 strlcpy(fsname, devname, MTI_NAME_MAXLEN);
3881         }
3882         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3883
3884         rc = mgs_find_or_make_fsdb(env, mgs,
3885                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
3886                                    PARAMS_FILENAME : fsname, &fsdb);
3887         if (rc)
3888                 RETURN(rc);
3889
3890         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
3891             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3892             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3893                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3894                        "is '%s'\n", fsname, devname);
3895                 mgs_free_fsdb(mgs, fsdb);
3896                 RETURN(-EINVAL);
3897         }
3898
3899         /* Create a fake mti to hold everything */
3900         OBD_ALLOC_PTR(mti);
3901         if (!mti)
3902                 GOTO(out, rc = -ENOMEM);
3903         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3904             >= sizeof(mti->mti_fsname))
3905                 GOTO(out, rc = -E2BIG);
3906         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3907             >= sizeof(mti->mti_svname))
3908                 GOTO(out, rc = -E2BIG);
3909         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3910             >= sizeof(mti->mti_params))
3911                 GOTO(out, rc = -E2BIG);
3912         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3913         if (rc < 0)
3914                 /* Not a valid server; may be only fsname */
3915                 rc = 0;
3916         else
3917                 /* Strip -osc or -mdc suffix from svname */
3918                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3919                                      mti->mti_svname))
3920                         GOTO(out, rc = -EINVAL);
3921         /*
3922          * Revoke lock so everyone updates.  Should be alright if
3923          * someone was already reading while we were updating the logs,
3924          * so we don't really need to hold the lock while we're
3925          * writing (above).
3926          */
3927         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
3928                 mti->mti_flags = rc | LDD_F_PARAM2;
3929                 mutex_lock(&fsdb->fsdb_mutex);
3930                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
3931                 mutex_unlock(&fsdb->fsdb_mutex);
3932                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
3933         } else {
3934                 mti->mti_flags = rc | LDD_F_PARAM;
3935                 mutex_lock(&fsdb->fsdb_mutex);
3936                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3937                 mutex_unlock(&fsdb->fsdb_mutex);
3938                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3939         }
3940
3941 out:
3942         OBD_FREE_PTR(mti);
3943         RETURN(rc);
3944 }
3945
3946 static int mgs_write_log_pool(const struct lu_env *env,
3947                               struct mgs_device *mgs, char *logname,
3948                               struct fs_db *fsdb, char *tgtname,
3949                               enum lcfg_command_type cmd,
3950                               char *fsname, char *poolname,
3951                               char *ostname, char *comment)
3952 {
3953         struct llog_handle *llh = NULL;
3954         int rc;
3955
3956         rc = record_start_log(env, mgs, &llh, logname);
3957         if (rc)
3958                 return rc;
3959         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
3960         if (rc)
3961                 goto out;
3962         rc = record_base(env, llh, tgtname, 0, cmd,
3963                          fsname, poolname, ostname, 0);
3964         if (rc)
3965                 goto out;
3966         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
3967 out:
3968         record_end_log(env, &llh);
3969         return rc;
3970 }
3971
3972 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
3973                     enum lcfg_command_type cmd, const char *nodemap_name,
3974                     const char *param)
3975 {
3976         lnet_nid_t      nid[2];
3977         __u32           idmap[2];
3978         bool            bool_switch;
3979         __u32           int_id;
3980         int             rc = 0;
3981         ENTRY;
3982
3983         switch (cmd) {
3984         case LCFG_NODEMAP_ADD:
3985                 rc = nodemap_add(nodemap_name);
3986                 break;
3987         case LCFG_NODEMAP_DEL:
3988                 rc = nodemap_del(nodemap_name);
3989                 break;
3990         case LCFG_NODEMAP_ADD_RANGE:
3991                 rc = nodemap_parse_range(param, nid);
3992                 if (rc != 0)
3993                         break;
3994                 rc = nodemap_add_range(nodemap_name, nid);
3995                 break;
3996         case LCFG_NODEMAP_DEL_RANGE:
3997                 rc = nodemap_parse_range(param, nid);
3998                 if (rc != 0)
3999                         break;
4000                 rc = nodemap_del_range(nodemap_name, nid);
4001                 break;
4002         case LCFG_NODEMAP_ADMIN:
4003                 bool_switch = simple_strtoul(param, NULL, 10);
4004                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
4005                 break;
4006         case LCFG_NODEMAP_TRUSTED:
4007                 bool_switch = simple_strtoul(param, NULL, 10);
4008                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
4009                 break;
4010         case LCFG_NODEMAP_SQUASH_UID:
4011                 int_id = simple_strtoul(param, NULL, 10);
4012                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
4013                 break;
4014         case LCFG_NODEMAP_SQUASH_GID:
4015                 int_id = simple_strtoul(param, NULL, 10);
4016                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
4017                 break;
4018         case LCFG_NODEMAP_ADD_UIDMAP:
4019         case LCFG_NODEMAP_ADD_GIDMAP:
4020                 rc = nodemap_parse_idmap(param, idmap);
4021                 if (rc != 0)
4022                         break;
4023                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
4024                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
4025                                                idmap);
4026                 else
4027                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
4028                                                idmap);
4029                 break;
4030         case LCFG_NODEMAP_DEL_UIDMAP:
4031         case LCFG_NODEMAP_DEL_GIDMAP:
4032                 rc = nodemap_parse_idmap(param, idmap);
4033                 if (rc != 0)
4034                         break;
4035                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
4036                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
4037                                                idmap);
4038                 else
4039                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
4040                                                idmap);
4041                 break;
4042         default:
4043                 rc = -EINVAL;
4044         }
4045
4046         RETURN(rc);
4047 }
4048
4049 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
4050                  enum lcfg_command_type cmd, char *fsname,
4051                  char *poolname, char *ostname)
4052 {
4053         struct fs_db *fsdb;
4054         char *lovname;
4055         char *logname;
4056         char *label = NULL, *canceled_label = NULL;
4057         int label_sz;
4058         struct mgs_target_info *mti = NULL;
4059         int rc, i;
4060         ENTRY;
4061
4062         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
4063         if (rc) {
4064                 CERROR("Can't get db for %s\n", fsname);
4065                 RETURN(rc);
4066         }
4067         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4068                 CERROR("%s is not defined\n", fsname);
4069                 mgs_free_fsdb(mgs, fsdb);
4070                 RETURN(-EINVAL);
4071         }
4072
4073         label_sz = 10 + strlen(fsname) + strlen(poolname);
4074
4075         /* check if ostname match fsname */
4076         if (ostname != NULL) {
4077                 char *ptr;
4078
4079                 ptr = strrchr(ostname, '-');
4080                 if ((ptr == NULL) ||
4081                     (strncmp(fsname, ostname, ptr-ostname) != 0))
4082                         RETURN(-EINVAL);
4083                 label_sz += strlen(ostname);
4084         }
4085
4086         OBD_ALLOC(label, label_sz);
4087         if (label == NULL)
4088                 RETURN(-ENOMEM);
4089
4090         switch(cmd) {
4091         case LCFG_POOL_NEW:
4092                 sprintf(label,
4093                         "new %s.%s", fsname, poolname);
4094                 break;
4095         case LCFG_POOL_ADD:
4096                 sprintf(label,
4097                         "add %s.%s.%s", fsname, poolname, ostname);
4098                 break;
4099         case LCFG_POOL_REM:
4100                 OBD_ALLOC(canceled_label, label_sz);
4101                 if (canceled_label == NULL)
4102                         GOTO(out_label, rc = -ENOMEM);
4103                 sprintf(label,
4104                         "rem %s.%s.%s", fsname, poolname, ostname);
4105                 sprintf(canceled_label,
4106                         "add %s.%s.%s", fsname, poolname, ostname);
4107                 break;
4108         case LCFG_POOL_DEL:
4109                 OBD_ALLOC(canceled_label, label_sz);
4110                 if (canceled_label == NULL)
4111                         GOTO(out_label, rc = -ENOMEM);
4112                 sprintf(label,
4113                         "del %s.%s", fsname, poolname);
4114                 sprintf(canceled_label,
4115                         "new %s.%s", fsname, poolname);
4116                 break;
4117         default:
4118                 break;
4119         }
4120
4121         if (canceled_label != NULL) {
4122                 OBD_ALLOC_PTR(mti);
4123                 if (mti == NULL)
4124                         GOTO(out_cancel, rc = -ENOMEM);
4125         }
4126
4127         mutex_lock(&fsdb->fsdb_mutex);
4128         /* write pool def to all MDT logs */
4129         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4130                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4131                         rc = name_create_mdt_and_lov(&logname, &lovname,
4132                                                      fsdb, i);
4133                         if (rc) {
4134                                 mutex_unlock(&fsdb->fsdb_mutex);
4135                                 GOTO(out_mti, rc);
4136                         }
4137                         if (canceled_label != NULL) {
4138                                 strcpy(mti->mti_svname, "lov pool");
4139                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4140                                                 lovname, canceled_label,
4141                                                 CM_SKIP);
4142                         }
4143
4144                         if (rc >= 0)
4145                                 rc = mgs_write_log_pool(env, mgs, logname,
4146                                                         fsdb, lovname, cmd,
4147                                                         fsname, poolname,
4148                                                         ostname, label);
4149                         name_destroy(&logname);
4150                         name_destroy(&lovname);
4151                         if (rc) {
4152                                 mutex_unlock(&fsdb->fsdb_mutex);
4153                                 GOTO(out_mti, rc);
4154                         }
4155                 }
4156         }
4157
4158         rc = name_create(&logname, fsname, "-client");
4159         if (rc) {
4160                 mutex_unlock(&fsdb->fsdb_mutex);
4161                 GOTO(out_mti, rc);
4162         }
4163         if (canceled_label != NULL) {
4164                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4165                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4166                 if (rc < 0) {
4167                         mutex_unlock(&fsdb->fsdb_mutex);
4168                         name_destroy(&logname);
4169                         GOTO(out_mti, rc);
4170                 }
4171         }
4172
4173         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4174                                 cmd, fsname, poolname, ostname, label);
4175         mutex_unlock(&fsdb->fsdb_mutex);
4176         name_destroy(&logname);
4177         /* request for update */
4178         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4179
4180         EXIT;
4181 out_mti:
4182         if (mti != NULL)
4183                 OBD_FREE_PTR(mti);
4184 out_cancel:
4185         if (canceled_label != NULL)
4186                 OBD_FREE(canceled_label, label_sz);
4187 out_label:
4188         OBD_FREE(label, label_sz);
4189         return rc;
4190 }