Whamcloud - gitweb
LU-4665 utils: lfs setstripe to specify OSTs
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 /* Find all logs in CONFIG directory and link then into list */
59 int class_dentry_readdir(const struct lu_env *env,
60                          struct mgs_device *mgs, struct list_head *log_list)
61 {
62         struct dt_object    *dir = mgs->mgs_configs_dir;
63         const struct dt_it_ops *iops;
64         struct dt_it        *it;
65         struct mgs_direntry *de;
66         char                *key;
67         int                  rc, key_sz;
68
69         INIT_LIST_HEAD(log_list);
70
71         LASSERT(dir);
72         LASSERT(dir->do_index_ops);
73
74         iops = &dir->do_index_ops->dio_it;
75         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
76         if (IS_ERR(it))
77                 RETURN(PTR_ERR(it));
78
79         rc = iops->load(env, it, 0);
80         if (rc <= 0)
81                 GOTO(fini, rc = 0);
82
83         /* main cycle */
84         do {
85                 key = (void *)iops->key(env, it);
86                 if (IS_ERR(key)) {
87                         CERROR("%s: key failed when listing %s: rc = %d\n",
88                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
89                                (int) PTR_ERR(key));
90                         goto next;
91                 }
92                 key_sz = iops->key_size(env, it);
93                 LASSERT(key_sz > 0);
94
95                 /* filter out "." and ".." entries */
96                 if (key[0] == '.') {
97                         if (key_sz == 1)
98                                 goto next;
99                         if (key_sz == 2 && key[1] == '.')
100                                 goto next;
101                 }
102
103                 de = mgs_direntry_alloc(key_sz + 1);
104                 if (de == NULL) {
105                         rc = -ENOMEM;
106                         break;
107                 }
108
109                 memcpy(de->mde_name, key, key_sz);
110                 de->mde_name[key_sz] = 0;
111
112                 list_add(&de->mde_list, log_list);
113
114 next:
115                 rc = iops->next(env, it);
116         } while (rc == 0);
117         rc = 0;
118
119         iops->put(env, it);
120
121 fini:
122         iops->fini(env, it);
123         if (rc)
124                 CERROR("%s: key failed when listing %s: rc = %d\n",
125                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
126         RETURN(rc);
127 }
128
129 /******************** DB functions *********************/
130
131 static inline int name_create(char **newname, char *prefix, char *suffix)
132 {
133         LASSERT(newname);
134         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
135         if (!*newname)
136                 return -ENOMEM;
137         sprintf(*newname, "%s%s", prefix, suffix);
138         return 0;
139 }
140
141 static inline void name_destroy(char **name)
142 {
143         if (*name)
144                 OBD_FREE(*name, strlen(*name) + 1);
145         *name = NULL;
146 }
147
148 struct mgs_fsdb_handler_data
149 {
150         struct fs_db   *fsdb;
151         __u32           ver;
152 };
153
154 /* from the (client) config log, figure out:
155         1. which ost's/mdt's are configured (by index)
156         2. what the last config step is
157         3. COMPAT_18 osc name
158 */
159 /* It might be better to have a separate db file, instead of parsing the info
160    out of the client log.  This is slow and potentially error-prone. */
161 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
162                             struct llog_rec_hdr *rec, void *data)
163 {
164         struct mgs_fsdb_handler_data *d = data;
165         struct fs_db *fsdb = d->fsdb;
166         int cfg_len = rec->lrh_len;
167         char *cfg_buf = (char*) (rec + 1);
168         struct lustre_cfg *lcfg;
169         __u32 index;
170         int rc = 0;
171         ENTRY;
172
173         if (rec->lrh_type != OBD_CFG_REC) {
174                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
175                 RETURN(-EINVAL);
176         }
177
178         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
179         if (rc) {
180                 CERROR("Insane cfg\n");
181                 RETURN(rc);
182         }
183
184         lcfg = (struct lustre_cfg *)cfg_buf;
185
186         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
187                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
188
189         /* Figure out ost indicies */
190         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
191         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
192             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
193                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
194                                        NULL, 10);
195                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
196                        lustre_cfg_string(lcfg, 1), index,
197                        lustre_cfg_string(lcfg, 2));
198                 set_bit(index, fsdb->fsdb_ost_index_map);
199         }
200
201         /* Figure out mdt indicies */
202         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
203         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
204             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
205                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
206                                        &index, NULL);
207                 if (rc != LDD_F_SV_TYPE_MDT) {
208                         CWARN("Unparsable MDC name %s, assuming index 0\n",
209                               lustre_cfg_string(lcfg, 0));
210                         index = 0;
211                 }
212                 rc = 0;
213                 CDEBUG(D_MGS, "MDT index is %u\n", index);
214                 set_bit(index, fsdb->fsdb_mdt_index_map);
215                 fsdb->fsdb_mdt_count ++;
216         }
217
218         /**
219          * figure out the old config. fsdb_gen = 0 means old log
220          * It is obsoleted and not supported anymore
221          */
222         if (fsdb->fsdb_gen == 0) {
223                 CERROR("Old config format is not supported\n");
224                 RETURN(-EINVAL);
225         }
226
227         /*
228          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
229          */
230         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
231             lcfg->lcfg_command == LCFG_ATTACH &&
232             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
233                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
234                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
235                         CWARN("MDT using 1.8 OSC name scheme\n");
236                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
237                 }
238         }
239
240         if (lcfg->lcfg_command == LCFG_MARKER) {
241                 struct cfg_marker *marker;
242                 marker = lustre_cfg_buf(lcfg, 1);
243
244                 d->ver = marker->cm_vers;
245
246                 /* Keep track of the latest marker step */
247                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
248         }
249
250         RETURN(rc);
251 }
252
253 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
254 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
255                                   struct mgs_device *mgs,
256                                   struct fs_db *fsdb)
257 {
258         char                            *logname;
259         struct llog_handle              *loghandle;
260         struct llog_ctxt                *ctxt;
261         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
262         int rc;
263
264         ENTRY;
265
266         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
267         LASSERT(ctxt != NULL);
268         rc = name_create(&logname, fsdb->fsdb_name, "-client");
269         if (rc)
270                 GOTO(out_put, rc);
271         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
272         if (rc)
273                 GOTO(out_pop, rc);
274
275         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
276         if (rc)
277                 GOTO(out_close, rc);
278
279         if (llog_get_size(loghandle) <= 1)
280                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
281
282         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
283         CDEBUG(D_INFO, "get_db = %d\n", rc);
284 out_close:
285         llog_close(env, loghandle);
286 out_pop:
287         name_destroy(&logname);
288 out_put:
289         llog_ctxt_put(ctxt);
290
291         RETURN(rc);
292 }
293
294 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
295 {
296         struct mgs_tgt_srpc_conf *tgtconf;
297
298         /* free target-specific rules */
299         while (fsdb->fsdb_srpc_tgt) {
300                 tgtconf = fsdb->fsdb_srpc_tgt;
301                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
302
303                 LASSERT(tgtconf->mtsc_tgt);
304
305                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
306                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
307                 OBD_FREE_PTR(tgtconf);
308         }
309
310         /* free general rules */
311         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
312 }
313
314 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
315 {
316         struct fs_db *fsdb;
317         struct list_head *tmp;
318
319         list_for_each(tmp, &mgs->mgs_fs_db_list) {
320                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
321                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
322                         return fsdb;
323         }
324         return NULL;
325 }
326
327 /* caller must hold the mgs->mgs_fs_db_lock */
328 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
329                                   struct mgs_device *mgs, char *fsname)
330 {
331         struct fs_db *fsdb;
332         int rc;
333         ENTRY;
334
335         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
336                 CERROR("fsname %s is too long\n", fsname);
337                 RETURN(NULL);
338         }
339
340         OBD_ALLOC_PTR(fsdb);
341         if (!fsdb)
342                 RETURN(NULL);
343
344         strcpy(fsdb->fsdb_name, fsname);
345         mutex_init(&fsdb->fsdb_mutex);
346         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
347         fsdb->fsdb_gen = 1;
348
349         if (strcmp(fsname, MGSSELF_NAME) == 0) {
350                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
351         } else {
352                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
353                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
354                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
355                         CERROR("No memory for index maps\n");
356                         GOTO(err, rc = -ENOMEM);
357                 }
358
359                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
360                 if (rc)
361                         GOTO(err, rc);
362                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
363                 if (rc)
364                         GOTO(err, rc);
365
366                 /* initialise data for NID table */
367                 mgs_ir_init_fs(env, mgs, fsdb);
368
369                 lproc_mgs_add_live(mgs, fsdb);
370         }
371
372         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
373
374         RETURN(fsdb);
375 err:
376         if (fsdb->fsdb_ost_index_map)
377                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
378         if (fsdb->fsdb_mdt_index_map)
379                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
380         name_destroy(&fsdb->fsdb_clilov);
381         name_destroy(&fsdb->fsdb_clilmv);
382         OBD_FREE_PTR(fsdb);
383         RETURN(NULL);
384 }
385
386 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
387 {
388         /* wait for anyone with the sem */
389         mutex_lock(&fsdb->fsdb_mutex);
390         lproc_mgs_del_live(mgs, fsdb);
391         list_del(&fsdb->fsdb_list);
392
393         /* deinitialize fsr */
394         mgs_ir_fini_fs(mgs, fsdb);
395
396         if (fsdb->fsdb_ost_index_map)
397                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
398         if (fsdb->fsdb_mdt_index_map)
399                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
400         name_destroy(&fsdb->fsdb_clilov);
401         name_destroy(&fsdb->fsdb_clilmv);
402         mgs_free_fsdb_srpc(fsdb);
403         mutex_unlock(&fsdb->fsdb_mutex);
404         OBD_FREE_PTR(fsdb);
405 }
406
407 int mgs_init_fsdb_list(struct mgs_device *mgs)
408 {
409         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
410         return 0;
411 }
412
413 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
414 {
415         struct fs_db *fsdb;
416         struct list_head *tmp, *tmp2;
417
418         mutex_lock(&mgs->mgs_mutex);
419         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
420                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
421                 mgs_free_fsdb(mgs, fsdb);
422         }
423         mutex_unlock(&mgs->mgs_mutex);
424         return 0;
425 }
426
427 int mgs_find_or_make_fsdb(const struct lu_env *env,
428                           struct mgs_device *mgs, char *name,
429                           struct fs_db **dbh)
430 {
431         struct fs_db *fsdb;
432         int rc = 0;
433
434         ENTRY;
435         mutex_lock(&mgs->mgs_mutex);
436         fsdb = mgs_find_fsdb(mgs, name);
437         if (fsdb) {
438                 mutex_unlock(&mgs->mgs_mutex);
439                 *dbh = fsdb;
440                 RETURN(0);
441         }
442
443         CDEBUG(D_MGS, "Creating new db\n");
444         fsdb = mgs_new_fsdb(env, mgs, name);
445         /* lock fsdb_mutex until the db is loaded from llogs */
446         if (fsdb)
447                 mutex_lock(&fsdb->fsdb_mutex);
448         mutex_unlock(&mgs->mgs_mutex);
449         if (!fsdb)
450                 RETURN(-ENOMEM);
451
452         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
453                 /* populate the db from the client llog */
454                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
455                 if (rc) {
456                         CERROR("Can't get db from client log %d\n", rc);
457                         GOTO(out_free, rc);
458                 }
459         }
460
461         /* populate srpc rules from params llog */
462         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
463         if (rc) {
464                 CERROR("Can't get db from params log %d\n", rc);
465                 GOTO(out_free, rc);
466         }
467
468         mutex_unlock(&fsdb->fsdb_mutex);
469         *dbh = fsdb;
470
471         RETURN(0);
472
473 out_free:
474         mutex_unlock(&fsdb->fsdb_mutex);
475         mgs_free_fsdb(mgs, fsdb);
476         return rc;
477 }
478
479 /* 1 = index in use
480    0 = index unused
481    -1= empty client log */
482 int mgs_check_index(const struct lu_env *env,
483                     struct mgs_device *mgs,
484                     struct mgs_target_info *mti)
485 {
486         struct fs_db *fsdb;
487         void *imap;
488         int rc = 0;
489         ENTRY;
490
491         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
492
493         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
494         if (rc) {
495                 CERROR("Can't get db for %s\n", mti->mti_fsname);
496                 RETURN(rc);
497         }
498
499         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
500                 RETURN(-1);
501
502         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
503                 imap = fsdb->fsdb_ost_index_map;
504         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
505                 imap = fsdb->fsdb_mdt_index_map;
506         else
507                 RETURN(-EINVAL);
508
509         if (test_bit(mti->mti_stripe_index, imap))
510                 RETURN(1);
511         RETURN(0);
512 }
513
514 static __inline__ int next_index(void *index_map, int map_len)
515 {
516         int i;
517         for (i = 0; i < map_len * 8; i++)
518                 if (!test_bit(i, index_map)) {
519                          return i;
520                  }
521         CERROR("max index %d exceeded.\n", i);
522         return -1;
523 }
524
525 /* Return codes:
526         0  newly marked as in use
527         <0 err
528         +EALREADY for update of an old index */
529 static int mgs_set_index(const struct lu_env *env,
530                          struct mgs_device *mgs,
531                          struct mgs_target_info *mti)
532 {
533         struct fs_db *fsdb;
534         void *imap;
535         int rc = 0;
536         ENTRY;
537
538         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
539         if (rc) {
540                 CERROR("Can't get db for %s\n", mti->mti_fsname);
541                 RETURN(rc);
542         }
543
544         mutex_lock(&fsdb->fsdb_mutex);
545         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
546                 imap = fsdb->fsdb_ost_index_map;
547         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
548                 imap = fsdb->fsdb_mdt_index_map;
549         } else {
550                 GOTO(out_up, rc = -EINVAL);
551         }
552
553         if (mti->mti_flags & LDD_F_NEED_INDEX) {
554                 rc = next_index(imap, INDEX_MAP_SIZE);
555                 if (rc == -1)
556                         GOTO(out_up, rc = -ERANGE);
557                 mti->mti_stripe_index = rc;
558                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
559                         fsdb->fsdb_mdt_count ++;
560         }
561
562         /* the last index(0xffff) is reserved for default value. */
563         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
564                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
565                                    "but index must be less than %u.\n",
566                                    mti->mti_svname, mti->mti_stripe_index,
567                                    INDEX_MAP_SIZE * 8 - 1);
568                 GOTO(out_up, rc = -ERANGE);
569         }
570
571         if (test_bit(mti->mti_stripe_index, imap)) {
572                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
573                     !(mti->mti_flags & LDD_F_WRITECONF)) {
574                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
575                                            "%d, but that index is already in "
576                                            "use. Use --writeconf to force\n",
577                                            mti->mti_svname,
578                                            mti->mti_stripe_index);
579                         GOTO(out_up, rc = -EADDRINUSE);
580                 } else {
581                         CDEBUG(D_MGS, "Server %s updating index %d\n",
582                                mti->mti_svname, mti->mti_stripe_index);
583                         GOTO(out_up, rc = EALREADY);
584                 }
585         }
586
587         set_bit(mti->mti_stripe_index, imap);
588         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
589         mutex_unlock(&fsdb->fsdb_mutex);
590         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
591                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
592
593         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
594                mti->mti_stripe_index);
595
596         RETURN(0);
597 out_up:
598         mutex_unlock(&fsdb->fsdb_mutex);
599         return rc;
600 }
601
602 struct mgs_modify_lookup {
603         struct cfg_marker mml_marker;
604         int               mml_modified;
605 };
606
607 static int mgs_modify_handler(const struct lu_env *env,
608                               struct llog_handle *llh,
609                               struct llog_rec_hdr *rec, void *data)
610 {
611         struct mgs_modify_lookup *mml = data;
612         struct cfg_marker *marker;
613         struct lustre_cfg *lcfg = REC_DATA(rec);
614         int cfg_len = REC_DATA_LEN(rec);
615         int rc;
616         ENTRY;
617
618         if (rec->lrh_type != OBD_CFG_REC) {
619                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
620                 RETURN(-EINVAL);
621         }
622
623         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
624         if (rc) {
625                 CERROR("Insane cfg\n");
626                 RETURN(rc);
627         }
628
629         /* We only care about markers */
630         if (lcfg->lcfg_command != LCFG_MARKER)
631                 RETURN(0);
632
633         marker = lustre_cfg_buf(lcfg, 1);
634         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
635             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
636             !(marker->cm_flags & CM_SKIP)) {
637                 /* Found a non-skipped marker match */
638                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
639                        rec->lrh_index, marker->cm_step,
640                        marker->cm_flags, mml->mml_marker.cm_flags,
641                        marker->cm_tgtname, marker->cm_comment);
642                 /* Overwrite the old marker llog entry */
643                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
644                 marker->cm_flags |= mml->mml_marker.cm_flags;
645                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
646                 rc = llog_write(env, llh, rec, rec->lrh_index);
647                 if (!rc)
648                          mml->mml_modified++;
649         }
650
651         RETURN(rc);
652 }
653
654 /**
655  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
656  * Return code:
657  * 0 - modified successfully,
658  * 1 - no modification was done
659  * negative - error
660  */
661 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
662                       struct fs_db *fsdb, struct mgs_target_info *mti,
663                       char *logname, char *devname, char *comment, int flags)
664 {
665         struct llog_handle *loghandle;
666         struct llog_ctxt *ctxt;
667         struct mgs_modify_lookup *mml;
668         int rc;
669
670         ENTRY;
671
672         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
673         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
674                flags);
675
676         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
677         LASSERT(ctxt != NULL);
678         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
679         if (rc < 0) {
680                 if (rc == -ENOENT)
681                         rc = 0;
682                 GOTO(out_pop, rc);
683         }
684
685         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
686         if (rc)
687                 GOTO(out_close, rc);
688
689         if (llog_get_size(loghandle) <= 1)
690                 GOTO(out_close, rc = 0);
691
692         OBD_ALLOC_PTR(mml);
693         if (!mml)
694                 GOTO(out_close, rc = -ENOMEM);
695         if (strlcpy(mml->mml_marker.cm_comment, comment,
696                     sizeof(mml->mml_marker.cm_comment)) >=
697             sizeof(mml->mml_marker.cm_comment))
698                 GOTO(out_free, rc = -E2BIG);
699         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
700                     sizeof(mml->mml_marker.cm_tgtname)) >=
701             sizeof(mml->mml_marker.cm_tgtname))
702                 GOTO(out_free, rc = -E2BIG);
703         /* Modify mostly means cancel */
704         mml->mml_marker.cm_flags = flags;
705         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
706         mml->mml_modified = 0;
707         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
708                           NULL);
709         if (!rc && !mml->mml_modified)
710                 rc = 1;
711
712 out_free:
713         OBD_FREE_PTR(mml);
714
715 out_close:
716         llog_close(env, loghandle);
717 out_pop:
718         if (rc < 0)
719                 CERROR("%s: modify %s/%s failed: rc = %d\n",
720                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
721         llog_ctxt_put(ctxt);
722         RETURN(rc);
723 }
724
725 /** This structure is passed to mgs_replace_handler */
726 struct mgs_replace_uuid_lookup {
727         /* Nids are replaced for this target device */
728         struct mgs_target_info target;
729         /* Temporary modified llog */
730         struct llog_handle *temp_llh;
731         /* Flag is set if in target block*/
732         int in_target_device;
733         /* Nids already added. Just skip (multiple nids) */
734         int device_nids_added;
735         /* Flag is set if this block should not be copied */
736         int skip_it;
737 };
738
739 /**
740  * Check: a) if block should be skipped
741  * b) is it target block
742  *
743  * \param[in] lcfg
744  * \param[in] mrul
745  *
746  * \retval 0 should not to be skipped
747  * \retval 1 should to be skipped
748  */
749 static int check_markers(struct lustre_cfg *lcfg,
750                          struct mgs_replace_uuid_lookup *mrul)
751 {
752          struct cfg_marker *marker;
753
754         /* Track markers. Find given device */
755         if (lcfg->lcfg_command == LCFG_MARKER) {
756                 marker = lustre_cfg_buf(lcfg, 1);
757                 /* Clean llog from records marked as CM_EXCLUDE.
758                    CM_SKIP records are used for "active" command
759                    and can be restored if needed */
760                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
761                     (CM_EXCLUDE | CM_START)) {
762                         mrul->skip_it = 1;
763                         return 1;
764                 }
765
766                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
767                     (CM_EXCLUDE | CM_END)) {
768                         mrul->skip_it = 0;
769                         return 1;
770                 }
771
772                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
773                         LASSERT(!(marker->cm_flags & CM_START) ||
774                                 !(marker->cm_flags & CM_END));
775                         if (marker->cm_flags & CM_START) {
776                                 mrul->in_target_device = 1;
777                                 mrul->device_nids_added = 0;
778                         } else if (marker->cm_flags & CM_END)
779                                 mrul->in_target_device = 0;
780                 }
781         }
782
783         return 0;
784 }
785
786 static int record_base(const struct lu_env *env, struct llog_handle *llh,
787                      char *cfgname, lnet_nid_t nid, int cmd,
788                      char *s1, char *s2, char *s3, char *s4)
789 {
790         struct mgs_thread_info  *mgi = mgs_env_info(env);
791         struct llog_cfg_rec     *lcr;
792         int rc;
793
794         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
795                cmd, s1, s2, s3, s4);
796
797         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
798         if (s1)
799                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
800         if (s2)
801                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
802         if (s3)
803                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
804         if (s4)
805                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
806
807         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
808         if (lcr == NULL)
809                 return -ENOMEM;
810
811         lcr->lcr_cfg.lcfg_nid = nid;
812         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
813
814         lustre_cfg_rec_free(lcr);
815
816         if (rc < 0)
817                 CDEBUG(D_MGS,
818                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
819                        cfgname, cmd, s1, s2, s3, s4, rc);
820         return rc;
821 }
822
823 static inline int record_add_uuid(const struct lu_env *env,
824                                   struct llog_handle *llh,
825                                   uint64_t nid, char *uuid)
826 {
827         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
828 }
829
830 static inline int record_add_conn(const struct lu_env *env,
831                                   struct llog_handle *llh,
832                                   char *devname, char *uuid)
833 {
834         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
835 }
836
837 static inline int record_attach(const struct lu_env *env,
838                                 struct llog_handle *llh, char *devname,
839                                 char *type, char *uuid)
840 {
841         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
842 }
843
844 static inline int record_setup(const struct lu_env *env,
845                                struct llog_handle *llh, char *devname,
846                                char *s1, char *s2, char *s3, char *s4)
847 {
848         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
849 }
850
851 /**
852  * \retval <0 record processing error
853  * \retval n record is processed. No need copy original one.
854  * \retval 0 record is not processed.
855  */
856 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
857                            struct mgs_replace_uuid_lookup *mrul)
858 {
859         int nids_added = 0;
860         lnet_nid_t nid;
861         char *ptr;
862         int rc;
863
864         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
865                 /* LCFG_ADD_UUID command found. Let's skip original command
866                    and add passed nids */
867                 ptr = mrul->target.mti_params;
868                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
869                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
870                                "device %s\n", libcfs_nid2str(nid),
871                                 mrul->target.mti_params,
872                                 mrul->target.mti_svname);
873                         rc = record_add_uuid(env,
874                                              mrul->temp_llh, nid,
875                                              mrul->target.mti_params);
876                         if (!rc)
877                                 nids_added++;
878                 }
879
880                 if (nids_added == 0) {
881                         CERROR("No new nids were added, nid %s with uuid %s, "
882                                "device %s\n", libcfs_nid2str(nid),
883                                mrul->target.mti_params,
884                                mrul->target.mti_svname);
885                         RETURN(-ENXIO);
886                 } else {
887                         mrul->device_nids_added = 1;
888                 }
889
890                 return nids_added;
891         }
892
893         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
894                 /* LCFG_SETUP command found. UUID should be changed */
895                 rc = record_setup(env,
896                                   mrul->temp_llh,
897                                   /* devname the same */
898                                   lustre_cfg_string(lcfg, 0),
899                                   /* s1 is not changed */
900                                   lustre_cfg_string(lcfg, 1),
901                                   /* new uuid should be
902                                   the full nidlist */
903                                   mrul->target.mti_params,
904                                   /* s3 is not changed */
905                                   lustre_cfg_string(lcfg, 3),
906                                   /* s4 is not changed */
907                                   lustre_cfg_string(lcfg, 4));
908                 return rc ? rc : 1;
909         }
910
911         /* Another commands in target device block */
912         return 0;
913 }
914
915 /**
916  * Handler that called for every record in llog.
917  * Records are processed in order they placed in llog.
918  *
919  * \param[in] llh       log to be processed
920  * \param[in] rec       current record
921  * \param[in] data      mgs_replace_uuid_lookup structure
922  *
923  * \retval 0    success
924  */
925 static int mgs_replace_handler(const struct lu_env *env,
926                                struct llog_handle *llh,
927                                struct llog_rec_hdr *rec,
928                                void *data)
929 {
930         struct mgs_replace_uuid_lookup *mrul;
931         struct lustre_cfg *lcfg = REC_DATA(rec);
932         int cfg_len = REC_DATA_LEN(rec);
933         int rc;
934         ENTRY;
935
936         mrul = (struct mgs_replace_uuid_lookup *)data;
937
938         if (rec->lrh_type != OBD_CFG_REC) {
939                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
940                        rec->lrh_type, lcfg->lcfg_command,
941                        lustre_cfg_string(lcfg, 0),
942                        lustre_cfg_string(lcfg, 1));
943                 RETURN(-EINVAL);
944         }
945
946         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
947         if (rc) {
948                 /* Do not copy any invalidated records */
949                 GOTO(skip_out, rc = 0);
950         }
951
952         rc = check_markers(lcfg, mrul);
953         if (rc || mrul->skip_it)
954                 GOTO(skip_out, rc = 0);
955
956         /* Write to new log all commands outside target device block */
957         if (!mrul->in_target_device)
958                 GOTO(copy_out, rc = 0);
959
960         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
961            (failover nids) for this target, assuming that if then
962            primary is changing then so is the failover */
963         if (mrul->device_nids_added &&
964             (lcfg->lcfg_command == LCFG_ADD_UUID ||
965              lcfg->lcfg_command == LCFG_ADD_CONN))
966                 GOTO(skip_out, rc = 0);
967
968         rc = process_command(env, lcfg, mrul);
969         if (rc < 0)
970                 RETURN(rc);
971
972         if (rc)
973                 RETURN(0);
974 copy_out:
975         /* Record is placed in temporary llog as is */
976         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
977
978         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
979                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
980                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
981         RETURN(rc);
982
983 skip_out:
984         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
985                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
986                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
987         RETURN(rc);
988 }
989
990 static int mgs_log_is_empty(const struct lu_env *env,
991                             struct mgs_device *mgs, char *name)
992 {
993         struct llog_ctxt        *ctxt;
994         int                      rc;
995
996         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
997         LASSERT(ctxt != NULL);
998
999         rc = llog_is_empty(env, ctxt, name);
1000         llog_ctxt_put(ctxt);
1001         return rc;
1002 }
1003
1004 static int mgs_replace_nids_log(const struct lu_env *env,
1005                                 struct obd_device *mgs, struct fs_db *fsdb,
1006                                 char *logname, char *devname, char *nids)
1007 {
1008         struct llog_handle *orig_llh, *backup_llh;
1009         struct llog_ctxt *ctxt;
1010         struct mgs_replace_uuid_lookup *mrul;
1011         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1012         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1013         char *backup;
1014         int rc, rc2;
1015         ENTRY;
1016
1017         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1018
1019         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1020         LASSERT(ctxt != NULL);
1021
1022         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1023                 /* Log is empty. Nothing to replace */
1024                 GOTO(out_put, rc = 0);
1025         }
1026
1027         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1028         if (backup == NULL)
1029                 GOTO(out_put, rc = -ENOMEM);
1030
1031         sprintf(backup, "%s.bak", logname);
1032
1033         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1034         if (rc == 0) {
1035                 /* Now erase original log file. Connections are not allowed.
1036                    Backup is already saved */
1037                 rc = llog_erase(env, ctxt, NULL, logname);
1038                 if (rc < 0)
1039                         GOTO(out_free, rc);
1040         } else if (rc != -ENOENT) {
1041                 CERROR("%s: can't make backup for %s: rc = %d\n",
1042                        mgs->obd_name, logname, rc);
1043                 GOTO(out_free,rc);
1044         }
1045
1046         /* open local log */
1047         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1048         if (rc)
1049                 GOTO(out_restore, rc);
1050
1051         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1052         if (rc)
1053                 GOTO(out_closel, rc);
1054
1055         /* open backup llog */
1056         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1057                        LLOG_OPEN_EXISTS);
1058         if (rc)
1059                 GOTO(out_closel, rc);
1060
1061         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1062         if (rc)
1063                 GOTO(out_close, rc);
1064
1065         if (llog_get_size(backup_llh) <= 1)
1066                 GOTO(out_close, rc = 0);
1067
1068         OBD_ALLOC_PTR(mrul);
1069         if (!mrul)
1070                 GOTO(out_close, rc = -ENOMEM);
1071         /* devname is only needed information to replace UUID records */
1072         strlcpy(mrul->target.mti_svname, devname,
1073                 sizeof(mrul->target.mti_svname));
1074         /* parse nids later */
1075         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1076         /* Copy records to this temporary llog */
1077         mrul->temp_llh = orig_llh;
1078
1079         rc = llog_process(env, backup_llh, mgs_replace_handler,
1080                           (void *)mrul, NULL);
1081         OBD_FREE_PTR(mrul);
1082 out_close:
1083         rc2 = llog_close(NULL, backup_llh);
1084         if (!rc)
1085                 rc = rc2;
1086 out_closel:
1087         rc2 = llog_close(NULL, orig_llh);
1088         if (!rc)
1089                 rc = rc2;
1090
1091 out_restore:
1092         if (rc) {
1093                 CERROR("%s: llog should be restored: rc = %d\n",
1094                        mgs->obd_name, rc);
1095                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1096                                   logname);
1097                 if (rc2 < 0)
1098                         CERROR("%s: can't restore backup %s: rc = %d\n",
1099                                mgs->obd_name, logname, rc2);
1100         }
1101
1102 out_free:
1103         OBD_FREE(backup, strlen(backup) + 1);
1104
1105 out_put:
1106         llog_ctxt_put(ctxt);
1107
1108         if (rc)
1109                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1110                        mgs->obd_name, logname, rc);
1111
1112         RETURN(rc);
1113 }
1114
1115 /**
1116  * Parse device name and get file system name and/or device index
1117  *
1118  * \param[in]   devname device name (ex. lustre-MDT0000)
1119  * \param[out]  fsname  file system name(optional)
1120  * \param[out]  index   device index(optional)
1121  *
1122  * \retval 0    success
1123  */
1124 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1125 {
1126         int rc;
1127         ENTRY;
1128
1129         /* Extract fsname */
1130         if (fsname) {
1131                 rc = server_name2fsname(devname, fsname, NULL);
1132                 if (rc < 0) {
1133                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1134                                devname);
1135                         RETURN(-EINVAL);
1136                 }
1137         }
1138
1139         if (index) {
1140                 rc = server_name2index(devname, index, NULL);
1141                 if (rc < 0) {
1142                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1143                                devname);
1144                         RETURN(-EINVAL);
1145                 }
1146         }
1147
1148         RETURN(0);
1149 }
1150
1151 /* This is only called during replace_nids */
1152 static int only_mgs_is_running(struct obd_device *mgs_obd)
1153 {
1154         /* TDB: Is global variable with devices count exists? */
1155         int num_devices = get_devices_count();
1156         int num_exports = 0;
1157         struct obd_export *exp;
1158
1159         spin_lock(&mgs_obd->obd_dev_lock);
1160         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1161                 /* skip self export */
1162                 if (exp == mgs_obd->obd_self_export)
1163                         continue;
1164                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1165                         continue;
1166
1167                 ++num_exports;
1168
1169                 CERROR("%s: node %s still connected during replace_nids "
1170                        "connect_flags:%llx\n",
1171                        mgs_obd->obd_name,
1172                        libcfs_nid2str(exp->exp_nid_stats->nid),
1173                        exp_connect_flags(exp));
1174
1175         }
1176         spin_unlock(&mgs_obd->obd_dev_lock);
1177
1178         /* osd, MGS and MGC + self_export
1179            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1180         return (num_devices <= 3) && (num_exports == 0);
1181 }
1182
1183 static int name_create_mdt(char **logname, char *fsname, int i)
1184 {
1185         char mdt_index[9];
1186
1187         sprintf(mdt_index, "-MDT%04x", i);
1188         return name_create(logname, fsname, mdt_index);
1189 }
1190
1191 /**
1192  * Replace nids for \a device to \a nids values
1193  *
1194  * \param obd           MGS obd device
1195  * \param devname       nids need to be replaced for this device
1196  * (ex. lustre-OST0000)
1197  * \param nids          nids list (ex. nid1,nid2,nid3)
1198  *
1199  * \retval 0    success
1200  */
1201 int mgs_replace_nids(const struct lu_env *env,
1202                      struct mgs_device *mgs,
1203                      char *devname, char *nids)
1204 {
1205         /* Assume fsname is part of device name */
1206         char fsname[MTI_NAME_MAXLEN];
1207         int rc;
1208         __u32 index;
1209         char *logname;
1210         struct fs_db *fsdb;
1211         unsigned int i;
1212         int conn_state;
1213         struct obd_device *mgs_obd = mgs->mgs_obd;
1214         ENTRY;
1215
1216         /* We can only change NIDs if no other nodes are connected */
1217         spin_lock(&mgs_obd->obd_dev_lock);
1218         conn_state = mgs_obd->obd_no_conn;
1219         mgs_obd->obd_no_conn = 1;
1220         spin_unlock(&mgs_obd->obd_dev_lock);
1221
1222         /* We can not change nids if not only MGS is started */
1223         if (!only_mgs_is_running(mgs_obd)) {
1224                 CERROR("Only MGS is allowed to be started\n");
1225                 GOTO(out, rc = -EINPROGRESS);
1226         }
1227
1228         /* Get fsname and index*/
1229         rc = mgs_parse_devname(devname, fsname, &index);
1230         if (rc)
1231                 GOTO(out, rc);
1232
1233         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1234         if (rc) {
1235                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1236                 GOTO(out, rc);
1237         }
1238
1239         /* Process client llogs */
1240         name_create(&logname, fsname, "-client");
1241         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1242         name_destroy(&logname);
1243         if (rc) {
1244                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1245                        fsname, devname, rc);
1246                 GOTO(out, rc);
1247         }
1248
1249         /* Process MDT llogs */
1250         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1251                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1252                         continue;
1253                 name_create_mdt(&logname, fsname, i);
1254                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1255                 name_destroy(&logname);
1256                 if (rc)
1257                         GOTO(out, rc);
1258         }
1259
1260 out:
1261         spin_lock(&mgs_obd->obd_dev_lock);
1262         mgs_obd->obd_no_conn = conn_state;
1263         spin_unlock(&mgs_obd->obd_dev_lock);
1264
1265         RETURN(rc);
1266 }
1267
1268 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1269                             char *devname, struct lov_desc *desc)
1270 {
1271         struct mgs_thread_info  *mgi = mgs_env_info(env);
1272         struct llog_cfg_rec     *lcr;
1273         int rc;
1274
1275         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1276         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1277         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1278         if (lcr == NULL)
1279                 return -ENOMEM;
1280
1281         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1282         lustre_cfg_rec_free(lcr);
1283         return rc;
1284 }
1285
1286 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1287                             char *devname, struct lmv_desc *desc)
1288 {
1289         struct mgs_thread_info  *mgi = mgs_env_info(env);
1290         struct llog_cfg_rec     *lcr;
1291         int rc;
1292
1293         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1294         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1295         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1296         if (lcr == NULL)
1297                 return -ENOMEM;
1298
1299         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1300         lustre_cfg_rec_free(lcr);
1301         return rc;
1302 }
1303
1304 static inline int record_mdc_add(const struct lu_env *env,
1305                                  struct llog_handle *llh,
1306                                  char *logname, char *mdcuuid,
1307                                  char *mdtuuid, char *index,
1308                                  char *gen)
1309 {
1310         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1311                            mdtuuid,index,gen,mdcuuid);
1312 }
1313
1314 static inline int record_lov_add(const struct lu_env *env,
1315                                  struct llog_handle *llh,
1316                                  char *lov_name, char *ost_uuid,
1317                                  char *index, char *gen)
1318 {
1319         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1320                            ost_uuid, index, gen, 0);
1321 }
1322
1323 static inline int record_mount_opt(const struct lu_env *env,
1324                                    struct llog_handle *llh,
1325                                    char *profile, char *lov_name,
1326                                    char *mdc_name)
1327 {
1328         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1329                            profile,lov_name,mdc_name,0);
1330 }
1331
1332 static int record_marker(const struct lu_env *env,
1333                          struct llog_handle *llh,
1334                          struct fs_db *fsdb, __u32 flags,
1335                          char *tgtname, char *comment)
1336 {
1337         struct mgs_thread_info *mgi = mgs_env_info(env);
1338         struct llog_cfg_rec *lcr;
1339         int rc;
1340         int cplen = 0;
1341
1342         if (flags & CM_START)
1343                 fsdb->fsdb_gen++;
1344         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1345         mgi->mgi_marker.cm_flags = flags;
1346         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1347         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1348                         sizeof(mgi->mgi_marker.cm_tgtname));
1349         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1350                 return -E2BIG;
1351         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1352                         sizeof(mgi->mgi_marker.cm_comment));
1353         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1354                 return -E2BIG;
1355         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1356         mgi->mgi_marker.cm_canceltime = 0;
1357         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1358         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1359                             sizeof(mgi->mgi_marker));
1360         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1361         if (lcr == NULL)
1362                 return -ENOMEM;
1363
1364         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1365         lustre_cfg_rec_free(lcr);
1366         return rc;
1367 }
1368
1369 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1370                             struct llog_handle **llh, char *name)
1371 {
1372         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1373         struct llog_ctxt        *ctxt;
1374         int                      rc = 0;
1375         ENTRY;
1376
1377         if (*llh)
1378                 GOTO(out, rc = -EBUSY);
1379
1380         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1381         if (!ctxt)
1382                 GOTO(out, rc = -ENODEV);
1383         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1384
1385         rc = llog_open_create(env, ctxt, llh, NULL, name);
1386         if (rc)
1387                 GOTO(out_ctxt, rc);
1388         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1389         if (rc)
1390                 llog_close(env, *llh);
1391 out_ctxt:
1392         llog_ctxt_put(ctxt);
1393 out:
1394         if (rc) {
1395                 CERROR("%s: can't start log %s: rc = %d\n",
1396                        mgs->mgs_obd->obd_name, name, rc);
1397                 *llh = NULL;
1398         }
1399         RETURN(rc);
1400 }
1401
1402 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1403 {
1404         int rc;
1405
1406         rc = llog_close(env, *llh);
1407         *llh = NULL;
1408
1409         return rc;
1410 }
1411
1412 /******************** config "macros" *********************/
1413
1414 /* write an lcfg directly into a log (with markers) */
1415 static int mgs_write_log_direct(const struct lu_env *env,
1416                                 struct mgs_device *mgs, struct fs_db *fsdb,
1417                                 char *logname, struct llog_cfg_rec *lcr,
1418                                 char *devname, char *comment)
1419 {
1420         struct llog_handle *llh = NULL;
1421         int rc;
1422
1423         ENTRY;
1424
1425         rc = record_start_log(env, mgs, &llh, logname);
1426         if (rc)
1427                 RETURN(rc);
1428
1429         /* FIXME These should be a single journal transaction */
1430         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1431         if (rc)
1432                 GOTO(out_end, rc);
1433         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1434         if (rc)
1435                 GOTO(out_end, rc);
1436         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1437         if (rc)
1438                 GOTO(out_end, rc);
1439 out_end:
1440         record_end_log(env, &llh);
1441         RETURN(rc);
1442 }
1443
1444 /* write the lcfg in all logs for the given fs */
1445 int mgs_write_log_direct_all(const struct lu_env *env, struct mgs_device *mgs,
1446                              struct fs_db *fsdb, struct mgs_target_info *mti,
1447                              struct llog_cfg_rec *lcr, char *devname,
1448                              char *comment, int server_only)
1449 {
1450         struct list_head         log_list;
1451         struct mgs_direntry     *dirent, *n;
1452         char                    *fsname = mti->mti_fsname;
1453         int                      rc = 0, len = strlen(fsname);
1454
1455         ENTRY;
1456         /* Find all the logs in the CONFIGS directory */
1457         rc = class_dentry_readdir(env, mgs, &log_list);
1458         if (rc)
1459                 RETURN(rc);
1460
1461         /* Could use fsdb index maps instead of directory listing */
1462         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1463                 list_del_init(&dirent->mde_list);
1464                 /* don't write to sptlrpc rule log */
1465                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1466                         goto next;
1467
1468                 /* caller wants write server logs only */
1469                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1470                         goto next;
1471
1472                 if (strncmp(fsname, dirent->mde_name, len) != 0)
1473                         goto next;
1474
1475                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1476                 /* Erase any old settings of this same parameter */
1477                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1478                                 devname, comment, CM_SKIP);
1479                 if (rc < 0)
1480                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1481                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1482                 if (lcr == NULL)
1483                         goto next;
1484                 /* Write the new one */
1485                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1486                                           lcr, devname, comment);
1487                 if (rc != 0)
1488                         CERROR("%s: writing log %s: rc = %d\n",
1489                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1490 next:
1491                 mgs_direntry_free(dirent);
1492         }
1493
1494         RETURN(rc);
1495 }
1496
1497 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1498                                     struct mgs_device *mgs,
1499                                     struct fs_db *fsdb,
1500                                     struct mgs_target_info *mti,
1501                                     int index, char *logname);
1502 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1503                                     struct mgs_device *mgs,
1504                                     struct fs_db *fsdb,
1505                                     struct mgs_target_info *mti,
1506                                     char *logname, char *suffix, char *lovname,
1507                                     enum lustre_sec_part sec_part, int flags);
1508 static int name_create_mdt_and_lov(char **logname, char **lovname,
1509                                    struct fs_db *fsdb, int i);
1510
1511 static int add_param(char *params, char *key, char *val)
1512 {
1513         char *start = params + strlen(params);
1514         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1515         int keylen = 0;
1516
1517         if (key != NULL)
1518                 keylen = strlen(key);
1519         if (start + 1 + keylen + strlen(val) >= end) {
1520                 CERROR("params are too long: %s %s%s\n",
1521                        params, key != NULL ? key : "", val);
1522                 return -EINVAL;
1523         }
1524
1525         sprintf(start, " %s%s", key != NULL ? key : "", val);
1526         return 0;
1527 }
1528
1529 /**
1530  * Walk through client config log record and convert the related records
1531  * into the target.
1532  **/
1533 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1534                                          struct llog_handle *llh,
1535                                          struct llog_rec_hdr *rec, void *data)
1536 {
1537         struct mgs_device *mgs;
1538         struct obd_device *obd;
1539         struct mgs_target_info *mti, *tmti;
1540         struct fs_db *fsdb;
1541         int cfg_len = rec->lrh_len;
1542         char *cfg_buf = (char*) (rec + 1);
1543         struct lustre_cfg *lcfg;
1544         int rc = 0;
1545         struct llog_handle *mdt_llh = NULL;
1546         static int got_an_osc_or_mdc = 0;
1547         /* 0: not found any osc/mdc;
1548            1: found osc;
1549            2: found mdc;
1550         */
1551         static int last_step = -1;
1552         int cplen = 0;
1553
1554         ENTRY;
1555
1556         mti = ((struct temp_comp*)data)->comp_mti;
1557         tmti = ((struct temp_comp*)data)->comp_tmti;
1558         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1559         obd = ((struct temp_comp *)data)->comp_obd;
1560         mgs = lu2mgs_dev(obd->obd_lu_dev);
1561         LASSERT(mgs);
1562
1563         if (rec->lrh_type != OBD_CFG_REC) {
1564                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1565                 RETURN(-EINVAL);
1566         }
1567
1568         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1569         if (rc) {
1570                 CERROR("Insane cfg\n");
1571                 RETURN(rc);
1572         }
1573
1574         lcfg = (struct lustre_cfg *)cfg_buf;
1575
1576         if (lcfg->lcfg_command == LCFG_MARKER) {
1577                 struct cfg_marker *marker;
1578                 marker = lustre_cfg_buf(lcfg, 1);
1579                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1580                     (marker->cm_flags & CM_START) &&
1581                      !(marker->cm_flags & CM_SKIP)) {
1582                         got_an_osc_or_mdc = 1;
1583                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1584                                         sizeof(tmti->mti_svname));
1585                         if (cplen >= sizeof(tmti->mti_svname))
1586                                 RETURN(-E2BIG);
1587                         rc = record_start_log(env, mgs, &mdt_llh,
1588                                               mti->mti_svname);
1589                         if (rc)
1590                                 RETURN(rc);
1591                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1592                                            mti->mti_svname, "add osc(copied)");
1593                         record_end_log(env, &mdt_llh);
1594                         last_step = marker->cm_step;
1595                         RETURN(rc);
1596                 }
1597                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1598                     (marker->cm_flags & CM_END) &&
1599                      !(marker->cm_flags & CM_SKIP)) {
1600                         LASSERT(last_step == marker->cm_step);
1601                         last_step = -1;
1602                         got_an_osc_or_mdc = 0;
1603                         memset(tmti, 0, sizeof(*tmti));
1604                         rc = record_start_log(env, mgs, &mdt_llh,
1605                                               mti->mti_svname);
1606                         if (rc)
1607                                 RETURN(rc);
1608                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1609                                            mti->mti_svname, "add osc(copied)");
1610                         record_end_log(env, &mdt_llh);
1611                         RETURN(rc);
1612                 }
1613                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1614                     (marker->cm_flags & CM_START) &&
1615                      !(marker->cm_flags & CM_SKIP)) {
1616                         got_an_osc_or_mdc = 2;
1617                         last_step = marker->cm_step;
1618                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1619                                strlen(marker->cm_tgtname));
1620
1621                         RETURN(rc);
1622                 }
1623                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1624                     (marker->cm_flags & CM_END) &&
1625                      !(marker->cm_flags & CM_SKIP)) {
1626                         LASSERT(last_step == marker->cm_step);
1627                         last_step = -1;
1628                         got_an_osc_or_mdc = 0;
1629                         memset(tmti, 0, sizeof(*tmti));
1630                         RETURN(rc);
1631                 }
1632         }
1633
1634         if (got_an_osc_or_mdc == 0 || last_step < 0)
1635                 RETURN(rc);
1636
1637         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1638                 uint64_t nodenid = lcfg->lcfg_nid;
1639
1640                 if (strlen(tmti->mti_uuid) == 0) {
1641                         /* target uuid not set, this config record is before
1642                          * LCFG_SETUP, this nid is one of target node nid.
1643                          */
1644                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1645                         tmti->mti_nid_count++;
1646                 } else {
1647                         /* failover node nid */
1648                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1649                                        libcfs_nid2str(nodenid));
1650                 }
1651
1652                 RETURN(rc);
1653         }
1654
1655         if (lcfg->lcfg_command == LCFG_SETUP) {
1656                 char *target;
1657
1658                 target = lustre_cfg_string(lcfg, 1);
1659                 memcpy(tmti->mti_uuid, target, strlen(target));
1660                 RETURN(rc);
1661         }
1662
1663         /* ignore client side sptlrpc_conf_log */
1664         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1665                 RETURN(rc);
1666
1667         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1668                 int index;
1669
1670                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1671                         RETURN (-EINVAL);
1672
1673                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1674                        strlen(mti->mti_fsname));
1675                 tmti->mti_stripe_index = index;
1676
1677                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1678                                               mti->mti_stripe_index,
1679                                               mti->mti_svname);
1680                 memset(tmti, 0, sizeof(*tmti));
1681                 RETURN(rc);
1682         }
1683
1684         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1685                 int index;
1686                 char mdt_index[9];
1687                 char *logname, *lovname;
1688
1689                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1690                                              mti->mti_stripe_index);
1691                 if (rc)
1692                         RETURN(rc);
1693                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1694
1695                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1696                         name_destroy(&logname);
1697                         name_destroy(&lovname);
1698                         RETURN(-EINVAL);
1699                 }
1700
1701                 tmti->mti_stripe_index = index;
1702                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1703                                          mdt_index, lovname,
1704                                          LUSTRE_SP_MDT, 0);
1705                 name_destroy(&logname);
1706                 name_destroy(&lovname);
1707                 RETURN(rc);
1708         }
1709         RETURN(rc);
1710 }
1711
1712 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1713 /* stealed from mgs_get_fsdb_from_llog*/
1714 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1715                                               struct mgs_device *mgs,
1716                                               char *client_name,
1717                                               struct temp_comp* comp)
1718 {
1719         struct llog_handle *loghandle;
1720         struct mgs_target_info *tmti;
1721         struct llog_ctxt *ctxt;
1722         int rc;
1723
1724         ENTRY;
1725
1726         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1727         LASSERT(ctxt != NULL);
1728
1729         OBD_ALLOC_PTR(tmti);
1730         if (tmti == NULL)
1731                 GOTO(out_ctxt, rc = -ENOMEM);
1732
1733         comp->comp_tmti = tmti;
1734         comp->comp_obd = mgs->mgs_obd;
1735
1736         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1737                        LLOG_OPEN_EXISTS);
1738         if (rc < 0) {
1739                 if (rc == -ENOENT)
1740                         rc = 0;
1741                 GOTO(out_pop, rc);
1742         }
1743
1744         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1745         if (rc)
1746                 GOTO(out_close, rc);
1747
1748         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1749                                   (void *)comp, NULL, false);
1750         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1751 out_close:
1752         llog_close(env, loghandle);
1753 out_pop:
1754         OBD_FREE_PTR(tmti);
1755 out_ctxt:
1756         llog_ctxt_put(ctxt);
1757         RETURN(rc);
1758 }
1759
1760 /* lmv is the second thing for client logs */
1761 /* copied from mgs_write_log_lov. Please refer to that.  */
1762 static int mgs_write_log_lmv(const struct lu_env *env,
1763                              struct mgs_device *mgs,
1764                              struct fs_db *fsdb,
1765                              struct mgs_target_info *mti,
1766                              char *logname, char *lmvname)
1767 {
1768         struct llog_handle *llh = NULL;
1769         struct lmv_desc *lmvdesc;
1770         char *uuid;
1771         int rc = 0;
1772         ENTRY;
1773
1774         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1775
1776         OBD_ALLOC_PTR(lmvdesc);
1777         if (lmvdesc == NULL)
1778                 RETURN(-ENOMEM);
1779         lmvdesc->ld_active_tgt_count = 0;
1780         lmvdesc->ld_tgt_count = 0;
1781         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1782         uuid = (char *)lmvdesc->ld_uuid.uuid;
1783
1784         rc = record_start_log(env, mgs, &llh, logname);
1785         if (rc)
1786                 GOTO(out_free, rc);
1787         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1788         if (rc)
1789                 GOTO(out_end, rc);
1790         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1791         if (rc)
1792                 GOTO(out_end, rc);
1793         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1794         if (rc)
1795                 GOTO(out_end, rc);
1796         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1797         if (rc)
1798                 GOTO(out_end, rc);
1799 out_end:
1800         record_end_log(env, &llh);
1801 out_free:
1802         OBD_FREE_PTR(lmvdesc);
1803         RETURN(rc);
1804 }
1805
1806 /* lov is the first thing in the mdt and client logs */
1807 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1808                              struct fs_db *fsdb, struct mgs_target_info *mti,
1809                              char *logname, char *lovname)
1810 {
1811         struct llog_handle *llh = NULL;
1812         struct lov_desc *lovdesc;
1813         char *uuid;
1814         int rc = 0;
1815         ENTRY;
1816
1817         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1818
1819         /*
1820         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1821         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1822               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1823         */
1824
1825         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1826         OBD_ALLOC_PTR(lovdesc);
1827         if (lovdesc == NULL)
1828                 RETURN(-ENOMEM);
1829         lovdesc->ld_magic = LOV_DESC_MAGIC;
1830         lovdesc->ld_tgt_count = 0;
1831         /* Defaults.  Can be changed later by lcfg config_param */
1832         lovdesc->ld_default_stripe_count = 1;
1833         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1834         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1835         lovdesc->ld_default_stripe_offset = -1;
1836         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1837         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1838         /* can these be the same? */
1839         uuid = (char *)lovdesc->ld_uuid.uuid;
1840
1841         /* This should always be the first entry in a log.
1842         rc = mgs_clear_log(obd, logname); */
1843         rc = record_start_log(env, mgs, &llh, logname);
1844         if (rc)
1845                 GOTO(out_free, rc);
1846         /* FIXME these should be a single journal transaction */
1847         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1848         if (rc)
1849                 GOTO(out_end, rc);
1850         rc = record_attach(env, llh, lovname, "lov", uuid);
1851         if (rc)
1852                 GOTO(out_end, rc);
1853         rc = record_lov_setup(env, llh, lovname, lovdesc);
1854         if (rc)
1855                 GOTO(out_end, rc);
1856         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1857         if (rc)
1858                 GOTO(out_end, rc);
1859         EXIT;
1860 out_end:
1861         record_end_log(env, &llh);
1862 out_free:
1863         OBD_FREE_PTR(lovdesc);
1864         return rc;
1865 }
1866
1867 /* add failnids to open log */
1868 static int mgs_write_log_failnids(const struct lu_env *env,
1869                                   struct mgs_target_info *mti,
1870                                   struct llog_handle *llh,
1871                                   char *cliname)
1872 {
1873         char *failnodeuuid = NULL;
1874         char *ptr = mti->mti_params;
1875         lnet_nid_t nid;
1876         int rc = 0;
1877
1878         /*
1879         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1880         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1881         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1882         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1883         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1884         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1885         */
1886
1887         /*
1888          * Pull failnid info out of params string, which may contain something
1889          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
1890          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
1891          * etc.  However, convert_hostnames() should have caught those.
1892          */
1893         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1894                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1895                         if (failnodeuuid == NULL) {
1896                                 /* We don't know the failover node name,
1897                                    so just use the first nid as the uuid */
1898                                 rc = name_create(&failnodeuuid,
1899                                                  libcfs_nid2str(nid), "");
1900                                 if (rc)
1901                                         return rc;
1902                         }
1903                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1904                                "client %s\n", libcfs_nid2str(nid),
1905                                failnodeuuid, cliname);
1906                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1907                         /*
1908                          * If *ptr is ':', we have added all NIDs for
1909                          * failnodeuuid.
1910                          */
1911                         if (*ptr == ':') {
1912                                 rc = record_add_conn(env, llh, cliname,
1913                                                      failnodeuuid);
1914                                 name_destroy(&failnodeuuid);
1915                                 failnodeuuid = NULL;
1916                         }
1917                 }
1918                 if (failnodeuuid) {
1919                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1920                         name_destroy(&failnodeuuid);
1921                         failnodeuuid = NULL;
1922                 }
1923         }
1924
1925         return rc;
1926 }
1927
1928 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1929                                     struct mgs_device *mgs,
1930                                     struct fs_db *fsdb,
1931                                     struct mgs_target_info *mti,
1932                                     char *logname, char *lmvname)
1933 {
1934         struct llog_handle *llh = NULL;
1935         char *mdcname = NULL;
1936         char *nodeuuid = NULL;
1937         char *mdcuuid = NULL;
1938         char *lmvuuid = NULL;
1939         char index[6];
1940         int i, rc;
1941         ENTRY;
1942
1943         if (mgs_log_is_empty(env, mgs, logname)) {
1944                 CERROR("log is empty! Logical error\n");
1945                 RETURN(-EINVAL);
1946         }
1947
1948         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1949                mti->mti_svname, logname, lmvname);
1950
1951         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1952         if (rc)
1953                 RETURN(rc);
1954         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1955         if (rc)
1956                 GOTO(out_free, rc);
1957         rc = name_create(&mdcuuid, mdcname, "_UUID");
1958         if (rc)
1959                 GOTO(out_free, rc);
1960         rc = name_create(&lmvuuid, lmvname, "_UUID");
1961         if (rc)
1962                 GOTO(out_free, rc);
1963
1964         rc = record_start_log(env, mgs, &llh, logname);
1965         if (rc)
1966                 GOTO(out_free, rc);
1967         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1968                            "add mdc");
1969         if (rc)
1970                 GOTO(out_end, rc);
1971         for (i = 0; i < mti->mti_nid_count; i++) {
1972                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1973                        libcfs_nid2str(mti->mti_nids[i]));
1974
1975                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1976                 if (rc)
1977                         GOTO(out_end, rc);
1978         }
1979
1980         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1981         if (rc)
1982                 GOTO(out_end, rc);
1983         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1984         if (rc)
1985                 GOTO(out_end, rc);
1986         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1987         if (rc)
1988                 GOTO(out_end, rc);
1989         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1990         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1991                             index, "1");
1992         if (rc)
1993                 GOTO(out_end, rc);
1994         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1995                            "add mdc");
1996         if (rc)
1997                 GOTO(out_end, rc);
1998 out_end:
1999         record_end_log(env, &llh);
2000 out_free:
2001         name_destroy(&lmvuuid);
2002         name_destroy(&mdcuuid);
2003         name_destroy(&mdcname);
2004         name_destroy(&nodeuuid);
2005         RETURN(rc);
2006 }
2007
2008 static inline int name_create_lov(char **lovname, char *mdtname,
2009                                   struct fs_db *fsdb, int index)
2010 {
2011         /* COMPAT_180 */
2012         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2013                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2014         else
2015                 return name_create(lovname, mdtname, "-mdtlov");
2016 }
2017
2018 static int name_create_mdt_and_lov(char **logname, char **lovname,
2019                                    struct fs_db *fsdb, int i)
2020 {
2021         int rc;
2022
2023         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2024         if (rc)
2025                 return rc;
2026         /* COMPAT_180 */
2027         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2028                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2029         else
2030                 rc = name_create(lovname, *logname, "-mdtlov");
2031         if (rc) {
2032                 name_destroy(logname);
2033                 *logname = NULL;
2034         }
2035         return rc;
2036 }
2037
2038 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2039                                       struct fs_db *fsdb, int i)
2040 {
2041         char suffix[16];
2042
2043         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2044                 sprintf(suffix, "-osc");
2045         else
2046                 sprintf(suffix, "-osc-MDT%04x", i);
2047         return name_create(oscname, ostname, suffix);
2048 }
2049
2050 /* add new mdc to already existent MDS */
2051 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2052                                     struct mgs_device *mgs,
2053                                     struct fs_db *fsdb,
2054                                     struct mgs_target_info *mti,
2055                                     int mdt_index, char *logname)
2056 {
2057         struct llog_handle      *llh = NULL;
2058         char    *nodeuuid = NULL;
2059         char    *ospname = NULL;
2060         char    *lovuuid = NULL;
2061         char    *mdtuuid = NULL;
2062         char    *svname = NULL;
2063         char    *mdtname = NULL;
2064         char    *lovname = NULL;
2065         char    index_str[16];
2066         int     i, rc;
2067
2068         ENTRY;
2069         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2070                 CERROR("log is empty! Logical error\n");
2071                 RETURN (-EINVAL);
2072         }
2073
2074         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2075                logname);
2076
2077         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2078         if (rc)
2079                 RETURN(rc);
2080
2081         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2082         if (rc)
2083                 GOTO(out_destory, rc);
2084
2085         rc = name_create(&svname, mdtname, "-osp");
2086         if (rc)
2087                 GOTO(out_destory, rc);
2088
2089         sprintf(index_str, "-MDT%04x", mdt_index);
2090         rc = name_create(&ospname, svname, index_str);
2091         if (rc)
2092                 GOTO(out_destory, rc);
2093
2094         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2095         if (rc)
2096                 GOTO(out_destory, rc);
2097
2098         rc = name_create(&lovuuid, lovname, "_UUID");
2099         if (rc)
2100                 GOTO(out_destory, rc);
2101
2102         rc = name_create(&mdtuuid, mdtname, "_UUID");
2103         if (rc)
2104                 GOTO(out_destory, rc);
2105
2106         rc = record_start_log(env, mgs, &llh, logname);
2107         if (rc)
2108                 GOTO(out_destory, rc);
2109
2110         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2111                            "add osp");
2112         if (rc)
2113                 GOTO(out_destory, rc);
2114
2115         for (i = 0; i < mti->mti_nid_count; i++) {
2116                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2117                        libcfs_nid2str(mti->mti_nids[i]));
2118                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2119                 if (rc)
2120                         GOTO(out_end, rc);
2121         }
2122
2123         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2124         if (rc)
2125                 GOTO(out_end, rc);
2126
2127         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2128                           NULL, NULL);
2129         if (rc)
2130                 GOTO(out_end, rc);
2131
2132         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2133         if (rc)
2134                 GOTO(out_end, rc);
2135
2136         /* Add mdc(osp) to lod */
2137         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2138                  mti->mti_stripe_index);
2139         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2140                          index_str, "1", NULL);
2141         if (rc)
2142                 GOTO(out_end, rc);
2143
2144         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2145         if (rc)
2146                 GOTO(out_end, rc);
2147
2148 out_end:
2149         record_end_log(env, &llh);
2150
2151 out_destory:
2152         name_destroy(&mdtuuid);
2153         name_destroy(&lovuuid);
2154         name_destroy(&lovname);
2155         name_destroy(&ospname);
2156         name_destroy(&svname);
2157         name_destroy(&nodeuuid);
2158         name_destroy(&mdtname);
2159         RETURN(rc);
2160 }
2161
2162 static int mgs_write_log_mdt0(const struct lu_env *env,
2163                               struct mgs_device *mgs,
2164                               struct fs_db *fsdb,
2165                               struct mgs_target_info *mti)
2166 {
2167         char *log = mti->mti_svname;
2168         struct llog_handle *llh = NULL;
2169         char *uuid, *lovname;
2170         char mdt_index[6];
2171         char *ptr = mti->mti_params;
2172         int rc = 0, failout = 0;
2173         ENTRY;
2174
2175         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2176         if (uuid == NULL)
2177                 RETURN(-ENOMEM);
2178
2179         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2180                 failout = (strncmp(ptr, "failout", 7) == 0);
2181
2182         rc = name_create(&lovname, log, "-mdtlov");
2183         if (rc)
2184                 GOTO(out_free, rc);
2185         if (mgs_log_is_empty(env, mgs, log)) {
2186                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2187                 if (rc)
2188                         GOTO(out_lod, rc);
2189         }
2190
2191         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2192
2193         rc = record_start_log(env, mgs, &llh, log);
2194         if (rc)
2195                 GOTO(out_lod, rc);
2196
2197         /* add MDT itself */
2198
2199         /* FIXME this whole fn should be a single journal transaction */
2200         sprintf(uuid, "%s_UUID", log);
2201         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2202         if (rc)
2203                 GOTO(out_lod, rc);
2204         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2205         if (rc)
2206                 GOTO(out_end, rc);
2207         rc = record_mount_opt(env, llh, log, lovname, NULL);
2208         if (rc)
2209                 GOTO(out_end, rc);
2210         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2211                         failout ? "n" : "f");
2212         if (rc)
2213                 GOTO(out_end, rc);
2214         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2215         if (rc)
2216                 GOTO(out_end, rc);
2217 out_end:
2218         record_end_log(env, &llh);
2219 out_lod:
2220         name_destroy(&lovname);
2221 out_free:
2222         OBD_FREE(uuid, sizeof(struct obd_uuid));
2223         RETURN(rc);
2224 }
2225
2226 /* envelope method for all layers log */
2227 static int mgs_write_log_mdt(const struct lu_env *env,
2228                              struct mgs_device *mgs,
2229                              struct fs_db *fsdb,
2230                              struct mgs_target_info *mti)
2231 {
2232         struct mgs_thread_info *mgi = mgs_env_info(env);
2233         struct llog_handle *llh = NULL;
2234         char *cliname;
2235         int rc, i = 0;
2236         ENTRY;
2237
2238         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2239
2240         if (mti->mti_uuid[0] == '\0') {
2241                 /* Make up our own uuid */
2242                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2243                          "%s_UUID", mti->mti_svname);
2244         }
2245
2246         /* add mdt */
2247         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2248         if (rc)
2249                 RETURN(rc);
2250         /* Append the mdt info to the client log */
2251         rc = name_create(&cliname, mti->mti_fsname, "-client");
2252         if (rc)
2253                 RETURN(rc);
2254
2255         if (mgs_log_is_empty(env, mgs, cliname)) {
2256                 /* Start client log */
2257                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2258                                        fsdb->fsdb_clilov);
2259                 if (rc)
2260                         GOTO(out_free, rc);
2261                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2262                                        fsdb->fsdb_clilmv);
2263                 if (rc)
2264                         GOTO(out_free, rc);
2265         }
2266
2267         /*
2268         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2269         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2270         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2271         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2272         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2273         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2274         */
2275
2276                 /* copy client info about lov/lmv */
2277                 mgi->mgi_comp.comp_mti = mti;
2278                 mgi->mgi_comp.comp_fsdb = fsdb;
2279
2280                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2281                                                         &mgi->mgi_comp);
2282                 if (rc)
2283                         GOTO(out_free, rc);
2284                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2285                                               fsdb->fsdb_clilmv);
2286                 if (rc)
2287                         GOTO(out_free, rc);
2288
2289                 /* add mountopts */
2290                 rc = record_start_log(env, mgs, &llh, cliname);
2291                 if (rc)
2292                         GOTO(out_free, rc);
2293
2294                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2295                                    "mount opts");
2296                 if (rc)
2297                         GOTO(out_end, rc);
2298                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2299                                       fsdb->fsdb_clilmv);
2300                 if (rc)
2301                         GOTO(out_end, rc);
2302                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2303                                    "mount opts");
2304
2305         if (rc)
2306                 GOTO(out_end, rc);
2307
2308         /* for_all_existing_mdt except current one */
2309         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2310                 if (i !=  mti->mti_stripe_index &&
2311                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2312                         char *logname;
2313
2314                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2315                         if (rc)
2316                                 GOTO(out_end, rc);
2317
2318                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2319                                                       i, logname);
2320                         name_destroy(&logname);
2321                         if (rc)
2322                                 GOTO(out_end, rc);
2323                 }
2324         }
2325 out_end:
2326         record_end_log(env, &llh);
2327 out_free:
2328         name_destroy(&cliname);
2329         RETURN(rc);
2330 }
2331
2332 /* Add the ost info to the client/mdt lov */
2333 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2334                                     struct mgs_device *mgs, struct fs_db *fsdb,
2335                                     struct mgs_target_info *mti,
2336                                     char *logname, char *suffix, char *lovname,
2337                                     enum lustre_sec_part sec_part, int flags)
2338 {
2339         struct llog_handle *llh = NULL;
2340         char *nodeuuid = NULL;
2341         char *oscname = NULL;
2342         char *oscuuid = NULL;
2343         char *lovuuid = NULL;
2344         char *svname = NULL;
2345         char index[6];
2346         int i, rc;
2347
2348         ENTRY;
2349         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2350                mti->mti_svname, logname);
2351
2352         if (mgs_log_is_empty(env, mgs, logname)) {
2353                 CERROR("log is empty! Logical error\n");
2354                 RETURN (-EINVAL);
2355         }
2356
2357         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2358         if (rc)
2359                 RETURN(rc);
2360         rc = name_create(&svname, mti->mti_svname, "-osc");
2361         if (rc)
2362                 GOTO(out_free, rc);
2363
2364         /* for the system upgraded from old 1.8, keep using the old osc naming
2365          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2366         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2367                 rc = name_create(&oscname, svname, "");
2368         else
2369                 rc = name_create(&oscname, svname, suffix);
2370         if (rc)
2371                 GOTO(out_free, rc);
2372
2373         rc = name_create(&oscuuid, oscname, "_UUID");
2374         if (rc)
2375                 GOTO(out_free, rc);
2376         rc = name_create(&lovuuid, lovname, "_UUID");
2377         if (rc)
2378                 GOTO(out_free, rc);
2379
2380
2381         /*
2382         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2383         multihomed (#4)
2384         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2385         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2386         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2387         failover (#6,7)
2388         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2389         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2390         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2391         */
2392
2393         rc = record_start_log(env, mgs, &llh, logname);
2394         if (rc)
2395                 GOTO(out_free, rc);
2396
2397         /* FIXME these should be a single journal transaction */
2398         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2399                            "add osc");
2400         if (rc)
2401                 GOTO(out_end, rc);
2402
2403         /* NB: don't change record order, because upon MDT steal OSC config
2404          * from client, it treats all nids before LCFG_SETUP as target nids
2405          * (multiple interfaces), while nids after as failover node nids.
2406          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2407          */
2408         for (i = 0; i < mti->mti_nid_count; i++) {
2409                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2410                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2411                 if (rc)
2412                         GOTO(out_end, rc);
2413         }
2414         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2415         if (rc)
2416                 GOTO(out_end, rc);
2417         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2418         if (rc)
2419                 GOTO(out_end, rc);
2420         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2421         if (rc)
2422                 GOTO(out_end, rc);
2423
2424         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2425
2426         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2427         if (rc)
2428                 GOTO(out_end, rc);
2429         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2430                            "add osc");
2431         if (rc)
2432                 GOTO(out_end, rc);
2433 out_end:
2434         record_end_log(env, &llh);
2435 out_free:
2436         name_destroy(&lovuuid);
2437         name_destroy(&oscuuid);
2438         name_destroy(&oscname);
2439         name_destroy(&svname);
2440         name_destroy(&nodeuuid);
2441         RETURN(rc);
2442 }
2443
2444 static int mgs_write_log_ost(const struct lu_env *env,
2445                              struct mgs_device *mgs, struct fs_db *fsdb,
2446                              struct mgs_target_info *mti)
2447 {
2448         struct llog_handle *llh = NULL;
2449         char *logname, *lovname;
2450         char *ptr = mti->mti_params;
2451         int rc, flags = 0, failout = 0, i;
2452         ENTRY;
2453
2454         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2455
2456         /* The ost startup log */
2457
2458         /* If the ost log already exists, that means that someone reformatted
2459            the ost and it called target_add again. */
2460         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2461                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2462                                    "exists, yet the server claims it never "
2463                                    "registered. It may have been reformatted, "
2464                                    "or the index changed. writeconf the MDT to "
2465                                    "regenerate all logs.\n", mti->mti_svname);
2466                 RETURN(-EALREADY);
2467         }
2468
2469         /*
2470         attach obdfilter ost1 ost1_UUID
2471         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2472         */
2473         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2474                 failout = (strncmp(ptr, "failout", 7) == 0);
2475         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2476         if (rc)
2477                 RETURN(rc);
2478         /* FIXME these should be a single journal transaction */
2479         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2480         if (rc)
2481                 GOTO(out_end, rc);
2482         if (*mti->mti_uuid == '\0')
2483                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2484                          "%s_UUID", mti->mti_svname);
2485         rc = record_attach(env, llh, mti->mti_svname,
2486                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2487         if (rc)
2488                 GOTO(out_end, rc);
2489         rc = record_setup(env, llh, mti->mti_svname,
2490                           "dev"/*ignored*/, "type"/*ignored*/,
2491                           failout ? "n" : "f", 0/*options*/);
2492         if (rc)
2493                 GOTO(out_end, rc);
2494         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2495         if (rc)
2496                 GOTO(out_end, rc);
2497 out_end:
2498         record_end_log(env, &llh);
2499         if (rc)
2500                 RETURN(rc);
2501         /* We also have to update the other logs where this osc is part of
2502            the lov */
2503
2504         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2505                 /* If we're upgrading, the old mdt log already has our
2506                    entry. Let's do a fake one for fun. */
2507                 /* Note that we can't add any new failnids, since we don't
2508                    know the old osc names. */
2509                 flags = CM_SKIP | CM_UPGRADE146;
2510
2511         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2512                 /* If the update flag isn't set, don't update client/mdt
2513                    logs. */
2514                 flags |= CM_SKIP;
2515                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2516                               "the MDT first to regenerate it.\n",
2517                               mti->mti_svname);
2518         }
2519
2520         /* Add ost to all MDT lov defs */
2521         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2522                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2523                         char mdt_index[9];
2524
2525                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2526                                                      i);
2527                         if (rc)
2528                                 RETURN(rc);
2529                         sprintf(mdt_index, "-MDT%04x", i);
2530                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2531                                                       logname, mdt_index,
2532                                                       lovname, LUSTRE_SP_MDT,
2533                                                       flags);
2534                         name_destroy(&logname);
2535                         name_destroy(&lovname);
2536                         if (rc)
2537                                 RETURN(rc);
2538                 }
2539         }
2540
2541         /* Append ost info to the client log */
2542         rc = name_create(&logname, mti->mti_fsname, "-client");
2543         if (rc)
2544                 RETURN(rc);
2545         if (mgs_log_is_empty(env, mgs, logname)) {
2546                 /* Start client log */
2547                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2548                                        fsdb->fsdb_clilov);
2549                 if (rc)
2550                         GOTO(out_free, rc);
2551                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2552                                        fsdb->fsdb_clilmv);
2553                 if (rc)
2554                         GOTO(out_free, rc);
2555         }
2556         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2557                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2558 out_free:
2559         name_destroy(&logname);
2560         RETURN(rc);
2561 }
2562
2563 static __inline__ int mgs_param_empty(char *ptr)
2564 {
2565         char *tmp;
2566
2567         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2568                 return 1;
2569         return 0;
2570 }
2571
2572 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2573                                           struct mgs_device *mgs,
2574                                           struct fs_db *fsdb,
2575                                           struct mgs_target_info *mti,
2576                                           char *logname, char *cliname)
2577 {
2578         int rc;
2579         struct llog_handle *llh = NULL;
2580
2581         if (mgs_param_empty(mti->mti_params)) {
2582                 /* Remove _all_ failnids */
2583                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2584                                 mti->mti_svname, "add failnid", CM_SKIP);
2585                 return rc < 0 ? rc : 0;
2586         }
2587
2588         /* Otherwise failover nids are additive */
2589         rc = record_start_log(env, mgs, &llh, logname);
2590         if (rc)
2591                 return rc;
2592                 /* FIXME this should be a single journal transaction */
2593         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2594                            "add failnid");
2595         if (rc)
2596                 goto out_end;
2597         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2598         if (rc)
2599                 goto out_end;
2600         rc = record_marker(env, llh, fsdb, CM_END,
2601                            mti->mti_svname, "add failnid");
2602 out_end:
2603         record_end_log(env, &llh);
2604         return rc;
2605 }
2606
2607
2608 /* Add additional failnids to an existing log.
2609    The mdc/osc must have been added to logs first */
2610 /* tcp nids must be in dotted-quad ascii -
2611    we can't resolve hostnames from the kernel. */
2612 static int mgs_write_log_add_failnid(const struct lu_env *env,
2613                                      struct mgs_device *mgs,
2614                                      struct fs_db *fsdb,
2615                                      struct mgs_target_info *mti)
2616 {
2617         char *logname, *cliname;
2618         int rc;
2619         ENTRY;
2620
2621         /* FIXME we currently can't erase the failnids
2622          * given when a target first registers, since they aren't part of
2623          * an "add uuid" stanza */
2624
2625         /* Verify that we know about this target */
2626         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2627                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2628                                    "yet. It must be started before failnids "
2629                                    "can be added.\n", mti->mti_svname);
2630                 RETURN(-ENOENT);
2631         }
2632
2633         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2634         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2635                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2636         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2637                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2638         } else {
2639                 RETURN(-EINVAL);
2640         }
2641         if (rc)
2642                 RETURN(rc);
2643         /* Add failover nids to the client log */
2644         rc = name_create(&logname, mti->mti_fsname, "-client");
2645         if (rc) {
2646                 name_destroy(&cliname);
2647                 RETURN(rc);
2648         }
2649         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2650         name_destroy(&logname);
2651         name_destroy(&cliname);
2652         if (rc)
2653                 RETURN(rc);
2654
2655         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2656                 /* Add OST failover nids to the MDT logs as well */
2657                 int i;
2658
2659                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2660                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2661                                 continue;
2662                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2663                         if (rc)
2664                                 RETURN(rc);
2665                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2666                                                  fsdb, i);
2667                         if (rc) {
2668                                 name_destroy(&logname);
2669                                 RETURN(rc);
2670                         }
2671                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2672                                                             mti, logname,
2673                                                             cliname);
2674                         name_destroy(&cliname);
2675                         name_destroy(&logname);
2676                         if (rc)
2677                                 RETURN(rc);
2678                 }
2679         }
2680
2681         RETURN(rc);
2682 }
2683
2684 static int mgs_wlp_lcfg(const struct lu_env *env,
2685                         struct mgs_device *mgs, struct fs_db *fsdb,
2686                         struct mgs_target_info *mti,
2687                         char *logname, struct lustre_cfg_bufs *bufs,
2688                         char *tgtname, char *ptr)
2689 {
2690         char comment[MTI_NAME_MAXLEN];
2691         char *tmp;
2692         struct llog_cfg_rec *lcr;
2693         int rc, del;
2694
2695         /* Erase any old settings of this same parameter */
2696         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2697         comment[MTI_NAME_MAXLEN - 1] = 0;
2698         /* But don't try to match the value. */
2699         tmp = strchr(comment, '=');
2700         if (tmp != NULL)
2701                 *tmp = 0;
2702         /* FIXME we should skip settings that are the same as old values */
2703         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2704         if (rc < 0)
2705                 return rc;
2706         del = mgs_param_empty(ptr);
2707
2708         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2709                       "Setting" : "Modifying", tgtname, comment, logname);
2710         if (del) {
2711                 /* mgs_modify() will return 1 if nothing had to be done */
2712                 if (rc == 1)
2713                         rc = 0;
2714                 return rc;
2715         }
2716
2717         lustre_cfg_bufs_reset(bufs, tgtname);
2718         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2719         if (mti->mti_flags & LDD_F_PARAM2)
2720                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2721
2722         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2723                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2724         if (lcr == NULL)
2725                 return -ENOMEM;
2726
2727         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2728                                   comment);
2729         lustre_cfg_rec_free(lcr);
2730         return rc;
2731 }
2732
2733 static int mgs_write_log_param2(const struct lu_env *env,
2734                                 struct mgs_device *mgs,
2735                                 struct fs_db *fsdb,
2736                                 struct mgs_target_info *mti, char *ptr)
2737 {
2738         struct lustre_cfg_bufs  bufs;
2739         int                     rc = 0;
2740         ENTRY;
2741
2742         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2743         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2744                           mti->mti_svname, ptr);
2745
2746         RETURN(rc);
2747 }
2748
2749 /* write global variable settings into log */
2750 static int mgs_write_log_sys(const struct lu_env *env,
2751                              struct mgs_device *mgs, struct fs_db *fsdb,
2752                              struct mgs_target_info *mti, char *sys, char *ptr)
2753 {
2754         struct mgs_thread_info  *mgi = mgs_env_info(env);
2755         struct lustre_cfg       *lcfg;
2756         struct llog_cfg_rec     *lcr;
2757         char *tmp, sep;
2758         int rc, cmd, convert = 1;
2759
2760         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2761                 cmd = LCFG_SET_TIMEOUT;
2762         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2763                 cmd = LCFG_SET_LDLM_TIMEOUT;
2764         /* Check for known params here so we can return error to lctl */
2765         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2766                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2767                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2768                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2769                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2770                 cmd = LCFG_PARAM;
2771         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2772                 convert = 0; /* Don't convert string value to integer */
2773                 cmd = LCFG_PARAM;
2774         } else {
2775                 return -EINVAL;
2776         }
2777
2778         if (mgs_param_empty(ptr))
2779                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2780         else
2781                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2782
2783         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2784         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2785         if (!convert && *tmp != '\0')
2786                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2787         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2788         if (lcr == NULL)
2789                 return -ENOMEM;
2790
2791         lcfg = &lcr->lcr_cfg;
2792         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2793         /* truncate the comment to the parameter name */
2794         ptr = tmp - 1;
2795         sep = *ptr;
2796         *ptr = '\0';
2797         /* modify all servers and clients */
2798         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2799                                       *tmp == '\0' ? NULL : lcr,
2800                                       mti->mti_fsname, sys, 0);
2801         if (rc == 0 && *tmp != '\0') {
2802                 switch (cmd) {
2803                 case LCFG_SET_TIMEOUT:
2804                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2805                                 class_process_config(lcfg);
2806                         break;
2807                 case LCFG_SET_LDLM_TIMEOUT:
2808                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2809                                 class_process_config(lcfg);
2810                         break;
2811                 default:
2812                         break;
2813                 }
2814         }
2815         *ptr = sep;
2816         lustre_cfg_rec_free(lcr);
2817         return rc;
2818 }
2819
2820 /* write quota settings into log */
2821 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2822                                struct fs_db *fsdb, struct mgs_target_info *mti,
2823                                char *quota, char *ptr)
2824 {
2825         struct mgs_thread_info  *mgi = mgs_env_info(env);
2826         struct llog_cfg_rec     *lcr;
2827         char                    *tmp;
2828         char                     sep;
2829         int                      rc, cmd = LCFG_PARAM;
2830
2831         /* support only 'meta' and 'data' pools so far */
2832         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2833             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2834                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2835                        "& quota.ost are)\n", ptr);
2836                 return -EINVAL;
2837         }
2838
2839         if (*tmp == '\0') {
2840                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2841         } else {
2842                 CDEBUG(D_MGS, "global '%s'\n", quota);
2843
2844                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2845                     strcmp(tmp, "none") != 0) {
2846                         CERROR("enable option(%s) isn't supported\n", tmp);
2847                         return -EINVAL;
2848                 }
2849         }
2850
2851         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2852         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2853         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2854         if (lcr == NULL)
2855                 return -ENOMEM;
2856
2857         /* truncate the comment to the parameter name */
2858         ptr = tmp - 1;
2859         sep = *ptr;
2860         *ptr = '\0';
2861
2862         /* XXX we duplicated quota enable information in all server
2863          *     config logs, it should be moved to a separate config
2864          *     log once we cleanup the config log for global param. */
2865         /* modify all servers */
2866         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2867                                       *tmp == '\0' ? NULL : lcr,
2868                                       mti->mti_fsname, quota, 1);
2869         *ptr = sep;
2870         lustre_cfg_rec_free(lcr);
2871         return rc < 0 ? rc : 0;
2872 }
2873
2874 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2875                                    struct mgs_device *mgs,
2876                                    struct fs_db *fsdb,
2877                                    struct mgs_target_info *mti,
2878                                    char *param)
2879 {
2880         struct mgs_thread_info  *mgi = mgs_env_info(env);
2881         struct llog_cfg_rec     *lcr;
2882         struct llog_handle      *llh = NULL;
2883         char                    *logname;
2884         char                    *comment, *ptr;
2885         int                      rc, len;
2886
2887         ENTRY;
2888
2889         /* get comment */
2890         ptr = strchr(param, '=');
2891         LASSERT(ptr != NULL);
2892         len = ptr - param;
2893
2894         OBD_ALLOC(comment, len + 1);
2895         if (comment == NULL)
2896                 RETURN(-ENOMEM);
2897         strncpy(comment, param, len);
2898         comment[len] = '\0';
2899
2900         /* prepare lcfg */
2901         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2902         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2903         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2904         if (lcr == NULL)
2905                 GOTO(out_comment, rc = -ENOMEM);
2906
2907         /* construct log name */
2908         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2909         if (rc < 0)
2910                 GOTO(out_lcfg, rc);
2911
2912         if (mgs_log_is_empty(env, mgs, logname)) {
2913                 rc = record_start_log(env, mgs, &llh, logname);
2914                 if (rc < 0)
2915                         GOTO(out, rc);
2916                 record_end_log(env, &llh);
2917         }
2918
2919         /* obsolete old one */
2920         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2921                         comment, CM_SKIP);
2922         if (rc < 0)
2923                 GOTO(out, rc);
2924         /* write the new one */
2925         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2926                                   mti->mti_svname, comment);
2927         if (rc)
2928                 CERROR("%s: error writing log %s: rc = %d\n",
2929                        mgs->mgs_obd->obd_name, logname, rc);
2930 out:
2931         name_destroy(&logname);
2932 out_lcfg:
2933         lustre_cfg_rec_free(lcr);
2934 out_comment:
2935         OBD_FREE(comment, len + 1);
2936         RETURN(rc);
2937 }
2938
2939 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2940                                         char *param)
2941 {
2942         char    *ptr;
2943
2944         /* disable the adjustable udesc parameter for now, i.e. use default
2945          * setting that client always ship udesc to MDT if possible. to enable
2946          * it simply remove the following line */
2947         goto error_out;
2948
2949         ptr = strchr(param, '=');
2950         if (ptr == NULL)
2951                 goto error_out;
2952         *ptr++ = '\0';
2953
2954         if (strcmp(param, PARAM_SRPC_UDESC))
2955                 goto error_out;
2956
2957         if (strcmp(ptr, "yes") == 0) {
2958                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2959                 CWARN("Enable user descriptor shipping from client to MDT\n");
2960         } else if (strcmp(ptr, "no") == 0) {
2961                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2962                 CWARN("Disable user descriptor shipping from client to MDT\n");
2963         } else {
2964                 *(ptr - 1) = '=';
2965                 goto error_out;
2966         }
2967         return 0;
2968
2969 error_out:
2970         CERROR("Invalid param: %s\n", param);
2971         return -EINVAL;
2972 }
2973
2974 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2975                                   const char *svname,
2976                                   char *param)
2977 {
2978         struct sptlrpc_rule      rule;
2979         struct sptlrpc_rule_set *rset;
2980         int                      rc;
2981         ENTRY;
2982
2983         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2984                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2985                 RETURN(-EINVAL);
2986         }
2987
2988         if (strncmp(param, PARAM_SRPC_UDESC,
2989                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2990                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2991         }
2992
2993         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2994                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2995                 RETURN(-EINVAL);
2996         }
2997
2998         param += sizeof(PARAM_SRPC_FLVR) - 1;
2999
3000         rc = sptlrpc_parse_rule(param, &rule);
3001         if (rc)
3002                 RETURN(rc);
3003
3004         /* mgs rules implies must be mgc->mgs */
3005         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3006                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3007                      rule.sr_from != LUSTRE_SP_ANY) ||
3008                     (rule.sr_to != LUSTRE_SP_MGS &&
3009                      rule.sr_to != LUSTRE_SP_ANY))
3010                         RETURN(-EINVAL);
3011         }
3012
3013         /* preapre room for this coming rule. svcname format should be:
3014          * - fsname: general rule
3015          * - fsname-tgtname: target-specific rule
3016          */
3017         if (strchr(svname, '-')) {
3018                 struct mgs_tgt_srpc_conf *tgtconf;
3019                 int                       found = 0;
3020
3021                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3022                      tgtconf = tgtconf->mtsc_next) {
3023                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3024                                 found = 1;
3025                                 break;
3026                         }
3027                 }
3028
3029                 if (!found) {
3030                         int name_len;
3031
3032                         OBD_ALLOC_PTR(tgtconf);
3033                         if (tgtconf == NULL)
3034                                 RETURN(-ENOMEM);
3035
3036                         name_len = strlen(svname);
3037
3038                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3039                         if (tgtconf->mtsc_tgt == NULL) {
3040                                 OBD_FREE_PTR(tgtconf);
3041                                 RETURN(-ENOMEM);
3042                         }
3043                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3044
3045                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3046                         fsdb->fsdb_srpc_tgt = tgtconf;
3047                 }
3048
3049                 rset = &tgtconf->mtsc_rset;
3050         } else {
3051                 rset = &fsdb->fsdb_srpc_gen;
3052         }
3053
3054         rc = sptlrpc_rule_set_merge(rset, &rule);
3055
3056         RETURN(rc);
3057 }
3058
3059 static int mgs_srpc_set_param(const struct lu_env *env,
3060                               struct mgs_device *mgs,
3061                               struct fs_db *fsdb,
3062                               struct mgs_target_info *mti,
3063                               char *param)
3064 {
3065         char                   *copy;
3066         int                     rc, copy_size;
3067         ENTRY;
3068
3069 #ifndef HAVE_GSS
3070         RETURN(-EINVAL);
3071 #endif
3072         /* keep a copy of original param, which could be destroied
3073          * during parsing */
3074         copy_size = strlen(param) + 1;
3075         OBD_ALLOC(copy, copy_size);
3076         if (copy == NULL)
3077                 return -ENOMEM;
3078         memcpy(copy, param, copy_size);
3079
3080         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3081         if (rc)
3082                 goto out_free;
3083
3084         /* previous steps guaranteed the syntax is correct */
3085         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3086         if (rc)
3087                 goto out_free;
3088
3089         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3090                 /*
3091                  * for mgs rules, make them effective immediately.
3092                  */
3093                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3094                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3095                                                  &fsdb->fsdb_srpc_gen);
3096         }
3097
3098 out_free:
3099         OBD_FREE(copy, copy_size);
3100         RETURN(rc);
3101 }
3102
3103 struct mgs_srpc_read_data {
3104         struct fs_db   *msrd_fsdb;
3105         int             msrd_skip;
3106 };
3107
3108 static int mgs_srpc_read_handler(const struct lu_env *env,
3109                                  struct llog_handle *llh,
3110                                  struct llog_rec_hdr *rec, void *data)
3111 {
3112         struct mgs_srpc_read_data *msrd = data;
3113         struct cfg_marker         *marker;
3114         struct lustre_cfg         *lcfg = REC_DATA(rec);
3115         char                      *svname, *param;
3116         int                        cfg_len, rc;
3117         ENTRY;
3118
3119         if (rec->lrh_type != OBD_CFG_REC) {
3120                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3121                 RETURN(-EINVAL);
3122         }
3123
3124         cfg_len = REC_DATA_LEN(rec);
3125
3126         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3127         if (rc) {
3128                 CERROR("Insane cfg\n");
3129                 RETURN(rc);
3130         }
3131
3132         if (lcfg->lcfg_command == LCFG_MARKER) {
3133                 marker = lustre_cfg_buf(lcfg, 1);
3134
3135                 if (marker->cm_flags & CM_START &&
3136                     marker->cm_flags & CM_SKIP)
3137                         msrd->msrd_skip = 1;
3138                 if (marker->cm_flags & CM_END)
3139                         msrd->msrd_skip = 0;
3140
3141                 RETURN(0);
3142         }
3143
3144         if (msrd->msrd_skip)
3145                 RETURN(0);
3146
3147         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3148                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3149                 RETURN(0);
3150         }
3151
3152         svname = lustre_cfg_string(lcfg, 0);
3153         if (svname == NULL) {
3154                 CERROR("svname is empty\n");
3155                 RETURN(0);
3156         }
3157
3158         param = lustre_cfg_string(lcfg, 1);
3159         if (param == NULL) {
3160                 CERROR("param is empty\n");
3161                 RETURN(0);
3162         }
3163
3164         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3165         if (rc)
3166                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3167
3168         RETURN(0);
3169 }
3170
3171 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3172                                 struct mgs_device *mgs,
3173                                 struct fs_db *fsdb)
3174 {
3175         struct llog_handle        *llh = NULL;
3176         struct llog_ctxt          *ctxt;
3177         char                      *logname;
3178         struct mgs_srpc_read_data  msrd;
3179         int                        rc;
3180         ENTRY;
3181
3182         /* construct log name */
3183         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3184         if (rc)
3185                 RETURN(rc);
3186
3187         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3188         LASSERT(ctxt != NULL);
3189
3190         if (mgs_log_is_empty(env, mgs, logname))
3191                 GOTO(out, rc = 0);
3192
3193         rc = llog_open(env, ctxt, &llh, NULL, logname,
3194                        LLOG_OPEN_EXISTS);
3195         if (rc < 0) {
3196                 if (rc == -ENOENT)
3197                         rc = 0;
3198                 GOTO(out, rc);
3199         }
3200
3201         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3202         if (rc)
3203                 GOTO(out_close, rc);
3204
3205         if (llog_get_size(llh) <= 1)
3206                 GOTO(out_close, rc = 0);
3207
3208         msrd.msrd_fsdb = fsdb;
3209         msrd.msrd_skip = 0;
3210
3211         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3212                           NULL);
3213
3214 out_close:
3215         llog_close(env, llh);
3216 out:
3217         llog_ctxt_put(ctxt);
3218         name_destroy(&logname);
3219
3220         if (rc)
3221                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3222         RETURN(rc);
3223 }
3224
3225 /* Permanent settings of all parameters by writing into the appropriate
3226  * configuration logs.
3227  * A parameter with null value ("<param>='\0'") means to erase it out of
3228  * the logs.
3229  */
3230 static int mgs_write_log_param(const struct lu_env *env,
3231                                struct mgs_device *mgs, struct fs_db *fsdb,
3232                                struct mgs_target_info *mti, char *ptr)
3233 {
3234         struct mgs_thread_info *mgi = mgs_env_info(env);
3235         char *logname;
3236         char *tmp;
3237         int rc = 0, rc2 = 0;
3238         ENTRY;
3239
3240         /* For various parameter settings, we have to figure out which logs
3241            care about them (e.g. both mdt and client for lov settings) */
3242         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3243
3244         /* The params are stored in MOUNT_DATA_FILE and modified via
3245            tunefs.lustre, or set using lctl conf_param */
3246
3247         /* Processed in lustre_start_mgc */
3248         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3249                 GOTO(end, rc);
3250
3251         /* Processed in ost/mdt */
3252         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3253                 GOTO(end, rc);
3254
3255         /* Processed in mgs_write_log_ost */
3256         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3257                 if (mti->mti_flags & LDD_F_PARAM) {
3258                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3259                                            "changed with tunefs.lustre"
3260                                            "and --writeconf\n", ptr);
3261                         rc = -EPERM;
3262                 }
3263                 GOTO(end, rc);
3264         }
3265
3266         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3267                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3268                 GOTO(end, rc);
3269         }
3270
3271         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3272                 /* Add a failover nidlist */
3273                 rc = 0;
3274                 /* We already processed failovers params for new
3275                    targets in mgs_write_log_target */
3276                 if (mti->mti_flags & LDD_F_PARAM) {
3277                         CDEBUG(D_MGS, "Adding failnode\n");
3278                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3279                 }
3280                 GOTO(end, rc);
3281         }
3282
3283         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3284                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3285                 GOTO(end, rc);
3286         }
3287
3288         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3289                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3290                 GOTO(end, rc);
3291         }
3292
3293         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3294                 /* active=0 means off, anything else means on */
3295                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3296                 int i;
3297
3298                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3299                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3300                                            "be (de)activated.\n",
3301                                            mti->mti_svname);
3302                         GOTO(end, rc = -EINVAL);
3303                 }
3304                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3305                               flag ? "de": "re", mti->mti_svname);
3306                 /* Modify clilov */
3307                 rc = name_create(&logname, mti->mti_fsname, "-client");
3308                 if (rc)
3309                         GOTO(end, rc);
3310                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3311                                 mti->mti_svname, "add osc", flag);
3312                 name_destroy(&logname);
3313                 if (rc)
3314                         goto active_err;
3315                 /* Modify mdtlov */
3316                 /* Add to all MDT logs for CMD */
3317                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3318                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3319                                 continue;
3320                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3321                         if (rc)
3322                                 GOTO(end, rc);
3323                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3324                                         mti->mti_svname, "add osc", flag);
3325                         name_destroy(&logname);
3326                         if (rc)
3327                                 goto active_err;
3328                 }
3329         active_err:
3330                 if (rc) {
3331                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3332                                            "log (%d). No permanent "
3333                                            "changes were made to the "
3334                                            "config log.\n",
3335                                            mti->mti_svname, rc);
3336                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3337                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3338                                                    " because the log"
3339                                                    "is in the old 1.4"
3340                                                    "style. Consider "
3341                                                    " --writeconf to "
3342                                                    "update the logs.\n");
3343                         GOTO(end, rc);
3344                 }
3345                 /* Fall through to osc proc for deactivating live OSC
3346                    on running MDT / clients. */
3347         }
3348         /* Below here, let obd's XXX_process_config methods handle it */
3349
3350         /* All lov. in proc */
3351         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3352                 char *mdtlovname;
3353
3354                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3355                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3356                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3357                                            "set on the MDT, not %s. "
3358                                            "Ignoring.\n",
3359                                            mti->mti_svname);
3360                         GOTO(end, rc = 0);
3361                 }
3362
3363                 /* Modify mdtlov */
3364                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3365                         GOTO(end, rc = -ENODEV);
3366
3367                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3368                                              mti->mti_stripe_index);
3369                 if (rc)
3370                         GOTO(end, rc);
3371                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3372                                   &mgi->mgi_bufs, mdtlovname, ptr);
3373                 name_destroy(&logname);
3374                 name_destroy(&mdtlovname);
3375                 if (rc)
3376                         GOTO(end, rc);
3377
3378                 /* Modify clilov */
3379                 rc = name_create(&logname, mti->mti_fsname, "-client");
3380                 if (rc)
3381                         GOTO(end, rc);
3382                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3383                                   fsdb->fsdb_clilov, ptr);
3384                 name_destroy(&logname);
3385                 GOTO(end, rc);
3386         }
3387
3388         /* All osc., mdc., llite. params in proc */
3389         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3390             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3391             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3392                 char *cname;
3393
3394                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3395                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3396                                            " cannot be modified. Consider"
3397                                            " updating the configuration with"
3398                                            " --writeconf\n",
3399                                            mti->mti_svname);
3400                         GOTO(end, rc = -EINVAL);
3401                 }
3402                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3403                         rc = name_create(&cname, mti->mti_fsname, "-client");
3404                         /* Add the client type to match the obdname in
3405                            class_config_llog_handler */
3406                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3407                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3408                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3409                         rc = name_create(&cname, mti->mti_svname, "-osc");
3410                 } else {
3411                         GOTO(end, rc = -EINVAL);
3412                 }
3413                 if (rc)
3414                         GOTO(end, rc);
3415
3416                 /* Forbid direct update of llite root squash parameters.
3417                  * These parameters are indirectly set via the MDT settings.
3418                  * See (LU-1778) */
3419                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
3420                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3421                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3422                         LCONSOLE_ERROR("%s: root squash parameters can only "
3423                                 "be updated through MDT component\n",
3424                                 mti->mti_fsname);
3425                         name_destroy(&cname);
3426                         GOTO(end, rc = -EINVAL);
3427                 }
3428
3429                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3430
3431                 /* Modify client */
3432                 rc = name_create(&logname, mti->mti_fsname, "-client");
3433                 if (rc) {
3434                         name_destroy(&cname);
3435                         GOTO(end, rc);
3436                 }
3437                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3438                                   cname, ptr);
3439
3440                 /* osc params affect the MDT as well */
3441                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3442                         int i;
3443
3444                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3445                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3446                                         continue;
3447                                 name_destroy(&cname);
3448                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3449                                                          fsdb, i);
3450                                 name_destroy(&logname);
3451                                 if (rc)
3452                                         break;
3453                                 rc = name_create_mdt(&logname,
3454                                                      mti->mti_fsname, i);
3455                                 if (rc)
3456                                         break;
3457                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3458                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3459                                                           mti, logname,
3460                                                           &mgi->mgi_bufs,
3461                                                           cname, ptr);
3462                                         if (rc)
3463                                                 break;
3464                                 }
3465                         }
3466                 }
3467                 name_destroy(&logname);
3468                 name_destroy(&cname);
3469                 GOTO(end, rc);
3470         }
3471
3472         /* All mdt. params in proc */
3473         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
3474                 int i;
3475                 __u32 idx;
3476
3477                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3478                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3479                             MTI_NAME_MAXLEN) == 0)
3480                         /* device is unspecified completely? */
3481                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3482                 else
3483                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3484                 if (rc < 0)
3485                         goto active_err;
3486                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3487                         goto active_err;
3488                 if (rc & LDD_F_SV_ALL) {
3489                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3490                                 if (!test_bit(i,
3491                                                   fsdb->fsdb_mdt_index_map))
3492                                         continue;
3493                                 rc = name_create_mdt(&logname,
3494                                                 mti->mti_fsname, i);
3495                                 if (rc)
3496                                         goto active_err;
3497                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3498                                                   logname, &mgi->mgi_bufs,
3499                                                   logname, ptr);
3500                                 name_destroy(&logname);
3501                                 if (rc)
3502                                         goto active_err;
3503                         }
3504                 } else {
3505                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
3506                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
3507                                 LCONSOLE_ERROR("%s: root squash parameters "
3508                                         "cannot be applied to a single MDT\n",
3509                                         mti->mti_fsname);
3510                                 GOTO(end, rc = -EINVAL);
3511                         }
3512                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3513                                           mti->mti_svname, &mgi->mgi_bufs,
3514                                           mti->mti_svname, ptr);
3515                         if (rc)
3516                                 goto active_err;
3517                 }
3518
3519                 /* root squash settings are also applied to llite
3520                  * config log (see LU-1778) */
3521                 if (rc == 0 &&
3522                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3523                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3524                         char *cname;
3525                         char *ptr2;
3526
3527                         rc = name_create(&cname, mti->mti_fsname, "-client");
3528                         if (rc)
3529                                 GOTO(end, rc);
3530                         rc = name_create(&logname, mti->mti_fsname, "-client");
3531                         if (rc) {
3532                                 name_destroy(&cname);
3533                                 GOTO(end, rc);
3534                         }
3535                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
3536                         if (rc) {
3537                                 name_destroy(&cname);
3538                                 name_destroy(&logname);
3539                                 GOTO(end, rc);
3540                         }
3541                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3542                                           &mgi->mgi_bufs, cname, ptr2);
3543                         name_destroy(&ptr2);
3544                         name_destroy(&logname);
3545                         name_destroy(&cname);
3546                 }
3547                 GOTO(end, rc);
3548         }
3549
3550         /* All mdd., ost. and osd. params in proc */
3551         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3552             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3553             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3554                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3555                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3556                         GOTO(end, rc = -ENODEV);
3557
3558                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3559                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3560                 GOTO(end, rc);
3561         }
3562
3563         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3564         rc2 = -ENOSYS;
3565
3566 end:
3567         if (rc)
3568                 CERROR("err %d on param '%s'\n", rc, ptr);
3569
3570         RETURN(rc ?: rc2);
3571 }
3572
3573 /* Not implementing automatic failover nid addition at this time. */
3574 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3575                       struct mgs_target_info *mti)
3576 {
3577 #if 0
3578         struct fs_db *fsdb;
3579         int rc;
3580         ENTRY;
3581
3582         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3583         if (rc)
3584                 RETURN(rc);
3585
3586         if (mgs_log_is_empty(obd, mti->mti_svname))
3587                 /* should never happen */
3588                 RETURN(-ENOENT);
3589
3590         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3591
3592         /* FIXME We can just check mti->params to see if we're already in
3593            the failover list.  Modify mti->params for rewriting back at
3594            server_register_target(). */
3595
3596         mutex_lock(&fsdb->fsdb_mutex);
3597         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3598         mutex_unlock(&fsdb->fsdb_mutex);
3599         char    *buf, *params;
3600         int      rc = -EINVAL;
3601
3602         RETURN(rc);
3603 #endif
3604         return 0;
3605 }
3606
3607 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
3608                          struct mgs_target_info *mti, struct fs_db *fsdb)
3609 {
3610         char    *buf, *params;
3611         int      rc = -EINVAL;
3612
3613         ENTRY;
3614
3615         /* set/check the new target index */
3616         rc = mgs_set_index(env, mgs, mti);
3617         if (rc < 0)
3618                 RETURN(rc);
3619
3620         if (rc == EALREADY) {
3621                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3622                               mti->mti_stripe_index, mti->mti_svname);
3623                 /* We would like to mark old log sections as invalid
3624                    and add new log sections in the client and mdt logs.
3625                    But if we add new sections, then live clients will
3626                    get repeat setup instructions for already running
3627                    osc's. So don't update the client/mdt logs. */
3628                 mti->mti_flags &= ~LDD_F_UPDATE;
3629                 rc = 0;
3630         }
3631
3632         mutex_lock(&fsdb->fsdb_mutex);
3633
3634         if (mti->mti_flags &
3635             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3636                 /* Generate a log from scratch */
3637                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3638                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3639                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3640                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3641                 } else {
3642                         CERROR("Unknown target type %#x, can't create log for "
3643                                "%s\n", mti->mti_flags, mti->mti_svname);
3644                 }
3645                 if (rc) {
3646                         CERROR("Can't write logs for %s (%d)\n",
3647                                mti->mti_svname, rc);
3648                         GOTO(out_up, rc);
3649                 }
3650         } else {
3651                 /* Just update the params from tunefs in mgs_write_log_params */
3652                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3653                 mti->mti_flags |= LDD_F_PARAM;
3654         }
3655
3656         /* allocate temporary buffer, where class_get_next_param will
3657            make copy of a current  parameter */
3658         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3659         if (buf == NULL)
3660                 GOTO(out_up, rc = -ENOMEM);
3661         params = mti->mti_params;
3662         while (params != NULL) {
3663                 rc = class_get_next_param(&params, buf);
3664                 if (rc) {
3665                         if (rc == 1)
3666                                 /* there is no next parameter, that is
3667                                    not an error */
3668                                 rc = 0;
3669                         break;
3670                 }
3671                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3672                        params, buf);
3673                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3674                 if (rc)
3675                         break;
3676         }
3677
3678         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3679
3680 out_up:
3681         mutex_unlock(&fsdb->fsdb_mutex);
3682         RETURN(rc);
3683 }
3684
3685 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3686 {
3687         struct llog_ctxt        *ctxt;
3688         int                      rc = 0;
3689
3690         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3691         if (ctxt == NULL) {
3692                 CERROR("%s: MGS config context doesn't exist\n",
3693                        mgs->mgs_obd->obd_name);
3694                 rc = -ENODEV;
3695         } else {
3696                 rc = llog_erase(env, ctxt, NULL, name);
3697                 /* llog may not exist */
3698                 if (rc == -ENOENT)
3699                         rc = 0;
3700                 llog_ctxt_put(ctxt);
3701         }
3702
3703         if (rc)
3704                 CERROR("%s: failed to clear log %s: %d\n",
3705                        mgs->mgs_obd->obd_name, name, rc);
3706
3707         return rc;
3708 }
3709
3710 /* erase all logs for the given fs */
3711 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3712 {
3713         struct fs_db *fsdb;
3714         struct list_head log_list;
3715         struct mgs_direntry *dirent, *n;
3716         int rc, len = strlen(fsname);
3717         char *suffix;
3718         ENTRY;
3719
3720         /* Find all the logs in the CONFIGS directory */
3721         rc = class_dentry_readdir(env, mgs, &log_list);
3722         if (rc)
3723                 RETURN(rc);
3724
3725         mutex_lock(&mgs->mgs_mutex);
3726
3727         /* Delete the fs db */
3728         fsdb = mgs_find_fsdb(mgs, fsname);
3729         if (fsdb)
3730                 mgs_free_fsdb(mgs, fsdb);
3731
3732         mutex_unlock(&mgs->mgs_mutex);
3733
3734         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3735                 list_del_init(&dirent->mde_list);
3736                 suffix = strrchr(dirent->mde_name, '-');
3737                 if (suffix != NULL) {
3738                         if ((len == suffix - dirent->mde_name) &&
3739                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
3740                                 CDEBUG(D_MGS, "Removing log %s\n",
3741                                        dirent->mde_name);
3742                                 mgs_erase_log(env, mgs, dirent->mde_name);
3743                         }
3744                 }
3745                 mgs_direntry_free(dirent);
3746         }
3747
3748         RETURN(rc);
3749 }
3750
3751 /* list all logs for the given fs */
3752 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
3753                   struct obd_ioctl_data *data)
3754 {
3755         struct list_head         log_list;
3756         struct mgs_direntry     *dirent, *n;
3757         char                    *out, *suffix;
3758         int                      l, remains, rc;
3759
3760         ENTRY;
3761
3762         /* Find all the logs in the CONFIGS directory */
3763         rc = class_dentry_readdir(env, mgs, &log_list);
3764         if (rc)
3765                 RETURN(rc);
3766
3767         out = data->ioc_bulk;
3768         remains = data->ioc_inllen1;
3769         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3770                 list_del_init(&dirent->mde_list);
3771                 suffix = strrchr(dirent->mde_name, '-');
3772                 if (suffix != NULL) {
3773                         l = snprintf(out, remains, "config log: $%s\n",
3774                                      dirent->mde_name);
3775                         out += l;
3776                         remains -= l;
3777                 }
3778                 mgs_direntry_free(dirent);
3779                 if (remains <= 0)
3780                         break;
3781         }
3782         RETURN(rc);
3783 }
3784
3785 /* from llog_swab */
3786 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3787 {
3788         int i;
3789         ENTRY;
3790
3791         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3792         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3793
3794         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3795         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3796         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3797         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3798
3799         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3800         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3801                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3802                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3803                                i, lcfg->lcfg_buflens[i],
3804                                lustre_cfg_string(lcfg, i));
3805                 }
3806         EXIT;
3807 }
3808
3809 /* Setup params fsdb and log
3810  */
3811 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3812                           struct fs_db *fsdb)
3813 {
3814         struct llog_handle      *params_llh = NULL;
3815         int                     rc;
3816         ENTRY;
3817
3818         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
3819         if (fsdb != NULL) {
3820                 mutex_lock(&fsdb->fsdb_mutex);
3821                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
3822                 if (rc == 0)
3823                         rc = record_end_log(env, &params_llh);
3824                 mutex_unlock(&fsdb->fsdb_mutex);
3825         }
3826
3827         RETURN(rc);
3828 }
3829
3830 /* Cleanup params fsdb and log
3831  */
3832 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
3833 {
3834         return mgs_erase_logs(env, mgs, PARAMS_FILENAME);
3835 }
3836
3837 /* Set a permanent (config log) param for a target or fs
3838  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3839  *             buf1 contains the single parameter
3840  */
3841 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3842                  struct lustre_cfg *lcfg, char *fsname)
3843 {
3844         struct fs_db *fsdb;
3845         struct mgs_target_info *mti;
3846         char *devname, *param;
3847         char *ptr;
3848         const char *tmp;
3849         __u32 index;
3850         int rc = 0;
3851         ENTRY;
3852
3853         print_lustre_cfg(lcfg);
3854
3855         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3856         devname = lustre_cfg_string(lcfg, 0);
3857         param = lustre_cfg_string(lcfg, 1);
3858         if (!devname) {
3859                 /* Assume device name embedded in param:
3860                    lustre-OST0000.osc.max_dirty_mb=32 */
3861                 ptr = strchr(param, '.');
3862                 if (ptr) {
3863                         devname = param;
3864                         *ptr = 0;
3865                         param = ptr + 1;
3866                 }
3867         }
3868         if (!devname) {
3869                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3870                 RETURN(-ENOSYS);
3871         }
3872
3873         rc = mgs_parse_devname(devname, fsname, NULL);
3874         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3875                 /* param related to llite isn't allowed to set by OST or MDT */
3876                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3877                                        sizeof(PARAM_LLITE) - 1) == 0)
3878                         RETURN(-EINVAL);
3879         } else {
3880                 /* assume devname is the fsname */
3881                 strlcpy(fsname, devname, MTI_NAME_MAXLEN);
3882         }
3883         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3884
3885         rc = mgs_find_or_make_fsdb(env, mgs,
3886                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
3887                                    PARAMS_FILENAME : fsname, &fsdb);
3888         if (rc)
3889                 RETURN(rc);
3890
3891         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
3892             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3893             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3894                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3895                        "is '%s'\n", fsname, devname);
3896                 mgs_free_fsdb(mgs, fsdb);
3897                 RETURN(-EINVAL);
3898         }
3899
3900         /* Create a fake mti to hold everything */
3901         OBD_ALLOC_PTR(mti);
3902         if (!mti)
3903                 GOTO(out, rc = -ENOMEM);
3904         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3905             >= sizeof(mti->mti_fsname))
3906                 GOTO(out, rc = -E2BIG);
3907         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3908             >= sizeof(mti->mti_svname))
3909                 GOTO(out, rc = -E2BIG);
3910         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3911             >= sizeof(mti->mti_params))
3912                 GOTO(out, rc = -E2BIG);
3913         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3914         if (rc < 0)
3915                 /* Not a valid server; may be only fsname */
3916                 rc = 0;
3917         else
3918                 /* Strip -osc or -mdc suffix from svname */
3919                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3920                                      mti->mti_svname))
3921                         GOTO(out, rc = -EINVAL);
3922         /*
3923          * Revoke lock so everyone updates.  Should be alright if
3924          * someone was already reading while we were updating the logs,
3925          * so we don't really need to hold the lock while we're
3926          * writing (above).
3927          */
3928         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
3929                 mti->mti_flags = rc | LDD_F_PARAM2;
3930                 mutex_lock(&fsdb->fsdb_mutex);
3931                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
3932                 mutex_unlock(&fsdb->fsdb_mutex);
3933                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
3934         } else {
3935                 mti->mti_flags = rc | LDD_F_PARAM;
3936                 mutex_lock(&fsdb->fsdb_mutex);
3937                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3938                 mutex_unlock(&fsdb->fsdb_mutex);
3939                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3940         }
3941
3942 out:
3943         OBD_FREE_PTR(mti);
3944         RETURN(rc);
3945 }
3946
3947 static int mgs_write_log_pool(const struct lu_env *env,
3948                               struct mgs_device *mgs, char *logname,
3949                               struct fs_db *fsdb, char *tgtname,
3950                               enum lcfg_command_type cmd,
3951                               char *fsname, char *poolname,
3952                               char *ostname, char *comment)
3953 {
3954         struct llog_handle *llh = NULL;
3955         int rc;
3956
3957         rc = record_start_log(env, mgs, &llh, logname);
3958         if (rc)
3959                 return rc;
3960         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
3961         if (rc)
3962                 goto out;
3963         rc = record_base(env, llh, tgtname, 0, cmd,
3964                          fsname, poolname, ostname, 0);
3965         if (rc)
3966                 goto out;
3967         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
3968 out:
3969         record_end_log(env, &llh);
3970         return rc;
3971 }
3972
3973 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
3974                     enum lcfg_command_type cmd, const char *nodemap_name,
3975                     const char *param)
3976 {
3977         lnet_nid_t      nid[2];
3978         __u32           idmap[2];
3979         bool            bool_switch;
3980         __u32           int_id;
3981         int             rc = 0;
3982         ENTRY;
3983
3984         switch (cmd) {
3985         case LCFG_NODEMAP_ADD:
3986                 rc = nodemap_add(nodemap_name);
3987                 break;
3988         case LCFG_NODEMAP_DEL:
3989                 rc = nodemap_del(nodemap_name);
3990                 break;
3991         case LCFG_NODEMAP_ADD_RANGE:
3992                 rc = nodemap_parse_range(param, nid);
3993                 if (rc != 0)
3994                         break;
3995                 rc = nodemap_add_range(nodemap_name, nid);
3996                 break;
3997         case LCFG_NODEMAP_DEL_RANGE:
3998                 rc = nodemap_parse_range(param, nid);
3999                 if (rc != 0)
4000                         break;
4001                 rc = nodemap_del_range(nodemap_name, nid);
4002                 break;
4003         case LCFG_NODEMAP_ADMIN:
4004                 bool_switch = simple_strtoul(param, NULL, 10);
4005                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
4006                 break;
4007         case LCFG_NODEMAP_TRUSTED:
4008                 bool_switch = simple_strtoul(param, NULL, 10);
4009                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
4010                 break;
4011         case LCFG_NODEMAP_SQUASH_UID:
4012                 int_id = simple_strtoul(param, NULL, 10);
4013                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
4014                 break;
4015         case LCFG_NODEMAP_SQUASH_GID:
4016                 int_id = simple_strtoul(param, NULL, 10);
4017                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
4018                 break;
4019         case LCFG_NODEMAP_ADD_UIDMAP:
4020         case LCFG_NODEMAP_ADD_GIDMAP:
4021                 rc = nodemap_parse_idmap(param, idmap);
4022                 if (rc != 0)
4023                         break;
4024                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
4025                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
4026                                                idmap);
4027                 else
4028                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
4029                                                idmap);
4030                 break;
4031         case LCFG_NODEMAP_DEL_UIDMAP:
4032         case LCFG_NODEMAP_DEL_GIDMAP:
4033                 rc = nodemap_parse_idmap(param, idmap);
4034                 if (rc != 0)
4035                         break;
4036                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
4037                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
4038                                                idmap);
4039                 else
4040                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
4041                                                idmap);
4042                 break;
4043         default:
4044                 rc = -EINVAL;
4045         }
4046
4047         RETURN(rc);
4048 }
4049
4050 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
4051                  enum lcfg_command_type cmd, char *fsname,
4052                  char *poolname, char *ostname)
4053 {
4054         struct fs_db *fsdb;
4055         char *lovname;
4056         char *logname;
4057         char *label = NULL, *canceled_label = NULL;
4058         int label_sz;
4059         struct mgs_target_info *mti = NULL;
4060         int rc, i;
4061         ENTRY;
4062
4063         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
4064         if (rc) {
4065                 CERROR("Can't get db for %s\n", fsname);
4066                 RETURN(rc);
4067         }
4068         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4069                 CERROR("%s is not defined\n", fsname);
4070                 mgs_free_fsdb(mgs, fsdb);
4071                 RETURN(-EINVAL);
4072         }
4073
4074         label_sz = 10 + strlen(fsname) + strlen(poolname);
4075
4076         /* check if ostname match fsname */
4077         if (ostname != NULL) {
4078                 char *ptr;
4079
4080                 ptr = strrchr(ostname, '-');
4081                 if ((ptr == NULL) ||
4082                     (strncmp(fsname, ostname, ptr-ostname) != 0))
4083                         RETURN(-EINVAL);
4084                 label_sz += strlen(ostname);
4085         }
4086
4087         OBD_ALLOC(label, label_sz);
4088         if (label == NULL)
4089                 RETURN(-ENOMEM);
4090
4091         switch(cmd) {
4092         case LCFG_POOL_NEW:
4093                 sprintf(label,
4094                         "new %s.%s", fsname, poolname);
4095                 break;
4096         case LCFG_POOL_ADD:
4097                 sprintf(label,
4098                         "add %s.%s.%s", fsname, poolname, ostname);
4099                 break;
4100         case LCFG_POOL_REM:
4101                 OBD_ALLOC(canceled_label, label_sz);
4102                 if (canceled_label == NULL)
4103                         GOTO(out_label, rc = -ENOMEM);
4104                 sprintf(label,
4105                         "rem %s.%s.%s", fsname, poolname, ostname);
4106                 sprintf(canceled_label,
4107                         "add %s.%s.%s", fsname, poolname, ostname);
4108                 break;
4109         case LCFG_POOL_DEL:
4110                 OBD_ALLOC(canceled_label, label_sz);
4111                 if (canceled_label == NULL)
4112                         GOTO(out_label, rc = -ENOMEM);
4113                 sprintf(label,
4114                         "del %s.%s", fsname, poolname);
4115                 sprintf(canceled_label,
4116                         "new %s.%s", fsname, poolname);
4117                 break;
4118         default:
4119                 break;
4120         }
4121
4122         if (canceled_label != NULL) {
4123                 OBD_ALLOC_PTR(mti);
4124                 if (mti == NULL)
4125                         GOTO(out_cancel, rc = -ENOMEM);
4126         }
4127
4128         mutex_lock(&fsdb->fsdb_mutex);
4129         /* write pool def to all MDT logs */
4130         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4131                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4132                         rc = name_create_mdt_and_lov(&logname, &lovname,
4133                                                      fsdb, i);
4134                         if (rc) {
4135                                 mutex_unlock(&fsdb->fsdb_mutex);
4136                                 GOTO(out_mti, rc);
4137                         }
4138                         if (canceled_label != NULL) {
4139                                 strcpy(mti->mti_svname, "lov pool");
4140                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4141                                                 lovname, canceled_label,
4142                                                 CM_SKIP);
4143                         }
4144
4145                         if (rc >= 0)
4146                                 rc = mgs_write_log_pool(env, mgs, logname,
4147                                                         fsdb, lovname, cmd,
4148                                                         fsname, poolname,
4149                                                         ostname, label);
4150                         name_destroy(&logname);
4151                         name_destroy(&lovname);
4152                         if (rc) {
4153                                 mutex_unlock(&fsdb->fsdb_mutex);
4154                                 GOTO(out_mti, rc);
4155                         }
4156                 }
4157         }
4158
4159         rc = name_create(&logname, fsname, "-client");
4160         if (rc) {
4161                 mutex_unlock(&fsdb->fsdb_mutex);
4162                 GOTO(out_mti, rc);
4163         }
4164         if (canceled_label != NULL) {
4165                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4166                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4167                 if (rc < 0) {
4168                         mutex_unlock(&fsdb->fsdb_mutex);
4169                         name_destroy(&logname);
4170                         GOTO(out_mti, rc);
4171                 }
4172         }
4173
4174         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4175                                 cmd, fsname, poolname, ostname, label);
4176         mutex_unlock(&fsdb->fsdb_mutex);
4177         name_destroy(&logname);
4178         /* request for update */
4179         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4180
4181         EXIT;
4182 out_mti:
4183         if (mti != NULL)
4184                 OBD_FREE_PTR(mti);
4185 out_cancel:
4186         if (canceled_label != NULL)
4187                 OBD_FREE(canceled_label, label_sz);
4188 out_label:
4189         OBD_FREE(label, label_sz);
4190         return rc;
4191 }