Whamcloud - gitweb
LU-2675 lustre: remove lustre/include/linux/
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_ioctl.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 /* Find all logs in CONFIG directory and link then into list */
59 int class_dentry_readdir(const struct lu_env *env,
60                          struct mgs_device *mgs, struct list_head *log_list)
61 {
62         struct dt_object    *dir = mgs->mgs_configs_dir;
63         const struct dt_it_ops *iops;
64         struct dt_it        *it;
65         struct mgs_direntry *de;
66         char                *key;
67         int                  rc, key_sz;
68
69         INIT_LIST_HEAD(log_list);
70
71         LASSERT(dir);
72         LASSERT(dir->do_index_ops);
73
74         iops = &dir->do_index_ops->dio_it;
75         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
76         if (IS_ERR(it))
77                 RETURN(PTR_ERR(it));
78
79         rc = iops->load(env, it, 0);
80         if (rc <= 0)
81                 GOTO(fini, rc = 0);
82
83         /* main cycle */
84         do {
85                 key = (void *)iops->key(env, it);
86                 if (IS_ERR(key)) {
87                         CERROR("%s: key failed when listing %s: rc = %d\n",
88                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
89                                (int) PTR_ERR(key));
90                         goto next;
91                 }
92                 key_sz = iops->key_size(env, it);
93                 LASSERT(key_sz > 0);
94
95                 /* filter out "." and ".." entries */
96                 if (key[0] == '.') {
97                         if (key_sz == 1)
98                                 goto next;
99                         if (key_sz == 2 && key[1] == '.')
100                                 goto next;
101                 }
102
103                 de = mgs_direntry_alloc(key_sz + 1);
104                 if (de == NULL) {
105                         rc = -ENOMEM;
106                         break;
107                 }
108
109                 memcpy(de->mde_name, key, key_sz);
110                 de->mde_name[key_sz] = 0;
111
112                 list_add(&de->mde_list, log_list);
113
114 next:
115                 rc = iops->next(env, it);
116         } while (rc == 0);
117         rc = 0;
118
119         iops->put(env, it);
120
121 fini:
122         iops->fini(env, it);
123         if (rc)
124                 CERROR("%s: key failed when listing %s: rc = %d\n",
125                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
126         RETURN(rc);
127 }
128
129 /******************** DB functions *********************/
130
131 static inline int name_create(char **newname, char *prefix, char *suffix)
132 {
133         LASSERT(newname);
134         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
135         if (!*newname)
136                 return -ENOMEM;
137         sprintf(*newname, "%s%s", prefix, suffix);
138         return 0;
139 }
140
141 static inline void name_destroy(char **name)
142 {
143         if (*name)
144                 OBD_FREE(*name, strlen(*name) + 1);
145         *name = NULL;
146 }
147
148 struct mgs_fsdb_handler_data
149 {
150         struct fs_db   *fsdb;
151         __u32           ver;
152 };
153
154 /* from the (client) config log, figure out:
155         1. which ost's/mdt's are configured (by index)
156         2. what the last config step is
157         3. COMPAT_18 osc name
158 */
159 /* It might be better to have a separate db file, instead of parsing the info
160    out of the client log.  This is slow and potentially error-prone. */
161 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
162                             struct llog_rec_hdr *rec, void *data)
163 {
164         struct mgs_fsdb_handler_data *d = data;
165         struct fs_db *fsdb = d->fsdb;
166         int cfg_len = rec->lrh_len;
167         char *cfg_buf = (char*) (rec + 1);
168         struct lustre_cfg *lcfg;
169         __u32 index;
170         int rc = 0;
171         ENTRY;
172
173         if (rec->lrh_type != OBD_CFG_REC) {
174                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
175                 RETURN(-EINVAL);
176         }
177
178         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
179         if (rc) {
180                 CERROR("Insane cfg\n");
181                 RETURN(rc);
182         }
183
184         lcfg = (struct lustre_cfg *)cfg_buf;
185
186         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
187                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
188
189         /* Figure out ost indicies */
190         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
191         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
192             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
193                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
194                                        NULL, 10);
195                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
196                        lustre_cfg_string(lcfg, 1), index,
197                        lustre_cfg_string(lcfg, 2));
198                 set_bit(index, fsdb->fsdb_ost_index_map);
199         }
200
201         /* Figure out mdt indicies */
202         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
203         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
204             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
205                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
206                                        &index, NULL);
207                 if (rc != LDD_F_SV_TYPE_MDT) {
208                         CWARN("Unparsable MDC name %s, assuming index 0\n",
209                               lustre_cfg_string(lcfg, 0));
210                         index = 0;
211                 }
212                 rc = 0;
213                 CDEBUG(D_MGS, "MDT index is %u\n", index);
214                 set_bit(index, fsdb->fsdb_mdt_index_map);
215                 fsdb->fsdb_mdt_count ++;
216         }
217
218         /**
219          * figure out the old config. fsdb_gen = 0 means old log
220          * It is obsoleted and not supported anymore
221          */
222         if (fsdb->fsdb_gen == 0) {
223                 CERROR("Old config format is not supported\n");
224                 RETURN(-EINVAL);
225         }
226
227         /*
228          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
229          */
230         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
231             lcfg->lcfg_command == LCFG_ATTACH &&
232             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
233                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
234                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
235                         CWARN("MDT using 1.8 OSC name scheme\n");
236                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
237                 }
238         }
239
240         if (lcfg->lcfg_command == LCFG_MARKER) {
241                 struct cfg_marker *marker;
242                 marker = lustre_cfg_buf(lcfg, 1);
243
244                 d->ver = marker->cm_vers;
245
246                 /* Keep track of the latest marker step */
247                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
248         }
249
250         RETURN(rc);
251 }
252
253 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
254 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
255                                   struct mgs_device *mgs,
256                                   struct fs_db *fsdb)
257 {
258         char                            *logname;
259         struct llog_handle              *loghandle;
260         struct llog_ctxt                *ctxt;
261         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
262         int rc;
263
264         ENTRY;
265
266         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
267         LASSERT(ctxt != NULL);
268         rc = name_create(&logname, fsdb->fsdb_name, "-client");
269         if (rc)
270                 GOTO(out_put, rc);
271         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
272         if (rc)
273                 GOTO(out_pop, rc);
274
275         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
276         if (rc)
277                 GOTO(out_close, rc);
278
279         if (llog_get_size(loghandle) <= 1)
280                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
281
282         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
283         CDEBUG(D_INFO, "get_db = %d\n", rc);
284 out_close:
285         llog_close(env, loghandle);
286 out_pop:
287         name_destroy(&logname);
288 out_put:
289         llog_ctxt_put(ctxt);
290
291         RETURN(rc);
292 }
293
294 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
295 {
296         struct mgs_tgt_srpc_conf *tgtconf;
297
298         /* free target-specific rules */
299         while (fsdb->fsdb_srpc_tgt) {
300                 tgtconf = fsdb->fsdb_srpc_tgt;
301                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
302
303                 LASSERT(tgtconf->mtsc_tgt);
304
305                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
306                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
307                 OBD_FREE_PTR(tgtconf);
308         }
309
310         /* free general rules */
311         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
312 }
313
314 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
315 {
316         struct fs_db *fsdb;
317         struct list_head *tmp;
318
319         list_for_each(tmp, &mgs->mgs_fs_db_list) {
320                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
321                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
322                         return fsdb;
323         }
324         return NULL;
325 }
326
327 /* caller must hold the mgs->mgs_fs_db_lock */
328 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
329                                   struct mgs_device *mgs, char *fsname)
330 {
331         struct fs_db *fsdb;
332         int rc;
333         ENTRY;
334
335         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
336                 CERROR("fsname %s is too long\n", fsname);
337                 RETURN(NULL);
338         }
339
340         OBD_ALLOC_PTR(fsdb);
341         if (!fsdb)
342                 RETURN(NULL);
343
344         strcpy(fsdb->fsdb_name, fsname);
345         mutex_init(&fsdb->fsdb_mutex);
346         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
347         fsdb->fsdb_gen = 1;
348
349         if (strcmp(fsname, MGSSELF_NAME) == 0) {
350                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
351         } else {
352                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
353                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
354                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
355                         CERROR("No memory for index maps\n");
356                         GOTO(err, rc = -ENOMEM);
357                 }
358
359                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
360                 if (rc)
361                         GOTO(err, rc);
362                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
363                 if (rc)
364                         GOTO(err, rc);
365
366                 /* initialise data for NID table */
367                 mgs_ir_init_fs(env, mgs, fsdb);
368
369                 lproc_mgs_add_live(mgs, fsdb);
370         }
371
372         list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
373
374         RETURN(fsdb);
375 err:
376         if (fsdb->fsdb_ost_index_map)
377                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
378         if (fsdb->fsdb_mdt_index_map)
379                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
380         name_destroy(&fsdb->fsdb_clilov);
381         name_destroy(&fsdb->fsdb_clilmv);
382         OBD_FREE_PTR(fsdb);
383         RETURN(NULL);
384 }
385
386 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
387 {
388         /* wait for anyone with the sem */
389         mutex_lock(&fsdb->fsdb_mutex);
390         lproc_mgs_del_live(mgs, fsdb);
391         list_del(&fsdb->fsdb_list);
392
393         /* deinitialize fsr */
394         mgs_ir_fini_fs(mgs, fsdb);
395
396         if (fsdb->fsdb_ost_index_map)
397                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
398         if (fsdb->fsdb_mdt_index_map)
399                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
400         name_destroy(&fsdb->fsdb_clilov);
401         name_destroy(&fsdb->fsdb_clilmv);
402         mgs_free_fsdb_srpc(fsdb);
403         mutex_unlock(&fsdb->fsdb_mutex);
404         OBD_FREE_PTR(fsdb);
405 }
406
407 int mgs_init_fsdb_list(struct mgs_device *mgs)
408 {
409         INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
410         return 0;
411 }
412
413 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
414 {
415         struct fs_db *fsdb;
416         struct list_head *tmp, *tmp2;
417
418         mutex_lock(&mgs->mgs_mutex);
419         list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
420                 fsdb = list_entry(tmp, struct fs_db, fsdb_list);
421                 mgs_free_fsdb(mgs, fsdb);
422         }
423         mutex_unlock(&mgs->mgs_mutex);
424         return 0;
425 }
426
427 int mgs_find_or_make_fsdb(const struct lu_env *env,
428                           struct mgs_device *mgs, char *name,
429                           struct fs_db **dbh)
430 {
431         struct fs_db *fsdb;
432         int rc = 0;
433
434         ENTRY;
435         mutex_lock(&mgs->mgs_mutex);
436         fsdb = mgs_find_fsdb(mgs, name);
437         if (fsdb) {
438                 mutex_unlock(&mgs->mgs_mutex);
439                 *dbh = fsdb;
440                 RETURN(0);
441         }
442
443         CDEBUG(D_MGS, "Creating new db\n");
444         fsdb = mgs_new_fsdb(env, mgs, name);
445         /* lock fsdb_mutex until the db is loaded from llogs */
446         if (fsdb)
447                 mutex_lock(&fsdb->fsdb_mutex);
448         mutex_unlock(&mgs->mgs_mutex);
449         if (!fsdb)
450                 RETURN(-ENOMEM);
451
452         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
453                 /* populate the db from the client llog */
454                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
455                 if (rc) {
456                         CERROR("Can't get db from client log %d\n", rc);
457                         GOTO(out_free, rc);
458                 }
459         }
460
461         /* populate srpc rules from params llog */
462         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
463         if (rc) {
464                 CERROR("Can't get db from params log %d\n", rc);
465                 GOTO(out_free, rc);
466         }
467
468         mutex_unlock(&fsdb->fsdb_mutex);
469         *dbh = fsdb;
470
471         RETURN(0);
472
473 out_free:
474         mutex_unlock(&fsdb->fsdb_mutex);
475         mgs_free_fsdb(mgs, fsdb);
476         return rc;
477 }
478
479 /* 1 = index in use
480    0 = index unused
481    -1= empty client log */
482 int mgs_check_index(const struct lu_env *env,
483                     struct mgs_device *mgs,
484                     struct mgs_target_info *mti)
485 {
486         struct fs_db *fsdb;
487         void *imap;
488         int rc = 0;
489         ENTRY;
490
491         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
492
493         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
494         if (rc) {
495                 CERROR("Can't get db for %s\n", mti->mti_fsname);
496                 RETURN(rc);
497         }
498
499         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
500                 RETURN(-1);
501
502         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
503                 imap = fsdb->fsdb_ost_index_map;
504         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
505                 imap = fsdb->fsdb_mdt_index_map;
506         else
507                 RETURN(-EINVAL);
508
509         if (test_bit(mti->mti_stripe_index, imap))
510                 RETURN(1);
511         RETURN(0);
512 }
513
514 static __inline__ int next_index(void *index_map, int map_len)
515 {
516         int i;
517         for (i = 0; i < map_len * 8; i++)
518                 if (!test_bit(i, index_map)) {
519                          return i;
520                  }
521         CERROR("max index %d exceeded.\n", i);
522         return -1;
523 }
524
525 /* Return codes:
526         0  newly marked as in use
527         <0 err
528         +EALREADY for update of an old index */
529 static int mgs_set_index(const struct lu_env *env,
530                          struct mgs_device *mgs,
531                          struct mgs_target_info *mti)
532 {
533         struct fs_db *fsdb;
534         void *imap;
535         int rc = 0;
536         ENTRY;
537
538         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
539         if (rc) {
540                 CERROR("Can't get db for %s\n", mti->mti_fsname);
541                 RETURN(rc);
542         }
543
544         mutex_lock(&fsdb->fsdb_mutex);
545         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
546                 imap = fsdb->fsdb_ost_index_map;
547         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
548                 imap = fsdb->fsdb_mdt_index_map;
549         } else {
550                 GOTO(out_up, rc = -EINVAL);
551         }
552
553         if (mti->mti_flags & LDD_F_NEED_INDEX) {
554                 rc = next_index(imap, INDEX_MAP_SIZE);
555                 if (rc == -1)
556                         GOTO(out_up, rc = -ERANGE);
557                 mti->mti_stripe_index = rc;
558                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
559                         fsdb->fsdb_mdt_count ++;
560         }
561
562         /* the last index(0xffff) is reserved for default value. */
563         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8 - 1) {
564                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %u, "
565                                    "but index must be less than %u.\n",
566                                    mti->mti_svname, mti->mti_stripe_index,
567                                    INDEX_MAP_SIZE * 8 - 1);
568                 GOTO(out_up, rc = -ERANGE);
569         }
570
571         if (test_bit(mti->mti_stripe_index, imap)) {
572                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
573                     !(mti->mti_flags & LDD_F_WRITECONF)) {
574                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
575                                            "%d, but that index is already in "
576                                            "use. Use --writeconf to force\n",
577                                            mti->mti_svname,
578                                            mti->mti_stripe_index);
579                         GOTO(out_up, rc = -EADDRINUSE);
580                 } else {
581                         CDEBUG(D_MGS, "Server %s updating index %d\n",
582                                mti->mti_svname, mti->mti_stripe_index);
583                         GOTO(out_up, rc = EALREADY);
584                 }
585         }
586
587         set_bit(mti->mti_stripe_index, imap);
588         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
589         mutex_unlock(&fsdb->fsdb_mutex);
590         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
591                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
592
593         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
594                mti->mti_stripe_index);
595
596         RETURN(0);
597 out_up:
598         mutex_unlock(&fsdb->fsdb_mutex);
599         return rc;
600 }
601
602 struct mgs_modify_lookup {
603         struct cfg_marker mml_marker;
604         int               mml_modified;
605 };
606
607 static int mgs_modify_handler(const struct lu_env *env,
608                               struct llog_handle *llh,
609                               struct llog_rec_hdr *rec, void *data)
610 {
611         struct mgs_modify_lookup *mml = data;
612         struct cfg_marker *marker;
613         struct lustre_cfg *lcfg = REC_DATA(rec);
614         int cfg_len = REC_DATA_LEN(rec);
615         int rc;
616         ENTRY;
617
618         if (rec->lrh_type != OBD_CFG_REC) {
619                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
620                 RETURN(-EINVAL);
621         }
622
623         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
624         if (rc) {
625                 CERROR("Insane cfg\n");
626                 RETURN(rc);
627         }
628
629         /* We only care about markers */
630         if (lcfg->lcfg_command != LCFG_MARKER)
631                 RETURN(0);
632
633         marker = lustre_cfg_buf(lcfg, 1);
634         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
635             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
636             !(marker->cm_flags & CM_SKIP)) {
637                 /* Found a non-skipped marker match */
638                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
639                        rec->lrh_index, marker->cm_step,
640                        marker->cm_flags, mml->mml_marker.cm_flags,
641                        marker->cm_tgtname, marker->cm_comment);
642                 /* Overwrite the old marker llog entry */
643                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
644                 marker->cm_flags |= mml->mml_marker.cm_flags;
645                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
646                 rc = llog_write(env, llh, rec, rec->lrh_index);
647                 if (!rc)
648                          mml->mml_modified++;
649         }
650
651         RETURN(rc);
652 }
653
654 /**
655  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
656  * Return code:
657  * 0 - modified successfully,
658  * 1 - no modification was done
659  * negative - error
660  */
661 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
662                       struct fs_db *fsdb, struct mgs_target_info *mti,
663                       char *logname, char *devname, char *comment, int flags)
664 {
665         struct llog_handle *loghandle;
666         struct llog_ctxt *ctxt;
667         struct mgs_modify_lookup *mml;
668         int rc;
669
670         ENTRY;
671
672         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
673         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
674                flags);
675
676         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
677         LASSERT(ctxt != NULL);
678         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
679         if (rc < 0) {
680                 if (rc == -ENOENT)
681                         rc = 0;
682                 GOTO(out_pop, rc);
683         }
684
685         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
686         if (rc)
687                 GOTO(out_close, rc);
688
689         if (llog_get_size(loghandle) <= 1)
690                 GOTO(out_close, rc = 0);
691
692         OBD_ALLOC_PTR(mml);
693         if (!mml)
694                 GOTO(out_close, rc = -ENOMEM);
695         if (strlcpy(mml->mml_marker.cm_comment, comment,
696                     sizeof(mml->mml_marker.cm_comment)) >=
697             sizeof(mml->mml_marker.cm_comment))
698                 GOTO(out_free, rc = -E2BIG);
699         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
700                     sizeof(mml->mml_marker.cm_tgtname)) >=
701             sizeof(mml->mml_marker.cm_tgtname))
702                 GOTO(out_free, rc = -E2BIG);
703         /* Modify mostly means cancel */
704         mml->mml_marker.cm_flags = flags;
705         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
706         mml->mml_modified = 0;
707         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
708                           NULL);
709         if (!rc && !mml->mml_modified)
710                 rc = 1;
711
712 out_free:
713         OBD_FREE_PTR(mml);
714
715 out_close:
716         llog_close(env, loghandle);
717 out_pop:
718         if (rc < 0)
719                 CERROR("%s: modify %s/%s failed: rc = %d\n",
720                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
721         llog_ctxt_put(ctxt);
722         RETURN(rc);
723 }
724
725 /** This structure is passed to mgs_replace_handler */
726 struct mgs_replace_uuid_lookup {
727         /* Nids are replaced for this target device */
728         struct mgs_target_info target;
729         /* Temporary modified llog */
730         struct llog_handle *temp_llh;
731         /* Flag is set if in target block*/
732         int in_target_device;
733         /* Nids already added. Just skip (multiple nids) */
734         int device_nids_added;
735         /* Flag is set if this block should not be copied */
736         int skip_it;
737 };
738
739 /**
740  * Check: a) if block should be skipped
741  * b) is it target block
742  *
743  * \param[in] lcfg
744  * \param[in] mrul
745  *
746  * \retval 0 should not to be skipped
747  * \retval 1 should to be skipped
748  */
749 static int check_markers(struct lustre_cfg *lcfg,
750                          struct mgs_replace_uuid_lookup *mrul)
751 {
752          struct cfg_marker *marker;
753
754         /* Track markers. Find given device */
755         if (lcfg->lcfg_command == LCFG_MARKER) {
756                 marker = lustre_cfg_buf(lcfg, 1);
757                 /* Clean llog from records marked as CM_EXCLUDE.
758                    CM_SKIP records are used for "active" command
759                    and can be restored if needed */
760                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
761                     (CM_EXCLUDE | CM_START)) {
762                         mrul->skip_it = 1;
763                         return 1;
764                 }
765
766                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
767                     (CM_EXCLUDE | CM_END)) {
768                         mrul->skip_it = 0;
769                         return 1;
770                 }
771
772                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
773                         LASSERT(!(marker->cm_flags & CM_START) ||
774                                 !(marker->cm_flags & CM_END));
775                         if (marker->cm_flags & CM_START) {
776                                 mrul->in_target_device = 1;
777                                 mrul->device_nids_added = 0;
778                         } else if (marker->cm_flags & CM_END)
779                                 mrul->in_target_device = 0;
780                 }
781         }
782
783         return 0;
784 }
785
786 static int record_base(const struct lu_env *env, struct llog_handle *llh,
787                      char *cfgname, lnet_nid_t nid, int cmd,
788                      char *s1, char *s2, char *s3, char *s4)
789 {
790         struct mgs_thread_info  *mgi = mgs_env_info(env);
791         struct llog_cfg_rec     *lcr;
792         int rc;
793
794         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
795                cmd, s1, s2, s3, s4);
796
797         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
798         if (s1)
799                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
800         if (s2)
801                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
802         if (s3)
803                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
804         if (s4)
805                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
806
807         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
808         if (lcr == NULL)
809                 return -ENOMEM;
810
811         lcr->lcr_cfg.lcfg_nid = nid;
812         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
813
814         lustre_cfg_rec_free(lcr);
815
816         if (rc < 0)
817                 CDEBUG(D_MGS,
818                        "failed to write lcfg %s %#x %s %s %s %s: rc = %d\n",
819                        cfgname, cmd, s1, s2, s3, s4, rc);
820         return rc;
821 }
822
823 static inline int record_add_uuid(const struct lu_env *env,
824                                   struct llog_handle *llh,
825                                   uint64_t nid, char *uuid)
826 {
827         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
828 }
829
830 static inline int record_add_conn(const struct lu_env *env,
831                                   struct llog_handle *llh,
832                                   char *devname, char *uuid)
833 {
834         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
835 }
836
837 static inline int record_attach(const struct lu_env *env,
838                                 struct llog_handle *llh, char *devname,
839                                 char *type, char *uuid)
840 {
841         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
842 }
843
844 static inline int record_setup(const struct lu_env *env,
845                                struct llog_handle *llh, char *devname,
846                                char *s1, char *s2, char *s3, char *s4)
847 {
848         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
849 }
850
851 /**
852  * \retval <0 record processing error
853  * \retval n record is processed. No need copy original one.
854  * \retval 0 record is not processed.
855  */
856 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
857                            struct mgs_replace_uuid_lookup *mrul)
858 {
859         int nids_added = 0;
860         lnet_nid_t nid;
861         char *ptr;
862         int rc;
863
864         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
865                 /* LCFG_ADD_UUID command found. Let's skip original command
866                    and add passed nids */
867                 ptr = mrul->target.mti_params;
868                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
869                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
870                                "device %s\n", libcfs_nid2str(nid),
871                                 mrul->target.mti_params,
872                                 mrul->target.mti_svname);
873                         rc = record_add_uuid(env,
874                                              mrul->temp_llh, nid,
875                                              mrul->target.mti_params);
876                         if (!rc)
877                                 nids_added++;
878                 }
879
880                 if (nids_added == 0) {
881                         CERROR("No new nids were added, nid %s with uuid %s, "
882                                "device %s\n", libcfs_nid2str(nid),
883                                mrul->target.mti_params,
884                                mrul->target.mti_svname);
885                         RETURN(-ENXIO);
886                 } else {
887                         mrul->device_nids_added = 1;
888                 }
889
890                 return nids_added;
891         }
892
893         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
894                 /* LCFG_SETUP command found. UUID should be changed */
895                 rc = record_setup(env,
896                                   mrul->temp_llh,
897                                   /* devname the same */
898                                   lustre_cfg_string(lcfg, 0),
899                                   /* s1 is not changed */
900                                   lustre_cfg_string(lcfg, 1),
901                                   /* new uuid should be
902                                   the full nidlist */
903                                   mrul->target.mti_params,
904                                   /* s3 is not changed */
905                                   lustre_cfg_string(lcfg, 3),
906                                   /* s4 is not changed */
907                                   lustre_cfg_string(lcfg, 4));
908                 return rc ? rc : 1;
909         }
910
911         /* Another commands in target device block */
912         return 0;
913 }
914
915 /**
916  * Handler that called for every record in llog.
917  * Records are processed in order they placed in llog.
918  *
919  * \param[in] llh       log to be processed
920  * \param[in] rec       current record
921  * \param[in] data      mgs_replace_uuid_lookup structure
922  *
923  * \retval 0    success
924  */
925 static int mgs_replace_handler(const struct lu_env *env,
926                                struct llog_handle *llh,
927                                struct llog_rec_hdr *rec,
928                                void *data)
929 {
930         struct mgs_replace_uuid_lookup *mrul;
931         struct lustre_cfg *lcfg = REC_DATA(rec);
932         int cfg_len = REC_DATA_LEN(rec);
933         int rc;
934         ENTRY;
935
936         mrul = (struct mgs_replace_uuid_lookup *)data;
937
938         if (rec->lrh_type != OBD_CFG_REC) {
939                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
940                        rec->lrh_type, lcfg->lcfg_command,
941                        lustre_cfg_string(lcfg, 0),
942                        lustre_cfg_string(lcfg, 1));
943                 RETURN(-EINVAL);
944         }
945
946         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
947         if (rc) {
948                 /* Do not copy any invalidated records */
949                 GOTO(skip_out, rc = 0);
950         }
951
952         rc = check_markers(lcfg, mrul);
953         if (rc || mrul->skip_it)
954                 GOTO(skip_out, rc = 0);
955
956         /* Write to new log all commands outside target device block */
957         if (!mrul->in_target_device)
958                 GOTO(copy_out, rc = 0);
959
960         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
961            (failover nids) for this target, assuming that if then
962            primary is changing then so is the failover */
963         if (mrul->device_nids_added &&
964             (lcfg->lcfg_command == LCFG_ADD_UUID ||
965              lcfg->lcfg_command == LCFG_ADD_CONN))
966                 GOTO(skip_out, rc = 0);
967
968         rc = process_command(env, lcfg, mrul);
969         if (rc < 0)
970                 RETURN(rc);
971
972         if (rc)
973                 RETURN(0);
974 copy_out:
975         /* Record is placed in temporary llog as is */
976         rc = llog_write(env, mrul->temp_llh, rec, LLOG_NEXT_IDX);
977
978         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
979                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
980                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
981         RETURN(rc);
982
983 skip_out:
984         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
985                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
986                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
987         RETURN(rc);
988 }
989
990 static int mgs_log_is_empty(const struct lu_env *env,
991                             struct mgs_device *mgs, char *name)
992 {
993         struct llog_ctxt        *ctxt;
994         int                      rc;
995
996         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
997         LASSERT(ctxt != NULL);
998
999         rc = llog_is_empty(env, ctxt, name);
1000         llog_ctxt_put(ctxt);
1001         return rc;
1002 }
1003
1004 static int mgs_replace_nids_log(const struct lu_env *env,
1005                                 struct obd_device *mgs, struct fs_db *fsdb,
1006                                 char *logname, char *devname, char *nids)
1007 {
1008         struct llog_handle *orig_llh, *backup_llh;
1009         struct llog_ctxt *ctxt;
1010         struct mgs_replace_uuid_lookup *mrul;
1011         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1012         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1013         char *backup;
1014         int rc, rc2;
1015         ENTRY;
1016
1017         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1018
1019         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1020         LASSERT(ctxt != NULL);
1021
1022         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1023                 /* Log is empty. Nothing to replace */
1024                 GOTO(out_put, rc = 0);
1025         }
1026
1027         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1028         if (backup == NULL)
1029                 GOTO(out_put, rc = -ENOMEM);
1030
1031         sprintf(backup, "%s.bak", logname);
1032
1033         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1034         if (rc == 0) {
1035                 /* Now erase original log file. Connections are not allowed.
1036                    Backup is already saved */
1037                 rc = llog_erase(env, ctxt, NULL, logname);
1038                 if (rc < 0)
1039                         GOTO(out_free, rc);
1040         } else if (rc != -ENOENT) {
1041                 CERROR("%s: can't make backup for %s: rc = %d\n",
1042                        mgs->obd_name, logname, rc);
1043                 GOTO(out_free,rc);
1044         }
1045
1046         /* open local log */
1047         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1048         if (rc)
1049                 GOTO(out_restore, rc);
1050
1051         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1052         if (rc)
1053                 GOTO(out_closel, rc);
1054
1055         /* open backup llog */
1056         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1057                        LLOG_OPEN_EXISTS);
1058         if (rc)
1059                 GOTO(out_closel, rc);
1060
1061         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1062         if (rc)
1063                 GOTO(out_close, rc);
1064
1065         if (llog_get_size(backup_llh) <= 1)
1066                 GOTO(out_close, rc = 0);
1067
1068         OBD_ALLOC_PTR(mrul);
1069         if (!mrul)
1070                 GOTO(out_close, rc = -ENOMEM);
1071         /* devname is only needed information to replace UUID records */
1072         strlcpy(mrul->target.mti_svname, devname,
1073                 sizeof(mrul->target.mti_svname));
1074         /* parse nids later */
1075         strlcpy(mrul->target.mti_params, nids, sizeof(mrul->target.mti_params));
1076         /* Copy records to this temporary llog */
1077         mrul->temp_llh = orig_llh;
1078
1079         rc = llog_process(env, backup_llh, mgs_replace_handler,
1080                           (void *)mrul, NULL);
1081         OBD_FREE_PTR(mrul);
1082 out_close:
1083         rc2 = llog_close(NULL, backup_llh);
1084         if (!rc)
1085                 rc = rc2;
1086 out_closel:
1087         rc2 = llog_close(NULL, orig_llh);
1088         if (!rc)
1089                 rc = rc2;
1090
1091 out_restore:
1092         if (rc) {
1093                 CERROR("%s: llog should be restored: rc = %d\n",
1094                        mgs->obd_name, rc);
1095                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1096                                   logname);
1097                 if (rc2 < 0)
1098                         CERROR("%s: can't restore backup %s: rc = %d\n",
1099                                mgs->obd_name, logname, rc2);
1100         }
1101
1102 out_free:
1103         OBD_FREE(backup, strlen(backup) + 1);
1104
1105 out_put:
1106         llog_ctxt_put(ctxt);
1107
1108         if (rc)
1109                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1110                        mgs->obd_name, logname, rc);
1111
1112         RETURN(rc);
1113 }
1114
1115 /**
1116  * Parse device name and get file system name and/or device index
1117  *
1118  * \param[in]   devname device name (ex. lustre-MDT0000)
1119  * \param[out]  fsname  file system name(optional)
1120  * \param[out]  index   device index(optional)
1121  *
1122  * \retval 0    success
1123  */
1124 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1125 {
1126         int rc;
1127         ENTRY;
1128
1129         /* Extract fsname */
1130         if (fsname) {
1131                 rc = server_name2fsname(devname, fsname, NULL);
1132                 if (rc < 0) {
1133                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1134                                devname);
1135                         RETURN(-EINVAL);
1136                 }
1137         }
1138
1139         if (index) {
1140                 rc = server_name2index(devname, index, NULL);
1141                 if (rc < 0) {
1142                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1143                                devname);
1144                         RETURN(-EINVAL);
1145                 }
1146         }
1147
1148         RETURN(0);
1149 }
1150
1151 /* This is only called during replace_nids */
1152 static int only_mgs_is_running(struct obd_device *mgs_obd)
1153 {
1154         /* TDB: Is global variable with devices count exists? */
1155         int num_devices = get_devices_count();
1156         int num_exports = 0;
1157         struct obd_export *exp;
1158
1159         spin_lock(&mgs_obd->obd_dev_lock);
1160         list_for_each_entry(exp, &mgs_obd->obd_exports, exp_obd_chain) {
1161                 /* skip self export */
1162                 if (exp == mgs_obd->obd_self_export)
1163                         continue;
1164                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1165                         continue;
1166
1167                 ++num_exports;
1168
1169                 CERROR("%s: node %s still connected during replace_nids "
1170                        "connect_flags:%llx\n",
1171                        mgs_obd->obd_name,
1172                        libcfs_nid2str(exp->exp_nid_stats->nid),
1173                        exp_connect_flags(exp));
1174
1175         }
1176         spin_unlock(&mgs_obd->obd_dev_lock);
1177
1178         /* osd, MGS and MGC + self_export
1179            (wc -l /proc/fs/lustre/devices <= 2) && (non self exports == 0) */
1180         return (num_devices <= 3) && (num_exports == 0);
1181 }
1182
1183 static int name_create_mdt(char **logname, char *fsname, int i)
1184 {
1185         char mdt_index[9];
1186
1187         sprintf(mdt_index, "-MDT%04x", i);
1188         return name_create(logname, fsname, mdt_index);
1189 }
1190
1191 /**
1192  * Replace nids for \a device to \a nids values
1193  *
1194  * \param obd           MGS obd device
1195  * \param devname       nids need to be replaced for this device
1196  * (ex. lustre-OST0000)
1197  * \param nids          nids list (ex. nid1,nid2,nid3)
1198  *
1199  * \retval 0    success
1200  */
1201 int mgs_replace_nids(const struct lu_env *env,
1202                      struct mgs_device *mgs,
1203                      char *devname, char *nids)
1204 {
1205         /* Assume fsname is part of device name */
1206         char fsname[MTI_NAME_MAXLEN];
1207         int rc;
1208         __u32 index;
1209         char *logname;
1210         struct fs_db *fsdb;
1211         unsigned int i;
1212         int conn_state;
1213         struct obd_device *mgs_obd = mgs->mgs_obd;
1214         ENTRY;
1215
1216         /* We can only change NIDs if no other nodes are connected */
1217         spin_lock(&mgs_obd->obd_dev_lock);
1218         conn_state = mgs_obd->obd_no_conn;
1219         mgs_obd->obd_no_conn = 1;
1220         spin_unlock(&mgs_obd->obd_dev_lock);
1221
1222         /* We can not change nids if not only MGS is started */
1223         if (!only_mgs_is_running(mgs_obd)) {
1224                 CERROR("Only MGS is allowed to be started\n");
1225                 GOTO(out, rc = -EINPROGRESS);
1226         }
1227
1228         /* Get fsname and index*/
1229         rc = mgs_parse_devname(devname, fsname, &index);
1230         if (rc)
1231                 GOTO(out, rc);
1232
1233         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1234         if (rc) {
1235                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1236                 GOTO(out, rc);
1237         }
1238
1239         /* Process client llogs */
1240         name_create(&logname, fsname, "-client");
1241         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1242         name_destroy(&logname);
1243         if (rc) {
1244                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1245                        fsname, devname, rc);
1246                 GOTO(out, rc);
1247         }
1248
1249         /* Process MDT llogs */
1250         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1251                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1252                         continue;
1253                 name_create_mdt(&logname, fsname, i);
1254                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1255                 name_destroy(&logname);
1256                 if (rc)
1257                         GOTO(out, rc);
1258         }
1259
1260 out:
1261         spin_lock(&mgs_obd->obd_dev_lock);
1262         mgs_obd->obd_no_conn = conn_state;
1263         spin_unlock(&mgs_obd->obd_dev_lock);
1264
1265         RETURN(rc);
1266 }
1267
1268 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1269                             char *devname, struct lov_desc *desc)
1270 {
1271         struct mgs_thread_info  *mgi = mgs_env_info(env);
1272         struct llog_cfg_rec     *lcr;
1273         int rc;
1274
1275         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1276         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1277         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1278         if (lcr == NULL)
1279                 return -ENOMEM;
1280
1281         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1282         lustre_cfg_rec_free(lcr);
1283         return rc;
1284 }
1285
1286 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1287                             char *devname, struct lmv_desc *desc)
1288 {
1289         struct mgs_thread_info  *mgi = mgs_env_info(env);
1290         struct llog_cfg_rec     *lcr;
1291         int rc;
1292
1293         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1294         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1295         lcr = lustre_cfg_rec_new(LCFG_SETUP, &mgi->mgi_bufs);
1296         if (lcr == NULL)
1297                 return -ENOMEM;
1298
1299         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1300         lustre_cfg_rec_free(lcr);
1301         return rc;
1302 }
1303
1304 static inline int record_mdc_add(const struct lu_env *env,
1305                                  struct llog_handle *llh,
1306                                  char *logname, char *mdcuuid,
1307                                  char *mdtuuid, char *index,
1308                                  char *gen)
1309 {
1310         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1311                            mdtuuid,index,gen,mdcuuid);
1312 }
1313
1314 static inline int record_lov_add(const struct lu_env *env,
1315                                  struct llog_handle *llh,
1316                                  char *lov_name, char *ost_uuid,
1317                                  char *index, char *gen)
1318 {
1319         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1320                            ost_uuid, index, gen, 0);
1321 }
1322
1323 static inline int record_mount_opt(const struct lu_env *env,
1324                                    struct llog_handle *llh,
1325                                    char *profile, char *lov_name,
1326                                    char *mdc_name)
1327 {
1328         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1329                            profile,lov_name,mdc_name,0);
1330 }
1331
1332 static int record_marker(const struct lu_env *env,
1333                          struct llog_handle *llh,
1334                          struct fs_db *fsdb, __u32 flags,
1335                          char *tgtname, char *comment)
1336 {
1337         struct mgs_thread_info *mgi = mgs_env_info(env);
1338         struct llog_cfg_rec *lcr;
1339         int rc;
1340         int cplen = 0;
1341
1342         if (flags & CM_START)
1343                 fsdb->fsdb_gen++;
1344         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1345         mgi->mgi_marker.cm_flags = flags;
1346         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1347         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1348                         sizeof(mgi->mgi_marker.cm_tgtname));
1349         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1350                 return -E2BIG;
1351         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1352                         sizeof(mgi->mgi_marker.cm_comment));
1353         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1354                 return -E2BIG;
1355         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1356         mgi->mgi_marker.cm_canceltime = 0;
1357         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1358         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1359                             sizeof(mgi->mgi_marker));
1360         lcr = lustre_cfg_rec_new(LCFG_MARKER, &mgi->mgi_bufs);
1361         if (lcr == NULL)
1362                 return -ENOMEM;
1363
1364         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1365         lustre_cfg_rec_free(lcr);
1366         return rc;
1367 }
1368
1369 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1370                             struct llog_handle **llh, char *name)
1371 {
1372         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1373         struct llog_ctxt        *ctxt;
1374         int                      rc = 0;
1375         ENTRY;
1376
1377         if (*llh)
1378                 GOTO(out, rc = -EBUSY);
1379
1380         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1381         if (!ctxt)
1382                 GOTO(out, rc = -ENODEV);
1383         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1384
1385         rc = llog_open_create(env, ctxt, llh, NULL, name);
1386         if (rc)
1387                 GOTO(out_ctxt, rc);
1388         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1389         if (rc)
1390                 llog_close(env, *llh);
1391 out_ctxt:
1392         llog_ctxt_put(ctxt);
1393 out:
1394         if (rc) {
1395                 CERROR("%s: can't start log %s: rc = %d\n",
1396                        mgs->mgs_obd->obd_name, name, rc);
1397                 *llh = NULL;
1398         }
1399         RETURN(rc);
1400 }
1401
1402 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1403 {
1404         int rc;
1405
1406         rc = llog_close(env, *llh);
1407         *llh = NULL;
1408
1409         return rc;
1410 }
1411
1412 /******************** config "macros" *********************/
1413
1414 /* write an lcfg directly into a log (with markers) */
1415 static int mgs_write_log_direct(const struct lu_env *env,
1416                                 struct mgs_device *mgs, struct fs_db *fsdb,
1417                                 char *logname, struct llog_cfg_rec *lcr,
1418                                 char *devname, char *comment)
1419 {
1420         struct llog_handle *llh = NULL;
1421         int rc;
1422
1423         ENTRY;
1424
1425         rc = record_start_log(env, mgs, &llh, logname);
1426         if (rc)
1427                 RETURN(rc);
1428
1429         /* FIXME These should be a single journal transaction */
1430         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1431         if (rc)
1432                 GOTO(out_end, rc);
1433         rc = llog_write(env, llh, &lcr->lcr_hdr, LLOG_NEXT_IDX);
1434         if (rc)
1435                 GOTO(out_end, rc);
1436         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1437         if (rc)
1438                 GOTO(out_end, rc);
1439 out_end:
1440         record_end_log(env, &llh);
1441         RETURN(rc);
1442 }
1443
1444 /* write the lcfg in all logs for the given fs */
1445 int mgs_write_log_direct_all(const struct lu_env *env, struct mgs_device *mgs,
1446                              struct fs_db *fsdb, struct mgs_target_info *mti,
1447                              struct llog_cfg_rec *lcr, char *devname,
1448                              char *comment, int server_only)
1449 {
1450         struct list_head         log_list;
1451         struct mgs_direntry     *dirent, *n;
1452         char                    *fsname = mti->mti_fsname;
1453         int                      rc = 0, len = strlen(fsname);
1454
1455         ENTRY;
1456         /* Find all the logs in the CONFIGS directory */
1457         rc = class_dentry_readdir(env, mgs, &log_list);
1458         if (rc)
1459                 RETURN(rc);
1460
1461         /* Could use fsdb index maps instead of directory listing */
1462         list_for_each_entry_safe(dirent, n, &log_list, mde_list) {
1463                 list_del_init(&dirent->mde_list);
1464                 /* don't write to sptlrpc rule log */
1465                 if (strstr(dirent->mde_name, "-sptlrpc") != NULL)
1466                         goto next;
1467
1468                 /* caller wants write server logs only */
1469                 if (server_only && strstr(dirent->mde_name, "-client") != NULL)
1470                         goto next;
1471
1472                 if (strlen(dirent->mde_name) <= len ||
1473                     strncmp(fsname, dirent->mde_name, len) != 0 ||
1474                     dirent->mde_name[len] != '-')
1475                         goto next;
1476
1477                 CDEBUG(D_MGS, "Changing log %s\n", dirent->mde_name);
1478                 /* Erase any old settings of this same parameter */
1479                 rc = mgs_modify(env, mgs, fsdb, mti, dirent->mde_name,
1480                                 devname, comment, CM_SKIP);
1481                 if (rc < 0)
1482                         CERROR("%s: Can't modify llog %s: rc = %d\n",
1483                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1484                 if (lcr == NULL)
1485                         goto next;
1486                 /* Write the new one */
1487                 rc = mgs_write_log_direct(env, mgs, fsdb, dirent->mde_name,
1488                                           lcr, devname, comment);
1489                 if (rc != 0)
1490                         CERROR("%s: writing log %s: rc = %d\n",
1491                                mgs->mgs_obd->obd_name, dirent->mde_name, rc);
1492 next:
1493                 mgs_direntry_free(dirent);
1494         }
1495
1496         RETURN(rc);
1497 }
1498
1499 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1500                                     struct mgs_device *mgs,
1501                                     struct fs_db *fsdb,
1502                                     struct mgs_target_info *mti,
1503                                     int index, char *logname);
1504 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1505                                     struct mgs_device *mgs,
1506                                     struct fs_db *fsdb,
1507                                     struct mgs_target_info *mti,
1508                                     char *logname, char *suffix, char *lovname,
1509                                     enum lustre_sec_part sec_part, int flags);
1510 static int name_create_mdt_and_lov(char **logname, char **lovname,
1511                                    struct fs_db *fsdb, int i);
1512
1513 static int add_param(char *params, char *key, char *val)
1514 {
1515         char *start = params + strlen(params);
1516         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1517         int keylen = 0;
1518
1519         if (key != NULL)
1520                 keylen = strlen(key);
1521         if (start + 1 + keylen + strlen(val) >= end) {
1522                 CERROR("params are too long: %s %s%s\n",
1523                        params, key != NULL ? key : "", val);
1524                 return -EINVAL;
1525         }
1526
1527         sprintf(start, " %s%s", key != NULL ? key : "", val);
1528         return 0;
1529 }
1530
1531 /**
1532  * Walk through client config log record and convert the related records
1533  * into the target.
1534  **/
1535 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1536                                          struct llog_handle *llh,
1537                                          struct llog_rec_hdr *rec, void *data)
1538 {
1539         struct mgs_device *mgs;
1540         struct obd_device *obd;
1541         struct mgs_target_info *mti, *tmti;
1542         struct fs_db *fsdb;
1543         int cfg_len = rec->lrh_len;
1544         char *cfg_buf = (char*) (rec + 1);
1545         struct lustre_cfg *lcfg;
1546         int rc = 0;
1547         struct llog_handle *mdt_llh = NULL;
1548         static int got_an_osc_or_mdc = 0;
1549         /* 0: not found any osc/mdc;
1550            1: found osc;
1551            2: found mdc;
1552         */
1553         static int last_step = -1;
1554         int cplen = 0;
1555
1556         ENTRY;
1557
1558         mti = ((struct temp_comp*)data)->comp_mti;
1559         tmti = ((struct temp_comp*)data)->comp_tmti;
1560         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1561         obd = ((struct temp_comp *)data)->comp_obd;
1562         mgs = lu2mgs_dev(obd->obd_lu_dev);
1563         LASSERT(mgs);
1564
1565         if (rec->lrh_type != OBD_CFG_REC) {
1566                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1567                 RETURN(-EINVAL);
1568         }
1569
1570         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1571         if (rc) {
1572                 CERROR("Insane cfg\n");
1573                 RETURN(rc);
1574         }
1575
1576         lcfg = (struct lustre_cfg *)cfg_buf;
1577
1578         if (lcfg->lcfg_command == LCFG_MARKER) {
1579                 struct cfg_marker *marker;
1580                 marker = lustre_cfg_buf(lcfg, 1);
1581                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1582                     (marker->cm_flags & CM_START) &&
1583                      !(marker->cm_flags & CM_SKIP)) {
1584                         got_an_osc_or_mdc = 1;
1585                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1586                                         sizeof(tmti->mti_svname));
1587                         if (cplen >= sizeof(tmti->mti_svname))
1588                                 RETURN(-E2BIG);
1589                         rc = record_start_log(env, mgs, &mdt_llh,
1590                                               mti->mti_svname);
1591                         if (rc)
1592                                 RETURN(rc);
1593                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1594                                            mti->mti_svname, "add osc(copied)");
1595                         record_end_log(env, &mdt_llh);
1596                         last_step = marker->cm_step;
1597                         RETURN(rc);
1598                 }
1599                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1600                     (marker->cm_flags & CM_END) &&
1601                      !(marker->cm_flags & CM_SKIP)) {
1602                         LASSERT(last_step == marker->cm_step);
1603                         last_step = -1;
1604                         got_an_osc_or_mdc = 0;
1605                         memset(tmti, 0, sizeof(*tmti));
1606                         rc = record_start_log(env, mgs, &mdt_llh,
1607                                               mti->mti_svname);
1608                         if (rc)
1609                                 RETURN(rc);
1610                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1611                                            mti->mti_svname, "add osc(copied)");
1612                         record_end_log(env, &mdt_llh);
1613                         RETURN(rc);
1614                 }
1615                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1616                     (marker->cm_flags & CM_START) &&
1617                      !(marker->cm_flags & CM_SKIP)) {
1618                         got_an_osc_or_mdc = 2;
1619                         last_step = marker->cm_step;
1620                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1621                                strlen(marker->cm_tgtname));
1622
1623                         RETURN(rc);
1624                 }
1625                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1626                     (marker->cm_flags & CM_END) &&
1627                      !(marker->cm_flags & CM_SKIP)) {
1628                         LASSERT(last_step == marker->cm_step);
1629                         last_step = -1;
1630                         got_an_osc_or_mdc = 0;
1631                         memset(tmti, 0, sizeof(*tmti));
1632                         RETURN(rc);
1633                 }
1634         }
1635
1636         if (got_an_osc_or_mdc == 0 || last_step < 0)
1637                 RETURN(rc);
1638
1639         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1640                 uint64_t nodenid = lcfg->lcfg_nid;
1641
1642                 if (strlen(tmti->mti_uuid) == 0) {
1643                         /* target uuid not set, this config record is before
1644                          * LCFG_SETUP, this nid is one of target node nid.
1645                          */
1646                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1647                         tmti->mti_nid_count++;
1648                 } else {
1649                         /* failover node nid */
1650                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1651                                        libcfs_nid2str(nodenid));
1652                 }
1653
1654                 RETURN(rc);
1655         }
1656
1657         if (lcfg->lcfg_command == LCFG_SETUP) {
1658                 char *target;
1659
1660                 target = lustre_cfg_string(lcfg, 1);
1661                 memcpy(tmti->mti_uuid, target, strlen(target));
1662                 RETURN(rc);
1663         }
1664
1665         /* ignore client side sptlrpc_conf_log */
1666         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1667                 RETURN(rc);
1668
1669         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1670                 int index;
1671
1672                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1673                         RETURN (-EINVAL);
1674
1675                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1676                        strlen(mti->mti_fsname));
1677                 tmti->mti_stripe_index = index;
1678
1679                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1680                                               mti->mti_stripe_index,
1681                                               mti->mti_svname);
1682                 memset(tmti, 0, sizeof(*tmti));
1683                 RETURN(rc);
1684         }
1685
1686         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1687                 int index;
1688                 char mdt_index[9];
1689                 char *logname, *lovname;
1690
1691                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1692                                              mti->mti_stripe_index);
1693                 if (rc)
1694                         RETURN(rc);
1695                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1696
1697                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1698                         name_destroy(&logname);
1699                         name_destroy(&lovname);
1700                         RETURN(-EINVAL);
1701                 }
1702
1703                 tmti->mti_stripe_index = index;
1704                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1705                                          mdt_index, lovname,
1706                                          LUSTRE_SP_MDT, 0);
1707                 name_destroy(&logname);
1708                 name_destroy(&lovname);
1709                 RETURN(rc);
1710         }
1711         RETURN(rc);
1712 }
1713
1714 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1715 /* stealed from mgs_get_fsdb_from_llog*/
1716 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1717                                               struct mgs_device *mgs,
1718                                               char *client_name,
1719                                               struct temp_comp* comp)
1720 {
1721         struct llog_handle *loghandle;
1722         struct mgs_target_info *tmti;
1723         struct llog_ctxt *ctxt;
1724         int rc;
1725
1726         ENTRY;
1727
1728         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1729         LASSERT(ctxt != NULL);
1730
1731         OBD_ALLOC_PTR(tmti);
1732         if (tmti == NULL)
1733                 GOTO(out_ctxt, rc = -ENOMEM);
1734
1735         comp->comp_tmti = tmti;
1736         comp->comp_obd = mgs->mgs_obd;
1737
1738         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1739                        LLOG_OPEN_EXISTS);
1740         if (rc < 0) {
1741                 if (rc == -ENOENT)
1742                         rc = 0;
1743                 GOTO(out_pop, rc);
1744         }
1745
1746         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1747         if (rc)
1748                 GOTO(out_close, rc);
1749
1750         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1751                                   (void *)comp, NULL, false);
1752         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1753 out_close:
1754         llog_close(env, loghandle);
1755 out_pop:
1756         OBD_FREE_PTR(tmti);
1757 out_ctxt:
1758         llog_ctxt_put(ctxt);
1759         RETURN(rc);
1760 }
1761
1762 /* lmv is the second thing for client logs */
1763 /* copied from mgs_write_log_lov. Please refer to that.  */
1764 static int mgs_write_log_lmv(const struct lu_env *env,
1765                              struct mgs_device *mgs,
1766                              struct fs_db *fsdb,
1767                              struct mgs_target_info *mti,
1768                              char *logname, char *lmvname)
1769 {
1770         struct llog_handle *llh = NULL;
1771         struct lmv_desc *lmvdesc;
1772         char *uuid;
1773         int rc = 0;
1774         ENTRY;
1775
1776         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1777
1778         OBD_ALLOC_PTR(lmvdesc);
1779         if (lmvdesc == NULL)
1780                 RETURN(-ENOMEM);
1781         lmvdesc->ld_active_tgt_count = 0;
1782         lmvdesc->ld_tgt_count = 0;
1783         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1784         uuid = (char *)lmvdesc->ld_uuid.uuid;
1785
1786         rc = record_start_log(env, mgs, &llh, logname);
1787         if (rc)
1788                 GOTO(out_free, rc);
1789         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1790         if (rc)
1791                 GOTO(out_end, rc);
1792         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1793         if (rc)
1794                 GOTO(out_end, rc);
1795         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1796         if (rc)
1797                 GOTO(out_end, rc);
1798         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1799         if (rc)
1800                 GOTO(out_end, rc);
1801 out_end:
1802         record_end_log(env, &llh);
1803 out_free:
1804         OBD_FREE_PTR(lmvdesc);
1805         RETURN(rc);
1806 }
1807
1808 /* lov is the first thing in the mdt and client logs */
1809 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1810                              struct fs_db *fsdb, struct mgs_target_info *mti,
1811                              char *logname, char *lovname)
1812 {
1813         struct llog_handle *llh = NULL;
1814         struct lov_desc *lovdesc;
1815         char *uuid;
1816         int rc = 0;
1817         ENTRY;
1818
1819         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1820
1821         /*
1822         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1823         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1824               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1825         */
1826
1827         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1828         OBD_ALLOC_PTR(lovdesc);
1829         if (lovdesc == NULL)
1830                 RETURN(-ENOMEM);
1831         lovdesc->ld_magic = LOV_DESC_MAGIC;
1832         lovdesc->ld_tgt_count = 0;
1833         /* Defaults.  Can be changed later by lcfg config_param */
1834         lovdesc->ld_default_stripe_count = 1;
1835         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1836         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1837         lovdesc->ld_default_stripe_offset = -1;
1838         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1839         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1840         /* can these be the same? */
1841         uuid = (char *)lovdesc->ld_uuid.uuid;
1842
1843         /* This should always be the first entry in a log.
1844         rc = mgs_clear_log(obd, logname); */
1845         rc = record_start_log(env, mgs, &llh, logname);
1846         if (rc)
1847                 GOTO(out_free, rc);
1848         /* FIXME these should be a single journal transaction */
1849         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1850         if (rc)
1851                 GOTO(out_end, rc);
1852         rc = record_attach(env, llh, lovname, "lov", uuid);
1853         if (rc)
1854                 GOTO(out_end, rc);
1855         rc = record_lov_setup(env, llh, lovname, lovdesc);
1856         if (rc)
1857                 GOTO(out_end, rc);
1858         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1859         if (rc)
1860                 GOTO(out_end, rc);
1861         EXIT;
1862 out_end:
1863         record_end_log(env, &llh);
1864 out_free:
1865         OBD_FREE_PTR(lovdesc);
1866         return rc;
1867 }
1868
1869 /* add failnids to open log */
1870 static int mgs_write_log_failnids(const struct lu_env *env,
1871                                   struct mgs_target_info *mti,
1872                                   struct llog_handle *llh,
1873                                   char *cliname)
1874 {
1875         char *failnodeuuid = NULL;
1876         char *ptr = mti->mti_params;
1877         lnet_nid_t nid;
1878         int rc = 0;
1879
1880         /*
1881         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1882         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1883         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1884         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1885         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1886         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1887         */
1888
1889         /*
1890          * Pull failnid info out of params string, which may contain something
1891          * like "<nid1>,<nid2>:<nid3>,<nid4>".  class_parse_nid() does not
1892          * complain about abnormal inputs like ",:<nid1>", "<nid1>:,<nid2>",
1893          * etc.  However, convert_hostnames() should have caught those.
1894          */
1895         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1896                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1897                         if (failnodeuuid == NULL) {
1898                                 /* We don't know the failover node name,
1899                                    so just use the first nid as the uuid */
1900                                 rc = name_create(&failnodeuuid,
1901                                                  libcfs_nid2str(nid), "");
1902                                 if (rc)
1903                                         return rc;
1904                         }
1905                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1906                                "client %s\n", libcfs_nid2str(nid),
1907                                failnodeuuid, cliname);
1908                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1909                         /*
1910                          * If *ptr is ':', we have added all NIDs for
1911                          * failnodeuuid.
1912                          */
1913                         if (*ptr == ':') {
1914                                 rc = record_add_conn(env, llh, cliname,
1915                                                      failnodeuuid);
1916                                 name_destroy(&failnodeuuid);
1917                                 failnodeuuid = NULL;
1918                         }
1919                 }
1920                 if (failnodeuuid) {
1921                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1922                         name_destroy(&failnodeuuid);
1923                         failnodeuuid = NULL;
1924                 }
1925         }
1926
1927         return rc;
1928 }
1929
1930 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1931                                     struct mgs_device *mgs,
1932                                     struct fs_db *fsdb,
1933                                     struct mgs_target_info *mti,
1934                                     char *logname, char *lmvname)
1935 {
1936         struct llog_handle *llh = NULL;
1937         char *mdcname = NULL;
1938         char *nodeuuid = NULL;
1939         char *mdcuuid = NULL;
1940         char *lmvuuid = NULL;
1941         char index[6];
1942         int i, rc;
1943         ENTRY;
1944
1945         if (mgs_log_is_empty(env, mgs, logname)) {
1946                 CERROR("log is empty! Logical error\n");
1947                 RETURN(-EINVAL);
1948         }
1949
1950         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1951                mti->mti_svname, logname, lmvname);
1952
1953         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1954         if (rc)
1955                 RETURN(rc);
1956         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1957         if (rc)
1958                 GOTO(out_free, rc);
1959         rc = name_create(&mdcuuid, mdcname, "_UUID");
1960         if (rc)
1961                 GOTO(out_free, rc);
1962         rc = name_create(&lmvuuid, lmvname, "_UUID");
1963         if (rc)
1964                 GOTO(out_free, rc);
1965
1966         rc = record_start_log(env, mgs, &llh, logname);
1967         if (rc)
1968                 GOTO(out_free, rc);
1969         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1970                            "add mdc");
1971         if (rc)
1972                 GOTO(out_end, rc);
1973         for (i = 0; i < mti->mti_nid_count; i++) {
1974                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1975                        libcfs_nid2str(mti->mti_nids[i]));
1976
1977                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1978                 if (rc)
1979                         GOTO(out_end, rc);
1980         }
1981
1982         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1983         if (rc)
1984                 GOTO(out_end, rc);
1985         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1986         if (rc)
1987                 GOTO(out_end, rc);
1988         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1989         if (rc)
1990                 GOTO(out_end, rc);
1991         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1992         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1993                             index, "1");
1994         if (rc)
1995                 GOTO(out_end, rc);
1996         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1997                            "add mdc");
1998         if (rc)
1999                 GOTO(out_end, rc);
2000 out_end:
2001         record_end_log(env, &llh);
2002 out_free:
2003         name_destroy(&lmvuuid);
2004         name_destroy(&mdcuuid);
2005         name_destroy(&mdcname);
2006         name_destroy(&nodeuuid);
2007         RETURN(rc);
2008 }
2009
2010 static inline int name_create_lov(char **lovname, char *mdtname,
2011                                   struct fs_db *fsdb, int index)
2012 {
2013         /* COMPAT_180 */
2014         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2015                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2016         else
2017                 return name_create(lovname, mdtname, "-mdtlov");
2018 }
2019
2020 static int name_create_mdt_and_lov(char **logname, char **lovname,
2021                                    struct fs_db *fsdb, int i)
2022 {
2023         int rc;
2024
2025         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2026         if (rc)
2027                 return rc;
2028         /* COMPAT_180 */
2029         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2030                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2031         else
2032                 rc = name_create(lovname, *logname, "-mdtlov");
2033         if (rc) {
2034                 name_destroy(logname);
2035                 *logname = NULL;
2036         }
2037         return rc;
2038 }
2039
2040 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2041                                       struct fs_db *fsdb, int i)
2042 {
2043         char suffix[16];
2044
2045         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2046                 sprintf(suffix, "-osc");
2047         else
2048                 sprintf(suffix, "-osc-MDT%04x", i);
2049         return name_create(oscname, ostname, suffix);
2050 }
2051
2052 /* add new mdc to already existent MDS */
2053 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2054                                     struct mgs_device *mgs,
2055                                     struct fs_db *fsdb,
2056                                     struct mgs_target_info *mti,
2057                                     int mdt_index, char *logname)
2058 {
2059         struct llog_handle      *llh = NULL;
2060         char    *nodeuuid = NULL;
2061         char    *ospname = NULL;
2062         char    *lovuuid = NULL;
2063         char    *mdtuuid = NULL;
2064         char    *svname = NULL;
2065         char    *mdtname = NULL;
2066         char    *lovname = NULL;
2067         char    index_str[16];
2068         int     i, rc;
2069
2070         ENTRY;
2071         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2072                 CERROR("log is empty! Logical error\n");
2073                 RETURN (-EINVAL);
2074         }
2075
2076         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2077                logname);
2078
2079         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2080         if (rc)
2081                 RETURN(rc);
2082
2083         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2084         if (rc)
2085                 GOTO(out_destory, rc);
2086
2087         rc = name_create(&svname, mdtname, "-osp");
2088         if (rc)
2089                 GOTO(out_destory, rc);
2090
2091         sprintf(index_str, "-MDT%04x", mdt_index);
2092         rc = name_create(&ospname, svname, index_str);
2093         if (rc)
2094                 GOTO(out_destory, rc);
2095
2096         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2097         if (rc)
2098                 GOTO(out_destory, rc);
2099
2100         rc = name_create(&lovuuid, lovname, "_UUID");
2101         if (rc)
2102                 GOTO(out_destory, rc);
2103
2104         rc = name_create(&mdtuuid, mdtname, "_UUID");
2105         if (rc)
2106                 GOTO(out_destory, rc);
2107
2108         rc = record_start_log(env, mgs, &llh, logname);
2109         if (rc)
2110                 GOTO(out_destory, rc);
2111
2112         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2113                            "add osp");
2114         if (rc)
2115                 GOTO(out_destory, rc);
2116
2117         for (i = 0; i < mti->mti_nid_count; i++) {
2118                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2119                        libcfs_nid2str(mti->mti_nids[i]));
2120                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2121                 if (rc)
2122                         GOTO(out_end, rc);
2123         }
2124
2125         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2126         if (rc)
2127                 GOTO(out_end, rc);
2128
2129         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2130                           NULL, NULL);
2131         if (rc)
2132                 GOTO(out_end, rc);
2133
2134         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2135         if (rc)
2136                 GOTO(out_end, rc);
2137
2138         /* Add mdc(osp) to lod */
2139         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2140                  mti->mti_stripe_index);
2141         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2142                          index_str, "1", NULL);
2143         if (rc)
2144                 GOTO(out_end, rc);
2145
2146         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2147         if (rc)
2148                 GOTO(out_end, rc);
2149
2150 out_end:
2151         record_end_log(env, &llh);
2152
2153 out_destory:
2154         name_destroy(&mdtuuid);
2155         name_destroy(&lovuuid);
2156         name_destroy(&lovname);
2157         name_destroy(&ospname);
2158         name_destroy(&svname);
2159         name_destroy(&nodeuuid);
2160         name_destroy(&mdtname);
2161         RETURN(rc);
2162 }
2163
2164 static int mgs_write_log_mdt0(const struct lu_env *env,
2165                               struct mgs_device *mgs,
2166                               struct fs_db *fsdb,
2167                               struct mgs_target_info *mti)
2168 {
2169         char *log = mti->mti_svname;
2170         struct llog_handle *llh = NULL;
2171         char *uuid, *lovname;
2172         char mdt_index[6];
2173         char *ptr = mti->mti_params;
2174         int rc = 0, failout = 0;
2175         ENTRY;
2176
2177         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2178         if (uuid == NULL)
2179                 RETURN(-ENOMEM);
2180
2181         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2182                 failout = (strncmp(ptr, "failout", 7) == 0);
2183
2184         rc = name_create(&lovname, log, "-mdtlov");
2185         if (rc)
2186                 GOTO(out_free, rc);
2187         if (mgs_log_is_empty(env, mgs, log)) {
2188                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2189                 if (rc)
2190                         GOTO(out_lod, rc);
2191         }
2192
2193         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2194
2195         rc = record_start_log(env, mgs, &llh, log);
2196         if (rc)
2197                 GOTO(out_lod, rc);
2198
2199         /* add MDT itself */
2200
2201         /* FIXME this whole fn should be a single journal transaction */
2202         sprintf(uuid, "%s_UUID", log);
2203         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2204         if (rc)
2205                 GOTO(out_lod, rc);
2206         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2207         if (rc)
2208                 GOTO(out_end, rc);
2209         rc = record_mount_opt(env, llh, log, lovname, NULL);
2210         if (rc)
2211                 GOTO(out_end, rc);
2212         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2213                         failout ? "n" : "f");
2214         if (rc)
2215                 GOTO(out_end, rc);
2216         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2217         if (rc)
2218                 GOTO(out_end, rc);
2219 out_end:
2220         record_end_log(env, &llh);
2221 out_lod:
2222         name_destroy(&lovname);
2223 out_free:
2224         OBD_FREE(uuid, sizeof(struct obd_uuid));
2225         RETURN(rc);
2226 }
2227
2228 /* envelope method for all layers log */
2229 static int mgs_write_log_mdt(const struct lu_env *env,
2230                              struct mgs_device *mgs,
2231                              struct fs_db *fsdb,
2232                              struct mgs_target_info *mti)
2233 {
2234         struct mgs_thread_info *mgi = mgs_env_info(env);
2235         struct llog_handle *llh = NULL;
2236         char *cliname;
2237         int rc, i = 0;
2238         ENTRY;
2239
2240         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2241
2242         if (mti->mti_uuid[0] == '\0') {
2243                 /* Make up our own uuid */
2244                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2245                          "%s_UUID", mti->mti_svname);
2246         }
2247
2248         /* add mdt */
2249         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2250         if (rc)
2251                 RETURN(rc);
2252         /* Append the mdt info to the client log */
2253         rc = name_create(&cliname, mti->mti_fsname, "-client");
2254         if (rc)
2255                 RETURN(rc);
2256
2257         if (mgs_log_is_empty(env, mgs, cliname)) {
2258                 /* Start client log */
2259                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2260                                        fsdb->fsdb_clilov);
2261                 if (rc)
2262                         GOTO(out_free, rc);
2263                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2264                                        fsdb->fsdb_clilmv);
2265                 if (rc)
2266                         GOTO(out_free, rc);
2267         }
2268
2269         /*
2270         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2271         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2272         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2273         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2274         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2275         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2276         */
2277
2278                 /* copy client info about lov/lmv */
2279                 mgi->mgi_comp.comp_mti = mti;
2280                 mgi->mgi_comp.comp_fsdb = fsdb;
2281
2282                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2283                                                         &mgi->mgi_comp);
2284                 if (rc)
2285                         GOTO(out_free, rc);
2286                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2287                                               fsdb->fsdb_clilmv);
2288                 if (rc)
2289                         GOTO(out_free, rc);
2290
2291                 /* add mountopts */
2292                 rc = record_start_log(env, mgs, &llh, cliname);
2293                 if (rc)
2294                         GOTO(out_free, rc);
2295
2296                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2297                                    "mount opts");
2298                 if (rc)
2299                         GOTO(out_end, rc);
2300                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2301                                       fsdb->fsdb_clilmv);
2302                 if (rc)
2303                         GOTO(out_end, rc);
2304                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2305                                    "mount opts");
2306
2307         if (rc)
2308                 GOTO(out_end, rc);
2309
2310         /* for_all_existing_mdt except current one */
2311         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2312                 if (i !=  mti->mti_stripe_index &&
2313                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2314                         char *logname;
2315
2316                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2317                         if (rc)
2318                                 GOTO(out_end, rc);
2319
2320                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2321                                                       i, logname);
2322                         name_destroy(&logname);
2323                         if (rc)
2324                                 GOTO(out_end, rc);
2325                 }
2326         }
2327 out_end:
2328         record_end_log(env, &llh);
2329 out_free:
2330         name_destroy(&cliname);
2331         RETURN(rc);
2332 }
2333
2334 /* Add the ost info to the client/mdt lov */
2335 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2336                                     struct mgs_device *mgs, struct fs_db *fsdb,
2337                                     struct mgs_target_info *mti,
2338                                     char *logname, char *suffix, char *lovname,
2339                                     enum lustre_sec_part sec_part, int flags)
2340 {
2341         struct llog_handle *llh = NULL;
2342         char *nodeuuid = NULL;
2343         char *oscname = NULL;
2344         char *oscuuid = NULL;
2345         char *lovuuid = NULL;
2346         char *svname = NULL;
2347         char index[6];
2348         int i, rc;
2349
2350         ENTRY;
2351         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2352                mti->mti_svname, logname);
2353
2354         if (mgs_log_is_empty(env, mgs, logname)) {
2355                 CERROR("log is empty! Logical error\n");
2356                 RETURN (-EINVAL);
2357         }
2358
2359         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2360         if (rc)
2361                 RETURN(rc);
2362         rc = name_create(&svname, mti->mti_svname, "-osc");
2363         if (rc)
2364                 GOTO(out_free, rc);
2365
2366         /* for the system upgraded from old 1.8, keep using the old osc naming
2367          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2368         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2369                 rc = name_create(&oscname, svname, "");
2370         else
2371                 rc = name_create(&oscname, svname, suffix);
2372         if (rc)
2373                 GOTO(out_free, rc);
2374
2375         rc = name_create(&oscuuid, oscname, "_UUID");
2376         if (rc)
2377                 GOTO(out_free, rc);
2378         rc = name_create(&lovuuid, lovname, "_UUID");
2379         if (rc)
2380                 GOTO(out_free, rc);
2381
2382
2383         /*
2384         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2385         multihomed (#4)
2386         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2387         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2388         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2389         failover (#6,7)
2390         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2391         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2392         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2393         */
2394
2395         rc = record_start_log(env, mgs, &llh, logname);
2396         if (rc)
2397                 GOTO(out_free, rc);
2398
2399         /* FIXME these should be a single journal transaction */
2400         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2401                            "add osc");
2402         if (rc)
2403                 GOTO(out_end, rc);
2404
2405         /* NB: don't change record order, because upon MDT steal OSC config
2406          * from client, it treats all nids before LCFG_SETUP as target nids
2407          * (multiple interfaces), while nids after as failover node nids.
2408          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2409          */
2410         for (i = 0; i < mti->mti_nid_count; i++) {
2411                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2412                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2413                 if (rc)
2414                         GOTO(out_end, rc);
2415         }
2416         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2417         if (rc)
2418                 GOTO(out_end, rc);
2419         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2420         if (rc)
2421                 GOTO(out_end, rc);
2422         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2423         if (rc)
2424                 GOTO(out_end, rc);
2425
2426         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2427
2428         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2429         if (rc)
2430                 GOTO(out_end, rc);
2431         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2432                            "add osc");
2433         if (rc)
2434                 GOTO(out_end, rc);
2435 out_end:
2436         record_end_log(env, &llh);
2437 out_free:
2438         name_destroy(&lovuuid);
2439         name_destroy(&oscuuid);
2440         name_destroy(&oscname);
2441         name_destroy(&svname);
2442         name_destroy(&nodeuuid);
2443         RETURN(rc);
2444 }
2445
2446 static int mgs_write_log_ost(const struct lu_env *env,
2447                              struct mgs_device *mgs, struct fs_db *fsdb,
2448                              struct mgs_target_info *mti)
2449 {
2450         struct llog_handle *llh = NULL;
2451         char *logname, *lovname;
2452         char *ptr = mti->mti_params;
2453         int rc, flags = 0, failout = 0, i;
2454         ENTRY;
2455
2456         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2457
2458         /* The ost startup log */
2459
2460         /* If the ost log already exists, that means that someone reformatted
2461            the ost and it called target_add again. */
2462         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2463                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2464                                    "exists, yet the server claims it never "
2465                                    "registered. It may have been reformatted, "
2466                                    "or the index changed. writeconf the MDT to "
2467                                    "regenerate all logs.\n", mti->mti_svname);
2468                 RETURN(-EALREADY);
2469         }
2470
2471         /*
2472         attach obdfilter ost1 ost1_UUID
2473         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2474         */
2475         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2476                 failout = (strncmp(ptr, "failout", 7) == 0);
2477         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2478         if (rc)
2479                 RETURN(rc);
2480         /* FIXME these should be a single journal transaction */
2481         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2482         if (rc)
2483                 GOTO(out_end, rc);
2484         if (*mti->mti_uuid == '\0')
2485                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2486                          "%s_UUID", mti->mti_svname);
2487         rc = record_attach(env, llh, mti->mti_svname,
2488                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2489         if (rc)
2490                 GOTO(out_end, rc);
2491         rc = record_setup(env, llh, mti->mti_svname,
2492                           "dev"/*ignored*/, "type"/*ignored*/,
2493                           failout ? "n" : "f", 0/*options*/);
2494         if (rc)
2495                 GOTO(out_end, rc);
2496         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2497         if (rc)
2498                 GOTO(out_end, rc);
2499 out_end:
2500         record_end_log(env, &llh);
2501         if (rc)
2502                 RETURN(rc);
2503         /* We also have to update the other logs where this osc is part of
2504            the lov */
2505
2506         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2507                 /* If we're upgrading, the old mdt log already has our
2508                    entry. Let's do a fake one for fun. */
2509                 /* Note that we can't add any new failnids, since we don't
2510                    know the old osc names. */
2511                 flags = CM_SKIP | CM_UPGRADE146;
2512
2513         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2514                 /* If the update flag isn't set, don't update client/mdt
2515                    logs. */
2516                 flags |= CM_SKIP;
2517                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2518                               "the MDT first to regenerate it.\n",
2519                               mti->mti_svname);
2520         }
2521
2522         /* Add ost to all MDT lov defs */
2523         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2524                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2525                         char mdt_index[9];
2526
2527                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2528                                                      i);
2529                         if (rc)
2530                                 RETURN(rc);
2531                         sprintf(mdt_index, "-MDT%04x", i);
2532                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2533                                                       logname, mdt_index,
2534                                                       lovname, LUSTRE_SP_MDT,
2535                                                       flags);
2536                         name_destroy(&logname);
2537                         name_destroy(&lovname);
2538                         if (rc)
2539                                 RETURN(rc);
2540                 }
2541         }
2542
2543         /* Append ost info to the client log */
2544         rc = name_create(&logname, mti->mti_fsname, "-client");
2545         if (rc)
2546                 RETURN(rc);
2547         if (mgs_log_is_empty(env, mgs, logname)) {
2548                 /* Start client log */
2549                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2550                                        fsdb->fsdb_clilov);
2551                 if (rc)
2552                         GOTO(out_free, rc);
2553                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2554                                        fsdb->fsdb_clilmv);
2555                 if (rc)
2556                         GOTO(out_free, rc);
2557         }
2558         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2559                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2560 out_free:
2561         name_destroy(&logname);
2562         RETURN(rc);
2563 }
2564
2565 static __inline__ int mgs_param_empty(char *ptr)
2566 {
2567         char *tmp;
2568
2569         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2570                 return 1;
2571         return 0;
2572 }
2573
2574 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2575                                           struct mgs_device *mgs,
2576                                           struct fs_db *fsdb,
2577                                           struct mgs_target_info *mti,
2578                                           char *logname, char *cliname)
2579 {
2580         int rc;
2581         struct llog_handle *llh = NULL;
2582
2583         if (mgs_param_empty(mti->mti_params)) {
2584                 /* Remove _all_ failnids */
2585                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2586                                 mti->mti_svname, "add failnid", CM_SKIP);
2587                 return rc < 0 ? rc : 0;
2588         }
2589
2590         /* Otherwise failover nids are additive */
2591         rc = record_start_log(env, mgs, &llh, logname);
2592         if (rc)
2593                 return rc;
2594                 /* FIXME this should be a single journal transaction */
2595         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2596                            "add failnid");
2597         if (rc)
2598                 goto out_end;
2599         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2600         if (rc)
2601                 goto out_end;
2602         rc = record_marker(env, llh, fsdb, CM_END,
2603                            mti->mti_svname, "add failnid");
2604 out_end:
2605         record_end_log(env, &llh);
2606         return rc;
2607 }
2608
2609
2610 /* Add additional failnids to an existing log.
2611    The mdc/osc must have been added to logs first */
2612 /* tcp nids must be in dotted-quad ascii -
2613    we can't resolve hostnames from the kernel. */
2614 static int mgs_write_log_add_failnid(const struct lu_env *env,
2615                                      struct mgs_device *mgs,
2616                                      struct fs_db *fsdb,
2617                                      struct mgs_target_info *mti)
2618 {
2619         char *logname, *cliname;
2620         int rc;
2621         ENTRY;
2622
2623         /* FIXME we currently can't erase the failnids
2624          * given when a target first registers, since they aren't part of
2625          * an "add uuid" stanza */
2626
2627         /* Verify that we know about this target */
2628         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2629                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2630                                    "yet. It must be started before failnids "
2631                                    "can be added.\n", mti->mti_svname);
2632                 RETURN(-ENOENT);
2633         }
2634
2635         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2636         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2637                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2638         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2639                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2640         } else {
2641                 RETURN(-EINVAL);
2642         }
2643         if (rc)
2644                 RETURN(rc);
2645         /* Add failover nids to the client log */
2646         rc = name_create(&logname, mti->mti_fsname, "-client");
2647         if (rc) {
2648                 name_destroy(&cliname);
2649                 RETURN(rc);
2650         }
2651         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2652         name_destroy(&logname);
2653         name_destroy(&cliname);
2654         if (rc)
2655                 RETURN(rc);
2656
2657         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2658                 /* Add OST failover nids to the MDT logs as well */
2659                 int i;
2660
2661                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2662                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2663                                 continue;
2664                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2665                         if (rc)
2666                                 RETURN(rc);
2667                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2668                                                  fsdb, i);
2669                         if (rc) {
2670                                 name_destroy(&logname);
2671                                 RETURN(rc);
2672                         }
2673                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2674                                                             mti, logname,
2675                                                             cliname);
2676                         name_destroy(&cliname);
2677                         name_destroy(&logname);
2678                         if (rc)
2679                                 RETURN(rc);
2680                 }
2681         }
2682
2683         RETURN(rc);
2684 }
2685
2686 static int mgs_wlp_lcfg(const struct lu_env *env,
2687                         struct mgs_device *mgs, struct fs_db *fsdb,
2688                         struct mgs_target_info *mti,
2689                         char *logname, struct lustre_cfg_bufs *bufs,
2690                         char *tgtname, char *ptr)
2691 {
2692         char comment[MTI_NAME_MAXLEN];
2693         char *tmp;
2694         struct llog_cfg_rec *lcr;
2695         int rc, del;
2696
2697         /* Erase any old settings of this same parameter */
2698         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2699         comment[MTI_NAME_MAXLEN - 1] = 0;
2700         /* But don't try to match the value. */
2701         tmp = strchr(comment, '=');
2702         if (tmp != NULL)
2703                 *tmp = 0;
2704         /* FIXME we should skip settings that are the same as old values */
2705         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2706         if (rc < 0)
2707                 return rc;
2708         del = mgs_param_empty(ptr);
2709
2710         LCONSOLE_INFO("%s parameter %s.%s in log %s\n", del ? "Disabling" : rc ?
2711                       "Setting" : "Modifying", tgtname, comment, logname);
2712         if (del) {
2713                 /* mgs_modify() will return 1 if nothing had to be done */
2714                 if (rc == 1)
2715                         rc = 0;
2716                 return rc;
2717         }
2718
2719         lustre_cfg_bufs_reset(bufs, tgtname);
2720         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2721         if (mti->mti_flags & LDD_F_PARAM2)
2722                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2723
2724         lcr = lustre_cfg_rec_new((mti->mti_flags & LDD_F_PARAM2) ?
2725                                  LCFG_SET_PARAM : LCFG_PARAM, bufs);
2726         if (lcr == NULL)
2727                 return -ENOMEM;
2728
2729         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr, tgtname,
2730                                   comment);
2731         lustre_cfg_rec_free(lcr);
2732         return rc;
2733 }
2734
2735 static int mgs_write_log_param2(const struct lu_env *env,
2736                                 struct mgs_device *mgs,
2737                                 struct fs_db *fsdb,
2738                                 struct mgs_target_info *mti, char *ptr)
2739 {
2740         struct lustre_cfg_bufs  bufs;
2741         int                     rc = 0;
2742         ENTRY;
2743
2744         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2745         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2746                           mti->mti_svname, ptr);
2747
2748         RETURN(rc);
2749 }
2750
2751 /* write global variable settings into log */
2752 static int mgs_write_log_sys(const struct lu_env *env,
2753                              struct mgs_device *mgs, struct fs_db *fsdb,
2754                              struct mgs_target_info *mti, char *sys, char *ptr)
2755 {
2756         struct mgs_thread_info  *mgi = mgs_env_info(env);
2757         struct lustre_cfg       *lcfg;
2758         struct llog_cfg_rec     *lcr;
2759         char *tmp, sep;
2760         int rc, cmd, convert = 1;
2761
2762         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2763                 cmd = LCFG_SET_TIMEOUT;
2764         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2765                 cmd = LCFG_SET_LDLM_TIMEOUT;
2766         /* Check for known params here so we can return error to lctl */
2767         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2768                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2769                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2770                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2771                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2772                 cmd = LCFG_PARAM;
2773         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2774                 convert = 0; /* Don't convert string value to integer */
2775                 cmd = LCFG_PARAM;
2776         } else {
2777                 return -EINVAL;
2778         }
2779
2780         if (mgs_param_empty(ptr))
2781                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2782         else
2783                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2784
2785         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2786         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2787         if (!convert && *tmp != '\0')
2788                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2789         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2790         if (lcr == NULL)
2791                 return -ENOMEM;
2792
2793         lcfg = &lcr->lcr_cfg;
2794         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2795         /* truncate the comment to the parameter name */
2796         ptr = tmp - 1;
2797         sep = *ptr;
2798         *ptr = '\0';
2799         /* modify all servers and clients */
2800         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2801                                       *tmp == '\0' ? NULL : lcr,
2802                                       mti->mti_fsname, sys, 0);
2803         if (rc == 0 && *tmp != '\0') {
2804                 switch (cmd) {
2805                 case LCFG_SET_TIMEOUT:
2806                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2807                                 class_process_config(lcfg);
2808                         break;
2809                 case LCFG_SET_LDLM_TIMEOUT:
2810                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2811                                 class_process_config(lcfg);
2812                         break;
2813                 default:
2814                         break;
2815                 }
2816         }
2817         *ptr = sep;
2818         lustre_cfg_rec_free(lcr);
2819         return rc;
2820 }
2821
2822 /* write quota settings into log */
2823 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2824                                struct fs_db *fsdb, struct mgs_target_info *mti,
2825                                char *quota, char *ptr)
2826 {
2827         struct mgs_thread_info  *mgi = mgs_env_info(env);
2828         struct llog_cfg_rec     *lcr;
2829         char                    *tmp;
2830         char                     sep;
2831         int                      rc, cmd = LCFG_PARAM;
2832
2833         /* support only 'meta' and 'data' pools so far */
2834         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2835             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2836                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2837                        "& quota.ost are)\n", ptr);
2838                 return -EINVAL;
2839         }
2840
2841         if (*tmp == '\0') {
2842                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2843         } else {
2844                 CDEBUG(D_MGS, "global '%s'\n", quota);
2845
2846                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2847                     strcmp(tmp, "none") != 0) {
2848                         CERROR("enable option(%s) isn't supported\n", tmp);
2849                         return -EINVAL;
2850                 }
2851         }
2852
2853         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2854         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2855         lcr = lustre_cfg_rec_new(cmd, &mgi->mgi_bufs);
2856         if (lcr == NULL)
2857                 return -ENOMEM;
2858
2859         /* truncate the comment to the parameter name */
2860         ptr = tmp - 1;
2861         sep = *ptr;
2862         *ptr = '\0';
2863
2864         /* XXX we duplicated quota enable information in all server
2865          *     config logs, it should be moved to a separate config
2866          *     log once we cleanup the config log for global param. */
2867         /* modify all servers */
2868         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2869                                       *tmp == '\0' ? NULL : lcr,
2870                                       mti->mti_fsname, quota, 1);
2871         *ptr = sep;
2872         lustre_cfg_rec_free(lcr);
2873         return rc < 0 ? rc : 0;
2874 }
2875
2876 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2877                                    struct mgs_device *mgs,
2878                                    struct fs_db *fsdb,
2879                                    struct mgs_target_info *mti,
2880                                    char *param)
2881 {
2882         struct mgs_thread_info  *mgi = mgs_env_info(env);
2883         struct llog_cfg_rec     *lcr;
2884         struct llog_handle      *llh = NULL;
2885         char                    *logname;
2886         char                    *comment, *ptr;
2887         int                      rc, len;
2888
2889         ENTRY;
2890
2891         /* get comment */
2892         ptr = strchr(param, '=');
2893         LASSERT(ptr != NULL);
2894         len = ptr - param;
2895
2896         OBD_ALLOC(comment, len + 1);
2897         if (comment == NULL)
2898                 RETURN(-ENOMEM);
2899         strncpy(comment, param, len);
2900         comment[len] = '\0';
2901
2902         /* prepare lcfg */
2903         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2904         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2905         lcr = lustre_cfg_rec_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2906         if (lcr == NULL)
2907                 GOTO(out_comment, rc = -ENOMEM);
2908
2909         /* construct log name */
2910         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2911         if (rc < 0)
2912                 GOTO(out_lcfg, rc);
2913
2914         if (mgs_log_is_empty(env, mgs, logname)) {
2915                 rc = record_start_log(env, mgs, &llh, logname);
2916                 if (rc < 0)
2917                         GOTO(out, rc);
2918                 record_end_log(env, &llh);
2919         }
2920
2921         /* obsolete old one */
2922         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2923                         comment, CM_SKIP);
2924         if (rc < 0)
2925                 GOTO(out, rc);
2926         /* write the new one */
2927         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcr,
2928                                   mti->mti_svname, comment);
2929         if (rc)
2930                 CERROR("%s: error writing log %s: rc = %d\n",
2931                        mgs->mgs_obd->obd_name, logname, rc);
2932 out:
2933         name_destroy(&logname);
2934 out_lcfg:
2935         lustre_cfg_rec_free(lcr);
2936 out_comment:
2937         OBD_FREE(comment, len + 1);
2938         RETURN(rc);
2939 }
2940
2941 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2942                                         char *param)
2943 {
2944         char    *ptr;
2945
2946         /* disable the adjustable udesc parameter for now, i.e. use default
2947          * setting that client always ship udesc to MDT if possible. to enable
2948          * it simply remove the following line */
2949         goto error_out;
2950
2951         ptr = strchr(param, '=');
2952         if (ptr == NULL)
2953                 goto error_out;
2954         *ptr++ = '\0';
2955
2956         if (strcmp(param, PARAM_SRPC_UDESC))
2957                 goto error_out;
2958
2959         if (strcmp(ptr, "yes") == 0) {
2960                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2961                 CWARN("Enable user descriptor shipping from client to MDT\n");
2962         } else if (strcmp(ptr, "no") == 0) {
2963                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2964                 CWARN("Disable user descriptor shipping from client to MDT\n");
2965         } else {
2966                 *(ptr - 1) = '=';
2967                 goto error_out;
2968         }
2969         return 0;
2970
2971 error_out:
2972         CERROR("Invalid param: %s\n", param);
2973         return -EINVAL;
2974 }
2975
2976 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2977                                   const char *svname,
2978                                   char *param)
2979 {
2980         struct sptlrpc_rule      rule;
2981         struct sptlrpc_rule_set *rset;
2982         int                      rc;
2983         ENTRY;
2984
2985         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2986                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2987                 RETURN(-EINVAL);
2988         }
2989
2990         if (strncmp(param, PARAM_SRPC_UDESC,
2991                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2992                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2993         }
2994
2995         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2996                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2997                 RETURN(-EINVAL);
2998         }
2999
3000         param += sizeof(PARAM_SRPC_FLVR) - 1;
3001
3002         rc = sptlrpc_parse_rule(param, &rule);
3003         if (rc)
3004                 RETURN(rc);
3005
3006         /* mgs rules implies must be mgc->mgs */
3007         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3008                 if ((rule.sr_from != LUSTRE_SP_MGC &&
3009                      rule.sr_from != LUSTRE_SP_ANY) ||
3010                     (rule.sr_to != LUSTRE_SP_MGS &&
3011                      rule.sr_to != LUSTRE_SP_ANY))
3012                         RETURN(-EINVAL);
3013         }
3014
3015         /* preapre room for this coming rule. svcname format should be:
3016          * - fsname: general rule
3017          * - fsname-tgtname: target-specific rule
3018          */
3019         if (strchr(svname, '-')) {
3020                 struct mgs_tgt_srpc_conf *tgtconf;
3021                 int                       found = 0;
3022
3023                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3024                      tgtconf = tgtconf->mtsc_next) {
3025                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3026                                 found = 1;
3027                                 break;
3028                         }
3029                 }
3030
3031                 if (!found) {
3032                         int name_len;
3033
3034                         OBD_ALLOC_PTR(tgtconf);
3035                         if (tgtconf == NULL)
3036                                 RETURN(-ENOMEM);
3037
3038                         name_len = strlen(svname);
3039
3040                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3041                         if (tgtconf->mtsc_tgt == NULL) {
3042                                 OBD_FREE_PTR(tgtconf);
3043                                 RETURN(-ENOMEM);
3044                         }
3045                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3046
3047                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3048                         fsdb->fsdb_srpc_tgt = tgtconf;
3049                 }
3050
3051                 rset = &tgtconf->mtsc_rset;
3052         } else {
3053                 rset = &fsdb->fsdb_srpc_gen;
3054         }
3055
3056         rc = sptlrpc_rule_set_merge(rset, &rule);
3057
3058         RETURN(rc);
3059 }
3060
3061 static int mgs_srpc_set_param(const struct lu_env *env,
3062                               struct mgs_device *mgs,
3063                               struct fs_db *fsdb,
3064                               struct mgs_target_info *mti,
3065                               char *param)
3066 {
3067         char                   *copy;
3068         int                     rc, copy_size;
3069         ENTRY;
3070
3071 #ifndef HAVE_GSS
3072         RETURN(-EINVAL);
3073 #endif
3074         /* keep a copy of original param, which could be destroied
3075          * during parsing */
3076         copy_size = strlen(param) + 1;
3077         OBD_ALLOC(copy, copy_size);
3078         if (copy == NULL)
3079                 return -ENOMEM;
3080         memcpy(copy, param, copy_size);
3081
3082         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3083         if (rc)
3084                 goto out_free;
3085
3086         /* previous steps guaranteed the syntax is correct */
3087         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3088         if (rc)
3089                 goto out_free;
3090
3091         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3092                 /*
3093                  * for mgs rules, make them effective immediately.
3094                  */
3095                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3096                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3097                                                  &fsdb->fsdb_srpc_gen);
3098         }
3099
3100 out_free:
3101         OBD_FREE(copy, copy_size);
3102         RETURN(rc);
3103 }
3104
3105 struct mgs_srpc_read_data {
3106         struct fs_db   *msrd_fsdb;
3107         int             msrd_skip;
3108 };
3109
3110 static int mgs_srpc_read_handler(const struct lu_env *env,
3111                                  struct llog_handle *llh,
3112                                  struct llog_rec_hdr *rec, void *data)
3113 {
3114         struct mgs_srpc_read_data *msrd = data;
3115         struct cfg_marker         *marker;
3116         struct lustre_cfg         *lcfg = REC_DATA(rec);
3117         char                      *svname, *param;
3118         int                        cfg_len, rc;
3119         ENTRY;
3120
3121         if (rec->lrh_type != OBD_CFG_REC) {
3122                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3123                 RETURN(-EINVAL);
3124         }
3125
3126         cfg_len = REC_DATA_LEN(rec);
3127
3128         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3129         if (rc) {
3130                 CERROR("Insane cfg\n");
3131                 RETURN(rc);
3132         }
3133
3134         if (lcfg->lcfg_command == LCFG_MARKER) {
3135                 marker = lustre_cfg_buf(lcfg, 1);
3136
3137                 if (marker->cm_flags & CM_START &&
3138                     marker->cm_flags & CM_SKIP)
3139                         msrd->msrd_skip = 1;
3140                 if (marker->cm_flags & CM_END)
3141                         msrd->msrd_skip = 0;
3142
3143                 RETURN(0);
3144         }
3145
3146         if (msrd->msrd_skip)
3147                 RETURN(0);
3148
3149         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3150                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3151                 RETURN(0);
3152         }
3153
3154         svname = lustre_cfg_string(lcfg, 0);
3155         if (svname == NULL) {
3156                 CERROR("svname is empty\n");
3157                 RETURN(0);
3158         }
3159
3160         param = lustre_cfg_string(lcfg, 1);
3161         if (param == NULL) {
3162                 CERROR("param is empty\n");
3163                 RETURN(0);
3164         }
3165
3166         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3167         if (rc)
3168                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3169
3170         RETURN(0);
3171 }
3172
3173 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3174                                 struct mgs_device *mgs,
3175                                 struct fs_db *fsdb)
3176 {
3177         struct llog_handle        *llh = NULL;
3178         struct llog_ctxt          *ctxt;
3179         char                      *logname;
3180         struct mgs_srpc_read_data  msrd;
3181         int                        rc;
3182         ENTRY;
3183
3184         /* construct log name */
3185         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3186         if (rc)
3187                 RETURN(rc);
3188
3189         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3190         LASSERT(ctxt != NULL);
3191
3192         if (mgs_log_is_empty(env, mgs, logname))
3193                 GOTO(out, rc = 0);
3194
3195         rc = llog_open(env, ctxt, &llh, NULL, logname,
3196                        LLOG_OPEN_EXISTS);
3197         if (rc < 0) {
3198                 if (rc == -ENOENT)
3199                         rc = 0;
3200                 GOTO(out, rc);
3201         }
3202
3203         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3204         if (rc)
3205                 GOTO(out_close, rc);
3206
3207         if (llog_get_size(llh) <= 1)
3208                 GOTO(out_close, rc = 0);
3209
3210         msrd.msrd_fsdb = fsdb;
3211         msrd.msrd_skip = 0;
3212
3213         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3214                           NULL);
3215
3216 out_close:
3217         llog_close(env, llh);
3218 out:
3219         llog_ctxt_put(ctxt);
3220         name_destroy(&logname);
3221
3222         if (rc)
3223                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3224         RETURN(rc);
3225 }
3226
3227 /* Permanent settings of all parameters by writing into the appropriate
3228  * configuration logs.
3229  * A parameter with null value ("<param>='\0'") means to erase it out of
3230  * the logs.
3231  */
3232 static int mgs_write_log_param(const struct lu_env *env,
3233                                struct mgs_device *mgs, struct fs_db *fsdb,
3234                                struct mgs_target_info *mti, char *ptr)
3235 {
3236         struct mgs_thread_info *mgi = mgs_env_info(env);
3237         char *logname;
3238         char *tmp;
3239         int rc = 0, rc2 = 0;
3240         ENTRY;
3241
3242         /* For various parameter settings, we have to figure out which logs
3243            care about them (e.g. both mdt and client for lov settings) */
3244         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3245
3246         /* The params are stored in MOUNT_DATA_FILE and modified via
3247            tunefs.lustre, or set using lctl conf_param */
3248
3249         /* Processed in lustre_start_mgc */
3250         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3251                 GOTO(end, rc);
3252
3253         /* Processed in ost/mdt */
3254         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)