Whamcloud - gitweb
LU-7649 mgs: skip single OST conf update
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 /* Find all logs in CONFIG directory and link then into list */
59 int class_dentry_readdir(const struct lu_env *env,
60                          struct mgs_device *mgs, struct list_head *log_list)
61 {
62         struct dt_object    *dir = mgs->mgs_configs_dir;
63         const struct dt_it_ops *iops;
64         struct dt_it        *it;
65         struct mgs_direntry *de;
66         char                *key;
67         int                  rc, key_sz;
68
69         INIT_LIST_HEAD(log_list);
70
71         LASSERT(dir);
72         LASSERT(dir->do_index_ops);
73
74         iops = &dir->do_index_ops->dio_it;
75         it = iops->init(env, dir, LUDA_64BITHASH);
76         if (IS_ERR(it))
77                 RETURN(PTR_ERR(it));
78
79         rc = iops->load(env, it, 0);
80         if (rc <= 0)
81                 GOTO(fini, rc = 0);
82
83         /* main cycle */
84         do {
85                 key = (void *)iops->key(env, it);
86                 if (IS_ERR(key)) {
87                         CERROR("%s: key failed when listing %s: rc = %d\n",
88                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
89                                (int) PTR_ERR(key));
90                         goto next;
91                 }
92                 key_sz = iops->key_size(env, it);
93                 LASSERT(key_sz > 0);
94
95                 /* filter out "." and ".." entries */
96                 if (key[0] == '.') {
97                         if (key_sz == 1)
98                                 goto next;
99                         if (key_sz == 2 && key[1] == '.')
100                                 goto next;
101                 }
102
103                 de = mgs_direntry_alloc(key_sz + 1);
104                 if (de == NULL) {
105                         rc = -ENOMEM;
106                         break;
107                 }
108
109                 memcpy(de->mde_name, key, key_sz);
110                 de->mde_name[key_sz] = 0;
111
112                 list_add(&de->mde_list, log_list);
113
114 next:
115                 rc = iops->next(env, it);
116         } while (rc == 0);
117         rc = 0;
118
119         iops->put(env, it);
120
121 fini:
122         iops->fini(env, it);
123         if (rc)
124                 CERROR("%s: key failed when listing %s: rc = %d\n",
125                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
126         RETURN(rc);
127 }
128
129 /******************** DB functions *********************/
130
131 static inline int name_create(char **newname, char *prefix, char *suffix)
132 {
133         LASSERT(newname);
134         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
135         if (!*newname)
136                 return -ENOMEM;
137         sprintf(*newname, "%s%s", prefix, suffix);
138         return 0;
139 }
140
141 static inline void name_destroy(char **name)
142 {
143         if (*name)
144                 OBD_FREE(*name, strlen(*name) + 1);
145         *name = NULL;
146 }
147
148 struct mgs_fsdb_handler_data
149 {
150         struct fs_db   *fsdb;
151         __u32           ver;
152 };
153
154 /* from the (client) config log, figure out:
155         1. which ost's/mdt's are configured (by index)
156         2. what the last config step is
157         3. COMPAT_18 osc name
158 */
159 /* It might be better to have a separate db file, instead of parsing the info
160    out of the client log.  This is slow and potentially error-prone. */
161 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
162                             struct llog_rec_hdr *rec, void *data)
163 {
164         struct mgs_fsdb_handler_data *d = data;
165         struct fs_db *fsdb = d->fsdb;
166         int cfg_len = rec->lrh_len;
167         char *cfg_buf = (char*) (rec + 1);
168         struct lustre_cfg *lcfg;
169         __u32 index;
170         int rc = 0;
171         ENTRY;
172
173         if (rec->lrh_type != OBD_CFG_REC) {
174                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
175                 RETURN(-EINVAL);
176         }
177
178         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
179         if (rc) {
180                 CERROR("Insane cfg\n");
181                 RETURN(rc);
182         }
183
184         lcfg = (struct lustre_cfg *)cfg_buf;
185
186         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
187                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
188
189         /* Figure out ost indicies */
190         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
191         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
192             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
193                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
194                                        NULL, 10);
195                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
196                        lustre_cfg_string(lcfg, 1), index,
197                        lustre_cfg_string(lcfg, 2));
198                 set_bit(index, fsdb->fsdb_ost_index_map);
199         }
200
201         /* Figure out mdt indicies */
202         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
203         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
204             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
205                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
206                                        &index, NULL);
207                 if (rc != LDD_F_SV_TYPE_MDT) {
208                         CWARN("Unparsable MDC name %s, assuming index 0\n",
209                               lustre_cfg_string(lcfg, 0));
210                         index = 0;
211                 }
212                 rc = 0;
213                 CDEBUG(D_MGS, "MDT index is %u\n", index);
214                 set_bit(index, fsdb->fsdb_mdt_index_map);
215                 fsdb->fsdb_mdt_count ++;
216         }
217
218         /**
219          * figure out the old config. fsdb_gen = 0 means old log
220          * It is obsoleted and not supported anymore
221          */
222         if (fsdb->fsdb_gen == 0) {
223                 CERROR("Old config format is not supported\n");
224                 RETURN(-EINVAL);
225         }
226
227         /*
228          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
229          */
230         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
231             lcfg->lcfg_command == LCFG_ATTACH &&
232             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
233                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
234                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
235                         CWARN("MDT using 1.8 OSC name scheme\n");
236                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
237                 }
238         }
239
240         if (lcfg->lcfg_command == LCFG_MARKER) {
241                 struct cfg_marker *marker;
242                 marker = lustre_cfg_buf(lcfg, 1);
243
244                 d->ver = marker->cm_vers;
245
246                 /* Keep track of the latest marker step */
247                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
248         }
249
250         RETURN(rc);
251 }
252
253 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
254 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
255                                   struct mgs_device *mgs,
256                                   struct fs_db *fsdb)
257 {
258         char                            *logname;
259         struct llog_handle              *loghandle;
260         struct llog_ctxt                *ctxt;
261         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
262         int rc;
263
264         ENTRY;
265
266         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
267         LASSERT(ctxt != NULL);
268         rc = name_create(&logname, fsdb->fsdb_name, "-client");
269         if (rc)
270                 GOTO(out_put, rc);
271         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
272         if (rc)
273                 GOTO(out_pop, rc);
274
275         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
276         if (rc)
277                 GOTO(out_close, rc);
278
279         if (llog_get_size(loghandle) <= 1)
280                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
281
282         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
283         CDEBUG(D_INFO, "get_db = %d\n", rc);
284 out_close:
285         llog_close(env, loghandle);
286 out_pop:
287         name_destroy(&logname);
288 out_put:
289         llog_ctxt_put(ctxt);
290
291         RETURN(rc);
292 }
293
294 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
295 {
296         struct mgs_tgt_srpc_conf *tgtconf;
297
298         /* free target-specific rules */
299         while (fsdb->fsdb_srpc_tgt) {
300                 tgtconf = fsdb->fsdb_srpc_tgt;
301                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
302
303                 LASSERT(tgtconf->mtsc_tgt);
304
305                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
306                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
307                 OBD_FREE_PTR(tgtconf);
308         }
309
310         /* free general rules */
311         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
312 }
313
314 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
315 {
316         struct fs_db *fsdb;
317         struct list_head *tmp;
318
319         list_for_each(tmp, &mgs->mgs_fs_db_list) {
320                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
321                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
322                         return fsdb;
323         }
324         return NULL;
325 }
326
327 /* caller must hold the mgs->mgs_fs_db_lock */
328 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
329                                   struct mgs_device *mgs, char *fsname)
330 {
331         struct fs_db *fsdb;
332         int rc;
333         ENTRY;
334
335         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
336                 CERROR("fsname %s is too long\n", fsname);
337                 RETURN(ERR_PTR(-EINVAL));
338         }
339
340         OBD_ALLOC_PTR(fsdb);
341         if (!fsdb)
342                 RETURN(ERR_PTR(-ENOMEM));
343
344         strcpy(fsdb->fsdb_name, fsname);
345         mutex_init(&fsdb->fsdb_mutex);
346         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
347         fsdb->fsdb_gen = 1;
348
349         if (strcmp(fsname, MGSSELF_NAME) == 0) {
350                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
351                 fsdb->fsdb_mgs = mgs;
352         } else {
353                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
354                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
355                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
356                         CERROR("No memory for index maps\n");
357                         GOTO(err, rc = -ENOMEM);
358                 }
359
360                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
361                 if (rc)
362                         GOTO(err, rc);
363                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
364                 if (rc)
365                         GOTO(err, rc);
366
367                 /* initialise data for NID table */
368                 mgs_ir_init_fs(env, mgs, fsdb);
369
370                 lproc_mgs_add_live(mgs, fsdb);
371         }
372
373         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
374
375         RETURN(fsdb);
376 err:
377         if (fsdb->fsdb_ost_index_map)
378                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
379         if (fsdb->fsdb_mdt_index_map)
380                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
381         name_destroy(&fsdb->fsdb_clilov);
382         name_destroy(&fsdb->fsdb_clilmv);
383         OBD_FREE_PTR(fsdb);
384         RETURN(ERR_PTR(rc));
385 }
386
387 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
388 {
389         /* wait for anyone with the sem */
390         mutex_lock(&fsdb->fsdb_mutex);
391         lproc_mgs_del_live(mgs, fsdb);
392         list_del(&fsdb->fsdb_list);
393
394         /* deinitialize fsr */
395         mgs_ir_fini_fs(mgs, fsdb);
396
397         if (fsdb->fsdb_ost_index_map)
398                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
399         if (fsdb->fsdb_mdt_index_map)
400                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
401         name_destroy(&fsdb->fsdb_clilov);
402         name_destroy(&fsdb->fsdb_clilmv);
403         mgs_free_fsdb_srpc(fsdb);
404         mutex_unlock(&fsdb->fsdb_mutex);
405         OBD_FREE_PTR(fsdb);
406 }
407
408 int mgs_init_fsdb_list(struct mgs_device *mgs)
409 {
410         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
411         return 0;
412 }
413
414 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
415 {
416         struct fs_db *fsdb;
417         struct list_head *tmp, *tmp2;
418
419         mutex_lock(&mgs->mgs_mutex);
420         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
421                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
422                 mgs_free_fsdb(mgs, fsdb);
423         }
424         mutex_unlock(&mgs->mgs_mutex);
425         return 0;
426 }
427
428 int mgs_find_or_make_fsdb(const struct lu_env *env,
429                           struct mgs_device *mgs, char *name,
430                           struct fs_db **dbh)
431 {
432         struct fs_db *fsdb;
433         int rc = 0;
434
435         ENTRY;
436         mutex_lock(&mgs->mgs_mutex);
437         fsdb = mgs_find_fsdb(mgs, name);
438         if (fsdb) {
439                 mutex_unlock(&mgs->mgs_mutex);
440                 *dbh = fsdb;
441                 RETURN(0);
442         }
443
444         CDEBUG(D_MGS, "Creating new db\n");
445         fsdb = mgs_new_fsdb(env, mgs, name);
446         /* lock fsdb_mutex until the db is loaded from llogs */
447         if (!IS_ERR(fsdb))
448                 mutex_lock(&fsdb->fsdb_mutex);
449         mutex_unlock(&mgs->mgs_mutex);
450         if (IS_ERR(fsdb))
451                 RETURN(PTR_ERR(fsdb));
452
453         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
454                 /* populate the db from the client llog */
455                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
456                 if (rc) {
457                         CERROR("Can't get db from client log %d\n", rc);
458                         GOTO(out_free, rc);
459                 }
460         }
461
462         /* populate srpc rules from params llog */
463         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
464         if (rc) {
465                 CERROR("Can't get db from params log %d\n", rc);
466                 GOTO(out_free, rc);
467         }
468
469         mutex_unlock(&fsdb->fsdb_mutex);
470         *dbh = fsdb;
471
472         RETURN(0);
473
474 out_free:
475         mutex_unlock(&fsdb->fsdb_mutex);
476         mgs_free_fsdb(mgs, fsdb);
477         return rc;
478 }
479
480 /* 1 = index in use
481    0 = index unused
482    -1= empty client log */
483 int mgs_check_index(const struct lu_env *env,
484                     struct mgs_device *mgs,
485                     struct mgs_target_info *mti)
486 {
487         struct fs_db *fsdb;
488         void *imap;
489         int rc = 0;
490         ENTRY;
491
492         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
493
494         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
495         if (rc) {
496                 CERROR("Can't get db for %s\n", mti->mti_fsname);
497                 RETURN(rc);
498         }
499
500         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
501                 RETURN(-1);
502
503         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
504                 imap = fsdb->fsdb_ost_index_map;
505         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
506                 imap = fsdb->fsdb_mdt_index_map;
507         else
508                 RETURN(-EINVAL);
509
510         if (test_bit(mti->mti_stripe_index, imap))
511                 RETURN(1);
512         RETURN(0);
513 }
514
515 static __inline__ int next_index(void *index_map, int map_len)
516 {
517         int i;
518         for (i = 0; i < map_len * 8; i++)
519                 if (!test_bit(i, index_map)) {
520                          return i;
521                  }
522         CERROR("max index %d exceeded.\n", i);
523         return -1;
524 }
525
526 /* Return codes:
527         0  newly marked as in use
528         <0 err
529         +EALREADY for update of an old index */
530 static int mgs_set_index(const struct lu_env *env,
531                          struct mgs_device *mgs,
532                          struct mgs_target_info *mti)
533 {
534         struct fs_db *fsdb;
535         void *imap;
536         int rc = 0;
537         ENTRY;
538
539         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
540         if (rc) {
541                 CERROR("Can't get db for %s\n", mti->mti_fsname);
542                 RETURN(rc);
543         }
544
545         mutex_lock(&fsdb->fsdb_mutex);
546         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
547                 imap = fsdb->fsdb_ost_index_map;
548         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
549                 imap = fsdb->fsdb_mdt_index_map;
550         } else {
551                 GOTO(out_up, rc = -EINVAL);
552         }
553
554         if (mti->mti_flags & LDD_F_NEED_INDEX) {
555                 rc = next_index(imap, INDEX_MAP_SIZE);
556                 if (rc == -1)
557                         GOTO(out_up, rc = -ERANGE);
558                 mti->mti_stripe_index = rc;
559                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
560                         fsdb->fsdb_mdt_count ++;
561         }
562
563         /* the last index(0xffff) is reserved for default value. */
564         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
565                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
566                                    "but index must be less than %u.\n",
567                                    mti->mti_svname, mti->mti_stripe_index,
568                                    INDEX_MAP_SIZE * 8 - 1);
569                 GOTO(out_up, rc = -ERANGE);
570         }
571
572         if (test_bit(mti->mti_stripe_index, imap)) {
573                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
574                     !(mti->mti_flags & LDD_F_WRITECONF)) {
575                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
576                                            "%d, but that index is already in "
577                                            "use. Use --writeconf to force\n",
578                                            mti->mti_svname,
579                                            mti->mti_stripe_index);
580                         GOTO(out_up, rc = -EADDRINUSE);
581                 } else {
582                         CDEBUG(D_MGS, "Server %s updating index %d\n",
583                                mti->mti_svname, mti->mti_stripe_index);
584                         GOTO(out_up, rc = EALREADY);
585                 }
586         }
587
588         set_bit(mti->mti_stripe_index, imap);
589         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
590         mutex_unlock(&fsdb->fsdb_mutex);
591         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
592                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
593
594         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
595                mti->mti_stripe_index);
596
597         RETURN(0);
598 out_up:
599         mutex_unlock(&fsdb->fsdb_mutex);
600         return rc;
601 }
602
603 struct mgs_modify_lookup {
604         struct cfg_marker mml_marker;
605         int               mml_modified;
606 };
607
608 static int mgs_modify_handler(const struct lu_env *env,
609                               struct llog_handle *llh,
610                               struct llog_rec_hdr *rec, void *data)
611 {
612         struct mgs_modify_lookup *mml = data;
613         struct cfg_marker *marker;
614         struct lustre_cfg *lcfg = REC_DATA(rec);
615         int cfg_len = REC_DATA_LEN(rec);
616         int rc;
617         ENTRY;
618
619         if (rec->lrh_type != OBD_CFG_REC) {
620                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
621                 RETURN(-EINVAL);
622         }
623
624         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
625         if (rc) {
626                 CERROR("Insane cfg\n");
627                 RETURN(rc);
628         }
629
630         /* We only care about markers */
631         if (lcfg->lcfg_command != LCFG_MARKER)
632                 RETURN(0);
633
634         marker = lustre_cfg_buf(lcfg, 1);
635         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
636             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
637             !(marker->cm_flags & CM_SKIP)) {
638                 /* Found a non-skipped marker match */
639                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
640                        rec->lrh_index, marker->cm_step,
641                        marker->cm_flags, mml->mml_marker.cm_flags,
642                        marker->cm_tgtname, marker->cm_comment);
643                 /* Overwrite the old marker llog entry */
644                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
645                 marker->cm_flags |= mml->mml_marker.cm_flags;
646                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
647                 rc = llog_write(env, llh, rec, rec->lrh_index);
648                 if (!rc)
649                          mml->mml_modified++;
650         }
651
652         RETURN(rc);
653 }
654
655 /**
656  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
657  * Return code:
658  * 0 - modified successfully,
659  * 1 - no modification was done
660  * negative - error
661  */
662 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
663                       struct fs_db *fsdb, struct mgs_target_info *mti,
664                       char *logname, char *devname, char *comment, int flags)
665 {
666         struct llog_handle *loghandle;
667         struct llog_ctxt *ctxt;
668         struct mgs_modify_lookup *mml;
669         int rc;
670
671         ENTRY;
672
673         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
674         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
675                flags);
676
677         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
678         LASSERT(ctxt != NULL);
679         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
680         if (rc < 0) {
681                 if (rc == -ENOENT)
682                         rc = 0;
683                 GOTO(out_pop, rc);
684         }
685
686         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
687         if (rc)
688                 GOTO(out_close, rc);
689
690         if (llog_get_size(loghandle) <= 1)
691                 GOTO(out_close, rc = 0);
692
693         OBD_ALLOC_PTR(mml);
694         if (!mml)
695                 GOTO(out_close, rc = -ENOMEM);
696         if (strlcpy(mml->mml_marker.cm_comment, comment,
697                     sizeof(mml->mml_marker.cm_comment)) >=
698             sizeof(mml->mml_marker.cm_comment))
699                 GOTO(out_free, rc = -E2BIG);
700         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
701                     sizeof(mml->mml_marker.cm_tgtname)) >=
702             sizeof(mml->mml_marker.cm_tgtname))
703                 GOTO(out_free, rc = -E2BIG);
704         /* Modify mostly means cancel */
705         mml->mml_marker.cm_flags = flags;
706         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
707         mml->mml_modified = 0;
708         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
709                           NULL);
710         if (!rc && !mml->mml_modified)
711                 rc = 1;
712
713 out_free:
714         OBD_FREE_PTR(mml);
715
716 out_close:
717         llog_close(env, loghandle);
718 out_pop:
719         if (rc < 0)
720                 CERROR("%s: modify %s/%s failed: rc = %d\n",
721                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
722         llog_ctxt_put(ctxt);
723         RETURN(rc);
724 }
725
726 /** This structure is passed to mgs_replace_handler */
727 struct mgs_replace_uuid_lookup {
728         /* Nids are replaced for this target device */
729         struct mgs_target_info target;
730         /* Temporary modified llog */
731         struct llog_handle *temp_llh;
732         /* Flag is set if in target block*/
733         int in_target_device;
734         /* Nids already added. Just skip (multiple nids) */
735         int device_nids_added;
736         /* Flag is set if this block should not be copied */
737         int skip_it;
738 };
739
740 /**
741  * Check: a) if block should be skipped
742  * b) is it target block
743  *
744  * \param[in] lcfg
745  * \param[in] mrul
746  *
747  * \retval 0 should not to be skipped
748  * \retval 1 should to be skipped
749  */
750 static int check_markers(struct lustre_cfg *lcfg,
751                          struct mgs_replace_uuid_lookup *mrul)
752 {
753          struct cfg_marker *marker;
754
755         /* Track markers. Find given device */
756         if (lcfg->lcfg_command == LCFG_MARKER) {
757                 marker = lustre_cfg_buf(lcfg, 1);
758                 /* Clean llog from records marked as CM_EXCLUDE.
759                    CM_SKIP records are used for "active" command
760                    and can be restored if needed */
761                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
762                     (CM_EXCLUDE | CM_START)) {
763                         mrul->skip_it = 1;
764                         return 1;
765                 }
766
767                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
768                     (CM_EXCLUDE | CM_END)) {
769                         mrul->skip_it = 0;
770                         return 1;
771                 }
772
773                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
774                         LASSERT(!(marker->cm_flags & CM_START) ||
775                                 !(marker->cm_flags & CM_END));
776                         if (marker->cm_flags & CM_START) {
777                                 mrul->in_target_device = 1;
778                                 mrul->device_nids_added = 0;
779                         } else if (marker->cm_flags & CM_END)
780                                 mrul->in_target_device = 0;
781                 }
782         }
783
784         return 0;
785 }
786
787 static int record_base(const struct lu_env *env, struct llog_handle *llh,
788                      char *cfgname, lnet_nid_t nid, int cmd,
789                      char *s1, char *s2, char *s3, char *s4)
790 {
791         struct mgs_thread_info  *mgi = mgs_env_info(env);
792         struct llog_cfg_rec     *lcr;
793         int rc;
794
795         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
796                cmd, s1, s2, s3, s4);
797
798         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
799         if (s1)
800                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
801         if (s2)
802                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
803         if (s3)
804                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
805         if (s4)
806                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
807
808         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
809         if (lcr == NULL)
810                 return -ENOMEM;
811
812         lcr->lcr_cfg.lcfg_nid = nid;
813         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
814
815         lustre_cfg_rec_free(lcr);
816
817         if (rc < 0)
818                 CDEBUG(D_MGS,
819                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
820                        cfgname, cmd, s1, s2, s3, s4, rc);
821         return rc;
822 }
823
824 static inline int record_add_uuid(const struct lu_env *env,
825                                   struct llog_handle *llh,
826                                   uint64_t nid, char *uuid)
827 {
828         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid,
829                            NULL, NULL, NULL);
830 }
831
832 static inline int record_add_conn(const struct lu_env *env,
833                                   struct llog_handle *llh,
834                                   char *devname, char *uuid)
835 {
836         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid,
837                            NULL, NULL, NULL);
838 }
839
840 static inline int record_attach(const struct lu_env *env,
841                                 struct llog_handle *llh, char *devname,
842                                 char *type, char *uuid)
843 {
844         return record_base(env, llh, devname, 0, LCFG_ATTACH, type, uuid,
845                            NULL, NULL);
846 }
847
848 static inline int record_setup(const struct lu_env *env,
849                                struct llog_handle *llh, char *devname,
850                                char *s1, char *s2, char *s3, char *s4)
851 {
852         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
853 }
854
855 /**
856  * \retval <0 record processing error
857  * \retval n record is processed. No need copy original one.
858  * \retval 0 record is not processed.
859  */
860 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
861                            struct mgs_replace_uuid_lookup *mrul)
862 {
863         int nids_added = 0;
864         lnet_nid_t nid;
865         char *ptr;
866         int rc;
867
868         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
869                 /* LCFG_ADD_UUID command found. Let's skip original command
870                    and add passed nids */
871                 ptr = mrul->target.mti_params;
872                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
873                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
874                                "device %s\n", libcfs_nid2str(nid),
875                                 mrul->target.mti_params,
876                                 mrul->target.mti_svname);
877                         rc = record_add_uuid(env,
878                                              mrul->temp_llh, nid,
879                                              mrul->target.mti_params);
880                         if (!rc)
881                                 nids_added++;
882                 }
883
884                 if (nids_added == 0) {
885                         CERROR("No new nids were added, nid %s with uuid %s, "
886                                "device %s\n", libcfs_nid2str(nid),
887                                mrul->target.mti_params,
888                                mrul->target.mti_svname);
889                         RETURN(-ENXIO);
890                 } else {
891                         mrul->device_nids_added = 1;
892                 }
893
894                 return nids_added;
895         }
896
897         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
898                 /* LCFG_SETUP command found. UUID should be changed */
899                 rc = record_setup(env,
900                                   mrul->temp_llh,
901                                   /* devname the same */
902                                   lustre_cfg_string(lcfg, 0),
903                                   /* s1 is not changed */
904                                   lustre_cfg_string(lcfg, 1),
905                                   /* new uuid should be
906                                   the full nidlist */
907                                   mrul->target.mti_params,
908                                   /* s3 is not changed */
909                                   lustre_cfg_string(lcfg, 3),
910                                   /* s4 is not changed */
911                                   lustre_cfg_string(lcfg, 4));
912                 return rc ? rc : 1;
913         }
914
915         /* Another commands in target device block */
916         return 0;
917 }
918
919 /**
920  * Handler that called for every record in llog.
921  * Records are processed in order they placed in llog.
922  *
923  * \param[in] llh       log to be processed
924  * \param[in] rec       current record
925  * \param[in] data      mgs_replace_uuid_lookup structure
926  *
927  * \retval 0    success
928  */
929 static int mgs_replace_handler(const struct lu_env *env,
930                                struct llog_handle *llh,
931                                struct llog_rec_hdr *rec,
932                                void *data)
933 {
934         struct mgs_replace_uuid_lookup *mrul;
935         struct lustre_cfg *lcfg = REC_DATA(rec);
936         int cfg_len = REC_DATA_LEN(rec);
937         int rc;
938         ENTRY;
939
940         mrul = (struct mgs_replace_uuid_lookup *)data;
941
942         if (rec->lrh_type != OBD_CFG_REC) {
943                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
944                        rec->lrh_type, lcfg->lcfg_command,
945                        lustre_cfg_string(lcfg, 0),
946                        lustre_cfg_string(lcfg, 1));
947                 RETURN(-EINVAL);
948         }
949
950         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
951         if (rc) {
952                 /* Do not copy any invalidated records */
953                 GOTO(skip_out, rc = 0);
954         }
955
956         rc = check_markers(lcfg, mrul);
957         if (rc || mrul->skip_it)
958                 GOTO(skip_out, rc = 0);
959
960         /* Write to new log all commands outside target device block */
961         if (!mrul->in_target_device)
962                 GOTO(copy_out, rc = 0);
963
964         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
965            (failover nids) for this target, assuming that if then
966            primary is changing then so is the failover */
967         if (mrul->device_nids_added &&
968             (lcfg->lcfg_command == LCFG_ADD_UUID ||
969              lcfg->lcfg_command == LCFG_ADD_CONN))
970                 GOTO(skip_out, rc = 0);
971
972         rc = process_command(env, lcfg, mrul);
973         if (rc < 0)
974                 RETURN(rc);
975
976         if (rc)
977                 RETURN(0);
978 copy_out:
979         /* Record is placed in temporary llog as is */
980         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
981
982         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
983                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
984                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
985         RETURN(rc);
986
987 skip_out:
988         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
989                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
990                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
991         RETURN(rc);
992 }
993
994 static int mgs_log_is_empty(const struct lu_env *env,
995                             struct mgs_device *mgs, char *name)
996 {
997         struct llog_ctxt        *ctxt;
998         int                      rc;
999
1000         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1001         LASSERT(ctxt != NULL);
1002
1003         rc = llog_is_empty(env, ctxt, name);
1004         llog_ctxt_put(ctxt);
1005         return rc;
1006 }
1007
1008 static int mgs_replace_nids_log(const struct lu_env *env,
1009                                 struct obd_device *mgs, struct fs_db *fsdb,
1010                                 char *logname, char *devname, char *nids)
1011 {
1012         struct llog_handle *orig_llh, *backup_llh;
1013         struct llog_ctxt *ctxt;
1014         struct mgs_replace_uuid_lookup *mrul;
1015         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1016         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1017         char *backup;
1018         int rc, rc2;
1019         ENTRY;
1020
1021         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1022
1023         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1024         LASSERT(ctxt != NULL);
1025
1026         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1027                 /* Log is empty. Nothing to replace */
1028                 GOTO(out_put, rc = 0);
1029         }
1030
1031         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1032         if (backup == NULL)
1033                 GOTO(out_put, rc = -ENOMEM);
1034
1035         sprintf(backup, "%s.bak", logname);
1036
1037         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1038         if (rc == 0) {
1039                 /* Now erase original log file. Connections are not allowed.
1040                    Backup is already saved */
1041                 rc = llog_erase(env, ctxt, NULL, logname);
1042                 if (rc < 0)
1043                         GOTO(out_free, rc);
1044         } else if (rc != -ENOENT) {
1045                 CERROR("%s: can't make backup for %s: rc = %d\n",
1046                        mgs->obd_name, logname, rc);
1047                 GOTO(out_free,rc);
1048         }
1049
1050         /* open local log */
1051         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1052         if (rc)
1053                 GOTO(out_restore, rc);
1054
1055         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1056         if (rc)
1057                 GOTO(out_closel, rc);
1058
1059         /* open backup llog */
1060         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1061                        LLOG_OPEN_EXISTS);
1062         if (rc)
1063                 GOTO(out_closel, rc);
1064
1065         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1066         if (rc)
1067                 GOTO(out_close, rc);
1068
1069         if (llog_get_size(backup_llh) <= 1)
1070                 GOTO(out_close, rc = 0);
1071
1072         OBD_ALLOC_PTR(mrul);
1073         if (!mrul)
1074                 GOTO(out_close, rc = -ENOMEM);
1075         /* devname is only needed information to replace UUID records */
1076         strlcpy(mrul->target.mti_svname, devname,
1077                 sizeof(mrul->target.mti_svname));
1078         /* parse nids later */
1079         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1080         /* Copy records to this temporary llog */
1081         mrul->temp_llh = orig_llh;
1082
1083         rc = llog_process(env, backup_llh, mgs_replace_handler,
1084                           (void *)mrul, NULL);
1085         OBD_FREE_PTR(mrul);
1086 out_close:
1087         rc2 = llog_close(NULL, backup_llh);
1088         if (!rc)
1089                 rc = rc2;
1090 out_closel:
1091         rc2 = llog_close(NULL, orig_llh);
1092         if (!rc)
1093                 rc = rc2;
1094
1095 out_restore:
1096         if (rc) {
1097                 CERROR("%s: llog should be restored: rc = %d\n",
1098                        mgs->obd_name, rc);
1099                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1100                                   logname);
1101                 if (rc2 < 0)
1102                         CERROR("%s: can't restore backup %s: rc = %d\n",
1103                                mgs->obd_name, logname, rc2);
1104         }
1105
1106 out_free:
1107         OBD_FREE(backup, strlen(backup) + 1);
1108
1109 out_put:
1110         llog_ctxt_put(ctxt);
1111
1112         if (rc)
1113                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1114                        mgs->obd_name, logname, rc);
1115
1116         RETURN(rc);
1117 }
1118
1119 /**
1120  * Parse device name and get file system name and/or device index
1121  *
1122  * \param[in]   devname device name (ex. lustre-MDT0000)
1123  * \param[out]  fsname  file system name(optional)
1124  * \param[out]  index   device index(optional)
1125  *
1126  * \retval 0    success
1127  */
1128 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1129 {
1130         int rc;
1131         ENTRY;
1132
1133         /* Extract fsname */
1134         if (fsname) {
1135                 rc = server_name2fsname(devname, fsname, NULL);
1136                 if (rc < 0) {
1137                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1138                                devname);
1139                         RETURN(-EINVAL);
1140                 }
1141         }
1142
1143         if (index) {
1144                 rc = server_name2index(devname, index, NULL);
1145                 if (rc < 0) {
1146                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1147                                devname);
1148                         RETURN(-EINVAL);
1149                 }
1150         }
1151
1152         RETURN(0);
1153 }
1154
1155 /* This is only called during replace_nids */
1156 static int only_mgs_is_running(struct obd_device *mgs_obd)
1157 {
1158         /* TDB: Is global variable with devices count exists? */
1159         int num_devices = get_devices_count();
1160         int num_exports = 0;
1161         struct obd_export *exp;
1162
1163         spin_lock(&mgs_obd->obd_dev_lock);
1164         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1165                 /* skip self export */
1166                 if (exp == mgs_obd->obd_self_export)
1167                         continue;
1168                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1169                         continue;
1170
1171                 ++num_exports;
1172
1173                 CERROR("%s: node %s still connected during replace_nids "
1174                        "connect_flags:%llx\n",
1175                        mgs_obd->obd_name,
1176                        libcfs_nid2str(exp->exp_nid_stats->nid),
1177                        exp_connect_flags(exp));
1178
1179         }
1180         spin_unlock(&mgs_obd->obd_dev_lock);
1181
1182         /* osd, MGS and MGC + self_export
1183            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1184         return (num_devices <= 3) && (num_exports == 0);
1185 }
1186
1187 static int name_create_mdt(char **logname, char *fsname, int i)
1188 {
1189         char mdt_index[9];
1190
1191         sprintf(mdt_index, "-MDT%04x", i);
1192         return name_create(logname, fsname, mdt_index);
1193 }
1194
1195 /**
1196  * Replace nids for \a device to \a nids values
1197  *
1198  * \param obd           MGS obd device
1199  * \param devname       nids need to be replaced for this device
1200  * (ex. lustre-OST0000)
1201  * \param nids          nids list (ex. nid1,nid2,nid3)
1202  *
1203  * \retval 0    success
1204  */
1205 int mgs_replace_nids(const struct lu_env *env,
1206                      struct mgs_device *mgs,
1207                      char *devname, char *nids)
1208 {
1209         /* Assume fsname is part of device name */
1210         char fsname[MTI_NAME_MAXLEN];
1211         int rc;
1212         __u32 index;
1213         char *logname;
1214         struct fs_db *fsdb;
1215         unsigned int i;
1216         int conn_state;
1217         struct obd_device *mgs_obd = mgs->mgs_obd;
1218         ENTRY;
1219
1220         /* We can only change NIDs if no other nodes are connected */
1221         spin_lock(&mgs_obd->obd_dev_lock);
1222         conn_state = mgs_obd->obd_no_conn;
1223         mgs_obd->obd_no_conn = 1;
1224         spin_unlock(&mgs_obd->obd_dev_lock);
1225
1226         /* We can not change nids if not only MGS is started */
1227         if (!only_mgs_is_running(mgs_obd)) {
1228                 CERROR("Only MGS is allowed to be started\n");
1229                 GOTO(out, rc = -EINPROGRESS);
1230         }
1231
1232         /* Get fsname and index*/
1233         rc = mgs_parse_devname(devname, fsname, &index);
1234         if (rc)
1235                 GOTO(out, rc);
1236
1237         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1238         if (rc) {
1239                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1240                 GOTO(out, rc);
1241         }
1242
1243         /* Process client llogs */
1244         name_create(&logname, fsname, "-client");
1245         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1246         name_destroy(&logname);
1247         if (rc) {
1248                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1249                        fsname, devname, rc);
1250                 GOTO(out, rc);
1251         }
1252
1253         /* Process MDT llogs */
1254         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1255                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1256                         continue;
1257                 name_create_mdt(&logname, fsname, i);
1258                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1259                 name_destroy(&logname);
1260                 if (rc)
1261                         GOTO(out, rc);
1262         }
1263
1264 out:
1265         spin_lock(&mgs_obd->obd_dev_lock);
1266         mgs_obd->obd_no_conn = conn_state;
1267         spin_unlock(&mgs_obd->obd_dev_lock);
1268
1269         RETURN(rc);
1270 }
1271
1272 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1273                             char *devname, struct lov_desc *desc)
1274 {
1275         struct mgs_thread_info  *mgi = mgs_env_info(env);
1276         struct llog_cfg_rec     *lcr;
1277         int rc;
1278
1279         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1280         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1281         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1282         if (lcr == NULL)
1283                 return -ENOMEM;
1284
1285         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1286         lustre_cfg_rec_free(lcr);
1287         return rc;
1288 }
1289
1290 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1291                             char *devname, struct lmv_desc *desc)
1292 {
1293         struct mgs_thread_info  *mgi = mgs_env_info(env);
1294         struct llog_cfg_rec     *lcr;
1295         int rc;
1296
1297         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1298         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1299         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1300         if (lcr == NULL)
1301                 return -ENOMEM;
1302
1303         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1304         lustre_cfg_rec_free(lcr);
1305         return rc;
1306 }
1307
1308 static inline int record_mdc_add(const struct lu_env *env,
1309                                  struct llog_handle *llh,
1310                                  char *logname, char *mdcuuid,
1311                                  char *mdtuuid, char *index,
1312                                  char *gen)
1313 {
1314         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1315                            mdtuuid,index,gen,mdcuuid);
1316 }
1317
1318 static inline int record_lov_add(const struct lu_env *env,
1319                                  struct llog_handle *llh,
1320                                  char *lov_name, char *ost_uuid,
1321                                  char *index, char *gen)
1322 {
1323         return record_base(env, llh, lov_name, 0, LCFG_LOV_ADD_OBD,
1324                            ost_uuid, index, gen, NULL);
1325 }
1326
1327 static inline int record_mount_opt(const struct lu_env *env,
1328                                    struct llog_handle *llh,
1329                                    char *profile, char *lov_name,
1330                                    char *mdc_name)
1331 {
1332         return record_base(env, llh, NULL, 0, LCFG_MOUNTOPT,
1333                            profile, lov_name, mdc_name, NULL);
1334 }
1335
1336 static int record_marker(const struct lu_env *env,
1337                          struct llog_handle *llh,
1338                          struct fs_db *fsdb, __u32 flags,
1339                          char *tgtname, char *comment)
1340 {
1341         struct mgs_thread_info *mgi = mgs_env_info(env);
1342         struct llog_cfg_rec *lcr;
1343         int rc;
1344         int cplen = 0;
1345
1346         if (flags & CM_START)
1347                 fsdb->fsdb_gen++;
1348         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1349         mgi->mgi_marker.cm_flags = flags;
1350         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1351         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1352                         sizeof(mgi->mgi_marker.cm_tgtname));
1353         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1354                 return -E2BIG;
1355         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1356                         sizeof(mgi->mgi_marker.cm_comment));
1357         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1358                 return -E2BIG;
1359         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1360         mgi->mgi_marker.cm_canceltime = 0;
1361         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1362         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1363                             sizeof(mgi->mgi_marker));
1364         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1365         if (lcr == NULL)
1366                 return -ENOMEM;
1367
1368         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1369         lustre_cfg_rec_free(lcr);
1370         return rc;
1371 }
1372
1373 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1374                             struct llog_handle **llh, char *name)
1375 {
1376         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1377         struct llog_ctxt        *ctxt;
1378         int                      rc = 0;
1379         ENTRY;
1380
1381         if (*llh)
1382                 GOTO(out, rc = -EBUSY);
1383
1384         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1385         if (!ctxt)
1386                 GOTO(out, rc = -ENODEV);
1387         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1388
1389         rc = llog_open_create(env, ctxt, llh, NULL, name);
1390         if (rc)
1391                 GOTO(out_ctxt, rc);
1392         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1393         if (rc)
1394                 llog_close(env, *llh);
1395 out_ctxt:
1396         llog_ctxt_put(ctxt);
1397 out:
1398         if (rc) {
1399                 CERROR("%s: can't start log %s: rc = %d\n",
1400                        mgs->mgs_obd->obd_name, name, rc);
1401                 *llh = NULL;
1402         }
1403         RETURN(rc);
1404 }
1405
1406 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1407 {
1408         int rc;
1409
1410         rc = llog_close(env, *llh);
1411         *llh = NULL;
1412
1413         return rc;
1414 }
1415
1416 /******************** config "macros" *********************/
1417
1418 /* write an lcfg directly into a log (with markers) */
1419 static int mgs_write_log_direct(const struct lu_env *env,
1420                                 struct mgs_device *mgs, struct fs_db *fsdb,
1421                                 char *logname, struct llog_cfg_rec *lcr,
1422                                 char *devname, char *comment)
1423 {
1424         struct llog_handle *llh = NULL;
1425         int rc;
1426
1427         ENTRY;
1428
1429         rc = record_start_log(env, mgs, &llh, logname);
1430         if (rc)
1431                 RETURN(rc);
1432
1433         /* FIXME These should be a single journal transaction */
1434         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1435         if (rc)
1436                 GOTO(out_end, rc);
1437         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1438         if (rc)
1439                 GOTO(out_end, rc);
1440         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1441         if (rc)
1442                 GOTO(out_end, rc);
1443 out_end:
1444         record_end_log(env, &llh);
1445         RETURN(rc);
1446 }
1447
1448 /* write the lcfg in all logs for the given fs */
1449 static int mgs_write_log_direct_all(const struct lu_env *env,
1450                                     struct mgs_device *mgs,
1451                                     struct fs_db *fsdb,
1452                                     struct mgs_target_info *mti,
1453                                     struct llog_cfg_rec *lcr, char *devname,
1454                                     char *comment, int server_only)
1455 {
1456         struct list_head         log_list;
1457         struct mgs_direntry     *dirent, *n;
1458         char                    *fsname = mti->mti_fsname;
1459         int                      rc = 0, len = strlen(fsname);
1460
1461         ENTRY;
1462         /* Find all the logs in the CONFIGS directory */
1463         rc = class_dentry_readdir(env, mgs, &log_list);
1464         if (rc)
1465                 RETURN(rc);
1466
1467         /* Could use fsdb index maps instead of directory listing */
1468         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1469                 list_del_init(&dirent->mde_list);
1470                 /* don't write to sptlrpc rule log */
1471                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1472                         goto next;
1473
1474                 /* caller wants write server logs only */
1475                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1476                         goto next;
1477
1478                 if (strlen(dirent->mde_name) <= len ||
1479                     strncmp(fsname, dirent->mde_name, len) != 0 ||
1480                     dirent->mde_name[len] != '-')
1481                         goto next;
1482
1483                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1484                 /* Erase any old settings of this same parameter */
1485                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1486                                 devname, comment, CM_SKIP);
1487                 if (rc < 0)
1488                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1489                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1490                 if (lcr == NULL)
1491                         goto next;
1492                 /* Write the new one */
1493                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1494                                           lcr, devname, comment);
1495                 if (rc != 0)
1496                         CERROR("%s: writing log %s: rc = %d\n",
1497                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1498 next:
1499                 mgs_direntry_free(dirent);
1500         }
1501
1502         RETURN(rc);
1503 }
1504
1505 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1506                                     struct mgs_device *mgs,
1507                                     struct fs_db *fsdb,
1508                                     struct mgs_target_info *mti,
1509                                     int index, char *logname);
1510 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1511                                     struct mgs_device *mgs,
1512                                     struct fs_db *fsdb,
1513                                     struct mgs_target_info *mti,
1514                                     char *logname, char *suffix, char *lovname,
1515                                     enum lustre_sec_part sec_part, int flags);
1516 static int name_create_mdt_and_lov(char **logname, char **lovname,
1517                                    struct fs_db *fsdb, int i);
1518
1519 static int add_param(char *params, char *key, char *val)
1520 {
1521         char *start = params + strlen(params);
1522         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1523         int keylen = 0;
1524
1525         if (key != NULL)
1526                 keylen = strlen(key);
1527         if (start + 1 + keylen + strlen(val) >= end) {
1528                 CERROR("params are too long: %s %s%s\n",
1529                        params, key != NULL ? key : "", val);
1530                 return -EINVAL;
1531         }
1532
1533         sprintf(start, " %s%s", key != NULL ? key : "", val);
1534         return 0;
1535 }
1536
1537 /**
1538  * Walk through client config log record and convert the related records
1539  * into the target.
1540  **/
1541 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1542                                          struct llog_handle *llh,
1543                                          struct llog_rec_hdr *rec, void *data)
1544 {
1545         struct mgs_device *mgs;
1546         struct obd_device *obd;
1547         struct mgs_target_info *mti, *tmti;
1548         struct fs_db *fsdb;
1549         int cfg_len = rec->lrh_len;
1550         char *cfg_buf = (char*) (rec + 1);
1551         struct lustre_cfg *lcfg;
1552         int rc = 0;
1553         struct llog_handle *mdt_llh = NULL;
1554         static int got_an_osc_or_mdc = 0;
1555         /* 0: not found any osc/mdc;
1556            1: found osc;
1557            2: found mdc;
1558         */
1559         static int last_step = -1;
1560         int cplen = 0;
1561
1562         ENTRY;
1563
1564         mti = ((struct temp_comp*)data)->comp_mti;
1565         tmti = ((struct temp_comp*)data)->comp_tmti;
1566         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1567         obd = ((struct temp_comp *)data)->comp_obd;
1568         mgs = lu2mgs_dev(obd->obd_lu_dev);
1569         LASSERT(mgs);
1570
1571         if (rec->lrh_type != OBD_CFG_REC) {
1572                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1573                 RETURN(-EINVAL);
1574         }
1575
1576         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1577         if (rc) {
1578                 CERROR("Insane cfg\n");
1579                 RETURN(rc);
1580         }
1581
1582         lcfg = (struct lustre_cfg *)cfg_buf;
1583
1584         if (lcfg->lcfg_command == LCFG_MARKER) {
1585                 struct cfg_marker *marker;
1586                 marker = lustre_cfg_buf(lcfg, 1);
1587                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1588                     (marker->cm_flags & CM_START) &&
1589                      !(marker->cm_flags & CM_SKIP)) {
1590                         got_an_osc_or_mdc = 1;
1591                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1592                                         sizeof(tmti->mti_svname));
1593                         if (cplen >= sizeof(tmti->mti_svname))
1594                                 RETURN(-E2BIG);
1595                         rc = record_start_log(env, mgs, &mdt_llh,
1596                                               mti->mti_svname);
1597                         if (rc)
1598                                 RETURN(rc);
1599                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1600                                            mti->mti_svname, "add osc(copied)");
1601                         record_end_log(env, &mdt_llh);
1602                         last_step = marker->cm_step;
1603                         RETURN(rc);
1604                 }
1605                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1606                     (marker->cm_flags & CM_END) &&
1607                      !(marker->cm_flags & CM_SKIP)) {
1608                         LASSERT(last_step == marker->cm_step);
1609                         last_step = -1;
1610                         got_an_osc_or_mdc = 0;
1611                         memset(tmti, 0, sizeof(*tmti));
1612                         rc = record_start_log(env, mgs, &mdt_llh,
1613                                               mti->mti_svname);
1614                         if (rc)
1615                                 RETURN(rc);
1616                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1617                                            mti->mti_svname, "add osc(copied)");
1618                         record_end_log(env, &mdt_llh);
1619                         RETURN(rc);
1620                 }
1621                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1622                     (marker->cm_flags & CM_START) &&
1623                      !(marker->cm_flags & CM_SKIP)) {
1624                         got_an_osc_or_mdc = 2;
1625                         last_step = marker->cm_step;
1626                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1627                                strlen(marker->cm_tgtname));
1628
1629                         RETURN(rc);
1630                 }
1631                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1632                     (marker->cm_flags & CM_END) &&
1633                      !(marker->cm_flags & CM_SKIP)) {
1634                         LASSERT(last_step == marker->cm_step);
1635                         last_step = -1;
1636                         got_an_osc_or_mdc = 0;
1637                         memset(tmti, 0, sizeof(*tmti));
1638                         RETURN(rc);
1639                 }
1640         }
1641
1642         if (got_an_osc_or_mdc == 0 || last_step < 0)
1643                 RETURN(rc);
1644
1645         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1646                 __u64 nodenid = lcfg->lcfg_nid;
1647
1648                 if (strlen(tmti->mti_uuid) == 0) {
1649                         /* target uuid not set, this config record is before
1650                          * LCFG_SETUP, this nid is one of target node nid.
1651                          */
1652                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1653                         tmti->mti_nid_count++;
1654                 } else {
1655                         char nidstr[LNET_NIDSTR_SIZE];
1656
1657                         /* failover node nid */
1658                         libcfs_nid2str_r(nodenid, nidstr, sizeof(nidstr));
1659                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1660                                         nidstr);
1661                 }
1662
1663                 RETURN(rc);
1664         }
1665
1666         if (lcfg->lcfg_command == LCFG_SETUP) {
1667                 char *target;
1668
1669                 target = lustre_cfg_string(lcfg, 1);
1670                 memcpy(tmti->mti_uuid, target, strlen(target));
1671                 RETURN(rc);
1672         }
1673
1674         /* ignore client side sptlrpc_conf_log */
1675         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1676                 RETURN(rc);
1677
1678         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1679                 int index;
1680
1681                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1682                         RETURN (-EINVAL);
1683
1684                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1685                        strlen(mti->mti_fsname));
1686                 tmti->mti_stripe_index = index;
1687
1688                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1689                                               mti->mti_stripe_index,
1690                                               mti->mti_svname);
1691                 memset(tmti, 0, sizeof(*tmti));
1692                 RETURN(rc);
1693         }
1694
1695         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1696                 int index;
1697                 char mdt_index[9];
1698                 char *logname, *lovname;
1699
1700                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1701                                              mti->mti_stripe_index);
1702                 if (rc)
1703                         RETURN(rc);
1704                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1705
1706                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1707                         name_destroy(&logname);
1708                         name_destroy(&lovname);
1709                         RETURN(-EINVAL);
1710                 }
1711
1712                 tmti->mti_stripe_index = index;
1713                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1714                                          mdt_index, lovname,
1715                                          LUSTRE_SP_MDT, 0);
1716                 name_destroy(&logname);
1717                 name_destroy(&lovname);
1718                 RETURN(rc);
1719         }
1720         RETURN(rc);
1721 }
1722
1723 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1724 /* stealed from mgs_get_fsdb_from_llog*/
1725 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1726                                               struct mgs_device *mgs,
1727                                               char *client_name,
1728                                               struct temp_comp* comp)
1729 {
1730         struct llog_handle *loghandle;
1731         struct mgs_target_info *tmti;
1732         struct llog_ctxt *ctxt;
1733         int rc;
1734
1735         ENTRY;
1736
1737         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1738         LASSERT(ctxt != NULL);
1739
1740         OBD_ALLOC_PTR(tmti);
1741         if (tmti == NULL)
1742                 GOTO(out_ctxt, rc = -ENOMEM);
1743
1744         comp->comp_tmti = tmti;
1745         comp->comp_obd = mgs->mgs_obd;
1746
1747         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1748                        LLOG_OPEN_EXISTS);
1749         if (rc < 0) {
1750                 if (rc == -ENOENT)
1751                         rc = 0;
1752                 GOTO(out_pop, rc);
1753         }
1754
1755         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1756         if (rc)
1757                 GOTO(out_close, rc);
1758
1759         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1760                                   (void *)comp, NULL, false);
1761         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1762 out_close:
1763         llog_close(env, loghandle);
1764 out_pop:
1765         OBD_FREE_PTR(tmti);
1766 out_ctxt:
1767         llog_ctxt_put(ctxt);
1768         RETURN(rc);
1769 }
1770
1771 /* lmv is the second thing for client logs */
1772 /* copied from mgs_write_log_lov. Please refer to that.  */
1773 static int mgs_write_log_lmv(const struct lu_env *env,
1774                              struct mgs_device *mgs,
1775                              struct fs_db *fsdb,
1776                              struct mgs_target_info *mti,
1777                              char *logname, char *lmvname)
1778 {
1779         struct llog_handle *llh = NULL;
1780         struct lmv_desc *lmvdesc;
1781         char *uuid;
1782         int rc = 0;
1783         ENTRY;
1784
1785         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1786
1787         OBD_ALLOC_PTR(lmvdesc);
1788         if (lmvdesc == NULL)
1789                 RETURN(-ENOMEM);
1790         lmvdesc->ld_active_tgt_count = 0;
1791         lmvdesc->ld_tgt_count = 0;
1792         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1793         uuid = (char *)lmvdesc->ld_uuid.uuid;
1794
1795         rc = record_start_log(env, mgs, &llh, logname);
1796         if (rc)
1797                 GOTO(out_free, rc);
1798         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1799         if (rc)
1800                 GOTO(out_end, rc);
1801         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1802         if (rc)
1803                 GOTO(out_end, rc);
1804         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1805         if (rc)
1806                 GOTO(out_end, rc);
1807         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1808         if (rc)
1809                 GOTO(out_end, rc);
1810 out_end:
1811         record_end_log(env, &llh);
1812 out_free:
1813         OBD_FREE_PTR(lmvdesc);
1814         RETURN(rc);
1815 }
1816
1817 /* lov is the first thing in the mdt and client logs */
1818 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1819                              struct fs_db *fsdb, struct mgs_target_info *mti,
1820                              char *logname, char *lovname)
1821 {
1822         struct llog_handle *llh = NULL;
1823         struct lov_desc *lovdesc;
1824         char *uuid;
1825         int rc = 0;
1826         ENTRY;
1827
1828         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1829
1830         /*
1831         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1832         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1833               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1834         */
1835
1836         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1837         OBD_ALLOC_PTR(lovdesc);
1838         if (lovdesc == NULL)
1839                 RETURN(-ENOMEM);
1840         lovdesc->ld_magic = LOV_DESC_MAGIC;
1841         lovdesc->ld_tgt_count = 0;
1842         /* Defaults.  Can be changed later by lcfg config_param */
1843         lovdesc->ld_default_stripe_count = 1;
1844         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1845         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1846         lovdesc->ld_default_stripe_offset = -1;
1847         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1848         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1849         /* can these be the same? */
1850         uuid = (char *)lovdesc->ld_uuid.uuid;
1851
1852         /* This should always be the first entry in a log.
1853         rc = mgs_clear_log(obd, logname); */
1854         rc = record_start_log(env, mgs, &llh, logname);
1855         if (rc)
1856                 GOTO(out_free, rc);
1857         /* FIXME these should be a single journal transaction */
1858         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1859         if (rc)
1860                 GOTO(out_end, rc);
1861         rc = record_attach(env, llh, lovname, "lov", uuid);
1862         if (rc)
1863                 GOTO(out_end, rc);
1864         rc = record_lov_setup(env, llh, lovname, lovdesc);
1865         if (rc)
1866                 GOTO(out_end, rc);
1867         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1868         if (rc)
1869                 GOTO(out_end, rc);
1870         EXIT;
1871 out_end:
1872         record_end_log(env, &llh);
1873 out_free:
1874         OBD_FREE_PTR(lovdesc);
1875         return rc;
1876 }
1877
1878 /* add failnids to open log */
1879 static int mgs_write_log_failnids(const struct lu_env *env,
1880                                   struct mgs_target_info *mti,
1881                                   struct llog_handle *llh,
1882                                   char *cliname)
1883 {
1884         char *failnodeuuid = NULL;
1885         char *ptr = mti->mti_params;
1886         lnet_nid_t nid;
1887         int rc = 0;
1888
1889         /*
1890         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1891         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1892         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1893         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1894         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1895         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1896         */
1897
1898         /*
1899          * Pull failnid info out of params string, which may contain something
1900          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
1901          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
1902          * etc.  However, convert_hostnames() should have caught those.
1903          */
1904         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1905                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1906                         char nidstr[LNET_NIDSTR_SIZE];
1907
1908                         if (failnodeuuid == NULL) {
1909                                 /* We don't know the failover node name,
1910                                  * so just use the first nid as the uuid */
1911                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr));
1912                                 rc = name_create(&failnodeuuid, nidstr, "");
1913                                 if (rc != 0)
1914                                         return rc;
1915                         }
1916                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1917                                 "client %s\n",
1918                                 libcfs_nid2str_r(nid, nidstr, sizeof(nidstr)),
1919                                 failnodeuuid, cliname);
1920                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1921                         /*
1922                          * If *ptr is ':', we have added all NIDs for
1923                          * failnodeuuid.
1924                          */
1925                         if (*ptr == ':') {
1926                                 rc = record_add_conn(env, llh, cliname,
1927                                                      failnodeuuid);
1928                                 name_destroy(&failnodeuuid);
1929                                 failnodeuuid = NULL;
1930                         }
1931                 }
1932                 if (failnodeuuid) {
1933                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1934                         name_destroy(&failnodeuuid);
1935                         failnodeuuid = NULL;
1936                 }
1937         }
1938
1939         return rc;
1940 }
1941
1942 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1943                                     struct mgs_device *mgs,
1944                                     struct fs_db *fsdb,
1945                                     struct mgs_target_info *mti,
1946                                     char *logname, char *lmvname)
1947 {
1948         struct llog_handle *llh = NULL;
1949         char *mdcname = NULL;
1950         char *nodeuuid = NULL;
1951         char *mdcuuid = NULL;
1952         char *lmvuuid = NULL;
1953         char index[6];
1954         char nidstr[LNET_NIDSTR_SIZE];
1955         int i, rc;
1956         ENTRY;
1957
1958         if (mgs_log_is_empty(env, mgs, logname)) {
1959                 CERROR("log is empty! Logical error\n");
1960                 RETURN(-EINVAL);
1961         }
1962
1963         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1964                mti->mti_svname, logname, lmvname);
1965
1966         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
1967         rc = name_create(&nodeuuid, nidstr, "");
1968         if (rc)
1969                 RETURN(rc);
1970         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1971         if (rc)
1972                 GOTO(out_free, rc);
1973         rc = name_create(&mdcuuid, mdcname, "_UUID");
1974         if (rc)
1975                 GOTO(out_free, rc);
1976         rc = name_create(&lmvuuid, lmvname, "_UUID");
1977         if (rc)
1978                 GOTO(out_free, rc);
1979
1980         rc = record_start_log(env, mgs, &llh, logname);
1981         if (rc)
1982                 GOTO(out_free, rc);
1983         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1984                            "add mdc");
1985         if (rc)
1986                 GOTO(out_end, rc);
1987         for (i = 0; i < mti->mti_nid_count; i++) {
1988                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1989                         libcfs_nid2str_r(mti->mti_nids[i],
1990                                          nidstr, sizeof(nidstr)));
1991
1992                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1993                 if (rc)
1994                         GOTO(out_end, rc);
1995         }
1996
1997         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1998         if (rc)
1999                 GOTO(out_end, rc);
2000         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid,
2001                           NULL, NULL);
2002         if (rc)
2003                 GOTO(out_end, rc);
2004         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
2005         if (rc)
2006                 GOTO(out_end, rc);
2007         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2008         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
2009                             index, "1");
2010         if (rc)
2011                 GOTO(out_end, rc);
2012         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2013                            "add mdc");
2014         if (rc)
2015                 GOTO(out_end, rc);
2016 out_end:
2017         record_end_log(env, &llh);
2018 out_free:
2019         name_destroy(&lmvuuid);
2020         name_destroy(&mdcuuid);
2021         name_destroy(&mdcname);
2022         name_destroy(&nodeuuid);
2023         RETURN(rc);
2024 }
2025
2026 static inline int name_create_lov(char **lovname, char *mdtname,
2027                                   struct fs_db *fsdb, int index)
2028 {
2029         /* COMPAT_180 */
2030         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2031                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2032         else
2033                 return name_create(lovname, mdtname, "-mdtlov");
2034 }
2035
2036 static int name_create_mdt_and_lov(char **logname, char **lovname,
2037                                    struct fs_db *fsdb, int i)
2038 {
2039         int rc;
2040
2041         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2042         if (rc)
2043                 return rc;
2044         /* COMPAT_180 */
2045         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2046                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2047         else
2048                 rc = name_create(lovname, *logname, "-mdtlov");
2049         if (rc) {
2050                 name_destroy(logname);
2051                 *logname = NULL;
2052         }
2053         return rc;
2054 }
2055
2056 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2057                                       struct fs_db *fsdb, int i)
2058 {
2059         char suffix[16];
2060
2061         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2062                 sprintf(suffix, "-osc");
2063         else
2064                 sprintf(suffix, "-osc-MDT%04x", i);
2065         return name_create(oscname, ostname, suffix);
2066 }
2067
2068 /* add new mdc to already existent MDS */
2069 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2070                                     struct mgs_device *mgs,
2071                                     struct fs_db *fsdb,
2072                                     struct mgs_target_info *mti,
2073                                     int mdt_index, char *logname)
2074 {
2075         struct llog_handle      *llh = NULL;
2076         char    *nodeuuid = NULL;
2077         char    *ospname = NULL;
2078         char    *lovuuid = NULL;
2079         char    *mdtuuid = NULL;
2080         char    *svname = NULL;
2081         char    *mdtname = NULL;
2082         char    *lovname = NULL;
2083         char    index_str[16];
2084         char    nidstr[LNET_NIDSTR_SIZE];
2085         int     i, rc;
2086
2087         ENTRY;
2088         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2089                 CERROR("log is empty! Logical error\n");
2090                 RETURN (-EINVAL);
2091         }
2092
2093         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2094                logname);
2095
2096         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2097         if (rc)
2098                 RETURN(rc);
2099
2100         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2101         rc = name_create(&nodeuuid, nidstr, "");
2102         if (rc)
2103                 GOTO(out_destory, rc);
2104
2105         rc = name_create(&svname, mdtname, "-osp");
2106         if (rc)
2107                 GOTO(out_destory, rc);
2108
2109         sprintf(index_str, "-MDT%04x", mdt_index);
2110         rc = name_create(&ospname, svname, index_str);
2111         if (rc)
2112                 GOTO(out_destory, rc);
2113
2114         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2115         if (rc)
2116                 GOTO(out_destory, rc);
2117
2118         rc = name_create(&lovuuid, lovname, "_UUID");
2119         if (rc)
2120                 GOTO(out_destory, rc);
2121
2122         rc = name_create(&mdtuuid, mdtname, "_UUID");
2123         if (rc)
2124                 GOTO(out_destory, rc);
2125
2126         rc = record_start_log(env, mgs, &llh, logname);
2127         if (rc)
2128                 GOTO(out_destory, rc);
2129
2130         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2131                            "add osp");
2132         if (rc)
2133                 GOTO(out_destory, rc);
2134
2135         for (i = 0; i < mti->mti_nid_count; i++) {
2136                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2137                         libcfs_nid2str_r(mti->mti_nids[i],
2138                                          nidstr, sizeof(nidstr)));
2139                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2140                 if (rc)
2141                         GOTO(out_end, rc);
2142         }
2143
2144         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2145         if (rc)
2146                 GOTO(out_end, rc);
2147
2148         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2149                           NULL, NULL);
2150         if (rc)
2151                 GOTO(out_end, rc);
2152
2153         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2154         if (rc)
2155                 GOTO(out_end, rc);
2156
2157         /* Add mdc(osp) to lod */
2158         snprintf(index_str, sizeof(index_str), "%d", mti->mti_stripe_index);
2159         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2160                          index_str, "1", NULL);
2161         if (rc)
2162                 GOTO(out_end, rc);
2163
2164         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2165         if (rc)
2166                 GOTO(out_end, rc);
2167
2168 out_end:
2169         record_end_log(env, &llh);
2170
2171 out_destory:
2172         name_destroy(&mdtuuid);
2173         name_destroy(&lovuuid);
2174         name_destroy(&lovname);
2175         name_destroy(&ospname);
2176         name_destroy(&svname);
2177         name_destroy(&nodeuuid);
2178         name_destroy(&mdtname);
2179         RETURN(rc);
2180 }
2181
2182 static int mgs_write_log_mdt0(const struct lu_env *env,
2183                               struct mgs_device *mgs,
2184                               struct fs_db *fsdb,
2185                               struct mgs_target_info *mti)
2186 {
2187         char *log = mti->mti_svname;
2188         struct llog_handle *llh = NULL;
2189         char *uuid, *lovname;
2190         char mdt_index[6];
2191         char *ptr = mti->mti_params;
2192         int rc = 0, failout = 0;
2193         ENTRY;
2194
2195         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2196         if (uuid == NULL)
2197                 RETURN(-ENOMEM);
2198
2199         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2200                 failout = (strncmp(ptr, "failout", 7) == 0);
2201
2202         rc = name_create(&lovname, log, "-mdtlov");
2203         if (rc)
2204                 GOTO(out_free, rc);
2205         if (mgs_log_is_empty(env, mgs, log)) {
2206                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2207                 if (rc)
2208                         GOTO(out_lod, rc);
2209         }
2210
2211         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2212
2213         rc = record_start_log(env, mgs, &llh, log);
2214         if (rc)
2215                 GOTO(out_lod, rc);
2216
2217         /* add MDT itself */
2218
2219         /* FIXME this whole fn should be a single journal transaction */
2220         sprintf(uuid, "%s_UUID", log);
2221         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2222         if (rc)
2223                 GOTO(out_lod, rc);
2224         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2225         if (rc)
2226                 GOTO(out_end, rc);
2227         rc = record_mount_opt(env, llh, log, lovname, NULL);
2228         if (rc)
2229                 GOTO(out_end, rc);
2230         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2231                         failout ? "n" : "f");
2232         if (rc)
2233                 GOTO(out_end, rc);
2234         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2235         if (rc)
2236                 GOTO(out_end, rc);
2237 out_end:
2238         record_end_log(env, &llh);
2239 out_lod:
2240         name_destroy(&lovname);
2241 out_free:
2242         OBD_FREE(uuid, sizeof(struct obd_uuid));
2243         RETURN(rc);
2244 }
2245
2246 /* envelope method for all layers log */
2247 static int mgs_write_log_mdt(const struct lu_env *env,
2248                              struct mgs_device *mgs,
2249                              struct fs_db *fsdb,
2250                              struct mgs_target_info *mti)
2251 {
2252         struct mgs_thread_info *mgi = mgs_env_info(env);
2253         struct llog_handle *llh = NULL;
2254         char *cliname;
2255         int rc, i = 0;
2256         ENTRY;
2257
2258         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2259
2260         if (mti->mti_uuid[0] == '\0') {
2261                 /* Make up our own uuid */
2262                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2263                          "%s_UUID", mti->mti_svname);
2264         }
2265
2266         /* add mdt */
2267         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2268         if (rc)
2269                 RETURN(rc);
2270         /* Append the mdt info to the client log */
2271         rc = name_create(&cliname, mti->mti_fsname, "-client");
2272         if (rc)
2273                 RETURN(rc);
2274
2275         if (mgs_log_is_empty(env, mgs, cliname)) {
2276                 /* Start client log */
2277                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2278                                        fsdb->fsdb_clilov);
2279                 if (rc)
2280                         GOTO(out_free, rc);
2281                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2282                                        fsdb->fsdb_clilmv);
2283                 if (rc)
2284                         GOTO(out_free, rc);
2285         }
2286
2287         /*
2288         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2289         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2290         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2291         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2292         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2293         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2294         */
2295
2296                 /* copy client info about lov/lmv */
2297                 mgi->mgi_comp.comp_mti = mti;
2298                 mgi->mgi_comp.comp_fsdb = fsdb;
2299
2300                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2301                                                         &mgi->mgi_comp);
2302                 if (rc)
2303                         GOTO(out_free, rc);
2304                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2305                                               fsdb->fsdb_clilmv);
2306                 if (rc)
2307                         GOTO(out_free, rc);
2308
2309                 /* add mountopts */
2310                 rc = record_start_log(env, mgs, &llh, cliname);
2311                 if (rc)
2312                         GOTO(out_free, rc);
2313
2314                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2315                                    "mount opts");
2316                 if (rc)
2317                         GOTO(out_end, rc);
2318                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2319                                       fsdb->fsdb_clilmv);
2320                 if (rc)
2321                         GOTO(out_end, rc);
2322                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2323                                    "mount opts");
2324
2325         if (rc)
2326                 GOTO(out_end, rc);
2327
2328         /* for_all_existing_mdt except current one */
2329         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2330                 if (i !=  mti->mti_stripe_index &&
2331                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2332                         char *logname;
2333
2334                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2335                         if (rc)
2336                                 GOTO(out_end, rc);
2337
2338                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2339                                                       i, logname);
2340                         name_destroy(&logname);
2341                         if (rc)
2342                                 GOTO(out_end, rc);
2343                 }
2344         }
2345 out_end:
2346         record_end_log(env, &llh);
2347 out_free:
2348         name_destroy(&cliname);
2349         RETURN(rc);
2350 }
2351
2352 /* Add the ost info to the client/mdt lov */
2353 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2354                                     struct mgs_device *mgs, struct fs_db *fsdb,
2355                                     struct mgs_target_info *mti,
2356                                     char *logname, char *suffix, char *lovname,
2357                                     enum lustre_sec_part sec_part, int flags)
2358 {
2359         struct llog_handle *llh = NULL;
2360         char *nodeuuid = NULL;
2361         char *oscname = NULL;
2362         char *oscuuid = NULL;
2363         char *lovuuid = NULL;
2364         char *svname = NULL;
2365         char index[6];
2366         char nidstr[LNET_NIDSTR_SIZE];
2367         int i, rc;
2368         ENTRY;
2369
2370         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2371                 mti->mti_svname, logname);
2372
2373         if (mgs_log_is_empty(env, mgs, logname)) {
2374                 CERROR("log is empty! Logical error\n");
2375                 RETURN(-EINVAL);
2376         }
2377
2378         libcfs_nid2str_r(mti->mti_nids[0], nidstr, sizeof(nidstr));
2379         rc = name_create(&nodeuuid, nidstr, "");
2380         if (rc)
2381                 RETURN(rc);
2382         rc = name_create(&svname, mti->mti_svname, "-osc");
2383         if (rc)
2384                 GOTO(out_free, rc);
2385
2386         /* for the system upgraded from old 1.8, keep using the old osc naming
2387          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2388         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2389                 rc = name_create(&oscname, svname, "");
2390         else
2391                 rc = name_create(&oscname, svname, suffix);
2392         if (rc)
2393                 GOTO(out_free, rc);
2394
2395         rc = name_create(&oscuuid, oscname, "_UUID");
2396         if (rc)
2397                 GOTO(out_free, rc);
2398         rc = name_create(&lovuuid, lovname, "_UUID");
2399         if (rc)
2400                 GOTO(out_free, rc);
2401
2402
2403         /*
2404         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2405         multihomed (#4)
2406         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2407         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2408         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2409         failover (#6,7)
2410         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2411         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2412         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2413         */
2414
2415         rc = record_start_log(env, mgs, &llh, logname);
2416         if (rc)
2417                 GOTO(out_free, rc);
2418
2419         /* FIXME these should be a single journal transaction */
2420         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2421                            "add osc");
2422         if (rc)
2423                 GOTO(out_end, rc);
2424
2425         /* NB: don't change record order, because upon MDT steal OSC config
2426          * from client, it treats all nids before LCFG_SETUP as target nids
2427          * (multiple interfaces), while nids after as failover node nids.
2428          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2429          */
2430         for (i = 0; i < mti->mti_nid_count; i++) {
2431                 CDEBUG(D_MGS, "add nid %s\n",
2432                         libcfs_nid2str_r(mti->mti_nids[i],
2433                                          nidstr, sizeof(nidstr)));
2434                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2435                 if (rc)
2436                         GOTO(out_end, rc);
2437         }
2438         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2439         if (rc)
2440                 GOTO(out_end, rc);
2441         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid,
2442                           NULL, NULL);
2443         if (rc)
2444                 GOTO(out_end, rc);
2445         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2446         if (rc)
2447                 GOTO(out_end, rc);
2448
2449         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2450
2451         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2452         if (rc)
2453                 GOTO(out_end, rc);
2454         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2455                            "add osc");
2456         if (rc)
2457                 GOTO(out_end, rc);
2458 out_end:
2459         record_end_log(env, &llh);
2460 out_free:
2461         name_destroy(&lovuuid);
2462         name_destroy(&oscuuid);
2463         name_destroy(&oscname);
2464         name_destroy(&svname);
2465         name_destroy(&nodeuuid);
2466         RETURN(rc);
2467 }
2468
2469 static int mgs_write_log_ost(const struct lu_env *env,
2470                              struct mgs_device *mgs, struct fs_db *fsdb,
2471                              struct mgs_target_info *mti)
2472 {
2473         struct llog_handle *llh = NULL;
2474         char *logname, *lovname;
2475         char *ptr = mti->mti_params;
2476         int rc, flags = 0, failout = 0, i;
2477         ENTRY;
2478
2479         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2480
2481         /* The ost startup log */
2482
2483         /* If the ost log already exists, that means that someone reformatted
2484            the ost and it called target_add again. */
2485         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2486                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2487                                    "exists, yet the server claims it never "
2488                                    "registered. It may have been reformatted, "
2489                                    "or the index changed. writeconf the MDT to "
2490                                    "regenerate all logs.\n", mti->mti_svname);
2491                 RETURN(-EALREADY);
2492         }
2493
2494         /*
2495         attach obdfilter ost1 ost1_UUID
2496         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2497         */
2498         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2499                 failout = (strncmp(ptr, "failout", 7) == 0);
2500         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2501         if (rc)
2502                 RETURN(rc);
2503         /* FIXME these should be a single journal transaction */
2504         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2505         if (rc)
2506                 GOTO(out_end, rc);
2507         if (*mti->mti_uuid == '\0')
2508                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2509                          "%s_UUID", mti->mti_svname);
2510         rc = record_attach(env, llh, mti->mti_svname,
2511                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2512         if (rc)
2513                 GOTO(out_end, rc);
2514         rc = record_setup(env, llh, mti->mti_svname,
2515                           "dev"/*ignored*/, "type"/*ignored*/,
2516                           failout ? "n" : "f", NULL/*options*/);
2517         if (rc)
2518                 GOTO(out_end, rc);
2519         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2520         if (rc)
2521                 GOTO(out_end, rc);
2522 out_end:
2523         record_end_log(env, &llh);
2524         if (rc)
2525                 RETURN(rc);
2526         /* We also have to update the other logs where this osc is part of
2527            the lov */
2528
2529         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2530                 /* If we're upgrading, the old mdt log already has our
2531                    entry. Let's do a fake one for fun. */
2532                 /* Note that we can't add any new failnids, since we don't
2533                    know the old osc names. */
2534                 flags = CM_SKIP | CM_UPGRADE146;
2535
2536         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2537                 /* If the update flag isn't set, don't update client/mdt
2538                    logs. */
2539                 flags |= CM_SKIP;
2540                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2541                               "the MDT first to regenerate it.\n",
2542                               mti->mti_svname);
2543         }
2544
2545         /* Add ost to all MDT lov defs */
2546         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2547                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2548                         char mdt_index[9];
2549
2550                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2551                                                      i);
2552                         if (rc)
2553                                 RETURN(rc);
2554                         sprintf(mdt_index, "-MDT%04x", i);
2555                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2556                                                       logname, mdt_index,
2557                                                       lovname, LUSTRE_SP_MDT,
2558                                                       flags);
2559                         name_destroy(&logname);
2560                         name_destroy(&lovname);
2561                         if (rc)
2562                                 RETURN(rc);
2563                 }
2564         }
2565
2566         /* Append ost info to the client log */
2567         rc = name_create(&logname, mti->mti_fsname, "-client");
2568         if (rc)
2569                 RETURN(rc);
2570         if (mgs_log_is_empty(env, mgs, logname)) {
2571                 /* Start client log */
2572                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2573                                        fsdb->fsdb_clilov);
2574                 if (rc)
2575                         GOTO(out_free, rc);
2576                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2577                                        fsdb->fsdb_clilmv);
2578                 if (rc)
2579                         GOTO(out_free, rc);
2580         }
2581         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2582                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, flags);
2583 out_free:
2584         name_destroy(&logname);
2585         RETURN(rc);
2586 }
2587
2588 static __inline__ int mgs_param_empty(char *ptr)
2589 {
2590         char *tmp;
2591
2592         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2593                 return 1;
2594         return 0;
2595 }
2596
2597 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2598                                           struct mgs_device *mgs,
2599                                           struct fs_db *fsdb,
2600                                           struct mgs_target_info *mti,
2601                                           char *logname, char *cliname)
2602 {
2603         int rc;
2604         struct llog_handle *llh = NULL;
2605
2606         if (mgs_param_empty(mti->mti_params)) {
2607                 /* Remove _all_ failnids */
2608                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2609                                 mti->mti_svname, "add failnid", CM_SKIP);
2610                 return rc < 0 ? rc : 0;
2611         }
2612
2613         /* Otherwise failover nids are additive */
2614         rc = record_start_log(env, mgs, &llh, logname);
2615         if (rc)
2616                 return rc;
2617                 /* FIXME this should be a single journal transaction */
2618         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2619                            "add failnid");
2620         if (rc)
2621                 goto out_end;
2622         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2623         if (rc)
2624                 goto out_end;
2625         rc = record_marker(env, llh, fsdb, CM_END,
2626                            mti->mti_svname, "add failnid");
2627 out_end:
2628         record_end_log(env, &llh);
2629         return rc;
2630 }
2631
2632
2633 /* Add additional failnids to an existing log.
2634    The mdc/osc must have been added to logs first */
2635 /* tcp nids must be in dotted-quad ascii -
2636    we can't resolve hostnames from the kernel. */
2637 static int mgs_write_log_add_failnid(const struct lu_env *env,
2638                                      struct mgs_device *mgs,
2639                                      struct fs_db *fsdb,
2640                                      struct mgs_target_info *mti)
2641 {
2642         char *logname, *cliname;
2643         int rc;
2644         ENTRY;
2645
2646         /* FIXME we currently can't erase the failnids
2647          * given when a target first registers, since they aren't part of
2648          * an "add uuid" stanza */
2649
2650         /* Verify that we know about this target */
2651         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2652                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2653                                    "yet. It must be started before failnids "
2654                                    "can be added.\n", mti->mti_svname);
2655                 RETURN(-ENOENT);
2656         }
2657
2658         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2659         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2660                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2661         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2662                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2663         } else {
2664                 RETURN(-EINVAL);
2665         }
2666         if (rc)
2667                 RETURN(rc);
2668         /* Add failover nids to the client log */
2669         rc = name_create(&logname, mti->mti_fsname, "-client");
2670         if (rc) {
2671                 name_destroy(&cliname);
2672                 RETURN(rc);
2673         }
2674         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2675         name_destroy(&logname);
2676         name_destroy(&cliname);
2677         if (rc)
2678                 RETURN(rc);
2679
2680         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2681                 /* Add OST failover nids to the MDT logs as well */
2682                 int i;
2683
2684                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2685                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2686                                 continue;
2687                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2688                         if (rc)
2689                                 RETURN(rc);
2690                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2691                                                  fsdb, i);
2692                         if (rc) {
2693                                 name_destroy(&logname);
2694                                 RETURN(rc);
2695                         }
2696                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2697                                                             mti, logname,
2698                                                             cliname);
2699                         name_destroy(&cliname);
2700                         name_destroy(&logname);
2701                         if (rc)
2702                                 RETURN(rc);
2703                 }
2704         }
2705
2706         RETURN(rc);
2707 }
2708
2709 static int mgs_wlp_lcfg(const struct lu_env *env,
2710                         struct mgs_device *mgs, struct fs_db *fsdb,
2711                         struct mgs_target_info *mti,
2712                         char *logname, struct lustre_cfg_bufs *bufs,
2713                         char *tgtname, char *ptr)
2714 {
2715         char comment[MTI_NAME_MAXLEN];
2716         char *tmp;
2717         struct llog_cfg_rec *lcr;
2718         int rc, del;
2719
2720         /* Erase any old settings of this same parameter */
2721         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2722         comment[MTI_NAME_MAXLEN - 1] = 0;
2723         /* But don't try to match the value. */
2724         tmp = strchr(comment, '=');
2725         if (tmp != NULL)
2726                 *tmp = 0;
2727         /* FIXME we should skip settings that are the same as old values */
2728         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2729         if (rc < 0)
2730                 return rc;
2731         del = mgs_param_empty(ptr);
2732
2733         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2734                       "Setting" : "Modifying", tgtname, comment, logname);
2735         if (del) {
2736                 /* mgs_modify() will return 1 if nothing had to be done */
2737                 if (rc == 1)
2738                         rc = 0;
2739                 return rc;
2740         }
2741
2742         lustre_cfg_bufs_reset(bufs, tgtname);
2743         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2744         if (mti->mti_flags & LDD_F_PARAM2)
2745                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2746
2747         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2748                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2749         if (lcr == NULL)
2750                 return -ENOMEM;
2751
2752         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2753                                   comment);
2754         lustre_cfg_rec_free(lcr);
2755         return rc;
2756 }
2757
2758 static int mgs_write_log_param2(const struct lu_env *env,
2759                                 struct mgs_device *mgs,
2760                                 struct fs_db *fsdb,
2761                                 struct mgs_target_info *mti, char *ptr)
2762 {
2763         struct lustre_cfg_bufs  bufs;
2764         int                     rc = 0;
2765         ENTRY;
2766
2767         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2768         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2769                           mti->mti_svname, ptr);
2770
2771         RETURN(rc);
2772 }
2773
2774 /* write global variable settings into log */
2775 static int mgs_write_log_sys(const struct lu_env *env,
2776                              struct mgs_device *mgs, struct fs_db *fsdb,
2777                              struct mgs_target_info *mti, char *sys, char *ptr)
2778 {
2779         struct mgs_thread_info  *mgi = mgs_env_info(env);
2780         struct lustre_cfg       *lcfg;
2781         struct llog_cfg_rec     *lcr;
2782         char *tmp, sep;
2783         int rc, cmd, convert = 1;
2784
2785         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2786                 cmd = LCFG_SET_TIMEOUT;
2787         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2788                 cmd = LCFG_SET_LDLM_TIMEOUT;
2789         /* Check for known params here so we can return error to lctl */
2790         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2791                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2792                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2793                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2794                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2795                 cmd = LCFG_PARAM;
2796         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2797                 convert = 0; /* Don't convert string value to integer */
2798                 cmd = LCFG_PARAM;
2799         } else {
2800                 return -EINVAL;
2801         }
2802
2803         if (mgs_param_empty(ptr))
2804                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2805         else
2806                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2807
2808         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2809         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2810         if (!convert && *tmp != '\0')
2811                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2812         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2813         if (lcr == NULL)
2814                 return -ENOMEM;
2815
2816         lcfg = &lcr->lcr_cfg;
2817         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2818         /* truncate the comment to the parameter name */
2819         ptr = tmp - 1;
2820         sep = *ptr;
2821         *ptr = '\0';
2822         /* modify all servers and clients */
2823         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2824                                       *tmp == '\0' ? NULL : lcr,
2825                                       mti->mti_fsname, sys, 0);
2826         if (rc == 0 && *tmp != '\0') {
2827                 switch (cmd) {
2828                 case LCFG_SET_TIMEOUT:
2829                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2830                                 class_process_config(lcfg);
2831                         break;
2832                 case LCFG_SET_LDLM_TIMEOUT:
2833                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2834                                 class_process_config(lcfg);
2835                         break;
2836                 default:
2837                         break;
2838                 }
2839         }
2840         *ptr = sep;
2841         lustre_cfg_rec_free(lcr);
2842         return rc;
2843 }
2844
2845 /* write quota settings into log */
2846 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2847                                struct fs_db *fsdb, struct mgs_target_info *mti,
2848                                char *quota, char *ptr)
2849 {
2850         struct mgs_thread_info  *mgi = mgs_env_info(env);
2851         struct llog_cfg_rec     *lcr;
2852         char                    *tmp;
2853         char                     sep;
2854         int                      rc, cmd = LCFG_PARAM;
2855
2856         /* support only 'meta' and 'data' pools so far */
2857         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2858             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2859                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2860                        "& quota.ost are)\n", ptr);
2861                 return -EINVAL;
2862         }
2863
2864         if (*tmp == '\0') {
2865                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2866         } else {
2867                 CDEBUG(D_MGS, "global '%s'\n", quota);
2868
2869                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2870                     strcmp(tmp, "none") != 0) {
2871                         CERROR("enable option(%s) isn't supported\n", tmp);
2872                         return -EINVAL;
2873                 }
2874         }
2875
2876         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2877         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2878         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2879         if (lcr == NULL)
2880                 return -ENOMEM;
2881
2882         /* truncate the comment to the parameter name */
2883         ptr = tmp - 1;
2884         sep = *ptr;
2885         *ptr = '\0';
2886
2887         /* XXX we duplicated quota enable information in all server
2888          *     config logs, it should be moved to a separate config
2889          *     log once we cleanup the config log for global param. */
2890         /* modify all servers */
2891         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2892                                       *tmp == '\0' ? NULL : lcr,
2893                                       mti->mti_fsname, quota, 1);
2894         *ptr = sep;
2895         lustre_cfg_rec_free(lcr);
2896         return rc < 0 ? rc : 0;
2897 }
2898
2899 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2900                                    struct mgs_device *mgs,
2901                                    struct fs_db *fsdb,
2902                                    struct mgs_target_info *mti,
2903                                    char *param)
2904 {
2905         struct mgs_thread_info  *mgi = mgs_env_info(env);
2906         struct llog_cfg_rec     *lcr;
2907         struct llog_handle      *llh = NULL;
2908         char                    *logname;
2909         char                    *comment, *ptr;
2910         int                      rc, len;
2911
2912         ENTRY;
2913
2914         /* get comment */
2915         ptr = strchr(param, '=');
2916         LASSERT(ptr != NULL);
2917         len = ptr - param;
2918
2919         OBD_ALLOC(comment, len + 1);
2920         if (comment == NULL)
2921                 RETURN(-ENOMEM);
2922         strncpy(comment, param, len);
2923         comment[len] = '\0';
2924
2925         /* prepare lcfg */
2926         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2927         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2928         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2929         if (lcr == NULL)
2930                 GOTO(out_comment, rc = -ENOMEM);
2931
2932         /* construct log name */
2933         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2934         if (rc < 0)
2935                 GOTO(out_lcfg, rc);
2936
2937         if (mgs_log_is_empty(env, mgs, logname)) {
2938                 rc = record_start_log(env, mgs, &llh, logname);
2939                 if (rc < 0)
2940                         GOTO(out, rc);
2941                 record_end_log(env, &llh);
2942         }
2943
2944         /* obsolete old one */
2945         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2946                         comment, CM_SKIP);
2947         if (rc < 0)
2948                 GOTO(out, rc);
2949         /* write the new one */
2950         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2951                                   mti->mti_svname, comment);
2952         if (rc)
2953                 CERROR("%s: error writing log %s: rc = %d\n",
2954                        mgs->mgs_obd->obd_name, logname, rc);
2955 out:
2956         name_destroy(&logname);
2957 out_lcfg:
2958         lustre_cfg_rec_free(lcr);
2959 out_comment:
2960         OBD_FREE(comment, len + 1);
2961         RETURN(rc);
2962 }
2963
2964 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2965                                         char *param)
2966 {
2967         char    *ptr;
2968
2969         /* disable the adjustable udesc parameter for now, i.e. use default
2970          * setting that client always ship udesc to MDT if possible. to enable
2971          * it simply remove the following line */
2972         goto error_out;
2973
2974         ptr = strchr(param, '=');
2975         if (ptr == NULL)
2976                 goto error_out;
2977         *ptr++ = '\0';
2978
2979         if (strcmp(param, PARAM_SRPC_UDESC))
2980                 goto error_out;
2981
2982         if (strcmp(ptr, "yes") == 0) {
2983                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2984                 CWARN("Enable user descriptor shipping from client to MDT\n");
2985         } else if (strcmp(ptr, "no") == 0) {
2986                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2987                 CWARN("Disable user descriptor shipping from client to MDT\n");
2988         } else {
2989                 *(ptr - 1) = '=';
2990                 goto error_out;
2991         }
2992         return 0;
2993
2994 error_out:
2995         CERROR("Invalid param: %s\n", param);
2996         return -EINVAL;
2997 }
2998
2999 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
3000                                   const char *svname,
3001                                   char *param)
3002 {
3003         struct sptlrpc_rule      rule;
3004         struct sptlrpc_rule_set *rset;
3005         int                      rc;
3006         ENTRY;
3007
3008         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
3009                 CERROR("Invalid sptlrpc parameter: %s\n", param);
3010                 RETURN(-EINVAL);
3011         }
3012
3013         if (strncmp(param, PARAM_SRPC_UDESC,
3014                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
3015                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
3016         }
3017
3018         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
3019                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
3020                 RETURN(-EINVAL);
3021         }
3022
3023         param += sizeof(PARAM_SRPC_FLVR) - 1;
3024
3025         rc = sptlrpc_parse_rule(param, &rule);
3026         if (rc)
3027                 RETURN(rc);
3028
3029         /* mgs rules implies must be mgc->mgs */
3030         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3031                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3032                      rule.sr_from != LUSTRE_SP_ANY) ||
3033                     (rule.sr_to != LUSTRE_SP_MGS &&
3034                      rule.sr_to != LUSTRE_SP_ANY))
3035                         RETURN(-EINVAL);
3036         }
3037
3038         /* preapre room for this coming rule. svcname format should be:
3039          * - fsname: general rule
3040          * - fsname-tgtname: target-specific rule
3041          */
3042         if (strchr(svname, '-')) {
3043                 struct mgs_tgt_srpc_conf *tgtconf;
3044                 int                       found = 0;
3045
3046                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3047                      tgtconf = tgtconf->mtsc_next) {
3048                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3049                                 found = 1;
3050                                 break;
3051                         }
3052                 }
3053
3054                 if (!found) {
3055                         int name_len;
3056
3057                         OBD_ALLOC_PTR(tgtconf);
3058                         if (tgtconf == NULL)
3059                                 RETURN(-ENOMEM);
3060
3061                         name_len = strlen(svname);
3062
3063                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3064                         if (tgtconf->mtsc_tgt == NULL) {
3065                                 OBD_FREE_PTR(tgtconf);
3066                                 RETURN(-ENOMEM);
3067                         }
3068                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3069
3070                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3071                         fsdb->fsdb_srpc_tgt = tgtconf;
3072                 }
3073
3074                 rset = &tgtconf->mtsc_rset;
3075         } else if (strcmp(svname, MGSSELF_NAME) == 0) {
3076                 /* put _mgs related srpc rule directly in mgs ruleset */
3077                 rset = &fsdb->fsdb_mgs->mgs_lut.lut_sptlrpc_rset;
3078         } else {
3079                 rset = &fsdb->fsdb_srpc_gen;
3080         }
3081
3082         rc = sptlrpc_rule_set_merge(rset, &rule);
3083
3084         RETURN(rc);
3085 }
3086
3087 static int mgs_srpc_set_param(const struct lu_env *env,
3088                               struct mgs_device *mgs,
3089                               struct fs_db *fsdb,
3090                               struct mgs_target_info *mti,
3091                               char *param)
3092 {
3093         char                   *copy;
3094         int                     rc, copy_size;
3095         ENTRY;
3096
3097 #ifndef HAVE_GSS
3098         RETURN(-EINVAL);
3099 #endif
3100         /* keep a copy of original param, which could be destroied
3101          * during parsing */
3102         copy_size = strlen(param) + 1;
3103         OBD_ALLOC(copy, copy_size);
3104         if (copy == NULL)
3105                 return -ENOMEM;
3106         memcpy(copy, param, copy_size);
3107
3108         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3109         if (rc)
3110                 goto out_free;
3111
3112         /* previous steps guaranteed the syntax is correct */
3113         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3114         if (rc)
3115                 goto out_free;
3116
3117         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3118                 /*
3119                  * for mgs rules, make them effective immediately.
3120                  */
3121                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3122                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3123                                                  &fsdb->fsdb_srpc_gen);
3124         }
3125
3126 out_free:
3127         OBD_FREE(copy, copy_size);
3128         RETURN(rc);
3129 }
3130
3131 struct mgs_srpc_read_data {
3132         struct fs_db   *msrd_fsdb;
3133         int             msrd_skip;
3134 };
3135
3136 static int mgs_srpc_read_handler(const struct lu_env *env,
3137                                  struct llog_handle *llh,
3138                                  struct llog_rec_hdr *rec, void *data)
3139 {
3140         struct mgs_srpc_read_data *msrd = data;
3141         struct cfg_marker         *marker;
3142         struct lustre_cfg         *lcfg = REC_DATA(rec);
3143         char                      *svname, *param;
3144         int                        cfg_len, rc;
3145         ENTRY;
3146
3147         if (rec->lrh_type != OBD_CFG_REC) {
3148                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3149                 RETURN(-EINVAL);
3150         }
3151
3152         cfg_len = REC_DATA_LEN(rec);
3153
3154         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3155         if (rc) {
3156                 CERROR("Insane cfg\n");
3157                 RETURN(rc);
3158         }
3159
3160         if (lcfg->lcfg_command == LCFG_MARKER) {
3161                 marker = lustre_cfg_buf(lcfg, 1);
3162
3163                 if (marker->cm_flags & CM_START &&
3164                     marker->cm_flags & CM_SKIP)
3165                         msrd->msrd_skip = 1;
3166                 if (marker->cm_flags & CM_END)
3167                         msrd->msrd_skip = 0;
3168
3169                 RETURN(0);
3170         }
3171
3172         if (msrd->msrd_skip)
3173                 RETURN(0);
3174
3175         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3176                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3177                 RETURN(0);
3178         }
3179
3180         svname = lustre_cfg_string(lcfg, 0);
3181         if (svname == NULL) {
3182                 CERROR("svname is empty\n");
3183                 RETURN(0);
3184         }
3185
3186         param = lustre_cfg_string(lcfg, 1);
3187         if (param == NULL) {
3188                 CERROR("param is empty\n");
3189                 RETURN(0);
3190         }
3191
3192         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3193         if (rc)
3194                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3195
3196         RETURN(0);
3197 }
3198
3199 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3200                                 struct mgs_device *mgs,
3201                                 struct fs_db *fsdb)
3202 {
3203         struct llog_handle        *llh = NULL;
3204         struct llog_ctxt          *ctxt;
3205         char                      *logname;
3206         struct mgs_srpc_read_data  msrd;
3207         int                        rc;
3208         ENTRY;
3209
3210         /* construct log name */
3211         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3212         if (rc)
3213                 RETURN(rc);
3214
3215         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3216         LASSERT(ctxt != NULL);
3217
3218         if (mgs_log_is_empty(env, mgs, logname))
3219                 GOTO(out, rc = 0);
3220
3221         rc = llog_open(env, ctxt, &llh, NULL, logname,
3222                        LLOG_OPEN_EXISTS);
3223         if (rc < 0) {
3224                 if (rc == -ENOENT)
3225                         rc = 0;
3226                 GOTO(out, rc);
3227         }
3228
3229         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3230         if (rc)
3231                 GOTO(out_close, rc);
3232
3233         if (llog_get_size(llh) <= 1)
3234                 GOTO(out_close, rc = 0);
3235
3236         msrd.msrd_fsdb = fsdb;
3237         msrd.msrd_skip = 0;
3238
3239         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3240                           NULL);
3241
3242 out_close:
3243         llog_close(env, llh);
3244 out:
3245         llog_ctxt_put(ctxt);
3246         name_destroy(&logname);
3247
3248         if (rc)