Whamcloud - gitweb
6b0aadb7e32269084c4568c53d4e4a025364afae
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mgs/mgs_llog.c
33  *
34  * Lustre Management Server (mgs) config llog creation
35  *
36  * Author: Nathan Rutman <nathan@clusterfs.com>
37  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
38  * Author: Mikhail Pershin <tappro@whamcloud.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_MGS
42 #define D_MGS D_CONFIG
43
44 #include <obd.h>
45 #include <lustre_ioctl.h>
46 #include <lustre_param.h>
47 #include <lustre_sec.h>
48 #include <lustre_quota.h>
49
50 #include "mgs_internal.h"
51
52 /********************** Class functions ********************/
53
54 /* Find all logs in CONFIG directory and link then into list */
55 int class_dentry_readdir(const struct lu_env *env,
56                          struct mgs_device *mgs, struct list_head *log_list)
57 {
58         struct dt_object    *dir = mgs->mgs_configs_dir;
59         const struct dt_it_ops *iops;
60         struct dt_it        *it;
61         struct mgs_direntry *de;
62         char                *key;
63         int                  rc, key_sz;
64
65         INIT_LIST_HEAD(log_list);
66
67         LASSERT(dir);
68         LASSERT(dir->do_index_ops);
69
70         iops = &dir->do_index_ops->dio_it;
71         it = iops->init(env, dir, LUDA_64BITHASH);
72         if (IS_ERR(it))
73                 RETURN(PTR_ERR(it));
74
75         rc = iops->load(env, it, 0);
76         if (rc <= 0)
77                 GOTO(fini, rc = 0);
78
79         /* main cycle */
80         do {
81                 key = (void *)iops->key(env, it);
82                 if (IS_ERR(key)) {
83                         CERROR("%s: key failed when listing %s: rc = %d\n",
84                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
85                                (int) PTR_ERR(key));
86                         goto next;
87                 }
88                 key_sz = iops->key_size(env, it);
89                 LASSERT(key_sz > 0);
90
91                 /* filter out "." and ".." entries */
92                 if (key[0] == '.') {
93                         if (key_sz == 1)
94                                 goto next;
95                         if (key_sz == 2 && key[1] == '.')
96                                 goto next;
97                 }
98
99                 /* filter out ".bak" files */
100                 /* sizeof(".bak") - 1 == 3 */
101                 if (key_sz >= 3 &&
102                     !memcmp(".bak", key + key_sz - 3, 3)) {
103                         CDEBUG(D_MGS, "Skipping backup file %.*s\n",
104                                key_sz, key);
105                         goto next;
106                 }
107
108                 de = mgs_direntry_alloc(key_sz + 1);
109                 if (de == NULL) {
110                         rc = -ENOMEM;
111                         break;
112                 }
113
114                 memcpy(de->mde_name, key, key_sz);
115                 de->mde_name[key_sz] = 0;
116
117                 list_add(&de->mde_list, log_list);
118
119 next:
120                 rc = iops->next(env, it);
121         } while (rc == 0);
122         if (rc > 0)
123                 rc = 0;
124
125         iops->put(env, it);
126
127 fini:
128         iops->fini(env, it);
129         if (rc)
130                 CERROR("%s: key failed when listing %s: rc = %d\n",
131                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
132         RETURN(rc);
133 }
134
135 /******************** DB functions *********************/
136
137 static inline int name_create(char **newname, char *prefix, char *suffix)
138 {
139         LASSERT(newname);
140         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
141         if (!*newname)
142                 return -ENOMEM;
143         sprintf(*newname, "%s%s", prefix, suffix);
144         return 0;
145 }
146
147 static inline void name_destroy(char **name)
148 {
149         if (*name)
150                 OBD_FREE(*name, strlen(*name) + 1);
151         *name = NULL;
152 }
153
154 struct mgs_fsdb_handler_data
155 {
156         struct fs_db   *fsdb;
157         __u32           ver;
158 };
159
160 /* from the (client) config log, figure out:
161         1. which ost's/mdt's are configured (by index)
162         2. what the last config step is
163         3. COMPAT_18 osc name
164 */
165 /* It might be better to have a separate db file, instead of parsing the info
166    out of the client log.  This is slow and potentially error-prone. */
167 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
168                             struct llog_rec_hdr *rec, void *data)
169 {
170         struct mgs_fsdb_handler_data *d = data;
171         struct fs_db *fsdb = d->fsdb;
172         int cfg_len = rec->lrh_len;
173         char *cfg_buf = (char*) (rec + 1);
174         struct lustre_cfg *lcfg;
175         __u32 index;
176         int rc = 0;
177         ENTRY;
178
179         if (rec->lrh_type != OBD_CFG_REC) {
180                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
181                 RETURN(-EINVAL);
182         }
183
184         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
185         if (rc) {
186                 CERROR("Insane cfg\n");
187                 RETURN(rc);
188         }
189
190         lcfg = (struct lustre_cfg *)cfg_buf;
191
192         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
193                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
194
195         /* Figure out ost indicies */
196         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
197         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
198             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
199                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
200                                        NULL, 10);
201                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
202                        lustre_cfg_string(lcfg, 1), index,
203                        lustre_cfg_string(lcfg, 2));
204                 set_bit(index, fsdb->fsdb_ost_index_map);
205         }
206
207         /* Figure out mdt indicies */
208         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
209         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
210             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
211                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
212                                        &index, NULL);
213                 if (rc != LDD_F_SV_TYPE_MDT) {
214                         CWARN("Unparsable MDC name %s, assuming index 0\n",
215                               lustre_cfg_string(lcfg, 0));
216                         index = 0;
217                 }
218                 rc = 0;
219                 CDEBUG(D_MGS, "MDT index is %u\n", index);
220                 set_bit(index, fsdb->fsdb_mdt_index_map);
221                 fsdb->fsdb_mdt_count ++;
222         }
223
224         /**
225          * figure out the old config. fsdb_gen = 0 means old log
226          * It is obsoleted and not supported anymore
227          */
228         if (fsdb->fsdb_gen == 0) {
229                 CERROR("Old config format is not supported\n");
230                 RETURN(-EINVAL);
231         }
232
233         /*
234          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
235          */
236         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
237             lcfg->lcfg_command == LCFG_ATTACH &&
238             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
239                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
240                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
241                         CWARN("MDT using 1.8 OSC name scheme\n");
242                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
243                 }
244         }
245
246         if (lcfg->lcfg_command == LCFG_MARKER) {
247                 struct cfg_marker *marker;
248                 marker = lustre_cfg_buf(lcfg, 1);
249
250                 d->ver = marker->cm_vers;
251
252                 /* Keep track of the latest marker step */
253                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
254         }
255
256         RETURN(rc);
257 }
258
259 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
260 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
261                                   struct mgs_device *mgs,
262                                   struct fs_db *fsdb)
263 {
264         char                            *logname;
265         struct llog_handle              *loghandle;
266         struct llog_ctxt                *ctxt;
267         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
268         int rc;
269
270         ENTRY;
271
272         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
273         LASSERT(ctxt != NULL);
274         rc = name_create(&logname, fsdb->fsdb_name, "-client");
275         if (rc)
276                 GOTO(out_put, rc);
277         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
278         if (rc)
279                 GOTO(out_pop, rc);
280
281         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
282         if (rc)
283                 GOTO(out_close, rc);
284
285         if (llog_get_size(loghandle) <= 1)
286                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
287
288         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
289         CDEBUG(D_INFO, "get_db = %d\n", rc);
290 out_close:
291         llog_close(env, loghandle);
292 out_pop:
293         name_destroy(&logname);
294 out_put:
295         llog_ctxt_put(ctxt);
296
297         RETURN(rc);
298 }
299
300 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
301 {
302         struct mgs_tgt_srpc_conf *tgtconf;
303
304         /* free target-specific rules */
305         while (fsdb->fsdb_srpc_tgt) {
306                 tgtconf = fsdb->fsdb_srpc_tgt;
307                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
308
309                 LASSERT(tgtconf->mtsc_tgt);
310
311                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
312                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
313                 OBD_FREE_PTR(tgtconf);
314         }
315
316         /* free general rules */
317         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
318 }
319
320 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
321 {
322         struct fs_db *fsdb;
323         struct list_head *tmp;
324
325         list_for_each(tmp, &mgs->mgs_fs_db_list) {
326                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
327                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
328                         return fsdb;
329         }
330         return NULL;
331 }
332
333 /* caller must hold the mgs->mgs_fs_db_lock */
334 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
335                                   struct mgs_device *mgs, char *fsname)
336 {
337         struct fs_db *fsdb;
338         int rc;
339         ENTRY;
340
341         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
342                 CERROR("fsname %s is too long\n", fsname);
343                 RETURN(ERR_PTR(-EINVAL));
344         }
345
346         OBD_ALLOC_PTR(fsdb);
347         if (!fsdb)
348                 RETURN(ERR_PTR(-ENOMEM));
349
350         strcpy(fsdb->fsdb_name, fsname);
351         mutex_init(&fsdb->fsdb_mutex);
352         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
353         fsdb->fsdb_gen = 1;
354
355         if (strcmp(fsname, MGSSELF_NAME) == 0) {
356                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
357                 fsdb->fsdb_mgs = mgs;
358         } else {
359                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
360                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
361                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
362                         CERROR("No memory for index maps\n");
363                         GOTO(err, rc = -ENOMEM);
364                 }
365
366                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
367                 if (rc)
368                         GOTO(err, rc);
369                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
370                 if (rc)
371                         GOTO(err, rc);
372
373                 /* initialise data for NID table */
374                 mgs_ir_init_fs(env, mgs, fsdb);
375
376                 lproc_mgs_add_live(mgs, fsdb);
377         }
378
379         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
380
381         RETURN(fsdb);
382 err:
383         if (fsdb->fsdb_ost_index_map)
384                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
385         if (fsdb->fsdb_mdt_index_map)
386                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
387         name_destroy(&fsdb->fsdb_clilov);
388         name_destroy(&fsdb->fsdb_clilmv);
389         OBD_FREE_PTR(fsdb);
390         RETURN(ERR_PTR(rc));
391 }
392
393 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
394 {
395         /* wait for anyone with the sem */
396         mutex_lock(&fsdb->fsdb_mutex);
397         lproc_mgs_del_live(mgs, fsdb);
398         list_del(&fsdb->fsdb_list);
399
400         /* deinitialize fsr */
401         mgs_ir_fini_fs(mgs, fsdb);
402
403         if (fsdb->fsdb_ost_index_map)
404                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
405         if (fsdb->fsdb_mdt_index_map)
406                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
407         name_destroy(&fsdb->fsdb_clilov);
408         name_destroy(&fsdb->fsdb_clilmv);
409         mgs_free_fsdb_srpc(fsdb);
410         mutex_unlock(&fsdb->fsdb_mutex);
411         OBD_FREE_PTR(fsdb);
412 }
413
414 int mgs_init_fsdb_list(struct mgs_device *mgs)
415 {
416         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
417         return 0;
418 }
419
420 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
421 {
422         struct fs_db *fsdb;
423         struct list_head *tmp, *tmp2;
424
425         mutex_lock(&mgs->mgs_mutex);
426         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
427                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
428                 mgs_free_fsdb(mgs, fsdb);
429         }
430         mutex_unlock(&mgs->mgs_mutex);
431         return 0;
432 }
433
434 int mgs_find_or_make_fsdb(const struct lu_env *env,
435                           struct mgs_device *mgs, char *name,
436                           struct fs_db **dbh)
437 {
438         struct fs_db *fsdb;
439         int rc = 0;
440
441         ENTRY;
442         mutex_lock(&mgs->mgs_mutex);
443         fsdb = mgs_find_fsdb(mgs, name);
444         if (fsdb) {
445                 mutex_unlock(&mgs->mgs_mutex);
446                 *dbh = fsdb;
447                 RETURN(0);
448         }
449
450         CDEBUG(D_MGS, "Creating new db\n");
451         fsdb = mgs_new_fsdb(env, mgs, name);
452         /* lock fsdb_mutex until the db is loaded from llogs */
453         if (!IS_ERR(fsdb))
454                 mutex_lock(&fsdb->fsdb_mutex);
455         mutex_unlock(&mgs->mgs_mutex);
456         if (IS_ERR(fsdb))
457                 RETURN(PTR_ERR(fsdb));
458
459         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
460                 /* populate the db from the client llog */
461                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
462                 if (rc) {
463                         CERROR("Can't get db from client log %d\n", rc);
464                         GOTO(out_free, rc);
465                 }
466         }
467
468         /* populate srpc rules from params llog */
469         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
470         if (rc) {
471                 CERROR("Can't get db from params log %d\n", rc);
472                 GOTO(out_free, rc);
473         }
474
475         mutex_unlock(&fsdb->fsdb_mutex);
476         *dbh = fsdb;
477
478         RETURN(0);
479
480 out_free:
481         mutex_unlock(&fsdb->fsdb_mutex);
482         mgs_free_fsdb(mgs, fsdb);
483         return rc;
484 }
485
486 /* 1 = index in use
487    0 = index unused
488    -1= empty client log */
489 int mgs_check_index(const struct lu_env *env,
490                     struct mgs_device *mgs,
491                     struct mgs_target_info *mti)
492 {
493         struct fs_db *fsdb;
494         void *imap;
495         int rc = 0;
496         ENTRY;
497
498         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
499
500         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
501         if (rc) {
502                 CERROR("Can't get db for %s\n", mti->mti_fsname);
503                 RETURN(rc);
504         }
505
506         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
507                 RETURN(-1);
508
509         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
510                 imap = fsdb->fsdb_ost_index_map;
511         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
512                 imap = fsdb->fsdb_mdt_index_map;
513         else
514                 RETURN(-EINVAL);
515
516         if (test_bit(mti->mti_stripe_index, imap))
517                 RETURN(1);
518         RETURN(0);
519 }
520
521 static __inline__ int next_index(void *index_map, int map_len)
522 {
523         int i;
524         for (i = 0; i < map_len * 8; i++)
525                 if (!test_bit(i, index_map)) {
526                          return i;
527                  }
528         CERROR("max index %d exceeded.\n", i);
529         return -1;
530 }
531
532 /* Return codes:
533         0  newly marked as in use
534         <0 err
535         +EALREADY for update of an old index */
536 static int mgs_set_index(const struct lu_env *env,
537                          struct mgs_device *mgs,
538                          struct mgs_target_info *mti)
539 {
540         struct fs_db *fsdb;
541         void *imap;
542         int rc = 0;
543         ENTRY;
544
545         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
546         if (rc) {
547                 CERROR("Can't get db for %s\n", mti->mti_fsname);
548                 RETURN(rc);
549         }
550
551         mutex_lock(&fsdb->fsdb_mutex);
552         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
553                 imap = fsdb->fsdb_ost_index_map;
554         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
555                 imap = fsdb->fsdb_mdt_index_map;
556         } else {
557                 GOTO(out_up, rc = -EINVAL);
558         }
559
560         if (mti->mti_flags & LDD_F_NEED_INDEX) {
561                 rc = next_index(imap, INDEX_MAP_SIZE);
562                 if (rc == -1)
563                         GOTO(out_up, rc = -ERANGE);
564                 mti->mti_stripe_index = rc;
565                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
566                         fsdb->fsdb_mdt_count ++;
567         }
568
569         /* the last index(0xffff) is reserved for default value. */
570         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
571                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
572                                    "but index must be less than %u.\n",
573                                    mti->mti_svname, mti->mti_stripe_index,
574                                    INDEX_MAP_SIZE * 8 - 1);
575                 GOTO(out_up, rc = -ERANGE);
576         }
577
578         if (test_bit(mti->mti_stripe_index, imap)) {
579                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
580                     !(mti->mti_flags & LDD_F_WRITECONF)) {
581                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
582                                            "%d, but that index is already in "
583                                            "use. Use --writeconf to force\n",
584                                            mti->mti_svname,
585                                            mti->mti_stripe_index);
586                         GOTO(out_up, rc = -EADDRINUSE);
587                 } else {
588                         CDEBUG(D_MGS, "Server %s updating index %d\n",
589                                mti->mti_svname, mti->mti_stripe_index);
590                         GOTO(out_up, rc = EALREADY);
591                 }
592         }
593
594         set_bit(mti->mti_stripe_index, imap);
595         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
596         mutex_unlock(&fsdb->fsdb_mutex);
597         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
598                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
599
600         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
601                mti->mti_stripe_index);
602
603         RETURN(0);
604 out_up:
605         mutex_unlock(&fsdb->fsdb_mutex);
606         return rc;
607 }
608
609 struct mgs_modify_lookup {
610         struct cfg_marker mml_marker;
611         int               mml_modified;
612 };
613
614 static int mgs_modify_handler(const struct lu_env *env,
615                               struct llog_handle *llh,
616                               struct llog_rec_hdr *rec, void *data)
617 {
618         struct mgs_modify_lookup *mml = data;
619         struct cfg_marker *marker;
620         struct lustre_cfg *lcfg = REC_DATA(rec);
621         int cfg_len = REC_DATA_LEN(rec);
622         int rc;
623         ENTRY;
624
625         if (rec->lrh_type != OBD_CFG_REC) {
626                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
627                 RETURN(-EINVAL);
628         }
629
630         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
631         if (rc) {
632                 CERROR("Insane cfg\n");
633                 RETURN(rc);
634         }
635
636         /* We only care about markers */
637         if (lcfg->lcfg_command != LCFG_MARKER)
638                 RETURN(0);
639
640         marker = lustre_cfg_buf(lcfg, 1);
641         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
642             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
643             !(marker->cm_flags & CM_SKIP)) {
644                 /* Found a non-skipped marker match */
645                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
646                        rec->lrh_index, marker->cm_step,
647                        marker->cm_flags, mml->mml_marker.cm_flags,
648                        marker->cm_tgtname, marker->cm_comment);
649                 /* Overwrite the old marker llog entry */
650                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
651                 marker->cm_flags |= mml->mml_marker.cm_flags;
652                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
653                 rc = llog_write(env, llh, rec, rec->lrh_index);
654                 if (!rc)
655                          mml->mml_modified++;
656         }
657
658         RETURN(rc);
659 }
660
661 /**
662  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
663  * Return code:
664  * 0 - modified successfully,
665  * 1 - no modification was done
666  * negative - error
667  */
668 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
669                       struct fs_db *fsdb, struct mgs_target_info *mti,
670                       char *logname, char *devname, char *comment, int flags)
671 {
672         struct llog_handle *loghandle;
673         struct llog_ctxt *ctxt;
674         struct mgs_modify_lookup *mml;
675         int rc;
676
677         ENTRY;
678
679         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
680         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
681                flags);
682
683         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
684         LASSERT(ctxt != NULL);
685         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
686         if (rc < 0) {
687                 if (rc == -ENOENT)
688                         rc = 0;
689                 GOTO(out_pop, rc);
690         }
691
692         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
693         if (rc)
694                 GOTO(out_close, rc);
695
696         if (llog_get_size(loghandle) <= 1)
697                 GOTO(out_close, rc = 0);
698
699         OBD_ALLOC_PTR(mml);
700         if (!mml)
701                 GOTO(out_close, rc = -ENOMEM);
702         if (strlcpy(mml->mml_marker.cm_comment, comment,
703                     sizeof(mml->mml_marker.cm_comment)) >=
704             sizeof(mml->mml_marker.cm_comment))
705                 GOTO(out_free, rc = -E2BIG);
706         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
707                     sizeof(mml->mml_marker.cm_tgtname)) >=
708             sizeof(mml->mml_marker.cm_tgtname))
709                 GOTO(out_free, rc = -E2BIG);
710         /* Modify mostly means cancel */
711         mml->mml_marker.cm_flags = flags;
712         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
713         mml->mml_modified = 0;
714         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
715                           NULL);
716         if (!rc && !mml->mml_modified)
717                 rc = 1;
718
719 out_free:
720         OBD_FREE_PTR(mml);
721
722 out_close:
723         llog_close(env, loghandle);
724 out_pop:
725         if (rc < 0)
726                 CERROR("%s: modify %s/%s failed: rc = %d\n",
727                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
728         llog_ctxt_put(ctxt);
729         RETURN(rc);
730 }
731
732 /** This structure is passed to mgs_replace_handler */
733 struct mgs_replace_uuid_lookup {
734         /* Nids are replaced for this target device */
735         struct mgs_target_info target;
736         /* Temporary modified llog */
737         struct llog_handle *temp_llh;
738         /* Flag is set if in target block*/
739         int in_target_device;
740         /* Nids already added. Just skip (multiple nids) */
741         int device_nids_added;
742         /* Flag is set if this block should not be copied */
743         int skip_it;
744 };
745
746 /**
747  * Check: a) if block should be skipped
748  * b) is it target block
749  *
750  * \param[in] lcfg
751  * \param[in] mrul
752  *
753  * \retval 0 should not to be skipped
754  * \retval 1 should to be skipped
755  */
756 static int check_markers(struct lustre_cfg *lcfg,
757                          struct mgs_replace_uuid_lookup *mrul)
758 {
759          struct cfg_marker *marker;
760
761         /* Track markers. Find given device */
762         if (lcfg->lcfg_command == LCFG_MARKER) {
763                 marker = lustre_cfg_buf(lcfg, 1);
764                 /* Clean llog from records marked as CM_EXCLUDE.
765                    CM_SKIP records are used for "active" command
766                    and can be restored if needed */
767                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
768                     (CM_EXCLUDE | CM_START)) {
769                         mrul->skip_it = 1;
770                         return 1;
771                 }
772
773                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
774                     (CM_EXCLUDE | CM_END)) {
775                         mrul->skip_it = 0;
776                         return 1;
777                 }
778
779                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
780                         LASSERT(!(marker->cm_flags & CM_START) ||
781                                 !(marker->cm_flags & CM_END));
782                         if (marker->cm_flags & CM_START) {
783                                 mrul->in_target_device = 1;
784                                 mrul->device_nids_added = 0;
785                         } else if (marker->cm_flags & CM_END)
786                                 mrul->in_target_device = 0;
787                 }
788         }
789
790         return 0;
791 }
792
793 static int record_base(const struct lu_env *env, struct llog_handle *llh,
794                      char *cfgname, lnet_nid_t nid, int cmd,
795                      char *s1, char *s2, char *s3, char *s4)
796 {
797         struct mgs_thread_info  *mgi = mgs_env_info(env);
798         struct llog_cfg_rec     *lcr;
799         int rc;
800
801         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
802                cmd, s1, s2, s3, s4);
803
804         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
805         if (s1)
806                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
807         if (s2)
808                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
809         if (s3)
810                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
811         if (s4)
812                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
813
814         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
815         if (lcr == NULL)
816                 return -ENOMEM;
817
818         lcr->lcr_cfg.lcfg_nid = nid;
819         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
820
821         lustre_cfg_rec_free(lcr);
822
823         if (rc < 0)
824                 CDEBUG(D_MGS,
825                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
826                        cfgname, cmd, s1, s2, s3, s4, rc);
827         return rc;
828 }
829
830 static inline int record_add_uuid(const struct lu_env *env,
831                                   struct llog_handle *llh,
832                                   uint64_t nid, char *uuid)
833 {
834         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
835                            NULL, NULL, NULL);
836 }
837
838 static inline int record_add_conn(const struct lu_env *env,
839                                   struct llog_handle *llh,
840                                   char *devname, char *uuid)
841 {
842         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
843                            NULL, NULL, NULL);
844 }
845
846 static inline int record_attach(const struct lu_env *env,
847                                 struct llog_handle *llh, char *devname,
848                                 char *type, char *uuid)
849 {
850         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
851                            NULL, NULL);
852 }
853
854 static inline int record_setup(const struct lu_env *env,
855                                struct llog_handle *llh, char *devname,
856                                char *s1, char *s2, char *s3, char *s4)
857 {
858         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
859 }
860
861 /**
862  * \retval <0 record processing error
863  * \retval n record is processed. No need copy original one.
864  * \retval 0 record is not processed.
865  */
866 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
867                            struct mgs_replace_uuid_lookup *mrul)
868 {
869         int nids_added = 0;
870         lnet_nid_t nid;
871         char *ptr;
872         int rc;
873
874         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
875                 /* LCFG_ADD_UUID command found. Let's skip original command
876                    and add passed nids */
877                 ptr = mrul->target.mti_params;
878                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
879                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
880                                "device %s\n", libcfs_nid2str(nid),
881                                 mrul->target.mti_params,
882                                 mrul->target.mti_svname);
883                         rc = record_add_uuid(env,
884                                              mrul->temp_llh, nid,
885                                              mrul->target.mti_params);
886                         if (!rc)
887                                 nids_added++;
888                 }
889
890                 if (nids_added == 0) {
891                         CERROR("No new nids were added, nid %s with uuid %s, "
892                                "device %s\n", libcfs_nid2str(nid),
893                                mrul->target.mti_params,
894                                mrul->target.mti_svname);
895                         RETURN(-ENXIO);
896                 } else {
897                         mrul->device_nids_added = 1;
898                 }
899
900                 return nids_added;
901         }
902
903         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
904                 /* LCFG_SETUP command found. UUID should be changed */
905                 rc = record_setup(env,
906                                   mrul->temp_llh,
907                                   /* devname the same */
908                                   lustre_cfg_string(lcfg, 0),
909                                   /* s1 is not changed */
910                                   lustre_cfg_string(lcfg, 1),
911                                   /* new uuid should be
912                                   the full nidlist */
913                                   mrul->target.mti_params,
914                                   /* s3 is not changed */
915                                   lustre_cfg_string(lcfg, 3),
916                                   /* s4 is not changed */
917                                   lustre_cfg_string(lcfg, 4));
918                 return rc ? rc : 1;
919         }
920
921         /* Another commands in target device block */
922         return 0;
923 }
924
925 /**
926  * Handler that called for every record in llog.
927  * Records are processed in order they placed in llog.
928  *
929  * \param[in] llh       log to be processed
930  * \param[in] rec       current record
931  * \param[in] data      mgs_replace_uuid_lookup structure
932  *
933  * \retval 0    success
934  */
935 static int mgs_replace_handler(const struct lu_env *env,
936                                struct llog_handle *llh,
937                                struct llog_rec_hdr *rec,
938                                void *data)
939 {
940         struct mgs_replace_uuid_lookup *mrul;
941         struct lustre_cfg *lcfg = REC_DATA(rec);
942         int cfg_len = REC_DATA_LEN(rec);
943         int rc;
944         ENTRY;
945
946         mrul = (struct mgs_replace_uuid_lookup *)data;
947
948         if (rec->lrh_type != OBD_CFG_REC) {
949                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
950                        rec->lrh_type, lcfg->lcfg_command,
951                        lustre_cfg_string(lcfg, 0),
952                        lustre_cfg_string(lcfg, 1));
953                 RETURN(-EINVAL);
954         }
955
956         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
957         if (rc) {
958                 /* Do not copy any invalidated records */
959                 GOTO(skip_out, rc = 0);
960         }
961
962         rc = check_markers(lcfg, mrul);
963         if (rc || mrul->skip_it)
964                 GOTO(skip_out, rc = 0);
965
966         /* Write to new log all commands outside target device block */
967         if (!mrul->in_target_device)
968                 GOTO(copy_out, rc = 0);
969
970         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
971            (failover nids) for this target, assuming that if then
972            primary is changing then so is the failover */
973         if (mrul->device_nids_added &&
974             (lcfg->lcfg_command == LCFG_ADD_UUID ||
975              lcfg->lcfg_command == LCFG_ADD_CONN))
976                 GOTO(skip_out, rc = 0);
977
978         rc = process_command(env, lcfg, mrul);
979         if (rc < 0)
980                 RETURN(rc);
981
982         if (rc)
983                 RETURN(0);
984 copy_out:
985         /* Record is placed in temporary llog as is */
986         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
987
988         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
989                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
990                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
991         RETURN(rc);
992
993 skip_out:
994         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
995                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
996                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
997         RETURN(rc);
998 }
999
1000 static int mgs_log_is_empty(const struct lu_env *env,
1001                             struct mgs_device *mgs, char *name)
1002 {
1003         struct llog_ctxt        *ctxt;
1004         int                      rc;
1005
1006         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1007         LASSERT(ctxt != NULL);
1008
1009         rc = llog_is_empty(env, ctxt, name);
1010         llog_ctxt_put(ctxt);
1011         return rc;
1012 }
1013
1014 static int mgs_replace_nids_log(const struct lu_env *env,
1015                                 struct obd_device *mgs, struct fs_db *fsdb,
1016                                 char *logname, char *devname, char *nids)
1017 {
1018         struct llog_handle *orig_llh, *backup_llh;
1019         struct llog_ctxt *ctxt;
1020         struct mgs_replace_uuid_lookup *mrul;
1021         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1022         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1023         char *backup;
1024         int rc, rc2;
1025         ENTRY;
1026
1027         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1028
1029         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1030         LASSERT(ctxt != NULL);
1031
1032         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1033                 /* Log is empty. Nothing to replace */
1034                 GOTO(out_put, rc = 0);
1035         }
1036
1037         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1038         if (backup == NULL)
1039                 GOTO(out_put, rc = -ENOMEM);
1040
1041         sprintf(backup, "%s.bak", logname);
1042
1043         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1044         if (rc == 0) {
1045                 /* Now erase original log file. Connections are not allowed.
1046                    Backup is already saved */
1047                 rc = llog_erase(env, ctxt, NULL, logname);
1048                 if (rc < 0)
1049                         GOTO(out_free, rc);
1050         } else if (rc != -ENOENT) {
1051                 CERROR("%s: can't make backup for %s: rc = %d\n",
1052                        mgs->obd_name, logname, rc);
1053                 GOTO(out_free,rc);
1054         }
1055
1056         /* open local log */
1057         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1058         if (rc)
1059                 GOTO(out_restore, rc);
1060
1061         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1062         if (rc)
1063                 GOTO(out_closel, rc);
1064
1065         /* open backup llog */
1066         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1067                        LLOG_OPEN_EXISTS);
1068         if (rc)
1069                 GOTO(out_closel, rc);
1070
1071         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1072         if (rc)
1073                 GOTO(out_close, rc);
1074
1075         if (llog_get_size(backup_llh) <= 1)
1076                 GOTO(out_close, rc = 0);
1077
1078         OBD_ALLOC_PTR(mrul);
1079         if (!mrul)
1080                 GOTO(out_close, rc = -ENOMEM);
1081         /* devname is only needed information to replace UUID records */
1082         strlcpy(mrul->target.mti_svname, devname,
1083                 sizeof(mrul->target.mti_svname));
1084         /* parse nids later */
1085         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1086         /* Copy records to this temporary llog */
1087         mrul->temp_llh = orig_llh;
1088
1089         rc = llog_process(env, backup_llh, mgs_replace_handler,
1090                           (void *)mrul, NULL);
1091         OBD_FREE_PTR(mrul);
1092 out_close:
1093         rc2 = llog_close(NULL, backup_llh);
1094         if (!rc)
1095                 rc = rc2;
1096 out_closel:
1097         rc2 = llog_close(NULL, orig_llh);
1098         if (!rc)
1099                 rc = rc2;
1100
1101 out_restore:
1102         if (rc) {
1103                 CERROR("%s: llog should be restored: rc = %d\n",
1104                        mgs->obd_name, rc);
1105                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1106                                   logname);
1107                 if (rc2 < 0)
1108                         CERROR("%s: can't restore backup %s: rc = %d\n",
1109                                mgs->obd_name, logname, rc2);
1110         }
1111
1112 out_free:
1113         OBD_FREE(backup, strlen(backup) + 1);
1114
1115 out_put:
1116         llog_ctxt_put(ctxt);
1117
1118         if (rc)
1119                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1120                        mgs->obd_name, logname, rc);
1121
1122         RETURN(rc);
1123 }
1124
1125 /**
1126  * Parse device name and get file system name and/or device index
1127  *
1128  * \param[in]   devname device name (ex. lustre-MDT0000)
1129  * \param[out]  fsname  file system name(optional)
1130  * \param[out]  index   device index(optional)
1131  *
1132  * \retval 0    success
1133  */
1134 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1135 {
1136         int rc;
1137         ENTRY;
1138
1139         /* Extract fsname */
1140         if (fsname) {
1141                 rc = server_name2fsname(devname, fsname, NULL);
1142                 if (rc < 0) {
1143                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1144                                devname);
1145                         RETURN(-EINVAL);
1146                 }
1147         }
1148
1149         if (index) {
1150                 rc = server_name2index(devname, index, NULL);
1151                 if (rc < 0) {
1152                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1153                                devname);
1154                         RETURN(-EINVAL);
1155                 }
1156         }
1157
1158         RETURN(0);
1159 }
1160
1161 /* This is only called during replace_nids */
1162 static int only_mgs_is_running(struct obd_device *mgs_obd)
1163 {
1164         /* TDB: Is global variable with devices count exists? */
1165         int num_devices = get_devices_count();
1166         int num_exports = 0;
1167         struct obd_export *exp;
1168
1169         spin_lock(&mgs_obd->obd_dev_lock);
1170         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1171                 /* skip self export */
1172                 if (exp == mgs_obd->obd_self_export)
1173                         continue;
1174                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1175                         continue;
1176
1177                 ++num_exports;
1178
1179                 CERROR("%s: node %s still connected during replace_nids "
1180                        "connect_flags:%llx\n",
1181                        mgs_obd->obd_name,
1182                        libcfs_nid2str(exp->exp_nid_stats->nid),
1183                        exp_connect_flags(exp));
1184
1185         }
1186         spin_unlock(&mgs_obd->obd_dev_lock);
1187
1188         /* osd, MGS and MGC + self_export
1189            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1190         return (num_devices <= 3) && (num_exports == 0);
1191 }
1192
1193 static int name_create_mdt(char **logname, char *fsname, int i)
1194 {
1195         char mdt_index[9];
1196
1197         sprintf(mdt_index, "-MDT%04x", i);
1198         return name_create(logname, fsname, mdt_index);
1199 }
1200
1201 /**
1202  * Replace nids for \a device to \a nids values
1203  *
1204  * \param obd           MGS obd device
1205  * \param devname       nids need to be replaced for this device
1206  * (ex. lustre-OST0000)
1207  * \param nids          nids list (ex. nid1,nid2,nid3)
1208  *
1209  * \retval 0    success
1210  */
1211 int mgs_replace_nids(const struct lu_env *env,
1212                      struct mgs_device *mgs,
1213                      char *devname, char *nids)
1214 {
1215         /* Assume fsname is part of device name */
1216         char fsname[MTI_NAME_MAXLEN];
1217         int rc;
1218         __u32 index;
1219         char *logname;
1220         struct fs_db *fsdb;
1221         unsigned int i;
1222         int conn_state;
1223         struct obd_device *mgs_obd = mgs->mgs_obd;
1224         ENTRY;
1225
1226         /* We can only change NIDs if no other nodes are connected */
1227         spin_lock(&mgs_obd->obd_dev_lock);
1228         conn_state = mgs_obd->obd_no_conn;
1229         mgs_obd->obd_no_conn = 1;
1230         spin_unlock(&mgs_obd->obd_dev_lock);
1231
1232         /* We can not change nids if not only MGS is started */
1233         if (!only_mgs_is_running(mgs_obd)) {
1234                 CERROR("Only MGS is allowed to be started\n");
1235                 GOTO(out, rc = -EINPROGRESS);
1236         }
1237
1238         /* Get fsname and index*/
1239         rc = mgs_parse_devname(devname, fsname, &index);
1240         if (rc)
1241                 GOTO(out, rc);
1242
1243         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1244         if (rc) {
1245                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1246                 GOTO(out, rc);
1247         }
1248
1249         /* Process client llogs */
1250         name_create(&logname, fsname, "-client");
1251         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1252         name_destroy(&logname);
1253         if (rc) {
1254                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1255                        fsname, devname, rc);
1256                 GOTO(out, rc);
1257         }
1258
1259         /* Process MDT llogs */
1260         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1261                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1262                         continue;
1263                 name_create_mdt(&logname, fsname, i);
1264                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1265                 name_destroy(&logname);
1266                 if (rc)
1267                         GOTO(out, rc);
1268         }
1269
1270 out:
1271         spin_lock(&mgs_obd->obd_dev_lock);
1272         mgs_obd->obd_no_conn = conn_state;
1273         spin_unlock(&mgs_obd->obd_dev_lock);
1274
1275         RETURN(rc);
1276 }
1277
1278 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1279                             char *devname, struct lov_desc *desc)
1280 {
1281         struct mgs_thread_info  *mgi = mgs_env_info(env);
1282         struct llog_cfg_rec     *lcr;
1283         int rc;
1284
1285         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1286         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1287         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1288         if (lcr == NULL)
1289                 return -ENOMEM;
1290
1291         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1292         lustre_cfg_rec_free(lcr);
1293         return rc;
1294 }
1295
1296 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1297                             char *devname, struct lmv_desc *desc)
1298 {
1299         struct mgs_thread_info  *mgi = mgs_env_info(env);
1300         struct llog_cfg_rec     *lcr;
1301         int rc;
1302
1303         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1304         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1305         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1306         if (lcr == NULL)
1307                 return -ENOMEM;
1308
1309         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1310         lustre_cfg_rec_free(lcr);
1311         return rc;
1312 }
1313
1314 static inline int record_mdc_add(const struct lu_env *env,
1315                                  struct llog_handle *llh,
1316                                  char *logname, char *mdcuuid,
1317                                  char *mdtuuid, char *index,
1318                                  char *gen)
1319 {
1320         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1321                            mdtuuid,index,gen,mdcuuid);
1322 }
1323
1324 static inline int record_lov_add(const struct lu_env *env,
1325                                  struct llog_handle *llh,
1326                                  char *lov_name, char *ost_uuid,
1327                                  char *index, char *gen)
1328 {
1329         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1330                            ost_uuid, index, gen, NULL);
1331 }
1332
1333 static inline int record_mount_opt(const struct lu_env *env,
1334                                    struct llog_handle *llh,
1335                                    char *profile, char *lov_name,
1336                                    char *mdc_name)
1337 {
1338         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1339                            profile, lov_name, mdc_name, NULL);
1340 }
1341
1342 static int record_marker(const struct lu_env *env,
1343                          struct llog_handle *llh,
1344                          struct fs_db *fsdb, __u32 flags,
1345                          char *tgtname, char *comment)
1346 {
1347         struct mgs_thread_info *mgi = mgs_env_info(env);
1348         struct llog_cfg_rec *lcr;
1349         int rc;
1350         int cplen = 0;
1351
1352         if (flags & CM_START)
1353                 fsdb->fsdb_gen++;
1354         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1355         mgi->mgi_marker.cm_flags = flags;
1356         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1357         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1358                         sizeof(mgi->mgi_marker.cm_tgtname));
1359         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1360                 return -E2BIG;
1361         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1362                         sizeof(mgi->mgi_marker.cm_comment));
1363         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1364                 return -E2BIG;
1365         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1366         mgi->mgi_marker.cm_canceltime = 0;
1367         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1368         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1369                             sizeof(mgi->mgi_marker));
1370         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1371         if (lcr == NULL)
1372                 return -ENOMEM;
1373
1374         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1375         lustre_cfg_rec_free(lcr);
1376         return rc;
1377 }
1378
1379 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1380                             struct llog_handle **llh, char *name)
1381 {
1382         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1383         struct llog_ctxt        *ctxt;
1384         int                      rc = 0;
1385         ENTRY;
1386
1387         if (*llh)
1388                 GOTO(out, rc = -EBUSY);
1389
1390         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1391         if (!ctxt)
1392                 GOTO(out, rc = -ENODEV);
1393         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1394
1395         rc = llog_open_create(env, ctxt, llh, NULL, name);
1396         if (rc)
1397                 GOTO(out_ctxt, rc);
1398         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1399         if (rc)
1400                 llog_close(env, *llh);
1401 out_ctxt:
1402         llog_ctxt_put(ctxt);
1403 out:
1404         if (rc) {
1405                 CERROR("%s: can't start log %s: rc = %d\n",
1406                        mgs->mgs_obd->obd_name, name, rc);
1407                 *llh = NULL;
1408         }
1409         RETURN(rc);
1410 }
1411
1412 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1413 {
1414         int rc;
1415
1416         rc = llog_close(env, *llh);
1417         *llh = NULL;
1418
1419         return rc;
1420 }
1421
1422 /******************** config "macros" *********************/
1423
1424 /* write an lcfg directly into a log (with markers) */
1425 static int mgs_write_log_direct(const struct lu_env *env,
1426                                 struct mgs_device *mgs, struct fs_db *fsdb,
1427                                 char *logname, struct llog_cfg_rec *lcr,
1428                                 char *devname, char *comment)
1429 {
1430         struct llog_handle *llh = NULL;
1431         int rc;
1432
1433         ENTRY;
1434
1435         rc = record_start_log(env, mgs, &llh, logname);
1436         if (rc)
1437                 RETURN(rc);
1438
1439         /* FIXME These should be a single journal transaction */
1440         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1441         if (rc)
1442                 GOTO(out_end, rc);
1443         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1444         if (rc)
1445                 GOTO(out_end, rc);
1446         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1447         if (rc)
1448                 GOTO(out_end, rc);
1449 out_end:
1450         record_end_log(env, &llh);
1451         RETURN(rc);
1452 }
1453
1454 /* write the lcfg in all logs for the given fs */
1455 static int mgs_write_log_direct_all(const struct lu_env *env,
1456                                     struct mgs_device *mgs,
1457                                     struct fs_db *fsdb,
1458                                     struct mgs_target_info *mti,
1459                                     struct llog_cfg_rec *lcr, char *devname,
1460                                     char *comment, int server_only)
1461 {
1462         struct list_head         log_list;
1463         struct mgs_direntry     *dirent, *n;
1464         char                    *fsname = mti->mti_fsname;
1465         int                      rc = 0, len = strlen(fsname);
1466
1467         ENTRY;
1468         /* Find all the logs in the CONFIGS directory */
1469         rc = class_dentry_readdir(env, mgs, &log_list);
1470         if (rc)
1471                 RETURN(rc);
1472
1473         /* Could use fsdb index maps instead of directory listing */
1474         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1475                 list_del_init(&dirent->mde_list);
1476                 /* don't write to sptlrpc rule log */
1477                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1478                         goto next;
1479
1480                 /* caller wants write server logs only */
1481                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1482                         goto next;
1483
1484                 if (strlen(dirent->mde_name) <= len ||
1485                     strncmp(fsname, dirent->mde_name, len) != 0 ||
1486                     dirent->mde_name[len] != '-')
1487                         goto next;
1488
1489                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1490                 /* Erase any old settings of this same parameter */
1491                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1492                                 devname, comment, CM_SKIP);
1493                 if (rc < 0)
1494                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1495                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1496                 if (lcr == NULL)
1497                         goto next;
1498                 /* Write the new one */
1499                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1500                                           lcr, devname, comment);
1501                 if (rc != 0)
1502                         CERROR("%s: writing log %s: rc = %d\n",
1503                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1504 next:
1505                 mgs_direntry_free(dirent);
1506         }
1507
1508         RETURN(rc);
1509 }
1510
1511 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1512                                     struct mgs_device *mgs,
1513                                     struct fs_db *fsdb,
1514                                     struct mgs_target_info *mti,
1515                                     int index, char *logname);
1516 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1517                                     struct mgs_device *mgs,
1518                                     struct fs_db *fsdb,
1519                                     struct mgs_target_info *mti,
1520                                     char *logname, char *suffix, char *lovname,
1521                                     enum lustre_sec_part sec_part, int flags);
1522 static int name_create_mdt_and_lov(char **logname, char **lovname,
1523                                    struct fs_db *fsdb, int i);
1524
1525 static int add_param(char *params, char *key, char *val)
1526 {
1527         char *start = params + strlen(params);
1528         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1529         int keylen = 0;
1530
1531         if (key != NULL)
1532                 keylen = strlen(key);
1533         if (start + 1 + keylen + strlen(val) >= end) {
1534                 CERROR("params are too long: %s %s%s\n",
1535                        params, key != NULL ? key : "", val);
1536                 return -EINVAL;
1537         }
1538
1539         sprintf(start, " %s%s", key != NULL ? key : "", val);
1540         return 0;
1541 }
1542
1543 /**
1544  * Walk through client config log record and convert the related records
1545  * into the target.
1546  **/
1547 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1548                                          struct llog_handle *llh,
1549                                          struct llog_rec_hdr *rec, void *data)
1550 {
1551         struct mgs_device *mgs;
1552         struct obd_device *obd;
1553         struct mgs_target_info *mti, *tmti;
1554         struct fs_db *fsdb;
1555         int cfg_len = rec->lrh_len;
1556         char *cfg_buf = (char*) (rec + 1);
1557         struct lustre_cfg *lcfg;
1558         int rc = 0;
1559         struct llog_handle *mdt_llh = NULL;
1560         static int got_an_osc_or_mdc = 0;
1561         /* 0: not found any osc/mdc;
1562            1: found osc;
1563            2: found mdc;
1564         */
1565         static int last_step = -1;
1566         int cplen = 0;
1567
1568         ENTRY;
1569
1570         mti = ((struct temp_comp*)data)->comp_mti;
1571         tmti = ((struct temp_comp*)data)->comp_tmti;
1572         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1573         obd = ((struct temp_comp *)data)->comp_obd;
1574         mgs = lu2mgs_dev(obd->obd_lu_dev);
1575         LASSERT(mgs);
1576
1577         if (rec->lrh_type != OBD_CFG_REC) {
1578                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1579                 RETURN(-EINVAL);
1580         }
1581
1582         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1583         if (rc) {
1584                 CERROR("Insane cfg\n");
1585                 RETURN(rc);
1586         }
1587
1588         lcfg = (struct lustre_cfg *)cfg_buf;
1589
1590         if (lcfg->lcfg_command == LCFG_MARKER) {
1591                 struct cfg_marker *marker;
1592                 marker = lustre_cfg_buf(lcfg, 1);
1593                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1594                     (marker->cm_flags & CM_START) &&
1595                      !(marker->cm_flags & CM_SKIP)) {
1596                         got_an_osc_or_mdc = 1;
1597                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1598                                         sizeof(tmti->mti_svname));
1599                         if (cplen >= sizeof(tmti->mti_svname))
1600                                 RETURN(-E2BIG);
1601                         rc = record_start_log(env, mgs, &mdt_llh,
1602                                               mti->mti_svname);
1603                         if (rc)
1604                                 RETURN(rc);
1605                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1606                                            mti->mti_svname, "add osc(copied)");
1607                         record_end_log(env, &mdt_llh);
1608                         last_step = marker->cm_step;
1609                         RETURN(rc);
1610                 }
1611                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1612                     (marker->cm_flags & CM_END) &&
1613                      !(marker->cm_flags & CM_SKIP)) {
1614                         LASSERT(last_step == marker->cm_step);
1615                         last_step = -1;
1616                         got_an_osc_or_mdc = 0;
1617                         memset(tmti, 0, sizeof(*tmti));
1618                         rc = record_start_log(env, mgs, &mdt_llh,
1619                                               mti->mti_svname);
1620                         if (rc)
1621                                 RETURN(rc);
1622                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1623                                            mti->mti_svname, "add osc(copied)");
1624                         record_end_log(env, &mdt_llh);
1625                         RETURN(rc);
1626                 }
1627                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1628                     (marker->cm_flags & CM_START) &&
1629                      !(marker->cm_flags & CM_SKIP)) {
1630                         got_an_osc_or_mdc = 2;
1631                         last_step = marker->cm_step;
1632                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1633                                strlen(marker->cm_tgtname));
1634
1635                         RETURN(rc);
1636                 }
1637                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1638                     (marker->cm_flags & CM_END) &&
1639                      !(marker->cm_flags & CM_SKIP)) {
1640                         LASSERT(last_step == marker->cm_step);
1641                         last_step = -1;
1642                         got_an_osc_or_mdc = 0;
1643                         memset(tmti, 0, sizeof(*tmti));
1644                         RETURN(rc);
1645                 }
1646         }
1647
1648         if (got_an_osc_or_mdc == 0 || last_step < 0)
1649                 RETURN(rc);
1650
1651         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1652                 __u64 nodenid = lcfg->lcfg_nid;
1653
1654                 if (strlen(tmti->mti_uuid) == 0) {
1655                         /* target uuid not set, this config record is before
1656                          * LCFG_SETUP, this nid is one of target node nid.
1657                          */
1658                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1659                         tmti->mti_nid_count++;
1660                 } else {
1661                         char nidstr[LNET_NIDSTR_SIZE];
1662
1663                         /* failover node nid */
1664                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
1665                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1666                                         nidstr);
1667                 }
1668
1669                 RETURN(rc);
1670         }
1671
1672         if (lcfg->lcfg_command == LCFG_SETUP) {
1673                 char *target;
1674
1675                 target = lustre_cfg_string(lcfg, 1);
1676                 memcpy(tmti->mti_uuid, target, strlen(target));
1677                 RETURN(rc);
1678         }
1679
1680         /* ignore client side sptlrpc_conf_log */
1681         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1682                 RETURN(rc);
1683
1684         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1685                 int index;
1686
1687                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1688                         RETURN (-EINVAL);
1689
1690                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1691                        strlen(mti->mti_fsname));
1692                 tmti->mti_stripe_index = index;
1693
1694                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1695                                               mti->mti_stripe_index,
1696                                               mti->mti_svname);
1697                 memset(tmti, 0, sizeof(*tmti));
1698                 RETURN(rc);
1699         }
1700
1701         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1702                 int index;
1703                 char mdt_index[9];
1704                 char *logname, *lovname;
1705
1706                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1707                                              mti->mti_stripe_index);
1708                 if (rc)
1709                         RETURN(rc);
1710                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1711
1712                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1713                         name_destroy(&logname);
1714                         name_destroy(&lovname);
1715                         RETURN(-EINVAL);
1716                 }
1717
1718                 tmti->mti_stripe_index = index;
1719                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1720                                          mdt_index, lovname,
1721                                          LUSTRE_SP_MDT, 0);
1722                 name_destroy(&logname);
1723                 name_destroy(&lovname);
1724                 RETURN(rc);
1725         }
1726         RETURN(rc);
1727 }
1728
1729 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1730 /* stealed from mgs_get_fsdb_from_llog*/
1731 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1732                                               struct mgs_device *mgs,
1733                                               char *client_name,
1734                                               struct temp_comp* comp)
1735 {
1736         struct llog_handle *loghandle;
1737         struct mgs_target_info *tmti;
1738         struct llog_ctxt *ctxt;
1739         int rc;
1740
1741         ENTRY;
1742
1743         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1744         LASSERT(ctxt != NULL);
1745
1746         OBD_ALLOC_PTR(tmti);
1747         if (tmti == NULL)
1748                 GOTO(out_ctxt, rc = -ENOMEM);
1749
1750         comp->comp_tmti = tmti;
1751         comp->comp_obd = mgs->mgs_obd;
1752
1753         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1754                        LLOG_OPEN_EXISTS);
1755         if (rc < 0) {
1756                 if (rc == -ENOENT)
1757                         rc = 0;
1758                 GOTO(out_pop, rc);
1759         }
1760
1761         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1762         if (rc)
1763                 GOTO(out_close, rc);
1764
1765         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1766                                   (void *)comp, NULL, false);
1767         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1768 out_close:
1769         llog_close(env, loghandle);
1770 out_pop:
1771         OBD_FREE_PTR(tmti);
1772 out_ctxt:
1773         llog_ctxt_put(ctxt);
1774         RETURN(rc);
1775 }
1776
1777 /* lmv is the second thing for client logs */
1778 /* copied from mgs_write_log_lov. Please refer to that.  */
1779 static int mgs_write_log_lmv(const struct lu_env *env,
1780                              struct mgs_device *mgs,
1781                              struct fs_db *fsdb,
1782                              struct mgs_target_info *mti,
1783                              char *logname, char *lmvname)
1784 {
1785         struct llog_handle *llh = NULL;
1786         struct lmv_desc *lmvdesc;
1787         char *uuid;
1788         int rc = 0;
1789         ENTRY;
1790
1791         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1792
1793         OBD_ALLOC_PTR(lmvdesc);
1794         if (lmvdesc == NULL)
1795                 RETURN(-ENOMEM);
1796         lmvdesc->ld_active_tgt_count = 0;
1797         lmvdesc->ld_tgt_count = 0;
1798         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1799         uuid = (char *)lmvdesc->ld_uuid.uuid;
1800
1801         rc = record_start_log(env, mgs, &llh, logname);
1802         if (rc)
1803                 GOTO(out_free, rc);
1804         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1805         if (rc)
1806                 GOTO(out_end, rc);
1807         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1808         if (rc)
1809                 GOTO(out_end, rc);
1810         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1811         if (rc)
1812                 GOTO(out_end, rc);
1813         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1814         if (rc)
1815                 GOTO(out_end, rc);
1816 out_end:
1817         record_end_log(env, &llh);
1818 out_free:
1819         OBD_FREE_PTR(lmvdesc);
1820         RETURN(rc);
1821 }
1822
1823 /* lov is the first thing in the mdt and client logs */
1824 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1825                              struct fs_db *fsdb, struct mgs_target_info *mti,
1826                              char *logname, char *lovname)
1827 {
1828         struct llog_handle *llh = NULL;
1829         struct lov_desc *lovdesc;
1830         char *uuid;
1831         int rc = 0;
1832         ENTRY;
1833
1834         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1835
1836         /*
1837         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1838         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1839               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1840         */
1841
1842         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1843         OBD_ALLOC_PTR(lovdesc);
1844         if (lovdesc == NULL)
1845                 RETURN(-ENOMEM);
1846         lovdesc->ld_magic = LOV_DESC_MAGIC;
1847         lovdesc->ld_tgt_count = 0;
1848         /* Defaults.  Can be changed later by lcfg config_param */
1849         lovdesc->ld_default_stripe_count = 1;
1850         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1851         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1852         lovdesc->ld_default_stripe_offset = -1;
1853         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1854         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1855         /* can these be the same? */
1856         uuid = (char *)lovdesc->ld_uuid.uuid;
1857
1858         /* This should always be the first entry in a log.
1859         rc = mgs_clear_log(obd, logname); */
1860         rc = record_start_log(env, mgs, &llh, logname);
1861         if (rc)
1862                 GOTO(out_free, rc);
1863         /* FIXME these should be a single journal transaction */
1864         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1865         if (rc)
1866                 GOTO(out_end, rc);
1867         rc = record_attach(env, llh, lovname, "lov", uuid);
1868         if (rc)
1869                 GOTO(out_end, rc);
1870         rc = record_lov_setup(env, llh, lovname, lovdesc);
1871         if (rc)
1872                 GOTO(out_end, rc);
1873         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1874         if (rc)
1875                 GOTO(out_end, rc);
1876         EXIT;
1877 out_end:
1878         record_end_log(env, &llh);
1879 out_free:
1880         OBD_FREE_PTR(lovdesc);
1881         return rc;
1882 }
1883
1884 /* add failnids to open log */
1885 static int mgs_write_log_failnids(const struct lu_env *env,
1886                                   struct mgs_target_info *mti,
1887                                   struct llog_handle *llh,
1888                                   char *cliname)
1889 {
1890         char *failnodeuuid = NULL;
1891         char *ptr = mti->mti_params;
1892         lnet_nid_t nid;
1893         int rc = 0;
1894
1895         /*
1896         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1897         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1898         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1899         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1900         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1901         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1902         */
1903
1904         /*
1905          * Pull failnid info out of params string, which may contain something
1906          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
1907          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
1908          * etc.  However, convert_hostnames() should have caught those.
1909          */
1910         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1911                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1912                         char nidstr[LNET_NIDSTR_SIZE];
1913
1914                         if (failnodeuuid == NULL) {
1915                                 /* We don't know the failover node name,
1916                                  * so just use the first nid as the uuid */
1917                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
1918                                 rc = name_create(&failnodeuuid, nidstr, "");
1919                                 if (rc != 0)
1920                                         return rc;
1921                         }
1922                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1923                                 "client %s\n",
1924                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
1925                                 failnodeuuid, cliname);
1926                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1927                         /*
1928                          * If *ptr is ':', we have added all NIDs for
1929                          * failnodeuuid.
1930                          */
1931                         if (*ptr == ':') {
1932                                 rc = record_add_conn(env, llh, cliname,
1933                                                      failnodeuuid);
1934                                 name_destroy(&failnodeuuid);
1935                                 failnodeuuid = NULL;
1936                         }
1937                 }
1938                 if (failnodeuuid) {
1939                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1940                         name_destroy(&failnodeuuid);
1941                         failnodeuuid = NULL;
1942                 }
1943         }
1944
1945         return rc;
1946 }
1947
1948 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1949                                     struct mgs_device *mgs,
1950                                     struct fs_db *fsdb,
1951                                     struct mgs_target_info *mti,
1952                                     char *logname, char *lmvname)
1953 {
1954         struct llog_handle *llh = NULL;
1955         char *mdcname = NULL;
1956         char *nodeuuid = NULL;
1957         char *mdcuuid = NULL;
1958         char *lmvuuid = NULL;
1959         char index[6];
1960         char nidstr[LNET_NIDSTR_SIZE];
1961         int i, rc;
1962         ENTRY;
1963
1964         if (mgs_log_is_empty(env, mgs, logname)) {
1965                 CERROR("log is empty! Logical error\n");
1966                 RETURN(-EINVAL);
1967         }
1968
1969         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1970                mti->mti_svname, logname, lmvname);
1971
1972         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
1973         rc = name_create(&nodeuuid, nidstr, "");
1974         if (rc)
1975                 RETURN(rc);
1976         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1977         if (rc)
1978                 GOTO(out_free, rc);
1979         rc = name_create(&mdcuuid, mdcname, "_UUID");
1980         if (rc)
1981                 GOTO(out_free, rc);
1982         rc = name_create(&lmvuuid, lmvname, "_UUID");
1983         if (rc)
1984                 GOTO(out_free, rc);
1985
1986         rc = record_start_log(env, mgs, &llh, logname);
1987         if (rc)
1988                 GOTO(out_free, rc);
1989         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1990                            "add mdc");
1991         if (rc)
1992                 GOTO(out_end, rc);
1993         for (i = 0; i < mti->mti_nid_count; i++) {
1994                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1995                         libcfs_nid2str_r(mti->mti_nids[i],
1996                                          nidstr, sizeof(nidstr)));
1997
1998                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1999                 if (rc)
2000                         GOTO(out_end, rc);
2001         }
2002
2003         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2004         if (rc)
2005                 GOTO(out_end, rc);
2006         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2007                           NULL, NULL);
2008         if (rc)
2009                 GOTO(out_end, rc);
2010         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2011         if (rc)
2012                 GOTO(out_end, rc);
2013         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2014         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2015                             index, "1");
2016         if (rc)
2017                 GOTO(out_end, rc);
2018         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2019                            "add mdc");
2020         if (rc)
2021                 GOTO(out_end, rc);
2022 out_end:
2023         record_end_log(env, &llh);
2024 out_free:
2025         name_destroy(&lmvuuid);
2026         name_destroy(&mdcuuid);
2027         name_destroy(&mdcname);
2028         name_destroy(&nodeuuid);
2029         RETURN(rc);
2030 }
2031
2032 static inline int name_create_lov(char **lovname, char *mdtname,
2033                                   struct fs_db *fsdb, int index)
2034 {
2035         /* COMPAT_180 */
2036         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2037                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2038         else
2039                 return name_create(lovname, mdtname, "-mdtlov");
2040 }
2041
2042 static int name_create_mdt_and_lov(char **logname, char **lovname,
2043                                    struct fs_db *fsdb, int i)
2044 {
2045         int rc;
2046
2047         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2048         if (rc)
2049                 return rc;
2050         /* COMPAT_180 */
2051         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2052                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2053         else
2054                 rc = name_create(lovname, *logname, "-mdtlov");
2055         if (rc) {
2056                 name_destroy(logname);
2057                 *logname = NULL;
2058         }
2059         return rc;
2060 }
2061
2062 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2063                                       struct fs_db *fsdb, int i)
2064 {
2065         char suffix[16];
2066
2067         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2068                 sprintf(suffix, "-osc");
2069         else
2070                 sprintf(suffix, "-osc-MDT%04x", i);
2071         return name_create(oscname, ostname, suffix);
2072 }
2073
2074 /* add new mdc to already existent MDS */
2075 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2076                                     struct mgs_device *mgs,
2077                                     struct fs_db *fsdb,
2078                                     struct mgs_target_info *mti,
2079                                     int mdt_index, char *logname)
2080 {
2081         struct llog_handle      *llh = NULL;
2082         char    *nodeuuid = NULL;
2083         char    *ospname = NULL;
2084         char    *lovuuid = NULL;
2085         char    *mdtuuid = NULL;
2086         char    *svname = NULL;
2087         char    *mdtname = NULL;
2088         char    *lovname = NULL;
2089         char    index_str[16];
2090         char    nidstr[LNET_NIDSTR_SIZE];
2091         int     i, rc;
2092
2093         ENTRY;
2094         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2095                 CERROR("log is empty! Logical error\n");
2096                 RETURN (-EINVAL);
2097         }
2098
2099         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2100                logname);
2101
2102         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2103         if (rc)
2104                 RETURN(rc);
2105
2106         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2107         rc = name_create(&nodeuuid, nidstr, "");
2108         if (rc)
2109                 GOTO(out_destory, rc);
2110
2111         rc = name_create(&svname, mdtname, "-osp");
2112         if (rc)
2113                 GOTO(out_destory, rc);
2114
2115         sprintf(index_str, "-MDT%04x", mdt_index);
2116         rc = name_create(&ospname, svname, index_str);
2117         if (rc)
2118                 GOTO(out_destory, rc);
2119
2120         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2121         if (rc)
2122                 GOTO(out_destory, rc);
2123
2124         rc = name_create(&lovuuid, lovname, "_UUID");
2125         if (rc)
2126                 GOTO(out_destory, rc);
2127
2128         rc = name_create(&mdtuuid, mdtname, "_UUID");
2129         if (rc)
2130                 GOTO(out_destory, rc);
2131
2132         rc = record_start_log(env, mgs, &llh, logname);
2133         if (rc)
2134                 GOTO(out_destory, rc);
2135
2136         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2137                            "add osp");
2138         if (rc)
2139                 GOTO(out_destory, rc);
2140
2141         for (i = 0; i < mti->mti_nid_count; i++) {
2142                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2143                         libcfs_nid2str_r(mti->mti_nids[i],
2144                                          nidstr, sizeof(nidstr)));
2145                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2146                 if (rc)
2147                         GOTO(out_end, rc);
2148         }
2149
2150         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2151         if (rc)
2152                 GOTO(out_end, rc);
2153
2154         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2155                           NULL, NULL);
2156         if (rc)
2157                 GOTO(out_end, rc);
2158
2159         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2160         if (rc)
2161                 GOTO(out_end, rc);
2162
2163         /* Add mdc(osp) to lod */
2164         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
2165         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2166                          index_str, "1", NULL);
2167         if (rc)
2168                 GOTO(out_end, rc);
2169
2170         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2171         if (rc)
2172                 GOTO(out_end, rc);
2173
2174 out_end:
2175         record_end_log(env, &llh);
2176
2177 out_destory:
2178         name_destroy(&mdtuuid);
2179         name_destroy(&lovuuid);
2180         name_destroy(&lovname);
2181         name_destroy(&ospname);
2182         name_destroy(&svname);
2183         name_destroy(&nodeuuid);
2184         name_destroy(&mdtname);
2185         RETURN(rc);
2186 }
2187
2188 static int mgs_write_log_mdt0(const struct lu_env *env,
2189                               struct mgs_device *mgs,
2190                               struct fs_db *fsdb,
2191                               struct mgs_target_info *mti)
2192 {
2193         char *log = mti->mti_svname;
2194         struct llog_handle *llh = NULL;
2195         char *uuid, *lovname;
2196         char mdt_index[6];
2197         char *ptr = mti->mti_params;
2198         int rc = 0, failout = 0;
2199         ENTRY;
2200
2201         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2202         if (uuid == NULL)
2203                 RETURN(-ENOMEM);
2204
2205         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2206                 failout = (strncmp(ptr, "failout", 7) == 0);
2207
2208         rc = name_create(&lovname, log, "-mdtlov");
2209         if (rc)
2210                 GOTO(out_free, rc);
2211         if (mgs_log_is_empty(env, mgs, log)) {
2212                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2213                 if (rc)
2214                         GOTO(out_lod, rc);
2215         }
2216
2217         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2218
2219         rc = record_start_log(env, mgs, &llh, log);
2220         if (rc)
2221                 GOTO(out_lod, rc);
2222
2223         /* add MDT itself */
2224
2225         /* FIXME this whole fn should be a single journal transaction */
2226         sprintf(uuid, "%s_UUID", log);
2227         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2228         if (rc)
2229                 GOTO(out_lod, rc);
2230         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2231         if (rc)
2232                 GOTO(out_end, rc);
2233         rc = record_mount_opt(env, llh, log, lovname, NULL);
2234         if (rc)
2235                 GOTO(out_end, rc);
2236         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2237                         failout ? "n" : "f");
2238         if (rc)
2239                 GOTO(out_end, rc);
2240         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2241         if (rc)
2242                 GOTO(out_end, rc);
2243 out_end:
2244         record_end_log(env, &llh);
2245 out_lod:
2246         name_destroy(&lovname);
2247 out_free:
2248         OBD_FREE(uuid, sizeof(struct obd_uuid));
2249         RETURN(rc);
2250 }
2251
2252 /* envelope method for all layers log */
2253 static int mgs_write_log_mdt(const struct lu_env *env,
2254                              struct mgs_device *mgs,
2255                              struct fs_db *fsdb,
2256                              struct mgs_target_info *mti)
2257 {
2258         struct mgs_thread_info *mgi = mgs_env_info(env);
2259         struct llog_handle *llh = NULL;
2260         char *cliname;
2261         int rc, i = 0;
2262         ENTRY;
2263
2264         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2265
2266         if (mti->mti_uuid[0] == '\0') {
2267                 /* Make up our own uuid */
2268                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2269                          "%s_UUID", mti->mti_svname);
2270         }
2271
2272         /* add mdt */
2273         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2274         if (rc)
2275                 RETURN(rc);
2276         /* Append the mdt info to the client log */
2277         rc = name_create(&cliname, mti->mti_fsname, "-client");
2278         if (rc)
2279                 RETURN(rc);
2280
2281         if (mgs_log_is_empty(env, mgs, cliname)) {
2282                 /* Start client log */
2283                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2284                                        fsdb->fsdb_clilov);
2285                 if (rc)
2286                         GOTO(out_free, rc);
2287                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2288                                        fsdb->fsdb_clilmv);
2289                 if (rc)
2290                         GOTO(out_free, rc);
2291         }
2292
2293         /*
2294         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2295         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2296         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2297         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2298         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2299         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2300         */
2301
2302         /* copy client info about lov/lmv */
2303         mgi->mgi_comp.comp_mti = mti;
2304         mgi->mgi_comp.comp_fsdb = fsdb;
2305
2306         rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2307                                                 &mgi->mgi_comp);
2308         if (rc)
2309                 GOTO(out_free, rc);
2310         rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2311                                       fsdb->fsdb_clilmv);
2312         if (rc)
2313                 GOTO(out_free, rc);
2314
2315         /* add mountopts */
2316         rc = record_start_log(env, mgs, &llh, cliname);
2317         if (rc)
2318                 GOTO(out_free, rc);
2319
2320         rc = record_marker(env, llh, fsdb, CM_START, cliname,
2321                            "mount opts");
2322         if (rc)
2323                 GOTO(out_end, rc);
2324         rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2325                               fsdb->fsdb_clilmv);
2326         if (rc)
2327                 GOTO(out_end, rc);
2328         rc = record_marker(env, llh, fsdb, CM_END, cliname,
2329                            "mount opts");
2330
2331         if (rc)
2332                 GOTO(out_end, rc);
2333
2334         /* for_all_existing_mdt except current one */
2335         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2336                 if (i !=  mti->mti_stripe_index &&
2337                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2338                         char *logname;
2339
2340                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2341                         if (rc)
2342                                 GOTO(out_end, rc);
2343
2344                         /* NB: If the log for the MDT is empty, it means
2345                          * the MDT is only added to the index
2346                          * map, and not being process yet, i.e. this
2347                          * is an unregistered MDT, see mgs_write_log_target().
2348                          * so we should skip it. Otherwise
2349                          *
2350                          * 1. MGS get register request for MDT1 and MDT2.
2351                          *
2352                          * 2. Then both MDT1 and MDT2 are added into
2353                          * fsdb_mdt_index_map. (see mgs_set_index()).
2354                          *
2355                          * 3. Then MDT1 get the lock of fsdb_mutex, then
2356                          * generate the config log, here, it will regard MDT2
2357                          * as an existent MDT, and generate "add osp" for
2358                          * lustre-MDT0001-osp-MDT0002. Note: at the moment
2359                          * MDT0002 config log is still empty, so it will
2360                          * add "add osp" even before "lov setup", which
2361                          * will definitly cause trouble.
2362                          *
2363                          * 4. MDT1 registeration finished, fsdb_mutex is
2364                          * released, then MDT2 get in, then in above
2365                          * mgs_steal_llog_for_mdt_from_client(), it will
2366                          * add another osp log for lustre-MDT0001-osp-MDT0002,
2367                          * which will cause another trouble.*/
2368                         if (!mgs_log_is_empty(env, mgs, logname))
2369                                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb,
2370                                                               mti, i, logname);
2371
2372                         name_destroy(&logname);
2373                         if (rc)
2374                                 GOTO(out_end, rc);
2375                 }
2376         }
2377 out_end:
2378         record_end_log(env, &llh);
2379 out_free:
2380         name_destroy(&cliname);
2381         RETURN(rc);
2382 }
2383
2384 /* Add the ost info to the client/mdt lov */
2385 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2386                                     struct mgs_device *mgs, struct fs_db *fsdb,
2387                                     struct mgs_target_info *mti,
2388                                     char *logname, char *suffix, char *lovname,
2389                                     enum lustre_sec_part sec_part, int flags)
2390 {
2391         struct llog_handle *llh = NULL;
2392         char *nodeuuid = NULL;
2393         char *oscname = NULL;
2394         char *oscuuid = NULL;
2395         char *lovuuid = NULL;
2396         char *svname = NULL;
2397         char index[6];
2398         char nidstr[LNET_NIDSTR_SIZE];
2399         int i, rc;
2400         ENTRY;
2401
2402         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2403                 mti->mti_svname, logname);
2404
2405         if (mgs_log_is_empty(env, mgs, logname)) {
2406                 CERROR("log is empty! Logical error\n");
2407                 RETURN(-EINVAL);
2408         }
2409
2410         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2411         rc = name_create(&nodeuuid, nidstr, "");
2412         if (rc)
2413                 RETURN(rc);
2414         rc = name_create(&svname, mti->mti_svname, "-osc");
2415         if (rc)
2416                 GOTO(out_free, rc);
2417
2418         /* for the system upgraded from old 1.8, keep using the old osc naming
2419          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2420         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2421                 rc = name_create(&oscname, svname, "");
2422         else
2423                 rc = name_create(&oscname, svname, suffix);
2424         if (rc)
2425                 GOTO(out_free, rc);
2426
2427         rc = name_create(&oscuuid, oscname, "_UUID");
2428         if (rc)
2429                 GOTO(out_free, rc);
2430         rc = name_create(&lovuuid, lovname, "_UUID");
2431         if (rc)
2432                 GOTO(out_free, rc);
2433
2434
2435         /*
2436         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2437         multihomed (#4)
2438         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2439         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2440         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2441         failover (#6,7)
2442         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2443         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2444         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2445         */
2446
2447         rc = record_start_log(env, mgs, &llh, logname);
2448         if (rc)
2449                 GOTO(out_free, rc);
2450
2451         /* FIXME these should be a single journal transaction */
2452         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2453                            "add osc");
2454         if (rc)
2455                 GOTO(out_end, rc);
2456
2457         /* NB: don't change record order, because upon MDT steal OSC config
2458          * from client, it treats all nids before LCFG_SETUP as target nids
2459          * (multiple interfaces), while nids after as failover node nids.
2460          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2461          */
2462         for (i = 0; i < mti->mti_nid_count; i++) {
2463                 CDEBUG(D_MGS, "add nid %s\n",
2464                         libcfs_nid2str_r(mti->mti_nids[i],
2465                                          nidstr, sizeof(nidstr)));
2466                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2467                 if (rc)
2468                         GOTO(out_end, rc);
2469         }
2470         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2471         if (rc)
2472                 GOTO(out_end, rc);
2473         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
2474                           NULL, NULL);
2475         if (rc)
2476                 GOTO(out_end, rc);
2477         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2478         if (rc)
2479                 GOTO(out_end, rc);
2480
2481         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2482
2483         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2484         if (rc)
2485                 GOTO(out_end, rc);
2486         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2487                            "add osc");
2488         if (rc)
2489                 GOTO(out_end, rc);
2490 out_end:
2491         record_end_log(env, &llh);
2492 out_free:
2493         name_destroy(&lovuuid);
2494         name_destroy(&oscuuid);
2495         name_destroy(&oscname);
2496         name_destroy(&svname);
2497         name_destroy(&nodeuuid);
2498         RETURN(rc);
2499 }
2500
2501 static int mgs_write_log_ost(const struct lu_env *env,
2502                              struct mgs_device *mgs, struct fs_db *fsdb,
2503                              struct mgs_target_info *mti)
2504 {
2505         struct llog_handle *llh = NULL;
2506         char *logname, *lovname;
2507         char *ptr = mti->mti_params;
2508         int rc, flags = 0, failout = 0, i;
2509         ENTRY;
2510
2511         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2512
2513         /* The ost startup log */
2514
2515         /* If the ost log already exists, that means that someone reformatted
2516            the ost and it called target_add again. */
2517         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2518                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2519                                    "exists, yet the server claims it never "
2520                                    "registered. It may have been reformatted, "
2521                                    "or the index changed. writeconf the MDT to "
2522                                    "regenerate all logs.\n", mti->mti_svname);
2523                 RETURN(-EALREADY);
2524         }
2525
2526         /*
2527         attach obdfilter ost1 ost1_UUID
2528         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2529         */
2530         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2531                 failout = (strncmp(ptr, "failout", 7) == 0);
2532         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2533         if (rc)
2534                 RETURN(rc);
2535         /* FIXME these should be a single journal transaction */
2536         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2537         if (rc)
2538                 GOTO(out_end, rc);
2539         if (*mti->mti_uuid == '\0')
2540                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2541                          "%s_UUID", mti->mti_svname);
2542         rc = record_attach(env, llh, mti->mti_svname,
2543                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2544         if (rc)
2545                 GOTO(out_end, rc);
2546         rc = record_setup(env, llh, mti->mti_svname,
2547                           "dev"/*ignored*/, "type"/*ignored*/,
2548                           failout ? "n" : "f", NULL/*options*/);
2549         if (rc)
2550                 GOTO(out_end, rc);
2551         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2552         if (rc)
2553                 GOTO(out_end, rc);
2554 out_end:
2555         record_end_log(env, &llh);
2556         if (rc)
2557                 RETURN(rc);
2558         /* We also have to update the other logs where this osc is part of
2559            the lov */
2560
2561         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2562                 /* If we're upgrading, the old mdt log already has our
2563                    entry. Let's do a fake one for fun. */
2564                 /* Note that we can't add any new failnids, since we don't
2565                    know the old osc names. */
2566                 flags = CM_SKIP | CM_UPGRADE146;
2567
2568         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2569                 /* If the update flag isn't set, don't update client/mdt
2570                    logs. */
2571                 flags |= CM_SKIP;
2572                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2573                               "the MDT first to regenerate it.\n",
2574                               mti->mti_svname);
2575         }
2576
2577         /* Add ost to all MDT lov defs */
2578         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2579                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2580                         char mdt_index[9];
2581
2582                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2583                                                      i);
2584                         if (rc)
2585                                 RETURN(rc);
2586                         sprintf(mdt_index, "-MDT%04x", i);
2587                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2588                                                       logname, mdt_index,
2589                                                       lovname, LUSTRE_SP_MDT,
2590                                                       flags);
2591                         name_destroy(&logname);
2592                         name_destroy(&lovname);
2593                         if (rc)
2594                                 RETURN(rc);
2595                 }
2596         }
2597
2598         /* Append ost info to the client log */
2599         rc = name_create(&logname, mti->mti_fsname, "-client");
2600         if (rc)
2601                 RETURN(rc);
2602         if (mgs_log_is_empty(env, mgs, logname)) {
2603                 /* Start client log */
2604                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2605                                        fsdb->fsdb_clilov);
2606                 if (rc)
2607                         GOTO(out_free, rc);
2608                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2609                                        fsdb->fsdb_clilmv);
2610                 if (rc)
2611                         GOTO(out_free, rc);
2612         }
2613         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2614                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
2615 out_free:
2616         name_destroy(&logname);
2617         RETURN(rc);
2618 }
2619
2620 static __inline__ int mgs_param_empty(char *ptr)
2621 {
2622         char *tmp;
2623
2624         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2625                 return 1;
2626         return 0;
2627 }
2628
2629 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2630                                           struct mgs_device *mgs,
2631                                           struct fs_db *fsdb,
2632                                           struct mgs_target_info *mti,
2633                                           char *logname, char *cliname)
2634 {
2635         int rc;
2636         struct llog_handle *llh = NULL;
2637
2638         if (mgs_param_empty(mti->mti_params)) {
2639                 /* Remove _all_ failnids */
2640                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2641                                 mti->mti_svname, "add failnid", CM_SKIP);
2642                 return rc < 0 ? rc : 0;
2643         }
2644
2645         /* Otherwise failover nids are additive */
2646         rc = record_start_log(env, mgs, &llh, logname);
2647         if (rc)
2648                 return rc;
2649                 /* FIXME this should be a single journal transaction */
2650         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2651                            "add failnid");
2652         if (rc)
2653                 goto out_end;
2654         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2655         if (rc)
2656                 goto out_end;
2657         rc = record_marker(env, llh, fsdb, CM_END,
2658                            mti->mti_svname, "add failnid");
2659 out_end:
2660         record_end_log(env, &llh);
2661         return rc;
2662 }
2663
2664
2665 /* Add additional failnids to an existing log.
2666    The mdc/osc must have been added to logs first */
2667 /* tcp nids must be in dotted-quad ascii -
2668    we can't resolve hostnames from the kernel. */
2669 static int mgs_write_log_add_failnid(const struct lu_env *env,
2670                                      struct mgs_device *mgs,
2671                                      struct fs_db *fsdb,
2672                                      struct mgs_target_info *mti)
2673 {
2674         char *logname, *cliname;
2675         int rc;
2676         ENTRY;
2677
2678         /* FIXME we currently can't erase the failnids
2679          * given when a target first registers, since they aren't part of
2680          * an "add uuid" stanza */
2681
2682         /* Verify that we know about this target */
2683         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2684                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2685                                    "yet. It must be started before failnids "
2686                                    "can be added.\n", mti->mti_svname);
2687                 RETURN(-ENOENT);
2688         }
2689
2690         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2691         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2692                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2693         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2694                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2695         } else {
2696                 RETURN(-EINVAL);
2697         }
2698         if (rc)
2699                 RETURN(rc);
2700         /* Add failover nids to the client log */
2701         rc = name_create(&logname, mti->mti_fsname, "-client");
2702         if (rc) {
2703                 name_destroy(&cliname);
2704                 RETURN(rc);
2705         }
2706         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2707         name_destroy(&logname);
2708         name_destroy(&cliname);
2709         if (rc)
2710                 RETURN(rc);
2711
2712         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2713                 /* Add OST failover nids to the MDT logs as well */
2714                 int i;
2715
2716                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2717                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2718                                 continue;
2719                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2720                         if (rc)
2721                                 RETURN(rc);
2722                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2723                                                  fsdb, i);
2724                         if (rc) {
2725                                 name_destroy(&logname);
2726                                 RETURN(rc);
2727                         }
2728                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2729                                                             mti, logname,
2730                                                             cliname);
2731                         name_destroy(&cliname);
2732                         name_destroy(&logname);
2733                         if (rc)
2734                                 RETURN(rc);
2735                 }
2736         }
2737
2738         RETURN(rc);
2739 }
2740
2741 static int mgs_wlp_lcfg(const struct lu_env *env,
2742                         struct mgs_device *mgs, struct fs_db *fsdb,
2743                         struct mgs_target_info *mti,
2744                         char *logname, struct lustre_cfg_bufs *bufs,
2745                         char *tgtname, char *ptr)
2746 {
2747         char comment[MTI_NAME_MAXLEN];
2748         char *tmp;
2749         struct llog_cfg_rec *lcr;
2750         int rc, del;
2751
2752         /* Erase any old settings of this same parameter */
2753         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2754         comment[MTI_NAME_MAXLEN - 1] = 0;
2755         /* But don't try to match the value. */
2756         tmp = strchr(comment, '=');
2757         if (tmp != NULL)
2758                 *tmp = 0;
2759         /* FIXME we should skip settings that are the same as old values */
2760         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2761         if (rc < 0)
2762                 return rc;
2763         del = mgs_param_empty(ptr);
2764
2765         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2766                       "Setting" : "Modifying", tgtname, comment, logname);
2767         if (del) {
2768                 /* mgs_modify() will return 1 if nothing had to be done */
2769                 if (rc == 1)
2770                         rc = 0;
2771                 return rc;
2772         }
2773
2774         lustre_cfg_bufs_reset(bufs, tgtname);
2775         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2776         if (mti->mti_flags & LDD_F_PARAM2)
2777                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2778
2779         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2780                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2781         if (lcr == NULL)
2782                 return -ENOMEM;
2783
2784         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2785                                   comment);
2786         lustre_cfg_rec_free(lcr);
2787         return rc;
2788 }
2789
2790 static int mgs_write_log_param2(const struct lu_env *env,
2791                                 struct mgs_device *mgs,
2792                                 struct fs_db *fsdb,
2793                                 struct mgs_target_info *mti, char *ptr)
2794 {
2795         struct lustre_cfg_bufs  bufs;
2796         int                     rc = 0;
2797         ENTRY;
2798
2799         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2800         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2801                           mti->mti_svname, ptr);
2802
2803         RETURN(rc);
2804 }
2805
2806 /* write global variable settings into log */
2807 static int mgs_write_log_sys(const struct lu_env *env,
2808                              struct mgs_device *mgs, struct fs_db *fsdb,
2809                              struct mgs_target_info *mti, char *sys, char *ptr)
2810 {
2811         struct mgs_thread_info  *mgi = mgs_env_info(env);
2812         struct lustre_cfg       *lcfg;
2813         struct llog_cfg_rec     *lcr;
2814         char *tmp, sep;
2815         int rc, cmd, convert = 1;
2816
2817         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2818                 cmd = LCFG_SET_TIMEOUT;
2819         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2820                 cmd = LCFG_SET_LDLM_TIMEOUT;
2821         /* Check for known params here so we can return error to lctl */
2822         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2823                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2824                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2825                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2826                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2827                 cmd = LCFG_PARAM;
2828         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2829                 convert = 0; /* Don't convert string value to integer */
2830                 cmd = LCFG_PARAM;
2831         } else {
2832                 return -EINVAL;
2833         }
2834
2835         if (mgs_param_empty(ptr))
2836                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2837         else
2838                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2839
2840         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2841         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2842         if (!convert && *tmp != '\0')
2843                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2844         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2845         if (lcr == NULL)
2846                 return -ENOMEM;
2847
2848         lcfg = &lcr->lcr_cfg;
2849         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2850         /* truncate the comment to the parameter name */
2851         ptr = tmp - 1;
2852         sep = *ptr;
2853         *ptr = '\0';
2854         /* modify all servers and clients */
2855         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2856                                       *tmp == '\0' ? NULL : lcr,
2857                                       mti->mti_fsname, sys, 0);
2858         if (rc == 0 && *tmp != '\0') {
2859                 switch (cmd) {
2860                 case LCFG_SET_TIMEOUT:
2861                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2862                                 class_process_config(lcfg);
2863                         break;
2864                 case LCFG_SET_LDLM_TIMEOUT:
2865                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2866                                 class_process_config(lcfg);
2867                         break;
2868                 default:
2869                         break;
2870                 }
2871         }
2872         *ptr = sep;
2873         lustre_cfg_rec_free(lcr);
2874         return rc;
2875 }
2876
2877 /* write quota settings into log */
2878 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2879                                struct fs_db *fsdb, struct mgs_target_info *mti,
2880                                char *quota, char *ptr)
2881 {
2882         struct mgs_thread_info  *mgi = mgs_env_info(env);
2883         struct llog_cfg_rec     *lcr;
2884         char                    *tmp;
2885         char                     sep;
2886         int                      rc, cmd = LCFG_PARAM;
2887
2888         /* support only 'meta' and 'data' pools so far */
2889         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2890             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2891                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2892                        "& quota.ost are)\n", ptr);
2893                 return -EINVAL;
2894         }
2895
2896         if (*tmp == '\0') {
2897                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2898         } else {
2899                 CDEBUG(D_MGS, "global '%s'\n", quota);
2900
2901                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2902                     strcmp(tmp, "none") != 0) {
2903                         CERROR("enable option(%s) isn't supported\n", tmp);
2904                         return -EINVAL;
2905                 }
2906         }
2907
2908         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2909         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2910         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2911         if (lcr == NULL)
2912                 return -ENOMEM;
2913
2914         /* truncate the comment to the parameter name */
2915         ptr = tmp - 1;
2916         sep = *ptr;
2917         *ptr = '\0';
2918
2919         /* XXX we duplicated quota enable information in all server
2920          *     config logs, it should be moved to a separate config
2921          *     log once we cleanup the config log for global param. */
2922         /* modify all servers */
2923         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2924                                       *tmp == '\0' ? NULL : lcr,
2925                                       mti->mti_fsname, quota, 1);
2926         *ptr = sep;
2927         lustre_cfg_rec_free(lcr);
2928         return rc < 0 ? rc : 0;
2929 }
2930
2931 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2932                                    struct mgs_device *mgs,
2933                                    struct fs_db *fsdb,
2934                                    struct mgs_target_info *mti,
2935                                    char *param)
2936 {
2937         struct mgs_thread_info  *mgi = mgs_env_info(env);
2938         struct llog_cfg_rec     *lcr;
2939         struct llog_handle      *llh = NULL;
2940         char                    *logname;
2941         char                    *comment, *ptr;
2942         int                      rc, len;
2943
2944         ENTRY;
2945
2946         /* get comment */
2947         ptr = strchr(param, '=');
2948         LASSERT(ptr != NULL);
2949         len = ptr - param;
2950
2951         OBD_ALLOC(comment, len + 1);
2952         if (comment == NULL)
2953                 RETURN(-ENOMEM);
2954         strncpy(comment, param, len);
2955         comment[len] = '\0';
2956
2957         /* prepare lcfg */
2958         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2959         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2960         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2961         if (lcr == NULL)
2962                 GOTO(out_comment, rc = -ENOMEM);
2963
2964         /* construct log name */
2965         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2966         if (rc < 0)
2967                 GOTO(out_lcfg, rc);
2968
2969         if (mgs_log_is_empty(env, mgs, logname)) {
2970                 rc = record_start_log(env, mgs, &llh, logname);
2971                 if (rc < 0)
2972                         GOTO(out, rc);
2973                 record_end_log(env, &llh);
2974         }
2975
2976         /* obsolete old one */
2977         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2978                         comment, CM_SKIP);
2979         if (rc < 0)
2980                 GOTO(out, rc);
2981         /* write the new one */
2982         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2983                                   mti->mti_svname, comment);
2984         if (rc)
2985                 CERROR("%s: error writing log %s: rc = %d\n",
2986                        mgs->mgs_obd->obd_name, logname, rc);
2987 out:
2988         name_destroy(&logname);
2989 out_lcfg:
2990         lustre_cfg_rec_free(lcr);
2991 out_comment:
2992         OBD_FREE(comment, len + 1);
2993         RETURN(rc);
2994 }
2995
2996 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2997                                         char *param)
2998 {
2999         char    *ptr;
3000
3001         /* disable the adjustable udesc parameter for now, i.e. use default
3002          * setting that client always ship udesc to MDT if possible. to enable
3003          * it simply remove the following line */
3004         goto error_out;
3005
3006         ptr = strchr(param, '=');
3007         if (ptr == NULL)
3008                 goto error_out;
3009         *ptr++ = '\0';
3010
3011         if (strcmp(param, PARAM_SRPC_UDESC))
3012                 goto error_out;
3013
3014         if (strcmp(ptr, "yes") == 0) {
3015                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3016                 CWARN("Enable user descriptor shipping from client to MDT\n");
3017         } else if (strcmp(ptr, "no") == 0) {
3018                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3019                 CWARN("Disable user descriptor shipping from client to MDT\n");
3020         } else {
3021                 *(ptr - 1) = '=';
3022                 goto error_out;
3023         }
3024         return 0;
3025
3026 error_out:
3027         CERROR("Invalid param: %s\n", param);
3028         return -EINVAL;
3029 }
3030
3031 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3032                                   const char *svname,
3033                                   char *param)
3034 {
3035         struct sptlrpc_rule      rule;
3036         struct sptlrpc_rule_set *rset;
3037         int                      rc;
3038         ENTRY;
3039
3040         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3041                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3042                 RETURN(-EINVAL);
3043         }
3044
3045         if (strncmp(param, PARAM_SRPC_UDESC,
3046                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3047                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3048         }
3049
3050         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3051                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3052                 RETURN(-EINVAL);
3053         }
3054
3055         param += sizeof(PARAM_SRPC_FLVR) - 1;
3056
3057         rc = sptlrpc_parse_rule(param, &rule);
3058         if (rc)
3059                 RETURN(rc);
3060
3061         /* mgs rules implies must be mgc->mgs */
3062         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3063                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3064                      rule.sr_from != LUSTRE_SP_ANY) ||
3065                     (rule.sr_to != LUSTRE_SP_MGS &&
3066                      rule.sr_to != LUSTRE_SP_ANY))
3067                         RETURN(-EINVAL);
3068         }
3069
3070         /* preapre room for this coming rule. svcname format should be:
3071          * - fsname: general rule
3072          * - fsname-tgtname: target-specific rule
3073          */
3074         if (strchr(svname, '-')) {
3075                 struct mgs_tgt_srpc_conf *tgtconf;
3076                 int                       found = 0;
3077
3078                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3079                      tgtconf = tgtconf->mtsc_next) {
3080                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3081                                 found = 1;
3082                                 break;
3083                         }
3084                 }
3085
3086                 if (!found) {
3087                         int name_len;
3088
3089                         OBD_ALLOC_PTR(tgtconf);
3090                         if (tgtconf == NULL)
3091                                 RETURN(-ENOMEM);
3092
3093                         name_len = strlen(svname);
3094
3095                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3096                         if (tgtconf->mtsc_tgt == NULL) {
3097                                 OBD_FREE_PTR(tgtconf);
3098                                 RETURN(-ENOMEM);
3099                         }
3100                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3101
3102                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3103                         fsdb->fsdb_srpc_tgt = tgtconf;
3104                 }
3105
3106                 rset = &tgtconf->mtsc_rset;
3107         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
3108                 /* put _mgs related srpc rule directly in mgs ruleset */
3109                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
3110         } else {
3111                 rset = &fsdb->fsdb_srpc_gen;
3112         }
3113
3114         rc = sptlrpc_rule_set_merge(rset, &rule);
3115
3116         RETURN(rc);
3117 }
3118
3119 static int mgs_srpc_set_param(const struct lu_env *env,
3120                               struct mgs_device *mgs,
3121                               struct fs_db *fsdb,
3122                               struct mgs_target_info *mti,
3123                               char *param)
3124 {
3125         char                   *copy;
3126         int                     rc, copy_size;
3127         ENTRY;
3128
3129 #ifndef HAVE_GSS
3130         RETURN(-EINVAL);
3131 #endif
3132         /* keep a copy of original param, which could be destroied
3133          * during parsing */
3134         copy_size = strlen(param) + 1;
3135         OBD_ALLOC(copy, copy_size);
3136         if (copy == NULL)
3137                 return -ENOMEM;
3138         memcpy(copy, param, copy_size);
3139
3140         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3141         if (rc)
3142                 goto out_free;
3143
3144         /* previous steps guaranteed the syntax is correct */
3145         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3146         if (rc)
3147                 goto out_free;
3148
3149         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3150                 /*
3151                  * for mgs rules, make them effective immediately.
3152                  */
3153                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3154                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3155                                                  &fsdb->fsdb_srpc_gen);
3156         }
3157
3158 out_free:
3159         OBD_FREE(copy, copy_size);
3160         RETURN(rc);
3161 }
3162
3163 struct mgs_srpc_read_data {
3164         struct fs_db   *msrd_fsdb;
3165         int             msrd_skip;
3166 };
3167
3168 static int mgs_srpc_read_handler(const struct lu_env *env,
3169                                  struct llog_handle *llh,
3170                                  struct llog_rec_hdr *rec, void *data)
3171 {
3172         struct mgs_srpc_read_data *msrd = data;
3173         struct cfg_marker         *marker;
3174         struct lustre_cfg         *lcfg = REC_DATA(rec);
3175         char                      *svname, *param;
3176         int                        cfg_len, rc;
3177         ENTRY;
3178
3179         if (rec->lrh_type != OBD_CFG_REC) {
3180                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3181                 RETURN(-EINVAL);
3182         }
3183
3184         cfg_len = REC_DATA_LEN(rec);
3185
3186         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3187         if (rc) {
3188                 CERROR("Insane cfg\n");
3189                 RETURN(rc);
3190         }
3191
3192         if (lcfg->lcfg_command == LCFG_MARKER) {
3193                 marker = lustre_cfg_buf(lcfg, 1);
3194
3195                 if (marker->cm_flags & CM_START &&
3196                     marker->cm_flags & CM_SKIP)
3197                         msrd->msrd_skip = 1;
3198                 if (marker->cm_flags & CM_END)
3199                         msrd->msrd_skip = 0;
3200
3201                 RETURN(0);
3202         }
3203
3204         if (msrd->msrd_skip)
3205                 RETURN(0);
3206
3207         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3208                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3209                 RETURN(0);
3210         }
3211
3212         svname = lustre_cfg_string(lcfg, 0);
3213         if (svname == NULL) {
3214                 CERROR("svname is empty\n");
3215                 RETURN(0);
3216         }
3217
3218         param = lustre_cfg_string(lcfg, 1);
3219         if (param == NULL) {
3220                 CERROR("param is empty\n");
3221                 RETURN(0);
3222         }
3223
3224         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3225         if (rc)
3226                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3227
3228         RETURN(0);
3229 }
3230
3231 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3232                                 struct mgs_device *mgs,
3233                                 struct fs_db *fsdb)
3234 {
3235         struct llog_handle        *llh = NULL;
3236         struct llog_ctxt          *ctxt;
3237         char                      *logname;
3238         struct mgs_srpc_read_data  msrd;
3239         int                        rc;
3240         ENTRY;
3241
3242         /* construct log name */
3243         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3244         if (rc)
3245                 RETURN(rc);
3246
3247         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3248         LASSERT(ctxt != NULL);
3249
3250         if (mgs_log_is_empty(env, mgs, logname))
3251                 GOTO(out, rc = 0);
3252
3253         rc = llog_open(env, ctxt, &llh, NULL, logname,
3254                        LLOG_OPEN_EXISTS);
3255         if (rc < 0) {
3256                 if (rc == -ENOENT)
3257                         rc = 0;
3258                 GOTO(out, rc);
3259         }
3260
3261         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3262         if (rc)
3263                 GOTO(out_close, rc);
3264
3265         if (llog_get_size(llh) <= 1)
3266                 GOTO(out_close, rc = 0);
3267
3268         msrd.msrd_fsdb = fsdb;
3269         msrd.msrd_skip = 0;
3270
3271         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3272                           NULL);
3273
3274 out_close:
3275         llog_close(env, llh);
3276 out:
3277         llog_ctxt_put(ctxt);
3278         name_destroy(&logname);
3279
3280         if (rc)
3281                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3282         RETURN(rc);
3283 }
3284
3285 /* Permanent settings of all parameters by writing into the appropriate
3286  * configuration logs.
3287  * A parameter with null value ("<param>='\0'") means to erase it out of
3288  * the logs.
3289  */
3290 static int mgs_write_log_param(const struct lu_env *env,
3291                                struct mgs_device *mgs, struct fs_db *fsdb,
3292                                struct mgs_target_info *mti, char *ptr)
3293 {
3294         struct mgs_thread_info *mgi = mgs_env_info(env);
3295         char *logname;
3296         char *tmp;
3297         int rc = 0;
3298         ENTRY;
3299
3300         /* For various parameter settings, we have to figure out which logs
3301            care about them (e.g. both mdt and client for lov settings) */
3302         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3303
3304         /* The params are stored in MOUNT_DATA_FILE and modified via
3305            tunefs.lustre, or set using lctl conf_param */
3306
3307         /* Processed in lustre_start_mgc */
3308         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3309                 GOTO(end, rc);
3310
3311         /* Processed in ost/mdt */
3312         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3313                 GOTO(end, rc);
3314
3315         /* Processed in mgs_write_log_ost */
3316         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3317                 if (mti->mti_flags & LDD_F_PARAM) {
3318                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3319                                            "changed with tunefs.lustre"
3320                                            "and --writeconf\n", ptr);
3321                         rc = -EPERM;
3322                 }
3323                 GOTO(end, rc);
3324         }
3325
3326         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3327                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3328                 GOTO(end, rc);
3329         }
3330
3331         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3332                 /* Add a failover nidlist */
3333                 rc = 0;
3334                 /* We already processed failovers params for new
3335                    targets in mgs_write_log_target */
3336                 if (mti->mti_flags & LDD_F_PARAM) {
3337                         CDEBUG(D_MGS, "Adding failnode\n");
3338                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3339                 }
3340                 GOTO(end, rc);
3341         }
3342
3343         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3344                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3345                 GOTO(end, rc);
3346         }
3347
3348         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3349                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3350                 GOTO(end, rc);
3351         }
3352
3353         if (class_match_param(ptr, PARAM_OSC PARAM_ACTIVE, &tmp) == 0 ||
3354             class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0) {
3355                 /* active=0 means off, anything else means on */
3356                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3357                 bool deactive_osc = memcmp(ptr, PARAM_OSC PARAM_ACTIVE,
3358                                           strlen(PARAM_OSC PARAM_ACTIVE)) == 0;
3359                 int i;
3360
3361                 if (!deactive_osc) {
3362                         __u32   index;
3363
3364                         rc = server_name2index(mti->mti_svname, &index, NULL);
3365                         if (rc < 0)
3366                                 GOTO(end, rc);
3367
3368                         if (index == 0) {
3369                                 LCONSOLE_ERROR_MSG(0x144, "%s: MDC0 can not be"
3370                                                    " (de)activated.\n",
3371                                                    mti->mti_svname);
3372                                 GOTO(end, rc = -EINVAL);
3373                         }
3374                 }
3375
3376                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3377                               flag ? "de" : "re", mti->mti_svname);
3378                 /* Modify clilov */
3379                 rc = name_create(&logname, mti->mti_fsname, "-client");
3380                 if (rc < 0)
3381                         GOTO(end, rc);
3382                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3383                                 mti->mti_svname,
3384                                 deactive_osc ? "add osc" : "add mdc", flag);
3385                 name_destroy(&logname);
3386                 if (rc < 0)
3387                         goto active_err;
3388
3389                 /* Modify mdtlov */
3390                 /* Add to all MDT logs for DNE */
3391                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3392                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3393                                 continue;
3394                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3395                         if (rc < 0)
3396                                 GOTO(end, rc);
3397                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3398                                         mti->mti_svname,
3399                                         deactive_osc ? "add osc" : "add osp",
3400                                         flag);
3401                         name_destroy(&logname);
3402                         if (rc < 0)
3403                                 goto active_err;
3404                 }
3405 active_err:
3406                 if (rc < 0) {
3407                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3408                                            "log (%d). No permanent "
3409                                            "changes were made to the "
3410                                            "config log.\n",
3411                                            mti->mti_svname, rc);
3412                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3413                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3414                                                    " because the log"
3415                                                    "is in the old 1.4"
3416                                                    "style. Consider "
3417                                                    " --writeconf to "
3418                                                    "update the logs.\n");
3419                         GOTO(end, rc);
3420                 }
3421                 /* Fall through to osc/mdc proc for deactivating live
3422                    OSC/OSP on running MDT / clients. */
3423         }
3424         /* Below here, let obd's XXX_process_config methods handle it */
3425
3426         /* All lov. in proc */
3427         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3428                 char *mdtlovname;
3429
3430                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3431                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3432                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3433                                            "set on the MDT, not %s. "
3434                                            "Ignoring.\n",
3435                                            mti->mti_svname);
3436                         GOTO(end, rc = 0);
3437                 }
3438
3439                 /* Modify mdtlov */
3440                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3441                         GOTO(end, rc = -ENODEV);
3442
3443                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3444                                              mti->mti_stripe_index);
3445                 if (rc)
3446                         GOTO(end, rc);
3447                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3448                                   &mgi->mgi_bufs, mdtlovname, ptr);
3449                 name_destroy(&logname);
3450                 name_destroy(&mdtlovname);
3451                 if (rc)
3452                         GOTO(end, rc);
3453
3454                 /* Modify clilov */
3455                 rc = name_create(&logname, mti->mti_fsname, "-client");
3456                 if (rc)
3457                         GOTO(end, rc);
3458                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3459                                   fsdb->fsdb_clilov, ptr);
3460                 name_destroy(&logname);
3461                 GOTO(end, rc);
3462         }
3463
3464         /* All osc., mdc., llite. params in proc */
3465         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3466             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3467             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3468                 char *cname;
3469
3470                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3471                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3472                                            " cannot be modified. Consider"
3473                                            " updating the configuration with"
3474                                            " --writeconf\n",
3475                                            mti->mti_svname);
3476                         GOTO(end, rc = -EINVAL);
3477                 }
3478                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3479                         rc = name_create(&cname, mti->mti_fsname, "-client");
3480                         /* Add the client type to match the obdname in
3481                            class_config_llog_handler */
3482                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3483                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3484                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3485                         rc = name_create(&cname, mti->mti_svname, "-osc");
3486                 } else {
3487                         GOTO(end, rc = -EINVAL);
3488                 }
3489                 if (rc)
3490                         GOTO(end, rc);
3491
3492                 /* Forbid direct update of llite root squash parameters.
3493                  * These parameters are indirectly set via the MDT settings.
3494                  * See (LU-1778) */
3495                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
3496                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3497                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3498                         LCONSOLE_ERROR("%s: root squash parameters can only "
3499                                 "be updated through MDT component\n",
3500                                 mti->mti_fsname);
3501                         name_destroy(&cname);
3502                         GOTO(end, rc = -EINVAL);
3503                 }
3504
3505                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3506
3507                 /* Modify client */
3508                 rc = name_create(&logname, mti->mti_fsname, "-client");
3509                 if (rc) {
3510                         name_destroy(&cname);
3511                         GOTO(end, rc);
3512                 }
3513                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3514                                   cname, ptr);
3515
3516                 /* osc params affect the MDT as well */
3517                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3518                         int i;
3519
3520                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3521                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3522                                         continue;
3523                                 name_destroy(&cname);
3524                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3525                                                          fsdb, i);
3526                                 name_destroy(&logname);
3527                                 if (rc)
3528                                         break;
3529                                 rc = name_create_mdt(&logname,
3530                                                      mti->mti_fsname, i);
3531                                 if (rc)
3532                                         break;
3533                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3534                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3535                                                           mti, logname,
3536                                                           &mgi->mgi_bufs,
3537                                                           cname, ptr);
3538                                         if (rc)
3539                                                 break;
3540                                 }
3541                         }
3542                 }
3543
3544                 /* For mdc activate/deactivate, it affects OSP on MDT as well */
3545                 if (class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0 &&
3546                     rc == 0) {
3547                         char suffix[16];
3548                         char *lodname = NULL;
3549                         char *param_str = NULL;
3550                         int i;
3551                         int index;
3552
3553                         /* replace mdc with osp */
3554                         memcpy(ptr, PARAM_OSP, strlen(PARAM_OSP));
3555                         rc = server_name2index(mti->mti_svname, &index, NULL);
3556                         if (rc < 0) {
3557                                 memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
3558                                 GOTO(end, rc);
3559                         }
3560
3561                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3562                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3563                                         continue;
3564
3565                                 if (i == index)
3566                                         continue;
3567
3568                                 name_destroy(&logname);
3569                                 rc = name_create_mdt(&logname, mti->mti_fsname,
3570                                                      i);
3571                                 if (rc < 0)
3572                                         break;
3573
3574                                 if (mgs_log_is_empty(env, mgs, logname))
3575                                         continue;
3576
3577                                 snprintf(suffix, sizeof(suffix), "-osp-MDT%04x",
3578                                          i);
3579                                 name_destroy(&cname);
3580                                 rc = name_create(&cname, mti->mti_svname,
3581                                                  suffix);
3582                                 if (rc < 0)
3583                                         break;
3584
3585                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3586                                                   &mgi->mgi_bufs, cname, ptr);
3587                                 if (rc < 0)
3588                                         break;
3589
3590                                 /* Add configuration log for noitfying LOD
3591                                  * to active/deactive the OSP. */
3592                                 name_destroy(&param_str);
3593                                 rc = name_create(&param_str, cname,
3594                                                  (*tmp == '0') ?  ".active=0" :
3595                                                  ".active=1");
3596                                 if (rc < 0)
3597                                         break;
3598
3599                                 name_destroy(&lodname);
3600                                 rc = name_create(&lodname, logname, "-mdtlov");
3601                                 if (rc < 0)
3602                                         break;
3603
3604                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3605                                                   &mgi->mgi_bufs, lodname,
3606                                                   param_str);
3607                                 if (rc < 0)
3608                                         break;
3609                         }
3610                         memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
3611                         name_destroy(&lodname);
3612                         name_destroy(&param_str);
3613                 }
3614
3615                 name_destroy(&logname);
3616                 name_destroy(&cname);
3617                 GOTO(end, rc);
3618         }
3619
3620         /* All mdt. params in proc */
3621         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
3622                 int i;
3623                 __u32 idx;
3624
3625                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3626                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3627                             MTI_NAME_MAXLEN) == 0)
3628                         /* device is unspecified completely? */
3629                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3630                 else
3631                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3632                 if (rc < 0)
3633                         goto active_err;
3634                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3635                         goto active_err;
3636                 if (rc & LDD_F_SV_ALL) {
3637                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3638                                 if (!test_bit(i,
3639                                                   fsdb->fsdb_mdt_index_map))
3640                                         continue;
3641                                 rc = name_create_mdt(&logname,
3642                                                 mti->mti_fsname, i);
3643                                 if (rc)
3644                                         goto active_err;
3645                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3646                                                   logname, &mgi->mgi_bufs,
3647                                                   logname, ptr);
3648                                 name_destroy(&logname);
3649                                 if (rc)
3650                                         goto active_err;
3651                         }
3652                 } else {
3653                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
3654                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
3655                                 LCONSOLE_ERROR("%s: root squash parameters "
3656                                         "cannot be applied to a single MDT\n",
3657                                         mti->mti_fsname);
3658                                 GOTO(end, rc = -EINVAL);
3659                         }
3660                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3661                                           mti->mti_svname, &mgi->mgi_bufs,
3662                                           mti->mti_svname, ptr);
3663                         if (rc)
3664                                 goto active_err;
3665                 }
3666
3667                 /* root squash settings are also applied to llite
3668                  * config log (see LU-1778) */
3669                 if (rc == 0 &&
3670                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3671                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3672                         char *cname;
3673                         char *ptr2;
3674
3675                         rc = name_create(&cname, mti->mti_fsname, "-client");
3676                         if (rc)
3677                                 GOTO(end, rc);
3678                         rc = name_create(&logname, mti->mti_fsname, "-client");
3679                         if (rc) {
3680                                 name_destroy(&cname);
3681                                 GOTO(end, rc);
3682                         }
3683                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
3684                         if (rc) {
3685                                 name_destroy(&cname);
3686                                 name_destroy(&logname);
3687                                 GOTO(end, rc);
3688                         }
3689                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3690                                           &mgi->mgi_bufs, cname, ptr2);
3691                         name_destroy(&ptr2);
3692                         name_destroy(&logname);
3693                         name_destroy(&cname);
3694                 }
3695                 GOTO(end, rc);
3696         }
3697
3698         /* All mdd., ost. and osd. params in proc */
3699         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3700             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3701             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3702                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3703                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3704                         GOTO(end, rc = -ENODEV);
3705
3706                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3707                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3708                 GOTO(end, rc);
3709         }
3710
3711         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3712
3713 end:
3714         if (rc)
3715                 CERROR("err %d on param '%s'\n", rc, ptr);
3716
3717         RETURN(rc);
3718 }
3719
3720 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
3721                          struct mgs_target_info *mti, struct fs_db *fsdb)
3722 {
3723         char    *buf, *params;
3724         int      rc = -EINVAL;
3725
3726         ENTRY;
3727
3728         /* set/check the new target index */
3729         rc = mgs_set_index(env, mgs, mti);
3730         if (rc < 0)
3731                 RETURN(rc);
3732
3733         if (rc == EALREADY) {
3734                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3735                               mti->mti_stripe_index, mti->mti_svname);
3736                 /* We would like to mark old log sections as invalid
3737                    and add new log sections in the client and mdt logs.
3738                    But if we add new sections, then live clients will
3739                    get repeat setup instructions for already running
3740                    osc's. So don't update the client/mdt logs. */
3741                 mti->mti_flags &= ~LDD_F_UPDATE;
3742                 rc = 0;
3743         }
3744
3745         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_WRITE_TARGET_DELAY, cfs_fail_val > 0 ?
3746                          cfs_fail_val : 10);
3747
3748         mutex_lock(&fsdb->fsdb_mutex);
3749
3750         if (mti->mti_flags &
3751             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3752                 /* Generate a log from scratch */
3753                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3754                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3755                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3756                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3757                 } else {
3758                         CERROR("Unknown target type %#x, can't create log for "
3759                                "%s\n", mti->mti_flags, mti->mti_svname);
3760                 }
3761                 if (rc) {
3762                         CERROR("Can't write logs for %s (%d)\n",
3763                                mti->mti_svname, rc);
3764                         GOTO(out_up, rc);
3765                 }
3766         } else {
3767                 /* Just update the params from tunefs in mgs_write_log_params */
3768                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3769                 mti->mti_flags |= LDD_F_PARAM;
3770         }
3771
3772         /* allocate temporary buffer, where class_get_next_param will
3773            make copy of a current  parameter */
3774         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3775         if (buf == NULL)
3776                 GOTO(out_up, rc = -ENOMEM);
3777         params = mti->mti_params;
3778         while (params != NULL) {
3779                 rc = class_get_next_param(&params, buf);
3780                 if (rc) {
3781                         if (rc == 1)
3782                                 /* there is no next parameter, that is
3783                                    not an error */
3784                                 rc = 0;
3785                         break;
3786                 }
3787                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3788                        params, buf);
3789                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3790                 if (rc)
3791                         break;
3792         }
3793
3794         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3795
3796 out_up:
3797         mutex_unlock(&fsdb->fsdb_mutex);
3798         RETURN(rc);
3799 }
3800
3801 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3802 {
3803         struct llog_ctxt        *ctxt;
3804         int                      rc = 0;
3805
3806         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3807         if (ctxt == NULL) {
3808                 CERROR("%s: MGS config context doesn't exist\n",
3809                        mgs->mgs_obd->obd_name);
3810                 rc = -ENODEV;
3811         } else {
3812                 rc = llog_erase(env, ctxt, NULL, name);
3813                 /* llog may not exist */
3814                 if (rc == -ENOENT)
3815                         rc = 0;
3816                 llog_ctxt_put(ctxt);
3817         }
3818
3819         if (rc)
3820                 CERROR("%s: failed to clear log %s: %d\n",
3821                        mgs->mgs_obd->obd_name, name, rc);
3822
3823         return rc;
3824 }
3825
3826 /* erase all logs for the given fs */
3827 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3828 {
3829         struct fs_db *fsdb;
3830         struct list_head log_list;
3831         struct mgs_direntry *dirent, *n;
3832         int rc, len = strlen(fsname);
3833         char *suffix;
3834         ENTRY;
3835
3836         /* Find all the logs in the CONFIGS directory */
3837         rc = class_dentry_readdir(env, mgs, &log_list);
3838         if (rc)
3839                 RETURN(rc);
3840
3841         mutex_lock(&mgs->mgs_mutex);
3842
3843         /* Delete the fs db */
3844         fsdb = mgs_find_fsdb(mgs, fsname);
3845         if (fsdb)
3846                 mgs_free_fsdb(mgs, fsdb);
3847
3848         mutex_unlock(&mgs->mgs_mutex);
3849
3850         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3851                 list_del_init(&dirent->mde_list);
3852                 suffix = strrchr(dirent->mde_name, '-');
3853                 if (suffix != NULL) {
3854                         if ((len == suffix - dirent->mde_name) &&
3855                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
3856                                 CDEBUG(D_MGS, "Removing log %s\n",
3857                                        dirent->mde_name);
3858                                 mgs_erase_log(env, mgs, dirent->mde_name);
3859                         }
3860                 }
3861                 mgs_direntry_free(dirent);
3862         }
3863
3864         RETURN(rc);
3865 }
3866
3867 /* list all logs for the given fs */
3868 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
3869                   struct obd_ioctl_data *data)
3870 {
3871         struct list_head         log_list;
3872         struct mgs_direntry     *dirent, *n;
3873         char                    *out, *suffix;
3874         int                      l, remains, rc;
3875
3876         ENTRY;
3877
3878         /* Find all the logs in the CONFIGS directory */
3879         rc = class_dentry_readdir(env, mgs, &log_list);
3880         if (rc)
3881                 RETURN(rc);
3882
3883         out = data->ioc_bulk;
3884         remains = data->ioc_inllen1;
3885         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3886                 list_del_init(&dirent->mde_list);
3887                 suffix = strrchr(dirent->mde_name, '-');
3888                 if (suffix != NULL) {
3889                         l = snprintf(out, remains, "config log: $%s\n",
3890                                      dirent->mde_name);
3891                         out += l;
3892                         remains -= l;
3893                 }
3894                 mgs_direntry_free(dirent);
3895                 if (remains <= 0)
3896                         break;
3897         }
3898         RETURN(rc);
3899 }
3900
3901 /* from llog_swab */
3902 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3903 {
3904         int i;
3905         ENTRY;
3906
3907         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3908         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3909
3910         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3911         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3912         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3913         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3914
3915         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3916         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3917                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3918                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3919                                i, lcfg->lcfg_buflens[i],
3920                                lustre_cfg_string(lcfg, i));
3921                 }
3922         EXIT;
3923 }
3924
3925 /* Setup _mgs fsdb and log
3926  */
3927 int mgs__mgs_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3928                           struct fs_db *fsdb)
3929 {
3930         int                     rc;
3931         ENTRY;
3932
3933         rc = mgs_find_or_make_fsdb(env, mgs, MGSSELF_NAME, &fsdb);
3934
3935         RETURN(rc);
3936 }
3937
3938 /* Setup params fsdb and log
3939  */
3940 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3941                           struct fs_db *fsdb)
3942 {
3943         struct llog_handle      *params_llh = NULL;
3944         int                     rc;
3945         ENTRY;
3946
3947         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
3948         if (fsdb != NULL) {
3949                 mutex_lock(&fsdb->fsdb_mutex);
3950                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
3951                 if (rc == 0)
3952                         rc = record_end_log(env, &params_llh);
3953                 mutex_unlock(&fsdb->fsdb_mutex);
3954         }
3955
3956         RETURN(rc);
3957 }
3958
3959 /* Cleanup params fsdb and log
3960  */
3961 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
3962 {
3963         return mgs_erase_logs(env, mgs, PARAMS_FILENAME);
3964 }
3965
3966 /* Set a permanent (config log) param for a target or fs
3967  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3968  *             buf1 contains the single parameter
3969  */
3970 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3971                  struct lustre_cfg *lcfg, char *fsname)
3972 {
3973         struct fs_db *fsdb;
3974         struct mgs_target_info *mti;
3975         char *devname, *param;
3976         char *ptr;
3977         const char *tmp;
3978         __u32 index;
3979         int rc = 0;
3980         ENTRY;
3981
3982         print_lustre_cfg(lcfg);
3983
3984         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3985         devname = lustre_cfg_string(lcfg, 0);
3986         param = lustre_cfg_string(lcfg, 1);
3987         if (!devname) {
3988                 /* Assume device name embedded in param:
3989                    lustre-OST0000.osc.max_dirty_mb=32 */
3990                 ptr = strchr(param, '.');
3991                 if (ptr) {
3992                         devname = param;
3993                         *ptr = 0;
3994                         param = ptr + 1;
3995                 }
3996         }
3997         if (!devname) {
3998                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3999                 RETURN(-ENOSYS);
4000         }
4001
4002         rc = mgs_parse_devname(devname, fsname, NULL);
4003         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
4004                 /* param related to llite isn't allowed to set by OST or MDT */
4005                 if (rc == 0 && strncmp(param, PARAM_LLITE,
4006                                        sizeof(PARAM_LLITE) - 1) == 0)
4007                         RETURN(-EINVAL);
4008         } else {
4009                 /* assume devname is the fsname */
4010                 strlcpy(fsname, devname, MTI_NAME_MAXLEN);
4011         }
4012         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
4013
4014         rc = mgs_find_or_make_fsdb(env, mgs,
4015                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
4016                                    PARAMS_FILENAME : fsname, &fsdb);
4017         if (rc)
4018                 RETURN(rc);
4019
4020         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
4021             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
4022             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4023                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
4024                        "is '%s'\n", fsname, devname);
4025                 mgs_free_fsdb(mgs, fsdb);
4026                 RETURN(-EINVAL);
4027         }
4028
4029         /* Create a fake mti to hold everything */
4030         OBD_ALLOC_PTR(mti);
4031         if (!mti)
4032                 GOTO(out, rc = -ENOMEM);
4033         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
4034             >= sizeof(mti->mti_fsname))
4035                 GOTO(out, rc = -E2BIG);
4036         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
4037             >= sizeof(mti->mti_svname))
4038                 GOTO(out, rc = -E2BIG);
4039         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
4040             >= sizeof(mti->mti_params))
4041                 GOTO(out, rc = -E2BIG);
4042         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
4043         if (rc < 0)
4044                 /* Not a valid server; may be only fsname */
4045                 rc = 0;
4046         else
4047                 /* Strip -osc or -mdc suffix from svname */
4048                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
4049                                      mti->mti_svname))
4050                         GOTO(out, rc = -EINVAL);
4051         /*
4052          * Revoke lock so everyone updates.  Should be alright if
4053          * someone was already reading while we were updating the logs,
4054          * so we don't really need to hold the lock while we're
4055          * writing (above).
4056          */
4057         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
4058                 mti->mti_flags = rc | LDD_F_PARAM2;
4059                 mutex_lock(&fsdb->fsdb_mutex);
4060                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
4061                 mutex_unlock(&fsdb->fsdb_mutex);
4062                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
4063         } else {
4064                 mti->mti_flags = rc | LDD_F_PARAM;
4065                 mutex_lock(&fsdb->fsdb_mutex);
4066                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
4067                 mutex_unlock(&fsdb->fsdb_mutex);
4068                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4069         }
4070
4071 out:
4072         OBD_FREE_PTR(mti);
4073         RETURN(rc);
4074 }
4075
4076 static int mgs_write_log_pool(const struct lu_env *env,
4077                               struct mgs_device *mgs, char *logname,
4078                               struct fs_db *fsdb, char *tgtname,
4079                               enum lcfg_command_type cmd,
4080                               char *fsname, char *poolname,
4081                               char *ostname, char *comment)
4082 {
4083         struct llog_handle *llh = NULL;
4084         int rc;
4085
4086         rc = record_start_log(env, mgs, &llh, logname);
4087         if (rc)
4088                 return rc;
4089         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
4090         if (rc)
4091                 goto out;
4092         rc = record_base(env, llh, tgtname, 0, cmd,
4093                          fsname, poolname, ostname, NULL);
4094         if (rc)
4095                 goto out;
4096         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
4097 out:
4098         record_end_log(env, &llh);
4099         return rc;
4100 }
4101
4102 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
4103                     enum lcfg_command_type cmd, const char *nodemap_name,
4104                     char *param)
4105 {
4106         lnet_nid_t      nid[2];
4107         __u32           idmap[2];
4108         bool            bool_switch;
4109         __u32           int_id;
4110         int             rc = 0;
4111         ENTRY;
4112
4113         switch (cmd) {
4114         case LCFG_NODEMAP_ADD:
4115                 rc = nodemap_add(nodemap_name);
4116                 break;
4117         case LCFG_NODEMAP_DEL:
4118                 rc = nodemap_del(nodemap_name);
4119                 break;
4120         case LCFG_NODEMAP_ADD_RANGE:
4121                 rc = nodemap_parse_range(param, nid);
4122                 if (rc != 0)
4123                         break;
4124                 rc = nodemap_add_range(nodemap_name, nid);
4125                 break;
4126         case LCFG_NODEMAP_DEL_RANGE:
4127                 rc = nodemap_parse_range(param, nid);
4128                 if (rc != 0)
4129                         break;
4130                 rc = nodemap_del_range(nodemap_name, nid);
4131                 break;
4132         case LCFG_NODEMAP_ADMIN:
4133                 bool_switch = simple_strtoul(param, NULL, 10);
4134                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
4135                 break;
4136         case LCFG_NODEMAP_DENY_UNKNOWN:
4137                 bool_switch = simple_strtoul(param, NULL, 10);
4138                 rc = nodemap_set_deny_unknown(nodemap_name, bool_switch);
4139                 break;
4140         case LCFG_NODEMAP_TRUSTED:
4141                 bool_switch = simple_strtoul(param, NULL, 10);
4142                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
4143                 break;
4144         case LCFG_NODEMAP_SQUASH_UID:
4145                 int_id = simple_strtoul(param, NULL, 10);
4146                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
4147                 break;
4148         case LCFG_NODEMAP_SQUASH_GID:
4149                 int_id = simple_strtoul(param, NULL, 10);
4150                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
4151                 break;
4152         case LCFG_NODEMAP_ADD_UIDMAP:
4153         case LCFG_NODEMAP_ADD_GIDMAP:
4154                 rc = nodemap_parse_idmap(param, idmap);
4155                 if (rc != 0)
4156                         break;
4157                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
4158                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
4159                                                idmap);
4160                 else
4161                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
4162                                                idmap);
4163                 break;
4164         case LCFG_NODEMAP_DEL_UIDMAP:
4165         case LCFG_NODEMAP_DEL_GIDMAP:
4166                 rc = nodemap_parse_idmap(param, idmap);
4167                 if (rc != 0)
4168                         break;
4169                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
4170                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
4171                                                idmap);
4172                 else
4173                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
4174                                                idmap);
4175                 break;
4176         case LCFG_NODEMAP_SET_FILESET:
4177                 rc = nodemap_set_fileset(nodemap_name, param);
4178                 break;
4179         default:
4180                 rc = -EINVAL;
4181         }
4182
4183         RETURN(rc);
4184 }
4185
4186 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
4187                  enum lcfg_command_type cmd, char *fsname,
4188                  char *poolname, char *ostname)
4189 {
4190         struct fs_db *fsdb;
4191         char *lovname;
4192         char *logname;
4193         char *label = NULL, *canceled_label = NULL;
4194         int label_sz;
4195         struct mgs_target_info *mti = NULL;
4196         int rc, i;
4197         ENTRY;
4198
4199         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
4200         if (rc) {
4201                 CERROR("Can't get db for %s\n", fsname);
4202                 RETURN(rc);
4203         }
4204         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4205                 CERROR("%s is not defined\n", fsname);
4206                 mgs_free_fsdb(mgs, fsdb);
4207                 RETURN(-EINVAL);
4208         }
4209
4210         label_sz = 10 + strlen(fsname) + strlen(poolname);
4211
4212         /* check if ostname match fsname */
4213         if (ostname != NULL) {
4214                 char *ptr;
4215
4216                 ptr = strrchr(ostname, '-');
4217                 if ((ptr == NULL) ||
4218                     (strncmp(fsname, ostname, ptr-ostname) != 0))
4219                         RETURN(-EINVAL);
4220                 label_sz += strlen(ostname);
4221         }
4222
4223         OBD_ALLOC(label, label_sz);
4224         if (label == NULL)
4225                 RETURN(-ENOMEM);
4226
4227         switch(cmd) {
4228         case LCFG_POOL_NEW:
4229                 sprintf(label,
4230                         "new %s.%s", fsname, poolname);
4231                 break;
4232         case LCFG_POOL_ADD:
4233                 sprintf(label,
4234                         "add %s.%s.%s", fsname, poolname, ostname);
4235                 break;
4236         case LCFG_POOL_REM:
4237                 OBD_ALLOC(canceled_label, label_sz);
4238                 if (canceled_label == NULL)
4239                         GOTO(out_label, rc = -ENOMEM);
4240                 sprintf(label,
4241                         "rem %s.%s.%s", fsname, poolname, ostname);
4242                 sprintf(canceled_label,
4243                         "add %s.%s.%s", fsname, poolname, ostname);
4244                 break;
4245         case LCFG_POOL_DEL:
4246                 OBD_ALLOC(canceled_label, label_sz);
4247                 if (canceled_label == NULL)
4248                         GOTO(out_label, rc = -ENOMEM);
4249                 sprintf(label,
4250                         "del %s.%s", fsname, poolname);
4251                 sprintf(canceled_label,
4252                         "new %s.%s", fsname, poolname);
4253                 break;
4254         default:
4255                 break;
4256         }
4257
4258         if (canceled_label != NULL) {
4259                 OBD_ALLOC_PTR(mti);
4260                 if (mti == NULL)
4261                         GOTO(out_cancel, rc = -ENOMEM);
4262         }
4263
4264         mutex_lock(&fsdb->fsdb_mutex);
4265         /* write pool def to all MDT logs */
4266         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4267                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4268                         rc = name_create_mdt_and_lov(&logname, &lovname,
4269                                                      fsdb, i);
4270                         if (rc) {
4271                                 mutex_unlock(&fsdb->fsdb_mutex);
4272                                 GOTO(out_mti, rc);
4273                         }
4274                         if (canceled_label != NULL) {
4275                                 strcpy(mti->mti_svname, "lov pool");
4276                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4277                                                 lovname, canceled_label,
4278                                                 CM_SKIP);
4279                         }
4280
4281                         if (rc >= 0)
4282                                 rc = mgs_write_log_pool(env, mgs, logname,
4283                                                         fsdb, lovname, cmd,
4284                                                         fsname, poolname,
4285                                                         ostname, label);
4286                         name_destroy(&logname);
4287                         name_destroy(&lovname);
4288                         if (rc) {
4289                                 mutex_unlock(&fsdb->fsdb_mutex);
4290                                 GOTO(out_mti, rc);
4291                         }
4292                 }
4293         }
4294
4295         rc = name_create(&logname, fsname, "-client");
4296         if (rc) {
4297                 mutex_unlock(&fsdb->fsdb_mutex);
4298                 GOTO(out_mti, rc);
4299         }
4300         if (canceled_label != NULL) {
4301                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4302                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4303                 if (rc < 0) {
4304                         mutex_unlock(&fsdb->fsdb_mutex);
4305                         name_destroy(&logname);
4306                         GOTO(out_mti, rc);
4307                 }
4308         }
4309
4310         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4311                                 cmd, fsname, poolname, ostname, label);
4312         mutex_unlock(&fsdb->fsdb_mutex);
4313         name_destroy(&logname);
4314         /* request for update */
4315         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4316
4317         EXIT;
4318 out_mti:
4319         if (mti != NULL)
4320                 OBD_FREE_PTR(mti);
4321 out_cancel:
4322         if (canceled_label != NULL)
4323                 OBD_FREE(canceled_label, label_sz);
4324 out_label:
4325         OBD_FREE(label, label_sz);
4326         return rc;
4327 }