Whamcloud - gitweb
689ff3cea11119719f062e008a927fbe6cbcca77
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mgs/mgs_llog.c
33  *
34  * Lustre Management Server (mgs) config llog creation
35  *
36  * Author: Nathan Rutman <nathan@clusterfs.com>
37  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
38  * Author: Mikhail Pershin <tappro@whamcloud.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_MGS
42 #define D_MGS D_CONFIG
43
44 #include <obd.h>
45 #include <lustre_ioctl.h>
46 #include <lustre_param.h>
47 #include <lustre_sec.h>
48 #include <lustre_quota.h>
49
50 #include "mgs_internal.h"
51
52 /********************** Class functions ********************/
53
54 /* Find all logs in CONFIG directory and link then into list */
55 int class_dentry_readdir(const struct lu_env *env,
56                          struct mgs_device *mgs, struct list_head *log_list)
57 {
58         struct dt_object    *dir = mgs->mgs_configs_dir;
59         const struct dt_it_ops *iops;
60         struct dt_it        *it;
61         struct mgs_direntry *de;
62         char                *key;
63         int                  rc, key_sz;
64
65         INIT_LIST_HEAD(log_list);
66
67         LASSERT(dir);
68         LASSERT(dir->do_index_ops);
69
70         iops = &dir->do_index_ops->dio_it;
71         it = iops->init(env, dir, LUDA_64BITHASH);
72         if (IS_ERR(it))
73                 RETURN(PTR_ERR(it));
74
75         rc = iops->load(env, it, 0);
76         if (rc <= 0)
77                 GOTO(fini, rc = 0);
78
79         /* main cycle */
80         do {
81                 key = (void *)iops->key(env, it);
82                 if (IS_ERR(key)) {
83                         CERROR("%s: key failed when listing %s: rc = %d\n",
84                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
85                                (int) PTR_ERR(key));
86                         goto next;
87                 }
88                 key_sz = iops->key_size(env, it);
89                 LASSERT(key_sz > 0);
90
91                 /* filter out "." and ".." entries */
92                 if (key[0] == '.') {
93                         if (key_sz == 1)
94                                 goto next;
95                         if (key_sz == 2 && key[1] == '.')
96                                 goto next;
97                 }
98
99                 /* filter out ".bak" files */
100                 /* sizeof(".bak") - 1 == 3 */
101                 if (key_sz >= 3 &&
102                     !memcmp(".bak", key + key_sz - 3, 3)) {
103                         CDEBUG(D_MGS, "Skipping backup file %.*s\n",
104                                key_sz, key);
105                         goto next;
106                 }
107
108                 de = mgs_direntry_alloc(key_sz + 1);
109                 if (de == NULL) {
110                         rc = -ENOMEM;
111                         break;
112                 }
113
114                 memcpy(de->mde_name, key, key_sz);
115                 de->mde_name[key_sz] = 0;
116
117                 list_add(&de->mde_list, log_list);
118
119 next:
120                 rc = iops->next(env, it);
121         } while (rc == 0);
122         if (rc > 0)
123                 rc = 0;
124
125         iops->put(env, it);
126
127 fini:
128         iops->fini(env, it);
129         if (rc)
130                 CERROR("%s: key failed when listing %s: rc = %d\n",
131                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
132         RETURN(rc);
133 }
134
135 /******************** DB functions *********************/
136
137 static inline int name_create(char **newname, char *prefix, char *suffix)
138 {
139         LASSERT(newname);
140         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
141         if (!*newname)
142                 return -ENOMEM;
143         sprintf(*newname, "%s%s", prefix, suffix);
144         return 0;
145 }
146
147 static inline void name_destroy(char **name)
148 {
149         if (*name)
150                 OBD_FREE(*name, strlen(*name) + 1);
151         *name = NULL;
152 }
153
154 struct mgs_fsdb_handler_data
155 {
156         struct fs_db   *fsdb;
157         __u32           ver;
158 };
159
160 /* from the (client) config log, figure out:
161         1. which ost's/mdt's are configured (by index)
162         2. what the last config step is
163         3. COMPAT_18 osc name
164 */
165 /* It might be better to have a separate db file, instead of parsing the info
166    out of the client log.  This is slow and potentially error-prone. */
167 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
168                             struct llog_rec_hdr *rec, void *data)
169 {
170         struct mgs_fsdb_handler_data *d = data;
171         struct fs_db *fsdb = d->fsdb;
172         int cfg_len = rec->lrh_len;
173         char *cfg_buf = (char*) (rec + 1);
174         struct lustre_cfg *lcfg;
175         __u32 index;
176         int rc = 0;
177         ENTRY;
178
179         if (rec->lrh_type != OBD_CFG_REC) {
180                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
181                 RETURN(-EINVAL);
182         }
183
184         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
185         if (rc) {
186                 CERROR("Insane cfg\n");
187                 RETURN(rc);
188         }
189
190         lcfg = (struct lustre_cfg *)cfg_buf;
191
192         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
193                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
194
195         /* Figure out ost indicies */
196         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
197         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
198             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
199                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
200                                        NULL, 10);
201                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
202                        lustre_cfg_string(lcfg, 1), index,
203                        lustre_cfg_string(lcfg, 2));
204                 set_bit(index, fsdb->fsdb_ost_index_map);
205         }
206
207         /* Figure out mdt indicies */
208         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
209         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
210             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
211                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
212                                        &index, NULL);
213                 if (rc != LDD_F_SV_TYPE_MDT) {
214                         CWARN("Unparsable MDC name %s, assuming index 0\n",
215                               lustre_cfg_string(lcfg, 0));
216                         index = 0;
217                 }
218                 rc = 0;
219                 CDEBUG(D_MGS, "MDT index is %u\n", index);
220                 set_bit(index, fsdb->fsdb_mdt_index_map);
221                 fsdb->fsdb_mdt_count ++;
222         }
223
224         /**
225          * figure out the old config. fsdb_gen = 0 means old log
226          * It is obsoleted and not supported anymore
227          */
228         if (fsdb->fsdb_gen == 0) {
229                 CERROR("Old config format is not supported\n");
230                 RETURN(-EINVAL);
231         }
232
233         /*
234          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
235          */
236         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
237             lcfg->lcfg_command == LCFG_ATTACH &&
238             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
239                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
240                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
241                         CWARN("MDT using 1.8 OSC name scheme\n");
242                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
243                 }
244         }
245
246         if (lcfg->lcfg_command == LCFG_MARKER) {
247                 struct cfg_marker *marker;
248                 marker = lustre_cfg_buf(lcfg, 1);
249
250                 d->ver = marker->cm_vers;
251
252                 /* Keep track of the latest marker step */
253                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
254         }
255
256         RETURN(rc);
257 }
258
259 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
260 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
261                                   struct mgs_device *mgs,
262                                   struct fs_db *fsdb)
263 {
264         char                            *logname;
265         struct llog_handle              *loghandle;
266         struct llog_ctxt                *ctxt;
267         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
268         int rc;
269
270         ENTRY;
271
272         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
273         LASSERT(ctxt != NULL);
274         rc = name_create(&logname, fsdb->fsdb_name, "-client");
275         if (rc)
276                 GOTO(out_put, rc);
277         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
278         if (rc)
279                 GOTO(out_pop, rc);
280
281         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
282         if (rc)
283                 GOTO(out_close, rc);
284
285         if (llog_get_size(loghandle) <= 1)
286                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
287
288         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
289         CDEBUG(D_INFO, "get_db = %d\n", rc);
290 out_close:
291         llog_close(env, loghandle);
292 out_pop:
293         name_destroy(&logname);
294 out_put:
295         llog_ctxt_put(ctxt);
296
297         RETURN(rc);
298 }
299
300 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
301 {
302         struct mgs_tgt_srpc_conf *tgtconf;
303
304         /* free target-specific rules */
305         while (fsdb->fsdb_srpc_tgt) {
306                 tgtconf = fsdb->fsdb_srpc_tgt;
307                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
308
309                 LASSERT(tgtconf->mtsc_tgt);
310
311                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
312                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
313                 OBD_FREE_PTR(tgtconf);
314         }
315
316         /* free general rules */
317         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
318 }
319
320 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
321 {
322         struct fs_db *fsdb;
323         struct list_head *tmp;
324
325         list_for_each(tmp, &mgs->mgs_fs_db_list) {
326                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
327                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
328                         return fsdb;
329         }
330         return NULL;
331 }
332
333 /* caller must hold the mgs->mgs_fs_db_lock */
334 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
335                                   struct mgs_device *mgs, char *fsname)
336 {
337         struct fs_db *fsdb;
338         int rc;
339         ENTRY;
340
341         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
342                 CERROR("fsname %s is too long\n", fsname);
343                 RETURN(ERR_PTR(-EINVAL));
344         }
345
346         OBD_ALLOC_PTR(fsdb);
347         if (!fsdb)
348                 RETURN(ERR_PTR(-ENOMEM));
349
350         strcpy(fsdb->fsdb_name, fsname);
351         mutex_init(&fsdb->fsdb_mutex);
352         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
353         fsdb->fsdb_gen = 1;
354
355         if (strcmp(fsname, MGSSELF_NAME) == 0) {
356                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
357                 fsdb->fsdb_mgs = mgs;
358         } else {
359                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
360                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
361                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
362                         CERROR("No memory for index maps\n");
363                         GOTO(err, rc = -ENOMEM);
364                 }
365
366                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
367                 if (rc)
368                         GOTO(err, rc);
369                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
370                 if (rc)
371                         GOTO(err, rc);
372
373                 /* initialise data for NID table */
374                 mgs_ir_init_fs(env, mgs, fsdb);
375
376                 lproc_mgs_add_live(mgs, fsdb);
377         }
378
379         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
380
381         RETURN(fsdb);
382 err:
383         if (fsdb->fsdb_ost_index_map)
384                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
385         if (fsdb->fsdb_mdt_index_map)
386                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
387         name_destroy(&fsdb->fsdb_clilov);
388         name_destroy(&fsdb->fsdb_clilmv);
389         OBD_FREE_PTR(fsdb);
390         RETURN(ERR_PTR(rc));
391 }
392
393 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
394 {
395         /* wait for anyone with the sem */
396         mutex_lock(&fsdb->fsdb_mutex);
397         lproc_mgs_del_live(mgs, fsdb);
398         list_del(&fsdb->fsdb_list);
399
400         /* deinitialize fsr */
401         mgs_ir_fini_fs(mgs, fsdb);
402
403         if (fsdb->fsdb_ost_index_map)
404                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
405         if (fsdb->fsdb_mdt_index_map)
406                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
407         name_destroy(&fsdb->fsdb_clilov);
408         name_destroy(&fsdb->fsdb_clilmv);
409         mgs_free_fsdb_srpc(fsdb);
410         mutex_unlock(&fsdb->fsdb_mutex);
411         OBD_FREE_PTR(fsdb);
412 }
413
414 int mgs_init_fsdb_list(struct mgs_device *mgs)
415 {
416         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
417         return 0;
418 }
419
420 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
421 {
422         struct fs_db *fsdb;
423         struct list_head *tmp, *tmp2;
424
425         mutex_lock(&mgs->mgs_mutex);
426         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
427                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
428                 mgs_free_fsdb(mgs, fsdb);
429         }
430         mutex_unlock(&mgs->mgs_mutex);
431         return 0;
432 }
433
434 int mgs_find_or_make_fsdb(const struct lu_env *env,
435                           struct mgs_device *mgs, char *name,
436                           struct fs_db **dbh)
437 {
438         struct fs_db *fsdb;
439         int rc = 0;
440
441         ENTRY;
442         mutex_lock(&mgs->mgs_mutex);
443         fsdb = mgs_find_fsdb(mgs, name);
444         if (fsdb) {
445                 mutex_unlock(&mgs->mgs_mutex);
446                 *dbh = fsdb;
447                 RETURN(0);
448         }
449
450         CDEBUG(D_MGS, "Creating new db\n");
451         fsdb = mgs_new_fsdb(env, mgs, name);
452         /* lock fsdb_mutex until the db is loaded from llogs */
453         if (!IS_ERR(fsdb))
454                 mutex_lock(&fsdb->fsdb_mutex);
455         mutex_unlock(&mgs->mgs_mutex);
456         if (IS_ERR(fsdb))
457                 RETURN(PTR_ERR(fsdb));
458
459         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
460                 /* populate the db from the client llog */
461                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
462                 if (rc) {
463                         CERROR("Can't get db from client log %d\n", rc);
464                         GOTO(out_free, rc);
465                 }
466         }
467
468         /* populate srpc rules from params llog */
469         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
470         if (rc) {
471                 CERROR("Can't get db from params log %d\n", rc);
472                 GOTO(out_free, rc);
473         }
474
475         mutex_unlock(&fsdb->fsdb_mutex);
476         *dbh = fsdb;
477
478         RETURN(0);
479
480 out_free:
481         mutex_unlock(&fsdb->fsdb_mutex);
482         mgs_free_fsdb(mgs, fsdb);
483         return rc;
484 }
485
486 /* 1 = index in use
487    0 = index unused
488    -1= empty client log */
489 int mgs_check_index(const struct lu_env *env,
490                     struct mgs_device *mgs,
491                     struct mgs_target_info *mti)
492 {
493         struct fs_db *fsdb;
494         void *imap;
495         int rc = 0;
496         ENTRY;
497
498         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
499
500         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
501         if (rc) {
502                 CERROR("Can't get db for %s\n", mti->mti_fsname);
503                 RETURN(rc);
504         }
505
506         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
507                 RETURN(-1);
508
509         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
510                 imap = fsdb->fsdb_ost_index_map;
511         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
512                 imap = fsdb->fsdb_mdt_index_map;
513         else
514                 RETURN(-EINVAL);
515
516         if (test_bit(mti->mti_stripe_index, imap))
517                 RETURN(1);
518         RETURN(0);
519 }
520
521 static __inline__ int next_index(void *index_map, int map_len)
522 {
523         int i;
524         for (i = 0; i < map_len * 8; i++)
525                 if (!test_bit(i, index_map)) {
526                          return i;
527                  }
528         CERROR("max index %d exceeded.\n", i);
529         return -1;
530 }
531
532 /* Return codes:
533         0  newly marked as in use
534         <0 err
535         +EALREADY for update of an old index */
536 static int mgs_set_index(const struct lu_env *env,
537                          struct mgs_device *mgs,
538                          struct mgs_target_info *mti)
539 {
540         struct fs_db *fsdb;
541         void *imap;
542         int rc = 0;
543         ENTRY;
544
545         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
546         if (rc) {
547                 CERROR("Can't get db for %s\n", mti->mti_fsname);
548                 RETURN(rc);
549         }
550
551         mutex_lock(&fsdb->fsdb_mutex);
552         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
553                 imap = fsdb->fsdb_ost_index_map;
554         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
555                 imap = fsdb->fsdb_mdt_index_map;
556         } else {
557                 GOTO(out_up, rc = -EINVAL);
558         }
559
560         if (mti->mti_flags & LDD_F_NEED_INDEX) {
561                 rc = next_index(imap, INDEX_MAP_SIZE);
562                 if (rc == -1)
563                         GOTO(out_up, rc = -ERANGE);
564                 mti->mti_stripe_index = rc;
565                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
566                         fsdb->fsdb_mdt_count ++;
567         }
568
569         /* the last index(0xffff) is reserved for default value. */
570         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
571                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
572                                    "but index must be less than %u.\n",
573                                    mti->mti_svname, mti->mti_stripe_index,
574                                    INDEX_MAP_SIZE * 8 - 1);
575                 GOTO(out_up, rc = -ERANGE);
576         }
577
578         if (test_bit(mti->mti_stripe_index, imap)) {
579                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
580                     !(mti->mti_flags & LDD_F_WRITECONF)) {
581                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
582                                            "%d, but that index is already in "
583                                            "use. Use --writeconf to force\n",
584                                            mti->mti_svname,
585                                            mti->mti_stripe_index);
586                         GOTO(out_up, rc = -EADDRINUSE);
587                 } else {
588                         CDEBUG(D_MGS, "Server %s updating index %d\n",
589                                mti->mti_svname, mti->mti_stripe_index);
590                         GOTO(out_up, rc = EALREADY);
591                 }
592         }
593
594         set_bit(mti->mti_stripe_index, imap);
595         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
596         mutex_unlock(&fsdb->fsdb_mutex);
597         if (server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
598                              mti->mti_stripe_index, mti->mti_fsname,
599                              mti->mti_svname)) {
600                 CERROR("unknown server type %#x\n", mti->mti_flags);
601                 return -EINVAL;
602         }
603
604         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
605                mti->mti_stripe_index);
606
607         RETURN(0);
608 out_up:
609         mutex_unlock(&fsdb->fsdb_mutex);
610         return rc;
611 }
612
613 struct mgs_modify_lookup {
614         struct cfg_marker mml_marker;
615         int               mml_modified;
616 };
617
618 static int mgs_modify_handler(const struct lu_env *env,
619                               struct llog_handle *llh,
620                               struct llog_rec_hdr *rec, void *data)
621 {
622         struct mgs_modify_lookup *mml = data;
623         struct cfg_marker *marker;
624         struct lustre_cfg *lcfg = REC_DATA(rec);
625         int cfg_len = REC_DATA_LEN(rec);
626         int rc;
627         ENTRY;
628
629         if (rec->lrh_type != OBD_CFG_REC) {
630                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
631                 RETURN(-EINVAL);
632         }
633
634         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
635         if (rc) {
636                 CERROR("Insane cfg\n");
637                 RETURN(rc);
638         }
639
640         /* We only care about markers */
641         if (lcfg->lcfg_command != LCFG_MARKER)
642                 RETURN(0);
643
644         marker = lustre_cfg_buf(lcfg, 1);
645         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
646             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
647             !(marker->cm_flags & CM_SKIP)) {
648                 /* Found a non-skipped marker match */
649                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
650                        rec->lrh_index, marker->cm_step,
651                        marker->cm_flags, mml->mml_marker.cm_flags,
652                        marker->cm_tgtname, marker->cm_comment);
653                 /* Overwrite the old marker llog entry */
654                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
655                 marker->cm_flags |= mml->mml_marker.cm_flags;
656                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
657                 rc = llog_write(env, llh, rec, rec->lrh_index);
658                 if (!rc)
659                          mml->mml_modified++;
660         }
661
662         RETURN(rc);
663 }
664
665 /**
666  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
667  * Return code:
668  * 0 - modified successfully,
669  * 1 - no modification was done
670  * negative - error
671  */
672 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
673                       struct fs_db *fsdb, struct mgs_target_info *mti,
674                       char *logname, char *devname, char *comment, int flags)
675 {
676         struct llog_handle *loghandle;
677         struct llog_ctxt *ctxt;
678         struct mgs_modify_lookup *mml;
679         int rc;
680
681         ENTRY;
682
683         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
684         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
685                flags);
686
687         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
688         LASSERT(ctxt != NULL);
689         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
690         if (rc < 0) {
691                 if (rc == -ENOENT)
692                         rc = 0;
693                 GOTO(out_pop, rc);
694         }
695
696         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
697         if (rc)
698                 GOTO(out_close, rc);
699
700         if (llog_get_size(loghandle) <= 1)
701                 GOTO(out_close, rc = 0);
702
703         OBD_ALLOC_PTR(mml);
704         if (!mml)
705                 GOTO(out_close, rc = -ENOMEM);
706         if (strlcpy(mml->mml_marker.cm_comment, comment,
707                     sizeof(mml->mml_marker.cm_comment)) >=
708             sizeof(mml->mml_marker.cm_comment))
709                 GOTO(out_free, rc = -E2BIG);
710         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
711                     sizeof(mml->mml_marker.cm_tgtname)) >=
712             sizeof(mml->mml_marker.cm_tgtname))
713                 GOTO(out_free, rc = -E2BIG);
714         /* Modify mostly means cancel */
715         mml->mml_marker.cm_flags = flags;
716         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
717         mml->mml_modified = 0;
718         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
719                           NULL);
720         if (!rc && !mml->mml_modified)
721                 rc = 1;
722
723 out_free:
724         OBD_FREE_PTR(mml);
725
726 out_close:
727         llog_close(env, loghandle);
728 out_pop:
729         if (rc < 0)
730                 CERROR("%s: modify %s/%s failed: rc = %d\n",
731                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
732         llog_ctxt_put(ctxt);
733         RETURN(rc);
734 }
735
736 /** This structure is passed to mgs_replace_handler */
737 struct mgs_replace_uuid_lookup {
738         /* Nids are replaced for this target device */
739         struct mgs_target_info target;
740         /* Temporary modified llog */
741         struct llog_handle *temp_llh;
742         /* Flag is set if in target block*/
743         int in_target_device;
744         /* Nids already added. Just skip (multiple nids) */
745         int device_nids_added;
746         /* Flag is set if this block should not be copied */
747         int skip_it;
748 };
749
750 /**
751  * Check: a) if block should be skipped
752  * b) is it target block
753  *
754  * \param[in] lcfg
755  * \param[in] mrul
756  *
757  * \retval 0 should not to be skipped
758  * \retval 1 should to be skipped
759  */
760 static int check_markers(struct lustre_cfg *lcfg,
761                          struct mgs_replace_uuid_lookup *mrul)
762 {
763          struct cfg_marker *marker;
764
765         /* Track markers. Find given device */
766         if (lcfg->lcfg_command == LCFG_MARKER) {
767                 marker = lustre_cfg_buf(lcfg, 1);
768                 /* Clean llog from records marked as CM_EXCLUDE.
769                    CM_SKIP records are used for "active" command
770                    and can be restored if needed */
771                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
772                     (CM_EXCLUDE | CM_START)) {
773                         mrul->skip_it = 1;
774                         return 1;
775                 }
776
777                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
778                     (CM_EXCLUDE | CM_END)) {
779                         mrul->skip_it = 0;
780                         return 1;
781                 }
782
783                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
784                         LASSERT(!(marker->cm_flags & CM_START) ||
785                                 !(marker->cm_flags & CM_END));
786                         if (marker->cm_flags & CM_START) {
787                                 mrul->in_target_device = 1;
788                                 mrul->device_nids_added = 0;
789                         } else if (marker->cm_flags & CM_END)
790                                 mrul->in_target_device = 0;
791                 }
792         }
793
794         return 0;
795 }
796
797 static int record_base(const struct lu_env *env, struct llog_handle *llh,
798                      char *cfgname, lnet_nid_t nid, int cmd,
799                      char *s1, char *s2, char *s3, char *s4)
800 {
801         struct mgs_thread_info  *mgi = mgs_env_info(env);
802         struct llog_cfg_rec     *lcr;
803         int rc;
804
805         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
806                cmd, s1, s2, s3, s4);
807
808         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
809         if (s1)
810                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
811         if (s2)
812                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
813         if (s3)
814                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
815         if (s4)
816                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
817
818         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
819         if (lcr == NULL)
820                 return -ENOMEM;
821
822         lcr->lcr_cfg.lcfg_nid = nid;
823         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
824
825         lustre_cfg_rec_free(lcr);
826
827         if (rc < 0)
828                 CDEBUG(D_MGS,
829                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
830                        cfgname, cmd, s1, s2, s3, s4, rc);
831         return rc;
832 }
833
834 static inline int record_add_uuid(const struct lu_env *env,
835                                   struct llog_handle *llh,
836                                   uint64_t nid, char *uuid)
837 {
838         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
839                            NULL, NULL, NULL);
840 }
841
842 static inline int record_add_conn(const struct lu_env *env,
843                                   struct llog_handle *llh,
844                                   char *devname, char *uuid)
845 {
846         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
847                            NULL, NULL, NULL);
848 }
849
850 static inline int record_attach(const struct lu_env *env,
851                                 struct llog_handle *llh, char *devname,
852                                 char *type, char *uuid)
853 {
854         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
855                            NULL, NULL);
856 }
857
858 static inline int record_setup(const struct lu_env *env,
859                                struct llog_handle *llh, char *devname,
860                                char *s1, char *s2, char *s3, char *s4)
861 {
862         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
863 }
864
865 /**
866  * \retval <0 record processing error
867  * \retval n record is processed. No need copy original one.
868  * \retval 0 record is not processed.
869  */
870 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
871                            struct mgs_replace_uuid_lookup *mrul)
872 {
873         int nids_added = 0;
874         lnet_nid_t nid;
875         char *ptr;
876         int rc;
877
878         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
879                 /* LCFG_ADD_UUID command found. Let's skip original command
880                    and add passed nids */
881                 ptr = mrul->target.mti_params;
882                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
883                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
884                                "device %s\n", libcfs_nid2str(nid),
885                                 mrul->target.mti_params,
886                                 mrul->target.mti_svname);
887                         rc = record_add_uuid(env,
888                                              mrul->temp_llh, nid,
889                                              mrul->target.mti_params);
890                         if (!rc)
891                                 nids_added++;
892                 }
893
894                 if (nids_added == 0) {
895                         CERROR("No new nids were added, nid %s with uuid %s, "
896                                "device %s\n", libcfs_nid2str(nid),
897                                mrul->target.mti_params,
898                                mrul->target.mti_svname);
899                         RETURN(-ENXIO);
900                 } else {
901                         mrul->device_nids_added = 1;
902                 }
903
904                 return nids_added;
905         }
906
907         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
908                 /* LCFG_SETUP command found. UUID should be changed */
909                 rc = record_setup(env,
910                                   mrul->temp_llh,
911                                   /* devname the same */
912                                   lustre_cfg_string(lcfg, 0),
913                                   /* s1 is not changed */
914                                   lustre_cfg_string(lcfg, 1),
915                                   /* new uuid should be
916                                   the full nidlist */
917                                   mrul->target.mti_params,
918                                   /* s3 is not changed */
919                                   lustre_cfg_string(lcfg, 3),
920                                   /* s4 is not changed */
921                                   lustre_cfg_string(lcfg, 4));
922                 return rc ? rc : 1;
923         }
924
925         /* Another commands in target device block */
926         return 0;
927 }
928
929 /**
930  * Handler that called for every record in llog.
931  * Records are processed in order they placed in llog.
932  *
933  * \param[in] llh       log to be processed
934  * \param[in] rec       current record
935  * \param[in] data      mgs_replace_uuid_lookup structure
936  *
937  * \retval 0    success
938  */
939 static int mgs_replace_handler(const struct lu_env *env,
940                                struct llog_handle *llh,
941                                struct llog_rec_hdr *rec,
942                                void *data)
943 {
944         struct mgs_replace_uuid_lookup *mrul;
945         struct lustre_cfg *lcfg = REC_DATA(rec);
946         int cfg_len = REC_DATA_LEN(rec);
947         int rc;
948         ENTRY;
949
950         mrul = (struct mgs_replace_uuid_lookup *)data;
951
952         if (rec->lrh_type != OBD_CFG_REC) {
953                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
954                        rec->lrh_type, lcfg->lcfg_command,
955                        lustre_cfg_string(lcfg, 0),
956                        lustre_cfg_string(lcfg, 1));
957                 RETURN(-EINVAL);
958         }
959
960         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
961         if (rc) {
962                 /* Do not copy any invalidated records */
963                 GOTO(skip_out, rc = 0);
964         }
965
966         rc = check_markers(lcfg, mrul);
967         if (rc || mrul->skip_it)
968                 GOTO(skip_out, rc = 0);
969
970         /* Write to new log all commands outside target device block */
971         if (!mrul->in_target_device)
972                 GOTO(copy_out, rc = 0);
973
974         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
975            (failover nids) for this target, assuming that if then
976            primary is changing then so is the failover */
977         if (mrul->device_nids_added &&
978             (lcfg->lcfg_command == LCFG_ADD_UUID ||
979              lcfg->lcfg_command == LCFG_ADD_CONN))
980                 GOTO(skip_out, rc = 0);
981
982         rc = process_command(env, lcfg, mrul);
983         if (rc < 0)
984                 RETURN(rc);
985
986         if (rc)
987                 RETURN(0);
988 copy_out:
989         /* Record is placed in temporary llog as is */
990         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
991
992         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
993                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
994                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
995         RETURN(rc);
996
997 skip_out:
998         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
999                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1000                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1001         RETURN(rc);
1002 }
1003
1004 static int mgs_log_is_empty(const struct lu_env *env,
1005                             struct mgs_device *mgs, char *name)
1006 {
1007         struct llog_ctxt        *ctxt;
1008         int                      rc;
1009
1010         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1011         LASSERT(ctxt != NULL);
1012
1013         rc = llog_is_empty(env, ctxt, name);
1014         llog_ctxt_put(ctxt);
1015         return rc;
1016 }
1017
1018 static int mgs_replace_nids_log(const struct lu_env *env,
1019                                 struct obd_device *mgs, struct fs_db *fsdb,
1020                                 char *logname, char *devname, char *nids)
1021 {
1022         struct llog_handle *orig_llh, *backup_llh;
1023         struct llog_ctxt *ctxt;
1024         struct mgs_replace_uuid_lookup *mrul;
1025         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1026         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1027         char *backup;
1028         int rc, rc2;
1029         ENTRY;
1030
1031         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1032
1033         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1034         LASSERT(ctxt != NULL);
1035
1036         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1037                 /* Log is empty. Nothing to replace */
1038                 GOTO(out_put, rc = 0);
1039         }
1040
1041         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1042         if (backup == NULL)
1043                 GOTO(out_put, rc = -ENOMEM);
1044
1045         sprintf(backup, "%s.bak", logname);
1046
1047         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1048         if (rc == 0) {
1049                 /* Now erase original log file. Connections are not allowed.
1050                    Backup is already saved */
1051                 rc = llog_erase(env, ctxt, NULL, logname);
1052                 if (rc < 0)
1053                         GOTO(out_free, rc);
1054         } else if (rc != -ENOENT) {
1055                 CERROR("%s: can't make backup for %s: rc = %d\n",
1056                        mgs->obd_name, logname, rc);
1057                 GOTO(out_free,rc);
1058         }
1059
1060         /* open local log */
1061         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1062         if (rc)
1063                 GOTO(out_restore, rc);
1064
1065         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1066         if (rc)
1067                 GOTO(out_closel, rc);
1068
1069         /* open backup llog */
1070         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1071                        LLOG_OPEN_EXISTS);
1072         if (rc)
1073                 GOTO(out_closel, rc);
1074
1075         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1076         if (rc)
1077                 GOTO(out_close, rc);
1078
1079         if (llog_get_size(backup_llh) <= 1)
1080                 GOTO(out_close, rc = 0);
1081
1082         OBD_ALLOC_PTR(mrul);
1083         if (!mrul)
1084                 GOTO(out_close, rc = -ENOMEM);
1085         /* devname is only needed information to replace UUID records */
1086         strlcpy(mrul->target.mti_svname, devname,
1087                 sizeof(mrul->target.mti_svname));
1088         /* parse nids later */
1089         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1090         /* Copy records to this temporary llog */
1091         mrul->temp_llh = orig_llh;
1092
1093         rc = llog_process(env, backup_llh, mgs_replace_handler,
1094                           (void *)mrul, NULL);
1095         OBD_FREE_PTR(mrul);
1096 out_close:
1097         rc2 = llog_close(NULL, backup_llh);
1098         if (!rc)
1099                 rc = rc2;
1100 out_closel:
1101         rc2 = llog_close(NULL, orig_llh);
1102         if (!rc)
1103                 rc = rc2;
1104
1105 out_restore:
1106         if (rc) {
1107                 CERROR("%s: llog should be restored: rc = %d\n",
1108                        mgs->obd_name, rc);
1109                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1110                                   logname);
1111                 if (rc2 < 0)
1112                         CERROR("%s: can't restore backup %s: rc = %d\n",
1113                                mgs->obd_name, logname, rc2);
1114         }
1115
1116 out_free:
1117         OBD_FREE(backup, strlen(backup) + 1);
1118
1119 out_put:
1120         llog_ctxt_put(ctxt);
1121
1122         if (rc)
1123                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1124                        mgs->obd_name, logname, rc);
1125
1126         RETURN(rc);
1127 }
1128
1129 /**
1130  * Parse device name and get file system name and/or device index
1131  *
1132  * \param[in]   devname device name (ex. lustre-MDT0000)
1133  * \param[out]  fsname  file system name(optional)
1134  * \param[out]  index   device index(optional)
1135  *
1136  * \retval 0    success
1137  */
1138 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1139 {
1140         int rc;
1141         ENTRY;
1142
1143         /* Extract fsname */
1144         if (fsname) {
1145                 rc = server_name2fsname(devname, fsname, NULL);
1146                 if (rc < 0) {
1147                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1148                                devname);
1149                         RETURN(-EINVAL);
1150                 }
1151         }
1152
1153         if (index) {
1154                 rc = server_name2index(devname, index, NULL);
1155                 if (rc < 0) {
1156                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1157                                devname);
1158                         RETURN(-EINVAL);
1159                 }
1160         }
1161
1162         RETURN(0);
1163 }
1164
1165 /* This is only called during replace_nids */
1166 static int only_mgs_is_running(struct obd_device *mgs_obd)
1167 {
1168         /* TDB: Is global variable with devices count exists? */
1169         int num_devices = get_devices_count();
1170         int num_exports = 0;
1171         struct obd_export *exp;
1172
1173         spin_lock(&mgs_obd->obd_dev_lock);
1174         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1175                 /* skip self export */
1176                 if (exp == mgs_obd->obd_self_export)
1177                         continue;
1178                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1179                         continue;
1180
1181                 ++num_exports;
1182
1183                 CERROR("%s: node %s still connected during replace_nids "
1184                        "connect_flags:%llx\n",
1185                        mgs_obd->obd_name,
1186                        libcfs_nid2str(exp->exp_nid_stats->nid),
1187                        exp_connect_flags(exp));
1188
1189         }
1190         spin_unlock(&mgs_obd->obd_dev_lock);
1191
1192         /* osd, MGS and MGC + self_export
1193            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1194         return (num_devices <= 3) && (num_exports == 0);
1195 }
1196
1197 static int name_create_mdt(char **logname, char *fsname, int i)
1198 {
1199         char mdt_index[9];
1200
1201         sprintf(mdt_index, "-MDT%04x", i);
1202         return name_create(logname, fsname, mdt_index);
1203 }
1204
1205 /**
1206  * Replace nids for \a device to \a nids values
1207  *
1208  * \param obd           MGS obd device
1209  * \param devname       nids need to be replaced for this device
1210  * (ex. lustre-OST0000)
1211  * \param nids          nids list (ex. nid1,nid2,nid3)
1212  *
1213  * \retval 0    success
1214  */
1215 int mgs_replace_nids(const struct lu_env *env,
1216                      struct mgs_device *mgs,
1217                      char *devname, char *nids)
1218 {
1219         /* Assume fsname is part of device name */
1220         char fsname[MTI_NAME_MAXLEN];
1221         int rc;
1222         __u32 index;
1223         char *logname;
1224         struct fs_db *fsdb;
1225         unsigned int i;
1226         int conn_state;
1227         struct obd_device *mgs_obd = mgs->mgs_obd;
1228         ENTRY;
1229
1230         /* We can only change NIDs if no other nodes are connected */
1231         spin_lock(&mgs_obd->obd_dev_lock);
1232         conn_state = mgs_obd->obd_no_conn;
1233         mgs_obd->obd_no_conn = 1;
1234         spin_unlock(&mgs_obd->obd_dev_lock);
1235
1236         /* We can not change nids if not only MGS is started */
1237         if (!only_mgs_is_running(mgs_obd)) {
1238                 CERROR("Only MGS is allowed to be started\n");
1239                 GOTO(out, rc = -EINPROGRESS);
1240         }
1241
1242         /* Get fsname and index*/
1243         rc = mgs_parse_devname(devname, fsname, &index);
1244         if (rc)
1245                 GOTO(out, rc);
1246
1247         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1248         if (rc) {
1249                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1250                 GOTO(out, rc);
1251         }
1252
1253         /* Process client llogs */
1254         name_create(&logname, fsname, "-client");
1255         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1256         name_destroy(&logname);
1257         if (rc) {
1258                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1259                        fsname, devname, rc);
1260                 GOTO(out, rc);
1261         }
1262
1263         /* Process MDT llogs */
1264         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1265                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1266                         continue;
1267                 name_create_mdt(&logname, fsname, i);
1268                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1269                 name_destroy(&logname);
1270                 if (rc)
1271                         GOTO(out, rc);
1272         }
1273
1274 out:
1275         spin_lock(&mgs_obd->obd_dev_lock);
1276         mgs_obd->obd_no_conn = conn_state;
1277         spin_unlock(&mgs_obd->obd_dev_lock);
1278
1279         RETURN(rc);
1280 }
1281
1282 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1283                             char *devname, struct lov_desc *desc)
1284 {
1285         struct mgs_thread_info  *mgi = mgs_env_info(env);
1286         struct llog_cfg_rec     *lcr;
1287         int rc;
1288
1289         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1290         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1291         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1292         if (lcr == NULL)
1293                 return -ENOMEM;
1294
1295         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1296         lustre_cfg_rec_free(lcr);
1297         return rc;
1298 }
1299
1300 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1301                             char *devname, struct lmv_desc *desc)
1302 {
1303         struct mgs_thread_info  *mgi = mgs_env_info(env);
1304         struct llog_cfg_rec     *lcr;
1305         int rc;
1306
1307         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1308         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1309         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1310         if (lcr == NULL)
1311                 return -ENOMEM;
1312
1313         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1314         lustre_cfg_rec_free(lcr);
1315         return rc;
1316 }
1317
1318 static inline int record_mdc_add(const struct lu_env *env,
1319                                  struct llog_handle *llh,
1320                                  char *logname, char *mdcuuid,
1321                                  char *mdtuuid, char *index,
1322                                  char *gen)
1323 {
1324         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1325                            mdtuuid,index,gen,mdcuuid);
1326 }
1327
1328 static inline int record_lov_add(const struct lu_env *env,
1329                                  struct llog_handle *llh,
1330                                  char *lov_name, char *ost_uuid,
1331                                  char *index, char *gen)
1332 {
1333         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1334                            ost_uuid, index, gen, NULL);
1335 }
1336
1337 static inline int record_mount_opt(const struct lu_env *env,
1338                                    struct llog_handle *llh,
1339                                    char *profile, char *lov_name,
1340                                    char *mdc_name)
1341 {
1342         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1343                            profile, lov_name, mdc_name, NULL);
1344 }
1345
1346 static int record_marker(const struct lu_env *env,
1347                          struct llog_handle *llh,
1348                          struct fs_db *fsdb, __u32 flags,
1349                          char *tgtname, char *comment)
1350 {
1351         struct mgs_thread_info *mgi = mgs_env_info(env);
1352         struct llog_cfg_rec *lcr;
1353         int rc;
1354         int cplen = 0;
1355
1356         if (flags & CM_START)
1357                 fsdb->fsdb_gen++;
1358         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1359         mgi->mgi_marker.cm_flags = flags;
1360         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1361         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1362                         sizeof(mgi->mgi_marker.cm_tgtname));
1363         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1364                 return -E2BIG;
1365         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1366                         sizeof(mgi->mgi_marker.cm_comment));
1367         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1368                 return -E2BIG;
1369         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1370         mgi->mgi_marker.cm_canceltime = 0;
1371         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1372         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1373                             sizeof(mgi->mgi_marker));
1374         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1375         if (lcr == NULL)
1376                 return -ENOMEM;
1377
1378         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1379         lustre_cfg_rec_free(lcr);
1380         return rc;
1381 }
1382
1383 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1384                             struct llog_handle **llh, char *name)
1385 {
1386         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1387         struct llog_ctxt        *ctxt;
1388         int                      rc = 0;
1389         ENTRY;
1390
1391         if (*llh)
1392                 GOTO(out, rc = -EBUSY);
1393
1394         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1395         if (!ctxt)
1396                 GOTO(out, rc = -ENODEV);
1397         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1398
1399         rc = llog_open_create(env, ctxt, llh, NULL, name);
1400         if (rc)
1401                 GOTO(out_ctxt, rc);
1402         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1403         if (rc)
1404                 llog_close(env, *llh);
1405 out_ctxt:
1406         llog_ctxt_put(ctxt);
1407 out:
1408         if (rc) {
1409                 CERROR("%s: can't start log %s: rc = %d\n",
1410                        mgs->mgs_obd->obd_name, name, rc);
1411                 *llh = NULL;
1412         }
1413         RETURN(rc);
1414 }
1415
1416 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1417 {
1418         int rc;
1419
1420         rc = llog_close(env, *llh);
1421         *llh = NULL;
1422
1423         return rc;
1424 }
1425
1426 /******************** config "macros" *********************/
1427
1428 /* write an lcfg directly into a log (with markers) */
1429 static int mgs_write_log_direct(const struct lu_env *env,
1430                                 struct mgs_device *mgs, struct fs_db *fsdb,
1431                                 char *logname, struct llog_cfg_rec *lcr,
1432                                 char *devname, char *comment)
1433 {
1434         struct llog_handle *llh = NULL;
1435         int rc;
1436
1437         ENTRY;
1438
1439         rc = record_start_log(env, mgs, &llh, logname);
1440         if (rc)
1441                 RETURN(rc);
1442
1443         /* FIXME These should be a single journal transaction */
1444         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1445         if (rc)
1446                 GOTO(out_end, rc);
1447         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1448         if (rc)
1449                 GOTO(out_end, rc);
1450         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1451         if (rc)
1452                 GOTO(out_end, rc);
1453 out_end:
1454         record_end_log(env, &llh);
1455         RETURN(rc);
1456 }
1457
1458 /* write the lcfg in all logs for the given fs */
1459 static int mgs_write_log_direct_all(const struct lu_env *env,
1460                                     struct mgs_device *mgs,
1461                                     struct fs_db *fsdb,
1462                                     struct mgs_target_info *mti,
1463                                     struct llog_cfg_rec *lcr, char *devname,
1464                                     char *comment, int server_only)
1465 {
1466         struct list_head         log_list;
1467         struct mgs_direntry     *dirent, *n;
1468         char                    *fsname = mti->mti_fsname;
1469         int                      rc = 0, len = strlen(fsname);
1470
1471         ENTRY;
1472         /* Find all the logs in the CONFIGS directory */
1473         rc = class_dentry_readdir(env, mgs, &log_list);
1474         if (rc)
1475                 RETURN(rc);
1476
1477         /* Could use fsdb index maps instead of directory listing */
1478         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1479                 list_del_init(&dirent->mde_list);
1480                 /* don't write to sptlrpc rule log */
1481                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1482                         goto next;
1483
1484                 /* caller wants write server logs only */
1485                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1486                         goto next;
1487
1488                 if (strlen(dirent->mde_name) <= len ||
1489                     strncmp(fsname, dirent->mde_name, len) != 0 ||
1490                     dirent->mde_name[len] != '-')
1491                         goto next;
1492
1493                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1494                 /* Erase any old settings of this same parameter */
1495                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1496                                 devname, comment, CM_SKIP);
1497                 if (rc < 0)
1498                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1499                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1500                 if (lcr == NULL)
1501                         goto next;
1502                 /* Write the new one */
1503                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1504                                           lcr, devname, comment);
1505                 if (rc != 0)
1506                         CERROR("%s: writing log %s: rc = %d\n",
1507                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1508 next:
1509                 mgs_direntry_free(dirent);
1510         }
1511
1512         RETURN(rc);
1513 }
1514
1515 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1516                                     struct mgs_device *mgs,
1517                                     struct fs_db *fsdb,
1518                                     struct mgs_target_info *mti,
1519                                     int index, char *logname);
1520 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1521                                     struct mgs_device *mgs,
1522                                     struct fs_db *fsdb,
1523                                     struct mgs_target_info *mti,
1524                                     char *logname, char *suffix, char *lovname,
1525                                     enum lustre_sec_part sec_part, int flags);
1526 static int name_create_mdt_and_lov(char **logname, char **lovname,
1527                                    struct fs_db *fsdb, int i);
1528
1529 static int add_param(char *params, char *key, char *val)
1530 {
1531         char *start = params + strlen(params);
1532         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1533         int keylen = 0;
1534
1535         if (key != NULL)
1536                 keylen = strlen(key);
1537         if (start + 1 + keylen + strlen(val) >= end) {
1538                 CERROR("params are too long: %s %s%s\n",
1539                        params, key != NULL ? key : "", val);
1540                 return -EINVAL;
1541         }
1542
1543         sprintf(start, " %s%s", key != NULL ? key : "", val);
1544         return 0;
1545 }
1546
1547 /**
1548  * Walk through client config log record and convert the related records
1549  * into the target.
1550  **/
1551 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1552                                          struct llog_handle *llh,
1553                                          struct llog_rec_hdr *rec, void *data)
1554 {
1555         struct mgs_device *mgs;
1556         struct obd_device *obd;
1557         struct mgs_target_info *mti, *tmti;
1558         struct fs_db *fsdb;
1559         int cfg_len = rec->lrh_len;
1560         char *cfg_buf = (char*) (rec + 1);
1561         struct lustre_cfg *lcfg;
1562         int rc = 0;
1563         struct llog_handle *mdt_llh = NULL;
1564         static int got_an_osc_or_mdc = 0;
1565         /* 0: not found any osc/mdc;
1566            1: found osc;
1567            2: found mdc;
1568         */
1569         static int last_step = -1;
1570         int cplen = 0;
1571
1572         ENTRY;
1573
1574         mti = ((struct temp_comp*)data)->comp_mti;
1575         tmti = ((struct temp_comp*)data)->comp_tmti;
1576         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1577         obd = ((struct temp_comp *)data)->comp_obd;
1578         mgs = lu2mgs_dev(obd->obd_lu_dev);
1579         LASSERT(mgs);
1580
1581         if (rec->lrh_type != OBD_CFG_REC) {
1582                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1583                 RETURN(-EINVAL);
1584         }
1585
1586         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1587         if (rc) {
1588                 CERROR("Insane cfg\n");
1589                 RETURN(rc);
1590         }
1591
1592         lcfg = (struct lustre_cfg *)cfg_buf;
1593
1594         if (lcfg->lcfg_command == LCFG_MARKER) {
1595                 struct cfg_marker *marker;
1596                 marker = lustre_cfg_buf(lcfg, 1);
1597                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1598                     (marker->cm_flags & CM_START) &&
1599                      !(marker->cm_flags & CM_SKIP)) {
1600                         got_an_osc_or_mdc = 1;
1601                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1602                                         sizeof(tmti->mti_svname));
1603                         if (cplen >= sizeof(tmti->mti_svname))
1604                                 RETURN(-E2BIG);
1605                         rc = record_start_log(env, mgs, &mdt_llh,
1606                                               mti->mti_svname);
1607                         if (rc)
1608                                 RETURN(rc);
1609                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1610                                            mti->mti_svname, "add osc(copied)");
1611                         record_end_log(env, &mdt_llh);
1612                         last_step = marker->cm_step;
1613                         RETURN(rc);
1614                 }
1615                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1616                     (marker->cm_flags & CM_END) &&
1617                      !(marker->cm_flags & CM_SKIP)) {
1618                         LASSERT(last_step == marker->cm_step);
1619                         last_step = -1;
1620                         got_an_osc_or_mdc = 0;
1621                         memset(tmti, 0, sizeof(*tmti));
1622                         rc = record_start_log(env, mgs, &mdt_llh,
1623                                               mti->mti_svname);
1624                         if (rc)
1625                                 RETURN(rc);
1626                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1627                                            mti->mti_svname, "add osc(copied)");
1628                         record_end_log(env, &mdt_llh);
1629                         RETURN(rc);
1630                 }
1631                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1632                     (marker->cm_flags & CM_START) &&
1633                      !(marker->cm_flags & CM_SKIP)) {
1634                         got_an_osc_or_mdc = 2;
1635                         last_step = marker->cm_step;
1636                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1637                                strlen(marker->cm_tgtname));
1638
1639                         RETURN(rc);
1640                 }
1641                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1642                     (marker->cm_flags & CM_END) &&
1643                      !(marker->cm_flags & CM_SKIP)) {
1644                         LASSERT(last_step == marker->cm_step);
1645                         last_step = -1;
1646                         got_an_osc_or_mdc = 0;
1647                         memset(tmti, 0, sizeof(*tmti));
1648                         RETURN(rc);
1649                 }
1650         }
1651
1652         if (got_an_osc_or_mdc == 0 || last_step < 0)
1653                 RETURN(rc);
1654
1655         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1656                 __u64 nodenid = lcfg->lcfg_nid;
1657
1658                 if (strlen(tmti->mti_uuid) == 0) {
1659                         /* target uuid not set, this config record is before
1660                          * LCFG_SETUP, this nid is one of target node nid.
1661                          */
1662                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1663                         tmti->mti_nid_count++;
1664                 } else {
1665                         char nidstr[LNET_NIDSTR_SIZE];
1666
1667                         /* failover node nid */
1668                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
1669                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1670                                         nidstr);
1671                 }
1672
1673                 RETURN(rc);
1674         }
1675
1676         if (lcfg->lcfg_command == LCFG_SETUP) {
1677                 char *target;
1678
1679                 target = lustre_cfg_string(lcfg, 1);
1680                 memcpy(tmti->mti_uuid, target, strlen(target));
1681                 RETURN(rc);
1682         }
1683
1684         /* ignore client side sptlrpc_conf_log */
1685         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1686                 RETURN(rc);
1687
1688         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1689                 int index;
1690
1691                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1692                         RETURN (-EINVAL);
1693
1694                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1695                        strlen(mti->mti_fsname));
1696                 tmti->mti_stripe_index = index;
1697
1698                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1699                                               mti->mti_stripe_index,
1700                                               mti->mti_svname);
1701                 memset(tmti, 0, sizeof(*tmti));
1702                 RETURN(rc);
1703         }
1704
1705         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1706                 int index;
1707                 char mdt_index[9];
1708                 char *logname, *lovname;
1709
1710                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1711                                              mti->mti_stripe_index);
1712                 if (rc)
1713                         RETURN(rc);
1714                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1715
1716                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1717                         name_destroy(&logname);
1718                         name_destroy(&lovname);
1719                         RETURN(-EINVAL);
1720                 }
1721
1722                 tmti->mti_stripe_index = index;
1723                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1724                                          mdt_index, lovname,
1725                                          LUSTRE_SP_MDT, 0);
1726                 name_destroy(&logname);
1727                 name_destroy(&lovname);
1728                 RETURN(rc);
1729         }
1730         RETURN(rc);
1731 }
1732
1733 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1734 /* stealed from mgs_get_fsdb_from_llog*/
1735 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1736                                               struct mgs_device *mgs,
1737                                               char *client_name,
1738                                               struct temp_comp* comp)
1739 {
1740         struct llog_handle *loghandle;
1741         struct mgs_target_info *tmti;
1742         struct llog_ctxt *ctxt;
1743         int rc;
1744
1745         ENTRY;
1746
1747         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1748         LASSERT(ctxt != NULL);
1749
1750         OBD_ALLOC_PTR(tmti);
1751         if (tmti == NULL)
1752                 GOTO(out_ctxt, rc = -ENOMEM);
1753
1754         comp->comp_tmti = tmti;
1755         comp->comp_obd = mgs->mgs_obd;
1756
1757         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1758                        LLOG_OPEN_EXISTS);
1759         if (rc < 0) {
1760                 if (rc == -ENOENT)
1761                         rc = 0;
1762                 GOTO(out_pop, rc);
1763         }
1764
1765         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1766         if (rc)
1767                 GOTO(out_close, rc);
1768
1769         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1770                                   (void *)comp, NULL, false);
1771         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1772 out_close:
1773         llog_close(env, loghandle);
1774 out_pop:
1775         OBD_FREE_PTR(tmti);
1776 out_ctxt:
1777         llog_ctxt_put(ctxt);
1778         RETURN(rc);
1779 }
1780
1781 /* lmv is the second thing for client logs */
1782 /* copied from mgs_write_log_lov. Please refer to that.  */
1783 static int mgs_write_log_lmv(const struct lu_env *env,
1784                              struct mgs_device *mgs,
1785                              struct fs_db *fsdb,
1786                              struct mgs_target_info *mti,
1787                              char *logname, char *lmvname)
1788 {
1789         struct llog_handle *llh = NULL;
1790         struct lmv_desc *lmvdesc;
1791         char *uuid;
1792         int rc = 0;
1793         ENTRY;
1794
1795         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1796
1797         OBD_ALLOC_PTR(lmvdesc);
1798         if (lmvdesc == NULL)
1799                 RETURN(-ENOMEM);
1800         lmvdesc->ld_active_tgt_count = 0;
1801         lmvdesc->ld_tgt_count = 0;
1802         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1803         uuid = (char *)lmvdesc->ld_uuid.uuid;
1804
1805         rc = record_start_log(env, mgs, &llh, logname);
1806         if (rc)
1807                 GOTO(out_free, rc);
1808         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1809         if (rc)
1810                 GOTO(out_end, rc);
1811         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1812         if (rc)
1813                 GOTO(out_end, rc);
1814         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1815         if (rc)
1816                 GOTO(out_end, rc);
1817         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1818         if (rc)
1819                 GOTO(out_end, rc);
1820 out_end:
1821         record_end_log(env, &llh);
1822 out_free:
1823         OBD_FREE_PTR(lmvdesc);
1824         RETURN(rc);
1825 }
1826
1827 /* lov is the first thing in the mdt and client logs */
1828 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1829                              struct fs_db *fsdb, struct mgs_target_info *mti,
1830                              char *logname, char *lovname)
1831 {
1832         struct llog_handle *llh = NULL;
1833         struct lov_desc *lovdesc;
1834         char *uuid;
1835         int rc = 0;
1836         ENTRY;
1837
1838         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1839
1840         /*
1841         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1842         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1843               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1844         */
1845
1846         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1847         OBD_ALLOC_PTR(lovdesc);
1848         if (lovdesc == NULL)
1849                 RETURN(-ENOMEM);
1850         lovdesc->ld_magic = LOV_DESC_MAGIC;
1851         lovdesc->ld_tgt_count = 0;
1852         /* Defaults.  Can be changed later by lcfg config_param */
1853         lovdesc->ld_default_stripe_count = 1;
1854         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1855         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1856         lovdesc->ld_default_stripe_offset = -1;
1857         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1858         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1859         /* can these be the same? */
1860         uuid = (char *)lovdesc->ld_uuid.uuid;
1861
1862         /* This should always be the first entry in a log.
1863         rc = mgs_clear_log(obd, logname); */
1864         rc = record_start_log(env, mgs, &llh, logname);
1865         if (rc)
1866                 GOTO(out_free, rc);
1867         /* FIXME these should be a single journal transaction */
1868         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1869         if (rc)
1870                 GOTO(out_end, rc);
1871         rc = record_attach(env, llh, lovname, "lov", uuid);
1872         if (rc)
1873                 GOTO(out_end, rc);
1874         rc = record_lov_setup(env, llh, lovname, lovdesc);
1875         if (rc)
1876                 GOTO(out_end, rc);
1877         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1878         if (rc)
1879                 GOTO(out_end, rc);
1880         EXIT;
1881 out_end:
1882         record_end_log(env, &llh);
1883 out_free:
1884         OBD_FREE_PTR(lovdesc);
1885         return rc;
1886 }
1887
1888 /* add failnids to open log */
1889 static int mgs_write_log_failnids(const struct lu_env *env,
1890                                   struct mgs_target_info *mti,
1891                                   struct llog_handle *llh,
1892                                   char *cliname)
1893 {
1894         char *failnodeuuid = NULL;
1895         char *ptr = mti->mti_params;
1896         lnet_nid_t nid;
1897         int rc = 0;
1898
1899         /*
1900         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1901         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1902         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1903         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1904         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1905         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1906         */
1907
1908         /*
1909          * Pull failnid info out of params string, which may contain something
1910          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
1911          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
1912          * etc.  However, convert_hostnames() should have caught those.
1913          */
1914         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1915                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1916                         char nidstr[LNET_NIDSTR_SIZE];
1917
1918                         if (failnodeuuid == NULL) {
1919                                 /* We don't know the failover node name,
1920                                  * so just use the first nid as the uuid */
1921                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
1922                                 rc = name_create(&failnodeuuid, nidstr, "");
1923                                 if (rc != 0)
1924                                         return rc;
1925                         }
1926                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1927                                 "client %s\n",
1928                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
1929                                 failnodeuuid, cliname);
1930                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1931                         /*
1932                          * If *ptr is ':', we have added all NIDs for
1933                          * failnodeuuid.
1934                          */
1935                         if (*ptr == ':') {
1936                                 rc = record_add_conn(env, llh, cliname,
1937                                                      failnodeuuid);
1938                                 name_destroy(&failnodeuuid);
1939                                 failnodeuuid = NULL;
1940                         }
1941                 }
1942                 if (failnodeuuid) {
1943                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1944                         name_destroy(&failnodeuuid);
1945                         failnodeuuid = NULL;
1946                 }
1947         }
1948
1949         return rc;
1950 }
1951
1952 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1953                                     struct mgs_device *mgs,
1954                                     struct fs_db *fsdb,
1955                                     struct mgs_target_info *mti,
1956                                     char *logname, char *lmvname)
1957 {
1958         struct llog_handle *llh = NULL;
1959         char *mdcname = NULL;
1960         char *nodeuuid = NULL;
1961         char *mdcuuid = NULL;
1962         char *lmvuuid = NULL;
1963         char index[6];
1964         char nidstr[LNET_NIDSTR_SIZE];
1965         int i, rc;
1966         ENTRY;
1967
1968         if (mgs_log_is_empty(env, mgs, logname)) {
1969                 CERROR("log is empty! Logical error\n");
1970                 RETURN(-EINVAL);
1971         }
1972
1973         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1974                mti->mti_svname, logname, lmvname);
1975
1976         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
1977         rc = name_create(&nodeuuid, nidstr, "");
1978         if (rc)
1979                 RETURN(rc);
1980         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1981         if (rc)
1982                 GOTO(out_free, rc);
1983         rc = name_create(&mdcuuid, mdcname, "_UUID");
1984         if (rc)
1985                 GOTO(out_free, rc);
1986         rc = name_create(&lmvuuid, lmvname, "_UUID");
1987         if (rc)
1988                 GOTO(out_free, rc);
1989
1990         rc = record_start_log(env, mgs, &llh, logname);
1991         if (rc)
1992                 GOTO(out_free, rc);
1993         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1994                            "add mdc");
1995         if (rc)
1996                 GOTO(out_end, rc);
1997         for (i = 0; i < mti->mti_nid_count; i++) {
1998                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1999                         libcfs_nid2str_r(mti->mti_nids[i],
2000                                          nidstr, sizeof(nidstr)));
2001
2002                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2003                 if (rc)
2004                         GOTO(out_end, rc);
2005         }
2006
2007         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2008         if (rc)
2009                 GOTO(out_end, rc);
2010         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2011                           NULL, NULL);
2012         if (rc)
2013                 GOTO(out_end, rc);
2014         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2015         if (rc)
2016                 GOTO(out_end, rc);
2017         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2018         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2019                             index, "1");
2020         if (rc)
2021                 GOTO(out_end, rc);
2022         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2023                            "add mdc");
2024         if (rc)
2025                 GOTO(out_end, rc);
2026 out_end:
2027         record_end_log(env, &llh);
2028 out_free:
2029         name_destroy(&lmvuuid);
2030         name_destroy(&mdcuuid);
2031         name_destroy(&mdcname);
2032         name_destroy(&nodeuuid);
2033         RETURN(rc);
2034 }
2035
2036 static inline int name_create_lov(char **lovname, char *mdtname,
2037                                   struct fs_db *fsdb, int index)
2038 {
2039         /* COMPAT_180 */
2040         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2041                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2042         else
2043                 return name_create(lovname, mdtname, "-mdtlov");
2044 }
2045
2046 static int name_create_mdt_and_lov(char **logname, char **lovname,
2047                                    struct fs_db *fsdb, int i)
2048 {
2049         int rc;
2050
2051         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2052         if (rc)
2053                 return rc;
2054         /* COMPAT_180 */
2055         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2056                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2057         else
2058                 rc = name_create(lovname, *logname, "-mdtlov");
2059         if (rc) {
2060                 name_destroy(logname);
2061                 *logname = NULL;
2062         }
2063         return rc;
2064 }
2065
2066 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2067                                       struct fs_db *fsdb, int i)
2068 {
2069         char suffix[16];
2070
2071         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2072                 sprintf(suffix, "-osc");
2073         else
2074                 sprintf(suffix, "-osc-MDT%04x", i);
2075         return name_create(oscname, ostname, suffix);
2076 }
2077
2078 /* add new mdc to already existent MDS */
2079 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2080                                     struct mgs_device *mgs,
2081                                     struct fs_db *fsdb,
2082                                     struct mgs_target_info *mti,
2083                                     int mdt_index, char *logname)
2084 {
2085         struct llog_handle      *llh = NULL;
2086         char    *nodeuuid = NULL;
2087         char    *ospname = NULL;
2088         char    *lovuuid = NULL;
2089         char    *mdtuuid = NULL;
2090         char    *svname = NULL;
2091         char    *mdtname = NULL;
2092         char    *lovname = NULL;
2093         char    index_str[16];
2094         char    nidstr[LNET_NIDSTR_SIZE];
2095         int     i, rc;
2096
2097         ENTRY;
2098         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2099                 CERROR("log is empty! Logical error\n");
2100                 RETURN (-EINVAL);
2101         }
2102
2103         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2104                logname);
2105
2106         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2107         if (rc)
2108                 RETURN(rc);
2109
2110         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2111         rc = name_create(&nodeuuid, nidstr, "");
2112         if (rc)
2113                 GOTO(out_destory, rc);
2114
2115         rc = name_create(&svname, mdtname, "-osp");
2116         if (rc)
2117                 GOTO(out_destory, rc);
2118
2119         sprintf(index_str, "-MDT%04x", mdt_index);
2120         rc = name_create(&ospname, svname, index_str);
2121         if (rc)
2122                 GOTO(out_destory, rc);
2123
2124         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2125         if (rc)
2126                 GOTO(out_destory, rc);
2127
2128         rc = name_create(&lovuuid, lovname, "_UUID");
2129         if (rc)
2130                 GOTO(out_destory, rc);
2131
2132         rc = name_create(&mdtuuid, mdtname, "_UUID");
2133         if (rc)
2134                 GOTO(out_destory, rc);
2135
2136         rc = record_start_log(env, mgs, &llh, logname);
2137         if (rc)
2138                 GOTO(out_destory, rc);
2139
2140         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2141                            "add osp");
2142         if (rc)
2143                 GOTO(out_destory, rc);
2144
2145         for (i = 0; i < mti->mti_nid_count; i++) {
2146                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2147                         libcfs_nid2str_r(mti->mti_nids[i],
2148                                          nidstr, sizeof(nidstr)));
2149                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2150                 if (rc)
2151                         GOTO(out_end, rc);
2152         }
2153
2154         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2155         if (rc)
2156                 GOTO(out_end, rc);
2157
2158         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2159                           NULL, NULL);
2160         if (rc)
2161                 GOTO(out_end, rc);
2162
2163         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2164         if (rc)
2165                 GOTO(out_end, rc);
2166
2167         /* Add mdc(osp) to lod */
2168         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
2169         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2170                          index_str, "1", NULL);
2171         if (rc)
2172                 GOTO(out_end, rc);
2173
2174         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2175         if (rc)
2176                 GOTO(out_end, rc);
2177
2178 out_end:
2179         record_end_log(env, &llh);
2180
2181 out_destory:
2182         name_destroy(&mdtuuid);
2183         name_destroy(&lovuuid);
2184         name_destroy(&lovname);
2185         name_destroy(&ospname);
2186         name_destroy(&svname);
2187         name_destroy(&nodeuuid);
2188         name_destroy(&mdtname);
2189         RETURN(rc);
2190 }
2191
2192 static int mgs_write_log_mdt0(const struct lu_env *env,
2193                               struct mgs_device *mgs,
2194                               struct fs_db *fsdb,
2195                               struct mgs_target_info *mti)
2196 {
2197         char *log = mti->mti_svname;
2198         struct llog_handle *llh = NULL;
2199         char *uuid, *lovname;
2200         char mdt_index[6];
2201         char *ptr = mti->mti_params;
2202         int rc = 0, failout = 0;
2203         ENTRY;
2204
2205         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2206         if (uuid == NULL)
2207                 RETURN(-ENOMEM);
2208
2209         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2210                 failout = (strncmp(ptr, "failout", 7) == 0);
2211
2212         rc = name_create(&lovname, log, "-mdtlov");
2213         if (rc)
2214                 GOTO(out_free, rc);
2215         if (mgs_log_is_empty(env, mgs, log)) {
2216                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2217                 if (rc)
2218                         GOTO(out_lod, rc);
2219         }
2220
2221         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2222
2223         rc = record_start_log(env, mgs, &llh, log);
2224         if (rc)
2225                 GOTO(out_lod, rc);
2226
2227         /* add MDT itself */
2228
2229         /* FIXME this whole fn should be a single journal transaction */
2230         sprintf(uuid, "%s_UUID", log);
2231         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2232         if (rc)
2233                 GOTO(out_lod, rc);
2234         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2235         if (rc)
2236                 GOTO(out_end, rc);
2237         rc = record_mount_opt(env, llh, log, lovname, NULL);
2238         if (rc)
2239                 GOTO(out_end, rc);
2240         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2241                         failout ? "n" : "f");
2242         if (rc)
2243                 GOTO(out_end, rc);
2244         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2245         if (rc)
2246                 GOTO(out_end, rc);
2247 out_end:
2248         record_end_log(env, &llh);
2249 out_lod:
2250         name_destroy(&lovname);
2251 out_free:
2252         OBD_FREE(uuid, sizeof(struct obd_uuid));
2253         RETURN(rc);
2254 }
2255
2256 /* envelope method for all layers log */
2257 static int mgs_write_log_mdt(const struct lu_env *env,
2258                              struct mgs_device *mgs,
2259                              struct fs_db *fsdb,
2260                              struct mgs_target_info *mti)
2261 {
2262         struct mgs_thread_info *mgi = mgs_env_info(env);
2263         struct llog_handle *llh = NULL;
2264         char *cliname;
2265         int rc, i = 0;
2266         ENTRY;
2267
2268         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2269
2270         if (mti->mti_uuid[0] == '\0') {
2271                 /* Make up our own uuid */
2272                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2273                          "%s_UUID", mti->mti_svname);
2274         }
2275
2276         /* add mdt */
2277         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2278         if (rc)
2279                 RETURN(rc);
2280         /* Append the mdt info to the client log */
2281         rc = name_create(&cliname, mti->mti_fsname, "-client");
2282         if (rc)
2283                 RETURN(rc);
2284
2285         if (mgs_log_is_empty(env, mgs, cliname)) {
2286                 /* Start client log */
2287                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2288                                        fsdb->fsdb_clilov);
2289                 if (rc)
2290                         GOTO(out_free, rc);
2291                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2292                                        fsdb->fsdb_clilmv);
2293                 if (rc)
2294                         GOTO(out_free, rc);
2295         }
2296
2297         /*
2298         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2299         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2300         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2301         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2302         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2303         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2304         */
2305
2306         /* copy client info about lov/lmv */
2307         mgi->mgi_comp.comp_mti = mti;
2308         mgi->mgi_comp.comp_fsdb = fsdb;
2309
2310         rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2311                                                 &mgi->mgi_comp);
2312         if (rc)
2313                 GOTO(out_free, rc);
2314         rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2315                                       fsdb->fsdb_clilmv);
2316         if (rc)
2317                 GOTO(out_free, rc);
2318
2319         /* add mountopts */
2320         rc = record_start_log(env, mgs, &llh, cliname);
2321         if (rc)
2322                 GOTO(out_free, rc);
2323
2324         rc = record_marker(env, llh, fsdb, CM_START, cliname,
2325                            "mount opts");
2326         if (rc)
2327                 GOTO(out_end, rc);
2328         rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2329                               fsdb->fsdb_clilmv);
2330         if (rc)
2331                 GOTO(out_end, rc);
2332         rc = record_marker(env, llh, fsdb, CM_END, cliname,
2333                            "mount opts");
2334
2335         if (rc)
2336                 GOTO(out_end, rc);
2337
2338         /* for_all_existing_mdt except current one */
2339         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2340                 if (i !=  mti->mti_stripe_index &&
2341                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2342                         char *logname;
2343
2344                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2345                         if (rc)
2346                                 GOTO(out_end, rc);
2347
2348                         /* NB: If the log for the MDT is empty, it means
2349                          * the MDT is only added to the index
2350                          * map, and not being process yet, i.e. this
2351                          * is an unregistered MDT, see mgs_write_log_target().
2352                          * so we should skip it. Otherwise
2353                          *
2354                          * 1. MGS get register request for MDT1 and MDT2.
2355                          *
2356                          * 2. Then both MDT1 and MDT2 are added into
2357                          * fsdb_mdt_index_map. (see mgs_set_index()).
2358                          *
2359                          * 3. Then MDT1 get the lock of fsdb_mutex, then
2360                          * generate the config log, here, it will regard MDT2
2361                          * as an existent MDT, and generate "add osp" for
2362                          * lustre-MDT0001-osp-MDT0002. Note: at the moment
2363                          * MDT0002 config log is still empty, so it will
2364                          * add "add osp" even before "lov setup", which
2365                          * will definitly cause trouble.
2366                          *
2367                          * 4. MDT1 registeration finished, fsdb_mutex is
2368                          * released, then MDT2 get in, then in above
2369                          * mgs_steal_llog_for_mdt_from_client(), it will
2370                          * add another osp log for lustre-MDT0001-osp-MDT0002,
2371                          * which will cause another trouble.*/
2372                         if (!mgs_log_is_empty(env, mgs, logname))
2373                                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb,
2374                                                               mti, i, logname);
2375
2376                         name_destroy(&logname);
2377                         if (rc)
2378                                 GOTO(out_end, rc);
2379                 }
2380         }
2381 out_end:
2382         record_end_log(env, &llh);
2383 out_free:
2384         name_destroy(&cliname);
2385         RETURN(rc);
2386 }
2387
2388 /* Add the ost info to the client/mdt lov */
2389 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2390                                     struct mgs_device *mgs, struct fs_db *fsdb,
2391                                     struct mgs_target_info *mti,
2392                                     char *logname, char *suffix, char *lovname,
2393                                     enum lustre_sec_part sec_part, int flags)
2394 {
2395         struct llog_handle *llh = NULL;
2396         char *nodeuuid = NULL;
2397         char *oscname = NULL;
2398         char *oscuuid = NULL;
2399         char *lovuuid = NULL;
2400         char *svname = NULL;
2401         char index[6];
2402         char nidstr[LNET_NIDSTR_SIZE];
2403         int i, rc;
2404         ENTRY;
2405
2406         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2407                 mti->mti_svname, logname);
2408
2409         if (mgs_log_is_empty(env, mgs, logname)) {
2410                 CERROR("log is empty! Logical error\n");
2411                 RETURN(-EINVAL);
2412         }
2413
2414         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2415         rc = name_create(&nodeuuid, nidstr, "");
2416         if (rc)
2417                 RETURN(rc);
2418         rc = name_create(&svname, mti->mti_svname, "-osc");
2419         if (rc)
2420                 GOTO(out_free, rc);
2421
2422         /* for the system upgraded from old 1.8, keep using the old osc naming
2423          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2424         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2425                 rc = name_create(&oscname, svname, "");
2426         else
2427                 rc = name_create(&oscname, svname, suffix);
2428         if (rc)
2429                 GOTO(out_free, rc);
2430
2431         rc = name_create(&oscuuid, oscname, "_UUID");
2432         if (rc)
2433                 GOTO(out_free, rc);
2434         rc = name_create(&lovuuid, lovname, "_UUID");
2435         if (rc)
2436                 GOTO(out_free, rc);
2437
2438
2439         /*
2440         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2441         multihomed (#4)
2442         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2443         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2444         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2445         failover (#6,7)
2446         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2447         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2448         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2449         */
2450
2451         rc = record_start_log(env, mgs, &llh, logname);
2452         if (rc)
2453                 GOTO(out_free, rc);
2454
2455         /* FIXME these should be a single journal transaction */
2456         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2457                            "add osc");
2458         if (rc)
2459                 GOTO(out_end, rc);
2460
2461         /* NB: don't change record order, because upon MDT steal OSC config
2462          * from client, it treats all nids before LCFG_SETUP as target nids
2463          * (multiple interfaces), while nids after as failover node nids.
2464          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2465          */
2466         for (i = 0; i < mti->mti_nid_count; i++) {
2467                 CDEBUG(D_MGS, "add nid %s\n",
2468                         libcfs_nid2str_r(mti->mti_nids[i],
2469                                          nidstr, sizeof(nidstr)));
2470                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2471                 if (rc)
2472                         GOTO(out_end, rc);
2473         }
2474         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2475         if (rc)
2476                 GOTO(out_end, rc);
2477         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
2478                           NULL, NULL);
2479         if (rc)
2480                 GOTO(out_end, rc);
2481         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2482         if (rc)
2483                 GOTO(out_end, rc);
2484
2485         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2486
2487         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2488         if (rc)
2489                 GOTO(out_end, rc);
2490         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2491                            "add osc");
2492         if (rc)
2493                 GOTO(out_end, rc);
2494 out_end:
2495         record_end_log(env, &llh);
2496 out_free:
2497         name_destroy(&lovuuid);
2498         name_destroy(&oscuuid);
2499         name_destroy(&oscname);
2500         name_destroy(&svname);
2501         name_destroy(&nodeuuid);
2502         RETURN(rc);
2503 }
2504
2505 static int mgs_write_log_ost(const struct lu_env *env,
2506                              struct mgs_device *mgs, struct fs_db *fsdb,
2507                              struct mgs_target_info *mti)
2508 {
2509         struct llog_handle *llh = NULL;
2510         char *logname, *lovname;
2511         char *ptr = mti->mti_params;
2512         int rc, flags = 0, failout = 0, i;
2513         ENTRY;
2514
2515         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2516
2517         /* The ost startup log */
2518
2519         /* If the ost log already exists, that means that someone reformatted
2520            the ost and it called target_add again. */
2521         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2522                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2523                                    "exists, yet the server claims it never "
2524                                    "registered. It may have been reformatted, "
2525                                    "or the index changed. writeconf the MDT to "
2526                                    "regenerate all logs.\n", mti->mti_svname);
2527                 RETURN(-EALREADY);
2528         }
2529
2530         /*
2531         attach obdfilter ost1 ost1_UUID
2532         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2533         */
2534         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2535                 failout = (strncmp(ptr, "failout", 7) == 0);
2536         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2537         if (rc)
2538                 RETURN(rc);
2539         /* FIXME these should be a single journal transaction */
2540         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2541         if (rc)
2542                 GOTO(out_end, rc);
2543         if (*mti->mti_uuid == '\0')
2544                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2545                          "%s_UUID", mti->mti_svname);
2546         rc = record_attach(env, llh, mti->mti_svname,
2547                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2548         if (rc)
2549                 GOTO(out_end, rc);
2550         rc = record_setup(env, llh, mti->mti_svname,
2551                           "dev"/*ignored*/, "type"/*ignored*/,
2552                           failout ? "n" : "f", NULL/*options*/);
2553         if (rc)
2554                 GOTO(out_end, rc);
2555         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2556         if (rc)
2557                 GOTO(out_end, rc);
2558 out_end:
2559         record_end_log(env, &llh);
2560         if (rc)
2561                 RETURN(rc);
2562         /* We also have to update the other logs where this osc is part of
2563            the lov */
2564
2565         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2566                 /* If we're upgrading, the old mdt log already has our
2567                    entry. Let's do a fake one for fun. */
2568                 /* Note that we can't add any new failnids, since we don't
2569                    know the old osc names. */
2570                 flags = CM_SKIP | CM_UPGRADE146;
2571
2572         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2573                 /* If the update flag isn't set, don't update client/mdt
2574                    logs. */
2575                 flags |= CM_SKIP;
2576                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2577                               "the MDT first to regenerate it.\n",
2578                               mti->mti_svname);
2579         }
2580
2581         /* Add ost to all MDT lov defs */
2582         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2583                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2584                         char mdt_index[9];
2585
2586                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2587                                                      i);
2588                         if (rc)
2589                                 RETURN(rc);
2590                         sprintf(mdt_index, "-MDT%04x", i);
2591                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2592                                                       logname, mdt_index,
2593                                                       lovname, LUSTRE_SP_MDT,
2594                                                       flags);
2595                         name_destroy(&logname);
2596                         name_destroy(&lovname);
2597                         if (rc)
2598                                 RETURN(rc);
2599                 }
2600         }
2601
2602         /* Append ost info to the client log */
2603         rc = name_create(&logname, mti->mti_fsname, "-client");
2604         if (rc)
2605                 RETURN(rc);
2606         if (mgs_log_is_empty(env, mgs, logname)) {
2607                 /* Start client log */
2608                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2609                                        fsdb->fsdb_clilov);
2610                 if (rc)
2611                         GOTO(out_free, rc);
2612                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2613                                        fsdb->fsdb_clilmv);
2614                 if (rc)
2615                         GOTO(out_free, rc);
2616         }
2617         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2618                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
2619 out_free:
2620         name_destroy(&logname);
2621         RETURN(rc);
2622 }
2623
2624 static __inline__ int mgs_param_empty(char *ptr)
2625 {
2626         char *tmp;
2627
2628         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2629                 return 1;
2630         return 0;
2631 }
2632
2633 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2634                                           struct mgs_device *mgs,
2635                                           struct fs_db *fsdb,
2636                                           struct mgs_target_info *mti,
2637                                           char *logname, char *cliname)
2638 {
2639         int rc;
2640         struct llog_handle *llh = NULL;
2641
2642         if (mgs_param_empty(mti->mti_params)) {
2643                 /* Remove _all_ failnids */
2644                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2645                                 mti->mti_svname, "add failnid", CM_SKIP);
2646                 return rc < 0 ? rc : 0;
2647         }
2648
2649         /* Otherwise failover nids are additive */
2650         rc = record_start_log(env, mgs, &llh, logname);
2651         if (rc)
2652                 return rc;
2653                 /* FIXME this should be a single journal transaction */
2654         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2655                            "add failnid");
2656         if (rc)
2657                 goto out_end;
2658         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2659         if (rc)
2660                 goto out_end;
2661         rc = record_marker(env, llh, fsdb, CM_END,
2662                            mti->mti_svname, "add failnid");
2663 out_end:
2664         record_end_log(env, &llh);
2665         return rc;
2666 }
2667
2668
2669 /* Add additional failnids to an existing log.
2670    The mdc/osc must have been added to logs first */
2671 /* tcp nids must be in dotted-quad ascii -
2672    we can't resolve hostnames from the kernel. */
2673 static int mgs_write_log_add_failnid(const struct lu_env *env,
2674                                      struct mgs_device *mgs,
2675                                      struct fs_db *fsdb,
2676                                      struct mgs_target_info *mti)
2677 {
2678         char *logname, *cliname;
2679         int rc;
2680         ENTRY;
2681
2682         /* FIXME we currently can't erase the failnids
2683          * given when a target first registers, since they aren't part of
2684          * an "add uuid" stanza */
2685
2686         /* Verify that we know about this target */
2687         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2688                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2689                                    "yet. It must be started before failnids "
2690                                    "can be added.\n", mti->mti_svname);
2691                 RETURN(-ENOENT);
2692         }
2693
2694         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2695         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2696                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2697         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2698                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2699         } else {
2700                 RETURN(-EINVAL);
2701         }
2702         if (rc)
2703                 RETURN(rc);
2704         /* Add failover nids to the client log */
2705         rc = name_create(&logname, mti->mti_fsname, "-client");
2706         if (rc) {
2707                 name_destroy(&cliname);
2708                 RETURN(rc);
2709         }
2710         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2711         name_destroy(&logname);
2712         name_destroy(&cliname);
2713         if (rc)
2714                 RETURN(rc);
2715
2716         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2717                 /* Add OST failover nids to the MDT logs as well */
2718                 int i;
2719
2720                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2721                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2722                                 continue;
2723                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2724                         if (rc)
2725                                 RETURN(rc);
2726                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2727                                                  fsdb, i);
2728                         if (rc) {
2729                                 name_destroy(&logname);
2730                                 RETURN(rc);
2731                         }
2732                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2733                                                             mti, logname,
2734                                                             cliname);
2735                         name_destroy(&cliname);
2736                         name_destroy(&logname);
2737                         if (rc)
2738                                 RETURN(rc);
2739                 }
2740         }
2741
2742         RETURN(rc);
2743 }
2744
2745 static int mgs_wlp_lcfg(const struct lu_env *env,
2746                         struct mgs_device *mgs, struct fs_db *fsdb,
2747                         struct mgs_target_info *mti,
2748                         char *logname, struct lustre_cfg_bufs *bufs,
2749                         char *tgtname, char *ptr)
2750 {
2751         char comment[MTI_NAME_MAXLEN];
2752         char *tmp;
2753         struct llog_cfg_rec *lcr;
2754         int rc, del;
2755
2756         /* Erase any old settings of this same parameter */
2757         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2758         comment[MTI_NAME_MAXLEN - 1] = 0;
2759         /* But don't try to match the value. */
2760         tmp = strchr(comment, '=');
2761         if (tmp != NULL)
2762                 *tmp = 0;
2763         /* FIXME we should skip settings that are the same as old values */
2764         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2765         if (rc < 0)
2766                 return rc;
2767         del = mgs_param_empty(ptr);
2768
2769         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2770                       "Setting" : "Modifying", tgtname, comment, logname);
2771         if (del) {
2772                 /* mgs_modify() will return 1 if nothing had to be done */
2773                 if (rc == 1)
2774                         rc = 0;
2775                 return rc;
2776         }
2777
2778         lustre_cfg_bufs_reset(bufs, tgtname);
2779         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2780         if (mti->mti_flags & LDD_F_PARAM2)
2781                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2782
2783         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2784                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2785         if (lcr == NULL)
2786                 return -ENOMEM;
2787
2788         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2789                                   comment);
2790         lustre_cfg_rec_free(lcr);
2791         return rc;
2792 }
2793
2794 static int mgs_write_log_param2(const struct lu_env *env,
2795                                 struct mgs_device *mgs,
2796                                 struct fs_db *fsdb,
2797                                 struct mgs_target_info *mti, char *ptr)
2798 {
2799         struct lustre_cfg_bufs  bufs;
2800         int                     rc = 0;
2801         ENTRY;
2802
2803         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2804         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2805                           mti->mti_svname, ptr);
2806
2807         RETURN(rc);
2808 }
2809
2810 /* write global variable settings into log */
2811 static int mgs_write_log_sys(const struct lu_env *env,
2812                              struct mgs_device *mgs, struct fs_db *fsdb,
2813                              struct mgs_target_info *mti, char *sys, char *ptr)
2814 {
2815         struct mgs_thread_info  *mgi = mgs_env_info(env);
2816         struct lustre_cfg       *lcfg;
2817         struct llog_cfg_rec     *lcr;
2818         char *tmp, sep;
2819         int rc, cmd, convert = 1;
2820
2821         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2822                 cmd = LCFG_SET_TIMEOUT;
2823         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2824                 cmd = LCFG_SET_LDLM_TIMEOUT;
2825         /* Check for known params here so we can return error to lctl */
2826         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2827                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2828                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2829                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2830                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2831                 cmd = LCFG_PARAM;
2832         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2833                 convert = 0; /* Don't convert string value to integer */
2834                 cmd = LCFG_PARAM;
2835         } else {
2836                 return -EINVAL;
2837         }
2838
2839         if (mgs_param_empty(ptr))
2840                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2841         else
2842                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2843
2844         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2845         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2846         if (!convert && *tmp != '\0')
2847                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2848         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2849         if (lcr == NULL)
2850                 return -ENOMEM;
2851
2852         lcfg = &lcr->lcr_cfg;
2853         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2854         /* truncate the comment to the parameter name */
2855         ptr = tmp - 1;
2856         sep = *ptr;
2857         *ptr = '\0';
2858         /* modify all servers and clients */
2859         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2860                                       *tmp == '\0' ? NULL : lcr,
2861                                       mti->mti_fsname, sys, 0);
2862         if (rc == 0 && *tmp != '\0') {
2863                 switch (cmd) {
2864                 case LCFG_SET_TIMEOUT:
2865                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2866                                 class_process_config(lcfg);
2867                         break;
2868                 case LCFG_SET_LDLM_TIMEOUT:
2869                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2870                                 class_process_config(lcfg);
2871                         break;
2872                 default:
2873                         break;
2874                 }
2875         }
2876         *ptr = sep;
2877         lustre_cfg_rec_free(lcr);
2878         return rc;
2879 }
2880
2881 /* write quota settings into log */
2882 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2883                                struct fs_db *fsdb, struct mgs_target_info *mti,
2884                                char *quota, char *ptr)
2885 {
2886         struct mgs_thread_info  *mgi = mgs_env_info(env);
2887         struct llog_cfg_rec     *lcr;
2888         char                    *tmp;
2889         char                     sep;
2890         int                      rc, cmd = LCFG_PARAM;
2891
2892         /* support only 'meta' and 'data' pools so far */
2893         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2894             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2895                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2896                        "& quota.ost are)\n", ptr);
2897                 return -EINVAL;
2898         }
2899
2900         if (*tmp == '\0') {
2901                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2902         } else {
2903                 CDEBUG(D_MGS, "global '%s'\n", quota);
2904
2905                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2906                     strcmp(tmp, "none") != 0) {
2907                         CERROR("enable option(%s) isn't supported\n", tmp);
2908                         return -EINVAL;
2909                 }
2910         }
2911
2912         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2913         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2914         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2915         if (lcr == NULL)
2916                 return -ENOMEM;
2917
2918         /* truncate the comment to the parameter name */
2919         ptr = tmp - 1;
2920         sep = *ptr;
2921         *ptr = '\0';
2922
2923         /* XXX we duplicated quota enable information in all server
2924          *     config logs, it should be moved to a separate config
2925          *     log once we cleanup the config log for global param. */
2926         /* modify all servers */
2927         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2928                                       *tmp == '\0' ? NULL : lcr,
2929                                       mti->mti_fsname, quota, 1);
2930         *ptr = sep;
2931         lustre_cfg_rec_free(lcr);
2932         return rc < 0 ? rc : 0;
2933 }
2934
2935 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2936                                    struct mgs_device *mgs,
2937                                    struct fs_db *fsdb,
2938                                    struct mgs_target_info *mti,
2939                                    char *param)
2940 {
2941         struct mgs_thread_info  *mgi = mgs_env_info(env);
2942         struct llog_cfg_rec     *lcr;
2943         struct llog_handle      *llh = NULL;
2944         char                    *logname;
2945         char                    *comment, *ptr;
2946         int                      rc, len;
2947
2948         ENTRY;
2949
2950         /* get comment */
2951         ptr = strchr(param, '=');
2952         LASSERT(ptr != NULL);
2953         len = ptr - param;
2954
2955         OBD_ALLOC(comment, len + 1);
2956         if (comment == NULL)
2957                 RETURN(-ENOMEM);
2958         strncpy(comment, param, len);
2959         comment[len] = '\0';
2960
2961         /* prepare lcfg */
2962         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2963         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2964         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2965         if (lcr == NULL)
2966                 GOTO(out_comment, rc = -ENOMEM);
2967
2968         /* construct log name */
2969         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2970         if (rc < 0)
2971                 GOTO(out_lcfg, rc);
2972
2973         if (mgs_log_is_empty(env, mgs, logname)) {
2974                 rc = record_start_log(env, mgs, &llh, logname);
2975                 if (rc < 0)
2976                         GOTO(out, rc);
2977                 record_end_log(env, &llh);
2978         }
2979
2980         /* obsolete old one */
2981         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2982                         comment, CM_SKIP);
2983         if (rc < 0)
2984                 GOTO(out, rc);
2985         /* write the new one */
2986         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2987                                   mti->mti_svname, comment);
2988         if (rc)
2989                 CERROR("%s: error writing log %s: rc = %d\n",
2990                        mgs->mgs_obd->obd_name, logname, rc);
2991 out:
2992         name_destroy(&logname);
2993 out_lcfg:
2994         lustre_cfg_rec_free(lcr);
2995 out_comment:
2996         OBD_FREE(comment, len + 1);
2997         RETURN(rc);
2998 }
2999
3000 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
3001                                         char *param)
3002 {
3003         char    *ptr;
3004
3005         /* disable the adjustable udesc parameter for now, i.e. use default
3006          * setting that client always ship udesc to MDT if possible. to enable
3007          * it simply remove the following line */
3008         goto error_out;
3009
3010         ptr = strchr(param, '=');
3011         if (ptr == NULL)
3012                 goto error_out;
3013         *ptr++ = '\0';
3014
3015         if (strcmp(param, PARAM_SRPC_UDESC))
3016                 goto error_out;
3017
3018         if (strcmp(ptr, "yes") == 0) {
3019                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3020                 CWARN("Enable user descriptor shipping from client to MDT\n");
3021         } else if (strcmp(ptr, "no") == 0) {
3022                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3023                 CWARN("Disable user descriptor shipping from client to MDT\n");
3024         } else {
3025                 *(ptr - 1) = '=';
3026                 goto error_out;
3027         }
3028         return 0;
3029
3030 error_out:
3031         CERROR("Invalid param: %s\n", param);
3032         return -EINVAL;
3033 }
3034
3035 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3036                                   const char *svname,
3037                                   char *param)
3038 {
3039         struct sptlrpc_rule      rule;
3040         struct sptlrpc_rule_set *rset;
3041         int                      rc;
3042         ENTRY;
3043
3044         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3045                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3046                 RETURN(-EINVAL);
3047         }
3048
3049         if (strncmp(param, PARAM_SRPC_UDESC,
3050                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3051                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3052         }
3053
3054         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3055                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3056                 RETURN(-EINVAL);
3057         }
3058
3059         param += sizeof(PARAM_SRPC_FLVR) - 1;
3060
3061         rc = sptlrpc_parse_rule(param, &rule);
3062         if (rc)
3063                 RETURN(rc);
3064
3065         /* mgs rules implies must be mgc->mgs */
3066         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3067                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3068                      rule.sr_from != LUSTRE_SP_ANY) ||
3069                     (rule.sr_to != LUSTRE_SP_MGS &&
3070                      rule.sr_to != LUSTRE_SP_ANY))
3071                         RETURN(-EINVAL);
3072         }
3073
3074         /* preapre room for this coming rule. svcname format should be:
3075          * - fsname: general rule
3076          * - fsname-tgtname: target-specific rule
3077          */
3078         if (strchr(svname, '-')) {
3079                 struct mgs_tgt_srpc_conf *tgtconf;
3080                 int                       found = 0;
3081
3082                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3083                      tgtconf = tgtconf->mtsc_next) {
3084                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3085                                 found = 1;
3086                                 break;
3087                         }
3088                 }
3089
3090                 if (!found) {
3091                         int name_len;
3092
3093                         OBD_ALLOC_PTR(tgtconf);
3094                         if (tgtconf == NULL)
3095                                 RETURN(-ENOMEM);
3096
3097                         name_len = strlen(svname);
3098
3099                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3100                         if (tgtconf->mtsc_tgt == NULL) {
3101                                 OBD_FREE_PTR(tgtconf);
3102                                 RETURN(-ENOMEM);
3103                         }
3104                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3105
3106                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3107                         fsdb->fsdb_srpc_tgt = tgtconf;
3108                 }
3109
3110                 rset = &tgtconf->mtsc_rset;
3111         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
3112                 /* put _mgs related srpc rule directly in mgs ruleset */
3113                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
3114         } else {
3115                 rset = &fsdb->fsdb_srpc_gen;
3116         }
3117
3118         rc = sptlrpc_rule_set_merge(rset, &rule);
3119
3120         RETURN(rc);
3121 }
3122
3123 static int mgs_srpc_set_param(const struct lu_env *env,
3124                               struct mgs_device *mgs,
3125                               struct fs_db *fsdb,
3126                               struct mgs_target_info *mti,
3127                               char *param)
3128 {
3129         char                   *copy;
3130         int                     rc, copy_size;
3131         ENTRY;
3132
3133 #ifndef HAVE_GSS
3134         RETURN(-EINVAL);
3135 #endif
3136         /* keep a copy of original param, which could be destroied
3137          * during parsing */
3138         copy_size = strlen(param) + 1;
3139         OBD_ALLOC(copy, copy_size);
3140         if (copy == NULL)
3141                 return -ENOMEM;
3142         memcpy(copy, param, copy_size);
3143
3144         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3145         if (rc)
3146                 goto out_free;
3147
3148         /* previous steps guaranteed the syntax is correct */
3149         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3150         if (rc)
3151                 goto out_free;
3152
3153         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3154                 /*
3155                  * for mgs rules, make them effective immediately.
3156                  */
3157                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3158                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3159                                                  &fsdb->fsdb_srpc_gen);
3160         }
3161
3162 out_free:
3163         OBD_FREE(copy, copy_size);
3164         RETURN(rc);
3165 }
3166
3167 struct mgs_srpc_read_data {
3168         struct fs_db   *msrd_fsdb;
3169         int             msrd_skip;
3170 };
3171
3172 static int mgs_srpc_read_handler(const struct lu_env *env,
3173                                  struct llog_handle *llh,
3174                                  struct llog_rec_hdr *rec, void *data)
3175 {
3176         struct mgs_srpc_read_data *msrd = data;
3177         struct cfg_marker         *marker;
3178         struct lustre_cfg         *lcfg = REC_DATA(rec);
3179         char                      *svname, *param;
3180         int                        cfg_len, rc;
3181         ENTRY;
3182
3183         if (rec->lrh_type != OBD_CFG_REC) {
3184                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3185                 RETURN(-EINVAL);
3186         }
3187
3188         cfg_len = REC_DATA_LEN(rec);
3189
3190         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3191         if (rc) {
3192                 CERROR("Insane cfg\n");
3193                 RETURN(rc);
3194         }
3195
3196         if (lcfg->lcfg_command == LCFG_MARKER) {
3197                 marker = lustre_cfg_buf(lcfg, 1);
3198
3199                 if (marker->cm_flags & CM_START &&
3200                     marker->cm_flags & CM_SKIP)
3201                         msrd->msrd_skip = 1;
3202                 if (marker->cm_flags & CM_END)
3203                         msrd->msrd_skip = 0;
3204
3205                 RETURN(0);
3206         }
3207
3208         if (msrd->msrd_skip)
3209                 RETURN(0);
3210
3211         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3212                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3213                 RETURN(0);
3214         }
3215
3216         svname = lustre_cfg_string(lcfg, 0);
3217         if (svname == NULL) {
3218                 CERROR("svname is empty\n");
3219                 RETURN(0);
3220         }
3221
3222         param = lustre_cfg_string(lcfg, 1);
3223         if (param == NULL) {
3224                 CERROR("param is empty\n");
3225                 RETURN(0);
3226         }
3227
3228         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3229         if (rc)
3230                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3231
3232         RETURN(0);
3233 }
3234
3235 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3236                                 struct mgs_device *mgs,
3237                                 struct fs_db *fsdb)
3238 {
3239         struct llog_handle        *llh = NULL;
3240         struct llog_ctxt          *ctxt;
3241         char                      *logname;
3242         struct mgs_srpc_read_data  msrd;
3243         int                        rc;
3244         ENTRY;
3245
3246         /* construct log name */
3247         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3248         if (rc)
3249                 RETURN(rc);
3250
3251         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3252         LASSERT(ctxt != NULL);
3253
3254         if (mgs_log_is_empty(env, mgs, logname))
3255                 GOTO(out, rc = 0);
3256
3257         rc = llog_open(env, ctxt, &llh, NULL, logname,
3258                        LLOG_OPEN_EXISTS);
3259         if (rc < 0) {
3260                 if (rc == -ENOENT)
3261                         rc = 0;
3262                 GOTO(out, rc);
3263         }
3264
3265         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3266         if (rc)
3267                 GOTO(out_close, rc);
3268
3269         if (llog_get_size(llh) <= 1)
3270                 GOTO(out_close, rc = 0);
3271
3272         msrd.msrd_fsdb = fsdb;
3273         msrd.msrd_skip = 0;
3274
3275         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3276                           NULL);
3277
3278 out_close:
3279         llog_close(env, llh);
3280 out:
3281         llog_ctxt_put(ctxt);
3282         name_destroy(&logname);
3283
3284         if (rc)
3285                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3286         RETURN(rc);
3287 }
3288
3289 /* Permanent settings of all parameters by writing into the appropriate
3290  * configuration logs.
3291  * A parameter with null value ("<param>='\0'") means to erase it out of
3292  * the logs.
3293  */
3294 static int mgs_write_log_param(const struct lu_env *env,
3295                                struct mgs_device *mgs, struct fs_db *fsdb,
3296                                struct mgs_target_info *mti, char *ptr)
3297 {
3298         struct mgs_thread_info *mgi = mgs_env_info(env);
3299         char *logname;
3300         char *tmp;
3301         int rc = 0;
3302         ENTRY;
3303
3304         /* For various parameter settings, we have to figure out which logs
3305            care about them (e.g. both mdt and client for lov settings) */
3306         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3307
3308         /* The params are stored in MOUNT_DATA_FILE and modified via
3309            tunefs.lustre, or set using lctl conf_param */
3310
3311         /* Processed in lustre_start_mgc */
3312         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3313                 GOTO(end, rc);
3314
3315         /* Processed in ost/mdt */
3316         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3317                 GOTO(end, rc);
3318
3319         /* Processed in mgs_write_log_ost */
3320         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3321                 if (mti->mti_flags & LDD_F_PARAM) {
3322                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3323                                            "changed with tunefs.lustre"
3324                                            "and --writeconf\n", ptr);
3325                         rc = -EPERM;
3326                 }
3327                 GOTO(end, rc);
3328         }
3329
3330         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3331                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3332                 GOTO(end, rc);
3333         }
3334
3335         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3336                 /* Add a failover nidlist */
3337                 rc = 0;
3338                 /* We already processed failovers params for new
3339                    targets in mgs_write_log_target */
3340                 if (mti->mti_flags & LDD_F_PARAM) {
3341                         CDEBUG(D_MGS, "Adding failnode\n");
3342                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3343                 }
3344                 GOTO(end, rc);
3345         }
3346
3347         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3348                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3349                 GOTO(end, rc);
3350         }
3351
3352         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3353                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3354                 GOTO(end, rc);
3355         }
3356
3357         if (class_match_param(ptr, PARAM_OSC PARAM_ACTIVE, &tmp) == 0 ||
3358             class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0) {
3359                 /* active=0 means off, anything else means on */
3360                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3361                 bool deactive_osc = memcmp(ptr, PARAM_OSC PARAM_ACTIVE,
3362                                           strlen(PARAM_OSC PARAM_ACTIVE)) == 0;
3363                 int i;
3364
3365                 if (!deactive_osc) {
3366                         __u32   index;
3367
3368                         rc = server_name2index(mti->mti_svname, &index, NULL);
3369                         if (rc < 0)
3370                                 GOTO(end, rc);
3371
3372                         if (index == 0) {
3373                                 LCONSOLE_ERROR_MSG(0x144, "%s: MDC0 can not be"
3374                                                    " (de)activated.\n",
3375                                                    mti->mti_svname);
3376                                 GOTO(end, rc = -EINVAL);
3377                         }
3378                 }
3379
3380                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3381                               flag ? "de" : "re", mti->mti_svname);
3382                 /* Modify clilov */
3383                 rc = name_create(&logname, mti->mti_fsname, "-client");
3384                 if (rc < 0)
3385                         GOTO(end, rc);
3386                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3387                                 mti->mti_svname,
3388                                 deactive_osc ? "add osc" : "add mdc", flag);
3389                 name_destroy(&logname);
3390                 if (rc < 0)
3391                         goto active_err;
3392
3393                 /* Modify mdtlov */
3394                 /* Add to all MDT logs for DNE */
3395                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3396                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3397                                 continue;
3398                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3399                         if (rc < 0)
3400                                 GOTO(end, rc);
3401                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3402                                         mti->mti_svname,
3403                                         deactive_osc ? "add osc" : "add osp",
3404                                         flag);
3405                         name_destroy(&logname);
3406                         if (rc < 0)
3407                                 goto active_err;
3408                 }
3409 active_err:
3410                 if (rc < 0) {
3411                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3412                                            "log (%d). No permanent "
3413                                            "changes were made to the "
3414                                            "config log.\n",
3415                                            mti->mti_svname, rc);
3416                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3417                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3418                                                    " because the log"
3419                                                    "is in the old 1.4"
3420                                                    "style. Consider "
3421                                                    " --writeconf to "
3422                                                    "update the logs.\n");
3423                         GOTO(end, rc);
3424                 }
3425                 /* Fall through to osc/mdc proc for deactivating live
3426                    OSC/OSP on running MDT / clients. */
3427         }
3428         /* Below here, let obd's XXX_process_config methods handle it */
3429
3430         /* All lov. in proc */
3431         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3432                 char *mdtlovname;
3433
3434                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3435                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3436                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3437                                            "set on the MDT, not %s. "
3438                                            "Ignoring.\n",
3439                                            mti->mti_svname);
3440                         GOTO(end, rc = 0);
3441                 }
3442
3443                 /* Modify mdtlov */
3444                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3445                         GOTO(end, rc = -ENODEV);
3446
3447                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3448                                              mti->mti_stripe_index);
3449                 if (rc)
3450                         GOTO(end, rc);
3451                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3452                                   &mgi->mgi_bufs, mdtlovname, ptr);
3453                 name_destroy(&logname);
3454                 name_destroy(&mdtlovname);
3455                 if (rc)
3456                         GOTO(end, rc);
3457
3458                 /* Modify clilov */
3459                 rc = name_create(&logname, mti->mti_fsname, "-client");
3460                 if (rc)
3461                         GOTO(end, rc);
3462                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3463                                   fsdb->fsdb_clilov, ptr);
3464                 name_destroy(&logname);
3465                 GOTO(end, rc);
3466         }
3467
3468         /* All osc., mdc., llite. params in proc */
3469         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3470             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3471             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3472                 char *cname;
3473
3474                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3475                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3476                                            " cannot be modified. Consider"
3477                                            " updating the configuration with"
3478                                            " --writeconf\n",
3479                                            mti->mti_svname);
3480                         GOTO(end, rc = -EINVAL);
3481                 }
3482                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3483                         rc = name_create(&cname, mti->mti_fsname, "-client");
3484                         /* Add the client type to match the obdname in
3485                            class_config_llog_handler */
3486                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3487                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3488                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3489                         rc = name_create(&cname, mti->mti_svname, "-osc");
3490                 } else {
3491                         GOTO(end, rc = -EINVAL);
3492                 }
3493                 if (rc)
3494                         GOTO(end, rc);
3495
3496                 /* Forbid direct update of llite root squash parameters.
3497                  * These parameters are indirectly set via the MDT settings.
3498                  * See (LU-1778) */
3499                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
3500                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3501                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3502                         LCONSOLE_ERROR("%s: root squash parameters can only "
3503                                 "be updated through MDT component\n",
3504                                 mti->mti_fsname);
3505                         name_destroy(&cname);
3506                         GOTO(end, rc = -EINVAL);
3507                 }
3508
3509                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3510
3511                 /* Modify client */
3512                 rc = name_create(&logname, mti->mti_fsname, "-client");
3513                 if (rc) {
3514                         name_destroy(&cname);
3515                         GOTO(end, rc);
3516                 }
3517                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3518                                   cname, ptr);
3519
3520                 /* osc params affect the MDT as well */
3521                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3522                         int i;
3523
3524                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3525                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3526                                         continue;
3527                                 name_destroy(&cname);
3528                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3529                                                          fsdb, i);
3530                                 name_destroy(&logname);
3531                                 if (rc)
3532                                         break;
3533                                 rc = name_create_mdt(&logname,
3534                                                      mti->mti_fsname, i);
3535                                 if (rc)
3536                                         break;
3537                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3538                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3539                                                           mti, logname,
3540                                                           &mgi->mgi_bufs,
3541                                                           cname, ptr);
3542                                         if (rc)
3543                                                 break;
3544                                 }
3545                         }
3546                 }
3547
3548                 /* For mdc activate/deactivate, it affects OSP on MDT as well */
3549                 if (class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0 &&
3550                     rc == 0) {
3551                         char suffix[16];
3552                         char *lodname = NULL;
3553                         char *param_str = NULL;
3554                         int i;
3555                         int index;
3556
3557                         /* replace mdc with osp */
3558                         memcpy(ptr, PARAM_OSP, strlen(PARAM_OSP));
3559                         rc = server_name2index(mti->mti_svname, &index, NULL);
3560                         if (rc < 0) {
3561                                 memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
3562                                 GOTO(end, rc);
3563                         }
3564
3565                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3566                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3567                                         continue;
3568
3569                                 if (i == index)
3570                                         continue;
3571
3572                                 name_destroy(&logname);
3573                                 rc = name_create_mdt(&logname, mti->mti_fsname,
3574                                                      i);
3575                                 if (rc < 0)
3576                                         break;
3577
3578                                 if (mgs_log_is_empty(env, mgs, logname))
3579                                         continue;
3580
3581                                 snprintf(suffix, sizeof(suffix), "-osp-MDT%04x",
3582                                          i);
3583                                 name_destroy(&cname);
3584                                 rc = name_create(&cname, mti->mti_svname,
3585                                                  suffix);
3586                                 if (rc < 0)
3587                                         break;
3588
3589                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3590                                                   &mgi->mgi_bufs, cname, ptr);
3591                                 if (rc < 0)
3592                                         break;
3593
3594                                 /* Add configuration log for noitfying LOD
3595                                  * to active/deactive the OSP. */
3596                                 name_destroy(&param_str);
3597                                 rc = name_create(&param_str, cname,
3598                                                  (*tmp == '0') ?  ".active=0" :
3599                                                  ".active=1");
3600                                 if (rc < 0)
3601                                         break;
3602
3603                                 name_destroy(&lodname);
3604                                 rc = name_create(&lodname, logname, "-mdtlov");
3605                                 if (rc < 0)
3606                                         break;
3607
3608                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3609                                                   &mgi->mgi_bufs, lodname,
3610                                                   param_str);
3611                                 if (rc < 0)
3612                                         break;
3613                         }
3614                         memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
3615                         name_destroy(&lodname);
3616                         name_destroy(&param_str);
3617                 }
3618
3619                 name_destroy(&logname);
3620                 name_destroy(&cname);
3621                 GOTO(end, rc);
3622         }
3623
3624         /* All mdt. params in proc */
3625         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
3626                 int i;
3627                 __u32 idx;
3628
3629                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3630                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3631                             MTI_NAME_MAXLEN) == 0)
3632                         /* device is unspecified completely? */
3633                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3634                 else
3635                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3636                 if (rc < 0)
3637                         goto active_err;
3638                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3639                         goto active_err;
3640                 if (rc & LDD_F_SV_ALL) {
3641                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3642                                 if (!test_bit(i,
3643                                                   fsdb->fsdb_mdt_index_map))
3644                                         continue;
3645                                 rc = name_create_mdt(&logname,
3646                                                 mti->mti_fsname, i);
3647                                 if (rc)
3648                                         goto active_err;
3649                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3650                                                   logname, &mgi->mgi_bufs,
3651                                                   logname, ptr);
3652                                 name_destroy(&logname);
3653                                 if (rc)
3654                                         goto active_err;
3655                         }
3656                 } else {
3657                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
3658                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
3659                                 LCONSOLE_ERROR("%s: root squash parameters "
3660                                         "cannot be applied to a single MDT\n",
3661                                         mti->mti_fsname);
3662                                 GOTO(end, rc = -EINVAL);
3663                         }
3664                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3665                                           mti->mti_svname, &mgi->mgi_bufs,
3666                                           mti->mti_svname, ptr);
3667                         if (rc)
3668                                 goto active_err;
3669                 }
3670
3671                 /* root squash settings are also applied to llite
3672                  * config log (see LU-1778) */
3673                 if (rc == 0 &&
3674                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3675                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3676                         char *cname;
3677                         char *ptr2;
3678
3679                         rc = name_create(&cname, mti->mti_fsname, "-client");
3680                         if (rc)
3681                                 GOTO(end, rc);
3682                         rc = name_create(&logname, mti->mti_fsname, "-client");
3683                         if (rc) {
3684                                 name_destroy(&cname);
3685                                 GOTO(end, rc);
3686                         }
3687                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
3688                         if (rc) {
3689                                 name_destroy(&cname);
3690                                 name_destroy(&logname);
3691                                 GOTO(end, rc);
3692                         }
3693                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3694                                           &mgi->mgi_bufs, cname, ptr2);
3695                         name_destroy(&ptr2);
3696                         name_destroy(&logname);
3697                         name_destroy(&cname);
3698                 }
3699                 GOTO(end, rc);
3700         }
3701
3702         /* All mdd., ost. and osd. params in proc */
3703         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3704             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3705             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3706                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3707                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3708                         GOTO(end, rc = -ENODEV);
3709
3710                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3711                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3712                 GOTO(end, rc);
3713         }
3714
3715         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3716
3717 end:
3718         if (rc)
3719                 CERROR("err %d on param '%s'\n", rc, ptr);
3720
3721         RETURN(rc);
3722 }
3723
3724 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
3725                          struct mgs_target_info *mti, struct fs_db *fsdb)
3726 {
3727         char    *buf, *params;
3728         int      rc = -EINVAL;
3729
3730         ENTRY;
3731
3732         /* set/check the new target index */
3733         rc = mgs_set_index(env, mgs, mti);
3734         if (rc < 0)
3735                 RETURN(rc);
3736
3737         if (rc == EALREADY) {
3738                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3739                               mti->mti_stripe_index, mti->mti_svname);
3740                 /* We would like to mark old log sections as invalid
3741                    and add new log sections in the client and mdt logs.
3742                    But if we add new sections, then live clients will
3743                    get repeat setup instructions for already running
3744                    osc's. So don't update the client/mdt logs. */
3745                 mti->mti_flags &= ~LDD_F_UPDATE;
3746                 rc = 0;
3747         }
3748
3749         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_WRITE_TARGET_DELAY, cfs_fail_val > 0 ?
3750                          cfs_fail_val : 10);
3751
3752         mutex_lock(&fsdb->fsdb_mutex);
3753
3754         if (mti->mti_flags &
3755             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3756                 /* Generate a log from scratch */
3757                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3758                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3759                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3760                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3761                 } else {
3762                         CERROR("Unknown target type %#x, can't create log for "
3763                                "%s\n", mti->mti_flags, mti->mti_svname);
3764                 }
3765                 if (rc) {
3766                         CERROR("Can't write logs for %s (%d)\n",
3767                                mti->mti_svname, rc);
3768                         GOTO(out_up, rc);
3769                 }
3770         } else {
3771                 /* Just update the params from tunefs in mgs_write_log_params */
3772                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3773                 mti->mti_flags |= LDD_F_PARAM;
3774         }
3775
3776         /* allocate temporary buffer, where class_get_next_param will
3777            make copy of a current  parameter */
3778         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3779         if (buf == NULL)
3780                 GOTO(out_up, rc = -ENOMEM);
3781         params = mti->mti_params;
3782         while (params != NULL) {
3783                 rc = class_get_next_param(&params, buf);
3784                 if (rc) {
3785                         if (rc == 1)
3786                                 /* there is no next parameter, that is
3787                                    not an error */
3788                                 rc = 0;
3789                         break;
3790                 }
3791                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3792                        params, buf);
3793                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3794                 if (rc)
3795                         break;
3796         }
3797
3798         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3799
3800 out_up:
3801         mutex_unlock(&fsdb->fsdb_mutex);
3802         RETURN(rc);
3803 }
3804
3805 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3806 {
3807         struct llog_ctxt        *ctxt;
3808         int                      rc = 0;
3809
3810         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3811         if (ctxt == NULL) {
3812                 CERROR("%s: MGS config context doesn't exist\n",
3813                        mgs->mgs_obd->obd_name);
3814                 rc = -ENODEV;
3815         } else {
3816                 rc = llog_erase(env, ctxt, NULL, name);
3817                 /* llog may not exist */
3818                 if (rc == -ENOENT)
3819                         rc = 0;
3820                 llog_ctxt_put(ctxt);
3821         }
3822
3823         if (rc)
3824                 CERROR("%s: failed to clear log %s: %d\n",
3825                        mgs->mgs_obd->obd_name, name, rc);
3826
3827         return rc;
3828 }
3829
3830 /* erase all logs for the given fs */
3831 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3832 {
3833         struct fs_db *fsdb;
3834         struct list_head log_list;
3835         struct mgs_direntry *dirent, *n;
3836         int rc, len = strlen(fsname);
3837         char *suffix;
3838         ENTRY;
3839
3840         /* Find all the logs in the CONFIGS directory */
3841         rc = class_dentry_readdir(env, mgs, &log_list);
3842         if (rc)
3843                 RETURN(rc);
3844
3845         mutex_lock(&mgs->mgs_mutex);
3846
3847         /* Delete the fs db */
3848         fsdb = mgs_find_fsdb(mgs, fsname);
3849         if (fsdb)
3850                 mgs_free_fsdb(mgs, fsdb);
3851
3852         mutex_unlock(&mgs->mgs_mutex);
3853
3854         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3855                 list_del_init(&dirent->mde_list);
3856                 suffix = strrchr(dirent->mde_name, '-');
3857                 if (suffix != NULL) {
3858                         if ((len == suffix - dirent->mde_name) &&
3859                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
3860                                 CDEBUG(D_MGS, "Removing log %s\n",
3861                                        dirent->mde_name);
3862                                 mgs_erase_log(env, mgs, dirent->mde_name);
3863                         }
3864                 }
3865                 mgs_direntry_free(dirent);
3866         }
3867
3868         RETURN(rc);
3869 }
3870
3871 /* list all logs for the given fs */
3872 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
3873                   struct obd_ioctl_data *data)
3874 {
3875         struct list_head         log_list;
3876         struct mgs_direntry     *dirent, *n;
3877         char                    *out, *suffix;
3878         int                      l, remains, rc;
3879
3880         ENTRY;
3881
3882         /* Find all the logs in the CONFIGS directory */
3883         rc = class_dentry_readdir(env, mgs, &log_list);
3884         if (rc)
3885                 RETURN(rc);
3886
3887         out = data->ioc_bulk;
3888         remains = data->ioc_inllen1;
3889         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3890                 list_del_init(&dirent->mde_list);
3891                 suffix = strrchr(dirent->mde_name, '-');
3892                 if (suffix != NULL) {
3893                         l = snprintf(out, remains, "config log: $%s\n",
3894                                      dirent->mde_name);
3895                         out += l;
3896                         remains -= l;
3897                 }
3898                 mgs_direntry_free(dirent);
3899                 if (remains <= 0)
3900                         break;
3901         }
3902         RETURN(rc);
3903 }
3904
3905 /* from llog_swab */
3906 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3907 {
3908         int i;
3909         ENTRY;
3910
3911         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3912         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3913
3914         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3915         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3916         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3917         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3918
3919         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3920         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3921                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3922                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3923                                i, lcfg->lcfg_buflens[i],
3924                                lustre_cfg_string(lcfg, i));
3925                 }
3926         EXIT;
3927 }
3928
3929 /* Setup _mgs fsdb and log
3930  */
3931 int mgs__mgs_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3932                           struct fs_db *fsdb)
3933 {
3934         int                     rc;
3935         ENTRY;
3936
3937         rc = mgs_find_or_make_fsdb(env, mgs, MGSSELF_NAME, &fsdb);
3938
3939         RETURN(rc);
3940 }
3941
3942 /* Setup params fsdb and log
3943  */
3944 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3945                           struct fs_db *fsdb)
3946 {
3947         struct llog_handle      *params_llh = NULL;
3948         int                     rc;
3949         ENTRY;
3950
3951         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
3952         if (fsdb != NULL) {
3953                 mutex_lock(&fsdb->fsdb_mutex);
3954                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
3955                 if (rc == 0)
3956                         rc = record_end_log(env, &params_llh);
3957                 mutex_unlock(&fsdb->fsdb_mutex);
3958         }
3959
3960         RETURN(rc);
3961 }
3962
3963 /* Cleanup params fsdb and log
3964  */
3965 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
3966 {
3967         return mgs_erase_logs(env, mgs, PARAMS_FILENAME);
3968 }
3969
3970 /* Set a permanent (config log) param for a target or fs
3971  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3972  *             buf1 contains the single parameter
3973  */
3974 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3975                  struct lustre_cfg *lcfg, char *fsname)
3976 {
3977         struct fs_db *fsdb;
3978         struct mgs_target_info *mti;
3979         char *devname, *param;
3980         char *ptr;
3981         const char *tmp;
3982         __u32 index;
3983         int rc = 0;
3984         ENTRY;
3985
3986         print_lustre_cfg(lcfg);
3987
3988         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3989         devname = lustre_cfg_string(lcfg, 0);
3990         param = lustre_cfg_string(lcfg, 1);
3991         if (!devname) {
3992                 /* Assume device name embedded in param:
3993                    lustre-OST0000.osc.max_dirty_mb=32 */
3994                 ptr = strchr(param, '.');
3995                 if (ptr) {
3996                         devname = param;
3997                         *ptr = 0;
3998                         param = ptr + 1;
3999                 }
4000         }
4001         if (!devname) {
4002                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
4003                 RETURN(-ENOSYS);
4004         }
4005
4006         rc = mgs_parse_devname(devname, fsname, NULL);
4007         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
4008                 /* param related to llite isn't allowed to set by OST or MDT */
4009                 if (rc == 0 && strncmp(param, PARAM_LLITE,
4010                                        sizeof(PARAM_LLITE) - 1) == 0)
4011                         RETURN(-EINVAL);
4012         } else {
4013                 /* assume devname is the fsname */
4014                 strlcpy(fsname, devname, MTI_NAME_MAXLEN);
4015         }
4016         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
4017
4018         rc = mgs_find_or_make_fsdb(env, mgs,
4019                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
4020                                    PARAMS_FILENAME : fsname, &fsdb);
4021         if (rc)
4022                 RETURN(rc);
4023
4024         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
4025             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
4026             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4027                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
4028                        "is '%s'\n", fsname, devname);
4029                 mgs_free_fsdb(mgs, fsdb);
4030                 RETURN(-EINVAL);
4031         }
4032
4033         /* Create a fake mti to hold everything */
4034         OBD_ALLOC_PTR(mti);
4035         if (!mti)
4036                 GOTO(out, rc = -ENOMEM);
4037         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
4038             >= sizeof(mti->mti_fsname))
4039                 GOTO(out, rc = -E2BIG);
4040         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
4041             >= sizeof(mti->mti_svname))
4042                 GOTO(out, rc = -E2BIG);
4043         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
4044             >= sizeof(mti->mti_params))
4045                 GOTO(out, rc = -E2BIG);
4046         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
4047         if (rc < 0)
4048                 /* Not a valid server; may be only fsname */
4049                 rc = 0;
4050         else
4051                 /* Strip -osc or -mdc suffix from svname */
4052                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
4053                                      mti->mti_svname))
4054                         GOTO(out, rc = -EINVAL);
4055         /*
4056          * Revoke lock so everyone updates.  Should be alright if
4057          * someone was already reading while we were updating the logs,
4058          * so we don't really need to hold the lock while we're
4059          * writing (above).
4060          */
4061         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
4062                 mti->mti_flags = rc | LDD_F_PARAM2;
4063                 mutex_lock(&fsdb->fsdb_mutex);
4064                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
4065                 mutex_unlock(&fsdb->fsdb_mutex);
4066                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
4067         } else {
4068                 mti->mti_flags = rc | LDD_F_PARAM;
4069                 mutex_lock(&fsdb->fsdb_mutex);
4070                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
4071                 mutex_unlock(&fsdb->fsdb_mutex);
4072                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4073         }
4074
4075 out:
4076         OBD_FREE_PTR(mti);
4077         RETURN(rc);
4078 }
4079
4080 static int mgs_write_log_pool(const struct lu_env *env,
4081                               struct mgs_device *mgs, char *logname,
4082                               struct fs_db *fsdb, char *tgtname,
4083                               enum lcfg_command_type cmd,
4084                               char *fsname, char *poolname,
4085                               char *ostname, char *comment)
4086 {
4087         struct llog_handle *llh = NULL;
4088         int rc;
4089
4090         rc = record_start_log(env, mgs, &llh, logname);
4091         if (rc)
4092                 return rc;
4093         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
4094         if (rc)
4095                 goto out;
4096         rc = record_base(env, llh, tgtname, 0, cmd,
4097                          fsname, poolname, ostname, NULL);
4098         if (rc)
4099                 goto out;
4100         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
4101 out:
4102         record_end_log(env, &llh);
4103         return rc;
4104 }
4105
4106 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
4107                     enum lcfg_command_type cmd, const char *nodemap_name,
4108                     char *param)
4109 {
4110         lnet_nid_t      nid[2];
4111         __u32           idmap[2];
4112         bool            bool_switch;
4113         __u32           int_id;
4114         int             rc = 0;
4115         ENTRY;
4116
4117         switch (cmd) {
4118         case LCFG_NODEMAP_ADD:
4119                 rc = nodemap_add(nodemap_name);
4120                 break;
4121         case LCFG_NODEMAP_DEL:
4122                 rc = nodemap_del(nodemap_name);
4123                 break;
4124         case LCFG_NODEMAP_ADD_RANGE:
4125                 rc = nodemap_parse_range(param, nid);
4126                 if (rc != 0)
4127                         break;
4128                 rc = nodemap_add_range(nodemap_name, nid);
4129                 break;
4130         case LCFG_NODEMAP_DEL_RANGE:
4131                 rc = nodemap_parse_range(param, nid);
4132                 if (rc != 0)
4133                         break;
4134                 rc = nodemap_del_range(nodemap_name, nid);
4135                 break;
4136         case LCFG_NODEMAP_ADMIN:
4137                 bool_switch = simple_strtoul(param, NULL, 10);
4138                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
4139                 break;
4140         case LCFG_NODEMAP_DENY_UNKNOWN:
4141                 bool_switch = simple_strtoul(param, NULL, 10);
4142                 rc = nodemap_set_deny_unknown(nodemap_name, bool_switch);
4143                 break;
4144         case LCFG_NODEMAP_TRUSTED:
4145                 bool_switch = simple_strtoul(param, NULL, 10);
4146                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
4147                 break;
4148         case LCFG_NODEMAP_SQUASH_UID:
4149                 int_id = simple_strtoul(param, NULL, 10);
4150                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
4151                 break;
4152         case LCFG_NODEMAP_SQUASH_GID:
4153                 int_id = simple_strtoul(param, NULL, 10);
4154                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
4155                 break;
4156         case LCFG_NODEMAP_ADD_UIDMAP:
4157         case LCFG_NODEMAP_ADD_GIDMAP:
4158                 rc = nodemap_parse_idmap(param, idmap);
4159                 if (rc != 0)
4160                         break;
4161                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
4162                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
4163                                                idmap);
4164                 else
4165                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
4166                                                idmap);
4167                 break;
4168         case LCFG_NODEMAP_DEL_UIDMAP:
4169         case LCFG_NODEMAP_DEL_GIDMAP:
4170                 rc = nodemap_parse_idmap(param, idmap);
4171                 if (rc != 0)
4172                         break;
4173                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
4174                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
4175                                                idmap);
4176                 else
4177                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
4178                                                idmap);
4179                 break;
4180         case LCFG_NODEMAP_SET_FILESET:
4181                 rc = nodemap_set_fileset(nodemap_name, param);
4182                 break;
4183         default:
4184                 rc = -EINVAL;
4185         }
4186
4187         RETURN(rc);
4188 }
4189
4190 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
4191                  enum lcfg_command_type cmd, char *fsname,
4192                  char *poolname, char *ostname)
4193 {
4194         struct fs_db *fsdb;
4195         char *lovname;
4196         char *logname;
4197         char *label = NULL, *canceled_label = NULL;
4198         int label_sz;
4199         struct mgs_target_info *mti = NULL;
4200         int rc, i;
4201         ENTRY;
4202
4203         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
4204         if (rc) {
4205                 CERROR("Can't get db for %s\n", fsname);
4206                 RETURN(rc);
4207         }
4208         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4209                 CERROR("%s is not defined\n", fsname);
4210                 mgs_free_fsdb(mgs, fsdb);
4211                 RETURN(-EINVAL);
4212         }
4213
4214         label_sz = 10 + strlen(fsname) + strlen(poolname);
4215
4216         /* check if ostname match fsname */
4217         if (ostname != NULL) {
4218                 char *ptr;
4219
4220                 ptr = strrchr(ostname, '-');
4221                 if ((ptr == NULL) ||
4222                     (strncmp(fsname, ostname, ptr-ostname) != 0))
4223                         RETURN(-EINVAL);
4224                 label_sz += strlen(ostname);
4225         }
4226
4227         OBD_ALLOC(label, label_sz);
4228         if (label == NULL)
4229                 RETURN(-ENOMEM);
4230
4231         switch(cmd) {
4232         case LCFG_POOL_NEW:
4233                 sprintf(label,
4234                         "new %s.%s", fsname, poolname);
4235                 break;
4236         case LCFG_POOL_ADD:
4237                 sprintf(label,
4238                         "add %s.%s.%s", fsname, poolname, ostname);
4239                 break;
4240         case LCFG_POOL_REM:
4241                 OBD_ALLOC(canceled_label, label_sz);
4242                 if (canceled_label == NULL)
4243                         GOTO(out_label, rc = -ENOMEM);
4244                 sprintf(label,
4245                         "rem %s.%s.%s", fsname, poolname, ostname);
4246                 sprintf(canceled_label,
4247                         "add %s.%s.%s", fsname, poolname, ostname);
4248                 break;
4249         case LCFG_POOL_DEL:
4250                 OBD_ALLOC(canceled_label, label_sz);
4251                 if (canceled_label == NULL)
4252                         GOTO(out_label, rc = -ENOMEM);
4253                 sprintf(label,
4254                         "del %s.%s", fsname, poolname);
4255                 sprintf(canceled_label,
4256                         "new %s.%s", fsname, poolname);
4257                 break;
4258         default:
4259                 break;
4260         }
4261
4262         if (canceled_label != NULL) {
4263                 OBD_ALLOC_PTR(mti);
4264                 if (mti == NULL)
4265                         GOTO(out_cancel, rc = -ENOMEM);
4266         }
4267
4268         mutex_lock(&fsdb->fsdb_mutex);
4269         /* write pool def to all MDT logs */
4270         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4271                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4272                         rc = name_create_mdt_and_lov(&logname, &lovname,
4273                                                      fsdb, i);
4274                         if (rc) {
4275                                 mutex_unlock(&fsdb->fsdb_mutex);
4276                                 GOTO(out_mti, rc);
4277                         }
4278                         if (canceled_label != NULL) {
4279                                 strcpy(mti->mti_svname, "lov pool");
4280                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4281                                                 lovname, canceled_label,
4282                                                 CM_SKIP);
4283                         }
4284
4285                         if (rc >= 0)
4286                                 rc = mgs_write_log_pool(env, mgs, logname,
4287                                                         fsdb, lovname, cmd,
4288                                                         fsname, poolname,
4289                                                         ostname, label);
4290                         name_destroy(&logname);
4291                         name_destroy(&lovname);
4292                         if (rc) {
4293                                 mutex_unlock(&fsdb->fsdb_mutex);
4294                                 GOTO(out_mti, rc);
4295                         }
4296                 }
4297         }
4298
4299         rc = name_create(&logname, fsname, "-client");
4300         if (rc) {
4301                 mutex_unlock(&fsdb->fsdb_mutex);
4302                 GOTO(out_mti, rc);
4303         }
4304         if (canceled_label != NULL) {
4305                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4306                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4307                 if (rc < 0) {
4308                         mutex_unlock(&fsdb->fsdb_mutex);
4309                         name_destroy(&logname);
4310                         GOTO(out_mti, rc);
4311                 }
4312         }
4313
4314         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4315                                 cmd, fsname, poolname, ostname, label);
4316         mutex_unlock(&fsdb->fsdb_mutex);
4317         name_destroy(&logname);
4318         /* request for update */
4319         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4320
4321         EXIT;
4322 out_mti:
4323         if (mti != NULL)
4324                 OBD_FREE_PTR(mti);
4325 out_cancel:
4326         if (canceled_label != NULL)
4327                 OBD_FREE(canceled_label, label_sz);
4328 out_label:
4329         OBD_FREE(label, label_sz);
4330         return rc;
4331 }