Whamcloud - gitweb
350ab003e804df71bdee16fd894d582c9cc3b453
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mgs/mgs_llog.c
33  *
34  * Lustre Management Server (mgs) config llog creation
35  *
36  * Author: Nathan Rutman <nathan@clusterfs.com>
37  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
38  * Author: Mikhail Pershin <tappro@whamcloud.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_MGS
42 #define D_MGS D_CONFIG
43
44 #include <obd.h>
45 #include <uapi/linux/lustre_ioctl.h>
46 #include <uapi/linux/lustre_param.h>
47 #include <lustre_sec.h>
48 #include <lustre_quota.h>
49
50 #include "mgs_internal.h"
51
52 /********************** Class functions ********************/
53
54 /**
55  * Find all logs in CONFIG directory and link then into list.
56  *
57  * \param[in] env       pointer to the thread context
58  * \param[in] mgs       pointer to the mgs device
59  * \param[out] log_list the list to hold the found llog name entry
60  *
61  * \retval              0 for success
62  * \retval              negative error number on failure
63  **/
64 int class_dentry_readdir(const struct lu_env *env, struct mgs_device *mgs,
65                          struct list_head *log_list)
66 {
67         struct dt_object *dir = mgs->mgs_configs_dir;
68         const struct dt_it_ops *iops;
69         struct dt_it *it;
70         struct mgs_direntry *de;
71         char *key;
72         int rc, key_sz;
73
74         INIT_LIST_HEAD(log_list);
75
76         LASSERT(dir);
77         LASSERT(dir->do_index_ops);
78
79         iops = &dir->do_index_ops->dio_it;
80         it = iops->init(env, dir, LUDA_64BITHASH);
81         if (IS_ERR(it))
82                 RETURN(PTR_ERR(it));
83
84         rc = iops->load(env, it, 0);
85         if (rc <= 0)
86                 GOTO(fini, rc = 0);
87
88         /* main cycle */
89         do {
90                 key = (void *)iops->key(env, it);
91                 if (IS_ERR(key)) {
92                         CERROR("%s: key failed when listing %s: rc = %d\n",
93                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
94                                (int) PTR_ERR(key));
95                         goto next;
96                 }
97                 key_sz = iops->key_size(env, it);
98                 LASSERT(key_sz > 0);
99
100                 /* filter out "." and ".." entries */
101                 if (key[0] == '.') {
102                         if (key_sz == 1)
103                                 goto next;
104                         if (key_sz == 2 && key[1] == '.')
105                                 goto next;
106                 }
107
108                 /* filter out ".bak" files */
109                 /* sizeof(".bak") - 1 == 3 */
110                 if (key_sz >= 3 &&
111                     !memcmp(".bak", key + key_sz - 3, 3)) {
112                         CDEBUG(D_MGS, "Skipping backup file %.*s\n",
113                                key_sz, key);
114                         goto next;
115                 }
116
117                 de = mgs_direntry_alloc(key_sz + 1);
118                 if (de == NULL) {
119                         rc = -ENOMEM;
120                         break;
121                 }
122
123                 memcpy(de->mde_name, key, key_sz);
124                 de->mde_name[key_sz] = 0;
125
126                 list_add(&de->mde_list, log_list);
127
128 next:
129                 rc = iops->next(env, it);
130         } while (rc == 0);
131         if (rc > 0)
132                 rc = 0;
133
134         iops->put(env, it);
135
136 fini:
137         iops->fini(env, it);
138         if (rc) {
139                 struct mgs_direntry *n;
140
141                 CERROR("%s: key failed when listing %s: rc = %d\n",
142                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
143
144                 list_for_each_entry_safe(de, n, log_list, mde_list) {
145                         list_del_init(&de->mde_list);
146                         mgs_direntry_free(de);
147                 }
148         }
149
150         RETURN(rc);
151 }
152
153 /******************** DB functions *********************/
154
155 static inline int name_create(char **newname, char *prefix, char *suffix)
156 {
157         LASSERT(newname);
158         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
159         if (!*newname)
160                 return -ENOMEM;
161         sprintf(*newname, "%s%s", prefix, suffix);
162         return 0;
163 }
164
165 static inline void name_destroy(char **name)
166 {
167         if (*name)
168                 OBD_FREE(*name, strlen(*name) + 1);
169         *name = NULL;
170 }
171
172 struct mgs_fsdb_handler_data
173 {
174         struct fs_db   *fsdb;
175         __u32           ver;
176 };
177
178 /* from the (client) config log, figure out:
179         1. which ost's/mdt's are configured (by index)
180         2. what the last config step is
181         3. COMPAT_18 osc name
182 */
183 /* It might be better to have a separate db file, instead of parsing the info
184    out of the client log.  This is slow and potentially error-prone. */
185 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
186                             struct llog_rec_hdr *rec, void *data)
187 {
188         struct mgs_fsdb_handler_data *d = data;
189         struct fs_db *fsdb = d->fsdb;
190         int cfg_len = rec->lrh_len;
191         char *cfg_buf = (char*) (rec + 1);
192         struct lustre_cfg *lcfg;
193         __u32 index;
194         int rc = 0;
195         ENTRY;
196
197         if (rec->lrh_type != OBD_CFG_REC) {
198                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
199                 RETURN(-EINVAL);
200         }
201
202         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
203         if (rc) {
204                 CERROR("Insane cfg\n");
205                 RETURN(rc);
206         }
207
208         lcfg = (struct lustre_cfg *)cfg_buf;
209
210         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
211                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
212
213         /* Figure out ost indicies */
214         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
215         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
216             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
217                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
218                                        NULL, 10);
219                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
220                        lustre_cfg_string(lcfg, 1), index,
221                        lustre_cfg_string(lcfg, 2));
222                 set_bit(index, fsdb->fsdb_ost_index_map);
223         }
224
225         /* Figure out mdt indicies */
226         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
227         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
228             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
229                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
230                                        &index, NULL);
231                 if (rc != LDD_F_SV_TYPE_MDT) {
232                         CWARN("Unparsable MDC name %s, assuming index 0\n",
233                               lustre_cfg_string(lcfg, 0));
234                         index = 0;
235                 }
236                 rc = 0;
237                 CDEBUG(D_MGS, "MDT index is %u\n", index);
238                 if (!test_bit(index, fsdb->fsdb_mdt_index_map)) {
239                         set_bit(index, fsdb->fsdb_mdt_index_map);
240                         fsdb->fsdb_mdt_count++;
241                 }
242         }
243
244         /**
245          * figure out the old config. fsdb_gen = 0 means old log
246          * It is obsoleted and not supported anymore
247          */
248         if (fsdb->fsdb_gen == 0) {
249                 CERROR("Old config format is not supported\n");
250                 RETURN(-EINVAL);
251         }
252
253         /*
254          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
255          */
256         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
257             lcfg->lcfg_command == LCFG_ATTACH &&
258             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
259                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
260                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
261                         CWARN("MDT using 1.8 OSC name scheme\n");
262                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
263                 }
264         }
265
266         if (lcfg->lcfg_command == LCFG_MARKER) {
267                 struct cfg_marker *marker;
268                 marker = lustre_cfg_buf(lcfg, 1);
269
270                 d->ver = marker->cm_vers;
271
272                 /* Keep track of the latest marker step */
273                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
274         }
275
276         RETURN(rc);
277 }
278
279 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
280 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
281                                   struct mgs_device *mgs,
282                                   struct fs_db *fsdb)
283 {
284         char *logname;
285         struct llog_handle *loghandle;
286         struct llog_ctxt *ctxt;
287         struct mgs_fsdb_handler_data d = {
288                 .fsdb = fsdb,
289         };
290         int rc;
291
292         ENTRY;
293
294         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
295         LASSERT(ctxt != NULL);
296         rc = name_create(&logname, fsdb->fsdb_name, "-client");
297         if (rc)
298                 GOTO(out_put, rc);
299         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
300         if (rc)
301                 GOTO(out_pop, rc);
302
303         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
304         if (rc)
305                 GOTO(out_close, rc);
306
307         if (llog_get_size(loghandle) <= 1)
308                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
309
310         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
311         CDEBUG(D_INFO, "get_db = %d\n", rc);
312 out_close:
313         llog_close(env, loghandle);
314 out_pop:
315         name_destroy(&logname);
316 out_put:
317         llog_ctxt_put(ctxt);
318
319         RETURN(rc);
320 }
321
322 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
323 {
324         struct mgs_tgt_srpc_conf *tgtconf;
325
326         /* free target-specific rules */
327         while (fsdb->fsdb_srpc_tgt) {
328                 tgtconf = fsdb->fsdb_srpc_tgt;
329                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
330
331                 LASSERT(tgtconf->mtsc_tgt);
332
333                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
334                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
335                 OBD_FREE_PTR(tgtconf);
336         }
337
338         /* free general rules */
339         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
340 }
341
342 static void mgs_unlink_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
343 {
344         mutex_lock(&mgs->mgs_mutex);
345         if (likely(!list_empty(&fsdb->fsdb_list))) {
346                 LASSERTF(atomic_read(&fsdb->fsdb_ref) >= 2,
347                          "Invalid ref %d on %s\n",
348                          atomic_read(&fsdb->fsdb_ref),
349                          fsdb->fsdb_name);
350
351                 list_del_init(&fsdb->fsdb_list);
352                 /* Drop the reference on the list.*/
353                 mgs_put_fsdb(mgs, fsdb);
354         }
355         mutex_unlock(&mgs->mgs_mutex);
356 }
357
358 /* The caller must hold mgs->mgs_mutex. */
359 static inline struct fs_db *
360 mgs_find_fsdb_noref(struct mgs_device *mgs, const char *fsname)
361 {
362         struct fs_db *fsdb;
363         struct list_head *tmp;
364
365         list_for_each(tmp, &mgs->mgs_fs_db_list) {
366                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
367                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
368                         return fsdb;
369         }
370
371         return NULL;
372 }
373
374 /* The caller must hold mgs->mgs_mutex. */
375 static void mgs_remove_fsdb_by_name(struct mgs_device *mgs, const char *name)
376 {
377         struct fs_db *fsdb;
378
379         fsdb = mgs_find_fsdb_noref(mgs, name);
380         if (fsdb) {
381                 list_del_init(&fsdb->fsdb_list);
382                 /* Drop the reference on the list.*/
383                 mgs_put_fsdb(mgs, fsdb);
384         }
385 }
386
387 /* The caller must hold mgs->mgs_mutex. */
388 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, const char *fsname)
389 {
390         struct fs_db *fsdb;
391
392         fsdb = mgs_find_fsdb_noref(mgs, fsname);
393         if (fsdb)
394                 atomic_inc(&fsdb->fsdb_ref);
395
396         return fsdb;
397 }
398
399 /* The caller must hold mgs->mgs_mutex. */
400 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
401                                   struct mgs_device *mgs, char *fsname)
402 {
403         struct fs_db *fsdb;
404         int rc;
405         ENTRY;
406
407         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
408                 CERROR("fsname %s is too long\n", fsname);
409
410                 RETURN(ERR_PTR(-EINVAL));
411         }
412
413         OBD_ALLOC_PTR(fsdb);
414         if (!fsdb)
415                 RETURN(ERR_PTR(-ENOMEM));
416
417         strncpy(fsdb->fsdb_name, fsname, sizeof(fsdb->fsdb_name));
418         mutex_init(&fsdb->fsdb_mutex);
419         INIT_LIST_HEAD(&fsdb->fsdb_list);
420         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
421         fsdb->fsdb_gen = 1;
422         INIT_LIST_HEAD(&fsdb->fsdb_clients);
423         atomic_set(&fsdb->fsdb_notify_phase, 0);
424         init_waitqueue_head(&fsdb->fsdb_notify_waitq);
425         init_completion(&fsdb->fsdb_notify_comp);
426
427         if (strcmp(fsname, MGSSELF_NAME) == 0) {
428                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
429                 fsdb->fsdb_mgs = mgs;
430                 if (logname_is_barrier(fsname))
431                         goto add;
432         } else {
433                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
434                 if (!fsdb->fsdb_mdt_index_map) {
435                         CERROR("No memory for MDT index maps\n");
436
437                         GOTO(err, rc = -ENOMEM);
438                 }
439
440                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
441                 if (!fsdb->fsdb_ost_index_map) {
442                         CERROR("No memory for OST index maps\n");
443
444                         GOTO(err, rc = -ENOMEM);
445                 }
446
447                 if (logname_is_barrier(fsname))
448                         goto add;
449
450                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
451                 if (rc)
452                         GOTO(err, rc);
453
454                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
455                 if (rc)
456                         GOTO(err, rc);
457
458                 /* initialise data for NID table */
459                 mgs_ir_init_fs(env, mgs, fsdb);
460                 lproc_mgs_add_live(mgs, fsdb);
461         }
462
463         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
464             strcmp(PARAMS_FILENAME, fsname) != 0) {
465                 /* populate the db from the client llog */
466                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
467                 if (rc) {
468                         CERROR("Can't get db from client log %d\n", rc);
469
470                         GOTO(err, rc);
471                 }
472         }
473
474         /* populate srpc rules from params llog */
475         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
476         if (rc) {
477                 CERROR("Can't get db from params log %d\n", rc);
478
479                 GOTO(err, rc);
480         }
481
482 add:
483         /* One ref is for the fsdb on the list.
484          * The other ref is for the caller. */
485         atomic_set(&fsdb->fsdb_ref, 2);
486         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
487
488         RETURN(fsdb);
489
490 err:
491         atomic_set(&fsdb->fsdb_ref, 1);
492         mgs_put_fsdb(mgs, fsdb);
493
494         RETURN(ERR_PTR(rc));
495 }
496
497 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
498 {
499         LASSERT(list_empty(&fsdb->fsdb_list));
500
501         lproc_mgs_del_live(mgs, fsdb);
502
503         /* deinitialize fsr */
504         if (fsdb->fsdb_mgs)
505                 mgs_ir_fini_fs(mgs, fsdb);
506
507         if (fsdb->fsdb_ost_index_map)
508                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
509         if (fsdb->fsdb_mdt_index_map)
510                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
511         name_destroy(&fsdb->fsdb_clilov);
512         name_destroy(&fsdb->fsdb_clilmv);
513         mgs_free_fsdb_srpc(fsdb);
514         OBD_FREE_PTR(fsdb);
515 }
516
517 void mgs_put_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
518 {
519         if (atomic_dec_and_test(&fsdb->fsdb_ref))
520                 mgs_free_fsdb(mgs, fsdb);
521 }
522
523 int mgs_init_fsdb_list(struct mgs_device *mgs)
524 {
525         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
526         return 0;
527 }
528
529 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
530 {
531         struct fs_db *fsdb;
532         struct list_head *tmp, *tmp2;
533
534         mutex_lock(&mgs->mgs_mutex);
535         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
536                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
537                 list_del_init(&fsdb->fsdb_list);
538                 mgs_put_fsdb(mgs, fsdb);
539         }
540         mutex_unlock(&mgs->mgs_mutex);
541         return 0;
542 }
543
544 int mgs_find_or_make_fsdb(const struct lu_env *env, struct mgs_device *mgs,
545                           char *name, struct fs_db **dbh)
546 {
547         struct fs_db *fsdb;
548         int rc = 0;
549         ENTRY;
550
551         mutex_lock(&mgs->mgs_mutex);
552         fsdb = mgs_find_fsdb(mgs, name);
553         if (!fsdb) {
554                 fsdb = mgs_new_fsdb(env, mgs, name);
555                 if (IS_ERR(fsdb))
556                         rc = PTR_ERR(fsdb);
557
558                 CDEBUG(D_MGS, "Created new db: rc = %d\n", rc);
559         }
560         mutex_unlock(&mgs->mgs_mutex);
561
562         if (!rc)
563                 *dbh = fsdb;
564
565         RETURN(rc);
566 }
567
568 /* 1 = index in use
569    0 = index unused
570    -1= empty client log */
571 int mgs_check_index(const struct lu_env *env,
572                     struct mgs_device *mgs,
573                     struct mgs_target_info *mti)
574 {
575         struct fs_db *fsdb;
576         void *imap;
577         int rc = 0;
578         ENTRY;
579
580         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
581
582         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
583         if (rc) {
584                 CERROR("Can't get db for %s\n", mti->mti_fsname);
585                 RETURN(rc);
586         }
587
588         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
589                 GOTO(out, rc = -1);
590
591         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
592                 imap = fsdb->fsdb_ost_index_map;
593         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
594                 imap = fsdb->fsdb_mdt_index_map;
595         else
596                 GOTO(out, rc = -EINVAL);
597
598         if (test_bit(mti->mti_stripe_index, imap))
599                 GOTO(out, rc = 1);
600
601         GOTO(out, rc = 0);
602
603 out:
604         mgs_put_fsdb(mgs, fsdb);
605         return rc;
606 }
607
608 static __inline__ int next_index(void *index_map, int map_len)
609 {
610         int i;
611         for (i = 0; i < map_len * 8; i++)
612                 if (!test_bit(i, index_map)) {
613                          return i;
614                  }
615         CERROR("max index %d exceeded.\n", i);
616         return -1;
617 }
618
619 /* Make the mdt/ost server obd name based on the filesystem name */
620 static bool server_make_name(u32 flags, u16 index, const char *fs,
621                              char *name_buf, size_t name_buf_size)
622 {
623         bool invalid_flag = false;
624
625         if (flags & (LDD_F_SV_TYPE_MDT | LDD_F_SV_TYPE_OST)) {
626                 if (!(flags & LDD_F_SV_ALL))
627                         snprintf(name_buf, name_buf_size, "%.8s%c%s%04x", fs,
628                                 (flags & LDD_F_VIRGIN) ? ':' :
629                                         ((flags & LDD_F_WRITECONF) ? '=' : '-'),
630                                 (flags & LDD_F_SV_TYPE_MDT) ? "MDT" : "OST",
631                                 index);
632         } else if (flags & LDD_F_SV_TYPE_MGS) {
633                 snprintf(name_buf, name_buf_size, "MGS");
634         } else {
635                 CERROR("unknown server type %#x\n", flags);
636                 invalid_flag = true;
637         }
638         return invalid_flag;
639 }
640
641 /* Return codes:
642         0  newly marked as in use
643         <0 err
644         +EALREADY for update of an old index */
645 static int mgs_set_index(const struct lu_env *env,
646                          struct mgs_device *mgs,
647                          struct mgs_target_info *mti)
648 {
649         struct fs_db *fsdb;
650         void *imap;
651         int rc = 0;
652         ENTRY;
653
654         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
655         if (rc) {
656                 CERROR("Can't get db for %s\n", mti->mti_fsname);
657                 RETURN(rc);
658         }
659
660         mutex_lock(&fsdb->fsdb_mutex);
661         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
662                 imap = fsdb->fsdb_ost_index_map;
663         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
664                 imap = fsdb->fsdb_mdt_index_map;
665         } else {
666                 GOTO(out_up, rc = -EINVAL);
667         }
668
669         if (mti->mti_flags & LDD_F_NEED_INDEX) {
670                 rc = next_index(imap, INDEX_MAP_SIZE);
671                 if (rc == -1)
672                         GOTO(out_up, rc = -ERANGE);
673                 mti->mti_stripe_index = rc;
674         }
675
676         /* the last index(0xffff) is reserved for default value. */
677         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
678                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
679                                    "but index must be less than %u.\n",
680                                    mti->mti_svname, mti->mti_stripe_index,
681                                    INDEX_MAP_SIZE * 8 - 1);
682                 GOTO(out_up, rc = -ERANGE);
683         }
684
685         if (test_bit(mti->mti_stripe_index, imap)) {
686                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
687                     !(mti->mti_flags & LDD_F_WRITECONF)) {
688                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
689                                            "%d, but that index is already in "
690                                            "use. Use --writeconf to force\n",
691                                            mti->mti_svname,
692                                            mti->mti_stripe_index);
693                         GOTO(out_up, rc = -EADDRINUSE);
694                 } else {
695                         CDEBUG(D_MGS, "Server %s updating index %d\n",
696                                mti->mti_svname, mti->mti_stripe_index);
697                         GOTO(out_up, rc = EALREADY);
698                 }
699         } else {
700                 set_bit(mti->mti_stripe_index, imap);
701                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
702                         fsdb->fsdb_mdt_count++;
703         }
704
705         set_bit(mti->mti_stripe_index, imap);
706         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
707         if (server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
708                              mti->mti_stripe_index, mti->mti_fsname,
709                              mti->mti_svname, sizeof(mti->mti_svname))) {
710                 CERROR("unknown server type %#x\n", mti->mti_flags);
711                 GOTO(out_up, rc = -EINVAL);
712         }
713
714         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
715                mti->mti_stripe_index);
716
717         GOTO(out_up, rc = 0);
718
719 out_up:
720         mutex_unlock(&fsdb->fsdb_mutex);
721         mgs_put_fsdb(mgs, fsdb);
722         return rc;
723 }
724
725 struct mgs_modify_lookup {
726         struct cfg_marker mml_marker;
727         int               mml_modified;
728 };
729
730 static int mgs_check_record_match(const struct lu_env *env,
731                                 struct llog_handle *llh,
732                                 struct llog_rec_hdr *rec, void *data)
733 {
734         struct cfg_marker *mc_marker = data;
735         struct cfg_marker *marker;
736         struct lustre_cfg *lcfg = REC_DATA(rec);
737         int cfg_len = REC_DATA_LEN(rec);
738         int rc;
739         ENTRY;
740
741
742         if (rec->lrh_type != OBD_CFG_REC) {
743                 CDEBUG(D_ERROR, "Unhandled lrh_type: %#x\n", rec->lrh_type);
744                 RETURN(-EINVAL);
745         }
746
747         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
748         if (rc) {
749                 CDEBUG(D_ERROR, "Insane cfg\n");
750                 RETURN(rc);
751         }
752
753         /* We only care about markers */
754         if (lcfg->lcfg_command != LCFG_MARKER)
755                 RETURN(0);
756
757         marker = lustre_cfg_buf(lcfg, 1);
758
759         if (marker->cm_flags & CM_SKIP)
760                 RETURN(0);
761
762         if ((strcmp(mc_marker->cm_comment, marker->cm_comment) == 0) &&
763                 (strcmp(mc_marker->cm_tgtname, marker->cm_tgtname) == 0)) {
764                 /* Found a non-skipped marker match */
765                 CDEBUG(D_MGS, "Matched rec %u marker %d flag %x %s %s\n",
766                         rec->lrh_index, marker->cm_step,
767                         marker->cm_flags, marker->cm_tgtname,
768                         marker->cm_comment);
769                 rc = LLOG_PROC_BREAK;
770         }
771
772         RETURN(rc);
773 }
774
775 /**
776  * Check an existing config log record with matching comment and device
777  * Return code:
778  * 0 - checked successfully,
779  * LLOG_PROC_BREAK - record matches
780  * negative - error
781  */
782 static int mgs_check_marker(const struct lu_env *env, struct mgs_device *mgs,
783                 struct fs_db *fsdb, struct mgs_target_info *mti,
784                 char *logname, char *devname, char *comment)
785 {
786         struct llog_handle *loghandle;
787         struct llog_ctxt *ctxt;
788         struct cfg_marker *mc_marker;
789         int rc;
790
791         ENTRY;
792
793         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
794         CDEBUG(D_MGS, "mgs check %s/%s/%s\n", logname, devname, comment);
795
796         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
797         LASSERT(ctxt != NULL);
798         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
799         if (rc < 0) {
800                 if (rc == -ENOENT)
801                         rc = 0;
802                 GOTO(out_pop, rc);
803         }
804
805         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
806         if (rc)
807                 GOTO(out_close, rc);
808
809         if (llog_get_size(loghandle) <= 1)
810                 GOTO(out_close, rc = 0);
811
812         OBD_ALLOC_PTR(mc_marker);
813         if (!mc_marker)
814                 GOTO(out_close, rc = -ENOMEM);
815         if (strlcpy(mc_marker->cm_comment, comment,
816                 sizeof(mc_marker->cm_comment)) >=
817                 sizeof(mc_marker->cm_comment))
818                 GOTO(out_free, rc = -E2BIG);
819         if (strlcpy(mc_marker->cm_tgtname, devname,
820                 sizeof(mc_marker->cm_tgtname)) >=
821                 sizeof(mc_marker->cm_tgtname))
822                 GOTO(out_free, rc = -E2BIG);
823
824         rc = llog_process(env, loghandle, mgs_check_record_match,
825                         (void *)mc_marker, NULL);
826
827 out_free:
828         OBD_FREE_PTR(mc_marker);
829
830 out_close:
831         llog_close(env, loghandle);
832 out_pop:
833         if (rc && rc != LLOG_PROC_BREAK)
834                 CDEBUG(D_ERROR, "%s: mgs check %s/%s failed: rc = %d\n",
835                         mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
836         llog_ctxt_put(ctxt);
837         RETURN(rc);
838 }
839
840 static int mgs_modify_handler(const struct lu_env *env,
841                               struct llog_handle *llh,
842                               struct llog_rec_hdr *rec, void *data)
843 {
844         struct mgs_modify_lookup *mml = data;
845         struct cfg_marker *marker;
846         struct lustre_cfg *lcfg = REC_DATA(rec);
847         int cfg_len = REC_DATA_LEN(rec);
848         int rc;
849         ENTRY;
850
851         if (rec->lrh_type != OBD_CFG_REC) {
852                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
853                 RETURN(-EINVAL);
854         }
855
856         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
857         if (rc) {
858                 CERROR("Insane cfg\n");
859                 RETURN(rc);
860         }
861
862         /* We only care about markers */
863         if (lcfg->lcfg_command != LCFG_MARKER)
864                 RETURN(0);
865
866         marker = lustre_cfg_buf(lcfg, 1);
867         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
868             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
869             !(marker->cm_flags & CM_SKIP)) {
870                 /* Found a non-skipped marker match */
871                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
872                        rec->lrh_index, marker->cm_step,
873                        marker->cm_flags, mml->mml_marker.cm_flags,
874                        marker->cm_tgtname, marker->cm_comment);
875                 /* Overwrite the old marker llog entry */
876                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
877                 marker->cm_flags |= mml->mml_marker.cm_flags;
878                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
879                 rc = llog_write(env, llh, rec, rec->lrh_index);
880                 if (!rc)
881                          mml->mml_modified++;
882         }
883
884         RETURN(rc);
885 }
886
887 /**
888  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
889  * Return code:
890  * 0 - modified successfully,
891  * 1 - no modification was done
892  * negative - error
893  */
894 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
895                       struct fs_db *fsdb, struct mgs_target_info *mti,
896                       char *logname, char *devname, char *comment, int flags)
897 {
898         struct llog_handle *loghandle;
899         struct llog_ctxt *ctxt;
900         struct mgs_modify_lookup *mml;
901         int rc;
902
903         ENTRY;
904
905         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
906         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
907                flags);
908
909         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
910         LASSERT(ctxt != NULL);
911         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
912         if (rc < 0) {
913                 if (rc == -ENOENT)
914                         rc = 0;
915                 GOTO(out_pop, rc);
916         }
917
918         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
919         if (rc)
920                 GOTO(out_close, rc);
921
922         if (llog_get_size(loghandle) <= 1)
923                 GOTO(out_close, rc = 0);
924
925         OBD_ALLOC_PTR(mml);
926         if (!mml)
927                 GOTO(out_close, rc = -ENOMEM);
928         if (strlcpy(mml->mml_marker.cm_comment, comment,
929                     sizeof(mml->mml_marker.cm_comment)) >=
930             sizeof(mml->mml_marker.cm_comment))
931                 GOTO(out_free, rc = -E2BIG);
932         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
933                     sizeof(mml->mml_marker.cm_tgtname)) >=
934             sizeof(mml->mml_marker.cm_tgtname))
935                 GOTO(out_free, rc = -E2BIG);
936         /* Modify mostly means cancel */
937         mml->mml_marker.cm_flags = flags;
938         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
939         mml->mml_modified = 0;
940         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
941                           NULL);
942         if (!rc && !mml->mml_modified)
943                 rc = 1;
944
945 out_free:
946         OBD_FREE_PTR(mml);
947
948 out_close:
949         llog_close(env, loghandle);
950 out_pop:
951         if (rc < 0)
952                 CERROR("%s: modify %s/%s failed: rc = %d\n",
953                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
954         llog_ctxt_put(ctxt);
955         RETURN(rc);
956 }
957
958 /** This structure is passed to mgs_replace_handler */
959 struct mgs_replace_uuid_lookup {
960         /* Nids are replaced for this target device */
961         struct mgs_target_info target;
962         /* Temporary modified llog */
963         struct llog_handle *temp_llh;
964         /* Flag is set if in target block*/
965         int in_target_device;
966         /* Nids already added. Just skip (multiple nids) */
967         int device_nids_added;
968         /* Flag is set if this block should not be copied */
969         int skip_it;
970 };
971
972 /**
973  * Check: a) if block should be skipped
974  * b) is it target block
975  *
976  * \param[in] lcfg
977  * \param[in] mrul
978  *
979  * \retval 0 should not to be skipped
980  * \retval 1 should to be skipped
981  */
982 static int check_markers(struct lustre_cfg *lcfg,
983                          struct mgs_replace_uuid_lookup *mrul)
984 {
985          struct cfg_marker *marker;
986
987         /* Track markers. Find given device */
988         if (lcfg->lcfg_command == LCFG_MARKER) {
989                 marker = lustre_cfg_buf(lcfg, 1);
990                 /* Clean llog from records marked as CM_EXCLUDE.
991                    CM_SKIP records are used for "active" command
992                    and can be restored if needed */
993                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
994                     (CM_EXCLUDE | CM_START)) {
995                         mrul->skip_it = 1;
996                         return 1;
997                 }
998
999                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
1000                     (CM_EXCLUDE | CM_END)) {
1001                         mrul->skip_it = 0;
1002                         return 1;
1003                 }
1004
1005                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
1006                         LASSERT(!(marker->cm_flags & CM_START) ||
1007                                 !(marker->cm_flags & CM_END));
1008                         if (marker->cm_flags & CM_START) {
1009                                 mrul->in_target_device = 1;
1010                                 mrul->device_nids_added = 0;
1011                         } else if (marker->cm_flags & CM_END)
1012                                 mrul->in_target_device = 0;
1013                 }
1014         }
1015
1016         return 0;
1017 }
1018
1019 static int record_base(const struct lu_env *env, struct llog_handle *llh,
1020                      char *cfgname, lnet_nid_t nid, int cmd,
1021                      char *s1, char *s2, char *s3, char *s4)
1022 {
1023         struct mgs_thread_info  *mgi = mgs_env_info(env);
1024         struct llog_cfg_rec     *lcr;
1025         int rc;
1026
1027         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
1028                cmd, s1, s2, s3, s4);
1029
1030         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
1031         if (s1)
1032                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
1033         if (s2)
1034                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
1035         if (s3)
1036                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
1037         if (s4)
1038                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
1039
1040         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
1041         if (lcr == NULL)
1042                 return -ENOMEM;
1043
1044         lcr->lcr_cfg.lcfg_nid = nid;
1045         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1046
1047         lustre_cfg_rec_free(lcr);
1048
1049         if (rc < 0)
1050                 CDEBUG(D_MGS,
1051                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
1052                        cfgname, cmd, s1, s2, s3, s4, rc);
1053         return rc;
1054 }
1055
1056 static inline int record_add_uuid(const struct lu_env *env,
1057                                   struct llog_handle *llh,
1058                                   uint64_t nid, char *uuid)
1059 {
1060         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
1061                            NULL, NULL, NULL);
1062 }
1063
1064 static inline int record_add_conn(const struct lu_env *env,
1065                                   struct llog_handle *llh,
1066                                   char *devname, char *uuid)
1067 {
1068         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
1069                            NULL, NULL, NULL);
1070 }
1071
1072 static inline int record_attach(const struct lu_env *env,
1073                                 struct llog_handle *llh, char *devname,
1074                                 char *type, char *uuid)
1075 {
1076         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
1077                            NULL, NULL);
1078 }
1079
1080 static inline int record_setup(const struct lu_env *env,
1081                                struct llog_handle *llh, char *devname,
1082                                char *s1, char *s2, char *s3, char *s4)
1083 {
1084         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
1085 }
1086
1087 /**
1088  * \retval <0 record processing error
1089  * \retval n record is processed. No need copy original one.
1090  * \retval 0 record is not processed.
1091  */
1092 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
1093                            struct mgs_replace_uuid_lookup *mrul)
1094 {
1095         int nids_added = 0;
1096         lnet_nid_t nid;
1097         char *ptr;
1098         int rc;
1099
1100         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1101                 /* LCFG_ADD_UUID command found. Let's skip original command
1102                    and add passed nids */
1103                 ptr = mrul->target.mti_params;
1104                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1105                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
1106                                "device %s\n", libcfs_nid2str(nid),
1107                                 mrul->target.mti_params,
1108                                 mrul->target.mti_svname);
1109                         rc = record_add_uuid(env,
1110                                              mrul->temp_llh, nid,
1111                                              mrul->target.mti_params);
1112                         if (!rc)
1113                                 nids_added++;
1114                 }
1115
1116                 if (nids_added == 0) {
1117                         CERROR("No new nids were added, nid %s with uuid %s, "
1118                                "device %s\n", libcfs_nid2str(nid),
1119                                mrul->target.mti_params,
1120                                mrul->target.mti_svname);
1121                         RETURN(-ENXIO);
1122                 } else {
1123                         mrul->device_nids_added = 1;
1124                 }
1125
1126                 return nids_added;
1127         }
1128
1129         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
1130                 /* LCFG_SETUP command found. UUID should be changed */
1131                 rc = record_setup(env,
1132                                   mrul->temp_llh,
1133                                   /* devname the same */
1134                                   lustre_cfg_string(lcfg, 0),
1135                                   /* s1 is not changed */
1136                                   lustre_cfg_string(lcfg, 1),
1137                                   /* new uuid should be
1138                                   the full nidlist */
1139                                   mrul->target.mti_params,
1140                                   /* s3 is not changed */
1141                                   lustre_cfg_string(lcfg, 3),
1142                                   /* s4 is not changed */
1143                                   lustre_cfg_string(lcfg, 4));
1144                 return rc ? rc : 1;
1145         }
1146
1147         /* Another commands in target device block */
1148         return 0;
1149 }
1150
1151 /**
1152  * Handler that called for every record in llog.
1153  * Records are processed in order they placed in llog.
1154  *
1155  * \param[in] llh       log to be processed
1156  * \param[in] rec       current record
1157  * \param[in] data      mgs_replace_uuid_lookup structure
1158  *
1159  * \retval 0    success
1160  */
1161 static int mgs_replace_handler(const struct lu_env *env,
1162                                struct llog_handle *llh,
1163                                struct llog_rec_hdr *rec,
1164                                void *data)
1165 {
1166         struct mgs_replace_uuid_lookup *mrul;
1167         struct lustre_cfg *lcfg = REC_DATA(rec);
1168         int cfg_len = REC_DATA_LEN(rec);
1169         int rc;
1170         ENTRY;
1171
1172         mrul = (struct mgs_replace_uuid_lookup *)data;
1173
1174         if (rec->lrh_type != OBD_CFG_REC) {
1175                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
1176                        rec->lrh_type, lcfg->lcfg_command,
1177                        lustre_cfg_string(lcfg, 0),
1178                        lustre_cfg_string(lcfg, 1));
1179                 RETURN(-EINVAL);
1180         }
1181
1182         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
1183         if (rc) {
1184                 /* Do not copy any invalidated records */
1185                 GOTO(skip_out, rc = 0);
1186         }
1187
1188         rc = check_markers(lcfg, mrul);
1189         if (rc || mrul->skip_it)
1190                 GOTO(skip_out, rc = 0);
1191
1192         /* Write to new log all commands outside target device block */
1193         if (!mrul->in_target_device)
1194                 GOTO(copy_out, rc = 0);
1195
1196         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
1197            (failover nids) for this target, assuming that if then
1198            primary is changing then so is the failover */
1199         if (mrul->device_nids_added &&
1200             (lcfg->lcfg_command == LCFG_ADD_UUID ||
1201              lcfg->lcfg_command == LCFG_ADD_CONN))
1202                 GOTO(skip_out, rc = 0);
1203
1204         rc = process_command(env, lcfg, mrul);
1205         if (rc < 0)
1206                 RETURN(rc);
1207
1208         if (rc)
1209                 RETURN(0);
1210 copy_out:
1211         /* Record is placed in temporary llog as is */
1212         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
1213
1214         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1215                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1216                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1217         RETURN(rc);
1218
1219 skip_out:
1220         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1221                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1222                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1223         RETURN(rc);
1224 }
1225
1226 static int mgs_log_is_empty(const struct lu_env *env,
1227                             struct mgs_device *mgs, char *name)
1228 {
1229         struct llog_ctxt        *ctxt;
1230         int                      rc;
1231
1232         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1233         LASSERT(ctxt != NULL);
1234
1235         rc = llog_is_empty(env, ctxt, name);
1236         llog_ctxt_put(ctxt);
1237         return rc;
1238 }
1239
1240 static int mgs_replace_nids_log(const struct lu_env *env,
1241                                 struct obd_device *mgs, struct fs_db *fsdb,
1242                                 char *logname, char *devname, char *nids)
1243 {
1244         struct llog_handle *orig_llh, *backup_llh;
1245         struct llog_ctxt *ctxt;
1246         struct mgs_replace_uuid_lookup *mrul;
1247         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1248         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1249         char *backup;
1250         int rc, rc2;
1251         ENTRY;
1252
1253         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1254
1255         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1256         LASSERT(ctxt != NULL);
1257
1258         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1259                 /* Log is empty. Nothing to replace */
1260                 GOTO(out_put, rc = 0);
1261         }
1262
1263         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1264         if (backup == NULL)
1265                 GOTO(out_put, rc = -ENOMEM);
1266
1267         sprintf(backup, "%s.bak", logname);
1268
1269         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1270         if (rc == 0) {
1271                 /* Now erase original log file. Connections are not allowed.
1272                    Backup is already saved */
1273                 rc = llog_erase(env, ctxt, NULL, logname);
1274                 if (rc < 0)
1275                         GOTO(out_free, rc);
1276         } else if (rc != -ENOENT) {
1277                 CERROR("%s: can't make backup for %s: rc = %d\n",
1278                        mgs->obd_name, logname, rc);
1279                 GOTO(out_free,rc);
1280         }
1281
1282         /* open local log */
1283         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1284         if (rc)
1285                 GOTO(out_restore, rc);
1286
1287         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1288         if (rc)
1289                 GOTO(out_closel, rc);
1290
1291         /* open backup llog */
1292         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1293                        LLOG_OPEN_EXISTS);
1294         if (rc)
1295                 GOTO(out_closel, rc);
1296
1297         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1298         if (rc)
1299                 GOTO(out_close, rc);
1300
1301         if (llog_get_size(backup_llh) <= 1)
1302                 GOTO(out_close, rc = 0);
1303
1304         OBD_ALLOC_PTR(mrul);
1305         if (!mrul)
1306                 GOTO(out_close, rc = -ENOMEM);
1307         /* devname is only needed information to replace UUID records */
1308         strlcpy(mrul->target.mti_svname, devname,
1309                 sizeof(mrul->target.mti_svname));
1310         /* parse nids later */
1311         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1312         /* Copy records to this temporary llog */
1313         mrul->temp_llh = orig_llh;
1314
1315         rc = llog_process(env, backup_llh, mgs_replace_handler,
1316                           (void *)mrul, NULL);
1317         OBD_FREE_PTR(mrul);
1318 out_close:
1319         rc2 = llog_close(NULL, backup_llh);
1320         if (!rc)
1321                 rc = rc2;
1322 out_closel:
1323         rc2 = llog_close(NULL, orig_llh);
1324         if (!rc)
1325                 rc = rc2;
1326
1327 out_restore:
1328         if (rc) {
1329                 CERROR("%s: llog should be restored: rc = %d\n",
1330                        mgs->obd_name, rc);
1331                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1332                                   logname);
1333                 if (rc2 < 0)
1334                         CERROR("%s: can't restore backup %s: rc = %d\n",
1335                                mgs->obd_name, logname, rc2);
1336         }
1337
1338 out_free:
1339         OBD_FREE(backup, strlen(backup) + 1);
1340
1341 out_put:
1342         llog_ctxt_put(ctxt);
1343
1344         if (rc)
1345                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1346                        mgs->obd_name, logname, rc);
1347
1348         RETURN(rc);
1349 }
1350
1351 /**
1352  * Parse device name and get file system name and/or device index
1353  *
1354  * @devname     device name (ex. lustre-MDT0000)
1355  * @fsname      file system name extracted from @devname and returned
1356  *              to the caller (optional)
1357  * @index       device index extracted from @devname and returned to
1358  *              the caller (optional)
1359  *
1360  * RETURN       0                       success if we are only interested in
1361  *                                      extracting fsname from devname.
1362  *                                      i.e index is NULL
1363  *
1364  *              LDD_F_SV_TYPE_*         Besides extracting the fsname the
1365  *                                      user also wants the index. Report to
1366  *                                      the user the type of obd device the
1367  *                                      returned index belongs too.
1368  *
1369  *              -EINVAL                 The obd device name is improper so
1370  *                                      fsname could not be extracted.
1371  *
1372  *              -ENXIO                  Failed to extract the index out of
1373  *                                      the obd device name. Most likely an
1374  *                                      invalid obd device name
1375  */
1376 static int mgs_parse_devname(char *devname, char *fsname, u32 *index)
1377 {
1378         int rc = 0;
1379         ENTRY;
1380
1381         /* Extract fsname */
1382         if (fsname) {
1383                 rc = server_name2fsname(devname, fsname, NULL);
1384                 if (rc < 0) {
1385                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1386                                devname);
1387                         RETURN(-EINVAL);
1388                 }
1389         }
1390
1391         if (index) {
1392                 rc = server_name2index(devname, index, NULL);
1393                 if (rc < 0) {
1394                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1395                                devname);
1396                         RETURN(-ENXIO);
1397                 }
1398         }
1399
1400         /* server_name2index can return LDD_F_SV_TYPE_* so always return rc */
1401         RETURN(rc);
1402 }
1403
1404 /* This is only called during replace_nids */
1405 static int only_mgs_is_running(struct obd_device *mgs_obd)
1406 {
1407         /* TDB: Is global variable with devices count exists? */
1408         int num_devices = get_devices_count();
1409         int num_exports = 0;
1410         struct obd_export *exp;
1411
1412         spin_lock(&mgs_obd->obd_dev_lock);
1413         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1414                 /* skip self export */
1415                 if (exp == mgs_obd->obd_self_export)
1416                         continue;
1417                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1418                         continue;
1419
1420                 ++num_exports;
1421
1422                 CERROR("%s: node %s still connected during replace_nids "
1423                        "connect_flags:%llx\n",
1424                        mgs_obd->obd_name,
1425                        libcfs_nid2str(exp->exp_nid_stats->nid),
1426                        exp_connect_flags(exp));
1427
1428         }
1429         spin_unlock(&mgs_obd->obd_dev_lock);
1430
1431         /* osd, MGS and MGC + self_export
1432            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1433         return (num_devices <= 3) && (num_exports == 0);
1434 }
1435
1436 static int name_create_mdt(char **logname, char *fsname, int i)
1437 {
1438         char mdt_index[9];
1439
1440         sprintf(mdt_index, "-MDT%04x", i);
1441         return name_create(logname, fsname, mdt_index);
1442 }
1443
1444 /**
1445  * Replace nids for \a device to \a nids values
1446  *
1447  * \param obd           MGS obd device
1448  * \param devname       nids need to be replaced for this device
1449  * (ex. lustre-OST0000)
1450  * \param nids          nids list (ex. nid1,nid2,nid3)
1451  *
1452  * \retval 0    success
1453  */
1454 int mgs_replace_nids(const struct lu_env *env,
1455                      struct mgs_device *mgs,
1456                      char *devname, char *nids)
1457 {
1458         /* Assume fsname is part of device name */
1459         char fsname[MTI_NAME_MAXLEN];
1460         int rc;
1461         __u32 index;
1462         char *logname;
1463         struct fs_db *fsdb = NULL;
1464         unsigned int i;
1465         int conn_state;
1466         struct obd_device *mgs_obd = mgs->mgs_obd;
1467         ENTRY;
1468
1469         /* We can only change NIDs if no other nodes are connected */
1470         spin_lock(&mgs_obd->obd_dev_lock);
1471         conn_state = mgs_obd->obd_no_conn;
1472         mgs_obd->obd_no_conn = 1;
1473         spin_unlock(&mgs_obd->obd_dev_lock);
1474
1475         /* We can not change nids if not only MGS is started */
1476         if (!only_mgs_is_running(mgs_obd)) {
1477                 CERROR("Only MGS is allowed to be started\n");
1478                 GOTO(out, rc = -EINPROGRESS);
1479         }
1480
1481         /* Get fsname and index */
1482         rc = mgs_parse_devname(devname, fsname, &index);
1483         if (rc < 0)
1484                 GOTO(out, rc);
1485
1486         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1487         if (rc) {
1488                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1489                 GOTO(out, rc);
1490         }
1491
1492         /* Process client llogs */
1493         name_create(&logname, fsname, "-client");
1494         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1495         name_destroy(&logname);
1496         if (rc) {
1497                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1498                        fsname, devname, rc);
1499                 GOTO(out, rc);
1500         }
1501
1502         /* Process MDT llogs */
1503         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1504                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1505                         continue;
1506                 name_create_mdt(&logname, fsname, i);
1507                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1508                 name_destroy(&logname);
1509                 if (rc)
1510                         GOTO(out, rc);
1511         }
1512
1513 out:
1514         spin_lock(&mgs_obd->obd_dev_lock);
1515         mgs_obd->obd_no_conn = conn_state;
1516         spin_unlock(&mgs_obd->obd_dev_lock);
1517
1518         if (fsdb)
1519                 mgs_put_fsdb(mgs, fsdb);
1520
1521         RETURN(rc);
1522 }
1523
1524 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1525                             char *devname, struct lov_desc *desc)
1526 {
1527         struct mgs_thread_info  *mgi = mgs_env_info(env);
1528         struct llog_cfg_rec     *lcr;
1529         int rc;
1530
1531         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1532         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1533         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1534         if (lcr == NULL)
1535                 return -ENOMEM;
1536
1537         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1538         lustre_cfg_rec_free(lcr);
1539         return rc;
1540 }
1541
1542 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1543                             char *devname, struct lmv_desc *desc)
1544 {
1545         struct mgs_thread_info  *mgi = mgs_env_info(env);
1546         struct llog_cfg_rec     *lcr;
1547         int rc;
1548
1549         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1550         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1551         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1552         if (lcr == NULL)
1553                 return -ENOMEM;
1554
1555         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1556         lustre_cfg_rec_free(lcr);
1557         return rc;
1558 }
1559
1560 static inline int record_mdc_add(const struct lu_env *env,
1561                                  struct llog_handle *llh,
1562                                  char *logname, char *mdcuuid,
1563                                  char *mdtuuid, char *index,
1564                                  char *gen)
1565 {
1566         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1567                            mdtuuid,index,gen,mdcuuid);
1568 }
1569
1570 static inline int record_lov_add(const struct lu_env *env,
1571                                  struct llog_handle *llh,
1572                                  char *lov_name, char *ost_uuid,
1573                                  char *index, char *gen)
1574 {
1575         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1576                            ost_uuid, index, gen, NULL);
1577 }
1578
1579 static inline int record_mount_opt(const struct lu_env *env,
1580                                    struct llog_handle *llh,
1581                                    char *profile, char *lov_name,
1582                                    char *mdc_name)
1583 {
1584         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1585                            profile, lov_name, mdc_name, NULL);
1586 }
1587
1588 static int record_marker(const struct lu_env *env,
1589                          struct llog_handle *llh,
1590                          struct fs_db *fsdb, __u32 flags,
1591                          char *tgtname, char *comment)
1592 {
1593         struct mgs_thread_info *mgi = mgs_env_info(env);
1594         struct llog_cfg_rec *lcr;
1595         int rc;
1596         int cplen = 0;
1597
1598         if (flags & CM_START)
1599                 fsdb->fsdb_gen++;
1600         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1601         mgi->mgi_marker.cm_flags = flags;
1602         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1603         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1604                         sizeof(mgi->mgi_marker.cm_tgtname));
1605         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1606                 return -E2BIG;
1607         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1608                         sizeof(mgi->mgi_marker.cm_comment));
1609         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1610                 return -E2BIG;
1611         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1612         mgi->mgi_marker.cm_canceltime = 0;
1613         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1614         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1615                             sizeof(mgi->mgi_marker));
1616         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1617         if (lcr == NULL)
1618                 return -ENOMEM;
1619
1620         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1621         lustre_cfg_rec_free(lcr);
1622         return rc;
1623 }
1624
1625 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1626                             struct llog_handle **llh, char *name)
1627 {
1628         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1629         struct llog_ctxt        *ctxt;
1630         int                      rc = 0;
1631         ENTRY;
1632
1633         if (*llh)
1634                 GOTO(out, rc = -EBUSY);
1635
1636         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1637         if (!ctxt)
1638                 GOTO(out, rc = -ENODEV);
1639         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1640
1641         rc = llog_open_create(env, ctxt, llh, NULL, name);
1642         if (rc)
1643                 GOTO(out_ctxt, rc);
1644         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1645         if (rc)
1646                 llog_close(env, *llh);
1647 out_ctxt:
1648         llog_ctxt_put(ctxt);
1649 out:
1650         if (rc) {
1651                 CERROR("%s: can't start log %s: rc = %d\n",
1652                        mgs->mgs_obd->obd_name, name, rc);
1653                 *llh = NULL;
1654         }
1655         RETURN(rc);
1656 }
1657
1658 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1659 {
1660         int rc;
1661
1662         rc = llog_close(env, *llh);
1663         *llh = NULL;
1664
1665         return rc;
1666 }
1667
1668 /******************** config "macros" *********************/
1669
1670 /* write an lcfg directly into a log (with markers) */
1671 static int mgs_write_log_direct(const struct lu_env *env,
1672                                 struct mgs_device *mgs, struct fs_db *fsdb,
1673                                 char *logname, struct llog_cfg_rec *lcr,
1674                                 char *devname, char *comment)
1675 {
1676         struct llog_handle *llh = NULL;
1677         int rc;
1678
1679         ENTRY;
1680
1681         rc = record_start_log(env, mgs, &llh, logname);
1682         if (rc)
1683                 RETURN(rc);
1684
1685         /* FIXME These should be a single journal transaction */
1686         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1687         if (rc)
1688                 GOTO(out_end, rc);
1689         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1690         if (rc)
1691                 GOTO(out_end, rc);
1692         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1693         if (rc)
1694                 GOTO(out_end, rc);
1695 out_end:
1696         record_end_log(env, &llh);
1697         RETURN(rc);
1698 }
1699
1700 /* write the lcfg in all logs for the given fs */
1701 static int mgs_write_log_direct_all(const struct lu_env *env,
1702                                     struct mgs_device *mgs,
1703                                     struct fs_db *fsdb,
1704                                     struct mgs_target_info *mti,
1705                                     struct llog_cfg_rec *lcr, char *devname,
1706                                     char *comment, int server_only)
1707 {
1708         struct list_head         log_list;
1709         struct mgs_direntry     *dirent, *n;
1710         char                    *fsname = mti->mti_fsname;
1711         int                      rc = 0, len = strlen(fsname);
1712
1713         ENTRY;
1714         /* Find all the logs in the CONFIGS directory */
1715         rc = class_dentry_readdir(env, mgs, &log_list);
1716         if (rc)
1717                 RETURN(rc);
1718
1719         /* Could use fsdb index maps instead of directory listing */
1720         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1721                 list_del_init(&dirent->mde_list);
1722                 /* don't write to sptlrpc rule log */
1723                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1724                         goto next;
1725
1726                 /* caller wants write server logs only */
1727                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1728                         goto next;
1729
1730                 if (strlen(dirent->mde_name) <= len ||
1731                     strncmp(fsname, dirent->mde_name, len) != 0 ||
1732                     dirent->mde_name[len] != '-')
1733                         goto next;
1734
1735                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1736                 /* Erase any old settings of this same parameter */
1737                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1738                                 devname, comment, CM_SKIP);
1739                 if (rc < 0)
1740                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1741                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1742                 if (lcr == NULL)
1743                         goto next;
1744                 /* Write the new one */
1745                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1746                                           lcr, devname, comment);
1747                 if (rc != 0)
1748                         CERROR("%s: writing log %s: rc = %d\n",
1749                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1750 next:
1751                 mgs_direntry_free(dirent);
1752         }
1753
1754         RETURN(rc);
1755 }
1756
1757 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1758                                     struct mgs_device *mgs,
1759                                     struct fs_db *fsdb,
1760                                     struct mgs_target_info *mti,
1761                                     int index, char *logname);
1762 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1763                                     struct mgs_device *mgs,
1764                                     struct fs_db *fsdb,
1765                                     struct mgs_target_info *mti,
1766                                     char *logname, char *suffix, char *lovname,
1767                                     enum lustre_sec_part sec_part, int flags);
1768 static int name_create_mdt_and_lov(char **logname, char **lovname,
1769                                    struct fs_db *fsdb, int i);
1770
1771 static int add_param(char *params, char *key, char *val)
1772 {
1773         char *start = params + strlen(params);
1774         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1775         int keylen = 0;
1776
1777         if (key != NULL)
1778                 keylen = strlen(key);
1779         if (start + 1 + keylen + strlen(val) >= end) {
1780                 CERROR("params are too long: %s %s%s\n",
1781                        params, key != NULL ? key : "", val);
1782                 return -EINVAL;
1783         }
1784
1785         sprintf(start, " %s%s", key != NULL ? key : "", val);
1786         return 0;
1787 }
1788
1789 /**
1790  * Walk through client config log record and convert the related records
1791  * into the target.
1792  **/
1793 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1794                                          struct llog_handle *llh,
1795                                          struct llog_rec_hdr *rec, void *data)
1796 {
1797         struct mgs_device *mgs;
1798         struct obd_device *obd;
1799         struct mgs_target_info *mti, *tmti;
1800         struct fs_db *fsdb;
1801         int cfg_len = rec->lrh_len;
1802         char *cfg_buf = (char*) (rec + 1);
1803         struct lustre_cfg *lcfg;
1804         int rc = 0;
1805         struct llog_handle *mdt_llh = NULL;
1806         static int got_an_osc_or_mdc = 0;
1807         /* 0: not found any osc/mdc;
1808            1: found osc;
1809            2: found mdc;
1810         */
1811         static int last_step = -1;
1812         int cplen = 0;
1813
1814         ENTRY;
1815
1816         mti = ((struct temp_comp*)data)->comp_mti;
1817         tmti = ((struct temp_comp*)data)->comp_tmti;
1818         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1819         obd = ((struct temp_comp *)data)->comp_obd;
1820         mgs = lu2mgs_dev(obd->obd_lu_dev);
1821         LASSERT(mgs);
1822
1823         if (rec->lrh_type != OBD_CFG_REC) {
1824                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1825                 RETURN(-EINVAL);
1826         }
1827
1828         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1829         if (rc) {
1830                 CERROR("Insane cfg\n");
1831                 RETURN(rc);
1832         }
1833
1834         lcfg = (struct lustre_cfg *)cfg_buf;
1835
1836         if (lcfg->lcfg_command == LCFG_MARKER) {
1837                 struct cfg_marker *marker;
1838                 marker = lustre_cfg_buf(lcfg, 1);
1839                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1840                     (marker->cm_flags & CM_START) &&
1841                      !(marker->cm_flags & CM_SKIP)) {
1842                         got_an_osc_or_mdc = 1;
1843                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1844                                         sizeof(tmti->mti_svname));
1845                         if (cplen >= sizeof(tmti->mti_svname))
1846                                 RETURN(-E2BIG);
1847                         rc = record_start_log(env, mgs, &mdt_llh,
1848                                               mti->mti_svname);
1849                         if (rc)
1850                                 RETURN(rc);
1851                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1852                                            mti->mti_svname, "add osc(copied)");
1853                         record_end_log(env, &mdt_llh);
1854                         last_step = marker->cm_step;
1855                         RETURN(rc);
1856                 }
1857                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1858                     (marker->cm_flags & CM_END) &&
1859                      !(marker->cm_flags & CM_SKIP)) {
1860                         LASSERT(last_step == marker->cm_step);
1861                         last_step = -1;
1862                         got_an_osc_or_mdc = 0;
1863                         memset(tmti, 0, sizeof(*tmti));
1864                         rc = record_start_log(env, mgs, &mdt_llh,
1865                                               mti->mti_svname);
1866                         if (rc)
1867                                 RETURN(rc);
1868                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1869                                            mti->mti_svname, "add osc(copied)");
1870                         record_end_log(env, &mdt_llh);
1871                         RETURN(rc);
1872                 }
1873                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1874                     (marker->cm_flags & CM_START) &&
1875                      !(marker->cm_flags & CM_SKIP)) {
1876                         got_an_osc_or_mdc = 2;
1877                         last_step = marker->cm_step;
1878                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1879                                strlen(marker->cm_tgtname));
1880
1881                         RETURN(rc);
1882                 }
1883                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1884                     (marker->cm_flags & CM_END) &&
1885                      !(marker->cm_flags & CM_SKIP)) {
1886                         LASSERT(last_step == marker->cm_step);
1887                         last_step = -1;
1888                         got_an_osc_or_mdc = 0;
1889                         memset(tmti, 0, sizeof(*tmti));
1890                         RETURN(rc);
1891                 }
1892         }
1893
1894         if (got_an_osc_or_mdc == 0 || last_step < 0)
1895                 RETURN(rc);
1896
1897         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1898                 __u64 nodenid = lcfg->lcfg_nid;
1899
1900                 if (strlen(tmti->mti_uuid) == 0) {
1901                         /* target uuid not set, this config record is before
1902                          * LCFG_SETUP, this nid is one of target node nid.
1903                          */
1904                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1905                         tmti->mti_nid_count++;
1906                 } else {
1907                         char nidstr[LNET_NIDSTR_SIZE];
1908
1909                         /* failover node nid */
1910                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
1911                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1912                                         nidstr);
1913                 }
1914
1915                 RETURN(rc);
1916         }
1917
1918         if (lcfg->lcfg_command == LCFG_SETUP) {
1919                 char *target;
1920
1921                 target = lustre_cfg_string(lcfg, 1);
1922                 memcpy(tmti->mti_uuid, target, strlen(target));
1923                 RETURN(rc);
1924         }
1925
1926         /* ignore client side sptlrpc_conf_log */
1927         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1928                 RETURN(rc);
1929
1930         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1931                 int index;
1932
1933                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1934                         RETURN (-EINVAL);
1935
1936                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1937                        strlen(mti->mti_fsname));
1938                 tmti->mti_stripe_index = index;
1939
1940                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1941                                               mti->mti_stripe_index,
1942                                               mti->mti_svname);
1943                 memset(tmti, 0, sizeof(*tmti));
1944                 RETURN(rc);
1945         }
1946
1947         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1948                 int index;
1949                 char mdt_index[9];
1950                 char *logname, *lovname;
1951
1952                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1953                                              mti->mti_stripe_index);
1954                 if (rc)
1955                         RETURN(rc);
1956                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1957
1958                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1959                         name_destroy(&logname);
1960                         name_destroy(&lovname);
1961                         RETURN(-EINVAL);
1962                 }
1963
1964                 tmti->mti_stripe_index = index;
1965                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1966                                          mdt_index, lovname,
1967                                          LUSTRE_SP_MDT, 0);
1968                 name_destroy(&logname);
1969                 name_destroy(&lovname);
1970                 RETURN(rc);
1971         }
1972         RETURN(rc);
1973 }
1974
1975 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1976 /* stealed from mgs_get_fsdb_from_llog*/
1977 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1978                                               struct mgs_device *mgs,
1979                                               char *client_name,
1980                                               struct temp_comp* comp)
1981 {
1982         struct llog_handle *loghandle;
1983         struct mgs_target_info *tmti;
1984         struct llog_ctxt *ctxt;
1985         int rc;
1986
1987         ENTRY;
1988
1989         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1990         LASSERT(ctxt != NULL);
1991
1992         OBD_ALLOC_PTR(tmti);
1993         if (tmti == NULL)
1994                 GOTO(out_ctxt, rc = -ENOMEM);
1995
1996         comp->comp_tmti = tmti;
1997         comp->comp_obd = mgs->mgs_obd;
1998
1999         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
2000                        LLOG_OPEN_EXISTS);
2001         if (rc < 0) {
2002                 if (rc == -ENOENT)
2003                         rc = 0;
2004                 GOTO(out_pop, rc);
2005         }
2006
2007         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
2008         if (rc)
2009                 GOTO(out_close, rc);
2010
2011         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
2012                                   (void *)comp, NULL, false);
2013         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
2014 out_close:
2015         llog_close(env, loghandle);
2016 out_pop:
2017         OBD_FREE_PTR(tmti);
2018 out_ctxt:
2019         llog_ctxt_put(ctxt);
2020         RETURN(rc);
2021 }
2022
2023 /* lmv is the second thing for client logs */
2024 /* copied from mgs_write_log_lov. Please refer to that.  */
2025 static int mgs_write_log_lmv(const struct lu_env *env,
2026                              struct mgs_device *mgs,
2027                              struct fs_db *fsdb,
2028                              struct mgs_target_info *mti,
2029                              char *logname, char *lmvname)
2030 {
2031         struct llog_handle *llh = NULL;
2032         struct lmv_desc *lmvdesc;
2033         char *uuid;
2034         int rc = 0;
2035         ENTRY;
2036
2037         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
2038
2039         OBD_ALLOC_PTR(lmvdesc);
2040         if (lmvdesc == NULL)
2041                 RETURN(-ENOMEM);
2042         lmvdesc->ld_active_tgt_count = 0;
2043         lmvdesc->ld_tgt_count = 0;
2044         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
2045         uuid = (char *)lmvdesc->ld_uuid.uuid;
2046
2047         rc = record_start_log(env, mgs, &llh, logname);
2048         if (rc)
2049                 GOTO(out_free, rc);
2050         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
2051         if (rc)
2052                 GOTO(out_end, rc);
2053         rc = record_attach(env, llh, lmvname, "lmv", uuid);
2054         if (rc)
2055                 GOTO(out_end, rc);
2056         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
2057         if (rc)
2058                 GOTO(out_end, rc);
2059         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
2060         if (rc)
2061                 GOTO(out_end, rc);
2062 out_end:
2063         record_end_log(env, &llh);
2064 out_free:
2065         OBD_FREE_PTR(lmvdesc);
2066         RETURN(rc);
2067 }
2068
2069 /* lov is the first thing in the mdt and client logs */
2070 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
2071                              struct fs_db *fsdb, struct mgs_target_info *mti,
2072                              char *logname, char *lovname)
2073 {
2074         struct llog_handle *llh = NULL;
2075         struct lov_desc *lovdesc;
2076         char *uuid;
2077         int rc = 0;
2078         ENTRY;
2079
2080         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
2081
2082         /*
2083         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
2084         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
2085               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
2086         */
2087
2088         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
2089         OBD_ALLOC_PTR(lovdesc);
2090         if (lovdesc == NULL)
2091                 RETURN(-ENOMEM);
2092         lovdesc->ld_magic = LOV_DESC_MAGIC;
2093         lovdesc->ld_tgt_count = 0;
2094         /* Defaults.  Can be changed later by lcfg config_param */
2095         lovdesc->ld_default_stripe_count = 1;
2096         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
2097         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
2098         lovdesc->ld_default_stripe_offset = -1;
2099         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
2100         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
2101         /* can these be the same? */
2102         uuid = (char *)lovdesc->ld_uuid.uuid;
2103
2104         /* This should always be the first entry in a log.
2105         rc = mgs_clear_log(obd, logname); */
2106         rc = record_start_log(env, mgs, &llh, logname);
2107         if (rc)
2108                 GOTO(out_free, rc);
2109         /* FIXME these should be a single journal transaction */
2110         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
2111         if (rc)
2112                 GOTO(out_end, rc);
2113         rc = record_attach(env, llh, lovname, "lov", uuid);
2114         if (rc)
2115                 GOTO(out_end, rc);
2116         rc = record_lov_setup(env, llh, lovname, lovdesc);
2117         if (rc)
2118                 GOTO(out_end, rc);
2119         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
2120         if (rc)
2121                 GOTO(out_end, rc);
2122         EXIT;
2123 out_end:
2124         record_end_log(env, &llh);
2125 out_free:
2126         OBD_FREE_PTR(lovdesc);
2127         return rc;
2128 }
2129
2130 /* add failnids to open log */
2131 static int mgs_write_log_failnids(const struct lu_env *env,
2132                                   struct mgs_target_info *mti,
2133                                   struct llog_handle *llh,
2134                                   char *cliname)
2135 {
2136         char *failnodeuuid = NULL;
2137         char *ptr = mti->mti_params;
2138         lnet_nid_t nid;
2139         int rc = 0;
2140
2141         /*
2142         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
2143         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
2144         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
2145         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
2146         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
2147         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
2148         */
2149
2150         /*
2151          * Pull failnid info out of params string, which may contain something
2152          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
2153          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
2154          * etc.  However, convert_hostnames() should have caught those.
2155          */
2156         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
2157                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
2158                         char nidstr[LNET_NIDSTR_SIZE];
2159
2160                         if (failnodeuuid == NULL) {
2161                                 /* We don't know the failover node name,
2162                                  * so just use the first nid as the uuid */
2163                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
2164                                 rc = name_create(&failnodeuuid, nidstr, "");
2165                                 if (rc != 0)
2166                                         return rc;
2167                         }
2168                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
2169                                 "client %s\n",
2170                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
2171                                 failnodeuuid, cliname);
2172                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
2173                         /*
2174                          * If *ptr is ':', we have added all NIDs for
2175                          * failnodeuuid.
2176                          */
2177                         if (*ptr == ':') {
2178                                 rc = record_add_conn(env, llh, cliname,
2179                                                      failnodeuuid);
2180                                 name_destroy(&failnodeuuid);
2181                                 failnodeuuid = NULL;
2182                         }
2183                 }
2184                 if (failnodeuuid) {
2185                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
2186                         name_destroy(&failnodeuuid);
2187                         failnodeuuid = NULL;
2188                 }
2189         }
2190
2191         return rc;
2192 }
2193
2194 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
2195                                     struct mgs_device *mgs,
2196                                     struct fs_db *fsdb,
2197                                     struct mgs_target_info *mti,
2198                                     char *logname, char *lmvname)
2199 {
2200         struct llog_handle *llh = NULL;
2201         char *mdcname = NULL;
2202         char *nodeuuid = NULL;
2203         char *mdcuuid = NULL;
2204         char *lmvuuid = NULL;
2205         char index[6];
2206         char nidstr[LNET_NIDSTR_SIZE];
2207         int i, rc;
2208         ENTRY;
2209
2210         if (mgs_log_is_empty(env, mgs, logname)) {
2211                 CERROR("log is empty! Logical error\n");
2212                 RETURN(-EINVAL);
2213         }
2214
2215         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
2216                mti->mti_svname, logname, lmvname);
2217
2218         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2219         rc = name_create(&nodeuuid, nidstr, "");
2220         if (rc)
2221                 RETURN(rc);
2222         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
2223         if (rc)
2224                 GOTO(out_free, rc);
2225         rc = name_create(&mdcuuid, mdcname, "_UUID");
2226         if (rc)
2227                 GOTO(out_free, rc);
2228         rc = name_create(&lmvuuid, lmvname, "_UUID");
2229         if (rc)
2230                 GOTO(out_free, rc);
2231
2232         rc = record_start_log(env, mgs, &llh, logname);
2233         if (rc)
2234                 GOTO(out_free, rc);
2235         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2236                            "add mdc");
2237         if (rc)
2238                 GOTO(out_end, rc);
2239         for (i = 0; i < mti->mti_nid_count; i++) {
2240                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2241                         libcfs_nid2str_r(mti->mti_nids[i],
2242                                          nidstr, sizeof(nidstr)));
2243
2244                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2245                 if (rc)
2246                         GOTO(out_end, rc);
2247         }
2248
2249         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
2250         if (rc)
2251                 GOTO(out_end, rc);
2252         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2253                           NULL, NULL);
2254         if (rc)
2255                 GOTO(out_end, rc);
2256         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2257         if (rc)
2258                 GOTO(out_end, rc);
2259         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2260         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2261                             index, "1");
2262         if (rc)
2263                 GOTO(out_end, rc);
2264         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2265                            "add mdc");
2266         if (rc)
2267                 GOTO(out_end, rc);
2268 out_end:
2269         record_end_log(env, &llh);
2270 out_free:
2271         name_destroy(&lmvuuid);
2272         name_destroy(&mdcuuid);
2273         name_destroy(&mdcname);
2274         name_destroy(&nodeuuid);
2275         RETURN(rc);
2276 }
2277
2278 static inline int name_create_lov(char **lovname, char *mdtname,
2279                                   struct fs_db *fsdb, int index)
2280 {
2281         /* COMPAT_180 */
2282         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2283                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2284         else
2285                 return name_create(lovname, mdtname, "-mdtlov");
2286 }
2287
2288 static int name_create_mdt_and_lov(char **logname, char **lovname,
2289                                    struct fs_db *fsdb, int i)
2290 {
2291         int rc;
2292
2293         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2294         if (rc)
2295                 return rc;
2296         /* COMPAT_180 */
2297         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2298                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2299         else
2300                 rc = name_create(lovname, *logname, "-mdtlov");
2301         if (rc) {
2302                 name_destroy(logname);
2303                 *logname = NULL;
2304         }
2305         return rc;
2306 }
2307
2308 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2309                                       struct fs_db *fsdb, int i)
2310 {
2311         char suffix[16];
2312
2313         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2314                 sprintf(suffix, "-osc");
2315         else
2316                 sprintf(suffix, "-osc-MDT%04x", i);
2317         return name_create(oscname, ostname, suffix);
2318 }
2319
2320 /* add new mdc to already existent MDS */
2321 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2322                                     struct mgs_device *mgs,
2323                                     struct fs_db *fsdb,
2324                                     struct mgs_target_info *mti,
2325                                     int mdt_index, char *logname)
2326 {
2327         struct llog_handle      *llh = NULL;
2328         char    *nodeuuid = NULL;
2329         char    *ospname = NULL;
2330         char    *lovuuid = NULL;
2331         char    *mdtuuid = NULL;
2332         char    *svname = NULL;
2333         char    *mdtname = NULL;
2334         char    *lovname = NULL;
2335         char    index_str[16];
2336         char    nidstr[LNET_NIDSTR_SIZE];
2337         int     i, rc;
2338
2339         ENTRY;
2340         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2341                 CERROR("log is empty! Logical error\n");
2342                 RETURN (-EINVAL);
2343         }
2344
2345         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2346                logname);
2347
2348         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2349         if (rc)
2350                 RETURN(rc);
2351
2352         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2353         rc = name_create(&nodeuuid, nidstr, "");
2354         if (rc)
2355                 GOTO(out_destory, rc);
2356
2357         rc = name_create(&svname, mdtname, "-osp");
2358         if (rc)
2359                 GOTO(out_destory, rc);
2360
2361         sprintf(index_str, "-MDT%04x", mdt_index);
2362         rc = name_create(&ospname, svname, index_str);
2363         if (rc)
2364                 GOTO(out_destory, rc);
2365
2366         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2367         if (rc)
2368                 GOTO(out_destory, rc);
2369
2370         rc = name_create(&lovuuid, lovname, "_UUID");
2371         if (rc)
2372                 GOTO(out_destory, rc);
2373
2374         rc = name_create(&mdtuuid, mdtname, "_UUID");
2375         if (rc)
2376                 GOTO(out_destory, rc);
2377
2378         rc = record_start_log(env, mgs, &llh, logname);
2379         if (rc)
2380                 GOTO(out_destory, rc);
2381
2382         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2383                            "add osp");
2384         if (rc)
2385                 GOTO(out_destory, rc);
2386
2387         for (i = 0; i < mti->mti_nid_count; i++) {
2388                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2389                         libcfs_nid2str_r(mti->mti_nids[i],
2390                                          nidstr, sizeof(nidstr)));
2391                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2392                 if (rc)
2393                         GOTO(out_end, rc);
2394         }
2395
2396         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2397         if (rc)
2398                 GOTO(out_end, rc);
2399
2400         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2401                           NULL, NULL);
2402         if (rc)
2403                 GOTO(out_end, rc);
2404
2405         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2406         if (rc)
2407                 GOTO(out_end, rc);
2408
2409         /* Add mdc(osp) to lod */
2410         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
2411         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2412                          index_str, "1", NULL);
2413         if (rc)
2414                 GOTO(out_end, rc);
2415
2416         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2417         if (rc)
2418                 GOTO(out_end, rc);
2419
2420 out_end:
2421         record_end_log(env, &llh);
2422
2423 out_destory:
2424         name_destroy(&mdtuuid);
2425         name_destroy(&lovuuid);
2426         name_destroy(&lovname);
2427         name_destroy(&ospname);
2428         name_destroy(&svname);
2429         name_destroy(&nodeuuid);
2430         name_destroy(&mdtname);
2431         RETURN(rc);
2432 }
2433
2434 static int mgs_write_log_mdt0(const struct lu_env *env,
2435                               struct mgs_device *mgs,
2436                               struct fs_db *fsdb,
2437                               struct mgs_target_info *mti)
2438 {
2439         char *log = mti->mti_svname;
2440         struct llog_handle *llh = NULL;
2441         char *uuid, *lovname;
2442         char mdt_index[6];
2443         char *ptr = mti->mti_params;
2444         int rc = 0, failout = 0;
2445         ENTRY;
2446
2447         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2448         if (uuid == NULL)
2449                 RETURN(-ENOMEM);
2450
2451         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2452                 failout = (strncmp(ptr, "failout", 7) == 0);
2453
2454         rc = name_create(&lovname, log, "-mdtlov");
2455         if (rc)
2456                 GOTO(out_free, rc);
2457         if (mgs_log_is_empty(env, mgs, log)) {
2458                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2459                 if (rc)
2460                         GOTO(out_lod, rc);
2461         }
2462
2463         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2464
2465         rc = record_start_log(env, mgs, &llh, log);
2466         if (rc)
2467                 GOTO(out_lod, rc);
2468
2469         /* add MDT itself */
2470
2471         /* FIXME this whole fn should be a single journal transaction */
2472         sprintf(uuid, "%s_UUID", log);
2473         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2474         if (rc)
2475                 GOTO(out_lod, rc);
2476         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2477         if (rc)
2478                 GOTO(out_end, rc);
2479         rc = record_mount_opt(env, llh, log, lovname, NULL);
2480         if (rc)
2481                 GOTO(out_end, rc);
2482         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2483                         failout ? "n" : "f");
2484         if (rc)
2485                 GOTO(out_end, rc);
2486         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2487         if (rc)
2488                 GOTO(out_end, rc);
2489 out_end:
2490         record_end_log(env, &llh);
2491 out_lod:
2492         name_destroy(&lovname);
2493 out_free:
2494         OBD_FREE(uuid, sizeof(struct obd_uuid));
2495         RETURN(rc);
2496 }
2497
2498 /* envelope method for all layers log */
2499 static int mgs_write_log_mdt(const struct lu_env *env,
2500                              struct mgs_device *mgs,
2501                              struct fs_db *fsdb,
2502                              struct mgs_target_info *mti)
2503 {
2504         struct mgs_thread_info *mgi = mgs_env_info(env);
2505         struct llog_handle *llh = NULL;
2506         char *cliname;
2507         int rc, i = 0;
2508         ENTRY;
2509
2510         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2511
2512         if (mti->mti_uuid[0] == '\0') {
2513                 /* Make up our own uuid */
2514                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2515                          "%s_UUID", mti->mti_svname);
2516         }
2517
2518         /* add mdt */
2519         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2520         if (rc)
2521                 RETURN(rc);
2522         /* Append the mdt info to the client log */
2523         rc = name_create(&cliname, mti->mti_fsname, "-client");
2524         if (rc)
2525                 RETURN(rc);
2526
2527         if (mgs_log_is_empty(env, mgs, cliname)) {
2528                 /* Start client log */
2529                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2530                                        fsdb->fsdb_clilov);
2531                 if (rc)
2532                         GOTO(out_free, rc);
2533                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2534                                        fsdb->fsdb_clilmv);
2535                 if (rc)
2536                         GOTO(out_free, rc);
2537         }
2538
2539         /*
2540         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2541         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2542         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2543         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2544         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2545         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2546         */
2547
2548         /* copy client info about lov/lmv */
2549         mgi->mgi_comp.comp_mti = mti;
2550         mgi->mgi_comp.comp_fsdb = fsdb;
2551
2552         rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2553                                                 &mgi->mgi_comp);
2554         if (rc)
2555                 GOTO(out_free, rc);
2556         rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2557                                       fsdb->fsdb_clilmv);
2558         if (rc)
2559                 GOTO(out_free, rc);
2560
2561         /* add mountopts */
2562         rc = record_start_log(env, mgs, &llh, cliname);
2563         if (rc)
2564                 GOTO(out_free, rc);
2565
2566         rc = record_marker(env, llh, fsdb, CM_START, cliname,
2567                            "mount opts");
2568         if (rc)
2569                 GOTO(out_end, rc);
2570         rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2571                               fsdb->fsdb_clilmv);
2572         if (rc)
2573                 GOTO(out_end, rc);
2574         rc = record_marker(env, llh, fsdb, CM_END, cliname,
2575                            "mount opts");
2576
2577         if (rc)
2578                 GOTO(out_end, rc);
2579
2580         /* for_all_existing_mdt except current one */
2581         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2582                 if (i !=  mti->mti_stripe_index &&
2583                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2584                         char *logname;
2585
2586                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2587                         if (rc)
2588                                 GOTO(out_end, rc);
2589
2590                         /* NB: If the log for the MDT is empty, it means
2591                          * the MDT is only added to the index
2592                          * map, and not being process yet, i.e. this
2593                          * is an unregistered MDT, see mgs_write_log_target().
2594                          * so we should skip it. Otherwise
2595                          *
2596                          * 1. MGS get register request for MDT1 and MDT2.
2597                          *
2598                          * 2. Then both MDT1 and MDT2 are added into
2599                          * fsdb_mdt_index_map. (see mgs_set_index()).
2600                          *
2601                          * 3. Then MDT1 get the lock of fsdb_mutex, then
2602                          * generate the config log, here, it will regard MDT2
2603                          * as an existent MDT, and generate "add osp" for
2604                          * lustre-MDT0001-osp-MDT0002. Note: at the moment
2605                          * MDT0002 config log is still empty, so it will
2606                          * add "add osp" even before "lov setup", which
2607                          * will definitly cause trouble.
2608                          *
2609                          * 4. MDT1 registeration finished, fsdb_mutex is
2610                          * released, then MDT2 get in, then in above
2611                          * mgs_steal_llog_for_mdt_from_client(), it will
2612                          * add another osp log for lustre-MDT0001-osp-MDT0002,
2613                          * which will cause another trouble.*/
2614                         if (!mgs_log_is_empty(env, mgs, logname))
2615                                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb,
2616                                                               mti, i, logname);
2617
2618                         name_destroy(&logname);
2619                         if (rc)
2620                                 GOTO(out_end, rc);
2621                 }
2622         }
2623 out_end:
2624         record_end_log(env, &llh);
2625 out_free:
2626         name_destroy(&cliname);
2627         RETURN(rc);
2628 }
2629
2630 /* Add the ost info to the client/mdt lov */
2631 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2632                                     struct mgs_device *mgs, struct fs_db *fsdb,
2633                                     struct mgs_target_info *mti,
2634                                     char *logname, char *suffix, char *lovname,
2635                                     enum lustre_sec_part sec_part, int flags)
2636 {
2637         struct llog_handle *llh = NULL;
2638         char *nodeuuid = NULL;
2639         char *oscname = NULL;
2640         char *oscuuid = NULL;
2641         char *lovuuid = NULL;
2642         char *svname = NULL;
2643         char index[6];
2644         char nidstr[LNET_NIDSTR_SIZE];
2645         int i, rc;
2646         ENTRY;
2647
2648         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2649                 mti->mti_svname, logname);
2650
2651         if (mgs_log_is_empty(env, mgs, logname)) {
2652                 CERROR("log is empty! Logical error\n");
2653                 RETURN(-EINVAL);
2654         }
2655
2656         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2657         rc = name_create(&nodeuuid, nidstr, "");
2658         if (rc)
2659                 RETURN(rc);
2660         rc = name_create(&svname, mti->mti_svname, "-osc");
2661         if (rc)
2662                 GOTO(out_free, rc);
2663
2664         /* for the system upgraded from old 1.8, keep using the old osc naming
2665          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2666         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2667                 rc = name_create(&oscname, svname, "");
2668         else
2669                 rc = name_create(&oscname, svname, suffix);
2670         if (rc)
2671                 GOTO(out_free, rc);
2672
2673         rc = name_create(&oscuuid, oscname, "_UUID");
2674         if (rc)
2675                 GOTO(out_free, rc);
2676         rc = name_create(&lovuuid, lovname, "_UUID");
2677         if (rc)
2678                 GOTO(out_free, rc);
2679
2680
2681         /*
2682         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2683         multihomed (#4)
2684         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2685         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2686         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2687         failover (#6,7)
2688         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2689         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2690         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2691         */
2692
2693         rc = record_start_log(env, mgs, &llh, logname);
2694         if (rc)
2695                 GOTO(out_free, rc);
2696
2697         /* FIXME these should be a single journal transaction */
2698         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2699                            "add osc");
2700         if (rc)
2701                 GOTO(out_end, rc);
2702
2703         /* NB: don't change record order, because upon MDT steal OSC config
2704          * from client, it treats all nids before LCFG_SETUP as target nids
2705          * (multiple interfaces), while nids after as failover node nids.
2706          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2707          */
2708         for (i = 0; i < mti->mti_nid_count; i++) {
2709                 CDEBUG(D_MGS, "add nid %s\n",
2710                         libcfs_nid2str_r(mti->mti_nids[i],
2711                                          nidstr, sizeof(nidstr)));
2712                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2713                 if (rc)
2714                         GOTO(out_end, rc);
2715         }
2716         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2717         if (rc)
2718                 GOTO(out_end, rc);
2719         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
2720                           NULL, NULL);
2721         if (rc)
2722                 GOTO(out_end, rc);
2723         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2724         if (rc)
2725                 GOTO(out_end, rc);
2726
2727         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2728
2729         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2730         if (rc)
2731                 GOTO(out_end, rc);
2732         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2733                            "add osc");
2734         if (rc)
2735                 GOTO(out_end, rc);
2736 out_end:
2737         record_end_log(env, &llh);
2738 out_free:
2739         name_destroy(&lovuuid);
2740         name_destroy(&oscuuid);
2741         name_destroy(&oscname);
2742         name_destroy(&svname);
2743         name_destroy(&nodeuuid);
2744         RETURN(rc);
2745 }
2746
2747 static int mgs_write_log_ost(const struct lu_env *env,
2748                              struct mgs_device *mgs, struct fs_db *fsdb,
2749                              struct mgs_target_info *mti)
2750 {
2751         struct llog_handle *llh = NULL;
2752         char *logname, *lovname;
2753         char *ptr = mti->mti_params;
2754         int rc, flags = 0, failout = 0, i;
2755         ENTRY;
2756
2757         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2758
2759         /* The ost startup log */
2760
2761         /* If the ost log already exists, that means that someone reformatted
2762            the ost and it called target_add again. */
2763         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2764                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2765                                    "exists, yet the server claims it never "
2766                                    "registered. It may have been reformatted, "
2767                                    "or the index changed. writeconf the MDT to "
2768                                    "regenerate all logs.\n", mti->mti_svname);
2769                 RETURN(-EALREADY);
2770         }
2771
2772         /*
2773         attach obdfilter ost1 ost1_UUID
2774         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2775         */
2776         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2777                 failout = (strncmp(ptr, "failout", 7) == 0);
2778         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2779         if (rc)
2780                 RETURN(rc);
2781         /* FIXME these should be a single journal transaction */
2782         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2783         if (rc)
2784                 GOTO(out_end, rc);
2785         if (*mti->mti_uuid == '\0')
2786                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2787                          "%s_UUID", mti->mti_svname);
2788         rc = record_attach(env, llh, mti->mti_svname,
2789                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2790         if (rc)
2791                 GOTO(out_end, rc);
2792         rc = record_setup(env, llh, mti->mti_svname,
2793                           "dev"/*ignored*/, "type"/*ignored*/,
2794                           failout ? "n" : "f", NULL/*options*/);
2795         if (rc)
2796                 GOTO(out_end, rc);
2797         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2798         if (rc)
2799                 GOTO(out_end, rc);
2800 out_end:
2801         record_end_log(env, &llh);
2802         if (rc)
2803                 RETURN(rc);
2804         /* We also have to update the other logs where this osc is part of
2805            the lov */
2806
2807         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2808                 /* If we're upgrading, the old mdt log already has our
2809                    entry. Let's do a fake one for fun. */
2810                 /* Note that we can't add any new failnids, since we don't
2811                    know the old osc names. */
2812                 flags = CM_SKIP | CM_UPGRADE146;
2813
2814         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2815                 /* If the update flag isn't set, don't update client/mdt
2816                    logs. */
2817                 flags |= CM_SKIP;
2818                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2819                               "the MDT first to regenerate it.\n",
2820                               mti->mti_svname);
2821         }
2822
2823         /* Add ost to all MDT lov defs */
2824         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2825                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2826                         char mdt_index[9];
2827
2828                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2829                                                      i);
2830                         if (rc)
2831                                 RETURN(rc);
2832                         sprintf(mdt_index, "-MDT%04x", i);
2833                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2834                                                       logname, mdt_index,
2835                                                       lovname, LUSTRE_SP_MDT,
2836                                                       flags);
2837                         name_destroy(&logname);
2838                         name_destroy(&lovname);
2839                         if (rc)
2840                                 RETURN(rc);
2841                 }
2842         }
2843
2844         /* Append ost info to the client log */
2845         rc = name_create(&logname, mti->mti_fsname, "-client");
2846         if (rc)
2847                 RETURN(rc);
2848         if (mgs_log_is_empty(env, mgs, logname)) {
2849                 /* Start client log */
2850                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2851                                        fsdb->fsdb_clilov);
2852                 if (rc)
2853                         GOTO(out_free, rc);
2854                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2855                                        fsdb->fsdb_clilmv);
2856                 if (rc)
2857                         GOTO(out_free, rc);
2858         }
2859         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2860                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
2861 out_free:
2862         name_destroy(&logname);
2863         RETURN(rc);
2864 }
2865
2866 static __inline__ int mgs_param_empty(char *ptr)
2867 {
2868         char *tmp;
2869
2870         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2871                 return 1;
2872         return 0;
2873 }
2874
2875 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2876                                           struct mgs_device *mgs,
2877                                           struct fs_db *fsdb,
2878                                           struct mgs_target_info *mti,
2879                                           char *logname, char *cliname)
2880 {
2881         int rc;
2882         struct llog_handle *llh = NULL;
2883
2884         if (mgs_param_empty(mti->mti_params)) {
2885                 /* Remove _all_ failnids */
2886                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2887                                 mti->mti_svname, "add failnid", CM_SKIP);
2888                 return rc < 0 ? rc : 0;
2889         }
2890
2891         /* Otherwise failover nids are additive */
2892         rc = record_start_log(env, mgs, &llh, logname);
2893         if (rc)
2894                 return rc;
2895                 /* FIXME this should be a single journal transaction */
2896         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2897                            "add failnid");
2898         if (rc)
2899                 goto out_end;
2900         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2901         if (rc)
2902                 goto out_end;
2903         rc = record_marker(env, llh, fsdb, CM_END,
2904                            mti->mti_svname, "add failnid");
2905 out_end:
2906         record_end_log(env, &llh);
2907         return rc;
2908 }
2909
2910
2911 /* Add additional failnids to an existing log.
2912    The mdc/osc must have been added to logs first */
2913 /* tcp nids must be in dotted-quad ascii -
2914    we can't resolve hostnames from the kernel. */
2915 static int mgs_write_log_add_failnid(const struct lu_env *env,
2916                                      struct mgs_device *mgs,
2917                                      struct fs_db *fsdb,
2918                                      struct mgs_target_info *mti)
2919 {
2920         char *logname, *cliname;
2921         int rc;
2922         ENTRY;
2923
2924         /* FIXME we currently can't erase the failnids
2925          * given when a target first registers, since they aren't part of
2926          * an "add uuid" stanza
2927          */
2928
2929         /* Verify that we know about this target */
2930         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2931                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2932                                    "yet. It must be started before failnids "
2933                                    "can be added.\n", mti->mti_svname);
2934                 RETURN(-ENOENT);
2935         }
2936
2937         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2938         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2939                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2940         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2941                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2942         } else {
2943                 RETURN(-EINVAL);
2944         }
2945         if (rc)
2946                 RETURN(rc);
2947
2948         /* Add failover nids to the client log */
2949         rc = name_create(&logname, mti->mti_fsname, "-client");
2950         if (rc) {
2951                 name_destroy(&cliname);
2952                 RETURN(rc);
2953         }
2954
2955         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2956         name_destroy(&logname);
2957         name_destroy(&cliname);
2958         if (rc)
2959                 RETURN(rc);
2960
2961         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2962                 /* Add OST failover nids to the MDT logs as well */
2963                 int i;
2964
2965                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2966                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2967                                 continue;
2968                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2969                         if (rc)
2970                                 RETURN(rc);
2971                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2972                                                  fsdb, i);
2973                         if (rc) {
2974                                 name_destroy(&logname);
2975                                 RETURN(rc);
2976                         }
2977                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2978                                                             mti, logname,
2979                                                             cliname);
2980                         name_destroy(&cliname);
2981                         name_destroy(&logname);
2982                         if (rc)
2983                                 RETURN(rc);
2984                 }
2985         }
2986
2987         RETURN(rc);
2988 }
2989
2990 static int mgs_wlp_lcfg(const struct lu_env *env,
2991                         struct mgs_device *mgs, struct fs_db *fsdb,
2992                         struct mgs_target_info *mti,
2993                         char *logname, struct lustre_cfg_bufs *bufs,
2994                         char *tgtname, char *ptr)
2995 {
2996         char comment[MTI_NAME_MAXLEN];
2997         char *tmp;
2998         struct llog_cfg_rec *lcr;
2999         int rc, del;
3000
3001         /* Erase any old settings of this same parameter */
3002         memcpy(comment, ptr, MTI_NAME_MAXLEN);
3003         comment[MTI_NAME_MAXLEN - 1] = 0;
3004         /* But don't try to match the value. */
3005         tmp = strchr(comment, '=');
3006         if (tmp != NULL)
3007                 *tmp = 0;
3008         /* FIXME we should skip settings that are the same as old values */
3009         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
3010         if (rc < 0)
3011                 return rc;
3012         del = mgs_param_empty(ptr);
3013
3014         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
3015                       "Setting" : "Modifying", tgtname, comment, logname);
3016         if (del) {
3017                 /* mgs_modify() will return 1 if nothing had to be done */
3018                 if (rc == 1)
3019                         rc = 0;
3020                 return rc;
3021         }
3022
3023         lustre_cfg_bufs_reset(bufs, tgtname);
3024         lustre_cfg_bufs_set_string(bufs, 1, ptr);
3025         if (mti->mti_flags & LDD_F_PARAM2)
3026                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
3027
3028         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
3029                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
3030         if (lcr == NULL)
3031                 return -ENOMEM;
3032
3033         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
3034                                   comment);
3035         lustre_cfg_rec_free(lcr);
3036         return rc;
3037 }
3038
3039 /* write global variable settings into log */
3040 static int mgs_write_log_sys(const struct lu_env *env,
3041                              struct mgs_device *mgs, struct fs_db *fsdb,
3042                              struct mgs_target_info *mti, char *sys, char *ptr)
3043 {
3044         struct mgs_thread_info  *mgi = mgs_env_info(env);
3045         struct lustre_cfg       *lcfg;
3046         struct llog_cfg_rec     *lcr;
3047         char *tmp, sep;
3048         int rc, cmd, convert = 1;
3049
3050         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
3051                 cmd = LCFG_SET_TIMEOUT;
3052         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
3053                 cmd = LCFG_SET_LDLM_TIMEOUT;
3054         /* Check for known params here so we can return error to lctl */
3055         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
3056                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
3057                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
3058                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
3059                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
3060                 cmd = LCFG_PARAM;
3061         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
3062                 convert = 0; /* Don't convert string value to integer */
3063                 cmd = LCFG_PARAM;
3064         } else {
3065                 return -EINVAL;
3066         }
3067
3068         if (mgs_param_empty(ptr))
3069                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
3070         else
3071                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
3072
3073         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
3074         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
3075         if (!convert && *tmp != '\0')
3076                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
3077         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3078         if (lcr == NULL)
3079                 return -ENOMEM;
3080
3081         lcfg = &lcr->lcr_cfg;
3082         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
3083         /* truncate the comment to the parameter name */
3084         ptr = tmp - 1;
3085         sep = *ptr;
3086         *ptr = '\0';
3087         /* modify all servers and clients */
3088         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3089                                       *tmp == '\0' ? NULL : lcr,
3090                                       mti->mti_fsname, sys, 0);
3091         if (rc == 0 && *tmp != '\0') {
3092                 switch (cmd) {
3093                 case LCFG_SET_TIMEOUT:
3094                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
3095                                 class_process_config(lcfg);
3096                         break;
3097                 case LCFG_SET_LDLM_TIMEOUT:
3098                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
3099                                 class_process_config(lcfg);
3100                         break;
3101                 default:
3102                         break;
3103                 }
3104         }
3105         *ptr = sep;
3106         lustre_cfg_rec_free(lcr);
3107         return rc;
3108 }
3109
3110 /* write quota settings into log */
3111 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
3112                                struct fs_db *fsdb, struct mgs_target_info *mti,
3113                                char *quota, char *ptr)
3114 {
3115         struct mgs_thread_info  *mgi = mgs_env_info(env);
3116         struct llog_cfg_rec     *lcr;
3117         char                    *tmp;
3118         char                     sep;
3119         int                      rc, cmd = LCFG_PARAM;
3120
3121         /* support only 'meta' and 'data' pools so far */
3122         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
3123             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
3124                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
3125                        "& quota.ost are)\n", ptr);
3126                 return -EINVAL;
3127         }
3128
3129         if (*tmp == '\0') {
3130                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
3131         } else {
3132                 CDEBUG(D_MGS, "global '%s'\n", quota);
3133
3134                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
3135                     strchr(tmp, 'p') == NULL &&
3136                     strcmp(tmp, "none") != 0) {
3137                         CERROR("enable option(%s) isn't supported\n", tmp);
3138                         return -EINVAL;
3139                 }
3140         }
3141
3142         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
3143         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
3144         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
3145         if (lcr == NULL)
3146                 return -ENOMEM;
3147
3148         /* truncate the comment to the parameter name */
3149         ptr = tmp - 1;
3150         sep = *ptr;
3151         *ptr = '\0';
3152
3153         /* XXX we duplicated quota enable information in all server
3154          *     config logs, it should be moved to a separate config
3155          *     log once we cleanup the config log for global param. */
3156         /* modify all servers */
3157         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
3158                                       *tmp == '\0' ? NULL : lcr,
3159                                       mti->mti_fsname, quota, 1);
3160         *ptr = sep;
3161         lustre_cfg_rec_free(lcr);
3162         return rc < 0 ? rc : 0;
3163 }
3164
3165 static int mgs_srpc_set_param_disk(const struct lu_env *env,
3166                                    struct mgs_device *mgs,
3167                                    struct fs_db *fsdb,
3168                                    struct mgs_target_info *mti,
3169                                    char *param)
3170 {
3171         struct mgs_thread_info  *mgi = mgs_env_info(env);
3172         struct llog_cfg_rec     *lcr;
3173         struct llog_handle      *llh = NULL;
3174         char                    *logname;
3175         char                    *comment, *ptr;
3176         int                      rc, len;
3177
3178         ENTRY;
3179
3180         /* get comment */
3181         ptr = strchr(param, '=');
3182         LASSERT(ptr != NULL);
3183         len = ptr - param;
3184
3185         OBD_ALLOC(comment, len + 1);
3186         if (comment == NULL)
3187                 RETURN(-ENOMEM);
3188         strncpy(comment, param, len);
3189         comment[len] = '\0';
3190
3191         /* prepare lcfg */
3192         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
3193         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
3194         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
3195         if (lcr == NULL)
3196                 GOTO(out_comment, rc = -ENOMEM);
3197
3198         /* construct log name */
3199         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
3200         if (rc < 0)
3201                 GOTO(out_lcfg, rc);
3202
3203         if (mgs_log_is_empty(env, mgs, logname)) {
3204                 rc = record_start_log(env, mgs, &llh, logname);
3205                 if (rc < 0)
3206                         GOTO(out, rc);
3207                 record_end_log(env, &llh);
3208         }
3209
3210         /* obsolete old one */
3211         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
3212                         comment, CM_SKIP);
3213         if (rc < 0)
3214                 GOTO(out, rc);
3215         /* write the new one */
3216         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
3217                                   mti->mti_svname, comment);
3218         if (rc)
3219                 CERROR("%s: error writing log %s: rc = %d\n",
3220                        mgs->mgs_obd->obd_name, logname, rc);
3221 out:
3222         name_destroy(&logname);
3223 out_lcfg:
3224         lustre_cfg_rec_free(lcr);
3225 out_comment:
3226         OBD_FREE(comment, len + 1);
3227         RETURN(rc);
3228 }
3229
3230 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
3231                                         char *param)
3232 {
3233         char    *ptr;
3234
3235         /* disable the adjustable udesc parameter for now, i.e. use default
3236          * setting that client always ship udesc to MDT if possible. to enable
3237          * it simply remove the following line */
3238         goto error_out;
3239
3240         ptr = strchr(param, '=');
3241         if (ptr == NULL)
3242                 goto error_out;
3243         *ptr++ = '\0';
3244
3245         if (strcmp(param, PARAM_SRPC_UDESC))
3246                 goto error_out;
3247
3248         if (strcmp(ptr, "yes") == 0) {
3249                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3250                 CWARN("Enable user descriptor shipping from client to MDT\n");
3251         } else if (strcmp(ptr, "no") == 0) {
3252                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
3253                 CWARN("Disable user descriptor shipping from client to MDT\n");
3254         } else {
3255                 *(ptr - 1) = '=';
3256                 goto error_out;
3257         }
3258         return 0;
3259
3260 error_out:
3261         CERROR("Invalid param: %s\n", param);
3262         return -EINVAL;
3263 }
3264
3265 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3266                                   const char *svname,
3267                                   char *param)
3268 {
3269         struct sptlrpc_rule      rule;
3270         struct sptlrpc_rule_set *rset;
3271         int                      rc;
3272         ENTRY;
3273
3274         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3275                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3276                 RETURN(-EINVAL);
3277         }
3278
3279         if (strncmp(param, PARAM_SRPC_UDESC,
3280                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3281                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3282         }
3283
3284         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3285                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3286                 RETURN(-EINVAL);
3287         }
3288
3289         param += sizeof(PARAM_SRPC_FLVR) - 1;
3290
3291         rc = sptlrpc_parse_rule(param, &rule);
3292         if (rc)
3293                 RETURN(rc);
3294
3295         /* mgs rules implies must be mgc->mgs */
3296         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3297                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3298                      rule.sr_from != LUSTRE_SP_ANY) ||
3299                     (rule.sr_to != LUSTRE_SP_MGS &&
3300                      rule.sr_to != LUSTRE_SP_ANY))
3301                         RETURN(-EINVAL);
3302         }
3303
3304         /* preapre room for this coming rule. svcname format should be:
3305          * - fsname: general rule
3306          * - fsname-tgtname: target-specific rule
3307          */
3308         if (strchr(svname, '-')) {
3309                 struct mgs_tgt_srpc_conf *tgtconf;
3310                 int                       found = 0;
3311
3312                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3313                      tgtconf = tgtconf->mtsc_next) {
3314                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3315                                 found = 1;
3316                                 break;
3317                         }
3318                 }
3319
3320                 if (!found) {
3321                         int name_len;
3322
3323                         OBD_ALLOC_PTR(tgtconf);
3324                         if (tgtconf == NULL)
3325                                 RETURN(-ENOMEM);
3326
3327                         name_len = strlen(svname);
3328
3329                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3330                         if (tgtconf->mtsc_tgt == NULL) {
3331                                 OBD_FREE_PTR(tgtconf);
3332                                 RETURN(-ENOMEM);
3333                         }
3334                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3335
3336                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3337                         fsdb->fsdb_srpc_tgt = tgtconf;
3338                 }
3339
3340                 rset = &tgtconf->mtsc_rset;
3341         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
3342                 /* put _mgs related srpc rule directly in mgs ruleset */
3343                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
3344         } else {
3345                 rset = &fsdb->fsdb_srpc_gen;
3346         }
3347
3348         rc = sptlrpc_rule_set_merge(rset, &rule);
3349
3350         RETURN(rc);
3351 }
3352
3353 static int mgs_srpc_set_param(const struct lu_env *env,
3354                               struct mgs_device *mgs,
3355                               struct fs_db *fsdb,
3356                               struct mgs_target_info *mti,
3357                               char *param)
3358 {
3359         char                   *copy;
3360         int                     rc, copy_size;
3361         ENTRY;
3362
3363 #ifndef HAVE_GSS
3364         RETURN(-EINVAL);
3365 #endif
3366         /* keep a copy of original param, which could be destroied
3367          * during parsing */
3368         copy_size = strlen(param) + 1;
3369         OBD_ALLOC(copy, copy_size);
3370         if (copy == NULL)
3371                 return -ENOMEM;
3372         memcpy(copy, param, copy_size);
3373
3374         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3375         if (rc)
3376                 goto out_free;
3377
3378         /* previous steps guaranteed the syntax is correct */
3379         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3380         if (rc)
3381                 goto out_free;
3382
3383         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3384                 /*
3385                  * for mgs rules, make them effective immediately.
3386                  */
3387                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3388                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3389                                                  &fsdb->fsdb_srpc_gen);
3390         }
3391
3392 out_free:
3393         OBD_FREE(copy, copy_size);
3394         RETURN(rc);
3395 }
3396
3397 struct mgs_srpc_read_data {
3398         struct fs_db   *msrd_fsdb;
3399         int             msrd_skip;
3400 };
3401
3402 static int mgs_srpc_read_handler(const struct lu_env *env,
3403                                  struct llog_handle *llh,
3404                                  struct llog_rec_hdr *rec, void *data)
3405 {
3406         struct mgs_srpc_read_data *msrd = data;
3407         struct cfg_marker         *marker;
3408         struct lustre_cfg         *lcfg = REC_DATA(rec);
3409         char                      *svname, *param;
3410         int                        cfg_len, rc;
3411         ENTRY;
3412
3413         if (rec->lrh_type != OBD_CFG_REC) {
3414                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3415                 RETURN(-EINVAL);
3416         }
3417
3418         cfg_len = REC_DATA_LEN(rec);
3419
3420         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3421         if (rc) {
3422                 CERROR("Insane cfg\n");
3423                 RETURN(rc);
3424         }
3425
3426         if (lcfg->lcfg_command == LCFG_MARKER) {
3427                 marker = lustre_cfg_buf(lcfg, 1);
3428
3429                 if (marker->cm_flags & CM_START &&
3430                     marker->cm_flags & CM_SKIP)
3431                         msrd->msrd_skip = 1;
3432                 if (marker->cm_flags & CM_END)
3433                         msrd->msrd_skip = 0;
3434
3435                 RETURN(0);
3436         }
3437
3438         if (msrd->msrd_skip)
3439                 RETURN(0);
3440
3441         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3442                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3443                 RETURN(0);
3444         }
3445
3446         svname = lustre_cfg_string(lcfg, 0);
3447         if (svname == NULL) {
3448                 CERROR("svname is empty\n");
3449                 RETURN(0);
3450         }
3451
3452         param = lustre_cfg_string(lcfg, 1);
3453         if (param == NULL) {
3454                 CERROR("param is empty\n");
3455                 RETURN(0);
3456         }
3457
3458         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3459         if (rc)
3460                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3461
3462         RETURN(0);
3463 }
3464
3465 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3466                                 struct mgs_device *mgs,
3467                                 struct fs_db *fsdb)
3468 {
3469         struct llog_handle        *llh = NULL;
3470         struct llog_ctxt          *ctxt;
3471         char                      *logname;
3472         struct mgs_srpc_read_data  msrd;
3473         int                        rc;
3474         ENTRY;
3475
3476         /* construct log name */
3477         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3478         if (rc)
3479                 RETURN(rc);
3480
3481         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3482         LASSERT(ctxt != NULL);
3483
3484         if (mgs_log_is_empty(env, mgs, logname))
3485                 GOTO(out, rc = 0);
3486
3487         rc = llog_open(env, ctxt, &llh, NULL, logname,
3488                        LLOG_OPEN_EXISTS);
3489         if (rc < 0) {
3490                 if (rc == -ENOENT)
3491                         rc = 0;
3492                 GOTO(out, rc);
3493         }
3494
3495         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3496         if (rc)
3497                 GOTO(out_close, rc);
3498
3499         if (llog_get_size(llh) <= 1)
3500                 GOTO(out_close, rc = 0);
3501
3502         msrd.msrd_fsdb = fsdb;
3503         msrd.msrd_skip = 0;
3504
3505         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3506                           NULL);
3507
3508 out_close:
3509         llog_close(env, llh);
3510 out:
3511         llog_ctxt_put(ctxt);
3512         name_destroy(&logname);
3513
3514         if (rc)
3515                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3516         RETURN(rc);
3517 }
3518
3519 static int mgs_write_log_param2(const struct lu_env *env,
3520                                 struct mgs_device *mgs,
3521                                 struct fs_db *fsdb,
3522                                 struct mgs_target_info *mti, char *ptr)
3523 {
3524         struct lustre_cfg_bufs bufs;
3525         int rc;
3526
3527         ENTRY;
3528         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3529
3530         /* PARAM_MGSNODE and PARAM_NETWORK are set only when formating
3531          * or during the inital mount. It can never change after that.
3532          */
3533         if (!class_match_param(ptr, PARAM_MGSNODE, NULL) ||
3534             !class_match_param(ptr, PARAM_NETWORK, NULL)) {
3535                 rc = 0;
3536                 goto end;
3537         }
3538
3539         /* Processed in mgs_write_log_ost. Another value that can't
3540          * be changed by lctl set_param -P.
3541          */
3542         if (!class_match_param(ptr, PARAM_FAILMODE, NULL)) {
3543                 LCONSOLE_ERROR_MSG(0x169,
3544                                    "%s can only be changed with tunefs.lustre and --writeconf\n",
3545                                    ptr);
3546                 rc = -EPERM;
3547                 goto end;
3548         }
3549
3550         /* FIXME !!! Support for sptlrpc is incomplete. Currently the change
3551          * doesn't transmit to the client. See LU-7183.
3552          */
3553         if (!class_match_param(ptr, PARAM_SRPC, NULL)) {
3554                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3555                 goto end;
3556         }
3557
3558         /* Can't use class_match_param since ptr doesn't start with
3559          * PARAM_FAILNODE. So we look for PARAM_FAILNODE contained in ptr.
3560          */
3561         if (strstr(ptr, PARAM_FAILNODE)) {
3562                 /* Add a failover nidlist. We already processed failovers
3563                  * params for new targets in mgs_write_log_target.
3564                  */
3565                 const char *param;
3566
3567                 /* can't use wildcards with failover.node */
3568                 if (strchr(ptr, '*')) {
3569                         rc = -ENODEV;
3570                         goto end;
3571                 }
3572
3573                 param = strstr(ptr, PARAM_FAILNODE);
3574                 if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
3575                     sizeof(mti->mti_params)) {
3576                         rc = -E2BIG;
3577                         goto end;
3578                 }
3579
3580                 CDEBUG(D_MGS, "Adding failnode with param %s\n",
3581                        mti->mti_params);
3582                 rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3583                 goto end;
3584         }
3585
3586         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
3587                           mti->mti_svname, ptr);
3588 end:
3589         RETURN(rc);
3590 }
3591
3592 /* Permanent settings of all parameters by writing into the appropriate
3593  * configuration logs.
3594  * A parameter with null value ("<param>='\0'") means to erase it out of
3595  * the logs.
3596  */
3597 static int mgs_write_log_param(const struct lu_env *env,
3598                                struct mgs_device *mgs, struct fs_db *fsdb,
3599                                struct mgs_target_info *mti, char *ptr)
3600 {
3601         struct mgs_thread_info *mgi = mgs_env_info(env);
3602         char *logname;
3603         char *tmp;
3604         int rc = 0;
3605         ENTRY;
3606
3607         /* For various parameter settings, we have to figure out which logs
3608            care about them (e.g. both mdt and client for lov settings) */
3609         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3610
3611         /* The params are stored in MOUNT_DATA_FILE and modified via
3612            tunefs.lustre, or set using lctl conf_param */
3613
3614         /* Processed in lustre_start_mgc */
3615         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3616                 GOTO(end, rc);
3617
3618         /* Processed in ost/mdt */
3619         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3620                 GOTO(end, rc);
3621
3622         /* Processed in mgs_write_log_ost */
3623         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3624                 if (mti->mti_flags & LDD_F_PARAM) {
3625                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3626                                            "changed with tunefs.lustre"
3627                                            "and --writeconf\n", ptr);
3628                         rc = -EPERM;
3629                 }
3630                 GOTO(end, rc);
3631         }
3632
3633         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3634                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3635                 GOTO(end, rc);
3636         }
3637
3638         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3639                 /* Add a failover nidlist */
3640                 rc = 0;
3641                 /* We already processed failovers params for new
3642                    targets in mgs_write_log_target */
3643                 if (mti->mti_flags & LDD_F_PARAM) {
3644                         CDEBUG(D_MGS, "Adding failnode\n");
3645                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3646                 }
3647                 GOTO(end, rc);
3648         }
3649
3650         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3651                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3652                 GOTO(end, rc);
3653         }
3654
3655         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3656                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3657                 GOTO(end, rc);
3658         }
3659
3660         if (class_match_param(ptr, PARAM_OSC PARAM_ACTIVE, &tmp) == 0 ||
3661             class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0) {
3662                 /* active=0 means off, anything else means on */
3663                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3664                 bool deactive_osc = memcmp(ptr, PARAM_OSC PARAM_ACTIVE,
3665                                           strlen(PARAM_OSC PARAM_ACTIVE)) == 0;
3666                 int i;
3667
3668                 if (!deactive_osc) {
3669                         __u32   index;
3670
3671                         rc = server_name2index(mti->mti_svname, &index, NULL);
3672                         if (rc < 0)
3673                                 GOTO(end, rc);
3674
3675                         if (index == 0) {
3676                                 LCONSOLE_ERROR_MSG(0x144, "%s: MDC0 can not be"
3677                                                    " (de)activated.\n",
3678                                                    mti->mti_svname);
3679                                 GOTO(end, rc = -EPERM);
3680                         }
3681                 }
3682
3683                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3684                               flag ? "de" : "re", mti->mti_svname);
3685                 /* Modify clilov */
3686                 rc = name_create(&logname, mti->mti_fsname, "-client");
3687                 if (rc < 0)
3688                         GOTO(end, rc);
3689                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3690                                 mti->mti_svname,
3691                                 deactive_osc ? "add osc" : "add mdc", flag);
3692                 name_destroy(&logname);
3693                 if (rc < 0)
3694                         goto active_err;
3695
3696                 /* Modify mdtlov */
3697                 /* Add to all MDT logs for DNE */
3698                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3699                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3700                                 continue;
3701                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3702                         if (rc < 0)
3703                                 GOTO(end, rc);
3704                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3705                                         mti->mti_svname,
3706                                         deactive_osc ? "add osc" : "add osp",
3707                                         flag);
3708                         name_destroy(&logname);
3709                         if (rc < 0)
3710                                 goto active_err;
3711                 }
3712 active_err:
3713                 if (rc < 0) {
3714                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3715                                            "log (%d). No permanent "
3716                                            "changes were made to the "
3717                                            "config log.\n",
3718                                            mti->mti_svname, rc);
3719                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3720                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3721                                                    " because the log"
3722                                                    "is in the old 1.4"
3723                                                    "style. Consider "
3724                                                    " --writeconf to "
3725                                                    "update the logs.\n");
3726                         GOTO(end, rc);
3727                 }
3728                 /* Fall through to osc/mdc proc for deactivating live
3729                    OSC/OSP on running MDT / clients. */
3730         }
3731         /* Below here, let obd's XXX_process_config methods handle it */
3732
3733         /* All lov. in proc */
3734         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3735                 char *mdtlovname;
3736
3737                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3738                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3739                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3740                                            "set on the MDT, not %s. "
3741                                            "Ignoring.\n",
3742                                            mti->mti_svname);
3743                         GOTO(end, rc = 0);
3744                 }
3745
3746                 /* Modify mdtlov */
3747                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3748                         GOTO(end, rc = -ENODEV);
3749
3750                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3751                                              mti->mti_stripe_index);
3752                 if (rc)
3753                         GOTO(end, rc);
3754                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3755                                   &mgi->mgi_bufs, mdtlovname, ptr);
3756                 name_destroy(&logname);
3757                 name_destroy(&mdtlovname);
3758                 if (rc)
3759                         GOTO(end, rc);
3760
3761                 /* Modify clilov */
3762                 rc = name_create(&logname, mti->mti_fsname, "-client");
3763                 if (rc)
3764                         GOTO(end, rc);
3765                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3766                                   fsdb->fsdb_clilov, ptr);
3767                 name_destroy(&logname);
3768                 GOTO(end, rc);
3769         }
3770
3771         /* All osc., mdc., llite. params in proc */
3772         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3773             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3774             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3775                 char *cname;
3776
3777                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3778                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3779                                            " cannot be modified. Consider"
3780                                            " updating the configuration with"
3781                                            " --writeconf\n",
3782                                            mti->mti_svname);
3783                         GOTO(end, rc = -EINVAL);
3784                 }
3785                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3786                         rc = name_create(&cname, mti->mti_fsname, "-client");
3787                         /* Add the client type to match the obdname in
3788                            class_config_llog_handler */
3789                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3790                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3791                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3792                         rc = name_create(&cname, mti->mti_svname, "-osc");
3793                 } else {
3794                         GOTO(end, rc = -EINVAL);
3795                 }
3796                 if (rc)
3797                         GOTO(end, rc);
3798
3799                 /* Forbid direct update of llite root squash parameters.
3800                  * These parameters are indirectly set via the MDT settings.
3801                  * See (LU-1778) */
3802                 if ((class_match_param(ptr, PARAM_LLITE, &tmp) == 0) &&
3803                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3804                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3805                         LCONSOLE_ERROR("%s: root squash parameters can only "
3806                                 "be updated through MDT component\n",
3807                                 mti->mti_fsname);
3808                         name_destroy(&cname);
3809                         GOTO(end, rc = -EINVAL);
3810                 }
3811
3812                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3813
3814                 /* Modify client */
3815                 rc = name_create(&logname, mti->mti_fsname, "-client");
3816                 if (rc) {
3817                         name_destroy(&cname);
3818                         GOTO(end, rc);
3819                 }
3820                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3821                                   cname, ptr);
3822
3823                 /* osc params affect the MDT as well */
3824                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3825                         int i;
3826
3827                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3828                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3829                                         continue;
3830                                 name_destroy(&cname);
3831                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3832                                                          fsdb, i);
3833                                 name_destroy(&logname);
3834                                 if (rc)
3835                                         break;
3836                                 rc = name_create_mdt(&logname,
3837                                                      mti->mti_fsname, i);
3838                                 if (rc)
3839                                         break;
3840                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3841                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3842                                                           mti, logname,
3843                                                           &mgi->mgi_bufs,
3844                                                           cname, ptr);
3845                                         if (rc)
3846                                                 break;
3847                                 }
3848                         }
3849                 }
3850
3851                 /* For mdc activate/deactivate, it affects OSP on MDT as well */
3852                 if (class_match_param(ptr, PARAM_MDC PARAM_ACTIVE, &tmp) == 0 &&
3853                     rc == 0) {
3854                         char suffix[16];
3855                         char *lodname = NULL;
3856                         char *param_str = NULL;
3857                         int i;
3858                         int index;
3859
3860                         /* replace mdc with osp */
3861                         memcpy(ptr, PARAM_OSP, strlen(PARAM_OSP));
3862                         rc = server_name2index(mti->mti_svname, &index, NULL);
3863                         if (rc < 0) {
3864                                 memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
3865                                 GOTO(end, rc);
3866                         }
3867
3868                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3869                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3870                                         continue;
3871
3872                                 if (i == index)
3873                                         continue;
3874
3875                                 name_destroy(&logname);
3876                                 rc = name_create_mdt(&logname, mti->mti_fsname,
3877                                                      i);
3878                                 if (rc < 0)
3879                                         break;
3880
3881                                 if (mgs_log_is_empty(env, mgs, logname))
3882                                         continue;
3883
3884                                 snprintf(suffix, sizeof(suffix), "-osp-MDT%04x",
3885                                          i);
3886                                 name_destroy(&cname);
3887                                 rc = name_create(&cname, mti->mti_svname,
3888                                                  suffix);
3889                                 if (rc < 0)
3890                                         break;
3891
3892                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3893                                                   &mgi->mgi_bufs, cname, ptr);
3894                                 if (rc < 0)
3895                                         break;
3896
3897                                 /* Add configuration log for noitfying LOD
3898                                  * to active/deactive the OSP. */
3899                                 name_destroy(&param_str);
3900                                 rc = name_create(&param_str, cname,
3901                                                  (*tmp == '0') ?  ".active=0" :
3902                                                  ".active=1");
3903                                 if (rc < 0)
3904                                         break;
3905
3906                                 name_destroy(&lodname);
3907                                 rc = name_create(&lodname, logname, "-mdtlov");
3908                                 if (rc < 0)
3909                                         break;
3910
3911                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3912                                                   &mgi->mgi_bufs, lodname,
3913                                                   param_str);
3914                                 if (rc < 0)
3915                                         break;
3916                         }
3917                         memcpy(ptr, PARAM_MDC, strlen(PARAM_MDC));
3918                         name_destroy(&lodname);
3919                         name_destroy(&param_str);
3920                 }
3921
3922                 name_destroy(&logname);
3923                 name_destroy(&cname);
3924                 GOTO(end, rc);
3925         }
3926
3927         /* All mdt. params in proc */
3928         if (class_match_param(ptr, PARAM_MDT, &tmp) == 0) {
3929                 int i;
3930                 __u32 idx;
3931
3932                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3933                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3934                             MTI_NAME_MAXLEN) == 0)
3935                         /* device is unspecified completely? */
3936                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3937                 else
3938                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3939                 if (rc < 0)
3940                         goto active_err;
3941                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3942                         goto active_err;
3943                 if (rc & LDD_F_SV_ALL) {
3944                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3945                                 if (!test_bit(i,
3946                                                   fsdb->fsdb_mdt_index_map))
3947                                         continue;
3948                                 rc = name_create_mdt(&logname,
3949                                                 mti->mti_fsname, i);
3950                                 if (rc)
3951                                         goto active_err;
3952                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3953                                                   logname, &mgi->mgi_bufs,
3954                                                   logname, ptr);
3955                                 name_destroy(&logname);
3956                                 if (rc)
3957                                         goto active_err;
3958                         }
3959                 } else {
3960                         if ((memcmp(tmp, "root_squash=", 12) == 0) ||
3961                             (memcmp(tmp, "nosquash_nids=", 14) == 0)) {
3962                                 LCONSOLE_ERROR("%s: root squash parameters "
3963                                         "cannot be applied to a single MDT\n",
3964                                         mti->mti_fsname);
3965                                 GOTO(end, rc = -EINVAL);
3966                         }
3967                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3968                                           mti->mti_svname, &mgi->mgi_bufs,
3969                                           mti->mti_svname, ptr);
3970                         if (rc)
3971                                 goto active_err;
3972                 }
3973
3974                 /* root squash settings are also applied to llite
3975                  * config log (see LU-1778) */
3976                 if (rc == 0 &&
3977                     ((memcmp(tmp, "root_squash=", 12) == 0) ||
3978                      (memcmp(tmp, "nosquash_nids=", 14) == 0))) {
3979                         char *cname;
3980                         char *ptr2;
3981
3982                         rc = name_create(&cname, mti->mti_fsname, "-client");
3983                         if (rc)
3984                                 GOTO(end, rc);
3985                         rc = name_create(&logname, mti->mti_fsname, "-client");
3986                         if (rc) {
3987                                 name_destroy(&cname);
3988                                 GOTO(end, rc);
3989                         }
3990                         rc = name_create(&ptr2, PARAM_LLITE, tmp);
3991                         if (rc) {
3992                                 name_destroy(&cname);
3993                                 name_destroy(&logname);
3994                                 GOTO(end, rc);
3995                         }
3996                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname,
3997                                           &mgi->mgi_bufs, cname, ptr2);
3998                         name_destroy(&ptr2);
3999                         name_destroy(&logname);
4000                         name_destroy(&cname);
4001                 }
4002                 GOTO(end, rc);
4003         }
4004
4005         /* All mdd., ost. and osd. params in proc */
4006         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
4007             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
4008             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
4009                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
4010                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
4011                         GOTO(end, rc = -ENODEV);
4012
4013                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
4014                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
4015                 GOTO(end, rc);
4016         }
4017
4018         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
4019
4020 end:
4021         if (rc)
4022                 CERROR("err %d on param '%s'\n", rc, ptr);
4023
4024         RETURN(rc);
4025 }
4026
4027 int mgs_write_log_target(const struct lu_env *env, struct mgs_device *mgs,
4028                          struct mgs_target_info *mti, struct fs_db *fsdb)
4029 {
4030         char    *buf, *params;
4031         int      rc = -EINVAL;
4032
4033         ENTRY;
4034
4035         /* set/check the new target index */
4036         rc = mgs_set_index(env, mgs, mti);
4037         if (rc < 0)
4038                 RETURN(rc);
4039
4040         if (rc == EALREADY) {
4041                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
4042                               mti->mti_stripe_index, mti->mti_svname);
4043                 /* We would like to mark old log sections as invalid
4044                    and add new log sections in the client and mdt logs.
4045                    But if we add new sections, then live clients will
4046                    get repeat setup instructions for already running
4047                    osc's. So don't update the client/mdt logs. */
4048                 mti->mti_flags &= ~LDD_F_UPDATE;
4049                 rc = 0;
4050         }
4051
4052         OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_WRITE_TARGET_DELAY, cfs_fail_val > 0 ?
4053                          cfs_fail_val : 10);
4054
4055         mutex_lock(&fsdb->fsdb_mutex);
4056
4057         if (mti->mti_flags &
4058             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
4059                 /* Generate a log from scratch */
4060                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
4061                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
4062                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
4063                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
4064                 } else {
4065                         CERROR("Unknown target type %#x, can't create log for "
4066                                "%s\n", mti->mti_flags, mti->mti_svname);
4067                 }
4068                 if (rc) {
4069                         CERROR("Can't write logs for %s (%d)\n",
4070                                mti->mti_svname, rc);
4071                         GOTO(out_up, rc);
4072                 }
4073         } else {
4074                 /* Just update the params from tunefs in mgs_write_log_params */
4075                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
4076                 mti->mti_flags |= LDD_F_PARAM;
4077         }
4078
4079         /* allocate temporary buffer, where class_get_next_param will
4080            make copy of a current  parameter */
4081         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
4082         if (buf == NULL)
4083                 GOTO(out_up, rc = -ENOMEM);
4084         params = mti->mti_params;
4085         while (params != NULL) {
4086                 rc = class_get_next_param(&params, buf);
4087                 if (rc) {
4088                         if (rc == 1)
4089                                 /* there is no next parameter, that is
4090                                    not an error */
4091                                 rc = 0;
4092                         break;
4093                 }
4094                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
4095                        params, buf);
4096                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
4097                 if (rc)
4098                         break;
4099         }
4100
4101         OBD_FREE(buf, strlen(mti->mti_params) + 1);
4102
4103 out_up:
4104         mutex_unlock(&fsdb->fsdb_mutex);
4105         RETURN(rc);
4106 }
4107
4108 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
4109 {
4110         struct llog_ctxt        *ctxt;
4111         int                      rc = 0;
4112
4113         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
4114         if (ctxt == NULL) {
4115                 CERROR("%s: MGS config context doesn't exist\n",
4116                        mgs->mgs_obd->obd_name);
4117                 rc = -ENODEV;
4118         } else {
4119                 rc = llog_erase(env, ctxt, NULL, name);
4120                 /* llog may not exist */
4121                 if (rc == -ENOENT)
4122                         rc = 0;
4123                 llog_ctxt_put(ctxt);
4124         }
4125
4126         if (rc)
4127                 CERROR("%s: failed to clear log %s: %d\n",
4128                        mgs->mgs_obd->obd_name, name, rc);
4129
4130         return rc;
4131 }
4132
4133 /* erase all logs for the given fs */
4134 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs,
4135                    const char *fsname)
4136 {
4137         struct list_head log_list;
4138         struct mgs_direntry *dirent, *n;
4139         char barrier_name[20] = {};
4140         char *suffix;
4141         int count = 0;
4142         int rc, len = strlen(fsname);
4143         ENTRY;
4144
4145         mutex_lock(&mgs->mgs_mutex);
4146
4147         /* Find all the logs in the CONFIGS directory */
4148         rc = class_dentry_readdir(env, mgs, &log_list);
4149         if (rc) {
4150                 mutex_unlock(&mgs->mgs_mutex);
4151                 RETURN(rc);
4152         }
4153
4154         if (list_empty(&log_list)) {
4155                 mutex_unlock(&mgs->mgs_mutex);
4156                 RETURN(-ENOENT);
4157         }
4158
4159         snprintf(barrier_name, sizeof(barrier_name) - 1, "%s-%s",
4160                  fsname, BARRIER_FILENAME);
4161         /* Delete the barrier fsdb */
4162         mgs_remove_fsdb_by_name(mgs, barrier_name);
4163         /* Delete the fs db */
4164         mgs_remove_fsdb_by_name(mgs, fsname);
4165         mutex_unlock(&mgs->mgs_mutex);
4166
4167         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4168                 list_del_init(&dirent->mde_list);
4169                 suffix = strrchr(dirent->mde_name, '-');
4170                 if (suffix != NULL) {
4171                         if ((len == suffix - dirent->mde_name) &&
4172                             (strncmp(fsname, dirent->mde_name, len) == 0)) {
4173                                 CDEBUG(D_MGS, "Removing log %s\n",
4174                                        dirent->mde_name);
4175                                 mgs_erase_log(env, mgs, dirent->mde_name);
4176                                 count++;
4177                         }
4178                 }
4179                 mgs_direntry_free(dirent);
4180         }
4181
4182         if (count == 0)
4183                 rc = -ENOENT;
4184
4185         RETURN(rc);
4186 }
4187
4188 /* list all logs for the given fs */
4189 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
4190                   struct obd_ioctl_data *data)
4191 {
4192         struct list_head         log_list;
4193         struct mgs_direntry     *dirent, *n;
4194         char                    *out, *suffix;
4195         int                      l, remains, rc;
4196
4197         ENTRY;
4198
4199         /* Find all the logs in the CONFIGS directory */
4200         rc = class_dentry_readdir(env, mgs, &log_list);
4201         if (rc)
4202                 RETURN(rc);
4203
4204         out = data->ioc_bulk;
4205         remains = data->ioc_inllen1;
4206         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4207                 list_del_init(&dirent->mde_list);
4208                 suffix = strrchr(dirent->mde_name, '-');
4209                 if (suffix != NULL) {
4210                         l = snprintf(out, remains, "config_log: %s\n",
4211                                      dirent->mde_name);
4212                         out += l;
4213                         remains -= l;
4214                 }
4215                 mgs_direntry_free(dirent);
4216                 if (remains <= 0)
4217                         break;
4218         }
4219         RETURN(rc);
4220 }
4221
4222 struct mgs_lcfg_fork_data {
4223         struct lustre_cfg_bufs   mlfd_bufs;
4224         struct mgs_device       *mlfd_mgs;
4225         struct llog_handle      *mlfd_llh;
4226         const char              *mlfd_oldname;
4227         const char              *mlfd_newname;
4228         char                     mlfd_data[0];
4229 };
4230
4231 static bool contain_valid_fsname(char *buf, const char *fsname,
4232                                  int buflen, int namelen)
4233 {
4234         if (buflen < namelen)
4235                 return false;
4236
4237         if (memcmp(buf, fsname, namelen) != 0)
4238                 return false;
4239
4240         if (buf[namelen] != '\0' && buf[namelen] != '-')
4241                 return false;
4242
4243         return true;
4244 }
4245
4246 static int mgs_lcfg_fork_handler(const struct lu_env *env,
4247                                  struct llog_handle *o_llh,
4248                                  struct llog_rec_hdr *o_rec, void *data)
4249 {
4250         struct mgs_lcfg_fork_data *mlfd = data;
4251         struct lustre_cfg_bufs *n_bufs = &mlfd->mlfd_bufs;
4252         struct lustre_cfg *o_lcfg = (struct lustre_cfg *)(o_rec + 1);
4253         struct llog_cfg_rec *lcr;
4254         char *o_buf;
4255         char *n_buf = mlfd->mlfd_data;
4256         int o_buflen;
4257         int o_namelen = strlen(mlfd->mlfd_oldname);
4258         int n_namelen = strlen(mlfd->mlfd_newname);
4259         int diff = n_namelen - o_namelen;
4260         __u32 cmd = o_lcfg->lcfg_command;
4261         __u32 cnt = o_lcfg->lcfg_bufcount;
4262         int rc;
4263         int i;
4264         ENTRY;
4265
4266         /* buf[0] */
4267         o_buf = lustre_cfg_buf(o_lcfg, 0);
4268         o_buflen = o_lcfg->lcfg_buflens[0];
4269         if (contain_valid_fsname(o_buf, mlfd->mlfd_oldname, o_buflen,
4270                                  o_namelen)) {
4271                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4272                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4273                        o_buflen - o_namelen);
4274                 lustre_cfg_bufs_reset(n_bufs, n_buf);
4275                 n_buf += cfs_size_round(o_buflen + diff);
4276         } else {
4277                 lustre_cfg_bufs_reset(n_bufs, o_buflen != 0 ? o_buf : NULL);
4278         }
4279
4280         switch (cmd) {
4281         case LCFG_MARKER: {
4282                 struct cfg_marker *o_marker;
4283                 struct cfg_marker *n_marker;
4284                 int tgt_namelen;
4285
4286                 if (cnt != 2) {
4287                         CDEBUG(D_MGS, "Unknown cfg marker entry with %d "
4288                                "buffers\n", cnt);
4289                         RETURN(-EINVAL);
4290                 }
4291
4292                 /* buf[1] is marker */
4293                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4294                 o_buflen = o_lcfg->lcfg_buflens[1];
4295                 o_marker = (struct cfg_marker *)o_buf;
4296                 if (!contain_valid_fsname(o_marker->cm_tgtname,
4297                                           mlfd->mlfd_oldname,
4298                                           sizeof(o_marker->cm_tgtname),
4299                                           o_namelen)) {
4300                         lustre_cfg_bufs_set(n_bufs, 1, o_marker,
4301                                             sizeof(*o_marker));
4302                         break;
4303                 }
4304
4305                 n_marker = (struct cfg_marker *)n_buf;
4306                 *n_marker = *o_marker;
4307                 memcpy(n_marker->cm_tgtname, mlfd->mlfd_newname, n_namelen);
4308                 tgt_namelen = strlen(o_marker->cm_tgtname);
4309                 if (tgt_namelen > o_namelen)
4310                         memcpy(n_marker->cm_tgtname + n_namelen,
4311                                o_marker->cm_tgtname + o_namelen,
4312                                tgt_namelen - o_namelen);
4313                 n_marker->cm_tgtname[tgt_namelen + diff] = '\0';
4314                 lustre_cfg_bufs_set(n_bufs, 1, n_marker, sizeof(*n_marker));
4315                 break;
4316         }
4317         case LCFG_PARAM:
4318         case LCFG_SET_PARAM: {
4319                 for (i = 1; i < cnt; i++)
4320                         /* buf[i] is the param value, reuse it directly */
4321                         lustre_cfg_bufs_set(n_bufs, i,
4322                                             lustre_cfg_buf(o_lcfg, i),
4323                                             o_lcfg->lcfg_buflens[i]);
4324                 break;
4325         }
4326         case LCFG_POOL_NEW:
4327         case LCFG_POOL_ADD:
4328         case LCFG_POOL_REM:
4329         case LCFG_POOL_DEL: {
4330                 if (cnt < 3 || cnt > 4) {
4331                         CDEBUG(D_MGS, "Unknown cfg pool (%x) entry with %d "
4332                                "buffers\n", cmd, cnt);
4333                         RETURN(-EINVAL);
4334                 }
4335
4336                 /* buf[1] is fsname */
4337                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4338                 o_buflen = o_lcfg->lcfg_buflens[1];
4339                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4340                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4341                        o_buflen - o_namelen);
4342                 lustre_cfg_bufs_set(n_bufs, 1, n_buf, o_buflen + diff);
4343                 n_buf += cfs_size_round(o_buflen + diff);
4344
4345                 /* buf[2] is the pool name, reuse it directly */
4346                 lustre_cfg_bufs_set(n_bufs, 2, lustre_cfg_buf(o_lcfg, 2),
4347                                     o_lcfg->lcfg_buflens[2]);
4348
4349                 if (cnt == 3)
4350                         break;
4351
4352                 /* buf[3] is ostname */
4353                 o_buf = lustre_cfg_buf(o_lcfg, 3);
4354                 o_buflen = o_lcfg->lcfg_buflens[3];
4355                 memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4356                 memcpy(n_buf + n_namelen, o_buf + o_namelen,
4357                        o_buflen - o_namelen);
4358                 lustre_cfg_bufs_set(n_bufs, 3, n_buf, o_buflen + diff);
4359                 break;
4360         }
4361         case LCFG_SETUP: {
4362                 if (cnt == 2) {
4363                         o_buflen = o_lcfg->lcfg_buflens[1];
4364                         if (o_buflen == sizeof(struct lov_desc) ||
4365                             o_buflen == sizeof(struct lmv_desc)) {
4366                                 char *o_uuid;
4367                                 char *n_uuid;
4368                                 int uuid_len;
4369
4370                                 /* buf[1] */
4371                                 o_buf = lustre_cfg_buf(o_lcfg, 1);
4372                                 if (o_buflen == sizeof(struct lov_desc)) {
4373                                         struct lov_desc *o_desc =
4374                                                 (struct lov_desc *)o_buf;
4375                                         struct lov_desc *n_desc =
4376                                                 (struct lov_desc *)n_buf;
4377
4378                                         *n_desc = *o_desc;
4379                                         o_uuid = o_desc->ld_uuid.uuid;
4380                                         n_uuid = n_desc->ld_uuid.uuid;
4381                                         uuid_len = sizeof(o_desc->ld_uuid.uuid);
4382                                 } else {
4383                                         struct lmv_desc *o_desc =
4384                                                 (struct lmv_desc *)o_buf;
4385                                         struct lmv_desc *n_desc =
4386                                                 (struct lmv_desc *)n_buf;
4387
4388                                         *n_desc = *o_desc;
4389                                         o_uuid = o_desc->ld_uuid.uuid;
4390                                         n_uuid = n_desc->ld_uuid.uuid;
4391                                         uuid_len = sizeof(o_desc->ld_uuid.uuid);
4392                                 }
4393
4394                                 if (unlikely(!contain_valid_fsname(o_uuid,
4395                                                 mlfd->mlfd_oldname, uuid_len,
4396                                                 o_namelen))) {
4397                                         lustre_cfg_bufs_set(n_bufs, 1, o_buf,
4398                                                             o_buflen);
4399                                         break;
4400                                 }
4401
4402                                 memcpy(n_uuid, mlfd->mlfd_newname, n_namelen);
4403                                 uuid_len = strlen(o_uuid);
4404                                 if (uuid_len > o_namelen)
4405                                         memcpy(n_uuid + n_namelen,
4406                                                o_uuid + o_namelen,
4407                                                uuid_len - o_namelen);
4408                                 n_uuid[uuid_len + diff] = '\0';
4409                                 lustre_cfg_bufs_set(n_bufs, 1, n_buf, o_buflen);
4410                                 break;
4411                         } /* else case fall through */
4412                 } /* else case fall through */
4413         }
4414         default: {
4415                 for (i = 1; i < cnt; i++) {
4416                         o_buflen = o_lcfg->lcfg_buflens[i];
4417                         if (o_buflen == 0)
4418                                 continue;
4419
4420                         o_buf = lustre_cfg_buf(o_lcfg, i);
4421                         if (!contain_valid_fsname(o_buf, mlfd->mlfd_oldname,
4422                                                   o_buflen, o_namelen)) {
4423                                 lustre_cfg_bufs_set(n_bufs, i, o_buf, o_buflen);
4424                                 continue;
4425                         }
4426
4427                         memcpy(n_buf, mlfd->mlfd_newname, n_namelen);
4428                         if (o_buflen == o_namelen) {
4429                                 lustre_cfg_bufs_set(n_bufs, i, n_buf,
4430                                                     n_namelen);
4431                                 n_buf += cfs_size_round(n_namelen);
4432                                 continue;
4433                         }
4434
4435                         memcpy(n_buf + n_namelen, o_buf + o_namelen,
4436                                o_buflen - o_namelen);
4437                         lustre_cfg_bufs_set(n_bufs, i, n_buf, o_buflen + diff);
4438                         n_buf += cfs_size_round(o_buflen + diff);
4439                 }
4440                 break;
4441         }
4442         }
4443
4444         lcr = lustre_cfg_rec_new(cmd, n_bufs);
4445         if (!lcr)
4446                 RETURN(-ENOMEM);
4447
4448         lcr->lcr_cfg = *o_lcfg;
4449         rc = llog_write(env, mlfd->mlfd_llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
4450         lustre_cfg_rec_free(lcr);
4451
4452         RETURN(rc);
4453 }
4454
4455 static int mgs_lcfg_fork_one(const struct lu_env *env, struct mgs_device *mgs,
4456                              struct mgs_direntry *mde, const char *oldname,
4457                              const char *newname)
4458 {
4459         struct llog_handle *old_llh = NULL;
4460         struct llog_handle *new_llh = NULL;
4461         struct llog_ctxt *ctxt = NULL;
4462         struct mgs_lcfg_fork_data *mlfd = NULL;
4463         char *name_buf = NULL;
4464         int name_buflen;
4465         int old_namelen = strlen(oldname);
4466         int new_namelen = strlen(newname);
4467         int rc;
4468         ENTRY;
4469
4470         name_buflen = mde->mde_len + new_namelen - old_namelen;
4471         OBD_ALLOC(name_buf, name_buflen);
4472         if (!name_buf)
4473                 RETURN(-ENOMEM);
4474
4475         memcpy(name_buf, newname, new_namelen);
4476         memcpy(name_buf + new_namelen, mde->mde_name + old_namelen,
4477                mde->mde_len - old_namelen);
4478
4479         CDEBUG(D_MGS, "Fork the config-log from %s to %s\n",
4480                mde->mde_name, name_buf);
4481
4482         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
4483         LASSERT(ctxt);
4484
4485         rc = llog_open_create(env, ctxt, &new_llh, NULL, name_buf);
4486         if (rc)
4487                 GOTO(out, rc);
4488
4489         rc = llog_init_handle(env, new_llh, LLOG_F_IS_PLAIN, NULL);
4490         if (rc)
4491                 GOTO(out, rc);
4492
4493         if (unlikely(mgs_log_is_empty(env, mgs, mde->mde_name)))
4494                 GOTO(out, rc = 0);
4495
4496         rc = llog_open(env, ctxt, &old_llh, NULL, mde->mde_name,
4497                        LLOG_OPEN_EXISTS);
4498         if (rc)
4499                 GOTO(out, rc);
4500
4501         rc = llog_init_handle(env, old_llh, LLOG_F_IS_PLAIN, NULL);
4502         if (rc)
4503                 GOTO(out, rc);
4504
4505         new_llh->lgh_hdr->llh_tgtuuid = old_llh->lgh_hdr->llh_tgtuuid;
4506
4507         OBD_ALLOC(mlfd, LLOG_MIN_CHUNK_SIZE);
4508         if (!mlfd)
4509                 GOTO(out, rc = -ENOMEM);
4510
4511         mlfd->mlfd_mgs = mgs;
4512         mlfd->mlfd_llh = new_llh;
4513         mlfd->mlfd_oldname = oldname;
4514         mlfd->mlfd_newname = newname;
4515
4516         rc = llog_process(env, old_llh, mgs_lcfg_fork_handler, mlfd, NULL);
4517         OBD_FREE(mlfd, LLOG_MIN_CHUNK_SIZE);
4518
4519         GOTO(out, rc);
4520
4521 out:
4522         if (old_llh)
4523                 llog_close(env, old_llh);
4524         if (new_llh)
4525                 llog_close(env, new_llh);
4526         if (name_buf)
4527                 OBD_FREE(name_buf, name_buflen);
4528         if (ctxt)
4529                 llog_ctxt_put(ctxt);
4530
4531         return rc;
4532 }
4533
4534 int mgs_lcfg_fork(const struct lu_env *env, struct mgs_device *mgs,
4535                   const char *oldname, const char *newname)
4536 {
4537         struct list_head log_list;
4538         struct mgs_direntry *dirent, *n;
4539         int olen = strlen(oldname);
4540         int nlen = strlen(newname);
4541         int count = 0;
4542         int rc = 0;
4543         ENTRY;
4544
4545         if (unlikely(!oldname || oldname[0] == '\0' ||
4546                      !newname || newname[0] == '\0'))
4547                 RETURN(-EINVAL);
4548
4549         if (strcmp(oldname, newname) == 0)
4550                 RETURN(-EINVAL);
4551
4552         /* lock it to prevent fork/erase/register in parallel. */
4553         mutex_lock(&mgs->mgs_mutex);
4554
4555         rc = class_dentry_readdir(env, mgs, &log_list);
4556         if (rc) {
4557                 mutex_unlock(&mgs->mgs_mutex);
4558                 RETURN(rc);
4559         }
4560
4561         if (list_empty(&log_list)) {
4562                 mutex_unlock(&mgs->mgs_mutex);
4563                 RETURN(-ENOENT);
4564         }
4565
4566         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4567                 char *ptr;
4568
4569                 ptr = strrchr(dirent->mde_name, '-');
4570                 if (ptr) {
4571                         int tlen = ptr - dirent->mde_name;
4572
4573                         if (tlen == nlen &&
4574                             strncmp(newname, dirent->mde_name, tlen) == 0)
4575                                 GOTO(out, rc = -EEXIST);
4576
4577                         if (tlen == olen &&
4578                             strncmp(oldname, dirent->mde_name, tlen) == 0)
4579                                 continue;
4580                 }
4581
4582                 list_del_init(&dirent->mde_list);
4583                 mgs_direntry_free(dirent);
4584         }
4585
4586         if (list_empty(&log_list)) {
4587                 mutex_unlock(&mgs->mgs_mutex);
4588                 RETURN(-ENOENT);
4589         }
4590
4591         list_for_each_entry(dirent, &log_list, mde_list) {
4592                 rc = mgs_lcfg_fork_one(env, mgs, dirent, oldname, newname);
4593                 if (rc)
4594                         break;
4595
4596                 count++;
4597         }
4598
4599 out:
4600         mutex_unlock(&mgs->mgs_mutex);
4601
4602         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4603                 list_del_init(&dirent->mde_list);
4604                 mgs_direntry_free(dirent);
4605         }
4606
4607         if (rc && count > 0)
4608                 mgs_erase_logs(env, mgs, newname);
4609
4610         RETURN(rc);
4611 }
4612
4613 int mgs_lcfg_erase(const struct lu_env *env, struct mgs_device *mgs,
4614                    const char *fsname)
4615 {
4616         int rc;
4617         ENTRY;
4618
4619         if (unlikely(!fsname || fsname[0] == '\0'))
4620                 RETURN(-EINVAL);
4621
4622         rc = mgs_erase_logs(env, mgs, fsname);
4623
4624         RETURN(rc);
4625 }
4626
4627 static int mgs_xattr_del(const struct lu_env *env, struct dt_object *obj)
4628 {
4629         struct dt_device *dev;
4630         struct thandle *th = NULL;
4631         int rc = 0;
4632
4633         ENTRY;
4634
4635         dev = container_of0(obj->do_lu.lo_dev, struct dt_device, dd_lu_dev);
4636         th = dt_trans_create(env, dev);
4637         if (IS_ERR(th))
4638                 RETURN(PTR_ERR(th));
4639
4640         rc = dt_declare_xattr_del(env, obj, XATTR_TARGET_RENAME, th);
4641         if (rc)
4642                 GOTO(stop, rc);
4643
4644         rc = dt_trans_start_local(env, dev, th);
4645         if (rc)
4646                 GOTO(stop, rc);
4647
4648         dt_write_lock(env, obj, 0);
4649         rc = dt_xattr_del(env, obj, XATTR_TARGET_RENAME, th);
4650
4651         GOTO(unlock, rc);
4652
4653 unlock:
4654         dt_write_unlock(env, obj);
4655
4656 stop:
4657         dt_trans_stop(env, dev, th);
4658
4659         return rc;
4660 }
4661
4662 int mgs_lcfg_rename(const struct lu_env *env, struct mgs_device *mgs)
4663 {
4664         struct list_head log_list;
4665         struct mgs_direntry *dirent, *n;
4666         char fsname[16];
4667         struct lu_buf buf = {
4668                 .lb_buf = fsname,
4669                 .lb_len = sizeof(fsname)
4670         };
4671         int rc = 0;
4672
4673         ENTRY;
4674
4675         rc = class_dentry_readdir(env, mgs, &log_list);
4676         if (rc)
4677                 RETURN(rc);
4678
4679         if (list_empty(&log_list))
4680                 RETURN(0);
4681
4682         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4683                 struct dt_object *o = NULL;
4684                 char oldname[16];
4685                 char *ptr;
4686                 int len;
4687
4688                 list_del_init(&dirent->mde_list);
4689                 ptr = strrchr(dirent->mde_name, '-');
4690                 if (!ptr)
4691                         goto next;
4692
4693                 len = ptr - dirent->mde_name;
4694                 if (unlikely(len >= sizeof(oldname))) {
4695                         CDEBUG(D_MGS, "Skip invalid configuration file %s\n",
4696                                dirent->mde_name);
4697                         goto next;
4698                 }
4699
4700                 o = local_file_find(env, mgs->mgs_los, mgs->mgs_configs_dir,
4701                                     dirent->mde_name);
4702                 if (IS_ERR(o)) {
4703                         rc = PTR_ERR(o);
4704                         CDEBUG(D_MGS, "Fail to locate file %s: rc = %d\n",
4705                                dirent->mde_name, rc);
4706                         goto next;
4707                 }
4708
4709                 rc = dt_xattr_get(env, o, &buf, XATTR_TARGET_RENAME);
4710                 if (rc < 0) {
4711                         if (rc == -ENODATA)
4712                                 rc = 0;
4713                         else
4714                                 CDEBUG(D_MGS,
4715                                        "Fail to get EA for %s: rc = %d\n",
4716                                        dirent->mde_name, rc);
4717                         goto next;
4718                 }
4719
4720                 if (unlikely(rc == len &&
4721                              memcmp(fsname, dirent->mde_name, len) == 0)) {
4722                         /* The new fsname is the same as the old one. */
4723                         rc = mgs_xattr_del(env, o);
4724                         goto next;
4725                 }
4726
4727                 memcpy(oldname, dirent->mde_name, len);
4728                 oldname[len] = '\0';
4729                 fsname[rc] = '\0';
4730                 rc = mgs_lcfg_fork_one(env, mgs, dirent, oldname, fsname);
4731                 if (rc && rc != -EEXIST) {
4732                         CDEBUG(D_MGS, "Fail to fork %s: rc = %d\n",
4733                                dirent->mde_name, rc);
4734                         goto next;
4735                 }
4736
4737                 rc = mgs_erase_log(env, mgs, dirent->mde_name);
4738                 if (rc) {
4739                         CDEBUG(D_MGS, "Fail to erase old %s: rc = %d\n",
4740                                dirent->mde_name, rc);
4741                         /* keep it there if failed to remove it. */
4742                         rc = 0;
4743                 }
4744
4745 next:
4746                 if (o && !IS_ERR(o))
4747                         lu_object_put(env, &o->do_lu);
4748
4749                 mgs_direntry_free(dirent);
4750                 if (rc)
4751                         break;
4752         }
4753
4754         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
4755                 list_del_init(&dirent->mde_list);
4756                 mgs_direntry_free(dirent);
4757         }
4758
4759         RETURN(rc);
4760 }
4761
4762 /* Setup _mgs fsdb and log
4763  */
4764 int mgs__mgs_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
4765 {
4766         struct fs_db *fsdb = NULL;
4767         int rc;
4768         ENTRY;
4769
4770         rc = mgs_find_or_make_fsdb(env, mgs, MGSSELF_NAME, &fsdb);
4771         if (!rc)
4772                 mgs_put_fsdb(mgs, fsdb);
4773
4774         RETURN(rc);
4775 }
4776
4777 /* Setup params fsdb and log
4778  */
4779 int mgs_params_fsdb_setup(const struct lu_env *env, struct mgs_device *mgs)
4780 {
4781         struct fs_db *fsdb = NULL;
4782         struct llog_handle *params_llh = NULL;
4783         int rc;
4784         ENTRY;
4785
4786         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
4787         if (!rc) {
4788                 mutex_lock(&fsdb->fsdb_mutex);
4789                 rc = record_start_log(env, mgs, &params_llh, PARAMS_FILENAME);
4790                 if (!rc)
4791                         rc = record_end_log(env, &params_llh);
4792                 mutex_unlock(&fsdb->fsdb_mutex);
4793                 mgs_put_fsdb(mgs, fsdb);
4794         }
4795
4796         RETURN(rc);
4797 }
4798
4799 /* Cleanup params fsdb and log
4800  */
4801 int mgs_params_fsdb_cleanup(const struct lu_env *env, struct mgs_device *mgs)
4802 {
4803         int rc;
4804
4805         rc = mgs_erase_logs(env, mgs, PARAMS_FILENAME);
4806         return rc == -ENOENT ? 0 : rc;
4807 }
4808
4809 /**
4810  * Fill in the mgs_target_info based on data devname and param provide.
4811  *
4812  * @env         thread context
4813  * @mgs         mgs device
4814  * @mti         mgs target info. We want to set this based other paramters
4815  *              passed to this function. Once setup we write it to the config
4816  *              logs.
4817  * @devname     optional OBD device name
4818  * @param       string that contains both what tunable to set and the value to
4819  *              set it to.
4820  *
4821  * RETURN       0 for success
4822  *              negative error number on failure
4823  **/
4824 static int mgs_set_conf_param(const struct lu_env *env, struct mgs_device *mgs,
4825                               struct mgs_target_info *mti, const char *devname,
4826                               const char *param)
4827 {
4828         struct fs_db *fsdb = NULL;
4829         int dev_type;
4830         int rc = 0;
4831
4832         ENTRY;
4833         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
4834         if (!devname) {
4835                 size_t len;
4836
4837                 /* We have two possible cases here:
4838                  *
4839                  * 1) the device name embedded in the param:
4840                  *    lustre-OST0000.osc.max_dirty_mb=32
4841                  *
4842                  * 2) the file system name is embedded in
4843                  *    the param: lustre.sys.at.min=0
4844                  */
4845                 len = strcspn(param, ".=");
4846                 if (!len || param[len] == '=')
4847                         RETURN(-EINVAL);
4848
4849                 if (len >= sizeof(mti->mti_svname))
4850                         RETURN(-E2BIG);
4851
4852                 snprintf(mti->mti_svname, sizeof(mti->mti_svname),
4853                          "%.*s", (int)len, param);
4854                 param += len + 1;
4855         } else {
4856                 if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname)) >=
4857                     sizeof(mti->mti_svname))
4858                         RETURN(-E2BIG);
4859         }
4860
4861         if (!strlen(mti->mti_svname)) {
4862                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
4863                 RETURN(-ENOSYS);
4864         }
4865
4866         dev_type = mgs_parse_devname(mti->mti_svname, mti->mti_fsname,
4867                                      &mti->mti_stripe_index);
4868         switch (dev_type) {
4869         /* For this case we have an invalid obd device name */
4870         case -ENXIO:
4871                 CDEBUG(D_MGS, "%s don't contain an index\n", mti->mti_svname);
4872                 strlcpy(mti->mti_fsname, mti->mti_svname, MTI_NAME_MAXLEN);
4873                 dev_type = 0;
4874                 break;
4875         /* Not an obd device, assume devname is the fsname.
4876          * User might of only provided fsname and not obd device
4877          */
4878         case -EINVAL:
4879                 CDEBUG(D_MGS, "%s is seen as a file system name\n", mti->mti_svname);
4880                 strlcpy(mti->mti_fsname, mti->mti_svname, MTI_NAME_MAXLEN);
4881                 dev_type = 0;
4882                 break;
4883         default:
4884                 if (dev_type < 0)
4885                         GOTO(out, rc = dev_type);
4886
4887                 /* param related to llite isn't allowed to set by OST or MDT */
4888                 if (dev_type & LDD_F_SV_TYPE_OST ||
4889                     dev_type & LDD_F_SV_TYPE_MDT) {
4890                         /* param related to llite isn't allowed to set by OST
4891                          * or MDT
4892                          */
4893                         if (!strncmp(param, PARAM_LLITE,
4894                                      sizeof(PARAM_LLITE) - 1))
4895                                 GOTO(out, rc = -EINVAL);
4896
4897                         /* Strip -osc or -mdc suffix from svname */
4898                         if (server_make_name(dev_type, mti->mti_stripe_index,
4899                                              mti->mti_fsname, mti->mti_svname,
4900                                              sizeof(mti->mti_svname)))
4901                                 GOTO(out, rc = -EINVAL);
4902                 }
4903                 break;
4904         }
4905
4906         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
4907             sizeof(mti->mti_params))
4908                 GOTO(out, rc = -E2BIG);
4909
4910         CDEBUG(D_MGS, "set_conf_param fs='%s' device='%s' param='%s'\n",
4911                mti->mti_fsname, mti->mti_svname, mti->mti_params);
4912
4913         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
4914         if (rc)
4915                 GOTO(out, rc);
4916
4917         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
4918             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
4919                 CERROR("No filesystem targets for %s. cfg_device from lctl "
4920                        "is '%s'\n", mti->mti_fsname, mti->mti_svname);
4921                 mgs_unlink_fsdb(mgs, fsdb);
4922                 GOTO(out, rc = -EINVAL);
4923         }
4924
4925         /*
4926          * Revoke lock so everyone updates.  Should be alright if
4927          * someone was already reading while we were updating the logs,
4928          * so we don't really need to hold the lock while we're
4929          * writing (above).
4930          */
4931         mti->mti_flags = dev_type | LDD_F_PARAM;
4932         mutex_lock(&fsdb->fsdb_mutex);
4933         rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
4934         mutex_unlock(&fsdb->fsdb_mutex);
4935         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4936
4937 out:
4938         if (fsdb)
4939                 mgs_put_fsdb(mgs, fsdb);
4940
4941         RETURN(rc);
4942 }
4943
4944 static int mgs_set_param2(const struct lu_env *env, struct mgs_device *mgs,
4945                           struct mgs_target_info *mti, const char *param)
4946 {
4947         struct fs_db *fsdb = NULL;
4948         int dev_type;
4949         size_t len;
4950         int rc;
4951
4952         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params)) >=
4953             sizeof(mti->mti_params))
4954                 GOTO(out, rc = -E2BIG);
4955
4956         /* obdname2fsname reports devname as an obd device */
4957         len = strcspn(param, ".=");
4958         if (len && param[len] != '=') {
4959                 char *ptr;
4960
4961                 param += len + 1;
4962                 ptr = strchr(param, '.');
4963
4964                 len = strlen(param);
4965                 if (ptr)
4966                         len -= strlen(ptr);
4967                 if (len >= sizeof(mti->mti_svname))
4968                         GOTO(out, rc = -E2BIG);
4969
4970                 snprintf(mti->mti_svname, sizeof(mti->mti_svname), "%.*s",
4971                         (int)len, param);
4972
4973                 obdname2fsname(mti->mti_svname, mti->mti_fsname,
4974                                sizeof(mti->mti_fsname));
4975         } else {
4976                 snprintf(mti->mti_svname, sizeof(mti->mti_svname), "general");
4977         }
4978
4979         CDEBUG(D_MGS, "set_param2 fs='%s' device='%s' param='%s'\n",
4980                mti->mti_fsname, mti->mti_svname, mti->mti_params);
4981
4982         /* The return value should be the device type i.e LDD_F_SV_TYPE_XXX.
4983          * A returned error tells us we don't have a target obd device.
4984          */
4985         dev_type = server_name2index(mti->mti_svname, &mti->mti_stripe_index,
4986                                      NULL);
4987         if (dev_type < 0)
4988                 dev_type = 0;
4989
4990         /* the return value should be the device type i.e LDD_F_SV_TYPE_XXX.
4991          * Strip -osc or -mdc suffix from svname
4992          */
4993         if ((dev_type & LDD_F_SV_TYPE_OST || dev_type & LDD_F_SV_TYPE_MDT) &&
4994             server_make_name(dev_type, mti->mti_stripe_index,
4995                              mti->mti_fsname, mti->mti_svname,
4996                              sizeof(mti->mti_svname)))
4997                 GOTO(out, rc = -EINVAL);
4998
4999         rc = mgs_find_or_make_fsdb(env, mgs, PARAMS_FILENAME, &fsdb);
5000         if (rc)
5001                 GOTO(out, rc);
5002         /*
5003          * Revoke lock so everyone updates.  Should be alright if
5004          * someone was already reading while we were updating the logs,
5005          * so we don't really need to hold the lock while we're
5006          * writing (above).
5007          */
5008         mti->mti_flags = dev_type | LDD_F_PARAM2;
5009         mutex_lock(&fsdb->fsdb_mutex);
5010         rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
5011         mutex_unlock(&fsdb->fsdb_mutex);
5012         mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
5013         mgs_put_fsdb(mgs, fsdb);
5014 out:
5015         RETURN(rc);
5016 }
5017
5018 /* Set a permanent (config log) param for a target or fs
5019  *
5020  * @lcfg buf0 may contain the device (testfs-MDT0000) name
5021  *       buf1 contains the single parameter
5022  */
5023 int mgs_set_param(const struct lu_env *env, struct mgs_device *mgs,
5024                   struct lustre_cfg *lcfg)
5025 {
5026         const char *param = lustre_cfg_string(lcfg, 1);
5027         struct mgs_target_info *mti;
5028         int rc;
5029
5030         /* Create a fake mti to hold everything */
5031         OBD_ALLOC_PTR(mti);
5032         if (!mti)
5033                 return -ENOMEM;
5034
5035         print_lustre_cfg(lcfg);
5036
5037         if (lcfg->lcfg_command == LCFG_PARAM) {
5038                 /* For the case of lctl conf_param devname can be
5039                  * lustre, lustre-mdtlov, lustre-client, lustre-MDT0000
5040                  */
5041                 const char *devname = lustre_cfg_string(lcfg, 0);
5042
5043                 rc = mgs_set_conf_param(env, mgs, mti, devname, param);
5044         } else {
5045                 /* In the case of lctl set_param -P lcfg[0] will always
5046                  * be 'general'. At least for now.
5047                  */
5048                 rc = mgs_set_param2(env, mgs, mti, param);
5049         }
5050
5051         OBD_FREE_PTR(mti);
5052
5053         return rc;
5054 }
5055
5056 static int mgs_write_log_pool(const struct lu_env *env,
5057                               struct mgs_device *mgs, char *logname,
5058                               struct fs_db *fsdb, char *tgtname,
5059                               enum lcfg_command_type cmd,
5060                               char *fsname, char *poolname,
5061                               char *ostname, char *comment)
5062 {
5063         struct llog_handle *llh = NULL;
5064         int rc;
5065
5066         rc = record_start_log(env, mgs, &llh, logname);
5067         if (rc)
5068                 return rc;
5069         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
5070         if (rc)
5071                 goto out;
5072         rc = record_base(env, llh, tgtname, 0, cmd,
5073                          fsname, poolname, ostname, NULL);
5074         if (rc)
5075                 goto out;
5076         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
5077 out:
5078         record_end_log(env, &llh);
5079         return rc;
5080 }
5081
5082 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
5083                     enum lcfg_command_type cmd, const char *nodemap_name,
5084                     char *param)
5085 {
5086         lnet_nid_t      nid[2];
5087         __u32           idmap[2];
5088         bool            bool_switch;
5089         __u32           int_id;
5090         int             rc = 0;
5091         ENTRY;
5092
5093         switch (cmd) {
5094         case LCFG_NODEMAP_ADD:
5095                 rc = nodemap_add(nodemap_name);
5096                 break;
5097         case LCFG_NODEMAP_DEL:
5098                 rc = nodemap_del(nodemap_name);
5099                 break;
5100         case LCFG_NODEMAP_ADD_RANGE:
5101                 rc = nodemap_parse_range(param, nid);
5102                 if (rc != 0)
5103                         break;
5104                 rc = nodemap_add_range(nodemap_name, nid);
5105                 break;
5106         case LCFG_NODEMAP_DEL_RANGE:
5107                 rc = nodemap_parse_range(param, nid);
5108                 if (rc != 0)
5109                         break;
5110                 rc = nodemap_del_range(nodemap_name, nid);
5111                 break;
5112         case LCFG_NODEMAP_ADMIN:
5113                 bool_switch = simple_strtoul(param, NULL, 10);
5114                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
5115                 break;
5116         case LCFG_NODEMAP_DENY_UNKNOWN:
5117                 bool_switch = simple_strtoul(param, NULL, 10);
5118                 rc = nodemap_set_deny_unknown(nodemap_name, bool_switch);
5119                 break;
5120         case LCFG_NODEMAP_MAP_MODE:
5121                 if (strcmp("both", param) == 0)
5122                         rc = nodemap_set_mapping_mode(nodemap_name,
5123                                                       NODEMAP_MAP_BOTH);
5124                 else if (strcmp("uid_only", param) == 0)
5125                         rc = nodemap_set_mapping_mode(nodemap_name,
5126                                                       NODEMAP_MAP_UID_ONLY);
5127                 else if (strcmp("gid_only", param) == 0)
5128                         rc = nodemap_set_mapping_mode(nodemap_name,
5129                                                       NODEMAP_MAP_GID_ONLY);
5130                 else
5131                         rc = -EINVAL;
5132                 break;
5133         case LCFG_NODEMAP_TRUSTED:
5134                 bool_switch = simple_strtoul(param, NULL, 10);
5135                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
5136                 break;
5137         case LCFG_NODEMAP_SQUASH_UID:
5138                 int_id = simple_strtoul(param, NULL, 10);
5139                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
5140                 break;
5141         case LCFG_NODEMAP_SQUASH_GID:
5142                 int_id = simple_strtoul(param, NULL, 10);
5143                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
5144                 break;
5145         case LCFG_NODEMAP_ADD_UIDMAP:
5146         case LCFG_NODEMAP_ADD_GIDMAP:
5147                 rc = nodemap_parse_idmap(param, idmap);
5148                 if (rc != 0)
5149                         break;
5150                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
5151                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
5152                                                idmap);
5153                 else
5154                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
5155                                                idmap);
5156                 break;
5157         case LCFG_NODEMAP_DEL_UIDMAP:
5158         case LCFG_NODEMAP_DEL_GIDMAP:
5159                 rc = nodemap_parse_idmap(param, idmap);
5160                 if (rc != 0)
5161                         break;
5162                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
5163                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
5164                                                idmap);
5165                 else
5166                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
5167                                                idmap);
5168                 break;
5169         case LCFG_NODEMAP_SET_FILESET:
5170                 rc = nodemap_set_fileset(nodemap_name, param);
5171                 break;
5172         default:
5173                 rc = -EINVAL;
5174         }
5175
5176         RETURN(rc);
5177 }
5178
5179 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
5180                  enum lcfg_command_type cmd, char *fsname,
5181                  char *poolname, char *ostname)
5182 {
5183         struct fs_db *fsdb;
5184         char *lovname;
5185         char *logname;
5186         char *label = NULL, *canceled_label = NULL;
5187         int label_sz;
5188         struct mgs_target_info *mti = NULL;
5189         bool checked = false;
5190         bool locked = false;
5191         bool free = false;
5192         int rc, i;
5193         ENTRY;
5194
5195         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
5196         if (rc) {
5197                 CERROR("Can't get db for %s\n", fsname);
5198                 RETURN(rc);
5199         }
5200         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
5201                 CERROR("%s is not defined\n", fsname);
5202                 free = true;
5203                 GOTO(out_fsdb, rc = -EINVAL);
5204         }
5205
5206         label_sz = 10 + strlen(fsname) + strlen(poolname);
5207
5208         /* check if ostname match fsname */
5209         if (ostname != NULL) {
5210                 char *ptr;
5211
5212                 ptr = strrchr(ostname, '-');
5213                 if ((ptr == NULL) ||
5214                     (strncmp(fsname, ostname, ptr-ostname) != 0))
5215                         RETURN(-EINVAL);
5216                 label_sz += strlen(ostname);
5217         }
5218
5219         OBD_ALLOC(label, label_sz);
5220         if (!label)
5221                 GOTO(out_fsdb, rc = -ENOMEM);
5222
5223         switch(cmd) {
5224         case LCFG_POOL_NEW:
5225                 sprintf(label,
5226                         "new %s.%s", fsname, poolname);
5227                 break;
5228         case LCFG_POOL_ADD:
5229                 sprintf(label,
5230                         "add %s.%s.%s", fsname, poolname, ostname);
5231                 break;
5232         case LCFG_POOL_REM:
5233                 OBD_ALLOC(canceled_label, label_sz);
5234                 if (canceled_label == NULL)
5235                         GOTO(out_label, rc = -ENOMEM);
5236                 sprintf(label,
5237                         "rem %s.%s.%s", fsname, poolname, ostname);
5238                 sprintf(canceled_label,
5239                         "add %s.%s.%s", fsname, poolname, ostname);
5240                 break;
5241         case LCFG_POOL_DEL:
5242                 OBD_ALLOC(canceled_label, label_sz);
5243                 if (canceled_label == NULL)
5244                         GOTO(out_label, rc = -ENOMEM);
5245                 sprintf(label,
5246                         "del %s.%s", fsname, poolname);
5247                 sprintf(canceled_label,
5248                         "new %s.%s", fsname, poolname);
5249                 break;
5250         default:
5251                 break;
5252         }
5253
5254         OBD_ALLOC_PTR(mti);
5255         if (mti == NULL)
5256                 GOTO(out_cancel, rc = -ENOMEM);
5257         strncpy(mti->mti_svname, "lov pool", sizeof(mti->mti_svname));
5258
5259         mutex_lock(&fsdb->fsdb_mutex);
5260         locked = true;
5261         /* write pool def to all MDT logs */
5262         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
5263                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
5264                         rc = name_create_mdt_and_lov(&logname, &lovname,
5265                                                      fsdb, i);
5266                         if (rc)
5267                                 GOTO(out_mti, rc);
5268
5269                         if (!checked && (canceled_label == NULL)) {
5270                                 rc = mgs_check_marker(env, mgs, fsdb, mti,
5271                                                 logname, lovname, label);
5272                                 if (rc) {
5273                                         name_destroy(&logname);
5274                                         name_destroy(&lovname);
5275                                         GOTO(out_mti,
5276                                                 rc = (rc == LLOG_PROC_BREAK ?
5277                                                         -EEXIST : rc));
5278                                 }
5279                                 checked = true;
5280                         }
5281                         if (canceled_label != NULL)
5282                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
5283                                                 lovname, canceled_label,
5284                                                 CM_SKIP);
5285
5286                         if (rc >= 0)
5287                                 rc = mgs_write_log_pool(env, mgs, logname,
5288                                                         fsdb, lovname, cmd,
5289                                                         fsname, poolname,
5290                                                         ostname, label);
5291                         name_destroy(&logname);
5292                         name_destroy(&lovname);
5293                         if (rc)
5294                                 GOTO(out_mti, rc);
5295                 }
5296         }
5297
5298         rc = name_create(&logname, fsname, "-client");
5299         if (rc)
5300                 GOTO(out_mti, rc);
5301
5302         if (!checked && (canceled_label == NULL)) {
5303                 rc = mgs_check_marker(env, mgs, fsdb, mti, logname,
5304                                 fsdb->fsdb_clilov, label);
5305                 if (rc) {
5306                         name_destroy(&logname);
5307                         GOTO(out_mti, rc = (rc == LLOG_PROC_BREAK ?
5308                                 -EEXIST : rc));
5309                 }
5310         }
5311         if (canceled_label != NULL) {
5312                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
5313                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
5314                 if (rc < 0) {
5315                         name_destroy(&logname);
5316                         GOTO(out_mti, rc);
5317                 }
5318         }
5319
5320         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
5321                                 cmd, fsname, poolname, ostname, label);
5322         mutex_unlock(&fsdb->fsdb_mutex);
5323         locked = false;
5324         name_destroy(&logname);
5325         /* request for update */
5326         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
5327
5328         GOTO(out_mti, rc);
5329
5330 out_mti:
5331         if (locked)
5332                 mutex_unlock(&fsdb->fsdb_mutex);
5333         if (mti != NULL)
5334                 OBD_FREE_PTR(mti);
5335 out_cancel:
5336         if (canceled_label != NULL)
5337                 OBD_FREE(canceled_label, label_sz);
5338 out_label:
5339         OBD_FREE(label, label_sz);
5340 out_fsdb:
5341         if (free)
5342                 mgs_unlink_fsdb(mgs, fsdb);
5343         mgs_put_fsdb(mgs, fsdb);
5344
5345         return rc;
5346 }