Whamcloud - gitweb
LU-2445 lfs: fixed support for lfs migrate -b
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 /* Find all logs in CONFIG directory and link then into list */
59 int class_dentry_readdir(const struct lu_env *env,
60                          struct mgs_device *mgs, struct list_head *log_list)
61 {
62         struct dt_object    *dir = mgs->mgs_configs_dir;
63         const struct dt_it_ops *iops;
64         struct dt_it        *it;
65         struct mgs_direntry *de;
66         char                *key;
67         int                  rc, key_sz;
68
69         INIT_LIST_HEAD(log_list);
70
71         LASSERT(dir);
72         LASSERT(dir->do_index_ops);
73
74         iops = &dir->do_index_ops->dio_it;
75         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
76         if (IS_ERR(it))
77                 RETURN(PTR_ERR(it));
78
79         rc = iops->load(env, it, 0);
80         if (rc <= 0)
81                 GOTO(fini, rc = 0);
82
83         /* main cycle */
84         do {
85                 key = (void *)iops->key(env, it);
86                 if (IS_ERR(key)) {
87                         CERROR("%s: key failed when listing %s: rc = %d\n",
88                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
89                                (int) PTR_ERR(key));
90                         goto next;
91                 }
92                 key_sz = iops->key_size(env, it);
93                 LASSERT(key_sz > 0);
94
95                 /* filter out "." and ".." entries */
96                 if (key[0] == '.') {
97                         if (key_sz == 1)
98                                 goto next;
99                         if (key_sz == 2 && key[1] == '.')
100                                 goto next;
101                 }
102
103                 de = mgs_direntry_alloc(key_sz + 1);
104                 if (de == NULL) {
105                         rc = -ENOMEM;
106                         break;
107                 }
108
109                 memcpy(de->mde_name, key, key_sz);
110                 de->mde_name[key_sz] = 0;
111
112                 list_add(&de->mde_list, log_list);
113
114 next:
115                 rc = iops->next(env, it);
116         } while (rc == 0);
117         rc = 0;
118
119         iops->put(env, it);
120
121 fini:
122         iops->fini(env, it);
123         if (rc)
124                 CERROR("%s: key failed when listing %s: rc = %d\n",
125                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
126         RETURN(rc);
127 }
128
129 /******************** DB functions *********************/
130
131 static inline int name_create(char **newname, char *prefix, char *suffix)
132 {
133         LASSERT(newname);
134         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
135         if (!*newname)
136                 return -ENOMEM;
137         sprintf(*newname, "%s%s", prefix, suffix);
138         return 0;
139 }
140
141 static inline void name_destroy(char **name)
142 {
143         if (*name)
144                 OBD_FREE(*name, strlen(*name) + 1);
145         *name = NULL;
146 }
147
148 struct mgs_fsdb_handler_data
149 {
150         struct fs_db   *fsdb;
151         __u32           ver;
152 };
153
154 /* from the (client) config log, figure out:
155         1. which ost's/mdt's are configured (by index)
156         2. what the last config step is
157         3. COMPAT_18 osc name
158 */
159 /* It might be better to have a separate db file, instead of parsing the info
160    out of the client log.  This is slow and potentially error-prone. */
161 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
162                             struct llog_rec_hdr *rec, void *data)
163 {
164         struct mgs_fsdb_handler_data *d = data;
165         struct fs_db *fsdb = d->fsdb;
166         int cfg_len = rec->lrh_len;
167         char *cfg_buf = (char*) (rec + 1);
168         struct lustre_cfg *lcfg;
169         __u32 index;
170         int rc = 0;
171         ENTRY;
172
173         if (rec->lrh_type != OBD_CFG_REC) {
174                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
175                 RETURN(-EINVAL);
176         }
177
178         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
179         if (rc) {
180                 CERROR("Insane cfg\n");
181                 RETURN(rc);
182         }
183
184         lcfg = (struct lustre_cfg *)cfg_buf;
185
186         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
187                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
188
189         /* Figure out ost indicies */
190         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
191         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
192             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
193                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
194                                        NULL, 10);
195                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
196                        lustre_cfg_string(lcfg, 1), index,
197                        lustre_cfg_string(lcfg, 2));
198                 set_bit(index, fsdb->fsdb_ost_index_map);
199         }
200
201         /* Figure out mdt indicies */
202         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
203         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
204             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
205                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
206                                        &index, NULL);
207                 if (rc != LDD_F_SV_TYPE_MDT) {
208                         CWARN("Unparsable MDC name %s, assuming index 0\n",
209                               lustre_cfg_string(lcfg, 0));
210                         index = 0;
211                 }
212                 rc = 0;
213                 CDEBUG(D_MGS, "MDT index is %u\n", index);
214                 set_bit(index, fsdb->fsdb_mdt_index_map);
215                 fsdb->fsdb_mdt_count ++;
216         }
217
218         /**
219          * figure out the old config. fsdb_gen = 0 means old log
220          * It is obsoleted and not supported anymore
221          */
222         if (fsdb->fsdb_gen == 0) {
223                 CERROR("Old config format is not supported\n");
224                 RETURN(-EINVAL);
225         }
226
227         /*
228          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
229          */
230         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
231             lcfg->lcfg_command == LCFG_ATTACH &&
232             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
233                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
234                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
235                         CWARN("MDT using 1.8 OSC name scheme\n");
236                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
237                 }
238         }
239
240         if (lcfg->lcfg_command == LCFG_MARKER) {
241                 struct cfg_marker *marker;
242                 marker = lustre_cfg_buf(lcfg, 1);
243
244                 d->ver = marker->cm_vers;
245
246                 /* Keep track of the latest marker step */
247                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
248         }
249
250         RETURN(rc);
251 }
252
253 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
254 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
255                                   struct mgs_device *mgs,
256                                   struct fs_db *fsdb)
257 {
258         char                            *logname;
259         struct llog_handle              *loghandle;
260         struct llog_ctxt                *ctxt;
261         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
262         int rc;
263
264         ENTRY;
265
266         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
267         LASSERT(ctxt != NULL);
268         rc = name_create(&logname, fsdb->fsdb_name, "-client");
269         if (rc)
270                 GOTO(out_put, rc);
271         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
272         if (rc)
273                 GOTO(out_pop, rc);
274
275         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
276         if (rc)
277                 GOTO(out_close, rc);
278
279         if (llog_get_size(loghandle) <= 1)
280                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
281
282         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
283         CDEBUG(D_INFO, "get_db = %d\n", rc);
284 out_close:
285         llog_close(env, loghandle);
286 out_pop:
287         name_destroy(&logname);
288 out_put:
289         llog_ctxt_put(ctxt);
290
291         RETURN(rc);
292 }
293
294 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
295 {
296         struct mgs_tgt_srpc_conf *tgtconf;
297
298         /* free target-specific rules */
299         while (fsdb->fsdb_srpc_tgt) {
300                 tgtconf = fsdb->fsdb_srpc_tgt;
301                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
302
303                 LASSERT(tgtconf->mtsc_tgt);
304
305                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
306                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
307                 OBD_FREE_PTR(tgtconf);
308         }
309
310         /* free general rules */
311         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
312 }
313
314 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
315 {
316         struct fs_db *fsdb;
317         struct list_head *tmp;
318
319         list_for_each(tmp, &mgs->mgs_fs_db_list) {
320                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
321                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
322                         return fsdb;
323         }
324         return NULL;
325 }
326
327 /* caller must hold the mgs->mgs_fs_db_lock */
328 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
329                                   struct mgs_device *mgs, char *fsname)
330 {
331         struct fs_db *fsdb;
332         int rc;
333         ENTRY;
334
335         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
336                 CERROR("fsname %s is too long\n", fsname);
337                 RETURN(NULL);
338         }
339
340         OBD_ALLOC_PTR(fsdb);
341         if (!fsdb)
342                 RETURN(NULL);
343
344         strcpy(fsdb->fsdb_name, fsname);
345         mutex_init(&fsdb->fsdb_mutex);
346         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
347         fsdb->fsdb_gen = 1;
348
349         if (strcmp(fsname, MGSSELF_NAME) == 0) {
350                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
351         } else {
352                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
353                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
354                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
355                         CERROR("No memory for index maps\n");
356                         GOTO(err, rc = -ENOMEM);
357                 }
358
359                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
360                 if (rc)
361                         GOTO(err, rc);
362                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
363                 if (rc)
364                         GOTO(err, rc);
365
366                 /* initialise data for NID table */
367                 mgs_ir_init_fs(env, mgs, fsdb);
368
369                 lproc_mgs_add_live(mgs, fsdb);
370         }
371
372         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
373
374         RETURN(fsdb);
375 err:
376         if (fsdb->fsdb_ost_index_map)
377                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
378         if (fsdb->fsdb_mdt_index_map)
379                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
380         name_destroy(&fsdb->fsdb_clilov);
381         name_destroy(&fsdb->fsdb_clilmv);
382         OBD_FREE_PTR(fsdb);
383         RETURN(NULL);
384 }
385
386 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
387 {
388         /* wait for anyone with the sem */
389         mutex_lock(&fsdb->fsdb_mutex);
390         lproc_mgs_del_live(mgs, fsdb);
391         list_del(&fsdb->fsdb_list);
392
393         /* deinitialize fsr */
394         mgs_ir_fini_fs(mgs, fsdb);
395
396         if (fsdb->fsdb_ost_index_map)
397                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
398         if (fsdb->fsdb_mdt_index_map)
399                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
400         name_destroy(&fsdb->fsdb_clilov);
401         name_destroy(&fsdb->fsdb_clilmv);
402         mgs_free_fsdb_srpc(fsdb);
403         mutex_unlock(&fsdb->fsdb_mutex);
404         OBD_FREE_PTR(fsdb);
405 }
406
407 int mgs_init_fsdb_list(struct mgs_device *mgs)
408 {
409         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
410         return 0;
411 }
412
413 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
414 {
415         struct fs_db *fsdb;
416         struct list_head *tmp, *tmp2;
417
418         mutex_lock(&mgs->mgs_mutex);
419         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
420                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
421                 mgs_free_fsdb(mgs, fsdb);
422         }
423         mutex_unlock(&mgs->mgs_mutex);
424         return 0;
425 }
426
427 int mgs_find_or_make_fsdb(const struct lu_env *env,
428                           struct mgs_device *mgs, char *name,
429                           struct fs_db **dbh)
430 {
431         struct fs_db *fsdb;
432         int rc = 0;
433
434         ENTRY;
435         mutex_lock(&mgs->mgs_mutex);
436         fsdb = mgs_find_fsdb(mgs, name);
437         if (fsdb) {
438                 mutex_unlock(&mgs->mgs_mutex);
439                 *dbh = fsdb;
440                 RETURN(0);
441         }
442
443         CDEBUG(D_MGS, "Creating new db\n");
444         fsdb = mgs_new_fsdb(env, mgs, name);
445         /* lock fsdb_mutex until the db is loaded from llogs */
446         if (fsdb)
447                 mutex_lock(&fsdb->fsdb_mutex);
448         mutex_unlock(&mgs->mgs_mutex);
449         if (!fsdb)
450                 RETURN(-ENOMEM);
451
452         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
453                 /* populate the db from the client llog */
454                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
455                 if (rc) {
456                         CERROR("Can't get db from client log %d\n", rc);
457                         GOTO(out_free, rc);
458                 }
459         }
460
461         /* populate srpc rules from params llog */
462         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
463         if (rc) {
464                 CERROR("Can't get db from params log %d\n", rc);
465                 GOTO(out_free, rc);
466         }
467
468         mutex_unlock(&fsdb->fsdb_mutex);
469         *dbh = fsdb;
470
471         RETURN(0);
472
473 out_free:
474         mutex_unlock(&fsdb->fsdb_mutex);
475         mgs_free_fsdb(mgs, fsdb);
476         return rc;
477 }
478
479 /* 1 = index in use
480    0 = index unused
481    -1= empty client log */
482 int mgs_check_index(const struct lu_env *env,
483                     struct mgs_device *mgs,
484                     struct mgs_target_info *mti)
485 {
486         struct fs_db *fsdb;
487         void *imap;
488         int rc = 0;
489         ENTRY;
490
491         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
492
493         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
494         if (rc) {
495                 CERROR("Can't get db for %s\n", mti->mti_fsname);
496                 RETURN(rc);
497         }
498
499         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
500                 RETURN(-1);
501
502         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
503                 imap = fsdb->fsdb_ost_index_map;
504         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
505                 imap = fsdb->fsdb_mdt_index_map;
506         else
507                 RETURN(-EINVAL);
508
509         if (test_bit(mti->mti_stripe_index, imap))
510                 RETURN(1);
511         RETURN(0);
512 }
513
514 static __inline__ int next_index(void *index_map, int map_len)
515 {
516         int i;
517         for (i = 0; i < map_len * 8; i++)
518                 if (!test_bit(i, index_map)) {
519                          return i;
520                  }
521         CERROR("max index %d exceeded.\n", i);
522         return -1;
523 }
524
525 /* Return codes:
526         0  newly marked as in use
527         <0 err
528         +EALREADY for update of an old index */
529 static int mgs_set_index(const struct lu_env *env,
530                          struct mgs_device *mgs,
531                          struct mgs_target_info *mti)
532 {
533         struct fs_db *fsdb;
534         void *imap;
535         int rc = 0;
536         ENTRY;
537
538         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
539         if (rc) {
540                 CERROR("Can't get db for %s\n", mti->mti_fsname);
541                 RETURN(rc);
542         }
543
544         mutex_lock(&fsdb->fsdb_mutex);
545         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
546                 imap = fsdb->fsdb_ost_index_map;
547         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
548                 imap = fsdb->fsdb_mdt_index_map;
549         } else {
550                 GOTO(out_up, rc = -EINVAL);
551         }
552
553         if (mti->mti_flags & LDD_F_NEED_INDEX) {
554                 rc = next_index(imap, INDEX_MAP_SIZE);
555                 if (rc == -1)
556                         GOTO(out_up, rc = -ERANGE);
557                 mti->mti_stripe_index = rc;
558                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
559                         fsdb->fsdb_mdt_count ++;
560         }
561
562         /* the last index(0xffff) is reserved for default value. */
563         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
564                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
565                                    "but index must be less than %u.\n",
566                                    mti->mti_svname, mti->mti_stripe_index,
567                                    INDEX_MAP_SIZE * 8 - 1);
568                 GOTO(out_up, rc = -ERANGE);
569         }
570
571         if (test_bit(mti->mti_stripe_index, imap)) {
572                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
573                     !(mti->mti_flags & LDD_F_WRITECONF)) {
574                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
575                                            "%d, but that index is already in "
576                                            "use. Use --writeconf to force\n",
577                                            mti->mti_svname,
578                                            mti->mti_stripe_index);
579                         GOTO(out_up, rc = -EADDRINUSE);
580                 } else {
581                         CDEBUG(D_MGS, "Server %s updating index %d\n",
582                                mti->mti_svname, mti->mti_stripe_index);
583                         GOTO(out_up, rc = EALREADY);
584                 }
585         }
586
587         set_bit(mti->mti_stripe_index, imap);
588         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
589         mutex_unlock(&fsdb->fsdb_mutex);
590         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
591                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
592
593         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
594                mti->mti_stripe_index);
595
596         RETURN(0);
597 out_up:
598         mutex_unlock(&fsdb->fsdb_mutex);
599         return rc;
600 }
601
602 struct mgs_modify_lookup {
603         struct cfg_marker mml_marker;
604         int               mml_modified;
605 };
606
607 static int mgs_modify_handler(const struct lu_env *env,
608                               struct llog_handle *llh,
609                               struct llog_rec_hdr *rec, void *data)
610 {
611         struct mgs_modify_lookup *mml = data;
612         struct cfg_marker *marker;
613         struct lustre_cfg *lcfg = REC_DATA(rec);
614         int cfg_len = REC_DATA_LEN(rec);
615         int rc;
616         ENTRY;
617
618         if (rec->lrh_type != OBD_CFG_REC) {
619                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
620                 RETURN(-EINVAL);
621         }
622
623         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
624         if (rc) {
625                 CERROR("Insane cfg\n");
626                 RETURN(rc);
627         }
628
629         /* We only care about markers */
630         if (lcfg->lcfg_command != LCFG_MARKER)
631                 RETURN(0);
632
633         marker = lustre_cfg_buf(lcfg, 1);
634         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
635             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
636             !(marker->cm_flags & CM_SKIP)) {
637                 /* Found a non-skipped marker match */
638                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
639                        rec->lrh_index, marker->cm_step,
640                        marker->cm_flags, mml->mml_marker.cm_flags,
641                        marker->cm_tgtname, marker->cm_comment);
642                 /* Overwrite the old marker llog entry */
643                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
644                 marker->cm_flags |= mml->mml_marker.cm_flags;
645                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
646                 rc = llog_write(env, llh, rec, rec->lrh_index);
647                 if (!rc)
648                          mml->mml_modified++;
649         }
650
651         RETURN(rc);
652 }
653
654 /**
655  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
656  * Return code:
657  * 0 - modified successfully,
658  * 1 - no modification was done
659  * negative - error
660  */
661 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
662                       struct fs_db *fsdb, struct mgs_target_info *mti,
663                       char *logname, char *devname, char *comment, int flags)
664 {
665         struct llog_handle *loghandle;
666         struct llog_ctxt *ctxt;
667         struct mgs_modify_lookup *mml;
668         int rc;
669
670         ENTRY;
671
672         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
673         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
674                flags);
675
676         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
677         LASSERT(ctxt != NULL);
678         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
679         if (rc < 0) {
680                 if (rc == -ENOENT)
681                         rc = 0;
682                 GOTO(out_pop, rc);
683         }
684
685         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
686         if (rc)
687                 GOTO(out_close, rc);
688
689         if (llog_get_size(loghandle) <= 1)
690                 GOTO(out_close, rc = 0);
691
692         OBD_ALLOC_PTR(mml);
693         if (!mml)
694                 GOTO(out_close, rc = -ENOMEM);
695         if (strlcpy(mml->mml_marker.cm_comment, comment,
696                     sizeof(mml->mml_marker.cm_comment)) >=
697             sizeof(mml->mml_marker.cm_comment))
698                 GOTO(out_free, rc = -E2BIG);
699         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
700                     sizeof(mml->mml_marker.cm_tgtname)) >=
701             sizeof(mml->mml_marker.cm_tgtname))
702                 GOTO(out_free, rc = -E2BIG);
703         /* Modify mostly means cancel */
704         mml->mml_marker.cm_flags = flags;
705         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
706         mml->mml_modified = 0;
707         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
708                           NULL);
709         if (!rc && !mml->mml_modified)
710                 rc = 1;
711
712 out_free:
713         OBD_FREE_PTR(mml);
714
715 out_close:
716         llog_close(env, loghandle);
717 out_pop:
718         if (rc < 0)
719                 CERROR("%s: modify %s/%s failed: rc = %d\n",
720                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
721         llog_ctxt_put(ctxt);
722         RETURN(rc);
723 }
724
725 /** This structure is passed to mgs_replace_handler */
726 struct mgs_replace_uuid_lookup {
727         /* Nids are replaced for this target device */
728         struct mgs_target_info target;
729         /* Temporary modified llog */
730         struct llog_handle *temp_llh;
731         /* Flag is set if in target block*/
732         int in_target_device;
733         /* Nids already added. Just skip (multiple nids) */
734         int device_nids_added;
735         /* Flag is set if this block should not be copied */
736         int skip_it;
737 };
738
739 /**
740  * Check: a) if block should be skipped
741  * b) is it target block
742  *
743  * \param[in] lcfg
744  * \param[in] mrul
745  *
746  * \retval 0 should not to be skipped
747  * \retval 1 should to be skipped
748  */
749 static int check_markers(struct lustre_cfg *lcfg,
750                          struct mgs_replace_uuid_lookup *mrul)
751 {
752          struct cfg_marker *marker;
753
754         /* Track markers. Find given device */
755         if (lcfg->lcfg_command == LCFG_MARKER) {
756                 marker = lustre_cfg_buf(lcfg, 1);
757                 /* Clean llog from records marked as CM_EXCLUDE.
758                    CM_SKIP records are used for "active" command
759                    and can be restored if needed */
760                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
761                     (CM_EXCLUDE | CM_START)) {
762                         mrul->skip_it = 1;
763                         return 1;
764                 }
765
766                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
767                     (CM_EXCLUDE | CM_END)) {
768                         mrul->skip_it = 0;
769                         return 1;
770                 }
771
772                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
773                         LASSERT(!(marker->cm_flags & CM_START) ||
774                                 !(marker->cm_flags & CM_END));
775                         if (marker->cm_flags & CM_START) {
776                                 mrul->in_target_device = 1;
777                                 mrul->device_nids_added = 0;
778                         } else if (marker->cm_flags & CM_END)
779                                 mrul->in_target_device = 0;
780                 }
781         }
782
783         return 0;
784 }
785
786 static int record_base(const struct lu_env *env, struct llog_handle *llh,
787                      char *cfgname, lnet_nid_t nid, int cmd,
788                      char *s1, char *s2, char *s3, char *s4)
789 {
790         struct mgs_thread_info  *mgi = mgs_env_info(env);
791         struct llog_cfg_rec     *lcr;
792         int rc;
793
794         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
795                cmd, s1, s2, s3, s4);
796
797         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
798         if (s1)
799                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
800         if (s2)
801                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
802         if (s3)
803                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
804         if (s4)
805                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
806
807         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
808         if (lcr == NULL)
809                 return -ENOMEM;
810
811         lcr->lcr_cfg.lcfg_nid = nid;
812         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
813
814         lustre_cfg_rec_free(lcr);
815
816         if (rc < 0)
817                 CDEBUG(D_MGS,
818                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
819                        cfgname, cmd, s1, s2, s3, s4, rc);
820         return rc;
821 }
822
823 static inline int record_add_uuid(const struct lu_env *env,
824                                   struct llog_handle *llh,
825                                   uint64_t nid, char *uuid)
826 {
827         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
828                            NULL, NULL, NULL);
829 }
830
831 static inline int record_add_conn(const struct lu_env *env,
832                                   struct llog_handle *llh,
833                                   char *devname, char *uuid)
834 {
835         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
836                            NULL, NULL, NULL);
837 }
838
839 static inline int record_attach(const struct lu_env *env,
840                                 struct llog_handle *llh, char *devname,
841                                 char *type, char *uuid)
842 {
843         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
844                            NULL, NULL);
845 }
846
847 static inline int record_setup(const struct lu_env *env,
848                                struct llog_handle *llh, char *devname,
849                                char *s1, char *s2, char *s3, char *s4)
850 {
851         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
852 }
853
854 /**
855  * \retval <0 record processing error
856  * \retval n record is processed. No need copy original one.
857  * \retval 0 record is not processed.
858  */
859 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
860                            struct mgs_replace_uuid_lookup *mrul)
861 {
862         int nids_added = 0;
863         lnet_nid_t nid;
864         char *ptr;
865         int rc;
866
867         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
868                 /* LCFG_ADD_UUID command found. Let's skip original command
869                    and add passed nids */
870                 ptr = mrul->target.mti_params;
871                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
872                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
873                                "device %s\n", libcfs_nid2str(nid),
874                                 mrul->target.mti_params,
875                                 mrul->target.mti_svname);
876                         rc = record_add_uuid(env,
877                                              mrul->temp_llh, nid,
878                                              mrul->target.mti_params);
879                         if (!rc)
880                                 nids_added++;
881                 }
882
883                 if (nids_added == 0) {
884                         CERROR("No new nids were added, nid %s with uuid %s, "
885                                "device %s\n", libcfs_nid2str(nid),
886                                mrul->target.mti_params,
887                                mrul->target.mti_svname);
888                         RETURN(-ENXIO);
889                 } else {
890                         mrul->device_nids_added = 1;
891                 }
892
893                 return nids_added;
894         }
895
896         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
897                 /* LCFG_SETUP command found. UUID should be changed */
898                 rc = record_setup(env,
899                                   mrul->temp_llh,
900                                   /* devname the same */
901                                   lustre_cfg_string(lcfg, 0),
902                                   /* s1 is not changed */
903                                   lustre_cfg_string(lcfg, 1),
904                                   /* new uuid should be
905                                   the full nidlist */
906                                   mrul->target.mti_params,
907                                   /* s3 is not changed */
908                                   lustre_cfg_string(lcfg, 3),
909                                   /* s4 is not changed */
910                                   lustre_cfg_string(lcfg, 4));
911                 return rc ? rc : 1;
912         }
913
914         /* Another commands in target device block */
915         return 0;
916 }
917
918 /**
919  * Handler that called for every record in llog.
920  * Records are processed in order they placed in llog.
921  *
922  * \param[in] llh       log to be processed
923  * \param[in] rec       current record
924  * \param[in] data      mgs_replace_uuid_lookup structure
925  *
926  * \retval 0    success
927  */
928 static int mgs_replace_handler(const struct lu_env *env,
929                                struct llog_handle *llh,
930                                struct llog_rec_hdr *rec,
931                                void *data)
932 {
933         struct mgs_replace_uuid_lookup *mrul;
934         struct lustre_cfg *lcfg = REC_DATA(rec);
935         int cfg_len = REC_DATA_LEN(rec);
936         int rc;
937         ENTRY;
938
939         mrul = (struct mgs_replace_uuid_lookup *)data;
940
941         if (rec->lrh_type != OBD_CFG_REC) {
942                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
943                        rec->lrh_type, lcfg->lcfg_command,
944                        lustre_cfg_string(lcfg, 0),
945                        lustre_cfg_string(lcfg, 1));
946                 RETURN(-EINVAL);
947         }
948
949         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
950         if (rc) {
951                 /* Do not copy any invalidated records */
952                 GOTO(skip_out, rc = 0);
953         }
954
955         rc = check_markers(lcfg, mrul);
956         if (rc || mrul->skip_it)
957                 GOTO(skip_out, rc = 0);
958
959         /* Write to new log all commands outside target device block */
960         if (!mrul->in_target_device)
961                 GOTO(copy_out, rc = 0);
962
963         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
964            (failover nids) for this target, assuming that if then
965            primary is changing then so is the failover */
966         if (mrul->device_nids_added &&
967             (lcfg->lcfg_command == LCFG_ADD_UUID ||
968              lcfg->lcfg_command == LCFG_ADD_CONN))
969                 GOTO(skip_out, rc = 0);
970
971         rc = process_command(env, lcfg, mrul);
972         if (rc < 0)
973                 RETURN(rc);
974
975         if (rc)
976                 RETURN(0);
977 copy_out:
978         /* Record is placed in temporary llog as is */
979         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
980
981         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
982                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
983                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
984         RETURN(rc);
985
986 skip_out:
987         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
988                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
989                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
990         RETURN(rc);
991 }
992
993 static int mgs_log_is_empty(const struct lu_env *env,
994                             struct mgs_device *mgs, char *name)
995 {
996         struct llog_ctxt        *ctxt;
997         int                      rc;
998
999         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1000         LASSERT(ctxt != NULL);
1001
1002         rc = llog_is_empty(env, ctxt, name);
1003         llog_ctxt_put(ctxt);
1004         return rc;
1005 }
1006
1007 static int mgs_replace_nids_log(const struct lu_env *env,
1008                                 struct obd_device *mgs, struct fs_db *fsdb,
1009                                 char *logname, char *devname, char *nids)
1010 {
1011         struct llog_handle *orig_llh, *backup_llh;
1012         struct llog_ctxt *ctxt;
1013         struct mgs_replace_uuid_lookup *mrul;
1014         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1015         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1016         char *backup;
1017         int rc, rc2;
1018         ENTRY;
1019
1020         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1021
1022         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1023         LASSERT(ctxt != NULL);
1024
1025         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1026                 /* Log is empty. Nothing to replace */
1027                 GOTO(out_put, rc = 0);
1028         }
1029
1030         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1031         if (backup == NULL)
1032                 GOTO(out_put, rc = -ENOMEM);
1033
1034         sprintf(backup, "%s.bak", logname);
1035
1036         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1037         if (rc == 0) {
1038                 /* Now erase original log file. Connections are not allowed.
1039                    Backup is already saved */
1040                 rc = llog_erase(env, ctxt, NULL, logname);
1041                 if (rc < 0)
1042                         GOTO(out_free, rc);
1043         } else if (rc != -ENOENT) {
1044                 CERROR("%s: can't make backup for %s: rc = %d\n",
1045                        mgs->obd_name, logname, rc);
1046                 GOTO(out_free,rc);
1047         }
1048
1049         /* open local log */
1050         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1051         if (rc)
1052                 GOTO(out_restore, rc);
1053
1054         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1055         if (rc)
1056                 GOTO(out_closel, rc);
1057
1058         /* open backup llog */
1059         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1060                        LLOG_OPEN_EXISTS);
1061         if (rc)
1062                 GOTO(out_closel, rc);
1063
1064         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1065         if (rc)
1066                 GOTO(out_close, rc);
1067
1068         if (llog_get_size(backup_llh) <= 1)
1069                 GOTO(out_close, rc = 0);
1070
1071         OBD_ALLOC_PTR(mrul);
1072         if (!mrul)
1073                 GOTO(out_close, rc = -ENOMEM);
1074         /* devname is only needed information to replace UUID records */
1075         strlcpy(mrul->target.mti_svname, devname,
1076                 sizeof(mrul->target.mti_svname));
1077         /* parse nids later */
1078         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1079         /* Copy records to this temporary llog */
1080         mrul->temp_llh = orig_llh;
1081
1082         rc = llog_process(env, backup_llh, mgs_replace_handler,
1083                           (void *)mrul, NULL);
1084         OBD_FREE_PTR(mrul);
1085 out_close:
1086         rc2 = llog_close(NULL, backup_llh);
1087         if (!rc)
1088                 rc = rc2;
1089 out_closel:
1090         rc2 = llog_close(NULL, orig_llh);
1091         if (!rc)
1092                 rc = rc2;
1093
1094 out_restore:
1095         if (rc) {
1096                 CERROR("%s: llog should be restored: rc = %d\n",
1097                        mgs->obd_name, rc);
1098                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1099                                   logname);
1100                 if (rc2 < 0)
1101                         CERROR("%s: can't restore backup %s: rc = %d\n",
1102                                mgs->obd_name, logname, rc2);
1103         }
1104
1105 out_free:
1106         OBD_FREE(backup, strlen(backup) + 1);
1107
1108 out_put:
1109         llog_ctxt_put(ctxt);
1110
1111         if (rc)
1112                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1113                        mgs->obd_name, logname, rc);
1114
1115         RETURN(rc);
1116 }
1117
1118 /**
1119  * Parse device name and get file system name and/or device index
1120  *
1121  * \param[in]   devname device name (ex. lustre-MDT0000)
1122  * \param[out]  fsname  file system name(optional)
1123  * \param[out]  index   device index(optional)
1124  *
1125  * \retval 0    success
1126  */
1127 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1128 {
1129         int rc;
1130         ENTRY;
1131
1132         /* Extract fsname */
1133         if (fsname) {
1134                 rc = server_name2fsname(devname, fsname, NULL);
1135                 if (rc < 0) {
1136                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1137                                devname);
1138                         RETURN(-EINVAL);
1139                 }
1140         }
1141
1142         if (index) {
1143                 rc = server_name2index(devname, index, NULL);
1144                 if (rc < 0) {
1145                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1146                                devname);
1147                         RETURN(-EINVAL);
1148                 }
1149         }
1150
1151         RETURN(0);
1152 }
1153
1154 /* This is only called during replace_nids */
1155 static int only_mgs_is_running(struct obd_device *mgs_obd)
1156 {
1157         /* TDB: Is global variable with devices count exists? */
1158         int num_devices = get_devices_count();
1159         int num_exports = 0;
1160         struct obd_export *exp;
1161
1162         spin_lock(&mgs_obd->obd_dev_lock);
1163         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1164                 /* skip self export */
1165                 if (exp == mgs_obd->obd_self_export)
1166                         continue;
1167                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1168                         continue;
1169
1170                 ++num_exports;
1171
1172                 CERROR("%s: node %s still connected during replace_nids "
1173                        "connect_flags:%llx\n",
1174                        mgs_obd->obd_name,
1175                        libcfs_nid2str(exp->exp_nid_stats->nid),
1176                        exp_connect_flags(exp));
1177
1178         }
1179         spin_unlock(&mgs_obd->obd_dev_lock);
1180
1181         /* osd, MGS and MGC + self_export
1182            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1183         return (num_devices <= 3) && (num_exports == 0);
1184 }
1185
1186 static int name_create_mdt(char **logname, char *fsname, int i)
1187 {
1188         char mdt_index[9];
1189
1190         sprintf(mdt_index, "-MDT%04x", i);
1191         return name_create(logname, fsname, mdt_index);
1192 }
1193
1194 /**
1195  * Replace nids for \a device to \a nids values
1196  *
1197  * \param obd           MGS obd device
1198  * \param devname       nids need to be replaced for this device
1199  * (ex. lustre-OST0000)
1200  * \param nids          nids list (ex. nid1,nid2,nid3)
1201  *
1202  * \retval 0    success
1203  */
1204 int mgs_replace_nids(const struct lu_env *env,
1205                      struct mgs_device *mgs,
1206                      char *devname, char *nids)
1207 {
1208         /* Assume fsname is part of device name */
1209         char fsname[MTI_NAME_MAXLEN];
1210         int rc;
1211         __u32 index;
1212         char *logname;
1213         struct fs_db *fsdb;
1214         unsigned int i;
1215         int conn_state;
1216         struct obd_device *mgs_obd = mgs->mgs_obd;
1217         ENTRY;
1218
1219         /* We can only change NIDs if no other nodes are connected */
1220         spin_lock(&mgs_obd->obd_dev_lock);
1221         conn_state = mgs_obd->obd_no_conn;
1222         mgs_obd->obd_no_conn = 1;
1223         spin_unlock(&mgs_obd->obd_dev_lock);
1224
1225         /* We can not change nids if not only MGS is started */
1226         if (!only_mgs_is_running(mgs_obd)) {
1227                 CERROR("Only MGS is allowed to be started\n");
1228                 GOTO(out, rc = -EINPROGRESS);
1229         }
1230
1231         /* Get fsname and index*/
1232         rc = mgs_parse_devname(devname, fsname, &index);
1233         if (rc)
1234                 GOTO(out, rc);
1235
1236         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1237         if (rc) {
1238                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1239                 GOTO(out, rc);
1240         }
1241
1242         /* Process client llogs */
1243         name_create(&logname, fsname, "-client");
1244         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1245         name_destroy(&logname);
1246         if (rc) {
1247                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1248                        fsname, devname, rc);
1249                 GOTO(out, rc);
1250         }
1251
1252         /* Process MDT llogs */
1253         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1254                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1255                         continue;
1256                 name_create_mdt(&logname, fsname, i);
1257                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1258                 name_destroy(&logname);
1259                 if (rc)
1260                         GOTO(out, rc);
1261         }
1262
1263 out:
1264         spin_lock(&mgs_obd->obd_dev_lock);
1265         mgs_obd->obd_no_conn = conn_state;
1266         spin_unlock(&mgs_obd->obd_dev_lock);
1267
1268         RETURN(rc);
1269 }
1270
1271 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1272                             char *devname, struct lov_desc *desc)
1273 {
1274         struct mgs_thread_info  *mgi = mgs_env_info(env);
1275         struct llog_cfg_rec     *lcr;
1276         int rc;
1277
1278         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1279         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1280         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1281         if (lcr == NULL)
1282                 return -ENOMEM;
1283
1284         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1285         lustre_cfg_rec_free(lcr);
1286         return rc;
1287 }
1288
1289 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1290                             char *devname, struct lmv_desc *desc)
1291 {
1292         struct mgs_thread_info  *mgi = mgs_env_info(env);
1293         struct llog_cfg_rec     *lcr;
1294         int rc;
1295
1296         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1297         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1298         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1299         if (lcr == NULL)
1300                 return -ENOMEM;
1301
1302         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1303         lustre_cfg_rec_free(lcr);
1304         return rc;
1305 }
1306
1307 static inline int record_mdc_add(const struct lu_env *env,
1308                                  struct llog_handle *llh,
1309                                  char *logname, char *mdcuuid,
1310                                  char *mdtuuid, char *index,
1311                                  char *gen)
1312 {
1313         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1314                            mdtuuid,index,gen,mdcuuid);
1315 }
1316
1317 static inline int record_lov_add(const struct lu_env *env,
1318                                  struct llog_handle *llh,
1319                                  char *lov_name, char *ost_uuid,
1320                                  char *index, char *gen)
1321 {
1322         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1323                            ost_uuid, index, gen, NULL);
1324 }
1325
1326 static inline int record_mount_opt(const struct lu_env *env,
1327                                    struct llog_handle *llh,
1328                                    char *profile, char *lov_name,
1329                                    char *mdc_name)
1330 {
1331         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1332                            profile, lov_name, mdc_name, NULL);
1333 }
1334
1335 static int record_marker(const struct lu_env *env,
1336                          struct llog_handle *llh,
1337                          struct fs_db *fsdb, __u32 flags,
1338                          char *tgtname, char *comment)
1339 {
1340         struct mgs_thread_info *mgi = mgs_env_info(env);
1341         struct llog_cfg_rec *lcr;
1342         int rc;
1343         int cplen = 0;
1344
1345         if (flags & CM_START)
1346                 fsdb->fsdb_gen++;
1347         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1348         mgi->mgi_marker.cm_flags = flags;
1349         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1350         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1351                         sizeof(mgi->mgi_marker.cm_tgtname));
1352         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1353                 return -E2BIG;
1354         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1355                         sizeof(mgi->mgi_marker.cm_comment));
1356         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1357                 return -E2BIG;
1358         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1359         mgi->mgi_marker.cm_canceltime = 0;
1360         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1361         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1362                             sizeof(mgi->mgi_marker));
1363         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1364         if (lcr == NULL)
1365                 return -ENOMEM;
1366
1367         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1368         lustre_cfg_rec_free(lcr);
1369         return rc;
1370 }
1371
1372 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1373                             struct llog_handle **llh, char *name)
1374 {
1375         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1376         struct llog_ctxt        *ctxt;
1377         int                      rc = 0;
1378         ENTRY;
1379
1380         if (*llh)
1381                 GOTO(out, rc = -EBUSY);
1382
1383         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1384         if (!ctxt)
1385                 GOTO(out, rc = -ENODEV);
1386         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1387
1388         rc = llog_open_create(env, ctxt, llh, NULL, name);
1389         if (rc)
1390                 GOTO(out_ctxt, rc);
1391         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1392         if (rc)
1393                 llog_close(env, *llh);
1394 out_ctxt:
1395         llog_ctxt_put(ctxt);
1396 out:
1397         if (rc) {
1398                 CERROR("%s: can't start log %s: rc = %d\n",
1399                        mgs->mgs_obd->obd_name, name, rc);
1400                 *llh = NULL;
1401         }
1402         RETURN(rc);
1403 }
1404
1405 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1406 {
1407         int rc;
1408
1409         rc = llog_close(env, *llh);
1410         *llh = NULL;
1411
1412         return rc;
1413 }
1414
1415 /******************** config "macros" *********************/
1416
1417 /* write an lcfg directly into a log (with markers) */
1418 static int mgs_write_log_direct(const struct lu_env *env,
1419                                 struct mgs_device *mgs, struct fs_db *fsdb,
1420                                 char *logname, struct llog_cfg_rec *lcr,
1421                                 char *devname, char *comment)
1422 {
1423         struct llog_handle *llh = NULL;
1424         int rc;
1425
1426         ENTRY;
1427
1428         rc = record_start_log(env, mgs, &llh, logname);
1429         if (rc)
1430                 RETURN(rc);
1431
1432         /* FIXME These should be a single journal transaction */
1433         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1434         if (rc)
1435                 GOTO(out_end, rc);
1436         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1437         if (rc)
1438                 GOTO(out_end, rc);
1439         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1440         if (rc)
1441                 GOTO(out_end, rc);
1442 out_end:
1443         record_end_log(env, &llh);
1444         RETURN(rc);
1445 }
1446
1447 /* write the lcfg in all logs for the given fs */
1448 static int mgs_write_log_direct_all(const struct lu_env *env,
1449                                     struct mgs_device *mgs,
1450                                     struct fs_db *fsdb,
1451                                     struct mgs_target_info *mti,
1452                                     struct llog_cfg_rec *lcr, char *devname,
1453                                     char *comment, int server_only)
1454 {
1455         struct list_head         log_list;
1456         struct mgs_direntry     *dirent, *n;
1457         char                    *fsname = mti->mti_fsname;
1458         int                      rc = 0, len = strlen(fsname);
1459
1460         ENTRY;
1461         /* Find all the logs in the CONFIGS directory */
1462         rc = class_dentry_readdir(env, mgs, &log_list);
1463         if (rc)
1464                 RETURN(rc);
1465
1466         /* Could use fsdb index maps instead of directory listing */
1467         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1468                 list_del_init(&dirent->mde_list);
1469                 /* don't write to sptlrpc rule log */
1470                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1471                         goto next;
1472
1473                 /* caller wants write server logs only */
1474                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1475                         goto next;
1476
1477                 if (strlen(dirent->mde_name) <= len ||
1478                     strncmp(fsname, dirent->mde_name, len) != 0 ||
1479                     dirent->mde_name[len] != '-')
1480                         goto next;
1481
1482                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1483                 /* Erase any old settings of this same parameter */
1484                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1485                                 devname, comment, CM_SKIP);
1486                 if (rc < 0)
1487                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1488                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1489                 if (lcr == NULL)
1490                         goto next;
1491                 /* Write the new one */
1492                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1493                                           lcr, devname, comment);
1494                 if (rc != 0)
1495                         CERROR("%s: writing log %s: rc = %d\n",
1496                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1497 next:
1498                 mgs_direntry_free(dirent);
1499         }
1500
1501         RETURN(rc);
1502 }
1503
1504 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1505                                     struct mgs_device *mgs,
1506                                     struct fs_db *fsdb,
1507                                     struct mgs_target_info *mti,
1508                                     int index, char *logname);
1509 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1510                                     struct mgs_device *mgs,
1511                                     struct fs_db *fsdb,
1512                                     struct mgs_target_info *mti,
1513                                     char *logname, char *suffix, char *lovname,
1514                                     enum lustre_sec_part sec_part, int flags);
1515 static int name_create_mdt_and_lov(char **logname, char **lovname,
1516                                    struct fs_db *fsdb, int i);
1517
1518 static int add_param(char *params, char *key, char *val)
1519 {
1520         char *start = params + strlen(params);
1521         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1522         int keylen = 0;
1523
1524         if (key != NULL)
1525                 keylen = strlen(key);
1526         if (start + 1 + keylen + strlen(val) >= end) {
1527                 CERROR("params are too long: %s %s%s\n",
1528                        params, key != NULL ? key : "", val);
1529                 return -EINVAL;
1530         }
1531
1532         sprintf(start, " %s%s", key != NULL ? key : "", val);
1533         return 0;
1534 }
1535
1536 /**
1537  * Walk through client config log record and convert the related records
1538  * into the target.
1539  **/
1540 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1541                                          struct llog_handle *llh,
1542                                          struct llog_rec_hdr *rec, void *data)
1543 {
1544         struct mgs_device *mgs;
1545         struct obd_device *obd;
1546         struct mgs_target_info *mti, *tmti;
1547         struct fs_db *fsdb;
1548         int cfg_len = rec->lrh_len;
1549         char *cfg_buf = (char*) (rec + 1);
1550         struct lustre_cfg *lcfg;
1551         int rc = 0;
1552         struct llog_handle *mdt_llh = NULL;
1553         static int got_an_osc_or_mdc = 0;
1554         /* 0: not found any osc/mdc;
1555            1: found osc;
1556            2: found mdc;
1557         */
1558         static int last_step = -1;
1559         int cplen = 0;
1560
1561         ENTRY;
1562
1563         mti = ((struct temp_comp*)data)->comp_mti;
1564         tmti = ((struct temp_comp*)data)->comp_tmti;
1565         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1566         obd = ((struct temp_comp *)data)->comp_obd;
1567         mgs = lu2mgs_dev(obd->obd_lu_dev);
1568         LASSERT(mgs);
1569
1570         if (rec->lrh_type != OBD_CFG_REC) {
1571                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1572                 RETURN(-EINVAL);
1573         }
1574
1575         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1576         if (rc) {
1577                 CERROR("Insane cfg\n");
1578                 RETURN(rc);
1579         }
1580
1581         lcfg = (struct lustre_cfg *)cfg_buf;
1582
1583         if (lcfg->lcfg_command == LCFG_MARKER) {
1584                 struct cfg_marker *marker;
1585                 marker = lustre_cfg_buf(lcfg, 1);
1586                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1587                     (marker->cm_flags & CM_START) &&
1588                      !(marker->cm_flags & CM_SKIP)) {
1589                         got_an_osc_or_mdc = 1;
1590                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1591                                         sizeof(tmti->mti_svname));
1592                         if (cplen >= sizeof(tmti->mti_svname))
1593                                 RETURN(-E2BIG);
1594                         rc = record_start_log(env, mgs, &mdt_llh,
1595                                               mti->mti_svname);
1596                         if (rc)
1597                                 RETURN(rc);
1598                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1599                                            mti->mti_svname, "add osc(copied)");
1600                         record_end_log(env, &mdt_llh);
1601                         last_step = marker->cm_step;
1602                         RETURN(rc);
1603                 }
1604                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1605                     (marker->cm_flags & CM_END) &&
1606                      !(marker->cm_flags & CM_SKIP)) {
1607                         LASSERT(last_step == marker->cm_step);
1608                         last_step = -1;
1609                         got_an_osc_or_mdc = 0;
1610                         memset(tmti, 0, sizeof(*tmti));
1611                         rc = record_start_log(env, mgs, &mdt_llh,
1612                                               mti->mti_svname);
1613                         if (rc)
1614                                 RETURN(rc);
1615                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1616                                            mti->mti_svname, "add osc(copied)");
1617                         record_end_log(env, &mdt_llh);
1618                         RETURN(rc);
1619                 }
1620                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1621                     (marker->cm_flags & CM_START) &&
1622                      !(marker->cm_flags & CM_SKIP)) {
1623                         got_an_osc_or_mdc = 2;
1624                         last_step = marker->cm_step;
1625                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1626                                strlen(marker->cm_tgtname));
1627
1628                         RETURN(rc);
1629                 }
1630                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1631                     (marker->cm_flags & CM_END) &&
1632                      !(marker->cm_flags & CM_SKIP)) {
1633                         LASSERT(last_step == marker->cm_step);
1634                         last_step = -1;
1635                         got_an_osc_or_mdc = 0;
1636                         memset(tmti, 0, sizeof(*tmti));
1637                         RETURN(rc);
1638                 }
1639         }
1640
1641         if (got_an_osc_or_mdc == 0 || last_step < 0)
1642                 RETURN(rc);
1643
1644         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1645                 uint64_t nodenid = lcfg->lcfg_nid;
1646
1647                 if (strlen(tmti->mti_uuid) == 0) {
1648                         /* target uuid not set, this config record is before
1649                          * LCFG_SETUP, this nid is one of target node nid.
1650                          */
1651                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1652                         tmti->mti_nid_count++;
1653                 } else {
1654                         /* failover node nid */
1655                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1656                                        libcfs_nid2str(nodenid));
1657                 }
1658
1659                 RETURN(rc);
1660         }
1661
1662         if (lcfg->lcfg_command == LCFG_SETUP) {
1663                 char *target;
1664
1665                 target = lustre_cfg_string(lcfg, 1);
1666                 memcpy(tmti->mti_uuid, target, strlen(target));
1667                 RETURN(rc);
1668         }
1669
1670         /* ignore client side sptlrpc_conf_log */
1671         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1672                 RETURN(rc);
1673
1674         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1675                 int index;
1676
1677                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1678                         RETURN (-EINVAL);
1679
1680                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1681                        strlen(mti->mti_fsname));
1682                 tmti->mti_stripe_index = index;
1683
1684                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1685                                               mti->mti_stripe_index,
1686                                               mti->mti_svname);
1687                 memset(tmti, 0, sizeof(*tmti));
1688                 RETURN(rc);
1689         }
1690
1691         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1692                 int index;
1693                 char mdt_index[9];
1694                 char *logname, *lovname;
1695
1696                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1697                                              mti->mti_stripe_index);
1698                 if (rc)
1699                         RETURN(rc);
1700                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1701
1702                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1703                         name_destroy(&logname);
1704                         name_destroy(&lovname);
1705                         RETURN(-EINVAL);
1706                 }
1707
1708                 tmti->mti_stripe_index = index;
1709                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1710                                          mdt_index, lovname,
1711                                          LUSTRE_SP_MDT, 0);
1712                 name_destroy(&logname);
1713                 name_destroy(&lovname);
1714                 RETURN(rc);
1715         }
1716         RETURN(rc);
1717 }
1718
1719 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1720 /* stealed from mgs_get_fsdb_from_llog*/
1721 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1722                                               struct mgs_device *mgs,
1723                                               char *client_name,
1724                                               struct temp_comp* comp)
1725 {
1726         struct llog_handle *loghandle;
1727         struct mgs_target_info *tmti;
1728         struct llog_ctxt *ctxt;
1729         int rc;
1730
1731         ENTRY;
1732
1733         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1734         LASSERT(ctxt != NULL);
1735
1736         OBD_ALLOC_PTR(tmti);
1737         if (tmti == NULL)
1738                 GOTO(out_ctxt, rc = -ENOMEM);
1739
1740         comp->comp_tmti = tmti;
1741         comp->comp_obd = mgs->mgs_obd;
1742
1743         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1744                        LLOG_OPEN_EXISTS);
1745         if (rc < 0) {
1746                 if (rc == -ENOENT)
1747                         rc = 0;
1748                 GOTO(out_pop, rc);
1749         }
1750
1751         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1752         if (rc)
1753                 GOTO(out_close, rc);
1754
1755         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1756                                   (void *)comp, NULL, false);
1757         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1758 out_close:
1759         llog_close(env, loghandle);
1760 out_pop:
1761         OBD_FREE_PTR(tmti);
1762 out_ctxt:
1763         llog_ctxt_put(ctxt);
1764         RETURN(rc);
1765 }
1766
1767 /* lmv is the second thing for client logs */
1768 /* copied from mgs_write_log_lov. Please refer to that.  */
1769 static int mgs_write_log_lmv(const struct lu_env *env,
1770                              struct mgs_device *mgs,
1771                              struct fs_db *fsdb,
1772                              struct mgs_target_info *mti,
1773                              char *logname, char *lmvname)
1774 {
1775         struct llog_handle *llh = NULL;
1776         struct lmv_desc *lmvdesc;
1777         char *uuid;
1778         int rc = 0;
1779         ENTRY;
1780
1781         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1782
1783         OBD_ALLOC_PTR(lmvdesc);
1784         if (lmvdesc == NULL)
1785                 RETURN(-ENOMEM);
1786         lmvdesc->ld_active_tgt_count = 0;
1787         lmvdesc->ld_tgt_count = 0;
1788         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1789         uuid = (char *)lmvdesc->ld_uuid.uuid;
1790
1791         rc = record_start_log(env, mgs, &llh, logname);
1792         if (rc)
1793                 GOTO(out_free, rc);
1794         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1795         if (rc)
1796                 GOTO(out_end, rc);
1797         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1798         if (rc)
1799                 GOTO(out_end, rc);
1800         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1801         if (rc)
1802                 GOTO(out_end, rc);
1803         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1804         if (rc)
1805                 GOTO(out_end, rc);
1806 out_end:
1807         record_end_log(env, &llh);
1808 out_free:
1809         OBD_FREE_PTR(lmvdesc);
1810         RETURN(rc);
1811 }
1812
1813 /* lov is the first thing in the mdt and client logs */
1814 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1815                              struct fs_db *fsdb, struct mgs_target_info *mti,
1816                              char *logname, char *lovname)
1817 {
1818         struct llog_handle *llh = NULL;
1819         struct lov_desc *lovdesc;
1820         char *uuid;
1821         int rc = 0;
1822         ENTRY;
1823
1824         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1825
1826         /*
1827         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1828         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1829               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1830         */
1831
1832         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1833         OBD_ALLOC_PTR(lovdesc);
1834         if (lovdesc == NULL)
1835                 RETURN(-ENOMEM);
1836         lovdesc->ld_magic = LOV_DESC_MAGIC;
1837         lovdesc->ld_tgt_count = 0;
1838         /* Defaults.  Can be changed later by lcfg config_param */
1839         lovdesc->ld_default_stripe_count = 1;
1840         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1841         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1842         lovdesc->ld_default_stripe_offset = -1;
1843         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1844         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1845         /* can these be the same? */
1846         uuid = (char *)lovdesc->ld_uuid.uuid;
1847
1848         /* This should always be the first entry in a log.
1849         rc = mgs_clear_log(obd, logname); */
1850         rc = record_start_log(env, mgs, &llh, logname);
1851         if (rc)
1852                 GOTO(out_free, rc);
1853         /* FIXME these should be a single journal transaction */
1854         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1855         if (rc)
1856                 GOTO(out_end, rc);
1857         rc = record_attach(env, llh, lovname, "lov", uuid);
1858         if (rc)
1859                 GOTO(out_end, rc);
1860         rc = record_lov_setup(env, llh, lovname, lovdesc);
1861         if (rc)
1862                 GOTO(out_end, rc);
1863         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1864         if (rc)
1865                 GOTO(out_end, rc);
1866         EXIT;
1867 out_end:
1868         record_end_log(env, &llh);
1869 out_free:
1870         OBD_FREE_PTR(lovdesc);
1871         return rc;
1872 }
1873
1874 /* add failnids to open log */
1875 static int mgs_write_log_failnids(const struct lu_env *env,
1876                                   struct mgs_target_info *mti,
1877                                   struct llog_handle *llh,
1878                                   char *cliname)
1879 {
1880         char *failnodeuuid = NULL;
1881         char *ptr = mti->mti_params;
1882         lnet_nid_t nid;
1883         int rc = 0;
1884
1885         /*
1886         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1887         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1888         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1889         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1890         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1891         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1892         */
1893
1894         /*
1895          * Pull failnid info out of params string, which may contain something
1896          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
1897          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
1898          * etc.  However, convert_hostnames() should have caught those.
1899          */
1900         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1901                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1902                         if (failnodeuuid == NULL) {
1903                                 /* We don't know the failover node name,
1904                                    so just use the first nid as the uuid */
1905                                 rc = name_create(&failnodeuuid,
1906                                                  libcfs_nid2str(nid), "");
1907                                 if (rc)
1908                                         return rc;
1909                         }
1910                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1911                                "client %s\n", libcfs_nid2str(nid),
1912                                failnodeuuid, cliname);
1913                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1914                         /*
1915                          * If *ptr is ':', we have added all NIDs for
1916                          * failnodeuuid.
1917                          */
1918                         if (*ptr == ':') {
1919                                 rc = record_add_conn(env, llh, cliname,
1920                                                      failnodeuuid);
1921                                 name_destroy(&failnodeuuid);
1922                                 failnodeuuid = NULL;
1923                         }
1924                 }
1925                 if (failnodeuuid) {
1926                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1927                         name_destroy(&failnodeuuid);
1928                         failnodeuuid = NULL;
1929                 }
1930         }
1931
1932         return rc;
1933 }
1934
1935 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1936                                     struct mgs_device *mgs,
1937                                     struct fs_db *fsdb,
1938                                     struct mgs_target_info *mti,
1939                                     char *logname, char *lmvname)
1940 {
1941         struct llog_handle *llh = NULL;
1942         char *mdcname = NULL;
1943         char *nodeuuid = NULL;
1944         char *mdcuuid = NULL;
1945         char *lmvuuid = NULL;
1946         char index[6];
1947         int i, rc;
1948         ENTRY;
1949
1950         if (mgs_log_is_empty(env, mgs, logname)) {
1951                 CERROR("log is empty! Logical error\n");
1952                 RETURN(-EINVAL);
1953         }
1954
1955         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1956                mti->mti_svname, logname, lmvname);
1957
1958         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1959         if (rc)
1960                 RETURN(rc);
1961         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1962         if (rc)
1963                 GOTO(out_free, rc);
1964         rc = name_create(&mdcuuid, mdcname, "_UUID");
1965         if (rc)
1966                 GOTO(out_free, rc);
1967         rc = name_create(&lmvuuid, lmvname, "_UUID");
1968         if (rc)
1969                 GOTO(out_free, rc);
1970
1971         rc = record_start_log(env, mgs, &llh, logname);
1972         if (rc)
1973                 GOTO(out_free, rc);
1974         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1975                            "add mdc");
1976         if (rc)
1977                 GOTO(out_end, rc);
1978         for (i = 0; i < mti->mti_nid_count; i++) {
1979                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1980                        libcfs_nid2str(mti->mti_nids[i]));
1981
1982                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1983                 if (rc)
1984                         GOTO(out_end, rc);
1985         }
1986
1987         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1988         if (rc)
1989                 GOTO(out_end, rc);
1990         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
1991                           NULL, NULL);
1992         if (rc)
1993                 GOTO(out_end, rc);
1994         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1995         if (rc)
1996                 GOTO(out_end, rc);
1997         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1998         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1999                             index, "1");
2000         if (rc)
2001                 GOTO(out_end, rc);
2002         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2003                            "add mdc");
2004         if (rc)
2005                 GOTO(out_end, rc);
2006 out_end:
2007         record_end_log(env, &llh);
2008 out_free:
2009         name_destroy(&lmvuuid);
2010         name_destroy(&mdcuuid);
2011         name_destroy(&mdcname);
2012         name_destroy(&nodeuuid);
2013         RETURN(rc);
2014 }
2015
2016 static inline int name_create_lov(char **lovname, char *mdtname,
2017                                   struct fs_db *fsdb, int index)
2018 {
2019         /* COMPAT_180 */
2020         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2021                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2022         else
2023                 return name_create(lovname, mdtname, "-mdtlov");
2024 }
2025
2026 static int name_create_mdt_and_lov(char **logname, char **lovname,
2027                                    struct fs_db *fsdb, int i)
2028 {
2029         int rc;
2030
2031         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2032         if (rc)
2033                 return rc;
2034         /* COMPAT_180 */
2035         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2036                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2037         else
2038                 rc = name_create(lovname, *logname, "-mdtlov");
2039         if (rc) {
2040                 name_destroy(logname);
2041                 *logname = NULL;
2042         }
2043         return rc;
2044 }
2045
2046 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2047                                       struct fs_db *fsdb, int i)
2048 {
2049         char suffix[16];
2050
2051         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2052                 sprintf(suffix, "-osc");
2053         else
2054                 sprintf(suffix, "-osc-MDT%04x", i);
2055         return name_create(oscname, ostname, suffix);
2056 }
2057
2058 /* add new mdc to already existent MDS */
2059 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2060                                     struct mgs_device *mgs,
2061                                     struct fs_db *fsdb,
2062                                     struct mgs_target_info *mti,
2063                                     int mdt_index, char *logname)
2064 {
2065         struct llog_handle      *llh = NULL;
2066         char    *nodeuuid = NULL;
2067         char    *ospname = NULL;
2068         char    *lovuuid = NULL;
2069         char    *mdtuuid = NULL;
2070         char    *svname = NULL;
2071         char    *mdtname = NULL;
2072         char    *lovname = NULL;
2073         char    index_str[16];
2074         int     i, rc;
2075
2076         ENTRY;
2077         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2078                 CERROR("log is empty! Logical error\n");
2079                 RETURN (-EINVAL);
2080         }
2081
2082         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2083                logname);
2084
2085         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2086         if (rc)
2087                 RETURN(rc);
2088
2089         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2090         if (rc)
2091                 GOTO(out_destory, rc);
2092
2093         rc = name_create(&svname, mdtname, "-osp");
2094         if (rc)
2095                 GOTO(out_destory, rc);
2096
2097         sprintf(index_str, "-MDT%04x", mdt_index);
2098         rc = name_create(&ospname, svname, index_str);
2099         if (rc)
2100                 GOTO(out_destory, rc);
2101
2102         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2103         if (rc)
2104                 GOTO(out_destory, rc);
2105
2106         rc = name_create(&lovuuid, lovname, "_UUID");
2107         if (rc)
2108                 GOTO(out_destory, rc);
2109
2110         rc = name_create(&mdtuuid, mdtname, "_UUID");
2111         if (rc)
2112                 GOTO(out_destory, rc);
2113
2114         rc = record_start_log(env, mgs, &llh, logname);
2115         if (rc)
2116                 GOTO(out_destory, rc);
2117
2118         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2119                            "add osp");
2120         if (rc)
2121                 GOTO(out_destory, rc);
2122
2123         for (i = 0; i < mti->mti_nid_count; i++) {
2124                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2125                        libcfs_nid2str(mti->mti_nids[i]));
2126                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2127                 if (rc)
2128                         GOTO(out_end, rc);
2129         }
2130
2131         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2132         if (rc)
2133                 GOTO(out_end, rc);
2134
2135         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2136                           NULL, NULL);
2137         if (rc)
2138                 GOTO(out_end, rc);
2139
2140         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2141         if (rc)
2142                 GOTO(out_end, rc);
2143
2144         /* Add mdc(osp) to lod */
2145         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2146                  mti->mti_stripe_index);
2147         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2148                          index_str, "1", NULL);
2149         if (rc)
2150                 GOTO(out_end, rc);
2151
2152         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2153         if (rc)
2154                 GOTO(out_end, rc);
2155
2156 out_end:
2157         record_end_log(env, &llh);
2158
2159 out_destory:
2160         name_destroy(&mdtuuid);
2161         name_destroy(&lovuuid);
2162         name_destroy(&lovname);
2163         name_destroy(&ospname);
2164         name_destroy(&svname);
2165         name_destroy(&nodeuuid);
2166         name_destroy(&mdtname);
2167         RETURN(rc);
2168 }
2169
2170 static int mgs_write_log_mdt0(const struct lu_env *env,
2171                               struct mgs_device *mgs,
2172                               struct fs_db *fsdb,
2173                               struct mgs_target_info *mti)
2174 {
2175         char *log = mti->mti_svname;
2176         struct llog_handle *llh = NULL;
2177         char *uuid, *lovname;
2178         char mdt_index[6];
2179         char *ptr = mti->mti_params;
2180         int rc = 0, failout = 0;
2181         ENTRY;
2182
2183         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2184         if (uuid == NULL)
2185                 RETURN(-ENOMEM);
2186
2187         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2188                 failout = (strncmp(ptr, "failout", 7) == 0);
2189
2190         rc = name_create(&lovname, log, "-mdtlov");
2191         if (rc)
2192                 GOTO(out_free, rc);
2193         if (mgs_log_is_empty(env, mgs, log)) {
2194                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2195                 if (rc)
2196                         GOTO(out_lod, rc);
2197         }
2198
2199         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2200
2201         rc = record_start_log(env, mgs, &llh, log);
2202         if (rc)
2203                 GOTO(out_lod, rc);
2204
2205         /* add MDT itself */
2206
2207         /* FIXME this whole fn should be a single journal transaction */
2208         sprintf(uuid, "%s_UUID", log);
2209         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2210         if (rc)
2211                 GOTO(out_lod, rc);
2212         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2213         if (rc)
2214                 GOTO(out_end, rc);
2215         rc = record_mount_opt(env, llh, log, lovname, NULL);
2216         if (rc)
2217                 GOTO(out_end, rc);
2218         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2219                         failout ? "n" : "f");
2220         if (rc)
2221                 GOTO(out_end, rc);
2222         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2223         if (rc)
2224                 GOTO(out_end, rc);
2225 out_end:
2226         record_end_log(env, &llh);
2227 out_lod:
2228         name_destroy(&lovname);
2229 out_free:
2230         OBD_FREE(uuid, sizeof(struct obd_uuid));
2231         RETURN(rc);
2232 }
2233
2234 /* envelope method for all layers log */
2235 static int mgs_write_log_mdt(const struct lu_env *env,
2236                              struct mgs_device *mgs,
2237                              struct fs_db *fsdb,
2238                              struct mgs_target_info *mti)
2239 {
2240         struct mgs_thread_info *mgi = mgs_env_info(env);
2241         struct llog_handle *llh = NULL;
2242         char *cliname;
2243         int rc, i = 0;
2244         ENTRY;
2245
2246         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2247
2248         if (mti->mti_uuid[0] == '\0') {
2249                 /* Make up our own uuid */
2250                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2251                          "%s_UUID", mti->mti_svname);
2252         }
2253
2254         /* add mdt */
2255         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2256         if (rc)
2257                 RETURN(rc);
2258         /* Append the mdt info to the client log */
2259         rc = name_create(&cliname, mti->mti_fsname, "-client");
2260         if (rc)
2261                 RETURN(rc);
2262
2263         if (mgs_log_is_empty(env, mgs, cliname)) {
2264                 /* Start client log */
2265                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2266                                        fsdb->fsdb_clilov);
2267                 if (rc)
2268                         GOTO(out_free, rc);
2269                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2270                                        fsdb->fsdb_clilmv);
2271                 if (rc)
2272                         GOTO(out_free, rc);
2273         }
2274
2275         /*
2276         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2277         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2278         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2279         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2280         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2281         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2282         */
2283
2284                 /* copy client info about lov/lmv */
2285                 mgi->mgi_comp.comp_mti = mti;
2286                 mgi->mgi_comp.comp_fsdb = fsdb;
2287
2288                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2289                                                         &mgi->mgi_comp);
2290                 if (rc)
2291                         GOTO(out_free, rc);
2292                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2293                                               fsdb->fsdb_clilmv);
2294                 if (rc)
2295                         GOTO(out_free, rc);
2296
2297                 /* add mountopts */
2298                 rc = record_start_log(env, mgs, &llh, cliname);
2299                 if (rc)
2300                         GOTO(out_free, rc);
2301
2302                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2303                                    "mount opts");
2304                 if (rc)
2305                         GOTO(out_end, rc);
2306                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2307                                       fsdb->fsdb_clilmv);
2308                 if (rc)
2309                         GOTO(out_end, rc);
2310                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2311                                    "mount opts");
2312
2313         if (rc)
2314                 GOTO(out_end, rc);
2315
2316         /* for_all_existing_mdt except current one */
2317         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2318                 if (i !=  mti->mti_stripe_index &&
2319                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2320                         char *logname;
2321
2322                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2323                         if (rc)
2324                                 GOTO(out_end, rc);
2325
2326                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2327                                                       i, logname);
2328                         name_destroy(&logname);
2329                         if (rc)
2330                                 GOTO(out_end, rc);
2331                 }
2332         }
2333 out_end:
2334         record_end_log(env, &llh);
2335 out_free:
2336         name_destroy(&cliname);
2337         RETURN(rc);
2338 }
2339
2340 /* Add the ost info to the client/mdt lov */
2341 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2342                                     struct mgs_device *mgs, struct fs_db *fsdb,
2343                                     struct mgs_target_info *mti,
2344                                     char *logname, char *suffix, char *lovname,
2345                                     enum lustre_sec_part sec_part, int flags)
2346 {
2347         struct llog_handle *llh = NULL;
2348         char *nodeuuid = NULL;
2349         char *oscname = NULL;
2350         char *oscuuid = NULL;
2351         char *lovuuid = NULL;
2352         char *svname = NULL;
2353         char index[6];
2354         int i, rc;
2355
2356         ENTRY;
2357         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2358                mti->mti_svname, logname);
2359
2360         if (mgs_log_is_empty(env, mgs, logname)) {
2361                 CERROR("log is empty! Logical error\n");
2362                 RETURN (-EINVAL);
2363         }
2364
2365         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2366         if (rc)
2367                 RETURN(rc);
2368         rc = name_create(&svname, mti->mti_svname, "-osc");
2369         if (rc)
2370                 GOTO(out_free, rc);
2371
2372         /* for the system upgraded from old 1.8, keep using the old osc naming
2373          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2374         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2375                 rc = name_create(&oscname, svname, "");
2376         else
2377                 rc = name_create(&oscname, svname, suffix);
2378         if (rc)
2379                 GOTO(out_free, rc);
2380
2381         rc = name_create(&oscuuid, oscname, "_UUID");
2382         if (rc)
2383                 GOTO(out_free, rc);
2384         rc = name_create(&lovuuid, lovname, "_UUID");
2385         if (rc)
2386                 GOTO(out_free, rc);
2387
2388
2389         /*
2390         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2391         multihomed (#4)
2392         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2393         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2394         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2395         failover (#6,7)
2396         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2397         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2398         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2399         */
2400
2401         rc = record_start_log(env, mgs, &llh, logname);
2402         if (rc)
2403                 GOTO(out_free, rc);
2404
2405         /* FIXME these should be a single journal transaction */
2406         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2407                            "add osc");
2408         if (rc)
2409                 GOTO(out_end, rc);
2410
2411         /* NB: don't change record order, because upon MDT steal OSC config
2412          * from client, it treats all nids before LCFG_SETUP as target nids
2413          * (multiple interfaces), while nids after as failover node nids.
2414          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2415          */
2416         for (i = 0; i < mti->mti_nid_count; i++) {
2417                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2418                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2419                 if (rc)
2420                         GOTO(out_end, rc);
2421         }
2422         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2423         if (rc)
2424                 GOTO(out_end, rc);
2425         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
2426                           NULL, NULL);
2427         if (rc)
2428                 GOTO(out_end, rc);
2429         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2430         if (rc)
2431                 GOTO(out_end, rc);
2432
2433         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2434
2435         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2436         if (rc)
2437                 GOTO(out_end, rc);
2438         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2439                            "add osc");
2440         if (rc)
2441                 GOTO(out_end, rc);
2442 out_end:
2443         record_end_log(env, &llh);
2444 out_free:
2445         name_destroy(&lovuuid);
2446         name_destroy(&oscuuid);
2447         name_destroy(&oscname);
2448         name_destroy(&svname);
2449         name_destroy(&nodeuuid);
2450         RETURN(rc);
2451 }
2452
2453 static int mgs_write_log_ost(const struct lu_env *env,
2454                              struct mgs_device *mgs, struct fs_db *fsdb,
2455                              struct mgs_target_info *mti)
2456 {
2457         struct llog_handle *llh = NULL;
2458         char *logname, *lovname;
2459         char *ptr = mti->mti_params;
2460         int rc, flags = 0, failout = 0, i;
2461         ENTRY;
2462
2463         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2464
2465         /* The ost startup log */
2466
2467         /* If the ost log already exists, that means that someone reformatted
2468            the ost and it called target_add again. */
2469         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2470                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2471                                    "exists, yet the server claims it never "
2472                                    "registered. It may have been reformatted, "
2473                                    "or the index changed. writeconf the MDT to "
2474                                    "regenerate all logs.\n", mti->mti_svname);
2475                 RETURN(-EALREADY);
2476         }
2477
2478         /*
2479         attach obdfilter ost1 ost1_UUID
2480         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2481         */
2482         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2483                 failout = (strncmp(ptr, "failout", 7) == 0);
2484         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2485         if (rc)
2486                 RETURN(rc);
2487         /* FIXME these should be a single journal transaction */
2488         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2489         if (rc)
2490                 GOTO(out_end, rc);
2491         if (*mti->mti_uuid == '\0')
2492                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2493                          "%s_UUID", mti->mti_svname);
2494         rc = record_attach(env, llh, mti->mti_svname,
2495                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2496         if (rc)
2497                 GOTO(out_end, rc);
2498         rc = record_setup(env, llh, mti->mti_svname,
2499                           "dev"/*ignored*/, "type"/*ignored*/,
2500                           failout ? "n" : "f", NULL/*options*/);
2501         if (rc)
2502                 GOTO(out_end, rc);
2503         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2504         if (rc)
2505                 GOTO(out_end, rc);
2506 out_end:
2507         record_end_log(env, &llh);
2508         if (rc)
2509                 RETURN(rc);
2510         /* We also have to update the other logs where this osc is part of
2511            the lov */
2512
2513         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2514                 /* If we're upgrading, the old mdt log already has our
2515                    entry. Let's do a fake one for fun. */
2516                 /* Note that we can't add any new failnids, since we don't
2517                    know the old osc names. */
2518                 flags = CM_SKIP | CM_UPGRADE146;
2519
2520         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2521                 /* If the update flag isn't set, don't update client/mdt
2522                    logs. */
2523                 flags |= CM_SKIP;
2524                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2525                               "the MDT first to regenerate it.\n",
2526                               mti->mti_svname);
2527         }
2528
2529         /* Add ost to all MDT lov defs */
2530         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2531                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2532                         char mdt_index[9];
2533
2534                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2535                                                      i);
2536                         if (rc)
2537                                 RETURN(rc);
2538                         sprintf(mdt_index, "-MDT%04x", i);
2539                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2540                                                       logname, mdt_index,
2541                                                       lovname, LUSTRE_SP_MDT,
2542                                                       flags);
2543                         name_destroy(&logname);
2544                         name_destroy(&lovname);
2545                         if (rc)
2546                                 RETURN(rc);
2547                 }
2548         }
2549
2550         /* Append ost info to the client log */
2551         rc = name_create(&logname, mti->mti_fsname, "-client");
2552         if (rc)
2553                 RETURN(rc);
2554         if (mgs_log_is_empty(env, mgs, logname)) {
2555                 /* Start client log */
2556                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2557                                        fsdb->fsdb_clilov);
2558                 if (rc)
2559                         GOTO(out_free, rc);
2560                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2561                                        fsdb->fsdb_clilmv);
2562                 if (rc)
2563                         GOTO(out_free, rc);
2564         }
2565         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2566                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2567 out_free:
2568         name_destroy(&logname);
2569         RETURN(rc);
2570 }
2571
2572 static __inline__ int mgs_param_empty(char *ptr)
2573 {
2574         char *tmp;
2575
2576         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2577                 return 1;
2578         return 0;
2579 }
2580
2581 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2582                                           struct mgs_device *mgs,
2583                                           struct fs_db *fsdb,
2584                                           struct mgs_target_info *mti,
2585                                           char *logname, char *cliname)
2586 {
2587         int rc;
2588         struct llog_handle *llh = NULL;
2589
2590         if (mgs_param_empty(mti->mti_params)) {
2591                 /* Remove _all_ failnids */
2592                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2593                                 mti->mti_svname, "add failnid", CM_SKIP);
2594                 return rc < 0 ? rc : 0;
2595         }
2596
2597         /* Otherwise failover nids are additive */
2598         rc = record_start_log(env, mgs, &llh, logname);
2599         if (rc)
2600                 return rc;
2601                 /* FIXME this should be a single journal transaction */
2602         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2603                            "add failnid");
2604         if (rc)
2605                 goto out_end;
2606         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2607         if (rc)
2608                 goto out_end;
2609         rc = record_marker(env, llh, fsdb, CM_END,
2610                            mti->mti_svname, "add failnid");
2611 out_end:
2612         record_end_log(env, &llh);
2613         return rc;
2614 }
2615
2616
2617 /* Add additional failnids to an existing log.
2618    The mdc/osc must have been added to logs first */
2619 /* tcp nids must be in dotted-quad ascii -
2620    we can't resolve hostnames from the kernel. */
2621 static int mgs_write_log_add_failnid(const struct lu_env *env,
2622                                      struct mgs_device *mgs,
2623                                      struct fs_db *fsdb,
2624                                      struct mgs_target_info *mti)
2625 {
2626         char *logname, *cliname;
2627         int rc;
2628         ENTRY;
2629
2630         /* FIXME we currently can't erase the failnids
2631          * given when a target first registers, since they aren't part of
2632          * an "add uuid" stanza */
2633
2634         /* Verify that we know about this target */
2635         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2636                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2637                                    "yet. It must be started before failnids "
2638                                    "can be added.\n", mti->mti_svname);
2639                 RETURN(-ENOENT);
2640         }
2641
2642         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2643         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2644                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2645         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2646                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2647         } else {
2648                 RETURN(-EINVAL);
2649         }
2650         if (rc)
2651                 RETURN(rc);
2652         /* Add failover nids to the client log */
2653         rc = name_create(&logname, mti->mti_fsname, "-client");
2654         if (rc) {
2655                 name_destroy(&cliname);
2656                 RETURN(rc);
2657         }
2658         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2659         name_destroy(&logname);
2660         name_destroy(&cliname);
2661         if (rc)
2662                 RETURN(rc);
2663
2664         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2665                 /* Add OST failover nids to the MDT logs as well */
2666                 int i;
2667
2668                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2669                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2670                                 continue;
2671                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2672                         if (rc)
2673                                 RETURN(rc);
2674                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2675                                                  fsdb, i);
2676                         if (rc) {
2677                                 name_destroy(&logname);
2678                                 RETURN(rc);
2679                         }
2680                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2681                                                             mti, logname,
2682                                                             cliname);
2683                         name_destroy(&cliname);
2684                         name_destroy(&logname);
2685                         if (rc)
2686                                 RETURN(rc);
2687                 }
2688         }
2689
2690         RETURN(rc);
2691 }
2692
2693 static int mgs_wlp_lcfg(const struct lu_env *env,
2694                         struct mgs_device *mgs, struct fs_db *fsdb,
2695                         struct mgs_target_info *mti,
2696                         char *logname, struct lustre_cfg_bufs *bufs,
2697                         char *tgtname, char *ptr)
2698 {
2699         char comment[MTI_NAME_MAXLEN];
2700         char *tmp;
2701         struct llog_cfg_rec *lcr;
2702         int rc, del;
2703
2704         /* Erase any old settings of this same parameter */
2705         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2706         comment[MTI_NAME_MAXLEN - 1] = 0;
2707         /* But don't try to match the value. */
2708         tmp = strchr(comment, '=');
2709         if (tmp != NULL)
2710                 *tmp = 0;
2711         /* FIXME we should skip settings that are the same as old values */
2712         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2713         if (rc < 0)
2714                 return rc;
2715         del = mgs_param_empty(ptr);
2716
2717         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2718                       "Setting" : "Modifying", tgtname, comment, logname);
2719         if (del) {
2720                 /* mgs_modify() will return 1 if nothing had to be done */
2721                 if (rc == 1)
2722                         rc = 0;
2723                 return rc;
2724         }
2725
2726         lustre_cfg_bufs_reset(bufs, tgtname);
2727         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2728         if (mti->mti_flags & LDD_F_PARAM2)
2729                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2730
2731         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2732                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2733         if (lcr == NULL)
2734                 return -ENOMEM;
2735
2736         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2737                                   comment);
2738         lustre_cfg_rec_free(lcr);
2739         return rc;
2740 }
2741
2742 static int mgs_write_log_param2(const struct lu_env *env,
2743                                 struct mgs_device *mgs,
2744                                 struct fs_db *fsdb,
2745                                 struct mgs_target_info *mti, char *ptr)
2746 {
2747         struct lustre_cfg_bufs  bufs;
2748         int                     rc = 0;
2749         ENTRY;
2750
2751         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2752         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2753                           mti->mti_svname, ptr);
2754
2755         RETURN(rc);
2756 }
2757
2758 /* write global variable settings into log */
2759 static int mgs_write_log_sys(const struct lu_env *env,
2760                              struct mgs_device *mgs, struct fs_db *fsdb,
2761                              struct mgs_target_info *mti, char *sys, char *ptr)
2762 {
2763         struct mgs_thread_info  *mgi = mgs_env_info(env);
2764         struct lustre_cfg       *lcfg;
2765         struct llog_cfg_rec     *lcr;
2766         char *tmp, sep;
2767         int rc, cmd, convert = 1;
2768
2769         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2770                 cmd = LCFG_SET_TIMEOUT;
2771         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2772                 cmd = LCFG_SET_LDLM_TIMEOUT;
2773         /* Check for known params here so we can return error to lctl */
2774         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2775                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2776                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2777                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2778                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2779                 cmd = LCFG_PARAM;
2780         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2781                 convert = 0; /* Don't convert string value to integer */
2782                 cmd = LCFG_PARAM;
2783         } else {
2784                 return -EINVAL;
2785         }
2786
2787         if (mgs_param_empty(ptr))
2788                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2789         else
2790                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2791
2792         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2793         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2794         if (!convert && *tmp != '\0')
2795                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2796         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2797         if (lcr == NULL)
2798                 return -ENOMEM;
2799
2800         lcfg = &lcr->lcr_cfg;
2801         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2802         /* truncate the comment to the parameter name */
2803         ptr = tmp - 1;
2804         sep = *ptr;
2805         *ptr = '\0';
2806         /* modify all servers and clients */
2807         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2808                                       *tmp == '\0' ? NULL : lcr,
2809                                       mti->mti_fsname, sys, 0);
2810         if (rc == 0 && *tmp != '\0') {
2811                 switch (cmd) {
2812                 case LCFG_SET_TIMEOUT:
2813                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2814                                 class_process_config(lcfg);
2815                         break;
2816                 case LCFG_SET_LDLM_TIMEOUT:
2817                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2818                                 class_process_config(lcfg);
2819                         break;
2820                 default:
2821                         break;
2822                 }
2823         }
2824         *ptr = sep;
2825         lustre_cfg_rec_free(lcr);
2826         return rc;
2827 }
2828
2829 /* write quota settings into log */
2830 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2831                                struct fs_db *fsdb, struct mgs_target_info *mti,
2832                                char *quota, char *ptr)
2833 {
2834         struct mgs_thread_info  *mgi = mgs_env_info(env);
2835         struct llog_cfg_rec     *lcr;
2836         char                    *tmp;
2837         char                     sep;
2838         int                      rc, cmd = LCFG_PARAM;
2839
2840         /* support only 'meta' and 'data' pools so far */
2841         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2842             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2843                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2844                        "& quota.ost are)\n", ptr);
2845                 return -EINVAL;
2846         }
2847
2848         if (*tmp == '\0') {
2849                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2850         } else {
2851                 CDEBUG(D_MGS, "global '%s'\n", quota);
2852
2853                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2854                     strcmp(tmp, "none") != 0) {
2855                         CERROR("enable option(%s) isn't supported\n", tmp);
2856                         return -EINVAL;
2857                 }
2858         }
2859
2860         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2861         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2862         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2863         if (lcr == NULL)
2864                 return -ENOMEM;
2865
2866         /* truncate the comment to the parameter name */
2867         ptr = tmp - 1;
2868         sep = *ptr;
2869         *ptr = '\0';
2870
2871         /* XXX we duplicated quota enable information in all server
2872          *     config logs, it should be moved to a separate config
2873          *     log once we cleanup the config log for global param. */
2874         /* modify all servers */
2875         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2876                                       *tmp == '\0' ? NULL : lcr,
2877                                       mti->mti_fsname, quota, 1);
2878         *ptr = sep;
2879         lustre_cfg_rec_free(lcr);
2880         return rc < 0 ? rc : 0;
2881 }
2882
2883 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2884                                    struct mgs_device *mgs,
2885                                    struct fs_db *fsdb,
2886                                    struct mgs_target_info *mti,
2887                                    char *param)
2888 {
2889         struct mgs_thread_info  *mgi = mgs_env_info(env);
2890         struct llog_cfg_rec     *lcr;
2891         struct llog_handle      *llh = NULL;
2892         char                    *logname;
2893         char                    *comment, *ptr;
2894         int                      rc, len;
2895
2896         ENTRY;
2897
2898         /* get comment */
2899         ptr = strchr(param, '=');
2900         LASSERT(ptr != NULL);
2901         len = ptr - param;
2902
2903         OBD_ALLOC(comment, len + 1);
2904         if (comment == NULL)
2905                 RETURN(-ENOMEM);
2906         strncpy(comment, param, len);
2907         comment[len] = '\0';
2908
2909         /* prepare lcfg */
2910         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2911         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2912         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2913         if (lcr == NULL)
2914                 GOTO(out_comment, rc = -ENOMEM);
2915
2916         /* construct log name */
2917         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2918         if (rc < 0)
2919                 GOTO(out_lcfg, rc);
2920
2921         if (mgs_log_is_empty(env, mgs, logname)) {
2922                 rc = record_start_log(env, mgs, &llh, logname);
2923                 if (rc < 0)
2924                         GOTO(out, rc);
2925                 record_end_log(env, &llh);
2926         }
2927
2928         /* obsolete old one */
2929         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2930                         comment, CM_SKIP);
2931         if (rc < 0)
2932                 GOTO(out, rc);
2933         /* write the new one */
2934         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2935                                   mti->mti_svname, comment);
2936         if (rc)
2937                 CERROR("%s: error writing log %s: rc = %d\n",
2938                        mgs->mgs_obd->obd_name, logname, rc);
2939 out:
2940         name_destroy(&logname);
2941 out_lcfg:
2942         lustre_cfg_rec_free(lcr);
2943 out_comment:
2944         OBD_FREE(comment, len + 1);
2945         RETURN(rc);
2946 }
2947
2948 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2949                                         char *param)
2950 {
2951         char    *ptr;
2952
2953         /* disable the adjustable udesc parameter for now, i.e. use default
2954          * setting that client always ship udesc to MDT if possible. to enable
2955          * it simply remove the following line */
2956         goto error_out;
2957
2958         ptr = strchr(param, '=');
2959         if (ptr == NULL)
2960                 goto error_out;
2961         *ptr++ = '\0';
2962
2963         if (strcmp(param, PARAM_SRPC_UDESC))
2964                 goto error_out;
2965
2966         if (strcmp(ptr, "yes") == 0) {
2967                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2968                 CWARN("Enable user descriptor shipping from client to MDT\n");
2969         } else if (strcmp(ptr, "no") == 0) {
2970                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2971                 CWARN("Disable user descriptor shipping from client to MDT\n");
2972         } else {
2973                 *(ptr - 1) = '=';
2974                 goto error_out;
2975         }
2976         return 0;
2977
2978 error_out:
2979         CERROR("Invalid param: %s\n", param);
2980         return -EINVAL;
2981 }
2982
2983 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2984                                   const char *svname,
2985                                   char *param)
2986 {
2987         struct sptlrpc_rule      rule;
2988         struct sptlrpc_rule_set *rset;
2989         int                      rc;
2990         ENTRY;
2991
2992         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2993                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2994                 RETURN(-EINVAL);
2995         }
2996
2997         if (strncmp(param, PARAM_SRPC_UDESC,
2998                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2999                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3000         }
3001
3002         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3003                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3004                 RETURN(-EINVAL);
3005         }
3006
3007         param += sizeof(PARAM_SRPC_FLVR) - 1;
3008
3009         rc = sptlrpc_parse_rule(param, &rule);
3010         if (rc)
3011                 RETURN(rc);
3012
3013         /* mgs rules implies must be mgc->mgs */
3014         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3015                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3016                      rule.sr_from != LUSTRE_SP_ANY) ||
3017                     (rule.sr_to != LUSTRE_SP_MGS &&
3018                      rule.sr_to != LUSTRE_SP_ANY))
3019                         RETURN(-EINVAL);
3020         }
3021
3022         /* preapre room for this coming rule. svcname format should be:
3023          * - fsname: general rule
3024          * - fsname-tgtname: target-specific rule
3025          */
3026         if (strchr(svname, '-')) {
3027                 struct mgs_tgt_srpc_conf *tgtconf;
3028                 int                       found = 0;
3029
3030                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3031                      tgtconf = tgtconf->mtsc_next) {
3032                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3033                                 found = 1;
3034                                 break;
3035                         }
3036                 }
3037
3038                 if (!found) {
3039                         int name_len;
3040
3041                         OBD_ALLOC_PTR(tgtconf);
3042                         if (tgtconf == NULL)
3043                                 RETURN(-ENOMEM);
3044
3045                         name_len = strlen(svname);
3046
3047                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3048                         if (tgtconf->mtsc_tgt == NULL) {
3049                                 OBD_FREE_PTR(tgtconf);
3050                                 RETURN(-ENOMEM);
3051                         }
3052                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3053
3054                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3055                         fsdb->fsdb_srpc_tgt = tgtconf;
3056                 }
3057
3058                 rset = &tgtconf->mtsc_rset;
3059         } else {
3060                 rset = &fsdb->fsdb_srpc_gen;
3061         }
3062
3063         rc = sptlrpc_rule_set_merge(rset, &rule);
3064
3065         RETURN(rc);
3066 }
3067
3068 static int mgs_srpc_set_param(const struct lu_env *env,
3069                               struct mgs_device *mgs,
3070                               struct fs_db *fsdb,
3071                               struct mgs_target_info *mti,
3072                               char *param)
3073 {
3074         char                   *copy;
3075         int                     rc, copy_size;
3076         ENTRY;
3077
3078 #ifndef HAVE_GSS
3079         RETURN(-EINVAL);
3080 #endif
3081         /* keep a copy of original param, which could be destroied
3082          * during parsing */
3083         copy_size = strlen(param) + 1;
3084         OBD_ALLOC(copy, copy_size);
3085         if (copy == NULL)
3086                 return -ENOMEM;
3087         memcpy(copy, param, copy_size);
3088
3089         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3090         if (rc)
3091                 goto out_free;
3092
3093         /* previous steps guaranteed the syntax is correct */
3094         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3095         if (rc)
3096                 goto out_free;
3097
3098         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3099                 /*
3100                  * for mgs rules, make them effective immediately.
3101                  */
3102                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3103                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3104                                                  &fsdb->fsdb_srpc_gen);
3105         }
3106
3107 out_free:
3108         OBD_FREE(copy, copy_size);
3109         RETURN(rc);
3110 }
3111
3112 struct mgs_srpc_read_data {
3113         struct fs_db   *msrd_fsdb;
3114         int             msrd_skip;
3115 };
3116
3117 static int mgs_srpc_read_handler(const struct lu_env *env,
3118                                  struct llog_handle *llh,
3119                                  struct llog_rec_hdr *rec, void *data)
3120 {
3121         struct mgs_srpc_read_data *msrd = data;
3122         struct cfg_marker         *marker;
3123         struct lustre_cfg         *lcfg = REC_DATA(rec);
3124         char                      *svname, *param;
3125         int                        cfg_len, rc;
3126         ENTRY;
3127
3128         if (rec->lrh_type != OBD_CFG_REC) {
3129                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3130                 RETURN(-EINVAL);
3131         }
3132
3133         cfg_len = REC_DATA_LEN(rec);
3134
3135         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3136         if (rc) {
3137                 CERROR("Insane cfg\n");
3138                 RETURN(rc);
3139         }
3140
3141         if (lcfg->lcfg_command == LCFG_MARKER) {
3142                 marker = lustre_cfg_buf(lcfg, 1);
3143
3144                 if (marker->cm_flags & CM_START &&
3145                     marker->cm_flags & CM_SKIP)
3146                         msrd->msrd_skip = 1;
3147                 if (marker->cm_flags & CM_END)
3148                         msrd->msrd_skip = 0;
3149
3150                 RETURN(0);
3151         }
3152
3153         if (msrd->msrd_skip)
3154                 RETURN(0);
3155
3156         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3157                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3158                 RETURN(0);
3159         }
3160
3161         svname = lustre_cfg_string(lcfg, 0);
3162         if (svname == NULL) {
3163                 CERROR("svname is empty\n");
3164                 RETURN(0);
3165         }
3166
3167         param = lustre_cfg_string(lcfg, 1);
3168         if (param == NULL) {
3169                 CERROR("param is empty\n");
3170                 RETURN(0);
3171         }
3172
3173         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3174         if (rc)
3175                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3176
3177         RETURN(0);
3178 }
3179
3180 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3181                                 struct mgs_device *mgs,
3182                                 struct fs_db *fsdb)
3183 {
3184         struct llog_handle        *llh = NULL;
3185         struct llog_ctxt          *ctxt;
3186         char                      *logname;
3187         struct mgs_srpc_read_data  msrd;
3188         int                        rc;
3189         ENTRY;
3190
3191         /* construct log name */
3192         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3193         if (rc)
3194                 RETURN(rc);
3195
3196         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3197         LASSERT(ctxt != NULL);
3198
3199         if (mgs_log_is_empty(env, mgs, logname))
3200                 GOTO(out, rc = 0);
3201
3202         rc = llog_open(env, ctxt, &llh, NULL, logname,
3203                        LLOG_OPEN_EXISTS);
3204         if (rc < 0) {
3205                 if (rc == -ENOENT)
3206                         rc = 0;
3207                 GOTO(out, rc);
3208         }
3209
3210         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3211         if (rc)
3212                 GOTO(out_close, rc);
3213
3214         if (llog_get_size(llh) <= 1)
3215                 GOTO(out_close, rc = 0);
3216
3217         msrd.msrd_fsdb = fsdb;
3218         msrd.msrd_skip = 0;
3219
3220         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3221                           NULL);
3222
3223 out_close:
3224         llog_close(env, llh);
3225 out:
3226         llog_ctxt_put(ctxt);
3227         name_destroy(&logname);
3228
3229         if (rc)
3230                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3231         RETURN(rc);
3232 }
3233
3234 /* Permanent settings of all parameters by writing into the appropriate
3235  * configuration logs.
3236  * A parameter with null value ("<param>='\0'") means to erase it out of
3237  * the logs.
3238  */
3239 static int mgs_write_log_param(const struct lu_env *env,
3240                                struct mgs_device *mgs, struct fs_db *fsdb,
3241                                struct mgs_target_info *mti, char *ptr)
3242 {
3243         struct mgs_thread_info *mgi = mgs_env_info(env);
3244         char *logname;
3245         char *tmp;
3246         int rc = 0, rc2 = 0;
3247         ENTRY;
3248
3249         /* For various parameter settings, we have to figure out which logs
3250            care about them (e.g. both mdt and client for lov settings) */
3251         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3252
3253         /* The params are stored in MOUNT_DATA_FILE and modified via
3254            tunefs.lustre, or set using lctl conf_param */
3255
3256         /* Processed in lustre_start_mgc */
3257         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3258                 GOTO(end, rc);
3259
3260         /* Processed in ost/mdt */
3261         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3262                 GOTO(end, rc);
3263
3264         /* Processed in mgs_write_log_ost */
3265         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3266                 if (mti->mti_flags & LDD_F_PARAM) {
3267                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3268                                            "changed with tunefs.lustre"
3269                                            "and --writeconf\n", ptr);
3270                         rc = -EPERM;
3271                 }
3272                 GOTO(end, rc);
3273         }
3274
3275         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3276                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3277                 GOTO(end, rc);
3278         }
3279
3280         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3281                 /* Add a failover nidlist */
3282                 rc = 0;
3283                 /* We already processed failovers params for new
3284                    targets in mgs_write_log_target */
3285                 if (mti->mti_flags & LDD_F_PARAM) {
3286                         CDEBUG(D_MGS, "Adding failnode\n");
3287                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3288                 }
3289                 GOTO(end, rc);
3290         }
3291
3292         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3293                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3294                 GOTO(end, rc);
3295         }
3296
3297         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3298                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3299                 GOTO(end, rc);
3300         }
3301
3302         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3303                 /* active=0 means off, anything else means on */
3304                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3305                 int i;
3306
3307                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3308                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3309                                            "be (de)activated.\n",
3310                                            mti->mti_svname);
3311                         GOTO(end, rc = -EINVAL);
3312                 }
3313                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3314                               flag ? "de": "re", mti->mti_svname);
3315                 /* Modify clilov */
3316                 rc = name_create(&logname, mti->mti_fsname, "-client");
3317                 if (rc)
3318                         GOTO(end, rc);
3319                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3320                                 mti->mti_svname, "add osc", flag);
3321                 name_destroy(&logname);
3322                 if (rc)
3323                         goto active_err;
3324                 /* Modify mdtlov */
3325                 /* Add to all MDT logs for CMD */
3326                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3327                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3328                                 continue;
3329                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3330                         if (rc)
3331                                 GOTO(end, rc);
3332                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3333                                         mti->mti_svname, "add osc", flag);
3334                         name_destroy(&logname);
3335                         if (rc)
3336                                 goto active_err;
3337                 }
3338         active_err:
3339                 if (rc) {
3340                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3341                                            "log (%d). No permanent "
3342                                            "changes were made to the "
3343                                            "config log.\n",
3344                                            mti->mti_svname, rc);
3345                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3346                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3347                                                    " because the log"
3348                                                    "is in the old 1.4"
3349                                                    "style. Consider "
3350                                                    " --writeconf to "
3351                                                    "update the logs.\n");
3352                         GOTO(end, rc);
3353                 }
3354                 /* Fall through to osc proc for deactivating live OSC
3355                    on running MDT / clients. */
3356         }
3357         /* Below here, let obd's XXX_process_config methods handle it */
3358
3359         /* All lov. in proc */
3360         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3361                 char *mdtlovname;
3362
3363                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3364                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3365                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3366                                            "set on the MDT, not %s. "
3367                                            "Ignoring.\n",
3368                                            mti->mti_svname);
3369                         GOTO(end, rc = 0);
3370                 }
3371
3372                 /* Modify mdtlov */
3373                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3374                         GOTO(end, rc = -ENODEV);
3375
3376                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3377                                              mti->mti_stripe_index);
3378                 if (rc)
3379                         GOTO(end, rc);
3380                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3381                                   &mgi->mgi_bufs, mdtlovname, ptr);
3382                 name_destroy(&logname);
3383                 name_destroy(&mdtlovname);
3384                 if (rc)
3385                         GOTO(end, rc);
3386
3387                 /* Modify clilov */
3388                 rc = name_create(&logname, mti->mti_fsname, "-client");
3389                 if (rc)
3390                         GOTO(end, rc);
3391                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3392                                   fsdb->fsdb_clilov, ptr);
3393                 name_destroy(&logname);
3394                 GOTO(end, rc);
3395         }
3396
3397         /* All osc., mdc., llite. params in proc */
3398         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3399             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3400             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3401                 char *cname;
3402
3403                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3404                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3405                                            " cannot be modified. Consider"
3406                                            " updating the configuration with"
3407                                            " --writeconf\n",
3408                                            mti->mti_svname);
3409                         GOTO(end, rc = -EINVAL);
3410                 }
3411                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3412                         rc = name_create(&cname, mti->mti_fsname, "-client");
3413                         /* Add the client type to match the obdname in
3414                            class_config_llog_handler */
3415                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3416                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3417                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3418                         rc = name_create(&cname, mti->mti_svname, "-osc");
3419                 } else {
3420                         GOTO(end, rc = -EINVAL);
3421                 }
3422                 if (rc)
3423                         GOTO(end, rc);
3424
3425                 /* Forbid direct update of llite root squash parameters.
3426                  * These parameters are indirectly set via the MDT settings.
3427                  * See (LU-1778) */
3428                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
3429                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3430                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3431                         LCONSOLE_ERROR("%s: root squash parameters can only "
3432                                 "be updated through MDT component\n",
3433                                 mti->mti_fsname);
3434                         name_destroy(&cname);
3435                         GOTO(end, rc = -EINVAL);
3436                 }
3437
3438                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3439
3440                 /* Modify client */
3441                 rc = name_create(&logname, mti->mti_fsname, "-client");
3442                 if (rc) {
3443                         name_destroy(&cname);
3444                         GOTO(end, rc);
3445                 }
3446                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3447                                   cname, ptr);
3448
3449                 /* osc params affect the MDT as well */
3450                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3451                         int i;
3452
3453                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3454                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3455                                         continue;
3456                                 name_destroy(&cname);
3457                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3458                                                          fsdb, i);
3459                                 name_destroy(&logname);
3460                                 if (rc)
3461                                         break;
3462                                 rc = name_create_mdt(&logname,
3463                                                      mti->mti_fsname, i);
3464                                 if (rc)
3465                                         break;
3466                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3467                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3468                                                           mti, logname,
3469                                                           &mgi->mgi_bufs,
3470                                                           cname, ptr);
3471                                         if (rc)
3472                                                 break;
3473                                 }
3474                         }
3475                 }
3476                 name_destroy(&logname);
3477                 name_destroy(&cname);
3478                 GOTO(end, rc);
3479         }
3480
3481         /* All mdt. params in proc */
3482         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
3483                 int i;
3484                 __u32 idx;
3485
3486                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3487                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3488                             MTI_NAME_MAXLEN) == 0)
3489                         /* device is unspecified completely? */
3490                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3491                 else
3492                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3493                 if (rc < 0)
3494                         goto active_err;
3495                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3496                         goto active_err;
3497                 if (rc & LDD_F_SV_ALL) {
3498                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3499                                 if (!test_bit(i,
3500                                                   fsdb->fsdb_mdt_index_map))
3501                                         continue;
3502                                 rc = name_create_mdt(&logname,
3503                                                 mti->mti_fsname, i);
3504                                 if (rc)
3505                                         goto active_err;
3506                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3507                                                   logname, &mgi->mgi_bufs,
3508                                                   logname, ptr);
3509                                 name_destroy(&logname);
3510                                 if (rc)
3511                                         goto active_err;
3512                         }
3513                 } else {
3514                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
3515                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
3516                                 LCONSOLE_ERROR("%s: root squash parameters "
3517                                         "cannot be applied to a single MDT\n",
3518                                         mti->mti_fsname);
3519                                 GOTO(end, rc = -EINVAL);
3520                         }
3521                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3522                                           mti->mti_svname, &mgi->mgi_bufs,
3523                                           mti->mti_svname, ptr);
3524                         if (rc)
3525                                 goto active_err;
3526                 }
3527
3528                 /* root squash settings are also applied to llite
3529                  * config log (see LU-1778) */
3530                 if (rc == 0 &&
3531                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3532                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3533                         char *cname;
3534                         char *ptr2;
3535
3536                         rc = name_create(&cname, mti->mti_fsname, "-client");
3537                         if (rc)
3538                                 GOTO(end, rc);
3539                         rc = name_create(&logname, mti->mti_fsname, "-client");
3540                         if (rc) {
3541                                 name_destroy(&cname);
3542                                 GOTO(end, rc);
3543                         }
3544                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
3545                         if (rc) {
3546                                 name_destroy(&cname);
3547                                 name_destroy(&logname);
3548                                 GOTO(end, rc);
3549                         }
3550                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3551                                           &mgi->mgi_bufs, cname, ptr2);
3552                         name_destroy(&ptr2);
3553                         name_destroy(&logname);
3554                         name_destroy(&cname);
3555                 }
3556                 GOTO(end, rc);
3557         }
3558
3559         /* All mdd., ost. and osd. params in proc */
3560         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3561             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3562             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3563                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3564                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3565                         GOTO(end, rc = -ENODEV);
3566
3567                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3568                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3569                 GOTO(end, rc);
3570         }
3571
3572         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3573         rc2 = -ENOSYS;
3574
3575 end:
3576         if (rc)
3577                 CERROR("err %d on param '%s'\n", rc, ptr);
3578
3579         RETURN(rc ?: rc2);
3580 }
3581
3582 /* Not implementing automatic failover nid addition at this time. */
3583 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3584                       struct mgs_target_info *mti)
3585 {
3586 #if 0
3587         struct fs_db *fsdb;
3588         int rc;
3589         ENTRY;
3590
3591         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3592         if (rc)
3593                 RETURN(rc);
3594
3595         if (mgs_log_is_empty(obd, mti->mti_svname))
3596                 /* should never happen */
3597                 RETURN(-ENOENT);
3598
3599         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3600
3601         /* FIXME We can just check mti->params to see if we're already in
3602            the failover list.  Modify mti->params for rewriting back at
3603            server_register_target(). */
3604
3605         mutex_lock(&fsdb->fsdb_mutex);
3606         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3607         mutex_unlock(&fsdb->fsdb_mutex);
3608         char    *buf, *params;
3609         int      rc = -EINVAL;
3610
3611         RETURN(rc);
3612 #endif
3613         return 0;
3614 }
3615
3616 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
3617                          struct mgs_target_info *mti, struct fs_db *fsdb)
3618 {
3619         char    *buf, *params;
3620         int      rc = -EINVAL;
3621
3622         ENTRY;
3623
3624         /* set/check the new target index */
3625         rc = mgs_set_index(env, mgs, mti);
3626         if (rc < 0)
3627                 RETURN(rc);
3628
3629         if (rc == EALREADY) {
3630                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3631                               mti->mti_stripe_index, mti->mti_svname);
3632                 /* We would like to mark old log sections as invalid
3633                    and add new log sections in the client and mdt logs.
3634                    But if we add new sections, then live clients will
3635                    get repeat setup instructions for already running
3636                    osc's. So don't update the client/mdt logs. */
3637                 mti->mti_flags &= ~LDD_F_UPDATE;
3638                 rc = 0;
3639         }
3640
3641         mutex_lock(&fsdb->fsdb_mutex);
3642
3643         if (mti->mti_flags &
3644             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3645                 /* Generate a log from scratch */
3646                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3647                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3648                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3649                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3650                 } else {
3651                         CERROR("Unknown target type %#x, can't create log for "
3652                                "%s\n", mti->mti_flags, mti->mti_svname);
3653                 }
3654                 if (rc) {
3655                         CERROR("Can't write logs for %s (%d)\n",
3656                                mti->mti_svname, rc);
3657                         GOTO(out_up, rc);
3658                 }
3659         } else {
3660                 /* Just update the params from tunefs in mgs_write_log_params */
3661                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3662                 mti->mti_flags |= LDD_F_PARAM;
3663         }
3664
3665         /* allocate temporary buffer, where class_get_next_param will
3666            make copy of a current  parameter */
3667         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3668         if (buf == NULL)
3669                 GOTO(out_up, rc = -ENOMEM);
3670         params = mti->mti_params;
3671         while (params != NULL) {
3672                 rc = class_get_next_param(&params, buf);
3673                 if (rc) {
3674                         if (rc == 1)
3675                                 /* there is no next parameter, that is
3676                                    not an error */
3677                                 rc = 0;
3678                         break;
3679                 }
3680                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3681                        params, buf);
3682                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3683                 if (rc)
3684                         break;
3685         }
3686
3687         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3688
3689 out_up:
3690         mutex_unlock(&fsdb->fsdb_mutex);
3691         RETURN(rc);
3692 }
3693
3694 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3695 {
3696         struct llog_ctxt        *ctxt;
3697         int                      rc = 0;
3698
3699         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3700         if (ctxt == NULL) {
3701                 CERROR("%s: MGS config context doesn't exist\n",
3702                        mgs->mgs_obd->obd_name);
3703                 rc = -ENODEV;
3704         } else {
3705                 rc = llog_erase(env, ctxt, NULL, name);
3706                 /* llog may not exist */
3707                 if (rc == -ENOENT)
3708                         rc = 0;
3709                 llog_ctxt_put(ctxt);
3710         }
3711
3712         if (rc)
3713                 CERROR("%s: failed to clear log %s: %d\n",
3714                        mgs->mgs_obd->obd_name, name, rc);
3715
3716         return rc;
3717 }
3718
3719 /* erase all logs for the given fs */
3720 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3721 {
3722         struct fs_db *fsdb;
3723         struct list_head log_list;
3724         struct mgs_direntry *dirent, *n;
3725         int rc, len = strlen(fsname);
3726         char *suffix;
3727         ENTRY;
3728
3729         /* Find all the logs in the CONFIGS directory */
3730         rc = class_dentry_readdir(env, mgs, &log_list);
3731         if (rc)
3732                 RETURN(rc);
3733
3734         mutex_lock(&mgs->mgs_mutex);
3735
3736         /* Delete the fs db */
3737         fsdb = mgs_find_fsdb(mgs, fsname);
3738         if (fsdb)
3739                 mgs_free_fsdb(mgs, fsdb);
3740
3741         mutex_unlock(&mgs->mgs_mutex);
3742
3743         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3744                 list_del_init(&dirent->mde_list);
3745                 suffix = strrchr(dirent->mde_name, '-');
3746                 if (suffix != NULL) {
3747                         if ((len == suffix - dirent->mde_name) &&
3748                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
3749                                 CDEBUG(D_MGS, "Removing log %s\n",
3750                                        dirent->mde_name);
3751                                 mgs_erase_log(env, mgs, dirent->mde_name);
3752                         }
3753                 }
3754                 mgs_direntry_free(dirent);
3755         }
3756
3757         RETURN(rc);
3758 }
3759
3760 /* list all logs for the given fs */
3761 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
3762                   struct obd_ioctl_data *data)
3763 {
3764         struct list_head         log_list;
3765         struct mgs_direntry     *dirent, *n;
3766         char                    *out, *suffix;
3767         int                      l, remains, rc;
3768
3769         ENTRY;
3770
3771         /* Find all the logs in the CONFIGS directory */
3772         rc = class_dentry_readdir(env, mgs, &log_list);
3773         if (rc)
3774                 RETURN(rc);
3775
3776         out = data->ioc_bulk;
3777         remains = data->ioc_inllen1;
3778         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3779                 list_del_init(&dirent->mde_list);
3780                 suffix = strrchr(dirent->mde_name, '-');
3781                 if (suffix != NULL) {
3782                         l = snprintf(out, remains, "config log: $%s\n",
3783                                      dirent->mde_name);
3784                         out += l;
3785                         remains -= l;
3786                 }
3787                 mgs_direntry_free(dirent);
3788                 if (remains <= 0)
3789                         break;
3790         }
3791         RETURN(rc);
3792 }
3793
3794 /* from llog_swab */
3795 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3796 {
3797         int i;
3798         ENTRY;
3799
3800         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3801         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3802
3803         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3804         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3805         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3806         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3807
3808         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3809         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3810                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3811                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3812                                i, lcfg->lcfg_buflens[i],
3813                                lustre_cfg_string(lcfg, i));
3814                 }
3815         EXIT;
3816 }
3817
3818 /* Setup params fsdb and log
3819  */
3820 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3821                           struct fs_db *fsdb)
3822 {
3823         struct llog_handle      *params_llh = NULL;
3824         int                     rc;
3825         ENTRY;
3826
3827         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
3828         if (fsdb != NULL) {
3829                 mutex_lock(&fsdb->fsdb_mutex);
3830                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
3831                 if (rc == 0)
3832                         rc = record_end_log(env, &params_llh);
3833                 mutex_unlock(&fsdb->fsdb_mutex);
3834         }
3835
3836         RETURN(rc);
3837 }
3838
3839 /* Cleanup params fsdb and log
3840  */
3841 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
3842 {
3843         return mgs_erase_logs(env, mgs, PARAMS_FILENAME);
3844 }
3845
3846 /* Set a permanent (config log) param for a target or fs
3847  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3848  *             buf1 contains the single parameter
3849  */
3850 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3851                  struct lustre_cfg *lcfg, char *fsname)
3852 {
3853         struct fs_db *fsdb;
3854         struct mgs_target_info *mti;
3855         char *devname, *param;
3856         char *ptr;
3857         const char *tmp;
3858         __u32 index;
3859         int rc = 0;
3860         ENTRY;
3861
3862         print_lustre_cfg(lcfg);
3863
3864         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3865         devname = lustre_cfg_string(lcfg, 0);
3866         param = lustre_cfg_string(lcfg, 1);
3867         if (!devname) {
3868                 /* Assume device name embedded in param:
3869                    lustre-OST0000.osc.max_dirty_mb=32 */
3870                 ptr = strchr(param, '.');
3871                 if (ptr) {
3872                         devname = param;
3873                         *ptr = 0;
3874                         param = ptr + 1;
3875                 }
3876         }
3877         if (!devname) {
3878                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3879                 RETURN(-ENOSYS);
3880         }
3881
3882         rc = mgs_parse_devname(devname, fsname, NULL);
3883         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3884                 /* param related to llite isn't allowed to set by OST or MDT */
3885                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3886                                        sizeof(PARAM_LLITE) - 1) == 0)
3887                         RETURN(-EINVAL);
3888         } else {
3889                 /* assume devname is the fsname */
3890                 strlcpy(fsname, devname, MTI_NAME_MAXLEN);
3891         }
3892         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3893
3894         rc = mgs_find_or_make_fsdb(env, mgs,
3895                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
3896                                    PARAMS_FILENAME : fsname, &fsdb);
3897         if (rc)
3898                 RETURN(rc);
3899
3900         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
3901             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3902             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3903                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3904                        "is '%s'\n", fsname, devname);
3905                 mgs_free_fsdb(mgs, fsdb);
3906                 RETURN(-EINVAL);
3907         }
3908
3909         /* Create a fake mti to hold everything */
3910         OBD_ALLOC_PTR(mti);
3911         if (!mti)
3912                 GOTO(out, rc = -ENOMEM);
3913         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3914             >= sizeof(mti->mti_fsname))
3915                 GOTO(out, rc = -E2BIG);
3916         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3917             >= sizeof(mti->mti_svname))
3918                 GOTO(out, rc = -E2BIG);
3919         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3920             >= sizeof(mti->mti_params))
3921                 GOTO(out, rc = -E2BIG);
3922         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3923         if (rc < 0)
3924                 /* Not a valid server; may be only fsname */
3925                 rc = 0;
3926         else
3927                 /* Strip -osc or -mdc suffix from svname */
3928                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3929                                      mti->mti_svname))
3930                         GOTO(out, rc = -EINVAL);
3931         /*
3932          * Revoke lock so everyone updates.  Should be alright if
3933          * someone was already reading while we were updating the logs,
3934          * so we don't really need to hold the lock while we're
3935          * writing (above).
3936          */
3937         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
3938                 mti->mti_flags = rc | LDD_F_PARAM2;
3939                 mutex_lock(&fsdb->fsdb_mutex);
3940                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
3941                 mutex_unlock(&fsdb->fsdb_mutex);
3942                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
3943         } else {
3944                 mti->mti_flags = rc | LDD_F_PARAM;
3945                 mutex_lock(&fsdb->fsdb_mutex);
3946                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3947                 mutex_unlock(&fsdb->fsdb_mutex);
3948                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3949         }
3950
3951 out:
3952         OBD_FREE_PTR(mti);
3953         RETURN(rc);
3954 }
3955
3956 static int mgs_write_log_pool(const struct lu_env *env,
3957                               struct mgs_device *mgs, char *logname,
3958                               struct fs_db *fsdb, char *tgtname,
3959                               enum lcfg_command_type cmd,
3960                               char *fsname, char *poolname,
3961                               char *ostname, char *comment)
3962 {
3963         struct llog_handle *llh = NULL;
3964         int rc;
3965
3966         rc = record_start_log(env, mgs, &llh, logname);
3967         if (rc)
3968                 return rc;
3969         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
3970         if (rc)
3971                 goto out;
3972         rc = record_base(env, llh, tgtname, 0, cmd,
3973                          fsname, poolname, ostname, NULL);
3974         if (rc)
3975                 goto out;
3976         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
3977 out:
3978         record_end_log(env, &llh);
3979         return rc;
3980 }
3981
3982 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
3983                     enum lcfg_command_type cmd, const char *nodemap_name,
3984                     char *param)
3985 {
3986         lnet_nid_t      nid[2];
3987         __u32           idmap[2];
3988         bool            bool_switch;
3989         __u32           int_id;
3990         int             rc = 0;
3991         ENTRY;
3992
3993         switch (cmd) {
3994         case LCFG_NODEMAP_ADD:
3995                 rc = nodemap_add(nodemap_name);
3996                 break;
3997         case LCFG_NODEMAP_DEL:
3998                 rc = nodemap_del(nodemap_name);
3999                 break;
4000         case LCFG_NODEMAP_ADD_RANGE:
4001                 rc = nodemap_parse_range(param, nid);
4002                 if (rc != 0)
4003                         break;
4004                 rc = nodemap_add_range(nodemap_name, nid);
4005                 break;
4006         case LCFG_NODEMAP_DEL_RANGE:
4007                 rc = nodemap_parse_range(param, nid);
4008                 if (rc != 0)
4009                         break;
4010                 rc = nodemap_del_range(nodemap_name, nid);
4011                 break;
4012         case LCFG_NODEMAP_ADMIN:
4013                 bool_switch = simple_strtoul(param, NULL, 10);
4014                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
4015                 break;
4016         case LCFG_NODEMAP_TRUSTED:
4017                 bool_switch = simple_strtoul(param, NULL, 10);
4018                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
4019                 break;
4020         case LCFG_NODEMAP_SQUASH_UID:
4021                 int_id = simple_strtoul(param, NULL, 10);
4022                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
4023                 break;
4024         case LCFG_NODEMAP_SQUASH_GID:
4025                 int_id = simple_strtoul(param, NULL, 10);
4026                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
4027                 break;
4028         case LCFG_NODEMAP_ADD_UIDMAP:
4029         case LCFG_NODEMAP_ADD_GIDMAP:
4030                 rc = nodemap_parse_idmap(param, idmap);
4031                 if (rc != 0)
4032                         break;
4033                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
4034                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
4035                                                idmap);
4036                 else
4037                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
4038                                                idmap);
4039                 break;
4040         case LCFG_NODEMAP_DEL_UIDMAP:
4041         case LCFG_NODEMAP_DEL_GIDMAP:
4042                 rc = nodemap_parse_idmap(param, idmap);
4043                 if (rc != 0)
4044                         break;
4045                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
4046                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
4047                                                idmap);
4048                 else
4049                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
4050                                                idmap);
4051                 break;
4052         default:
4053                 rc = -EINVAL;
4054         }
4055
4056         RETURN(rc);
4057 }
4058
4059 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
4060                  enum lcfg_command_type cmd, char *fsname,
4061                  char *poolname, char *ostname)
4062 {
4063         struct fs_db *fsdb;
4064         char *lovname;
4065         char *logname;
4066         char *label = NULL, *canceled_label = NULL;
4067         int label_sz;
4068         struct mgs_target_info *mti = NULL;
4069         int rc, i;
4070         ENTRY;
4071
4072         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
4073         if (rc) {
4074                 CERROR("Can't get db for %s\n", fsname);
4075                 RETURN(rc);
4076         }
4077         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4078                 CERROR("%s is not defined\n", fsname);
4079                 mgs_free_fsdb(mgs, fsdb);
4080                 RETURN(-EINVAL);
4081         }
4082
4083         label_sz = 10 + strlen(fsname) + strlen(poolname);
4084
4085         /* check if ostname match fsname */
4086         if (ostname != NULL) {
4087                 char *ptr;
4088
4089                 ptr = strrchr(ostname, '-');
4090                 if ((ptr == NULL) ||
4091                     (strncmp(fsname, ostname, ptr-ostname) != 0))
4092                         RETURN(-EINVAL);
4093                 label_sz += strlen(ostname);
4094         }
4095
4096         OBD_ALLOC(label, label_sz);
4097         if (label == NULL)
4098                 RETURN(-ENOMEM);
4099
4100         switch(cmd) {
4101         case LCFG_POOL_NEW:
4102                 sprintf(label,
4103                         "new %s.%s", fsname, poolname);
4104                 break;
4105         case LCFG_POOL_ADD:
4106                 sprintf(label,
4107                         "add %s.%s.%s", fsname, poolname, ostname);
4108                 break;
4109         case LCFG_POOL_REM:
4110                 OBD_ALLOC(canceled_label, label_sz);
4111                 if (canceled_label == NULL)
4112                         GOTO(out_label, rc = -ENOMEM);
4113                 sprintf(label,
4114                         "rem %s.%s.%s", fsname, poolname, ostname);
4115                 sprintf(canceled_label,
4116                         "add %s.%s.%s", fsname, poolname, ostname);
4117                 break;
4118         case LCFG_POOL_DEL:
4119                 OBD_ALLOC(canceled_label, label_sz);
4120                 if (canceled_label == NULL)
4121                         GOTO(out_label, rc = -ENOMEM);
4122                 sprintf(label,
4123                         "del %s.%s", fsname, poolname);
4124                 sprintf(canceled_label,
4125                         "new %s.%s", fsname, poolname);
4126                 break;
4127         default:
4128                 break;
4129         }
4130
4131         if (canceled_label != NULL) {
4132                 OBD_ALLOC_PTR(mti);
4133                 if (mti == NULL)
4134                         GOTO(out_cancel, rc = -ENOMEM);
4135         }
4136
4137         mutex_lock(&fsdb->fsdb_mutex);
4138         /* write pool def to all MDT logs */
4139         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4140                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4141                         rc = name_create_mdt_and_lov(&logname, &lovname,
4142                                                      fsdb, i);
4143                         if (rc) {
4144                                 mutex_unlock(&fsdb->fsdb_mutex);
4145                                 GOTO(out_mti, rc);
4146                         }
4147                         if (canceled_label != NULL) {
4148                                 strcpy(mti->mti_svname, "lov pool");
4149                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4150                                                 lovname, canceled_label,
4151                                                 CM_SKIP);
4152                         }
4153
4154                         if (rc >= 0)
4155                                 rc = mgs_write_log_pool(env, mgs, logname,
4156                                                         fsdb, lovname, cmd,
4157                                                         fsname, poolname,
4158                                                         ostname, label);
4159                         name_destroy(&logname);
4160                         name_destroy(&lovname);
4161                         if (rc) {
4162                                 mutex_unlock(&fsdb->fsdb_mutex);
4163                                 GOTO(out_mti, rc);
4164                         }
4165                 }
4166         }
4167
4168         rc = name_create(&logname, fsname, "-client");
4169         if (rc) {
4170                 mutex_unlock(&fsdb->fsdb_mutex);
4171                 GOTO(out_mti, rc);
4172         }
4173         if (canceled_label != NULL) {
4174                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4175                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4176                 if (rc < 0) {
4177                         mutex_unlock(&fsdb->fsdb_mutex);
4178                         name_destroy(&logname);
4179                         GOTO(out_mti, rc);
4180                 }
4181         }
4182
4183         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4184                                 cmd, fsname, poolname, ostname, label);
4185         mutex_unlock(&fsdb->fsdb_mutex);
4186         name_destroy(&logname);
4187         /* request for update */
4188         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4189
4190         EXIT;
4191 out_mti:
4192         if (mti != NULL)
4193                 OBD_FREE_PTR(mti);
4194 out_cancel:
4195         if (canceled_label != NULL)
4196                 OBD_FREE(canceled_label, label_sz);
4197 out_label:
4198         OBD_FREE(label, label_sz);
4199         return rc;
4200 }