Whamcloud - gitweb
LU-4781 utils: handle EEXIST in lustre_rsync
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 /* Find all logs in CONFIG directory and link then into list */
59 int class_dentry_readdir(const struct lu_env *env,
60                          struct mgs_device *mgs, struct list_head *log_list)
61 {
62         struct dt_object    *dir = mgs->mgs_configs_dir;
63         const struct dt_it_ops *iops;
64         struct dt_it        *it;
65         struct mgs_direntry *de;
66         char                *key;
67         int                  rc, key_sz;
68
69         INIT_LIST_HEAD(log_list);
70
71         LASSERT(dir);
72         LASSERT(dir->do_index_ops);
73
74         iops = &dir->do_index_ops->dio_it;
75         it = iops->init(env, dir, LUDA_64BITHASH);
76         if (IS_ERR(it))
77                 RETURN(PTR_ERR(it));
78
79         rc = iops->load(env, it, 0);
80         if (rc <= 0)
81                 GOTO(fini, rc = 0);
82
83         /* main cycle */
84         do {
85                 key = (void *)iops->key(env, it);
86                 if (IS_ERR(key)) {
87                         CERROR("%s: key failed when listing %s: rc = %d\n",
88                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
89                                (int) PTR_ERR(key));
90                         goto next;
91                 }
92                 key_sz = iops->key_size(env, it);
93                 LASSERT(key_sz > 0);
94
95                 /* filter out "." and ".." entries */
96                 if (key[0] == '.') {
97                         if (key_sz == 1)
98                                 goto next;
99                         if (key_sz == 2 && key[1] == '.')
100                                 goto next;
101                 }
102
103                 de = mgs_direntry_alloc(key_sz + 1);
104                 if (de == NULL) {
105                         rc = -ENOMEM;
106                         break;
107                 }
108
109                 memcpy(de->mde_name, key, key_sz);
110                 de->mde_name[key_sz] = 0;
111
112                 list_add(&de->mde_list, log_list);
113
114 next:
115                 rc = iops->next(env, it);
116         } while (rc == 0);
117         rc = 0;
118
119         iops->put(env, it);
120
121 fini:
122         iops->fini(env, it);
123         if (rc)
124                 CERROR("%s: key failed when listing %s: rc = %d\n",
125                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
126         RETURN(rc);
127 }
128
129 /******************** DB functions *********************/
130
131 static inline int name_create(char **newname, char *prefix, char *suffix)
132 {
133         LASSERT(newname);
134         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
135         if (!*newname)
136                 return -ENOMEM;
137         sprintf(*newname, "%s%s", prefix, suffix);
138         return 0;
139 }
140
141 static inline void name_destroy(char **name)
142 {
143         if (*name)
144                 OBD_FREE(*name, strlen(*name) + 1);
145         *name = NULL;
146 }
147
148 struct mgs_fsdb_handler_data
149 {
150         struct fs_db   *fsdb;
151         __u32           ver;
152 };
153
154 /* from the (client) config log, figure out:
155         1. which ost's/mdt's are configured (by index)
156         2. what the last config step is
157         3. COMPAT_18 osc name
158 */
159 /* It might be better to have a separate db file, instead of parsing the info
160    out of the client log.  This is slow and potentially error-prone. */
161 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
162                             struct llog_rec_hdr *rec, void *data)
163 {
164         struct mgs_fsdb_handler_data *d = data;
165         struct fs_db *fsdb = d->fsdb;
166         int cfg_len = rec->lrh_len;
167         char *cfg_buf = (char*) (rec + 1);
168         struct lustre_cfg *lcfg;
169         __u32 index;
170         int rc = 0;
171         ENTRY;
172
173         if (rec->lrh_type != OBD_CFG_REC) {
174                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
175                 RETURN(-EINVAL);
176         }
177
178         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
179         if (rc) {
180                 CERROR("Insane cfg\n");
181                 RETURN(rc);
182         }
183
184         lcfg = (struct lustre_cfg *)cfg_buf;
185
186         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
187                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
188
189         /* Figure out ost indicies */
190         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
191         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
192             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
193                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
194                                        NULL, 10);
195                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
196                        lustre_cfg_string(lcfg, 1), index,
197                        lustre_cfg_string(lcfg, 2));
198                 set_bit(index, fsdb->fsdb_ost_index_map);
199         }
200
201         /* Figure out mdt indicies */
202         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
203         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
204             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
205                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
206                                        &index, NULL);
207                 if (rc != LDD_F_SV_TYPE_MDT) {
208                         CWARN("Unparsable MDC name %s, assuming index 0\n",
209                               lustre_cfg_string(lcfg, 0));
210                         index = 0;
211                 }
212                 rc = 0;
213                 CDEBUG(D_MGS, "MDT index is %u\n", index);
214                 set_bit(index, fsdb->fsdb_mdt_index_map);
215                 fsdb->fsdb_mdt_count ++;
216         }
217
218         /**
219          * figure out the old config. fsdb_gen = 0 means old log
220          * It is obsoleted and not supported anymore
221          */
222         if (fsdb->fsdb_gen == 0) {
223                 CERROR("Old config format is not supported\n");
224                 RETURN(-EINVAL);
225         }
226
227         /*
228          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
229          */
230         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
231             lcfg->lcfg_command == LCFG_ATTACH &&
232             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
233                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
234                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
235                         CWARN("MDT using 1.8 OSC name scheme\n");
236                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
237                 }
238         }
239
240         if (lcfg->lcfg_command == LCFG_MARKER) {
241                 struct cfg_marker *marker;
242                 marker = lustre_cfg_buf(lcfg, 1);
243
244                 d->ver = marker->cm_vers;
245
246                 /* Keep track of the latest marker step */
247                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
248         }
249
250         RETURN(rc);
251 }
252
253 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
254 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
255                                   struct mgs_device *mgs,
256                                   struct fs_db *fsdb)
257 {
258         char                            *logname;
259         struct llog_handle              *loghandle;
260         struct llog_ctxt                *ctxt;
261         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
262         int rc;
263
264         ENTRY;
265
266         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
267         LASSERT(ctxt != NULL);
268         rc = name_create(&logname, fsdb->fsdb_name, "-client");
269         if (rc)
270                 GOTO(out_put, rc);
271         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
272         if (rc)
273                 GOTO(out_pop, rc);
274
275         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
276         if (rc)
277                 GOTO(out_close, rc);
278
279         if (llog_get_size(loghandle) <= 1)
280                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
281
282         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
283         CDEBUG(D_INFO, "get_db = %d\n", rc);
284 out_close:
285         llog_close(env, loghandle);
286 out_pop:
287         name_destroy(&logname);
288 out_put:
289         llog_ctxt_put(ctxt);
290
291         RETURN(rc);
292 }
293
294 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
295 {
296         struct mgs_tgt_srpc_conf *tgtconf;
297
298         /* free target-specific rules */
299         while (fsdb->fsdb_srpc_tgt) {
300                 tgtconf = fsdb->fsdb_srpc_tgt;
301                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
302
303                 LASSERT(tgtconf->mtsc_tgt);
304
305                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
306                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
307                 OBD_FREE_PTR(tgtconf);
308         }
309
310         /* free general rules */
311         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
312 }
313
314 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
315 {
316         struct fs_db *fsdb;
317         struct list_head *tmp;
318
319         list_for_each(tmp, &mgs->mgs_fs_db_list) {
320                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
321                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
322                         return fsdb;
323         }
324         return NULL;
325 }
326
327 /* caller must hold the mgs->mgs_fs_db_lock */
328 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
329                                   struct mgs_device *mgs, char *fsname)
330 {
331         struct fs_db *fsdb;
332         int rc;
333         ENTRY;
334
335         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
336                 CERROR("fsname %s is too long\n", fsname);
337                 RETURN(ERR_PTR(-EINVAL));
338         }
339
340         OBD_ALLOC_PTR(fsdb);
341         if (!fsdb)
342                 RETURN(ERR_PTR(-ENOMEM));
343
344         strcpy(fsdb->fsdb_name, fsname);
345         mutex_init(&fsdb->fsdb_mutex);
346         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
347         fsdb->fsdb_gen = 1;
348
349         if (strcmp(fsname, MGSSELF_NAME) == 0) {
350                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
351                 fsdb->fsdb_mgs = mgs;
352         } else {
353                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
354                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
355                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
356                         CERROR("No memory for index maps\n");
357                         GOTO(err, rc = -ENOMEM);
358                 }
359
360                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
361                 if (rc)
362                         GOTO(err, rc);
363                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
364                 if (rc)
365                         GOTO(err, rc);
366
367                 /* initialise data for NID table */
368                 mgs_ir_init_fs(env, mgs, fsdb);
369
370                 lproc_mgs_add_live(mgs, fsdb);
371         }
372
373         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
374
375         RETURN(fsdb);
376 err:
377         if (fsdb->fsdb_ost_index_map)
378                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
379         if (fsdb->fsdb_mdt_index_map)
380                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
381         name_destroy(&fsdb->fsdb_clilov);
382         name_destroy(&fsdb->fsdb_clilmv);
383         OBD_FREE_PTR(fsdb);
384         RETURN(ERR_PTR(rc));
385 }
386
387 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
388 {
389         /* wait for anyone with the sem */
390         mutex_lock(&fsdb->fsdb_mutex);
391         lproc_mgs_del_live(mgs, fsdb);
392         list_del(&fsdb->fsdb_list);
393
394         /* deinitialize fsr */
395         mgs_ir_fini_fs(mgs, fsdb);
396
397         if (fsdb->fsdb_ost_index_map)
398                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
399         if (fsdb->fsdb_mdt_index_map)
400                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
401         name_destroy(&fsdb->fsdb_clilov);
402         name_destroy(&fsdb->fsdb_clilmv);
403         mgs_free_fsdb_srpc(fsdb);
404         mutex_unlock(&fsdb->fsdb_mutex);
405         OBD_FREE_PTR(fsdb);
406 }
407
408 int mgs_init_fsdb_list(struct mgs_device *mgs)
409 {
410         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
411         return 0;
412 }
413
414 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
415 {
416         struct fs_db *fsdb;
417         struct list_head *tmp, *tmp2;
418
419         mutex_lock(&mgs->mgs_mutex);
420         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
421                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
422                 mgs_free_fsdb(mgs, fsdb);
423         }
424         mutex_unlock(&mgs->mgs_mutex);
425         return 0;
426 }
427
428 int mgs_find_or_make_fsdb(const struct lu_env *env,
429                           struct mgs_device *mgs, char *name,
430                           struct fs_db **dbh)
431 {
432         struct fs_db *fsdb;
433         int rc = 0;
434
435         ENTRY;
436         mutex_lock(&mgs->mgs_mutex);
437         fsdb = mgs_find_fsdb(mgs, name);
438         if (fsdb) {
439                 mutex_unlock(&mgs->mgs_mutex);
440                 *dbh = fsdb;
441                 RETURN(0);
442         }
443
444         CDEBUG(D_MGS, "Creating new db\n");
445         fsdb = mgs_new_fsdb(env, mgs, name);
446         /* lock fsdb_mutex until the db is loaded from llogs */
447         if (!IS_ERR(fsdb))
448                 mutex_lock(&fsdb->fsdb_mutex);
449         mutex_unlock(&mgs->mgs_mutex);
450         if (IS_ERR(fsdb))
451                 RETURN(PTR_ERR(fsdb));
452
453         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
454                 /* populate the db from the client llog */
455                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
456                 if (rc) {
457                         CERROR("Can't get db from client log %d\n", rc);
458                         GOTO(out_free, rc);
459                 }
460         }
461
462         /* populate srpc rules from params llog */
463         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
464         if (rc) {
465                 CERROR("Can't get db from params log %d\n", rc);
466                 GOTO(out_free, rc);
467         }
468
469         mutex_unlock(&fsdb->fsdb_mutex);
470         *dbh = fsdb;
471
472         RETURN(0);
473
474 out_free:
475         mutex_unlock(&fsdb->fsdb_mutex);
476         mgs_free_fsdb(mgs, fsdb);
477         return rc;
478 }
479
480 /* 1 = index in use
481    0 = index unused
482    -1= empty client log */
483 int mgs_check_index(const struct lu_env *env,
484                     struct mgs_device *mgs,
485                     struct mgs_target_info *mti)
486 {
487         struct fs_db *fsdb;
488         void *imap;
489         int rc = 0;
490         ENTRY;
491
492         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
493
494         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
495         if (rc) {
496                 CERROR("Can't get db for %s\n", mti->mti_fsname);
497                 RETURN(rc);
498         }
499
500         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
501                 RETURN(-1);
502
503         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
504                 imap = fsdb->fsdb_ost_index_map;
505         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
506                 imap = fsdb->fsdb_mdt_index_map;
507         else
508                 RETURN(-EINVAL);
509
510         if (test_bit(mti->mti_stripe_index, imap))
511                 RETURN(1);
512         RETURN(0);
513 }
514
515 static __inline__ int next_index(void *index_map, int map_len)
516 {
517         int i;
518         for (i = 0; i < map_len * 8; i++)
519                 if (!test_bit(i, index_map)) {
520                          return i;
521                  }
522         CERROR("max index %d exceeded.\n", i);
523         return -1;
524 }
525
526 /* Return codes:
527         0  newly marked as in use
528         <0 err
529         +EALREADY for update of an old index */
530 static int mgs_set_index(const struct lu_env *env,
531                          struct mgs_device *mgs,
532                          struct mgs_target_info *mti)
533 {
534         struct fs_db *fsdb;
535         void *imap;
536         int rc = 0;
537         ENTRY;
538
539         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
540         if (rc) {
541                 CERROR("Can't get db for %s\n", mti->mti_fsname);
542                 RETURN(rc);
543         }
544
545         mutex_lock(&fsdb->fsdb_mutex);
546         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
547                 imap = fsdb->fsdb_ost_index_map;
548         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
549                 imap = fsdb->fsdb_mdt_index_map;
550         } else {
551                 GOTO(out_up, rc = -EINVAL);
552         }
553
554         if (mti->mti_flags & LDD_F_NEED_INDEX) {
555                 rc = next_index(imap, INDEX_MAP_SIZE);
556                 if (rc == -1)
557                         GOTO(out_up, rc = -ERANGE);
558                 mti->mti_stripe_index = rc;
559                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
560                         fsdb->fsdb_mdt_count ++;
561         }
562
563         /* the last index(0xffff) is reserved for default value. */
564         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
565                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
566                                    "but index must be less than %u.\n",
567                                    mti->mti_svname, mti->mti_stripe_index,
568                                    INDEX_MAP_SIZE * 8 - 1);
569                 GOTO(out_up, rc = -ERANGE);
570         }
571
572         if (test_bit(mti->mti_stripe_index, imap)) {
573                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
574                     !(mti->mti_flags & LDD_F_WRITECONF)) {
575                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
576                                            "%d, but that index is already in "
577                                            "use. Use --writeconf to force\n",
578                                            mti->mti_svname,
579                                            mti->mti_stripe_index);
580                         GOTO(out_up, rc = -EADDRINUSE);
581                 } else {
582                         CDEBUG(D_MGS, "Server %s updating index %d\n",
583                                mti->mti_svname, mti->mti_stripe_index);
584                         GOTO(out_up, rc = EALREADY);
585                 }
586         }
587
588         set_bit(mti->mti_stripe_index, imap);
589         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
590         mutex_unlock(&fsdb->fsdb_mutex);
591         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
592                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
593
594         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
595                mti->mti_stripe_index);
596
597         RETURN(0);
598 out_up:
599         mutex_unlock(&fsdb->fsdb_mutex);
600         return rc;
601 }
602
603 struct mgs_modify_lookup {
604         struct cfg_marker mml_marker;
605         int               mml_modified;
606 };
607
608 static int mgs_modify_handler(const struct lu_env *env,
609                               struct llog_handle *llh,
610                               struct llog_rec_hdr *rec, void *data)
611 {
612         struct mgs_modify_lookup *mml = data;
613         struct cfg_marker *marker;
614         struct lustre_cfg *lcfg = REC_DATA(rec);
615         int cfg_len = REC_DATA_LEN(rec);
616         int rc;
617         ENTRY;
618
619         if (rec->lrh_type != OBD_CFG_REC) {
620                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
621                 RETURN(-EINVAL);
622         }
623
624         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
625         if (rc) {
626                 CERROR("Insane cfg\n");
627                 RETURN(rc);
628         }
629
630         /* We only care about markers */
631         if (lcfg->lcfg_command != LCFG_MARKER)
632                 RETURN(0);
633
634         marker = lustre_cfg_buf(lcfg, 1);
635         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
636             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
637             !(marker->cm_flags & CM_SKIP)) {
638                 /* Found a non-skipped marker match */
639                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
640                        rec->lrh_index, marker->cm_step,
641                        marker->cm_flags, mml->mml_marker.cm_flags,
642                        marker->cm_tgtname, marker->cm_comment);
643                 /* Overwrite the old marker llog entry */
644                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
645                 marker->cm_flags |= mml->mml_marker.cm_flags;
646                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
647                 rc = llog_write(env, llh, rec, rec->lrh_index);
648                 if (!rc)
649                          mml->mml_modified++;
650         }
651
652         RETURN(rc);
653 }
654
655 /**
656  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
657  * Return code:
658  * 0 - modified successfully,
659  * 1 - no modification was done
660  * negative - error
661  */
662 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
663                       struct fs_db *fsdb, struct mgs_target_info *mti,
664                       char *logname, char *devname, char *comment, int flags)
665 {
666         struct llog_handle *loghandle;
667         struct llog_ctxt *ctxt;
668         struct mgs_modify_lookup *mml;
669         int rc;
670
671         ENTRY;
672
673         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
674         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
675                flags);
676
677         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
678         LASSERT(ctxt != NULL);
679         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
680         if (rc < 0) {
681                 if (rc == -ENOENT)
682                         rc = 0;
683                 GOTO(out_pop, rc);
684         }
685
686         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
687         if (rc)
688                 GOTO(out_close, rc);
689
690         if (llog_get_size(loghandle) <= 1)
691                 GOTO(out_close, rc = 0);
692
693         OBD_ALLOC_PTR(mml);
694         if (!mml)
695                 GOTO(out_close, rc = -ENOMEM);
696         if (strlcpy(mml->mml_marker.cm_comment, comment,
697                     sizeof(mml->mml_marker.cm_comment)) >=
698             sizeof(mml->mml_marker.cm_comment))
699                 GOTO(out_free, rc = -E2BIG);
700         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
701                     sizeof(mml->mml_marker.cm_tgtname)) >=
702             sizeof(mml->mml_marker.cm_tgtname))
703                 GOTO(out_free, rc = -E2BIG);
704         /* Modify mostly means cancel */
705         mml->mml_marker.cm_flags = flags;
706         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
707         mml->mml_modified = 0;
708         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
709                           NULL);
710         if (!rc && !mml->mml_modified)
711                 rc = 1;
712
713 out_free:
714         OBD_FREE_PTR(mml);
715
716 out_close:
717         llog_close(env, loghandle);
718 out_pop:
719         if (rc < 0)
720                 CERROR("%s: modify %s/%s failed: rc = %d\n",
721                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
722         llog_ctxt_put(ctxt);
723         RETURN(rc);
724 }
725
726 /** This structure is passed to mgs_replace_handler */
727 struct mgs_replace_uuid_lookup {
728         /* Nids are replaced for this target device */
729         struct mgs_target_info target;
730         /* Temporary modified llog */
731         struct llog_handle *temp_llh;
732         /* Flag is set if in target block*/
733         int in_target_device;
734         /* Nids already added. Just skip (multiple nids) */
735         int device_nids_added;
736         /* Flag is set if this block should not be copied */
737         int skip_it;
738 };
739
740 /**
741  * Check: a) if block should be skipped
742  * b) is it target block
743  *
744  * \param[in] lcfg
745  * \param[in] mrul
746  *
747  * \retval 0 should not to be skipped
748  * \retval 1 should to be skipped
749  */
750 static int check_markers(struct lustre_cfg *lcfg,
751                          struct mgs_replace_uuid_lookup *mrul)
752 {
753          struct cfg_marker *marker;
754
755         /* Track markers. Find given device */
756         if (lcfg->lcfg_command == LCFG_MARKER) {
757                 marker = lustre_cfg_buf(lcfg, 1);
758                 /* Clean llog from records marked as CM_EXCLUDE.
759                    CM_SKIP records are used for "active" command
760                    and can be restored if needed */
761                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
762                     (CM_EXCLUDE | CM_START)) {
763                         mrul->skip_it = 1;
764                         return 1;
765                 }
766
767                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
768                     (CM_EXCLUDE | CM_END)) {
769                         mrul->skip_it = 0;
770                         return 1;
771                 }
772
773                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
774                         LASSERT(!(marker->cm_flags & CM_START) ||
775                                 !(marker->cm_flags & CM_END));
776                         if (marker->cm_flags & CM_START) {
777                                 mrul->in_target_device = 1;
778                                 mrul->device_nids_added = 0;
779                         } else if (marker->cm_flags & CM_END)
780                                 mrul->in_target_device = 0;
781                 }
782         }
783
784         return 0;
785 }
786
787 static int record_base(const struct lu_env *env, struct llog_handle *llh,
788                      char *cfgname, lnet_nid_t nid, int cmd,
789                      char *s1, char *s2, char *s3, char *s4)
790 {
791         struct mgs_thread_info  *mgi = mgs_env_info(env);
792         struct llog_cfg_rec     *lcr;
793         int rc;
794
795         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
796                cmd, s1, s2, s3, s4);
797
798         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
799         if (s1)
800                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
801         if (s2)
802                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
803         if (s3)
804                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
805         if (s4)
806                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
807
808         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
809         if (lcr == NULL)
810                 return -ENOMEM;
811
812         lcr->lcr_cfg.lcfg_nid = nid;
813         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
814
815         lustre_cfg_rec_free(lcr);
816
817         if (rc < 0)
818                 CDEBUG(D_MGS,
819                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
820                        cfgname, cmd, s1, s2, s3, s4, rc);
821         return rc;
822 }
823
824 static inline int record_add_uuid(const struct lu_env *env,
825                                   struct llog_handle *llh,
826                                   uint64_t nid, char *uuid)
827 {
828         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
829                            NULL, NULL, NULL);
830 }
831
832 static inline int record_add_conn(const struct lu_env *env,
833                                   struct llog_handle *llh,
834                                   char *devname, char *uuid)
835 {
836         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
837                            NULL, NULL, NULL);
838 }
839
840 static inline int record_attach(const struct lu_env *env,
841                                 struct llog_handle *llh, char *devname,
842                                 char *type, char *uuid)
843 {
844         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
845                            NULL, NULL);
846 }
847
848 static inline int record_setup(const struct lu_env *env,
849                                struct llog_handle *llh, char *devname,
850                                char *s1, char *s2, char *s3, char *s4)
851 {
852         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
853 }
854
855 /**
856  * \retval <0 record processing error
857  * \retval n record is processed. No need copy original one.
858  * \retval 0 record is not processed.
859  */
860 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
861                            struct mgs_replace_uuid_lookup *mrul)
862 {
863         int nids_added = 0;
864         lnet_nid_t nid;
865         char *ptr;
866         int rc;
867
868         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
869                 /* LCFG_ADD_UUID command found. Let's skip original command
870                    and add passed nids */
871                 ptr = mrul->target.mti_params;
872                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
873                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
874                                "device %s\n", libcfs_nid2str(nid),
875                                 mrul->target.mti_params,
876                                 mrul->target.mti_svname);
877                         rc = record_add_uuid(env,
878                                              mrul->temp_llh, nid,
879                                              mrul->target.mti_params);
880                         if (!rc)
881                                 nids_added++;
882                 }
883
884                 if (nids_added == 0) {
885                         CERROR("No new nids were added, nid %s with uuid %s, "
886                                "device %s\n", libcfs_nid2str(nid),
887                                mrul->target.mti_params,
888                                mrul->target.mti_svname);
889                         RETURN(-ENXIO);
890                 } else {
891                         mrul->device_nids_added = 1;
892                 }
893
894                 return nids_added;
895         }
896
897         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
898                 /* LCFG_SETUP command found. UUID should be changed */
899                 rc = record_setup(env,
900                                   mrul->temp_llh,
901                                   /* devname the same */
902                                   lustre_cfg_string(lcfg, 0),
903                                   /* s1 is not changed */
904                                   lustre_cfg_string(lcfg, 1),
905                                   /* new uuid should be
906                                   the full nidlist */
907                                   mrul->target.mti_params,
908                                   /* s3 is not changed */
909                                   lustre_cfg_string(lcfg, 3),
910                                   /* s4 is not changed */
911                                   lustre_cfg_string(lcfg, 4));
912                 return rc ? rc : 1;
913         }
914
915         /* Another commands in target device block */
916         return 0;
917 }
918
919 /**
920  * Handler that called for every record in llog.
921  * Records are processed in order they placed in llog.
922  *
923  * \param[in] llh       log to be processed
924  * \param[in] rec       current record
925  * \param[in] data      mgs_replace_uuid_lookup structure
926  *
927  * \retval 0    success
928  */
929 static int mgs_replace_handler(const struct lu_env *env,
930                                struct llog_handle *llh,
931                                struct llog_rec_hdr *rec,
932                                void *data)
933 {
934         struct mgs_replace_uuid_lookup *mrul;
935         struct lustre_cfg *lcfg = REC_DATA(rec);
936         int cfg_len = REC_DATA_LEN(rec);
937         int rc;
938         ENTRY;
939
940         mrul = (struct mgs_replace_uuid_lookup *)data;
941
942         if (rec->lrh_type != OBD_CFG_REC) {
943                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
944                        rec->lrh_type, lcfg->lcfg_command,
945                        lustre_cfg_string(lcfg, 0),
946                        lustre_cfg_string(lcfg, 1));
947                 RETURN(-EINVAL);
948         }
949
950         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
951         if (rc) {
952                 /* Do not copy any invalidated records */
953                 GOTO(skip_out, rc = 0);
954         }
955
956         rc = check_markers(lcfg, mrul);
957         if (rc || mrul->skip_it)
958                 GOTO(skip_out, rc = 0);
959
960         /* Write to new log all commands outside target device block */
961         if (!mrul->in_target_device)
962                 GOTO(copy_out, rc = 0);
963
964         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
965            (failover nids) for this target, assuming that if then
966            primary is changing then so is the failover */
967         if (mrul->device_nids_added &&
968             (lcfg->lcfg_command == LCFG_ADD_UUID ||
969              lcfg->lcfg_command == LCFG_ADD_CONN))
970                 GOTO(skip_out, rc = 0);
971
972         rc = process_command(env, lcfg, mrul);
973         if (rc < 0)
974                 RETURN(rc);
975
976         if (rc)
977                 RETURN(0);
978 copy_out:
979         /* Record is placed in temporary llog as is */
980         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
981
982         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
983                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
984                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
985         RETURN(rc);
986
987 skip_out:
988         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
989                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
990                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
991         RETURN(rc);
992 }
993
994 static int mgs_log_is_empty(const struct lu_env *env,
995                             struct mgs_device *mgs, char *name)
996 {
997         struct llog_ctxt        *ctxt;
998         int                      rc;
999
1000         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1001         LASSERT(ctxt != NULL);
1002
1003         rc = llog_is_empty(env, ctxt, name);
1004         llog_ctxt_put(ctxt);
1005         return rc;
1006 }
1007
1008 static int mgs_replace_nids_log(const struct lu_env *env,
1009                                 struct obd_device *mgs, struct fs_db *fsdb,
1010                                 char *logname, char *devname, char *nids)
1011 {
1012         struct llog_handle *orig_llh, *backup_llh;
1013         struct llog_ctxt *ctxt;
1014         struct mgs_replace_uuid_lookup *mrul;
1015         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1016         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1017         char *backup;
1018         int rc, rc2;
1019         ENTRY;
1020
1021         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1022
1023         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1024         LASSERT(ctxt != NULL);
1025
1026         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1027                 /* Log is empty. Nothing to replace */
1028                 GOTO(out_put, rc = 0);
1029         }
1030
1031         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1032         if (backup == NULL)
1033                 GOTO(out_put, rc = -ENOMEM);
1034
1035         sprintf(backup, "%s.bak", logname);
1036
1037         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1038         if (rc == 0) {
1039                 /* Now erase original log file. Connections are not allowed.
1040                    Backup is already saved */
1041                 rc = llog_erase(env, ctxt, NULL, logname);
1042                 if (rc < 0)
1043                         GOTO(out_free, rc);
1044         } else if (rc != -ENOENT) {
1045                 CERROR("%s: can't make backup for %s: rc = %d\n",
1046                        mgs->obd_name, logname, rc);
1047                 GOTO(out_free,rc);
1048         }
1049
1050         /* open local log */
1051         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1052         if (rc)
1053                 GOTO(out_restore, rc);
1054
1055         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1056         if (rc)
1057                 GOTO(out_closel, rc);
1058
1059         /* open backup llog */
1060         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1061                        LLOG_OPEN_EXISTS);
1062         if (rc)
1063                 GOTO(out_closel, rc);
1064
1065         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1066         if (rc)
1067                 GOTO(out_close, rc);
1068
1069         if (llog_get_size(backup_llh) <= 1)
1070                 GOTO(out_close, rc = 0);
1071
1072         OBD_ALLOC_PTR(mrul);
1073         if (!mrul)
1074                 GOTO(out_close, rc = -ENOMEM);
1075         /* devname is only needed information to replace UUID records */
1076         strlcpy(mrul->target.mti_svname, devname,
1077                 sizeof(mrul->target.mti_svname));
1078         /* parse nids later */
1079         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1080         /* Copy records to this temporary llog */
1081         mrul->temp_llh = orig_llh;
1082
1083         rc = llog_process(env, backup_llh, mgs_replace_handler,
1084                           (void *)mrul, NULL);
1085         OBD_FREE_PTR(mrul);
1086 out_close:
1087         rc2 = llog_close(NULL, backup_llh);
1088         if (!rc)
1089                 rc = rc2;
1090 out_closel:
1091         rc2 = llog_close(NULL, orig_llh);
1092         if (!rc)
1093                 rc = rc2;
1094
1095 out_restore:
1096         if (rc) {
1097                 CERROR("%s: llog should be restored: rc = %d\n",
1098                        mgs->obd_name, rc);
1099                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1100                                   logname);
1101                 if (rc2 < 0)
1102                         CERROR("%s: can't restore backup %s: rc = %d\n",
1103                                mgs->obd_name, logname, rc2);
1104         }
1105
1106 out_free:
1107         OBD_FREE(backup, strlen(backup) + 1);
1108
1109 out_put:
1110         llog_ctxt_put(ctxt);
1111
1112         if (rc)
1113                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1114                        mgs->obd_name, logname, rc);
1115
1116         RETURN(rc);
1117 }
1118
1119 /**
1120  * Parse device name and get file system name and/or device index
1121  *
1122  * \param[in]   devname device name (ex. lustre-MDT0000)
1123  * \param[out]  fsname  file system name(optional)
1124  * \param[out]  index   device index(optional)
1125  *
1126  * \retval 0    success
1127  */
1128 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1129 {
1130         int rc;
1131         ENTRY;
1132
1133         /* Extract fsname */
1134         if (fsname) {
1135                 rc = server_name2fsname(devname, fsname, NULL);
1136                 if (rc < 0) {
1137                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1138                                devname);
1139                         RETURN(-EINVAL);
1140                 }
1141         }
1142
1143         if (index) {
1144                 rc = server_name2index(devname, index, NULL);
1145                 if (rc < 0) {
1146                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1147                                devname);
1148                         RETURN(-EINVAL);
1149                 }
1150         }
1151
1152         RETURN(0);
1153 }
1154
1155 /* This is only called during replace_nids */
1156 static int only_mgs_is_running(struct obd_device *mgs_obd)
1157 {
1158         /* TDB: Is global variable with devices count exists? */
1159         int num_devices = get_devices_count();
1160         int num_exports = 0;
1161         struct obd_export *exp;
1162
1163         spin_lock(&mgs_obd->obd_dev_lock);
1164         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1165                 /* skip self export */
1166                 if (exp == mgs_obd->obd_self_export)
1167                         continue;
1168                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1169                         continue;
1170
1171                 ++num_exports;
1172
1173                 CERROR("%s: node %s still connected during replace_nids "
1174                        "connect_flags:%llx\n",
1175                        mgs_obd->obd_name,
1176                        libcfs_nid2str(exp->exp_nid_stats->nid),
1177                        exp_connect_flags(exp));
1178
1179         }
1180         spin_unlock(&mgs_obd->obd_dev_lock);
1181
1182         /* osd, MGS and MGC + self_export
1183            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1184         return (num_devices <= 3) && (num_exports == 0);
1185 }
1186
1187 static int name_create_mdt(char **logname, char *fsname, int i)
1188 {
1189         char mdt_index[9];
1190
1191         sprintf(mdt_index, "-MDT%04x", i);
1192         return name_create(logname, fsname, mdt_index);
1193 }
1194
1195 /**
1196  * Replace nids for \a device to \a nids values
1197  *
1198  * \param obd           MGS obd device
1199  * \param devname       nids need to be replaced for this device
1200  * (ex. lustre-OST0000)
1201  * \param nids          nids list (ex. nid1,nid2,nid3)
1202  *
1203  * \retval 0    success
1204  */
1205 int mgs_replace_nids(const struct lu_env *env,
1206                      struct mgs_device *mgs,
1207                      char *devname, char *nids)
1208 {
1209         /* Assume fsname is part of device name */
1210         char fsname[MTI_NAME_MAXLEN];
1211         int rc;
1212         __u32 index;
1213         char *logname;
1214         struct fs_db *fsdb;
1215         unsigned int i;
1216         int conn_state;
1217         struct obd_device *mgs_obd = mgs->mgs_obd;
1218         ENTRY;
1219
1220         /* We can only change NIDs if no other nodes are connected */
1221         spin_lock(&mgs_obd->obd_dev_lock);
1222         conn_state = mgs_obd->obd_no_conn;
1223         mgs_obd->obd_no_conn = 1;
1224         spin_unlock(&mgs_obd->obd_dev_lock);
1225
1226         /* We can not change nids if not only MGS is started */
1227         if (!only_mgs_is_running(mgs_obd)) {
1228                 CERROR("Only MGS is allowed to be started\n");
1229                 GOTO(out, rc = -EINPROGRESS);
1230         }
1231
1232         /* Get fsname and index*/
1233         rc = mgs_parse_devname(devname, fsname, &index);
1234         if (rc)
1235                 GOTO(out, rc);
1236
1237         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1238         if (rc) {
1239                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1240                 GOTO(out, rc);
1241         }
1242
1243         /* Process client llogs */
1244         name_create(&logname, fsname, "-client");
1245         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1246         name_destroy(&logname);
1247         if (rc) {
1248                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1249                        fsname, devname, rc);
1250                 GOTO(out, rc);
1251         }
1252
1253         /* Process MDT llogs */
1254         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1255                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1256                         continue;
1257                 name_create_mdt(&logname, fsname, i);
1258                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1259                 name_destroy(&logname);
1260                 if (rc)
1261                         GOTO(out, rc);
1262         }
1263
1264 out:
1265         spin_lock(&mgs_obd->obd_dev_lock);
1266         mgs_obd->obd_no_conn = conn_state;
1267         spin_unlock(&mgs_obd->obd_dev_lock);
1268
1269         RETURN(rc);
1270 }
1271
1272 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1273                             char *devname, struct lov_desc *desc)
1274 {
1275         struct mgs_thread_info  *mgi = mgs_env_info(env);
1276         struct llog_cfg_rec     *lcr;
1277         int rc;
1278
1279         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1280         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1281         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1282         if (lcr == NULL)
1283                 return -ENOMEM;
1284
1285         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1286         lustre_cfg_rec_free(lcr);
1287         return rc;
1288 }
1289
1290 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1291                             char *devname, struct lmv_desc *desc)
1292 {
1293         struct mgs_thread_info  *mgi = mgs_env_info(env);
1294         struct llog_cfg_rec     *lcr;
1295         int rc;
1296
1297         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1298         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1299         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1300         if (lcr == NULL)
1301                 return -ENOMEM;
1302
1303         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1304         lustre_cfg_rec_free(lcr);
1305         return rc;
1306 }
1307
1308 static inline int record_mdc_add(const struct lu_env *env,
1309                                  struct llog_handle *llh,
1310                                  char *logname, char *mdcuuid,
1311                                  char *mdtuuid, char *index,
1312                                  char *gen)
1313 {
1314         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1315                            mdtuuid,index,gen,mdcuuid);
1316 }
1317
1318 static inline int record_lov_add(const struct lu_env *env,
1319                                  struct llog_handle *llh,
1320                                  char *lov_name, char *ost_uuid,
1321                                  char *index, char *gen)
1322 {
1323         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1324                            ost_uuid, index, gen, NULL);
1325 }
1326
1327 static inline int record_mount_opt(const struct lu_env *env,
1328                                    struct llog_handle *llh,
1329                                    char *profile, char *lov_name,
1330                                    char *mdc_name)
1331 {
1332         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1333                            profile, lov_name, mdc_name, NULL);
1334 }
1335
1336 static int record_marker(const struct lu_env *env,
1337                          struct llog_handle *llh,
1338                          struct fs_db *fsdb, __u32 flags,
1339                          char *tgtname, char *comment)
1340 {
1341         struct mgs_thread_info *mgi = mgs_env_info(env);
1342         struct llog_cfg_rec *lcr;
1343         int rc;
1344         int cplen = 0;
1345
1346         if (flags & CM_START)
1347                 fsdb->fsdb_gen++;
1348         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1349         mgi->mgi_marker.cm_flags = flags;
1350         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1351         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1352                         sizeof(mgi->mgi_marker.cm_tgtname));
1353         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1354                 return -E2BIG;
1355         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1356                         sizeof(mgi->mgi_marker.cm_comment));
1357         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1358                 return -E2BIG;
1359         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1360         mgi->mgi_marker.cm_canceltime = 0;
1361         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1362         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1363                             sizeof(mgi->mgi_marker));
1364         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1365         if (lcr == NULL)
1366                 return -ENOMEM;
1367
1368         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1369         lustre_cfg_rec_free(lcr);
1370         return rc;
1371 }
1372
1373 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1374                             struct llog_handle **llh, char *name)
1375 {
1376         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1377         struct llog_ctxt        *ctxt;
1378         int                      rc = 0;
1379         ENTRY;
1380
1381         if (*llh)
1382                 GOTO(out, rc = -EBUSY);
1383
1384         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1385         if (!ctxt)
1386                 GOTO(out, rc = -ENODEV);
1387         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1388
1389         rc = llog_open_create(env, ctxt, llh, NULL, name);
1390         if (rc)
1391                 GOTO(out_ctxt, rc);
1392         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1393         if (rc)
1394                 llog_close(env, *llh);
1395 out_ctxt:
1396         llog_ctxt_put(ctxt);
1397 out:
1398         if (rc) {
1399                 CERROR("%s: can't start log %s: rc = %d\n",
1400                        mgs->mgs_obd->obd_name, name, rc);
1401                 *llh = NULL;
1402         }
1403         RETURN(rc);
1404 }
1405
1406 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1407 {
1408         int rc;
1409
1410         rc = llog_close(env, *llh);
1411         *llh = NULL;
1412
1413         return rc;
1414 }
1415
1416 /******************** config "macros" *********************/
1417
1418 /* write an lcfg directly into a log (with markers) */
1419 static int mgs_write_log_direct(const struct lu_env *env,
1420                                 struct mgs_device *mgs, struct fs_db *fsdb,
1421                                 char *logname, struct llog_cfg_rec *lcr,
1422                                 char *devname, char *comment)
1423 {
1424         struct llog_handle *llh = NULL;
1425         int rc;
1426
1427         ENTRY;
1428
1429         rc = record_start_log(env, mgs, &llh, logname);
1430         if (rc)
1431                 RETURN(rc);
1432
1433         /* FIXME These should be a single journal transaction */
1434         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1435         if (rc)
1436                 GOTO(out_end, rc);
1437         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1438         if (rc)
1439                 GOTO(out_end, rc);
1440         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1441         if (rc)
1442                 GOTO(out_end, rc);
1443 out_end:
1444         record_end_log(env, &llh);
1445         RETURN(rc);
1446 }
1447
1448 /* write the lcfg in all logs for the given fs */
1449 static int mgs_write_log_direct_all(const struct lu_env *env,
1450                                     struct mgs_device *mgs,
1451                                     struct fs_db *fsdb,
1452                                     struct mgs_target_info *mti,
1453                                     struct llog_cfg_rec *lcr, char *devname,
1454                                     char *comment, int server_only)
1455 {
1456         struct list_head         log_list;
1457         struct mgs_direntry     *dirent, *n;
1458         char                    *fsname = mti->mti_fsname;
1459         int                      rc = 0, len = strlen(fsname);
1460
1461         ENTRY;
1462         /* Find all the logs in the CONFIGS directory */
1463         rc = class_dentry_readdir(env, mgs, &log_list);
1464         if (rc)
1465                 RETURN(rc);
1466
1467         /* Could use fsdb index maps instead of directory listing */
1468         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1469                 list_del_init(&dirent->mde_list);
1470                 /* don't write to sptlrpc rule log */
1471                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1472                         goto next;
1473
1474                 /* caller wants write server logs only */
1475                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1476                         goto next;
1477
1478                 if (strlen(dirent->mde_name) <= len ||
1479                     strncmp(fsname, dirent->mde_name, len) != 0 ||
1480                     dirent->mde_name[len] != '-')
1481                         goto next;
1482
1483                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1484                 /* Erase any old settings of this same parameter */
1485                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1486                                 devname, comment, CM_SKIP);
1487                 if (rc < 0)
1488                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1489                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1490                 if (lcr == NULL)
1491                         goto next;
1492                 /* Write the new one */
1493                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1494                                           lcr, devname, comment);
1495                 if (rc != 0)
1496                         CERROR("%s: writing log %s: rc = %d\n",
1497                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1498 next:
1499                 mgs_direntry_free(dirent);
1500         }
1501
1502         RETURN(rc);
1503 }
1504
1505 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1506                                     struct mgs_device *mgs,
1507                                     struct fs_db *fsdb,
1508                                     struct mgs_target_info *mti,
1509                                     int index, char *logname);
1510 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1511                                     struct mgs_device *mgs,
1512                                     struct fs_db *fsdb,
1513                                     struct mgs_target_info *mti,
1514                                     char *logname, char *suffix, char *lovname,
1515                                     enum lustre_sec_part sec_part, int flags);
1516 static int name_create_mdt_and_lov(char **logname, char **lovname,
1517                                    struct fs_db *fsdb, int i);
1518
1519 static int add_param(char *params, char *key, char *val)
1520 {
1521         char *start = params + strlen(params);
1522         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1523         int keylen = 0;
1524
1525         if (key != NULL)
1526                 keylen = strlen(key);
1527         if (start + 1 + keylen + strlen(val) >= end) {
1528                 CERROR("params are too long: %s %s%s\n",
1529                        params, key != NULL ? key : "", val);
1530                 return -EINVAL;
1531         }
1532
1533         sprintf(start, " %s%s", key != NULL ? key : "", val);
1534         return 0;
1535 }
1536
1537 /**
1538  * Walk through client config log record and convert the related records
1539  * into the target.
1540  **/
1541 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1542                                          struct llog_handle *llh,
1543                                          struct llog_rec_hdr *rec, void *data)
1544 {
1545         struct mgs_device *mgs;
1546         struct obd_device *obd;
1547         struct mgs_target_info *mti, *tmti;
1548         struct fs_db *fsdb;
1549         int cfg_len = rec->lrh_len;
1550         char *cfg_buf = (char*) (rec + 1);
1551         struct lustre_cfg *lcfg;
1552         int rc = 0;
1553         struct llog_handle *mdt_llh = NULL;
1554         static int got_an_osc_or_mdc = 0;
1555         /* 0: not found any osc/mdc;
1556            1: found osc;
1557            2: found mdc;
1558         */
1559         static int last_step = -1;
1560         int cplen = 0;
1561
1562         ENTRY;
1563
1564         mti = ((struct temp_comp*)data)->comp_mti;
1565         tmti = ((struct temp_comp*)data)->comp_tmti;
1566         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1567         obd = ((struct temp_comp *)data)->comp_obd;
1568         mgs = lu2mgs_dev(obd->obd_lu_dev);
1569         LASSERT(mgs);
1570
1571         if (rec->lrh_type != OBD_CFG_REC) {
1572                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1573                 RETURN(-EINVAL);
1574         }
1575
1576         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1577         if (rc) {
1578                 CERROR("Insane cfg\n");
1579                 RETURN(rc);
1580         }
1581
1582         lcfg = (struct lustre_cfg *)cfg_buf;
1583
1584         if (lcfg->lcfg_command == LCFG_MARKER) {
1585                 struct cfg_marker *marker;
1586                 marker = lustre_cfg_buf(lcfg, 1);
1587                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1588                     (marker->cm_flags & CM_START) &&
1589                      !(marker->cm_flags & CM_SKIP)) {
1590                         got_an_osc_or_mdc = 1;
1591                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1592                                         sizeof(tmti->mti_svname));
1593                         if (cplen >= sizeof(tmti->mti_svname))
1594                                 RETURN(-E2BIG);
1595                         rc = record_start_log(env, mgs, &mdt_llh,
1596                                               mti->mti_svname);
1597                         if (rc)
1598                                 RETURN(rc);
1599                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1600                                            mti->mti_svname, "add osc(copied)");
1601                         record_end_log(env, &mdt_llh);
1602                         last_step = marker->cm_step;
1603                         RETURN(rc);
1604                 }
1605                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1606                     (marker->cm_flags & CM_END) &&
1607                      !(marker->cm_flags & CM_SKIP)) {
1608                         LASSERT(last_step == marker->cm_step);
1609                         last_step = -1;
1610                         got_an_osc_or_mdc = 0;
1611                         memset(tmti, 0, sizeof(*tmti));
1612                         rc = record_start_log(env, mgs, &mdt_llh,
1613                                               mti->mti_svname);
1614                         if (rc)
1615                                 RETURN(rc);
1616                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1617                                            mti->mti_svname, "add osc(copied)");
1618                         record_end_log(env, &mdt_llh);
1619                         RETURN(rc);
1620                 }
1621                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1622                     (marker->cm_flags & CM_START) &&
1623                      !(marker->cm_flags & CM_SKIP)) {
1624                         got_an_osc_or_mdc = 2;
1625                         last_step = marker->cm_step;
1626                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1627                                strlen(marker->cm_tgtname));
1628
1629                         RETURN(rc);
1630                 }
1631                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1632                     (marker->cm_flags & CM_END) &&
1633                      !(marker->cm_flags & CM_SKIP)) {
1634                         LASSERT(last_step == marker->cm_step);
1635                         last_step = -1;
1636                         got_an_osc_or_mdc = 0;
1637                         memset(tmti, 0, sizeof(*tmti));
1638                         RETURN(rc);
1639                 }
1640         }
1641
1642         if (got_an_osc_or_mdc == 0 || last_step < 0)
1643                 RETURN(rc);
1644
1645         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1646                 __u64 nodenid = lcfg->lcfg_nid;
1647
1648                 if (strlen(tmti->mti_uuid) == 0) {
1649                         /* target uuid not set, this config record is before
1650                          * LCFG_SETUP, this nid is one of target node nid.
1651                          */
1652                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1653                         tmti->mti_nid_count++;
1654                 } else {
1655                         char nidstr[LNET_NIDSTR_SIZE];
1656
1657                         /* failover node nid */
1658                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
1659                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1660                                         nidstr);
1661                 }
1662
1663                 RETURN(rc);
1664         }
1665
1666         if (lcfg->lcfg_command == LCFG_SETUP) {
1667                 char *target;
1668
1669                 target = lustre_cfg_string(lcfg, 1);
1670                 memcpy(tmti->mti_uuid, target, strlen(target));
1671                 RETURN(rc);
1672         }
1673
1674         /* ignore client side sptlrpc_conf_log */
1675         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1676                 RETURN(rc);
1677
1678         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1679                 int index;
1680
1681                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1682                         RETURN (-EINVAL);
1683
1684                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1685                        strlen(mti->mti_fsname));
1686                 tmti->mti_stripe_index = index;
1687
1688                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1689                                               mti->mti_stripe_index,
1690                                               mti->mti_svname);
1691                 memset(tmti, 0, sizeof(*tmti));
1692                 RETURN(rc);
1693         }
1694
1695         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1696                 int index;
1697                 char mdt_index[9];
1698                 char *logname, *lovname;
1699
1700                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1701                                              mti->mti_stripe_index);
1702                 if (rc)
1703                         RETURN(rc);
1704                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1705
1706                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1707                         name_destroy(&logname);
1708                         name_destroy(&lovname);
1709                         RETURN(-EINVAL);
1710                 }
1711
1712                 tmti->mti_stripe_index = index;
1713                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1714                                          mdt_index, lovname,
1715                                          LUSTRE_SP_MDT, 0);
1716                 name_destroy(&logname);
1717                 name_destroy(&lovname);
1718                 RETURN(rc);
1719         }
1720         RETURN(rc);
1721 }
1722
1723 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1724 /* stealed from mgs_get_fsdb_from_llog*/
1725 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1726                                               struct mgs_device *mgs,
1727                                               char *client_name,
1728                                               struct temp_comp* comp)
1729 {
1730         struct llog_handle *loghandle;
1731         struct mgs_target_info *tmti;
1732         struct llog_ctxt *ctxt;
1733         int rc;
1734
1735         ENTRY;
1736
1737         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1738         LASSERT(ctxt != NULL);
1739
1740         OBD_ALLOC_PTR(tmti);
1741         if (tmti == NULL)
1742                 GOTO(out_ctxt, rc = -ENOMEM);
1743
1744         comp->comp_tmti = tmti;
1745         comp->comp_obd = mgs->mgs_obd;
1746
1747         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1748                        LLOG_OPEN_EXISTS);
1749         if (rc < 0) {
1750                 if (rc == -ENOENT)
1751                         rc = 0;
1752                 GOTO(out_pop, rc);
1753         }
1754
1755         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1756         if (rc)
1757                 GOTO(out_close, rc);
1758
1759         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1760                                   (void *)comp, NULL, false);
1761         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1762 out_close:
1763         llog_close(env, loghandle);
1764 out_pop:
1765         OBD_FREE_PTR(tmti);
1766 out_ctxt:
1767         llog_ctxt_put(ctxt);
1768         RETURN(rc);
1769 }
1770
1771 /* lmv is the second thing for client logs */
1772 /* copied from mgs_write_log_lov. Please refer to that.  */
1773 static int mgs_write_log_lmv(const struct lu_env *env,
1774                              struct mgs_device *mgs,
1775                              struct fs_db *fsdb,
1776                              struct mgs_target_info *mti,
1777                              char *logname, char *lmvname)
1778 {
1779         struct llog_handle *llh = NULL;
1780         struct lmv_desc *lmvdesc;
1781         char *uuid;
1782         int rc = 0;
1783         ENTRY;
1784
1785         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1786
1787         OBD_ALLOC_PTR(lmvdesc);
1788         if (lmvdesc == NULL)
1789                 RETURN(-ENOMEM);
1790         lmvdesc->ld_active_tgt_count = 0;
1791         lmvdesc->ld_tgt_count = 0;
1792         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1793         uuid = (char *)lmvdesc->ld_uuid.uuid;
1794
1795         rc = record_start_log(env, mgs, &llh, logname);
1796         if (rc)
1797                 GOTO(out_free, rc);
1798         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1799         if (rc)
1800                 GOTO(out_end, rc);
1801         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1802         if (rc)
1803                 GOTO(out_end, rc);
1804         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1805         if (rc)
1806                 GOTO(out_end, rc);
1807         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1808         if (rc)
1809                 GOTO(out_end, rc);
1810 out_end:
1811         record_end_log(env, &llh);
1812 out_free:
1813         OBD_FREE_PTR(lmvdesc);
1814         RETURN(rc);
1815 }
1816
1817 /* lov is the first thing in the mdt and client logs */
1818 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1819                              struct fs_db *fsdb, struct mgs_target_info *mti,
1820                              char *logname, char *lovname)
1821 {
1822         struct llog_handle *llh = NULL;
1823         struct lov_desc *lovdesc;
1824         char *uuid;
1825         int rc = 0;
1826         ENTRY;
1827
1828         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1829
1830         /*
1831         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1832         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1833               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1834         */
1835
1836         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1837         OBD_ALLOC_PTR(lovdesc);
1838         if (lovdesc == NULL)
1839                 RETURN(-ENOMEM);
1840         lovdesc->ld_magic = LOV_DESC_MAGIC;
1841         lovdesc->ld_tgt_count = 0;
1842         /* Defaults.  Can be changed later by lcfg config_param */
1843         lovdesc->ld_default_stripe_count = 1;
1844         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1845         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1846         lovdesc->ld_default_stripe_offset = -1;
1847         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1848         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1849         /* can these be the same? */
1850         uuid = (char *)lovdesc->ld_uuid.uuid;
1851
1852         /* This should always be the first entry in a log.
1853         rc = mgs_clear_log(obd, logname); */
1854         rc = record_start_log(env, mgs, &llh, logname);
1855         if (rc)
1856                 GOTO(out_free, rc);
1857         /* FIXME these should be a single journal transaction */
1858         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1859         if (rc)
1860                 GOTO(out_end, rc);
1861         rc = record_attach(env, llh, lovname, "lov", uuid);
1862         if (rc)
1863                 GOTO(out_end, rc);
1864         rc = record_lov_setup(env, llh, lovname, lovdesc);
1865         if (rc)
1866                 GOTO(out_end, rc);
1867         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1868         if (rc)
1869                 GOTO(out_end, rc);
1870         EXIT;
1871 out_end:
1872         record_end_log(env, &llh);
1873 out_free:
1874         OBD_FREE_PTR(lovdesc);
1875         return rc;
1876 }
1877
1878 /* add failnids to open log */
1879 static int mgs_write_log_failnids(const struct lu_env *env,
1880                                   struct mgs_target_info *mti,
1881                                   struct llog_handle *llh,
1882                                   char *cliname)
1883 {
1884         char *failnodeuuid = NULL;
1885         char *ptr = mti->mti_params;
1886         lnet_nid_t nid;
1887         int rc = 0;
1888
1889         /*
1890         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1891         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1892         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1893         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1894         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1895         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1896         */
1897
1898         /*
1899          * Pull failnid info out of params string, which may contain something
1900          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
1901          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
1902          * etc.  However, convert_hostnames() should have caught those.
1903          */
1904         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1905                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1906                         char nidstr[LNET_NIDSTR_SIZE];
1907
1908                         if (failnodeuuid == NULL) {
1909                                 /* We don't know the failover node name,
1910                                  * so just use the first nid as the uuid */
1911                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
1912                                 rc = name_create(&failnodeuuid, nidstr, "");
1913                                 if (rc != 0)
1914                                         return rc;
1915                         }
1916                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1917                                 "client %s\n",
1918                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
1919                                 failnodeuuid, cliname);
1920                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1921                         /*
1922                          * If *ptr is ':', we have added all NIDs for
1923                          * failnodeuuid.
1924                          */
1925                         if (*ptr == ':') {
1926                                 rc = record_add_conn(env, llh, cliname,
1927                                                      failnodeuuid);
1928                                 name_destroy(&failnodeuuid);
1929                                 failnodeuuid = NULL;
1930                         }
1931                 }
1932                 if (failnodeuuid) {
1933                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1934                         name_destroy(&failnodeuuid);
1935                         failnodeuuid = NULL;
1936                 }
1937         }
1938
1939         return rc;
1940 }
1941
1942 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1943                                     struct mgs_device *mgs,
1944                                     struct fs_db *fsdb,
1945                                     struct mgs_target_info *mti,
1946                                     char *logname, char *lmvname)
1947 {
1948         struct llog_handle *llh = NULL;
1949         char *mdcname = NULL;
1950         char *nodeuuid = NULL;
1951         char *mdcuuid = NULL;
1952         char *lmvuuid = NULL;
1953         char index[6];
1954         char nidstr[LNET_NIDSTR_SIZE];
1955         int i, rc;
1956         ENTRY;
1957
1958         if (mgs_log_is_empty(env, mgs, logname)) {
1959                 CERROR("log is empty! Logical error\n");
1960                 RETURN(-EINVAL);
1961         }
1962
1963         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1964                mti->mti_svname, logname, lmvname);
1965
1966         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
1967         rc = name_create(&nodeuuid, nidstr, "");
1968         if (rc)
1969                 RETURN(rc);
1970         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1971         if (rc)
1972                 GOTO(out_free, rc);
1973         rc = name_create(&mdcuuid, mdcname, "_UUID");
1974         if (rc)
1975                 GOTO(out_free, rc);
1976         rc = name_create(&lmvuuid, lmvname, "_UUID");
1977         if (rc)
1978                 GOTO(out_free, rc);
1979
1980         rc = record_start_log(env, mgs, &llh, logname);
1981         if (rc)
1982                 GOTO(out_free, rc);
1983         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1984                            "add mdc");
1985         if (rc)
1986                 GOTO(out_end, rc);
1987         for (i = 0; i < mti->mti_nid_count; i++) {
1988                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1989                         libcfs_nid2str_r(mti->mti_nids[i],
1990                                          nidstr, sizeof(nidstr)));
1991
1992                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1993                 if (rc)
1994                         GOTO(out_end, rc);
1995         }
1996
1997         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1998         if (rc)
1999                 GOTO(out_end, rc);
2000         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2001                           NULL, NULL);
2002         if (rc)
2003                 GOTO(out_end, rc);
2004         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2005         if (rc)
2006                 GOTO(out_end, rc);
2007         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2008         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2009                             index, "1");
2010         if (rc)
2011                 GOTO(out_end, rc);
2012         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2013                            "add mdc");
2014         if (rc)
2015                 GOTO(out_end, rc);
2016 out_end:
2017         record_end_log(env, &llh);
2018 out_free:
2019         name_destroy(&lmvuuid);
2020         name_destroy(&mdcuuid);
2021         name_destroy(&mdcname);
2022         name_destroy(&nodeuuid);
2023         RETURN(rc);
2024 }
2025
2026 static inline int name_create_lov(char **lovname, char *mdtname,
2027                                   struct fs_db *fsdb, int index)
2028 {
2029         /* COMPAT_180 */
2030         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2031                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2032         else
2033                 return name_create(lovname, mdtname, "-mdtlov");
2034 }
2035
2036 static int name_create_mdt_and_lov(char **logname, char **lovname,
2037                                    struct fs_db *fsdb, int i)
2038 {
2039         int rc;
2040
2041         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2042         if (rc)
2043                 return rc;
2044         /* COMPAT_180 */
2045         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2046                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2047         else
2048                 rc = name_create(lovname, *logname, "-mdtlov");
2049         if (rc) {
2050                 name_destroy(logname);
2051                 *logname = NULL;
2052         }
2053         return rc;
2054 }
2055
2056 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2057                                       struct fs_db *fsdb, int i)
2058 {
2059         char suffix[16];
2060
2061         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2062                 sprintf(suffix, "-osc");
2063         else
2064                 sprintf(suffix, "-osc-MDT%04x", i);
2065         return name_create(oscname, ostname, suffix);
2066 }
2067
2068 /* add new mdc to already existent MDS */
2069 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2070                                     struct mgs_device *mgs,
2071                                     struct fs_db *fsdb,
2072                                     struct mgs_target_info *mti,
2073                                     int mdt_index, char *logname)
2074 {
2075         struct llog_handle      *llh = NULL;
2076         char    *nodeuuid = NULL;
2077         char    *ospname = NULL;
2078         char    *lovuuid = NULL;
2079         char    *mdtuuid = NULL;
2080         char    *svname = NULL;
2081         char    *mdtname = NULL;
2082         char    *lovname = NULL;
2083         char    index_str[16];
2084         char    nidstr[LNET_NIDSTR_SIZE];
2085         int     i, rc;
2086
2087         ENTRY;
2088         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2089                 CERROR("log is empty! Logical error\n");
2090                 RETURN (-EINVAL);
2091         }
2092
2093         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2094                logname);
2095
2096         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2097         if (rc)
2098                 RETURN(rc);
2099
2100         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2101         rc = name_create(&nodeuuid, nidstr, "");
2102         if (rc)
2103                 GOTO(out_destory, rc);
2104
2105         rc = name_create(&svname, mdtname, "-osp");
2106         if (rc)
2107                 GOTO(out_destory, rc);
2108
2109         sprintf(index_str, "-MDT%04x", mdt_index);
2110         rc = name_create(&ospname, svname, index_str);
2111         if (rc)
2112                 GOTO(out_destory, rc);
2113
2114         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2115         if (rc)
2116                 GOTO(out_destory, rc);
2117
2118         rc = name_create(&lovuuid, lovname, "_UUID");
2119         if (rc)
2120                 GOTO(out_destory, rc);
2121
2122         rc = name_create(&mdtuuid, mdtname, "_UUID");
2123         if (rc)
2124                 GOTO(out_destory, rc);
2125
2126         rc = record_start_log(env, mgs, &llh, logname);
2127         if (rc)
2128                 GOTO(out_destory, rc);
2129
2130         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2131                            "add osp");
2132         if (rc)
2133                 GOTO(out_destory, rc);
2134
2135         for (i = 0; i < mti->mti_nid_count; i++) {
2136                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2137                         libcfs_nid2str_r(mti->mti_nids[i],
2138                                          nidstr, sizeof(nidstr)));
2139                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2140                 if (rc)
2141                         GOTO(out_end, rc);
2142         }
2143
2144         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2145         if (rc)
2146                 GOTO(out_end, rc);
2147
2148         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2149                           NULL, NULL);
2150         if (rc)
2151                 GOTO(out_end, rc);
2152
2153         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2154         if (rc)
2155                 GOTO(out_end, rc);
2156
2157         /* Add mdc(osp) to lod */
2158         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
2159         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2160                          index_str, "1", NULL);
2161         if (rc)
2162                 GOTO(out_end, rc);
2163
2164         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2165         if (rc)
2166                 GOTO(out_end, rc);
2167
2168 out_end:
2169         record_end_log(env, &llh);
2170
2171 out_destory:
2172         name_destroy(&mdtuuid);
2173         name_destroy(&lovuuid);
2174         name_destroy(&lovname);
2175         name_destroy(&ospname);
2176         name_destroy(&svname);
2177         name_destroy(&nodeuuid);
2178         name_destroy(&mdtname);
2179         RETURN(rc);
2180 }
2181
2182 static int mgs_write_log_mdt0(const struct lu_env *env,
2183                               struct mgs_device *mgs,
2184                               struct fs_db *fsdb,
2185                               struct mgs_target_info *mti)
2186 {
2187         char *log = mti->mti_svname;
2188         struct llog_handle *llh = NULL;
2189         char *uuid, *lovname;
2190         char mdt_index[6];
2191         char *ptr = mti->mti_params;
2192         int rc = 0, failout = 0;
2193         ENTRY;
2194
2195         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2196         if (uuid == NULL)
2197                 RETURN(-ENOMEM);
2198
2199         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2200                 failout = (strncmp(ptr, "failout", 7) == 0);
2201
2202         rc = name_create(&lovname, log, "-mdtlov");
2203         if (rc)
2204                 GOTO(out_free, rc);
2205         if (mgs_log_is_empty(env, mgs, log)) {
2206                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2207                 if (rc)
2208                         GOTO(out_lod, rc);
2209         }
2210
2211         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2212
2213         rc = record_start_log(env, mgs, &llh, log);
2214         if (rc)
2215                 GOTO(out_lod, rc);
2216
2217         /* add MDT itself */
2218
2219         /* FIXME this whole fn should be a single journal transaction */
2220         sprintf(uuid, "%s_UUID", log);
2221         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2222         if (rc)
2223                 GOTO(out_lod, rc);
2224         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2225         if (rc)
2226                 GOTO(out_end, rc);
2227         rc = record_mount_opt(env, llh, log, lovname, NULL);
2228         if (rc)
2229                 GOTO(out_end, rc);
2230         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2231                         failout ? "n" : "f");
2232         if (rc)
2233                 GOTO(out_end, rc);
2234         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2235         if (rc)
2236                 GOTO(out_end, rc);
2237 out_end:
2238         record_end_log(env, &llh);
2239 out_lod:
2240         name_destroy(&lovname);
2241 out_free:
2242         OBD_FREE(uuid, sizeof(struct obd_uuid));
2243         RETURN(rc);
2244 }
2245
2246 /* envelope method for all layers log */
2247 static int mgs_write_log_mdt(const struct lu_env *env,
2248                              struct mgs_device *mgs,
2249                              struct fs_db *fsdb,
2250                              struct mgs_target_info *mti)
2251 {
2252         struct mgs_thread_info *mgi = mgs_env_info(env);
2253         struct llog_handle *llh = NULL;
2254         char *cliname;
2255         int rc, i = 0;
2256         ENTRY;
2257
2258         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2259
2260         if (mti->mti_uuid[0] == '\0') {
2261                 /* Make up our own uuid */
2262                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2263                          "%s_UUID", mti->mti_svname);
2264         }
2265
2266         /* add mdt */
2267         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2268         if (rc)
2269                 RETURN(rc);
2270         /* Append the mdt info to the client log */
2271         rc = name_create(&cliname, mti->mti_fsname, "-client");
2272         if (rc)
2273                 RETURN(rc);
2274
2275         if (mgs_log_is_empty(env, mgs, cliname)) {
2276                 /* Start client log */
2277                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2278                                        fsdb->fsdb_clilov);
2279                 if (rc)
2280                         GOTO(out_free, rc);
2281                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2282                                        fsdb->fsdb_clilmv);
2283                 if (rc)
2284                         GOTO(out_free, rc);
2285         }
2286
2287         /*
2288         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2289         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2290         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2291         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2292         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2293         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2294         */
2295
2296         /* copy client info about lov/lmv */
2297         mgi->mgi_comp.comp_mti = mti;
2298         mgi->mgi_comp.comp_fsdb = fsdb;
2299
2300         rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2301                                                 &mgi->mgi_comp);
2302         if (rc)
2303                 GOTO(out_free, rc);
2304         rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2305                                       fsdb->fsdb_clilmv);
2306         if (rc)
2307                 GOTO(out_free, rc);
2308
2309         /* add mountopts */
2310         rc = record_start_log(env, mgs, &llh, cliname);
2311         if (rc)
2312                 GOTO(out_free, rc);
2313
2314         rc = record_marker(env, llh, fsdb, CM_START, cliname,
2315                            "mount opts");
2316         if (rc)
2317                 GOTO(out_end, rc);
2318         rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2319                               fsdb->fsdb_clilmv);
2320         if (rc)
2321                 GOTO(out_end, rc);
2322         rc = record_marker(env, llh, fsdb, CM_END, cliname,
2323                            "mount opts");
2324
2325         if (rc)
2326                 GOTO(out_end, rc);
2327
2328         /* for_all_existing_mdt except current one */
2329         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2330                 if (i !=  mti->mti_stripe_index &&
2331                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2332                         char *logname;
2333
2334                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2335                         if (rc)
2336                                 GOTO(out_end, rc);
2337
2338                         /* NB: If the log for the MDT is empty, it means
2339                          * the MDT is only added to the index
2340                          * map, and not being process yet, i.e. this
2341                          * is an unregistered MDT, see mgs_write_log_target().
2342                          * so we should skip it. Otherwise
2343                          *
2344                          * 1. MGS get register request for MDT1 and MDT2.
2345                          *
2346                          * 2. Then both MDT1 and MDT2 are added into
2347                          * fsdb_mdt_index_map. (see mgs_set_index()).
2348                          *
2349                          * 3. Then MDT1 get the lock of fsdb_mutex, then
2350                          * generate the config log, here, it will regard MDT2
2351                          * as an existent MDT, and generate "add osp" for
2352                          * lustre-MDT0001-osp-MDT0002. Note: at the moment
2353                          * MDT0002 config log is still empty, so it will
2354                          * add "add osp" even before "lov setup", which
2355                          * will definitly cause trouble.
2356                          *
2357                          * 4. MDT1 registeration finished, fsdb_mutex is
2358                          * released, then MDT2 get in, then in above
2359                          * mgs_steal_llog_for_mdt_from_client(), it will
2360                          * add another osp log for lustre-MDT0001-osp-MDT0002,
2361                          * which will cause another trouble.*/
2362                         if (!mgs_log_is_empty(env, mgs, logname))
2363                                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb,
2364                                                               mti, i, logname);
2365
2366                         name_destroy(&logname);
2367                         if (rc)
2368                                 GOTO(out_end, rc);
2369                 }
2370         }
2371 out_end:
2372         record_end_log(env, &llh);
2373 out_free:
2374         name_destroy(&cliname);
2375         RETURN(rc);
2376 }
2377
2378 /* Add the ost info to the client/mdt lov */
2379 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2380                                     struct mgs_device *mgs, struct fs_db *fsdb,
2381                                     struct mgs_target_info *mti,
2382                                     char *logname, char *suffix, char *lovname,
2383                                     enum lustre_sec_part sec_part, int flags)
2384 {
2385         struct llog_handle *llh = NULL;
2386         char *nodeuuid = NULL;
2387         char *oscname = NULL;
2388         char *oscuuid = NULL;
2389         char *lovuuid = NULL;
2390         char *svname = NULL;
2391         char index[6];
2392         char nidstr[LNET_NIDSTR_SIZE];
2393         int i, rc;
2394         ENTRY;
2395
2396         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2397                 mti->mti_svname, logname);
2398
2399         if (mgs_log_is_empty(env, mgs, logname)) {
2400                 CERROR("log is empty! Logical error\n");
2401                 RETURN(-EINVAL);
2402         }
2403
2404         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2405         rc = name_create(&nodeuuid, nidstr, "");
2406         if (rc)
2407                 RETURN(rc);
2408         rc = name_create(&svname, mti->mti_svname, "-osc");
2409         if (rc)
2410                 GOTO(out_free, rc);
2411
2412         /* for the system upgraded from old 1.8, keep using the old osc naming
2413          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2414         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2415                 rc = name_create(&oscname, svname, "");
2416         else
2417                 rc = name_create(&oscname, svname, suffix);
2418         if (rc)
2419                 GOTO(out_free, rc);
2420
2421         rc = name_create(&oscuuid, oscname, "_UUID");
2422         if (rc)
2423                 GOTO(out_free, rc);
2424         rc = name_create(&lovuuid, lovname, "_UUID");
2425         if (rc)
2426                 GOTO(out_free, rc);
2427
2428
2429         /*
2430         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2431         multihomed (#4)
2432         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2433         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2434         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2435         failover (#6,7)
2436         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2437         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2438         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2439         */
2440
2441         rc = record_start_log(env, mgs, &llh, logname);
2442         if (rc)
2443                 GOTO(out_free, rc);
2444
2445         /* FIXME these should be a single journal transaction */
2446         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2447                            "add osc");
2448         if (rc)
2449                 GOTO(out_end, rc);
2450
2451         /* NB: don't change record order, because upon MDT steal OSC config
2452          * from client, it treats all nids before LCFG_SETUP as target nids
2453          * (multiple interfaces), while nids after as failover node nids.
2454          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2455          */
2456         for (i = 0; i < mti->mti_nid_count; i++) {
2457                 CDEBUG(D_MGS, "add nid %s\n",
2458                         libcfs_nid2str_r(mti->mti_nids[i],
2459                                          nidstr, sizeof(nidstr)));
2460                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2461                 if (rc)
2462                         GOTO(out_end, rc);
2463         }
2464         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2465         if (rc)
2466                 GOTO(out_end, rc);
2467         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
2468                           NULL, NULL);
2469         if (rc)
2470                 GOTO(out_end, rc);
2471         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2472         if (rc)
2473                 GOTO(out_end, rc);
2474
2475         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2476
2477         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2478         if (rc)
2479                 GOTO(out_end, rc);
2480         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2481                            "add osc");
2482         if (rc)
2483                 GOTO(out_end, rc);
2484 out_end:
2485         record_end_log(env, &llh);
2486 out_free:
2487         name_destroy(&lovuuid);
2488         name_destroy(&oscuuid);
2489         name_destroy(&oscname);
2490         name_destroy(&svname);
2491         name_destroy(&nodeuuid);
2492         RETURN(rc);
2493 }
2494
2495 static int mgs_write_log_ost(const struct lu_env *env,
2496                              struct mgs_device *mgs, struct fs_db *fsdb,
2497                              struct mgs_target_info *mti)
2498 {
2499         struct llog_handle *llh = NULL;
2500         char *logname, *lovname;
2501         char *ptr = mti->mti_params;
2502         int rc, flags = 0, failout = 0, i;
2503         ENTRY;
2504
2505         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2506
2507         /* The ost startup log */
2508
2509         /* If the ost log already exists, that means that someone reformatted
2510            the ost and it called target_add again. */
2511         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2512                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2513                                    "exists, yet the server claims it never "
2514                                    "registered. It may have been reformatted, "
2515                                    "or the index changed. writeconf the MDT to "
2516                                    "regenerate all logs.\n", mti->mti_svname);
2517                 RETURN(-EALREADY);
2518         }
2519
2520         /*
2521         attach obdfilter ost1 ost1_UUID
2522         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2523         */
2524         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2525                 failout = (strncmp(ptr, "failout", 7) == 0);
2526         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2527         if (rc)
2528                 RETURN(rc);
2529         /* FIXME these should be a single journal transaction */
2530         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2531         if (rc)
2532                 GOTO(out_end, rc);
2533         if (*mti->mti_uuid == '\0')
2534                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2535                          "%s_UUID", mti->mti_svname);
2536         rc = record_attach(env, llh, mti->mti_svname,
2537                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2538         if (rc)
2539                 GOTO(out_end, rc);
2540         rc = record_setup(env, llh, mti->mti_svname,
2541                           "dev"/*ignored*/, "type"/*ignored*/,
2542                           failout ? "n" : "f", NULL/*options*/);
2543         if (rc)
2544                 GOTO(out_end, rc);
2545         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2546         if (rc)
2547                 GOTO(out_end, rc);
2548 out_end:
2549         record_end_log(env, &llh);
2550         if (rc)
2551                 RETURN(rc);
2552         /* We also have to update the other logs where this osc is part of
2553            the lov */
2554
2555         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2556                 /* If we're upgrading, the old mdt log already has our
2557                    entry. Let's do a fake one for fun. */
2558                 /* Note that we can't add any new failnids, since we don't
2559                    know the old osc names. */
2560                 flags = CM_SKIP | CM_UPGRADE146;
2561
2562         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2563                 /* If the update flag isn't set, don't update client/mdt
2564                    logs. */
2565                 flags |= CM_SKIP;
2566                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2567                               "the MDT first to regenerate it.\n",
2568                               mti->mti_svname);
2569         }
2570
2571         /* Add ost to all MDT lov defs */
2572         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2573                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2574                         char mdt_index[9];
2575
2576                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2577                                                      i);
2578                         if (rc)
2579                                 RETURN(rc);
2580                         sprintf(mdt_index, "-MDT%04x", i);
2581                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2582                                                       logname, mdt_index,
2583                                                       lovname, LUSTRE_SP_MDT,
2584                                                       flags);
2585                         name_destroy(&logname);
2586                         name_destroy(&lovname);
2587                         if (rc)
2588                                 RETURN(rc);
2589                 }
2590         }
2591
2592         /* Append ost info to the client log */
2593         rc = name_create(&logname, mti->mti_fsname, "-client");
2594         if (rc)
2595                 RETURN(rc);
2596         if (mgs_log_is_empty(env, mgs, logname)) {
2597                 /* Start client log */
2598                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2599                                        fsdb->fsdb_clilov);
2600                 if (rc)
2601                         GOTO(out_free, rc);
2602                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2603                                        fsdb->fsdb_clilmv);
2604                 if (rc)
2605                         GOTO(out_free, rc);
2606         }
2607         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2608                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
2609 out_free:
2610         name_destroy(&logname);
2611         RETURN(rc);
2612 }
2613
2614 static __inline__ int mgs_param_empty(char *ptr)
2615 {
2616         char *tmp;
2617
2618         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2619                 return 1;
2620         return 0;
2621 }
2622
2623 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2624                                           struct mgs_device *mgs,
2625                                           struct fs_db *fsdb,
2626                                           struct mgs_target_info *mti,
2627                                           char *logname, char *cliname)
2628 {
2629         int rc;
2630         struct llog_handle *llh = NULL;
2631
2632         if (mgs_param_empty(mti->mti_params)) {
2633                 /* Remove _all_ failnids */
2634                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2635                                 mti->mti_svname, "add failnid", CM_SKIP);
2636                 return rc < 0 ? rc : 0;
2637         }
2638
2639         /* Otherwise failover nids are additive */
2640         rc = record_start_log(env, mgs, &llh, logname);
2641         if (rc)
2642                 return rc;
2643                 /* FIXME this should be a single journal transaction */
2644         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2645                            "add failnid");
2646         if (rc)
2647                 goto out_end;
2648         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2649         if (rc)
2650                 goto out_end;
2651         rc = record_marker(env, llh, fsdb, CM_END,
2652                            mti->mti_svname, "add failnid");
2653 out_end:
2654         record_end_log(env, &llh);
2655         return rc;
2656 }
2657
2658
2659 /* Add additional failnids to an existing log.
2660    The mdc/osc must have been added to logs first */
2661 /* tcp nids must be in dotted-quad ascii -
2662    we can't resolve hostnames from the kernel. */
2663 static int mgs_write_log_add_failnid(const struct lu_env *env,
2664                                      struct mgs_device *mgs,
2665                                      struct fs_db *fsdb,
2666                                      struct mgs_target_info *mti)
2667 {
2668         char *logname, *cliname;
2669         int rc;
2670         ENTRY;
2671
2672         /* FIXME we currently can't erase the failnids
2673          * given when a target first registers, since they aren't part of
2674          * an "add uuid" stanza */
2675
2676         /* Verify that we know about this target */
2677         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2678                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2679                                    "yet. It must be started before failnids "
2680                                    "can be added.\n", mti->mti_svname);
2681                 RETURN(-ENOENT);
2682         }
2683
2684         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2685         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2686                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2687         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2688                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2689         } else {
2690                 RETURN(-EINVAL);
2691         }
2692         if (rc)
2693                 RETURN(rc);
2694         /* Add failover nids to the client log */
2695         rc = name_create(&logname, mti->mti_fsname, "-client");
2696         if (rc) {
2697                 name_destroy(&cliname);
2698                 RETURN(rc);
2699         }
2700         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2701         name_destroy(&logname);
2702         name_destroy(&cliname);
2703         if (rc)
2704                 RETURN(rc);
2705
2706         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2707                 /* Add OST failover nids to the MDT logs as well */
2708                 int i;
2709
2710                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2711                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2712                                 continue;
2713                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2714                         if (rc)
2715                                 RETURN(rc);
2716                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2717                                                  fsdb, i);
2718                         if (rc) {
2719                                 name_destroy(&logname);
2720                                 RETURN(rc);
2721                         }
2722                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2723                                                             mti, logname,
2724                                                             cliname);
2725                         name_destroy(&cliname);
2726                         name_destroy(&logname);
2727                         if (rc)
2728                                 RETURN(rc);
2729                 }
2730         }
2731
2732         RETURN(rc);
2733 }
2734
2735 static int mgs_wlp_lcfg(const struct lu_env *env,
2736                         struct mgs_device *mgs, struct fs_db *fsdb,
2737                         struct mgs_target_info *mti,
2738                         char *logname, struct lustre_cfg_bufs *bufs,
2739                         char *tgtname, char *ptr)
2740 {
2741         char comment[MTI_NAME_MAXLEN];
2742         char *tmp;
2743         struct llog_cfg_rec *lcr;
2744         int rc, del;
2745
2746         /* Erase any old settings of this same parameter */
2747         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2748         comment[MTI_NAME_MAXLEN - 1] = 0;
2749         /* But don't try to match the value. */
2750         tmp = strchr(comment, '=');
2751         if (tmp != NULL)
2752                 *tmp = 0;
2753         /* FIXME we should skip settings that are the same as old values */
2754         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2755         if (rc < 0)
2756                 return rc;
2757         del = mgs_param_empty(ptr);
2758
2759         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2760                       "Setting" : "Modifying", tgtname, comment, logname);
2761         if (del) {
2762                 /* mgs_modify() will return 1 if nothing had to be done */
2763                 if (rc == 1)
2764                         rc = 0;
2765                 return rc;
2766         }
2767
2768         lustre_cfg_bufs_reset(bufs, tgtname);
2769         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2770         if (mti->mti_flags & LDD_F_PARAM2)
2771                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2772
2773         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2774                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2775         if (lcr == NULL)
2776                 return -ENOMEM;
2777
2778         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2779                                   comment);
2780         lustre_cfg_rec_free(lcr);
2781         return rc;
2782 }
2783
2784 static int mgs_write_log_param2(const struct lu_env *env,
2785                                 struct mgs_device *mgs,
2786                                 struct fs_db *fsdb,
2787                                 struct mgs_target_info *mti, char *ptr)
2788 {
2789         struct lustre_cfg_bufs  bufs;
2790         int                     rc = 0;
2791         ENTRY;
2792
2793         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2794         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2795                           mti->mti_svname, ptr);
2796
2797         RETURN(rc);
2798 }
2799
2800 /* write global variable settings into log */
2801 static int mgs_write_log_sys(const struct lu_env *env,
2802                              struct mgs_device *mgs, struct fs_db *fsdb,
2803                              struct mgs_target_info *mti, char *sys, char *ptr)
2804 {
2805         struct mgs_thread_info  *mgi = mgs_env_info(env);
2806         struct lustre_cfg       *lcfg;
2807         struct llog_cfg_rec     *lcr;
2808         char *tmp, sep;
2809         int rc, cmd, convert = 1;
2810
2811         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2812                 cmd = LCFG_SET_TIMEOUT;
2813         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2814                 cmd = LCFG_SET_LDLM_TIMEOUT;
2815         /* Check for known params here so we can return error to lctl */
2816         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2817                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2818                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2819                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2820                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2821                 cmd = LCFG_PARAM;
2822         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2823                 convert = 0; /* Don't convert string value to integer */
2824                 cmd = LCFG_PARAM;
2825         } else {
2826                 return -EINVAL;
2827         }
2828
2829         if (mgs_param_empty(ptr))
2830                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2831         else
2832                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2833
2834         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2835         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2836         if (!convert && *tmp != '\0')
2837                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2838         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2839         if (lcr == NULL)
2840                 return -ENOMEM;
2841
2842         lcfg = &lcr->lcr_cfg;
2843         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2844         /* truncate the comment to the parameter name */
2845         ptr = tmp - 1;
2846         sep = *ptr;
2847         *ptr = '\0';
2848         /* modify all servers and clients */
2849         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2850                                       *tmp == '\0' ? NULL : lcr,
2851                                       mti->mti_fsname, sys, 0);
2852         if (rc == 0 && *tmp != '\0') {
2853                 switch (cmd) {
2854                 case LCFG_SET_TIMEOUT:
2855                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2856                                 class_process_config(lcfg);
2857                         break;
2858                 case LCFG_SET_LDLM_TIMEOUT:
2859                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2860                                 class_process_config(lcfg);
2861                         break;
2862                 default:
2863                         break;
2864                 }
2865         }
2866         *ptr = sep;
2867         lustre_cfg_rec_free(lcr);
2868         return rc;
2869 }
2870
2871 /* write quota settings into log */
2872 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2873                                struct fs_db *fsdb, struct mgs_target_info *mti,
2874                                char *quota, char *ptr)
2875 {
2876         struct mgs_thread_info  *mgi = mgs_env_info(env);
2877         struct llog_cfg_rec     *lcr;
2878         char                    *tmp;
2879         char                     sep;
2880         int                      rc, cmd = LCFG_PARAM;
2881
2882         /* support only 'meta' and 'data' pools so far */
2883         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2884             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2885                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2886                        "& quota.ost are)\n", ptr);
2887                 return -EINVAL;
2888         }
2889
2890         if (*tmp == '\0') {
2891                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2892         } else {
2893                 CDEBUG(D_MGS, "global '%s'\n", quota);
2894
2895                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2896                     strcmp(tmp, "none") != 0) {
2897                         CERROR("enable option(%s) isn't supported\n", tmp);
2898                         return -EINVAL;
2899                 }
2900         }
2901
2902         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2903         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2904         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2905         if (lcr == NULL)
2906                 return -ENOMEM;
2907
2908         /* truncate the comment to the parameter name */
2909         ptr = tmp - 1;
2910         sep = *ptr;
2911         *ptr = '\0';
2912
2913         /* XXX we duplicated quota enable information in all server
2914          *     config logs, it should be moved to a separate config
2915          *     log once we cleanup the config log for global param. */
2916         /* modify all servers */
2917         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2918                                       *tmp == '\0' ? NULL : lcr,
2919                                       mti->mti_fsname, quota, 1);
2920         *ptr = sep;
2921         lustre_cfg_rec_free(lcr);
2922         return rc < 0 ? rc : 0;
2923 }
2924
2925 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2926                                    struct mgs_device *mgs,
2927                                    struct fs_db *fsdb,
2928                                    struct mgs_target_info *mti,
2929                                    char *param)
2930 {
2931         struct mgs_thread_info  *mgi = mgs_env_info(env);
2932         struct llog_cfg_rec     *lcr;
2933         struct llog_handle      *llh = NULL;
2934         char                    *logname;
2935         char                    *comment, *ptr;
2936         int                      rc, len;
2937
2938         ENTRY;
2939
2940         /* get comment */
2941         ptr = strchr(param, '=');
2942         LASSERT(ptr != NULL);
2943         len = ptr - param;
2944
2945         OBD_ALLOC(comment, len + 1);
2946         if (comment == NULL)
2947                 RETURN(-ENOMEM);
2948         strncpy(comment, param, len);
2949         comment[len] = '\0';
2950
2951         /* prepare lcfg */
2952         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2953         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2954         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2955         if (lcr == NULL)
2956                 GOTO(out_comment, rc = -ENOMEM);
2957
2958         /* construct log name */
2959         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2960         if (rc < 0)
2961                 GOTO(out_lcfg, rc);
2962
2963         if (mgs_log_is_empty(env, mgs, logname)) {
2964                 rc = record_start_log(env, mgs, &llh, logname);
2965                 if (rc < 0)
2966                         GOTO(out, rc);
2967                 record_end_log(env, &llh);
2968         }
2969
2970         /* obsolete old one */
2971         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2972                         comment, CM_SKIP);
2973         if (rc < 0)
2974                 GOTO(out, rc);
2975         /* write the new one */
2976         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2977                                   mti->mti_svname, comment);
2978         if (rc)
2979                 CERROR("%s: error writing log %s: rc = %d\n",
2980                        mgs->mgs_obd->obd_name, logname, rc);
2981 out:
2982         name_destroy(&logname);
2983 out_lcfg:
2984         lustre_cfg_rec_free(lcr);
2985 out_comment:
2986         OBD_FREE(comment, len + 1);
2987         RETURN(rc);
2988 }
2989
2990 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2991                                         char *param)
2992 {
2993         char    *ptr;
2994
2995         /* disable the adjustable udesc parameter for now, i.e. use default
2996          * setting that client always ship udesc to MDT if possible. to enable
2997          * it simply remove the following line */
2998         goto error_out;
2999
3000         ptr = strchr(param, '=');
3001         if (ptr == NULL)
3002                 goto error_out;
3003         *ptr++ = '\0';
3004
3005         if (strcmp(param, PARAM_SRPC_UDESC))
3006                 goto error_out;
3007
3008         if (strcmp(ptr, "yes") == 0) {
3009                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3010                 CWARN("Enable user descriptor shipping from client to MDT\n");
3011         } else if (strcmp(ptr, "no") == 0) {
3012                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3013                 CWARN("Disable user descriptor shipping from client to MDT\n");
3014         } else {
3015                 *(ptr - 1) = '=';
3016                 goto error_out;
3017         }
3018         return 0;
3019
3020 error_out:
3021         CERROR("Invalid param: %s\n", param);
3022         return -EINVAL;
3023 }
3024
3025 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3026                                   const char *svname,
3027                                   char *param)
3028 {
3029         struct sptlrpc_rule      rule;
3030         struct sptlrpc_rule_set *rset;
3031         int                      rc;
3032         ENTRY;
3033
3034         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3035                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3036                 RETURN(-EINVAL);
3037         }
3038
3039         if (strncmp(param, PARAM_SRPC_UDESC,
3040                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3041                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3042         }
3043
3044         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3045                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3046                 RETURN(-EINVAL);
3047         }
3048
3049         param += sizeof(PARAM_SRPC_FLVR) - 1;
3050
3051         rc = sptlrpc_parse_rule(param, &rule);
3052         if (rc)
3053                 RETURN(rc);
3054
3055         /* mgs rules implies must be mgc->mgs */
3056         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3057                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3058                      rule.sr_from != LUSTRE_SP_ANY) ||
3059                     (rule.sr_to != LUSTRE_SP_MGS &&
3060                      rule.sr_to != LUSTRE_SP_ANY))
3061                         RETURN(-EINVAL);
3062         }
3063
3064         /* preapre room for this coming rule. svcname format should be:
3065          * - fsname: general rule
3066          * - fsname-tgtname: target-specific rule
3067          */
3068         if (strchr(svname, '-')) {
3069                 struct mgs_tgt_srpc_conf *tgtconf;
3070                 int                       found = 0;
3071
3072                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3073                      tgtconf = tgtconf->mtsc_next) {
3074                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3075                                 found = 1;
3076                                 break;
3077                         }
3078                 }
3079
3080                 if (!found) {
3081                         int name_len;
3082
3083                         OBD_ALLOC_PTR(tgtconf);
3084                         if (tgtconf == NULL)
3085                                 RETURN(-ENOMEM);
3086
3087                         name_len = strlen(svname);
3088
3089                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3090                         if (tgtconf->mtsc_tgt == NULL) {
3091                                 OBD_FREE_PTR(tgtconf);
3092                                 RETURN(-ENOMEM);
3093                         }
3094                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3095
3096                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3097                         fsdb->fsdb_srpc_tgt = tgtconf;
3098                 }
3099
3100                 rset = &tgtconf->mtsc_rset;
3101         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
3102                 /* put _mgs related srpc rule directly in mgs ruleset */
3103                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
3104         } else {
3105                 rset = &fsdb->fsdb_srpc_gen;
3106         }
3107
3108         rc = sptlrpc_rule_set_merge(rset, &rule);
3109
3110         RETURN(rc);
3111 }
3112
3113 static int mgs_srpc_set_param(const struct lu_env *env,
3114                               struct mgs_device *mgs,
3115                               struct fs_db *fsdb,
3116                               struct mgs_target_info *mti,
3117                               char *param)
3118 {
3119         char                   *copy;
3120         int                     rc, copy_size;
3121         ENTRY;
3122
3123 #ifndef HAVE_GSS
3124         RETURN(-EINVAL);
3125 #endif
3126         /* keep a copy of original param, which could be destroied
3127          * during parsing */
3128         copy_size = strlen(param) + 1;
3129         OBD_ALLOC(copy, copy_size);
3130         if (copy == NULL)
3131                 return -ENOMEM;
3132         memcpy(copy, param, copy_size);
3133
3134         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3135         if (rc)
3136                 goto out_free;
3137
3138         /* previous steps guaranteed the syntax is correct */
3139         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3140         if (rc)
3141                 goto out_free;
3142
3143         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3144                 /*
3145                  * for mgs rules, make them effective immediately.
3146                  */
3147                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3148                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3149                                                  &fsdb->fsdb_srpc_gen);
3150         }
3151
3152 out_free:
3153         OBD_FREE(copy, copy_size);
3154         RETURN(rc);
3155 }
3156
3157 struct mgs_srpc_read_data {
3158         struct fs_db   *msrd_fsdb;
3159         int             msrd_skip;
3160 };
3161
3162 static int mgs_srpc_read_handler(const struct lu_env *env,
3163                                  struct llog_handle *llh,
3164                                  struct llog_rec_hdr *rec, void *data)
3165 {
3166         struct mgs_srpc_read_data *msrd = data;
3167         struct cfg_marker         *marker;
3168         struct lustre_cfg         *lcfg = REC_DATA(rec);
3169         char                      *svname, *param;
3170         int                        cfg_len, rc;
3171         ENTRY;
3172
3173         if (rec->lrh_type != OBD_CFG_REC) {
3174                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3175                 RETURN(-EINVAL);
3176         }
3177
3178         cfg_len = REC_DATA_LEN(rec);
3179
3180         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3181         if (rc) {
3182                 CERROR("Insane cfg\n");
3183                 RETURN(rc);
3184         }
3185
3186         if (lcfg->lcfg_command == LCFG_MARKER) {
3187                 marker = lustre_cfg_buf(lcfg, 1);
3188
3189                 if (marker->cm_flags & CM_START &&
3190                     marker->cm_flags & CM_SKIP)
3191                         msrd->msrd_skip = 1;
3192                 if (marker->cm_flags & CM_END)
3193                         msrd->msrd_skip = 0;
3194
3195                 RETURN(0);
3196         }
3197
3198         if (msrd->msrd_skip)
3199                 RETURN(0);
3200
3201         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3202                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3203                 RETURN(0);
3204         }
3205
3206         svname = lustre_cfg_string(lcfg, 0);
3207         if (svname == NULL) {
3208                 CERROR("svname is empty\n");
3209                 RETURN(0);
3210         }
3211
3212         param = lustre_cfg_string(lcfg, 1);
3213         if (param == NULL) {
3214                 CERROR("param is empty\n");
3215                 RETURN(0);
3216         }
3217
3218         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3219         if (rc)
3220                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3221
3222         RETURN(0);
3223 }
3224
3225 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3226                                 struct mgs_device *mgs,
3227                                 struct fs_db *fsdb)
3228 {
3229         struct llog_handle        *llh = NULL;
3230         struct llog_ctxt          *ctxt;
3231         char                      *logname;
3232         struct mgs_srpc_read_data  msrd;
3233         int                        rc;
3234         ENTRY;
3235
3236         /* construct log name */
3237         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3238         if (rc)
3239                 RETURN(rc);
3240
3241         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3242         LASSERT(ctxt != NULL);
3243
3244         if (mgs_log_is_empty(env, mgs, logname))
3245                 GOTO(out, rc = 0);
3246
3247         rc = llog_open(env, ctxt, &llh, NULL, logname,
3248                        LLOG_OPEN_EXISTS);
3249         if (rc < 0) {
3250                 if (rc == -ENOENT)
3251                         rc = 0;
3252                 GOTO(out, rc);
3253         }
3254
3255         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3256         if (rc)
3257                 GOTO(out_close, rc);
3258
3259         if (llog_get_size(llh) <= 1)
3260                 GOTO(out_close, rc = 0);
3261
3262         msrd.msrd_fsdb = fsdb;
3263         msrd.msrd_skip = 0;
3264
3265         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3266                           NULL);
3267
3268 out_close:
3269         llog_close(env, llh);
3270 out:
3271         llog_ctxt_put(ctxt);
3272         name_destroy(&logname);
3273
3274         if (rc)
3275                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3276         RETURN(rc);
3277 }
3278
3279 /* Permanent settings of all parameters by writing into the appropriate
3280  * configuration logs.
3281  * A parameter with null value ("<param>='\0'") means to erase it out of
3282  * the logs.
3283  */
3284 static int mgs_write_log_param(const struct lu_env *env,
3285                                struct mgs_device *mgs, struct fs_db *fsdb,
3286                                struct mgs_target_info *mti, char *ptr)
3287 {
3288         struct mgs_thread_info *mgi = mgs_env_info(env);
3289         char *logname;
3290         char *tmp;
3291         int rc = 0, rc2 = 0;
3292         ENTRY;
3293
3294         /* For various parameter settings, we have to figure out which logs
3295            care about them (e.g. both mdt and client for lov settings) */
3296         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3297
3298         /* The params are stored in MOUNT_DATA_FILE and modified via
3299            tunefs.lustre, or set using lctl conf_param */
3300
3301         /* Processed in lustre_start_mgc */
3302         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3303                 GOTO(end, rc);
3304
3305         /* Processed in ost/mdt */
3306         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3307                 GOTO(end, rc);
3308
3309         /* Processed in mgs_write_log_ost */
3310         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3311                 if (mti->mti_flags & LDD_F_PARAM) {
3312                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3313                                            "changed with tunefs.lustre"
3314                                            "and --writeconf\n", ptr);
3315                         rc = -EPERM;
3316                 }
3317                 GOTO(end, rc);
3318         }
3319
3320         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3321                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3322                 GOTO(end, rc);
3323         }
3324
3325         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3326                 /* Add a failover nidlist */
3327                 rc = 0;
3328                 /* We already processed failovers params for new
3329                    targets in mgs_write_log_target */
3330                 if (mti->mti_flags & LDD_F_PARAM) {
3331                         CDEBUG(D_MGS, "Adding failnode\n");
3332                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3333                 }
3334                 GOTO(end, rc);
3335         }
3336
3337         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3338                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3339                 GOTO(end, rc);
3340         }
3341
3342         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3343                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3344                 GOTO(end, rc);
3345         }
3346
3347         if (class_match_param(ptr, PARAM_OSC PARAM_ACTIVE, &tmp) == 0 ||
3348             class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0) {
3349                 /* active=0 means off, anything else means on */
3350                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3351                 bool deactive_osc = memcmp(ptr, PARAM_OSC PARAM_ACTIVE,
3352                                           strlen(PARAM_OSC PARAM_ACTIVE)) == 0;
3353                 int i;
3354
3355                 if (!deactive_osc) {
3356                         __u32   index;
3357
3358                         rc = server_name2index(mti->mti_svname, &index, NULL);
3359                         if (rc < 0)
3360                                 GOTO(end, rc);
3361
3362                         if (index == 0) {
3363                                 LCONSOLE_ERROR_MSG(0x144, "%s: MDC0 can not be"
3364                                                    " (de)activated.\n",
3365                                                    mti->mti_svname);
3366                                 GOTO(end, rc = -EINVAL);
3367                         }
3368                 }
3369
3370                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3371                               flag ? "de" : "re", mti->mti_svname);
3372                 /* Modify clilov */
3373                 rc = name_create(&logname, mti->mti_fsname, "-client");
3374                 if (rc < 0)
3375                         GOTO(end, rc);
3376                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3377                                 mti->mti_svname,
3378                                 deactive_osc ? "add osc" : "add mdc", flag);
3379                 name_destroy(&logname);
3380                 if (rc < 0)
3381                         goto active_err;
3382
3383                 /* Modify mdtlov */
3384                 /* Add to all MDT logs for DNE */
3385                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3386                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3387                                 continue;
3388                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3389                         if (rc < 0)
3390                                 GOTO(end, rc);
3391                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3392                                         mti->mti_svname,
3393                                         deactive_osc ? "add osc" : "add osp",
3394                                         flag);
3395                         name_destroy(&logname);
3396                         if (rc < 0)
3397                                 goto active_err;
3398                 }
3399 active_err:
3400                 if (rc < 0) {
3401                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3402                                            "log (%d). No permanent "
3403                                            "changes were made to the "
3404                                            "config log.\n",
3405                                            mti->mti_svname, rc);
3406                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3407                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3408                                                    " because the log"
3409                                                    "is in the old 1.4"
3410                                                    "style. Consider "
3411                                                    " --writeconf to "
3412                                                    "update the logs.\n");
3413                         GOTO(end, rc);
3414                 }
3415                 /* Fall through to osc/mdc proc for deactivating live
3416                    OSC/OSP on running MDT / clients. */
3417         }
3418         /* Below here, let obd's XXX_process_config methods handle it */
3419
3420         /* All lov. in proc */
3421         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3422                 char *mdtlovname;
3423
3424                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3425                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3426                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3427                                            "set on the MDT, not %s. "
3428                                            "Ignoring.\n",
3429                                            mti->mti_svname);
3430                         GOTO(end, rc = 0);
3431                 }
3432
3433                 /* Modify mdtlov */
3434                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3435                         GOTO(end, rc = -ENODEV);
3436
3437                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3438                                              mti->mti_stripe_index);
3439                 if (rc)
3440                         GOTO(end, rc);
3441                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3442                                   &mgi->mgi_bufs, mdtlovname, ptr);
3443                 name_destroy(&logname);
3444                 name_destroy(&mdtlovname);
3445                 if (rc)
3446                         GOTO(end, rc);
3447
3448                 /* Modify clilov */
3449                 rc = name_create(&logname, mti->mti_fsname, "-client");
3450                 if (rc)
3451                         GOTO(end, rc);
3452                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3453                                   fsdb->fsdb_clilov, ptr);
3454                 name_destroy(&logname);
3455                 GOTO(end, rc);
3456         }
3457
3458         /* All osc., mdc., llite. params in proc */
3459         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3460             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3461             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3462                 char *cname;
3463
3464                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3465                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3466                                            " cannot be modified. Consider"
3467                                            " updating the configuration with"
3468                                            " --writeconf\n",
3469                                            mti->mti_svname);
3470                         GOTO(end, rc = -EINVAL);
3471                 }
3472                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3473                         rc = name_create(&cname, mti->mti_fsname, "-client");
3474                         /* Add the client type to match the obdname in
3475                            class_config_llog_handler */
3476                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3477                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3478                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3479                         rc = name_create(&cname, mti->mti_svname, "-osc");
3480                 } else {
3481                         GOTO(end, rc = -EINVAL);
3482                 }
3483                 if (rc)
3484                         GOTO(end, rc);
3485
3486                 /* Forbid direct update of llite root squash parameters.
3487                  * These parameters are indirectly set via the MDT settings.
3488                  * See (LU-1778) */
3489                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
3490                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3491                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3492                         LCONSOLE_ERROR("%s: root squash parameters can only "
3493                                 "be updated through MDT component\n",
3494                                 mti->mti_fsname);
3495                         name_destroy(&cname);
3496                         GOTO(end, rc = -EINVAL);
3497                 }
3498
3499                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3500
3501                 /* Modify client */
3502                 rc = name_create(&logname, mti->mti_fsname, "-client");
3503                 if (rc) {
3504                         name_destroy(&cname);
3505                         GOTO(end, rc);
3506                 }
3507                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3508                                   cname, ptr);
3509
3510                 /* osc params affect the MDT as well */
3511                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3512                         int i;
3513
3514                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3515                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3516                                         continue;
3517                                 name_destroy(&cname);
3518                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3519                                                          fsdb, i);
3520                                 name_destroy(&logname);
3521                                 if (rc)
3522                                         break;
3523                                 rc = name_create_mdt(&logname,
3524                                                      mti->mti_fsname, i);
3525                                 if (rc)
3526                                         break;
3527                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3528                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3529                                                           mti, logname,
3530                                                           &mgi->mgi_bufs,
3531                                                           cname, ptr);
3532                                         if (rc)
3533                                                 break;
3534                                 }
3535                         }
3536                 }
3537
3538                 /* For mdc activate/deactivate, it affects OSP on MDT as well */
3539                 if (class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0 &&
3540                     rc == 0) {
3541                         char suffix[16];
3542                         char *lodname = NULL;
3543                         char *param_str = NULL;
3544                         int i;
3545                         int index;
3546
3547                         /* replace mdc with osp */
3548                         memcpy(ptr, PARAM_OSP, strlen(PARAM_OSP));
3549                         rc = server_name2index(mti->mti_svname, &index, NULL);
3550                         if (rc < 0) {
3551                                 memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
3552                                 GOTO(end, rc);
3553                         }
3554
3555                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3556                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3557                                         continue;
3558
3559                                 if (i == index)
3560                                         continue;
3561
3562                                 name_destroy(&logname);
3563                                 rc = name_create_mdt(&logname, mti->mti_fsname,
3564                                                      i);
3565                                 if (rc < 0)
3566                                         break;
3567
3568                                 if (mgs_log_is_empty(env, mgs, logname))
3569                                         continue;
3570
3571                                 snprintf(suffix, sizeof(suffix), "-osp-MDT%04x",
3572                                          i);
3573                                 name_destroy(&cname);
3574                                 rc = name_create(&cname, mti->mti_svname,
3575                                                  suffix);
3576                                 if (rc < 0)
3577                                         break;
3578
3579                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3580                                                   &mgi->mgi_bufs, cname, ptr);
3581                                 if (rc < 0)
3582                                         break;
3583
3584                                 /* Add configuration log for noitfying LOD
3585                                  * to active/deactive the OSP. */
3586                                 name_destroy(&param_str);
3587                                 rc = name_create(&param_str, cname,
3588                                                  (*tmp == '0') ?  ".active=0" :
3589                                                  ".active=1");
3590                                 if (rc < 0)
3591                                         break;
3592
3593                                 name_destroy(&lodname);
3594                                 rc = name_create(&lodname, logname, "-mdtlov");
3595                                 if (rc < 0)
3596                                         break;
3597
3598                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3599                                                   &mgi->mgi_bufs, lodname,
3600                                                   param_str);
3601                                 if (rc < 0)
3602                                         break;
3603                         }
3604                         memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
3605                         name_destroy(&lodname);
3606                         name_destroy(&param_str);
3607                 }
3608
3609                 name_destroy(&logname);
3610                 name_destroy(&cname);
3611                 GOTO(end, rc);
3612         }
3613
3614         /* All mdt. params in proc */
3615         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
3616                 int i;
3617                 __u32 idx;
3618
3619                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3620                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3621                             MTI_NAME_MAXLEN) == 0)
3622                         /* device is unspecified completely? */
3623                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3624                 else
3625                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3626                 if (rc < 0)
3627                         goto active_err;
3628                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3629                         goto active_err;
3630                 if (rc & LDD_F_SV_ALL) {
3631                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3632                                 if (!test_bit(i,
3633                                                   fsdb->fsdb_mdt_index_map))
3634                                         continue;
3635                                 rc = name_create_mdt(&logname,
3636                                                 mti->mti_fsname, i);
3637                                 if (rc)
3638                                         goto active_err;
3639                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3640                                                   logname, &mgi->mgi_bufs,
3641                                                   logname, ptr);
3642                                 name_destroy(&logname);
3643                                 if (rc)
3644                                         goto active_err;
3645                         }
3646                 } else {
3647                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
3648                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
3649                                 LCONSOLE_ERROR("%s: root squash parameters "
3650                                         "cannot be applied to a single MDT\n",
3651                                         mti->mti_fsname);
3652                                 GOTO(end, rc = -EINVAL);
3653                         }
3654                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3655                                           mti->mti_svname, &mgi->mgi_bufs,
3656                                           mti->mti_svname, ptr);
3657                         if (rc)
3658                                 goto active_err;
3659                 }
3660
3661                 /* root squash settings are also applied to llite
3662                  * config log (see LU-1778) */
3663                 if (rc == 0 &&
3664                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3665                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3666                         char *cname;
3667                         char *ptr2;
3668
3669                         rc = name_create(&cname, mti->mti_fsname, "-client");
3670                         if (rc)
3671                                 GOTO(end, rc);
3672                         rc = name_create(&logname, mti->mti_fsname, "-client");
3673                         if (rc) {
3674                                 name_destroy(&cname);
3675                                 GOTO(end, rc);
3676                         }
3677                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
3678                         if (rc) {
3679                                 name_destroy(&cname);
3680                                 name_destroy(&logname);
3681                                 GOTO(end, rc);
3682                         }
3683                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3684                                           &mgi->mgi_bufs, cname, ptr2);
3685                         name_destroy(&ptr2);
3686                         name_destroy(&logname);
3687                         name_destroy(&cname);
3688                 }
3689                 GOTO(end, rc);
3690         }
3691
3692         /* All mdd., ost. and osd. params in proc */
3693         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3694             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3695             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3696                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3697                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3698                         GOTO(end, rc = -ENODEV);
3699
3700                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3701                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3702                 GOTO(end, rc);
3703         }
3704
3705         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3706         rc2 = -ENOSYS;
3707
3708 end:
3709         if (rc)
3710                 CERROR("err %d on param '%s'\n", rc, ptr);
3711
3712         RETURN(rc ?: rc2);
3713 }
3714
3715 /* Not implementing automatic failover nid addition at this time. */
3716 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3717                       struct mgs_target_info *mti)
3718 {
3719 #if 0
3720         struct fs_db *fsdb;
3721         int rc;
3722         ENTRY;
3723
3724         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3725         if (rc)
3726                 RETURN(rc);
3727
3728         if (mgs_log_is_empty(obd, mti->mti_svname))
3729                 /* should never happen */
3730                 RETURN(-ENOENT);
3731
3732         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3733
3734         /* FIXME We can just check mti->params to see if we're already in
3735            the failover list.  Modify mti->params for rewriting back at
3736            server_register_target(). */
3737
3738         mutex_lock(&fsdb->fsdb_mutex);
3739         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3740         mutex_unlock(&fsdb->fsdb_mutex);
3741         char    *buf, *params;
3742         int      rc = -EINVAL;
3743
3744         RETURN(rc);
3745 #endif
3746         return 0;
3747 }
3748
3749 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
3750                          struct mgs_target_info *mti, struct fs_db *fsdb)
3751 {
3752         char    *buf, *params;
3753         int      rc = -EINVAL;
3754
3755         ENTRY;
3756
3757         /* set/check the new target index */
3758         rc = mgs_set_index(env, mgs, mti);
3759         if (rc < 0)
3760                 RETURN(rc);
3761
3762         if (rc == EALREADY) {
3763                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3764                               mti->mti_stripe_index, mti->mti_svname);
3765                 /* We would like to mark old log sections as invalid
3766                    and add new log sections in the client and mdt logs.
3767                    But if we add new sections, then live clients will
3768                    get repeat setup instructions for already running
3769                    osc's. So don't update the client/mdt logs. */
3770                 mti->mti_flags &= ~LDD_F_UPDATE;
3771                 rc = 0;
3772         }
3773
3774         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_WRITE_TARGET_DELAY, cfs_fail_val > 0 ?
3775                          cfs_fail_val : 10);
3776
3777         mutex_lock(&fsdb->fsdb_mutex);
3778
3779         if (mti->mti_flags &
3780             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3781                 /* Generate a log from scratch */
3782                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3783                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3784                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3785                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3786                 } else {
3787                         CERROR("Unknown target type %#x, can't create log for "
3788                                "%s\n", mti->mti_flags, mti->mti_svname);
3789                 }
3790                 if (rc) {
3791                         CERROR("Can't write logs for %s (%d)\n",
3792                                mti->mti_svname, rc);
3793                         GOTO(out_up, rc);
3794                 }
3795         } else {
3796                 /* Just update the params from tunefs in mgs_write_log_params */
3797                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3798                 mti->mti_flags |= LDD_F_PARAM;
3799         }
3800
3801         /* allocate temporary buffer, where class_get_next_param will
3802            make copy of a current  parameter */
3803         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3804         if (buf == NULL)
3805                 GOTO(out_up, rc = -ENOMEM);
3806         params = mti->mti_params;
3807         while (params != NULL) {
3808                 rc = class_get_next_param(&params, buf);
3809                 if (rc) {
3810                         if (rc == 1)
3811                                 /* there is no next parameter, that is
3812                                    not an error */
3813                                 rc = 0;
3814                         break;
3815                 }
3816                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3817                        params, buf);
3818                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3819                 if (rc)
3820                         break;
3821         }
3822
3823         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3824
3825 out_up:
3826         mutex_unlock(&fsdb->fsdb_mutex);
3827         RETURN(rc);
3828 }
3829
3830 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3831 {
3832         struct llog_ctxt        *ctxt;
3833         int                      rc = 0;
3834
3835         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3836         if (ctxt == NULL) {
3837                 CERROR("%s: MGS config context doesn't exist\n",
3838                        mgs->mgs_obd->obd_name);
3839                 rc = -ENODEV;
3840         } else {
3841                 rc = llog_erase(env, ctxt, NULL, name);
3842                 /* llog may not exist */
3843                 if (rc == -ENOENT)
3844                         rc = 0;
3845                 llog_ctxt_put(ctxt);
3846         }
3847
3848         if (rc)
3849                 CERROR("%s: failed to clear log %s: %d\n",
3850                        mgs->mgs_obd->obd_name, name, rc);
3851
3852         return rc;
3853 }
3854
3855 /* erase all logs for the given fs */
3856 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3857 {
3858         struct fs_db *fsdb;
3859         struct list_head log_list;
3860         struct mgs_direntry *dirent, *n;
3861         int rc, len = strlen(fsname);
3862         char *suffix;
3863         ENTRY;
3864
3865         /* Find all the logs in the CONFIGS directory */
3866         rc = class_dentry_readdir(env, mgs, &log_list);
3867         if (rc)
3868                 RETURN(rc);
3869
3870         mutex_lock(&mgs->mgs_mutex);
3871
3872         /* Delete the fs db */
3873         fsdb = mgs_find_fsdb(mgs, fsname);
3874         if (fsdb)
3875                 mgs_free_fsdb(mgs, fsdb);
3876
3877         mutex_unlock(&mgs->mgs_mutex);
3878
3879         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3880                 list_del_init(&dirent->mde_list);
3881                 suffix = strrchr(dirent->mde_name, '-');
3882                 if (suffix != NULL) {
3883                         if ((len == suffix - dirent->mde_name) &&
3884                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
3885                                 CDEBUG(D_MGS, "Removing log %s\n",
3886                                        dirent->mde_name);
3887                                 mgs_erase_log(env, mgs, dirent->mde_name);
3888                         }
3889                 }
3890                 mgs_direntry_free(dirent);
3891         }
3892
3893         RETURN(rc);
3894 }
3895
3896 /* list all logs for the given fs */
3897 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
3898                   struct obd_ioctl_data *data)
3899 {
3900         struct list_head         log_list;
3901         struct mgs_direntry     *dirent, *n;
3902         char                    *out, *suffix;
3903         int                      l, remains, rc;
3904
3905         ENTRY;
3906
3907         /* Find all the logs in the CONFIGS directory */
3908         rc = class_dentry_readdir(env, mgs, &log_list);
3909         if (rc)
3910                 RETURN(rc);
3911
3912         out = data->ioc_bulk;
3913         remains = data->ioc_inllen1;
3914         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
3915                 list_del_init(&dirent->mde_list);
3916                 suffix = strrchr(dirent->mde_name, '-');
3917                 if (suffix != NULL) {
3918                         l = snprintf(out, remains, "config log: $%s\n",
3919                                      dirent->mde_name);
3920                         out += l;
3921                         remains -= l;
3922                 }
3923                 mgs_direntry_free(dirent);
3924                 if (remains <= 0)
3925                         break;
3926         }
3927         RETURN(rc);
3928 }
3929
3930 /* from llog_swab */
3931 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3932 {
3933         int i;
3934         ENTRY;
3935
3936         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3937         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3938
3939         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3940         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3941         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3942         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3943
3944         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3945         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3946                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3947                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3948                                i, lcfg->lcfg_buflens[i],
3949                                lustre_cfg_string(lcfg, i));
3950                 }
3951         EXIT;
3952 }
3953
3954 /* Setup _mgs fsdb and log
3955  */
3956 int mgs__mgs_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3957                           struct fs_db *fsdb)
3958 {
3959         int                     rc;
3960         ENTRY;
3961
3962         rc = mgs_find_or_make_fsdb(env, mgs, MGSSELF_NAME, &fsdb);
3963
3964         RETURN(rc);
3965 }
3966
3967 /* Setup params fsdb and log
3968  */
3969 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs,
3970                           struct fs_db *fsdb)
3971 {
3972         struct llog_handle      *params_llh = NULL;
3973         int                     rc;
3974         ENTRY;
3975
3976         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
3977         if (fsdb != NULL) {
3978                 mutex_lock(&fsdb->fsdb_mutex);
3979                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
3980                 if (rc == 0)
3981                         rc = record_end_log(env, &params_llh);
3982                 mutex_unlock(&fsdb->fsdb_mutex);
3983         }
3984
3985         RETURN(rc);
3986 }
3987
3988 /* Cleanup params fsdb and log
3989  */
3990 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
3991 {
3992         return mgs_erase_logs(env, mgs, PARAMS_FILENAME);
3993 }
3994
3995 /* Set a permanent (config log) param for a target or fs
3996  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3997  *             buf1 contains the single parameter
3998  */
3999 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
4000                  struct lustre_cfg *lcfg, char *fsname)
4001 {
4002         struct fs_db *fsdb;
4003         struct mgs_target_info *mti;
4004         char *devname, *param;
4005         char *ptr;
4006         const char *tmp;
4007         __u32 index;
4008         int rc = 0;
4009         ENTRY;
4010
4011         print_lustre_cfg(lcfg);
4012
4013         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
4014         devname = lustre_cfg_string(lcfg, 0);
4015         param = lustre_cfg_string(lcfg, 1);
4016         if (!devname) {
4017                 /* Assume device name embedded in param:
4018                    lustre-OST0000.osc.max_dirty_mb=32 */
4019                 ptr = strchr(param, '.');
4020                 if (ptr) {
4021                         devname = param;
4022                         *ptr = 0;
4023                         param = ptr + 1;
4024                 }
4025         }
4026         if (!devname) {
4027                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
4028                 RETURN(-ENOSYS);
4029         }
4030
4031         rc = mgs_parse_devname(devname, fsname, NULL);
4032         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
4033                 /* param related to llite isn't allowed to set by OST or MDT */
4034                 if (rc == 0 && strncmp(param, PARAM_LLITE,
4035                                        sizeof(PARAM_LLITE) - 1) == 0)
4036                         RETURN(-EINVAL);
4037         } else {
4038                 /* assume devname is the fsname */
4039                 strlcpy(fsname, devname, MTI_NAME_MAXLEN);
4040         }
4041         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
4042
4043         rc = mgs_find_or_make_fsdb(env, mgs,
4044                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
4045                                    PARAMS_FILENAME : fsname, &fsdb);
4046         if (rc)
4047                 RETURN(rc);
4048
4049         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
4050             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
4051             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4052                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
4053                        "is '%s'\n", fsname, devname);
4054                 mgs_free_fsdb(mgs, fsdb);
4055                 RETURN(-EINVAL);
4056         }
4057
4058         /* Create a fake mti to hold everything */
4059         OBD_ALLOC_PTR(mti);
4060         if (!mti)
4061                 GOTO(out, rc = -ENOMEM);
4062         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
4063             >= sizeof(mti->mti_fsname))
4064                 GOTO(out, rc = -E2BIG);
4065         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
4066             >= sizeof(mti->mti_svname))
4067                 GOTO(out, rc = -E2BIG);
4068         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
4069             >= sizeof(mti->mti_params))
4070                 GOTO(out, rc = -E2BIG);
4071         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
4072         if (rc < 0)
4073                 /* Not a valid server; may be only fsname */
4074                 rc = 0;
4075         else
4076                 /* Strip -osc or -mdc suffix from svname */
4077                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
4078                                      mti->mti_svname))
4079                         GOTO(out, rc = -EINVAL);
4080         /*
4081          * Revoke lock so everyone updates.  Should be alright if
4082          * someone was already reading while we were updating the logs,
4083          * so we don't really need to hold the lock while we're
4084          * writing (above).
4085          */
4086         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
4087                 mti->mti_flags = rc | LDD_F_PARAM2;
4088                 mutex_lock(&fsdb->fsdb_mutex);
4089                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
4090                 mutex_unlock(&fsdb->fsdb_mutex);
4091                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
4092         } else {
4093                 mti->mti_flags = rc | LDD_F_PARAM;
4094                 mutex_lock(&fsdb->fsdb_mutex);
4095                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
4096                 mutex_unlock(&fsdb->fsdb_mutex);
4097                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4098         }
4099
4100 out:
4101         OBD_FREE_PTR(mti);
4102         RETURN(rc);
4103 }
4104
4105 static int mgs_write_log_pool(const struct lu_env *env,
4106                               struct mgs_device *mgs, char *logname,
4107                               struct fs_db *fsdb, char *tgtname,
4108                               enum lcfg_command_type cmd,
4109                               char *fsname, char *poolname,
4110                               char *ostname, char *comment)
4111 {
4112         struct llog_handle *llh = NULL;
4113         int rc;
4114
4115         rc = record_start_log(env, mgs, &llh, logname);
4116         if (rc)
4117                 return rc;
4118         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
4119         if (rc)
4120                 goto out;
4121         rc = record_base(env, llh, tgtname, 0, cmd,
4122                          fsname, poolname, ostname, NULL);
4123         if (rc)
4124                 goto out;
4125         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
4126 out:
4127         record_end_log(env, &llh);
4128         return rc;
4129 }
4130
4131 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
4132                     enum lcfg_command_type cmd, const char *nodemap_name,
4133                     char *param)
4134 {
4135         lnet_nid_t      nid[2];
4136         __u32           idmap[2];
4137         bool            bool_switch;
4138         __u32           int_id;
4139         int             rc = 0;
4140         ENTRY;
4141
4142         switch (cmd) {
4143         case LCFG_NODEMAP_ADD:
4144                 rc = nodemap_add(nodemap_name);
4145                 break;
4146         case LCFG_NODEMAP_DEL:
4147                 rc = nodemap_del(nodemap_name);
4148                 break;
4149         case LCFG_NODEMAP_ADD_RANGE:
4150                 rc = nodemap_parse_range(param, nid);
4151                 if (rc != 0)
4152                         break;
4153                 rc = nodemap_add_range(nodemap_name, nid);
4154                 break;
4155         case LCFG_NODEMAP_DEL_RANGE:
4156                 rc = nodemap_parse_range(param, nid);
4157                 if (rc != 0)
4158                         break;
4159                 rc = nodemap_del_range(nodemap_name, nid);
4160                 break;
4161         case LCFG_NODEMAP_ADMIN:
4162                 bool_switch = simple_strtoul(param, NULL, 10);
4163                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
4164                 break;
4165         case LCFG_NODEMAP_TRUSTED:
4166                 bool_switch = simple_strtoul(param, NULL, 10);
4167                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
4168                 break;
4169         case LCFG_NODEMAP_SQUASH_UID:
4170                 int_id = simple_strtoul(param, NULL, 10);
4171                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
4172                 break;
4173         case LCFG_NODEMAP_SQUASH_GID:
4174                 int_id = simple_strtoul(param, NULL, 10);
4175                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
4176                 break;
4177         case LCFG_NODEMAP_ADD_UIDMAP:
4178         case LCFG_NODEMAP_ADD_GIDMAP:
4179                 rc = nodemap_parse_idmap(param, idmap);
4180                 if (rc != 0)
4181                         break;
4182                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
4183                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
4184                                                idmap);
4185                 else
4186                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
4187                                                idmap);
4188                 break;
4189         case LCFG_NODEMAP_DEL_UIDMAP:
4190         case LCFG_NODEMAP_DEL_GIDMAP:
4191                 rc = nodemap_parse_idmap(param, idmap);
4192                 if (rc != 0)
4193                         break;
4194                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
4195                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
4196                                                idmap);
4197                 else
4198                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
4199                                                idmap);
4200                 break;
4201         case LCFG_NODEMAP_SET_FILESET:
4202                 rc = nodemap_set_fileset(nodemap_name, param);
4203                 break;
4204         default:
4205                 rc = -EINVAL;
4206         }
4207
4208         RETURN(rc);
4209 }
4210
4211 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
4212                  enum lcfg_command_type cmd, char *fsname,
4213                  char *poolname, char *ostname)
4214 {
4215         struct fs_db *fsdb;
4216         char *lovname;
4217         char *logname;
4218         char *label = NULL, *canceled_label = NULL;
4219         int label_sz;
4220         struct mgs_target_info *mti = NULL;
4221         int rc, i;
4222         ENTRY;
4223
4224         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
4225         if (rc) {
4226                 CERROR("Can't get db for %s\n", fsname);
4227                 RETURN(rc);
4228         }
4229         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4230                 CERROR("%s is not defined\n", fsname);
4231                 mgs_free_fsdb(mgs, fsdb);
4232                 RETURN(-EINVAL);
4233         }
4234
4235         label_sz = 10 + strlen(fsname) + strlen(poolname);
4236
4237         /* check if ostname match fsname */
4238         if (ostname != NULL) {
4239                 char *ptr;
4240
4241                 ptr = strrchr(ostname, '-');
4242                 if ((ptr == NULL) ||
4243                     (strncmp(fsname, ostname, ptr-ostname) != 0))
4244                         RETURN(-EINVAL);
4245                 label_sz += strlen(ostname);
4246         }
4247
4248         OBD_ALLOC(label, label_sz);
4249         if (label == NULL)
4250                 RETURN(-ENOMEM);
4251
4252         switch(cmd) {
4253         case LCFG_POOL_NEW:
4254                 sprintf(label,
4255                         "new %s.%s", fsname, poolname);
4256                 break;
4257         case LCFG_POOL_ADD:
4258                 sprintf(label,
4259                         "add %s.%s.%s", fsname, poolname, ostname);
4260                 break;
4261         case LCFG_POOL_REM:
4262                 OBD_ALLOC(canceled_label, label_sz);
4263                 if (canceled_label == NULL)
4264                         GOTO(out_label, rc = -ENOMEM);
4265                 sprintf(label,
4266                         "rem %s.%s.%s", fsname, poolname, ostname);
4267                 sprintf(canceled_label,
4268                         "add %s.%s.%s", fsname, poolname, ostname);
4269                 break;
4270         case LCFG_POOL_DEL:
4271                 OBD_ALLOC(canceled_label, label_sz);
4272                 if (canceled_label == NULL)
4273                         GOTO(out_label, rc = -ENOMEM);
4274                 sprintf(label,
4275                         "del %s.%s", fsname, poolname);
4276                 sprintf(canceled_label,
4277                         "new %s.%s", fsname, poolname);
4278                 break;
4279         default:
4280                 break;
4281         }
4282
4283         if (canceled_label != NULL) {
4284                 OBD_ALLOC_PTR(mti);
4285                 if (mti == NULL)
4286                         GOTO(out_cancel, rc = -ENOMEM);
4287         }
4288
4289         mutex_lock(&fsdb->fsdb_mutex);
4290         /* write pool def to all MDT logs */
4291         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4292                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4293                         rc = name_create_mdt_and_lov(&logname, &lovname,
4294                                                      fsdb, i);
4295                         if (rc) {
4296                                 mutex_unlock(&fsdb->fsdb_mutex);
4297                                 GOTO(out_mti, rc);
4298                         }
4299                         if (canceled_label != NULL) {
4300                                 strcpy(mti->mti_svname, "lov pool");
4301                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4302                                                 lovname, canceled_label,
4303                                                 CM_SKIP);
4304                         }
4305
4306                         if (rc >= 0)
4307                                 rc = mgs_write_log_pool(env, mgs, logname,
4308                                                         fsdb, lovname, cmd,
4309                                                         fsname, poolname,
4310                                                         ostname, label);
4311                         name_destroy(&logname);
4312                         name_destroy(&lovname);
4313                         if (rc) {
4314                                 mutex_unlock(&fsdb->fsdb_mutex);
4315                                 GOTO(out_mti, rc);
4316                         }
4317                 }
4318         }
4319
4320         rc = name_create(&logname, fsname, "-client");
4321         if (rc) {
4322                 mutex_unlock(&fsdb->fsdb_mutex);
4323                 GOTO(out_mti, rc);
4324         }
4325         if (canceled_label != NULL) {
4326                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4327                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4328                 if (rc < 0) {
4329                         mutex_unlock(&fsdb->fsdb_mutex);
4330                         name_destroy(&logname);
4331                         GOTO(out_mti, rc);
4332                 }
4333         }
4334
4335         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4336                                 cmd, fsname, poolname, ostname, label);
4337         mutex_unlock(&fsdb->fsdb_mutex);
4338         name_destroy(&logname);
4339         /* request for update */
4340         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4341
4342         EXIT;
4343 out_mti:
4344         if (mti != NULL)
4345                 OBD_FREE_PTR(mti);
4346 out_cancel:
4347         if (canceled_label != NULL)
4348                 OBD_FREE(canceled_label, label_sz);
4349 out_label:
4350         OBD_FREE(label, label_sz);
4351         return rc;
4352 }