Whamcloud - gitweb
4c546bbc91179abaf40f8a1b36d2329ee871fc2d
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <lustre_param.h>
50 #include <lustre_sec.h>
51 #include <lustre_quota.h>
52
53 #include "mgs_internal.h"
54
55 /********************** Class functions ********************/
56
57 int class_dentry_readdir(const struct lu_env *env,
58                          struct mgs_device *mgs, cfs_list_t *list)
59 {
60         struct dt_object    *dir = mgs->mgs_configs_dir;
61         const struct dt_it_ops *iops;
62         struct dt_it        *it;
63         struct mgs_direntry *de;
64         char                *key;
65         int                  rc, key_sz;
66
67         CFS_INIT_LIST_HEAD(list);
68
69         LASSERT(dir);
70         LASSERT(dir->do_index_ops);
71
72         iops = &dir->do_index_ops->dio_it;
73         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
74         if (IS_ERR(it))
75                 RETURN(PTR_ERR(it));
76
77         rc = iops->load(env, it, 0);
78         if (rc <= 0)
79                 GOTO(fini, rc = 0);
80
81         /* main cycle */
82         do {
83                 key = (void *)iops->key(env, it);
84                 if (IS_ERR(key)) {
85                         CERROR("%s: key failed when listing %s: rc = %d\n",
86                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
87                                (int) PTR_ERR(key));
88                         goto next;
89                 }
90                 key_sz = iops->key_size(env, it);
91                 LASSERT(key_sz > 0);
92
93                 /* filter out "." and ".." entries */
94                 if (key[0] == '.') {
95                         if (key_sz == 1)
96                                 goto next;
97                         if (key_sz == 2 && key[1] == '.')
98                                 goto next;
99                 }
100
101                 de = mgs_direntry_alloc(key_sz + 1);
102                 if (de == NULL) {
103                         rc = -ENOMEM;
104                         break;
105                 }
106
107                 memcpy(de->name, key, key_sz);
108                 de->name[key_sz] = 0;
109
110                 cfs_list_add(&de->list, list);
111
112 next:
113                 rc = iops->next(env, it);
114         } while (rc == 0);
115         rc = 0;
116
117         iops->put(env, it);
118
119 fini:
120         iops->fini(env, it);
121         if (rc)
122                 CERROR("%s: key failed when listing %s: rc = %d\n",
123                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
124         RETURN(rc);
125 }
126
127 /******************** DB functions *********************/
128
129 static inline int name_create(char **newname, char *prefix, char *suffix)
130 {
131         LASSERT(newname);
132         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
133         if (!*newname)
134                 return -ENOMEM;
135         sprintf(*newname, "%s%s", prefix, suffix);
136         return 0;
137 }
138
139 static inline void name_destroy(char **name)
140 {
141         if (*name)
142                 OBD_FREE(*name, strlen(*name) + 1);
143         *name = NULL;
144 }
145
146 struct mgs_fsdb_handler_data
147 {
148         struct fs_db   *fsdb;
149         __u32           ver;
150 };
151
152 /* from the (client) config log, figure out:
153         1. which ost's/mdt's are configured (by index)
154         2. what the last config step is
155         3. COMPAT_18 osc name
156 */
157 /* It might be better to have a separate db file, instead of parsing the info
158    out of the client log.  This is slow and potentially error-prone. */
159 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
160                             struct llog_rec_hdr *rec, void *data)
161 {
162         struct mgs_fsdb_handler_data *d = data;
163         struct fs_db *fsdb = d->fsdb;
164         int cfg_len = rec->lrh_len;
165         char *cfg_buf = (char*) (rec + 1);
166         struct lustre_cfg *lcfg;
167         __u32 index;
168         int rc = 0;
169         ENTRY;
170
171         if (rec->lrh_type != OBD_CFG_REC) {
172                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
173                 RETURN(-EINVAL);
174         }
175
176         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
177         if (rc) {
178                 CERROR("Insane cfg\n");
179                 RETURN(rc);
180         }
181
182         lcfg = (struct lustre_cfg *)cfg_buf;
183
184         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
185                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
186
187         /* Figure out ost indicies */
188         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
189         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
190             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
191                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
192                                        NULL, 10);
193                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
194                        lustre_cfg_string(lcfg, 1), index,
195                        lustre_cfg_string(lcfg, 2));
196                 set_bit(index, fsdb->fsdb_ost_index_map);
197         }
198
199         /* Figure out mdt indicies */
200         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
201         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
202             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
203                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
204                                        &index, NULL);
205                 if (rc != LDD_F_SV_TYPE_MDT) {
206                         CWARN("Unparsable MDC name %s, assuming index 0\n",
207                               lustre_cfg_string(lcfg, 0));
208                         index = 0;
209                 }
210                 rc = 0;
211                 CDEBUG(D_MGS, "MDT index is %u\n", index);
212                 set_bit(index, fsdb->fsdb_mdt_index_map);
213                 fsdb->fsdb_mdt_count ++;
214         }
215
216         /**
217          * figure out the old config. fsdb_gen = 0 means old log
218          * It is obsoleted and not supported anymore
219          */
220         if (fsdb->fsdb_gen == 0) {
221                 CERROR("Old config format is not supported\n");
222                 RETURN(-EINVAL);
223         }
224
225         /*
226          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
227          */
228         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
229             lcfg->lcfg_command == LCFG_ATTACH &&
230             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
231                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
232                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
233                         CWARN("MDT using 1.8 OSC name scheme\n");
234                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
235                 }
236         }
237
238         if (lcfg->lcfg_command == LCFG_MARKER) {
239                 struct cfg_marker *marker;
240                 marker = lustre_cfg_buf(lcfg, 1);
241
242                 d->ver = marker->cm_vers;
243
244                 /* Keep track of the latest marker step */
245                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
246         }
247
248         RETURN(rc);
249 }
250
251 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
252 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
253                                   struct mgs_device *mgs,
254                                   struct fs_db *fsdb)
255 {
256         char                            *logname;
257         struct llog_handle              *loghandle;
258         struct llog_ctxt                *ctxt;
259         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
260         int rc;
261
262         ENTRY;
263
264         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
265         LASSERT(ctxt != NULL);
266         rc = name_create(&logname, fsdb->fsdb_name, "-client");
267         if (rc)
268                 GOTO(out_put, rc);
269         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
270         if (rc)
271                 GOTO(out_pop, rc);
272
273         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
274         if (rc)
275                 GOTO(out_close, rc);
276
277         if (llog_get_size(loghandle) <= 1)
278                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
279
280         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
281         CDEBUG(D_INFO, "get_db = %d\n", rc);
282 out_close:
283         llog_close(env, loghandle);
284 out_pop:
285         name_destroy(&logname);
286 out_put:
287         llog_ctxt_put(ctxt);
288
289         RETURN(rc);
290 }
291
292 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
293 {
294         struct mgs_tgt_srpc_conf *tgtconf;
295
296         /* free target-specific rules */
297         while (fsdb->fsdb_srpc_tgt) {
298                 tgtconf = fsdb->fsdb_srpc_tgt;
299                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
300
301                 LASSERT(tgtconf->mtsc_tgt);
302
303                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
304                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
305                 OBD_FREE_PTR(tgtconf);
306         }
307
308         /* free general rules */
309         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
310 }
311
312 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
313 {
314         struct fs_db *fsdb;
315         cfs_list_t *tmp;
316
317         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
318                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
319                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
320                         return fsdb;
321         }
322         return NULL;
323 }
324
325 /* caller must hold the mgs->mgs_fs_db_lock */
326 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
327                                   struct mgs_device *mgs, char *fsname)
328 {
329         struct fs_db *fsdb;
330         int rc;
331         ENTRY;
332
333         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
334                 CERROR("fsname %s is too long\n", fsname);
335                 RETURN(NULL);
336         }
337
338         OBD_ALLOC_PTR(fsdb);
339         if (!fsdb)
340                 RETURN(NULL);
341
342         strcpy(fsdb->fsdb_name, fsname);
343         mutex_init(&fsdb->fsdb_mutex);
344         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
345         fsdb->fsdb_gen = 1;
346
347         if (strcmp(fsname, MGSSELF_NAME) == 0) {
348                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
349         } else {
350                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
351                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
352                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
353                         CERROR("No memory for index maps\n");
354                         GOTO(err, rc = -ENOMEM);
355                 }
356
357                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
358                 if (rc)
359                         GOTO(err, rc);
360                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
361                 if (rc)
362                         GOTO(err, rc);
363
364                 /* initialise data for NID table */
365                 mgs_ir_init_fs(env, mgs, fsdb);
366
367                 lproc_mgs_add_live(mgs, fsdb);
368         }
369
370         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
371
372         RETURN(fsdb);
373 err:
374         if (fsdb->fsdb_ost_index_map)
375                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
376         if (fsdb->fsdb_mdt_index_map)
377                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
378         name_destroy(&fsdb->fsdb_clilov);
379         name_destroy(&fsdb->fsdb_clilmv);
380         OBD_FREE_PTR(fsdb);
381         RETURN(NULL);
382 }
383
384 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
385 {
386         /* wait for anyone with the sem */
387         mutex_lock(&fsdb->fsdb_mutex);
388         lproc_mgs_del_live(mgs, fsdb);
389         cfs_list_del(&fsdb->fsdb_list);
390
391         /* deinitialize fsr */
392         mgs_ir_fini_fs(mgs, fsdb);
393
394         if (fsdb->fsdb_ost_index_map)
395                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
396         if (fsdb->fsdb_mdt_index_map)
397                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
398         name_destroy(&fsdb->fsdb_clilov);
399         name_destroy(&fsdb->fsdb_clilmv);
400         mgs_free_fsdb_srpc(fsdb);
401         mutex_unlock(&fsdb->fsdb_mutex);
402         OBD_FREE_PTR(fsdb);
403 }
404
405 int mgs_init_fsdb_list(struct mgs_device *mgs)
406 {
407         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
408         return 0;
409 }
410
411 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
412 {
413         struct fs_db *fsdb;
414         cfs_list_t *tmp, *tmp2;
415         mutex_lock(&mgs->mgs_mutex);
416         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
417                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
418                 mgs_free_fsdb(mgs, fsdb);
419         }
420         mutex_unlock(&mgs->mgs_mutex);
421         return 0;
422 }
423
424 int mgs_find_or_make_fsdb(const struct lu_env *env,
425                           struct mgs_device *mgs, char *name,
426                           struct fs_db **dbh)
427 {
428         struct fs_db *fsdb;
429         int rc = 0;
430
431         ENTRY;
432         mutex_lock(&mgs->mgs_mutex);
433         fsdb = mgs_find_fsdb(mgs, name);
434         if (fsdb) {
435                 mutex_unlock(&mgs->mgs_mutex);
436                 *dbh = fsdb;
437                 RETURN(0);
438         }
439
440         CDEBUG(D_MGS, "Creating new db\n");
441         fsdb = mgs_new_fsdb(env, mgs, name);
442         /* lock fsdb_mutex until the db is loaded from llogs */
443         if (fsdb)
444                 mutex_lock(&fsdb->fsdb_mutex);
445         mutex_unlock(&mgs->mgs_mutex);
446         if (!fsdb)
447                 RETURN(-ENOMEM);
448
449         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
450                 /* populate the db from the client llog */
451                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
452                 if (rc) {
453                         CERROR("Can't get db from client log %d\n", rc);
454                         GOTO(out_free, rc);
455                 }
456         }
457
458         /* populate srpc rules from params llog */
459         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
460         if (rc) {
461                 CERROR("Can't get db from params log %d\n", rc);
462                 GOTO(out_free, rc);
463         }
464
465         mutex_unlock(&fsdb->fsdb_mutex);
466         *dbh = fsdb;
467
468         RETURN(0);
469
470 out_free:
471         mutex_unlock(&fsdb->fsdb_mutex);
472         mgs_free_fsdb(mgs, fsdb);
473         return rc;
474 }
475
476 /* 1 = index in use
477    0 = index unused
478    -1= empty client log */
479 int mgs_check_index(const struct lu_env *env,
480                     struct mgs_device *mgs,
481                     struct mgs_target_info *mti)
482 {
483         struct fs_db *fsdb;
484         void *imap;
485         int rc = 0;
486         ENTRY;
487
488         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
489
490         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
491         if (rc) {
492                 CERROR("Can't get db for %s\n", mti->mti_fsname);
493                 RETURN(rc);
494         }
495
496         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
497                 RETURN(-1);
498
499         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
500                 imap = fsdb->fsdb_ost_index_map;
501         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
502                 imap = fsdb->fsdb_mdt_index_map;
503         else
504                 RETURN(-EINVAL);
505
506         if (test_bit(mti->mti_stripe_index, imap))
507                 RETURN(1);
508         RETURN(0);
509 }
510
511 static __inline__ int next_index(void *index_map, int map_len)
512 {
513         int i;
514         for (i = 0; i < map_len * 8; i++)
515                 if (!test_bit(i, index_map)) {
516                          return i;
517                  }
518         CERROR("max index %d exceeded.\n", i);
519         return -1;
520 }
521
522 /* Return codes:
523         0  newly marked as in use
524         <0 err
525         +EALREADY for update of an old index */
526 static int mgs_set_index(const struct lu_env *env,
527                          struct mgs_device *mgs,
528                          struct mgs_target_info *mti)
529 {
530         struct fs_db *fsdb;
531         void *imap;
532         int rc = 0;
533         ENTRY;
534
535         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
536         if (rc) {
537                 CERROR("Can't get db for %s\n", mti->mti_fsname);
538                 RETURN(rc);
539         }
540
541         mutex_lock(&fsdb->fsdb_mutex);
542         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
543                 imap = fsdb->fsdb_ost_index_map;
544         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
545                 imap = fsdb->fsdb_mdt_index_map;
546         } else {
547                 GOTO(out_up, rc = -EINVAL);
548         }
549
550         if (mti->mti_flags & LDD_F_NEED_INDEX) {
551                 rc = next_index(imap, INDEX_MAP_SIZE);
552                 if (rc == -1)
553                         GOTO(out_up, rc = -ERANGE);
554                 mti->mti_stripe_index = rc;
555                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
556                         fsdb->fsdb_mdt_count ++;
557         }
558
559         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
560                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
561                                    "but the max index is %d.\n",
562                                    mti->mti_svname, mti->mti_stripe_index,
563                                    INDEX_MAP_SIZE * 8);
564                 GOTO(out_up, rc = -ERANGE);
565         }
566
567         if (test_bit(mti->mti_stripe_index, imap)) {
568                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
569                     !(mti->mti_flags & LDD_F_WRITECONF)) {
570                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
571                                            "%d, but that index is already in "
572                                            "use. Use --writeconf to force\n",
573                                            mti->mti_svname,
574                                            mti->mti_stripe_index);
575                         GOTO(out_up, rc = -EADDRINUSE);
576                 } else {
577                         CDEBUG(D_MGS, "Server %s updating index %d\n",
578                                mti->mti_svname, mti->mti_stripe_index);
579                         GOTO(out_up, rc = EALREADY);
580                 }
581         }
582
583         set_bit(mti->mti_stripe_index, imap);
584         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
585         mutex_unlock(&fsdb->fsdb_mutex);
586         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
587                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
588
589         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
590                mti->mti_stripe_index);
591
592         RETURN(0);
593 out_up:
594         mutex_unlock(&fsdb->fsdb_mutex);
595         return rc;
596 }
597
598 struct mgs_modify_lookup {
599         struct cfg_marker mml_marker;
600         int               mml_modified;
601 };
602
603 static int mgs_modify_handler(const struct lu_env *env,
604                               struct llog_handle *llh,
605                               struct llog_rec_hdr *rec, void *data)
606 {
607         struct mgs_modify_lookup *mml = data;
608         struct cfg_marker *marker;
609         struct lustre_cfg *lcfg = REC_DATA(rec);
610         int cfg_len = REC_DATA_LEN(rec);
611         int rc;
612         ENTRY;
613
614         if (rec->lrh_type != OBD_CFG_REC) {
615                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
616                 RETURN(-EINVAL);
617         }
618
619         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
620         if (rc) {
621                 CERROR("Insane cfg\n");
622                 RETURN(rc);
623         }
624
625         /* We only care about markers */
626         if (lcfg->lcfg_command != LCFG_MARKER)
627                 RETURN(0);
628
629         marker = lustre_cfg_buf(lcfg, 1);
630         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
631             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
632             !(marker->cm_flags & CM_SKIP)) {
633                 /* Found a non-skipped marker match */
634                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
635                        rec->lrh_index, marker->cm_step,
636                        marker->cm_flags, mml->mml_marker.cm_flags,
637                        marker->cm_tgtname, marker->cm_comment);
638                 /* Overwrite the old marker llog entry */
639                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
640                 marker->cm_flags |= mml->mml_marker.cm_flags;
641                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
642                 /* Header and tail are added back to lrh_len in
643                    llog_lvfs_write_rec */
644                 rec->lrh_len = cfg_len;
645                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
646                                 rec->lrh_index);
647                 if (!rc)
648                          mml->mml_modified++;
649         }
650
651         RETURN(rc);
652 }
653
654 /**
655  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
656  * Return code:
657  * 0 - modified successfully,
658  * 1 - no modification was done
659  * negative - error
660  */
661 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
662                       struct fs_db *fsdb, struct mgs_target_info *mti,
663                       char *logname, char *devname, char *comment, int flags)
664 {
665         struct llog_handle *loghandle;
666         struct llog_ctxt *ctxt;
667         struct mgs_modify_lookup *mml;
668         int rc;
669
670         ENTRY;
671
672         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
673         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
674                flags);
675
676         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
677         LASSERT(ctxt != NULL);
678         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
679         if (rc < 0) {
680                 if (rc == -ENOENT)
681                         rc = 0;
682                 GOTO(out_pop, rc);
683         }
684
685         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
686         if (rc)
687                 GOTO(out_close, rc);
688
689         if (llog_get_size(loghandle) <= 1)
690                 GOTO(out_close, rc = 0);
691
692         OBD_ALLOC_PTR(mml);
693         if (!mml)
694                 GOTO(out_close, rc = -ENOMEM);
695         if (strlcpy(mml->mml_marker.cm_comment, comment,
696                     sizeof(mml->mml_marker.cm_comment)) >=
697             sizeof(mml->mml_marker.cm_comment))
698                 GOTO(out_free, rc = -E2BIG);
699         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
700                     sizeof(mml->mml_marker.cm_tgtname)) >=
701             sizeof(mml->mml_marker.cm_tgtname))
702                 GOTO(out_free, rc = -E2BIG);
703         /* Modify mostly means cancel */
704         mml->mml_marker.cm_flags = flags;
705         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
706         mml->mml_modified = 0;
707         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
708                           NULL);
709         if (!rc && !mml->mml_modified)
710                 rc = 1;
711
712 out_free:
713         OBD_FREE_PTR(mml);
714
715 out_close:
716         llog_close(env, loghandle);
717 out_pop:
718         if (rc < 0)
719                 CERROR("%s: modify %s/%s failed: rc = %d\n",
720                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
721         llog_ctxt_put(ctxt);
722         RETURN(rc);
723 }
724
725 /** This structure is passed to mgs_replace_handler */
726 struct mgs_replace_uuid_lookup {
727         /* Nids are replaced for this target device */
728         struct mgs_target_info target;
729         /* Temporary modified llog */
730         struct llog_handle *temp_llh;
731         /* Flag is set if in target block*/
732         int in_target_device;
733         /* Nids already added. Just skip (multiple nids) */
734         int device_nids_added;
735         /* Flag is set if this block should not be copied */
736         int skip_it;
737 };
738
739 /**
740  * Check: a) if block should be skipped
741  * b) is it target block
742  *
743  * \param[in] lcfg
744  * \param[in] mrul
745  *
746  * \retval 0 should not to be skipped
747  * \retval 1 should to be skipped
748  */
749 static int check_markers(struct lustre_cfg *lcfg,
750                          struct mgs_replace_uuid_lookup *mrul)
751 {
752          struct cfg_marker *marker;
753
754         /* Track markers. Find given device */
755         if (lcfg->lcfg_command == LCFG_MARKER) {
756                 marker = lustre_cfg_buf(lcfg, 1);
757                 /* Clean llog from records marked as CM_EXCLUDE.
758                    CM_SKIP records are used for "active" command
759                    and can be restored if needed */
760                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
761                     (CM_EXCLUDE | CM_START)) {
762                         mrul->skip_it = 1;
763                         return 1;
764                 }
765
766                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
767                     (CM_EXCLUDE | CM_END)) {
768                         mrul->skip_it = 0;
769                         return 1;
770                 }
771
772                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
773                         LASSERT(!(marker->cm_flags & CM_START) ||
774                                 !(marker->cm_flags & CM_END));
775                         if (marker->cm_flags & CM_START) {
776                                 mrul->in_target_device = 1;
777                                 mrul->device_nids_added = 0;
778                         } else if (marker->cm_flags & CM_END)
779                                 mrul->in_target_device = 0;
780                 }
781         }
782
783         return 0;
784 }
785
786 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
787                        struct lustre_cfg *lcfg)
788 {
789         struct llog_rec_hdr      rec;
790         int                      buflen, rc;
791
792         if (!lcfg || !llh)
793                 return -ENOMEM;
794
795         LASSERT(llh->lgh_ctxt);
796
797         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
798                                 lcfg->lcfg_buflens);
799         rec.lrh_len = llog_data_len(buflen);
800         rec.lrh_type = OBD_CFG_REC;
801
802         /* idx = -1 means append */
803         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
804         if (rc)
805                 CERROR("failed %d\n", rc);
806         return rc;
807 }
808
809 static int record_base(const struct lu_env *env, struct llog_handle *llh,
810                      char *cfgname, lnet_nid_t nid, int cmd,
811                      char *s1, char *s2, char *s3, char *s4)
812 {
813         struct mgs_thread_info *mgi = mgs_env_info(env);
814         struct lustre_cfg     *lcfg;
815         int rc;
816
817         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
818                cmd, s1, s2, s3, s4);
819
820         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
821         if (s1)
822                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
823         if (s2)
824                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
825         if (s3)
826                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
827         if (s4)
828                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
829
830         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
831         if (!lcfg)
832                 return -ENOMEM;
833         lcfg->lcfg_nid = nid;
834
835         rc = record_lcfg(env, llh, lcfg);
836
837         lustre_cfg_free(lcfg);
838
839         if (rc) {
840                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
841                        cmd, s1, s2, s3, s4);
842         }
843         return rc;
844 }
845
846 static inline int record_add_uuid(const struct lu_env *env,
847                                   struct llog_handle *llh,
848                                   uint64_t nid, char *uuid)
849 {
850         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
851 }
852
853 static inline int record_add_conn(const struct lu_env *env,
854                                   struct llog_handle *llh,
855                                   char *devname, char *uuid)
856 {
857         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
858 }
859
860 static inline int record_attach(const struct lu_env *env,
861                                 struct llog_handle *llh, char *devname,
862                                 char *type, char *uuid)
863 {
864         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
865 }
866
867 static inline int record_setup(const struct lu_env *env,
868                                struct llog_handle *llh, char *devname,
869                                char *s1, char *s2, char *s3, char *s4)
870 {
871         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
872 }
873
874 /**
875  * \retval <0 record processing error
876  * \retval n record is processed. No need copy original one.
877  * \retval 0 record is not processed.
878  */
879 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
880                            struct mgs_replace_uuid_lookup *mrul)
881 {
882         int nids_added = 0;
883         lnet_nid_t nid;
884         char *ptr;
885         int rc;
886
887         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
888                 /* LCFG_ADD_UUID command found. Let's skip original command
889                    and add passed nids */
890                 ptr = mrul->target.mti_params;
891                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
892                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
893                                "device %s\n", libcfs_nid2str(nid),
894                                 mrul->target.mti_params,
895                                 mrul->target.mti_svname);
896                         rc = record_add_uuid(env,
897                                              mrul->temp_llh, nid,
898                                              mrul->target.mti_params);
899                         if (!rc)
900                                 nids_added++;
901                 }
902
903                 if (nids_added == 0) {
904                         CERROR("No new nids were added, nid %s with uuid %s, "
905                                "device %s\n", libcfs_nid2str(nid),
906                                mrul->target.mti_params,
907                                mrul->target.mti_svname);
908                         RETURN(-ENXIO);
909                 } else {
910                         mrul->device_nids_added = 1;
911                 }
912
913                 return nids_added;
914         }
915
916         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
917                 /* LCFG_SETUP command found. UUID should be changed */
918                 rc = record_setup(env,
919                                   mrul->temp_llh,
920                                   /* devname the same */
921                                   lustre_cfg_string(lcfg, 0),
922                                   /* s1 is not changed */
923                                   lustre_cfg_string(lcfg, 1),
924                                   /* new uuid should be
925                                   the full nidlist */
926                                   mrul->target.mti_params,
927                                   /* s3 is not changed */
928                                   lustre_cfg_string(lcfg, 3),
929                                   /* s4 is not changed */
930                                   lustre_cfg_string(lcfg, 4));
931                 return rc ? rc : 1;
932         }
933
934         /* Another commands in target device block */
935         return 0;
936 }
937
938 /**
939  * Handler that called for every record in llog.
940  * Records are processed in order they placed in llog.
941  *
942  * \param[in] llh       log to be processed
943  * \param[in] rec       current record
944  * \param[in] data      mgs_replace_uuid_lookup structure
945  *
946  * \retval 0    success
947  */
948 static int mgs_replace_handler(const struct lu_env *env,
949                                struct llog_handle *llh,
950                                struct llog_rec_hdr *rec,
951                                void *data)
952 {
953         struct mgs_replace_uuid_lookup *mrul;
954         struct lustre_cfg *lcfg = REC_DATA(rec);
955         int cfg_len = REC_DATA_LEN(rec);
956         int rc;
957         ENTRY;
958
959         mrul = (struct mgs_replace_uuid_lookup *)data;
960
961         if (rec->lrh_type != OBD_CFG_REC) {
962                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
963                        rec->lrh_type, lcfg->lcfg_command,
964                        lustre_cfg_string(lcfg, 0),
965                        lustre_cfg_string(lcfg, 1));
966                 RETURN(-EINVAL);
967         }
968
969         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
970         if (rc) {
971                 /* Do not copy any invalidated records */
972                 GOTO(skip_out, rc = 0);
973         }
974
975         rc = check_markers(lcfg, mrul);
976         if (rc || mrul->skip_it)
977                 GOTO(skip_out, rc = 0);
978
979         /* Write to new log all commands outside target device block */
980         if (!mrul->in_target_device)
981                 GOTO(copy_out, rc = 0);
982
983         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
984            (failover nids) for this target, assuming that if then
985            primary is changing then so is the failover */
986         if (mrul->device_nids_added &&
987             (lcfg->lcfg_command == LCFG_ADD_UUID ||
988              lcfg->lcfg_command == LCFG_ADD_CONN))
989                 GOTO(skip_out, rc = 0);
990
991         rc = process_command(env, lcfg, mrul);
992         if (rc < 0)
993                 RETURN(rc);
994
995         if (rc)
996                 RETURN(0);
997 copy_out:
998         /* Record is placed in temporary llog as is */
999         rc = llog_write(env, mrul->temp_llh, rec, NULL, 0, NULL, -1);
1000
1001         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1002                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1003                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1004         RETURN(rc);
1005
1006 skip_out:
1007         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1008                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1009                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1010         RETURN(rc);
1011 }
1012
1013 static int mgs_log_is_empty(const struct lu_env *env,
1014                             struct mgs_device *mgs, char *name)
1015 {
1016         struct llog_ctxt        *ctxt;
1017         int                      rc;
1018
1019         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1020         LASSERT(ctxt != NULL);
1021
1022         rc = llog_is_empty(env, ctxt, name);
1023         llog_ctxt_put(ctxt);
1024         return rc;
1025 }
1026
1027 static int mgs_replace_nids_log(const struct lu_env *env,
1028                                 struct obd_device *mgs, struct fs_db *fsdb,
1029                                 char *logname, char *devname, char *nids)
1030 {
1031         struct llog_handle *orig_llh, *backup_llh;
1032         struct llog_ctxt *ctxt;
1033         struct mgs_replace_uuid_lookup *mrul;
1034         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1035         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1036         char *backup;
1037         int rc, rc2;
1038         ENTRY;
1039
1040         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1041
1042         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1043         LASSERT(ctxt != NULL);
1044
1045         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1046                 /* Log is empty. Nothing to replace */
1047                 GOTO(out_put, rc = 0);
1048         }
1049
1050         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1051         if (backup == NULL)
1052                 GOTO(out_put, rc = -ENOMEM);
1053
1054         sprintf(backup, "%s.bak", logname);
1055
1056         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1057         if (rc == 0) {
1058                 /* Now erase original log file. Connections are not allowed.
1059                    Backup is already saved */
1060                 rc = llog_erase(env, ctxt, NULL, logname);
1061                 if (rc < 0)
1062                         GOTO(out_free, rc);
1063         } else if (rc != -ENOENT) {
1064                 CERROR("%s: can't make backup for %s: rc = %d\n",
1065                        mgs->obd_name, logname, rc);
1066                 GOTO(out_free,rc);
1067         }
1068
1069         /* open local log */
1070         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1071         if (rc)
1072                 GOTO(out_restore, rc);
1073
1074         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1075         if (rc)
1076                 GOTO(out_closel, rc);
1077
1078         /* open backup llog */
1079         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1080                        LLOG_OPEN_EXISTS);
1081         if (rc)
1082                 GOTO(out_closel, rc);
1083
1084         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1085         if (rc)
1086                 GOTO(out_close, rc);
1087
1088         if (llog_get_size(backup_llh) <= 1)
1089                 GOTO(out_close, rc = 0);
1090
1091         OBD_ALLOC_PTR(mrul);
1092         if (!mrul)
1093                 GOTO(out_close, rc = -ENOMEM);
1094         /* devname is only needed information to replace UUID records */
1095         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1096         /* parse nids later */
1097         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1098         /* Copy records to this temporary llog */
1099         mrul->temp_llh = orig_llh;
1100
1101         rc = llog_process(env, backup_llh, mgs_replace_handler,
1102                           (void *)mrul, NULL);
1103         OBD_FREE_PTR(mrul);
1104 out_close:
1105         rc2 = llog_close(NULL, backup_llh);
1106         if (!rc)
1107                 rc = rc2;
1108 out_closel:
1109         rc2 = llog_close(NULL, orig_llh);
1110         if (!rc)
1111                 rc = rc2;
1112
1113 out_restore:
1114         if (rc) {
1115                 CERROR("%s: llog should be restored: rc = %d\n",
1116                        mgs->obd_name, rc);
1117                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1118                                   logname);
1119                 if (rc2 < 0)
1120                         CERROR("%s: can't restore backup %s: rc = %d\n",
1121                                mgs->obd_name, logname, rc2);
1122         }
1123
1124 out_free:
1125         OBD_FREE(backup, strlen(backup) + 1);
1126
1127 out_put:
1128         llog_ctxt_put(ctxt);
1129
1130         if (rc)
1131                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1132                        mgs->obd_name, logname, rc);
1133
1134         RETURN(rc);
1135 }
1136
1137 /**
1138  * Parse device name and get file system name and/or device index
1139  *
1140  * \param[in]   devname device name (ex. lustre-MDT0000)
1141  * \param[out]  fsname  file system name(optional)
1142  * \param[out]  index   device index(optional)
1143  *
1144  * \retval 0    success
1145  */
1146 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1147 {
1148         int rc;
1149         ENTRY;
1150
1151         /* Extract fsname */
1152         if (fsname) {
1153                 rc = server_name2fsname(devname, fsname, NULL);
1154                 if (rc < 0) {
1155                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1156                                devname);
1157                         RETURN(-EINVAL);
1158                 }
1159         }
1160
1161         if (index) {
1162                 rc = server_name2index(devname, index, NULL);
1163                 if (rc < 0) {
1164                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1165                                devname);
1166                         RETURN(-EINVAL);
1167                 }
1168         }
1169
1170         RETURN(0);
1171 }
1172
1173 static int only_mgs_is_running(struct obd_device *mgs_obd)
1174 {
1175         /* TDB: Is global variable with devices count exists? */
1176         int num_devices = get_devices_count();
1177         /* osd, MGS and MGC + self_export
1178            (wc -l /proc/fs/lustre/devices <= 2) && (num_exports <= 2) */
1179         return (num_devices <= 3) && (mgs_obd->obd_num_exports <= 2);
1180 }
1181
1182 static int name_create_mdt(char **logname, char *fsname, int i)
1183 {
1184         char mdt_index[9];
1185
1186         sprintf(mdt_index, "-MDT%04x", i);
1187         return name_create(logname, fsname, mdt_index);
1188 }
1189
1190 /**
1191  * Replace nids for \a device to \a nids values
1192  *
1193  * \param obd           MGS obd device
1194  * \param devname       nids need to be replaced for this device
1195  * (ex. lustre-OST0000)
1196  * \param nids          nids list (ex. nid1,nid2,nid3)
1197  *
1198  * \retval 0    success
1199  */
1200 int mgs_replace_nids(const struct lu_env *env,
1201                      struct mgs_device *mgs,
1202                      char *devname, char *nids)
1203 {
1204         /* Assume fsname is part of device name */
1205         char fsname[MTI_NAME_MAXLEN];
1206         int rc;
1207         __u32 index;
1208         char *logname;
1209         struct fs_db *fsdb;
1210         unsigned int i;
1211         int conn_state;
1212         struct obd_device *mgs_obd = mgs->mgs_obd;
1213         ENTRY;
1214
1215         /* We can only change NIDs if no other nodes are connected */
1216         spin_lock(&mgs_obd->obd_dev_lock);
1217         conn_state = mgs_obd->obd_no_conn;
1218         mgs_obd->obd_no_conn = 1;
1219         spin_unlock(&mgs_obd->obd_dev_lock);
1220
1221         /* We can not change nids if not only MGS is started */
1222         if (!only_mgs_is_running(mgs_obd)) {
1223                 CERROR("Only MGS is allowed to be started\n");
1224                 GOTO(out, rc = -EINPROGRESS);
1225         }
1226
1227         /* Get fsname and index*/
1228         rc = mgs_parse_devname(devname, fsname, &index);
1229         if (rc)
1230                 GOTO(out, rc);
1231
1232         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1233         if (rc) {
1234                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1235                 GOTO(out, rc);
1236         }
1237
1238         /* Process client llogs */
1239         name_create(&logname, fsname, "-client");
1240         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1241         name_destroy(&logname);
1242         if (rc) {
1243                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1244                        fsname, devname, rc);
1245                 GOTO(out, rc);
1246         }
1247
1248         /* Process MDT llogs */
1249         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1250                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1251                         continue;
1252                 name_create_mdt(&logname, fsname, i);
1253                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1254                 name_destroy(&logname);
1255                 if (rc)
1256                         GOTO(out, rc);
1257         }
1258
1259 out:
1260         spin_lock(&mgs_obd->obd_dev_lock);
1261         mgs_obd->obd_no_conn = conn_state;
1262         spin_unlock(&mgs_obd->obd_dev_lock);
1263
1264         RETURN(rc);
1265 }
1266
1267 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1268                             char *devname, struct lov_desc *desc)
1269 {
1270         struct mgs_thread_info *mgi = mgs_env_info(env);
1271         struct lustre_cfg *lcfg;
1272         int rc;
1273
1274         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1275         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1276         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1277         if (!lcfg)
1278                 return -ENOMEM;
1279         rc = record_lcfg(env, llh, lcfg);
1280
1281         lustre_cfg_free(lcfg);
1282         return rc;
1283 }
1284
1285 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1286                             char *devname, struct lmv_desc *desc)
1287 {
1288         struct mgs_thread_info *mgi = mgs_env_info(env);
1289         struct lustre_cfg *lcfg;
1290         int rc;
1291
1292         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1293         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1294         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1295
1296         rc = record_lcfg(env, llh, lcfg);
1297
1298         lustre_cfg_free(lcfg);
1299         return rc;
1300 }
1301
1302 static inline int record_mdc_add(const struct lu_env *env,
1303                                  struct llog_handle *llh,
1304                                  char *logname, char *mdcuuid,
1305                                  char *mdtuuid, char *index,
1306                                  char *gen)
1307 {
1308         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1309                            mdtuuid,index,gen,mdcuuid);
1310 }
1311
1312 static inline int record_lov_add(const struct lu_env *env,
1313                                  struct llog_handle *llh,
1314                                  char *lov_name, char *ost_uuid,
1315                                  char *index, char *gen)
1316 {
1317         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1318                            ost_uuid, index, gen, 0);
1319 }
1320
1321 static inline int record_mount_opt(const struct lu_env *env,
1322                                    struct llog_handle *llh,
1323                                    char *profile, char *lov_name,
1324                                    char *mdc_name)
1325 {
1326         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1327                            profile,lov_name,mdc_name,0);
1328 }
1329
1330 static int record_marker(const struct lu_env *env,
1331                          struct llog_handle *llh,
1332                          struct fs_db *fsdb, __u32 flags,
1333                          char *tgtname, char *comment)
1334 {
1335         struct mgs_thread_info *mgi = mgs_env_info(env);
1336         struct lustre_cfg *lcfg;
1337         int rc;
1338         int cplen = 0;
1339
1340         if (flags & CM_START)
1341                 fsdb->fsdb_gen++;
1342         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1343         mgi->mgi_marker.cm_flags = flags;
1344         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1345         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1346                         sizeof(mgi->mgi_marker.cm_tgtname));
1347         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1348                 return -E2BIG;
1349         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1350                         sizeof(mgi->mgi_marker.cm_comment));
1351         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1352                 return -E2BIG;
1353         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1354         mgi->mgi_marker.cm_canceltime = 0;
1355         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1356         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1357                             sizeof(mgi->mgi_marker));
1358         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1359         if (!lcfg)
1360                 return -ENOMEM;
1361         rc = record_lcfg(env, llh, lcfg);
1362
1363         lustre_cfg_free(lcfg);
1364         return rc;
1365 }
1366
1367 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1368                             struct llog_handle **llh, char *name)
1369 {
1370         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1371         struct llog_ctxt        *ctxt;
1372         int                      rc = 0;
1373
1374         if (*llh)
1375                 GOTO(out, rc = -EBUSY);
1376
1377         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1378         if (!ctxt)
1379                 GOTO(out, rc = -ENODEV);
1380         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1381
1382         rc = llog_open_create(env, ctxt, llh, NULL, name);
1383         if (rc)
1384                 GOTO(out_ctxt, rc);
1385         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1386         if (rc)
1387                 llog_close(env, *llh);
1388 out_ctxt:
1389         llog_ctxt_put(ctxt);
1390 out:
1391         if (rc) {
1392                 CERROR("%s: can't start log %s: rc = %d\n",
1393                        mgs->mgs_obd->obd_name, name, rc);
1394                 *llh = NULL;
1395         }
1396         RETURN(rc);
1397 }
1398
1399 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1400 {
1401         int rc;
1402
1403         rc = llog_close(env, *llh);
1404         *llh = NULL;
1405
1406         return rc;
1407 }
1408
1409 /******************** config "macros" *********************/
1410
1411 /* write an lcfg directly into a log (with markers) */
1412 static int mgs_write_log_direct(const struct lu_env *env,
1413                                 struct mgs_device *mgs, struct fs_db *fsdb,
1414                                 char *logname, struct lustre_cfg *lcfg,
1415                                 char *devname, char *comment)
1416 {
1417         struct llog_handle *llh = NULL;
1418         int rc;
1419         ENTRY;
1420
1421         if (!lcfg)
1422                 RETURN(-ENOMEM);
1423
1424         rc = record_start_log(env, mgs, &llh, logname);
1425         if (rc)
1426                 RETURN(rc);
1427
1428         /* FIXME These should be a single journal transaction */
1429         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1430         if (rc)
1431                 GOTO(out_end, rc);
1432         rc = record_lcfg(env, llh, lcfg);
1433         if (rc)
1434                 GOTO(out_end, rc);
1435         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1436         if (rc)
1437                 GOTO(out_end, rc);
1438 out_end:
1439         record_end_log(env, &llh);
1440         RETURN(rc);
1441 }
1442
1443 /* write the lcfg in all logs for the given fs */
1444 int mgs_write_log_direct_all(const struct lu_env *env,
1445                              struct mgs_device *mgs,
1446                              struct fs_db *fsdb,
1447                              struct mgs_target_info *mti,
1448                              struct lustre_cfg *lcfg,
1449                              char *devname, char *comment,
1450                              int server_only)
1451 {
1452         cfs_list_t list;
1453         struct mgs_direntry *dirent, *n;
1454         char *fsname = mti->mti_fsname;
1455         char *logname;
1456         int rc = 0, len = strlen(fsname);
1457         ENTRY;
1458
1459         /* We need to set params for any future logs
1460            as well. FIXME Append this file to every new log.
1461            Actually, we should store as params (text), not llogs.  Or
1462            in a database. */
1463         rc = name_create(&logname, fsname, "-params");
1464         if (rc)
1465                 RETURN(rc);
1466         if (mgs_log_is_empty(env, mgs, logname)) {
1467                 struct llog_handle *llh = NULL;
1468                 rc = record_start_log(env, mgs, &llh, logname);
1469                 if (rc == 0)
1470                         record_end_log(env, &llh);
1471         }
1472         name_destroy(&logname);
1473         if (rc)
1474                 RETURN(rc);
1475
1476         /* Find all the logs in the CONFIGS directory */
1477         rc = class_dentry_readdir(env, mgs, &list);
1478         if (rc)
1479                 RETURN(rc);
1480
1481         /* Could use fsdb index maps instead of directory listing */
1482         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1483                 cfs_list_del(&dirent->list);
1484                 /* don't write to sptlrpc rule log */
1485                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1486                         goto next;
1487
1488                 /* caller wants write server logs only */
1489                 if (server_only && strstr(dirent->name, "-client") != NULL)
1490                         goto next;
1491
1492                 if (strncmp(fsname, dirent->name, len) == 0) {
1493                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1494                         /* Erase any old settings of this same parameter */
1495                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1496                                         devname, comment, CM_SKIP);
1497                         if (rc < 0)
1498                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1499                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1500                         /* Write the new one */
1501                         if (lcfg) {
1502                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1503                                                           dirent->name,
1504                                                           lcfg, devname,
1505                                                           comment);
1506                                 if (rc)
1507                                         CERROR("%s: writing log %s: rc = %d\n",
1508                                                mgs->mgs_obd->obd_name,
1509                                                dirent->name, rc);
1510                         }
1511                 }
1512 next:
1513                 mgs_direntry_free(dirent);
1514         }
1515
1516         RETURN(rc);
1517 }
1518
1519 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1520                                     struct mgs_device *mgs,
1521                                     struct fs_db *fsdb,
1522                                     struct mgs_target_info *mti,
1523                                     int index, char *logname);
1524 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1525                                     struct mgs_device *mgs,
1526                                     struct fs_db *fsdb,
1527                                     struct mgs_target_info *mti,
1528                                     char *logname, char *suffix, char *lovname,
1529                                     enum lustre_sec_part sec_part, int flags);
1530 static int name_create_mdt_and_lov(char **logname, char **lovname,
1531                                    struct fs_db *fsdb, int i);
1532
1533 static int add_param(char *params, char *key, char *val)
1534 {
1535         char *start = params + strlen(params);
1536         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1537         int keylen = 0;
1538
1539         if (key != NULL)
1540                 keylen = strlen(key);
1541         if (start + 1 + keylen + strlen(val) >= end) {
1542                 CERROR("params are too long: %s %s%s\n",
1543                        params, key != NULL ? key : "", val);
1544                 return -EINVAL;
1545         }
1546
1547         sprintf(start, " %s%s", key != NULL ? key : "", val);
1548         return 0;
1549 }
1550
1551 /**
1552  * Walk through client config log record and convert the related records
1553  * into the target.
1554  **/
1555 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1556                                          struct llog_handle *llh,
1557                                          struct llog_rec_hdr *rec, void *data)
1558 {
1559         struct mgs_device *mgs;
1560         struct obd_device *obd;
1561         struct mgs_target_info *mti, *tmti;
1562         struct fs_db *fsdb;
1563         int cfg_len = rec->lrh_len;
1564         char *cfg_buf = (char*) (rec + 1);
1565         struct lustre_cfg *lcfg;
1566         int rc = 0;
1567         struct llog_handle *mdt_llh = NULL;
1568         static int got_an_osc_or_mdc = 0;
1569         /* 0: not found any osc/mdc;
1570            1: found osc;
1571            2: found mdc;
1572         */
1573         static int last_step = -1;
1574         int cplen = 0;
1575
1576         ENTRY;
1577
1578         mti = ((struct temp_comp*)data)->comp_mti;
1579         tmti = ((struct temp_comp*)data)->comp_tmti;
1580         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1581         obd = ((struct temp_comp *)data)->comp_obd;
1582         mgs = lu2mgs_dev(obd->obd_lu_dev);
1583         LASSERT(mgs);
1584
1585         if (rec->lrh_type != OBD_CFG_REC) {
1586                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1587                 RETURN(-EINVAL);
1588         }
1589
1590         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1591         if (rc) {
1592                 CERROR("Insane cfg\n");
1593                 RETURN(rc);
1594         }
1595
1596         lcfg = (struct lustre_cfg *)cfg_buf;
1597
1598         if (lcfg->lcfg_command == LCFG_MARKER) {
1599                 struct cfg_marker *marker;
1600                 marker = lustre_cfg_buf(lcfg, 1);
1601                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1602                     (marker->cm_flags & CM_START) &&
1603                      !(marker->cm_flags & CM_SKIP)) {
1604                         got_an_osc_or_mdc = 1;
1605                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1606                                         sizeof(tmti->mti_svname));
1607                         if (cplen >= sizeof(tmti->mti_svname))
1608                                 RETURN(-E2BIG);
1609                         rc = record_start_log(env, mgs, &mdt_llh,
1610                                               mti->mti_svname);
1611                         if (rc)
1612                                 RETURN(rc);
1613                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1614                                            mti->mti_svname, "add osc(copied)");
1615                         record_end_log(env, &mdt_llh);
1616                         last_step = marker->cm_step;
1617                         RETURN(rc);
1618                 }
1619                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1620                     (marker->cm_flags & CM_END) &&
1621                      !(marker->cm_flags & CM_SKIP)) {
1622                         LASSERT(last_step == marker->cm_step);
1623                         last_step = -1;
1624                         got_an_osc_or_mdc = 0;
1625                         memset(tmti, 0, sizeof(*tmti));
1626                         rc = record_start_log(env, mgs, &mdt_llh,
1627                                               mti->mti_svname);
1628                         if (rc)
1629                                 RETURN(rc);
1630                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1631                                            mti->mti_svname, "add osc(copied)");
1632                         record_end_log(env, &mdt_llh);
1633                         RETURN(rc);
1634                 }
1635                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1636                     (marker->cm_flags & CM_START) &&
1637                      !(marker->cm_flags & CM_SKIP)) {
1638                         got_an_osc_or_mdc = 2;
1639                         last_step = marker->cm_step;
1640                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1641                                strlen(marker->cm_tgtname));
1642
1643                         RETURN(rc);
1644                 }
1645                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1646                     (marker->cm_flags & CM_END) &&
1647                      !(marker->cm_flags & CM_SKIP)) {
1648                         LASSERT(last_step == marker->cm_step);
1649                         last_step = -1;
1650                         got_an_osc_or_mdc = 0;
1651                         memset(tmti, 0, sizeof(*tmti));
1652                         RETURN(rc);
1653                 }
1654         }
1655
1656         if (got_an_osc_or_mdc == 0 || last_step < 0)
1657                 RETURN(rc);
1658
1659         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1660                 uint64_t nodenid = lcfg->lcfg_nid;
1661
1662                 if (strlen(tmti->mti_uuid) == 0) {
1663                         /* target uuid not set, this config record is before
1664                          * LCFG_SETUP, this nid is one of target node nid.
1665                          */
1666                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1667                         tmti->mti_nid_count++;
1668                 } else {
1669                         /* failover node nid */
1670                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1671                                        libcfs_nid2str(nodenid));
1672                 }
1673
1674                 RETURN(rc);
1675         }
1676
1677         if (lcfg->lcfg_command == LCFG_SETUP) {
1678                 char *target;
1679
1680                 target = lustre_cfg_string(lcfg, 1);
1681                 memcpy(tmti->mti_uuid, target, strlen(target));
1682                 RETURN(rc);
1683         }
1684
1685         /* ignore client side sptlrpc_conf_log */
1686         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1687                 RETURN(rc);
1688
1689         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1690                 int index;
1691
1692                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1693                         RETURN (-EINVAL);
1694
1695                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1696                        strlen(mti->mti_fsname));
1697                 tmti->mti_stripe_index = index;
1698
1699                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1700                                               mti->mti_stripe_index,
1701                                               mti->mti_svname);
1702                 memset(tmti, 0, sizeof(*tmti));
1703                 RETURN(rc);
1704         }
1705
1706         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1707                 int index;
1708                 char mdt_index[9];
1709                 char *logname, *lovname;
1710
1711                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1712                                              mti->mti_stripe_index);
1713                 if (rc)
1714                         RETURN(rc);
1715                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1716
1717                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1718                         name_destroy(&logname);
1719                         name_destroy(&lovname);
1720                         RETURN(-EINVAL);
1721                 }
1722
1723                 tmti->mti_stripe_index = index;
1724                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1725                                          mdt_index, lovname,
1726                                          LUSTRE_SP_MDT, 0);
1727                 name_destroy(&logname);
1728                 name_destroy(&lovname);
1729                 RETURN(rc);
1730         }
1731         RETURN(rc);
1732 }
1733
1734 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1735 /* stealed from mgs_get_fsdb_from_llog*/
1736 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1737                                               struct mgs_device *mgs,
1738                                               char *client_name,
1739                                               struct temp_comp* comp)
1740 {
1741         struct llog_handle *loghandle;
1742         struct mgs_target_info *tmti;
1743         struct llog_ctxt *ctxt;
1744         int rc;
1745
1746         ENTRY;
1747
1748         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1749         LASSERT(ctxt != NULL);
1750
1751         OBD_ALLOC_PTR(tmti);
1752         if (tmti == NULL)
1753                 GOTO(out_ctxt, rc = -ENOMEM);
1754
1755         comp->comp_tmti = tmti;
1756         comp->comp_obd = mgs->mgs_obd;
1757
1758         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1759                        LLOG_OPEN_EXISTS);
1760         if (rc < 0) {
1761                 if (rc == -ENOENT)
1762                         rc = 0;
1763                 GOTO(out_pop, rc);
1764         }
1765
1766         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1767         if (rc)
1768                 GOTO(out_close, rc);
1769
1770         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1771                                   (void *)comp, NULL, false);
1772         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1773 out_close:
1774         llog_close(env, loghandle);
1775 out_pop:
1776         OBD_FREE_PTR(tmti);
1777 out_ctxt:
1778         llog_ctxt_put(ctxt);
1779         RETURN(rc);
1780 }
1781
1782 /* lmv is the second thing for client logs */
1783 /* copied from mgs_write_log_lov. Please refer to that.  */
1784 static int mgs_write_log_lmv(const struct lu_env *env,
1785                              struct mgs_device *mgs,
1786                              struct fs_db *fsdb,
1787                              struct mgs_target_info *mti,
1788                              char *logname, char *lmvname)
1789 {
1790         struct llog_handle *llh = NULL;
1791         struct lmv_desc *lmvdesc;
1792         char *uuid;
1793         int rc = 0;
1794         ENTRY;
1795
1796         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1797
1798         OBD_ALLOC_PTR(lmvdesc);
1799         if (lmvdesc == NULL)
1800                 RETURN(-ENOMEM);
1801         lmvdesc->ld_active_tgt_count = 0;
1802         lmvdesc->ld_tgt_count = 0;
1803         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1804         uuid = (char *)lmvdesc->ld_uuid.uuid;
1805
1806         rc = record_start_log(env, mgs, &llh, logname);
1807         if (rc)
1808                 GOTO(out_free, rc);
1809         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1810         if (rc)
1811                 GOTO(out_end, rc);
1812         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1813         if (rc)
1814                 GOTO(out_end, rc);
1815         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1816         if (rc)
1817                 GOTO(out_end, rc);
1818         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1819         if (rc)
1820                 GOTO(out_end, rc);
1821 out_end:
1822         record_end_log(env, &llh);
1823 out_free:
1824         OBD_FREE_PTR(lmvdesc);
1825         RETURN(rc);
1826 }
1827
1828 /* lov is the first thing in the mdt and client logs */
1829 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1830                              struct fs_db *fsdb, struct mgs_target_info *mti,
1831                              char *logname, char *lovname)
1832 {
1833         struct llog_handle *llh = NULL;
1834         struct lov_desc *lovdesc;
1835         char *uuid;
1836         int rc = 0;
1837         ENTRY;
1838
1839         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1840
1841         /*
1842         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1843         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1844               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1845         */
1846
1847         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1848         OBD_ALLOC_PTR(lovdesc);
1849         if (lovdesc == NULL)
1850                 RETURN(-ENOMEM);
1851         lovdesc->ld_magic = LOV_DESC_MAGIC;
1852         lovdesc->ld_tgt_count = 0;
1853         /* Defaults.  Can be changed later by lcfg config_param */
1854         lovdesc->ld_default_stripe_count = 1;
1855         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1856         lovdesc->ld_default_stripe_size = LOV_DESC_STRIPE_SIZE_DEFAULT;
1857         lovdesc->ld_default_stripe_offset = -1;
1858         lovdesc->ld_qos_maxage = LOV_DESC_QOS_MAXAGE_DEFAULT;
1859         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1860         /* can these be the same? */
1861         uuid = (char *)lovdesc->ld_uuid.uuid;
1862
1863         /* This should always be the first entry in a log.
1864         rc = mgs_clear_log(obd, logname); */
1865         rc = record_start_log(env, mgs, &llh, logname);
1866         if (rc)
1867                 GOTO(out_free, rc);
1868         /* FIXME these should be a single journal transaction */
1869         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1870         if (rc)
1871                 GOTO(out_end, rc);
1872         rc = record_attach(env, llh, lovname, "lov", uuid);
1873         if (rc)
1874                 GOTO(out_end, rc);
1875         rc = record_lov_setup(env, llh, lovname, lovdesc);
1876         if (rc)
1877                 GOTO(out_end, rc);
1878         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1879         if (rc)
1880                 GOTO(out_end, rc);
1881         EXIT;
1882 out_end:
1883         record_end_log(env, &llh);
1884 out_free:
1885         OBD_FREE_PTR(lovdesc);
1886         return rc;
1887 }
1888
1889 /* add failnids to open log */
1890 static int mgs_write_log_failnids(const struct lu_env *env,
1891                                   struct mgs_target_info *mti,
1892                                   struct llog_handle *llh,
1893                                   char *cliname)
1894 {
1895         char *failnodeuuid = NULL;
1896         char *ptr = mti->mti_params;
1897         lnet_nid_t nid;
1898         int rc = 0;
1899
1900         /*
1901         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1902         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1903         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1904         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1905         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1906         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1907         */
1908
1909         /* Pull failnid info out of params string */
1910         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1911                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1912                         if (failnodeuuid == NULL) {
1913                                 /* We don't know the failover node name,
1914                                    so just use the first nid as the uuid */
1915                                 rc = name_create(&failnodeuuid,
1916                                                  libcfs_nid2str(nid), "");
1917                                 if (rc)
1918                                         return rc;
1919                         }
1920                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1921                                "client %s\n", libcfs_nid2str(nid),
1922                                failnodeuuid, cliname);
1923                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1924                 }
1925                 if (failnodeuuid) {
1926                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1927                         name_destroy(&failnodeuuid);
1928                         failnodeuuid = NULL;
1929                 }
1930         }
1931
1932         return rc;
1933 }
1934
1935 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1936                                     struct mgs_device *mgs,
1937                                     struct fs_db *fsdb,
1938                                     struct mgs_target_info *mti,
1939                                     char *logname, char *lmvname)
1940 {
1941         struct llog_handle *llh = NULL;
1942         char *mdcname = NULL;
1943         char *nodeuuid = NULL;
1944         char *mdcuuid = NULL;
1945         char *lmvuuid = NULL;
1946         char index[6];
1947         int i, rc;
1948         ENTRY;
1949
1950         if (mgs_log_is_empty(env, mgs, logname)) {
1951                 CERROR("log is empty! Logical error\n");
1952                 RETURN(-EINVAL);
1953         }
1954
1955         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1956                mti->mti_svname, logname, lmvname);
1957
1958         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1959         if (rc)
1960                 RETURN(rc);
1961         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1962         if (rc)
1963                 GOTO(out_free, rc);
1964         rc = name_create(&mdcuuid, mdcname, "_UUID");
1965         if (rc)
1966                 GOTO(out_free, rc);
1967         rc = name_create(&lmvuuid, lmvname, "_UUID");
1968         if (rc)
1969                 GOTO(out_free, rc);
1970
1971         rc = record_start_log(env, mgs, &llh, logname);
1972         if (rc)
1973                 GOTO(out_free, rc);
1974         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1975                            "add mdc");
1976         if (rc)
1977                 GOTO(out_end, rc);
1978         for (i = 0; i < mti->mti_nid_count; i++) {
1979                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1980                        libcfs_nid2str(mti->mti_nids[i]));
1981
1982                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1983                 if (rc)
1984                         GOTO(out_end, rc);
1985         }
1986
1987         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1988         if (rc)
1989                 GOTO(out_end, rc);
1990         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1991         if (rc)
1992                 GOTO(out_end, rc);
1993         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1994         if (rc)
1995                 GOTO(out_end, rc);
1996         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1997         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1998                             index, "1");
1999         if (rc)
2000                 GOTO(out_end, rc);
2001         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2002                            "add mdc");
2003         if (rc)
2004                 GOTO(out_end, rc);
2005 out_end:
2006         record_end_log(env, &llh);
2007 out_free:
2008         name_destroy(&lmvuuid);
2009         name_destroy(&mdcuuid);
2010         name_destroy(&mdcname);
2011         name_destroy(&nodeuuid);
2012         RETURN(rc);
2013 }
2014
2015 static inline int name_create_lov(char **lovname, char *mdtname,
2016                                   struct fs_db *fsdb, int index)
2017 {
2018         /* COMPAT_180 */
2019         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2020                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2021         else
2022                 return name_create(lovname, mdtname, "-mdtlov");
2023 }
2024
2025 static int name_create_mdt_and_lov(char **logname, char **lovname,
2026                                    struct fs_db *fsdb, int i)
2027 {
2028         int rc;
2029
2030         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2031         if (rc)
2032                 return rc;
2033         /* COMPAT_180 */
2034         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2035                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2036         else
2037                 rc = name_create(lovname, *logname, "-mdtlov");
2038         if (rc) {
2039                 name_destroy(logname);
2040                 *logname = NULL;
2041         }
2042         return rc;
2043 }
2044
2045 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2046                                       struct fs_db *fsdb, int i)
2047 {
2048         char suffix[16];
2049
2050         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2051                 sprintf(suffix, "-osc");
2052         else
2053                 sprintf(suffix, "-osc-MDT%04x", i);
2054         return name_create(oscname, ostname, suffix);
2055 }
2056
2057 /* add new mdc to already existent MDS */
2058 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2059                                     struct mgs_device *mgs,
2060                                     struct fs_db *fsdb,
2061                                     struct mgs_target_info *mti,
2062                                     int mdt_index, char *logname)
2063 {
2064         struct llog_handle      *llh = NULL;
2065         char    *nodeuuid = NULL;
2066         char    *ospname = NULL;
2067         char    *lovuuid = NULL;
2068         char    *mdtuuid = NULL;
2069         char    *svname = NULL;
2070         char    *mdtname = NULL;
2071         char    *lovname = NULL;
2072         char    index_str[16];
2073         int     i, rc;
2074
2075         ENTRY;
2076         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2077                 CERROR("log is empty! Logical error\n");
2078                 RETURN (-EINVAL);
2079         }
2080
2081         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2082                logname);
2083
2084         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2085         if (rc)
2086                 RETURN(rc);
2087
2088         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2089         if (rc)
2090                 GOTO(out_destory, rc);
2091
2092         rc = name_create(&svname, mdtname, "-osp");
2093         if (rc)
2094                 GOTO(out_destory, rc);
2095
2096         sprintf(index_str, "-MDT%04x", mdt_index);
2097         rc = name_create(&ospname, svname, index_str);
2098         if (rc)
2099                 GOTO(out_destory, rc);
2100
2101         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2102         if (rc)
2103                 GOTO(out_destory, rc);
2104
2105         rc = name_create(&lovuuid, lovname, "_UUID");
2106         if (rc)
2107                 GOTO(out_destory, rc);
2108
2109         rc = name_create(&mdtuuid, mdtname, "_UUID");
2110         if (rc)
2111                 GOTO(out_destory, rc);
2112
2113         rc = record_start_log(env, mgs, &llh, logname);
2114         if (rc)
2115                 GOTO(out_destory, rc);
2116
2117         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2118                            "add osp");
2119         if (rc)
2120                 GOTO(out_destory, rc);
2121
2122         for (i = 0; i < mti->mti_nid_count; i++) {
2123                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2124                        libcfs_nid2str(mti->mti_nids[i]));
2125                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2126                 if (rc)
2127                         GOTO(out_end, rc);
2128         }
2129
2130         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2131         if (rc)
2132                 GOTO(out_end, rc);
2133
2134         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2135                           NULL, NULL);
2136         if (rc)
2137                 GOTO(out_end, rc);
2138
2139         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2140         if (rc)
2141                 GOTO(out_end, rc);
2142
2143         /* Add mdc(osp) to lod */
2144         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2145                  mti->mti_stripe_index);
2146         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2147                          index_str, "1", NULL);
2148         if (rc)
2149                 GOTO(out_end, rc);
2150
2151         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2152         if (rc)
2153                 GOTO(out_end, rc);
2154
2155 out_end:
2156         record_end_log(env, &llh);
2157
2158 out_destory:
2159         name_destroy(&mdtuuid);
2160         name_destroy(&lovuuid);
2161         name_destroy(&lovname);
2162         name_destroy(&ospname);
2163         name_destroy(&svname);
2164         name_destroy(&nodeuuid);
2165         name_destroy(&mdtname);
2166         RETURN(rc);
2167 }
2168
2169 static int mgs_write_log_mdt0(const struct lu_env *env,
2170                               struct mgs_device *mgs,
2171                               struct fs_db *fsdb,
2172                               struct mgs_target_info *mti)
2173 {
2174         char *log = mti->mti_svname;
2175         struct llog_handle *llh = NULL;
2176         char *uuid, *lovname;
2177         char mdt_index[6];
2178         char *ptr = mti->mti_params;
2179         int rc = 0, failout = 0;
2180         ENTRY;
2181
2182         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2183         if (uuid == NULL)
2184                 RETURN(-ENOMEM);
2185
2186         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2187                 failout = (strncmp(ptr, "failout", 7) == 0);
2188
2189         rc = name_create(&lovname, log, "-mdtlov");
2190         if (rc)
2191                 GOTO(out_free, rc);
2192         if (mgs_log_is_empty(env, mgs, log)) {
2193                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2194                 if (rc)
2195                         GOTO(out_lod, rc);
2196         }
2197
2198         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2199
2200         rc = record_start_log(env, mgs, &llh, log);
2201         if (rc)
2202                 GOTO(out_lod, rc);
2203
2204         /* add MDT itself */
2205
2206         /* FIXME this whole fn should be a single journal transaction */
2207         sprintf(uuid, "%s_UUID", log);
2208         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2209         if (rc)
2210                 GOTO(out_lod, rc);
2211         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2212         if (rc)
2213                 GOTO(out_end, rc);
2214         rc = record_mount_opt(env, llh, log, lovname, NULL);
2215         if (rc)
2216                 GOTO(out_end, rc);
2217         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2218                         failout ? "n" : "f");
2219         if (rc)
2220                 GOTO(out_end, rc);
2221         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2222         if (rc)
2223                 GOTO(out_end, rc);
2224 out_end:
2225         record_end_log(env, &llh);
2226 out_lod:
2227         name_destroy(&lovname);
2228 out_free:
2229         OBD_FREE(uuid, sizeof(struct obd_uuid));
2230         RETURN(rc);
2231 }
2232
2233 /* envelope method for all layers log */
2234 static int mgs_write_log_mdt(const struct lu_env *env,
2235                              struct mgs_device *mgs,
2236                              struct fs_db *fsdb,
2237                              struct mgs_target_info *mti)
2238 {
2239         struct mgs_thread_info *mgi = mgs_env_info(env);
2240         struct llog_handle *llh = NULL;
2241         char *cliname;
2242         int rc, i = 0;
2243         ENTRY;
2244
2245         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2246
2247         if (mti->mti_uuid[0] == '\0') {
2248                 /* Make up our own uuid */
2249                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2250                          "%s_UUID", mti->mti_svname);
2251         }
2252
2253         /* add mdt */
2254         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2255         if (rc)
2256                 RETURN(rc);
2257         /* Append the mdt info to the client log */
2258         rc = name_create(&cliname, mti->mti_fsname, "-client");
2259         if (rc)
2260                 RETURN(rc);
2261
2262         if (mgs_log_is_empty(env, mgs, cliname)) {
2263                 /* Start client log */
2264                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2265                                        fsdb->fsdb_clilov);
2266                 if (rc)
2267                         GOTO(out_free, rc);
2268                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2269                                        fsdb->fsdb_clilmv);
2270                 if (rc)
2271                         GOTO(out_free, rc);
2272         }
2273
2274         /*
2275         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2276         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2277         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2278         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2279         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2280         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2281         */
2282
2283                 /* copy client info about lov/lmv */
2284                 mgi->mgi_comp.comp_mti = mti;
2285                 mgi->mgi_comp.comp_fsdb = fsdb;
2286
2287                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2288                                                         &mgi->mgi_comp);
2289                 if (rc)
2290                         GOTO(out_free, rc);
2291                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2292                                               fsdb->fsdb_clilmv);
2293                 if (rc)
2294                         GOTO(out_free, rc);
2295
2296                 /* add mountopts */
2297                 rc = record_start_log(env, mgs, &llh, cliname);
2298                 if (rc)
2299                         GOTO(out_free, rc);
2300
2301                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2302                                    "mount opts");
2303                 if (rc)
2304                         GOTO(out_end, rc);
2305                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2306                                       fsdb->fsdb_clilmv);
2307                 if (rc)
2308                         GOTO(out_end, rc);
2309                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2310                                    "mount opts");
2311
2312         if (rc)
2313                 GOTO(out_end, rc);
2314
2315         /* for_all_existing_mdt except current one */
2316         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2317                 if (i !=  mti->mti_stripe_index &&
2318                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2319                         char *logname;
2320
2321                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2322                         if (rc)
2323                                 GOTO(out_end, rc);
2324
2325                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2326                                                       i, logname);
2327                         name_destroy(&logname);
2328                         if (rc)
2329                                 GOTO(out_end, rc);
2330                 }
2331         }
2332 out_end:
2333         record_end_log(env, &llh);
2334 out_free:
2335         name_destroy(&cliname);
2336         RETURN(rc);
2337 }
2338
2339 /* Add the ost info to the client/mdt lov */
2340 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2341                                     struct mgs_device *mgs, struct fs_db *fsdb,
2342                                     struct mgs_target_info *mti,
2343                                     char *logname, char *suffix, char *lovname,
2344                                     enum lustre_sec_part sec_part, int flags)
2345 {
2346         struct llog_handle *llh = NULL;
2347         char *nodeuuid = NULL;
2348         char *oscname = NULL;
2349         char *oscuuid = NULL;
2350         char *lovuuid = NULL;
2351         char *svname = NULL;
2352         char index[6];
2353         int i, rc;
2354
2355         ENTRY;
2356         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2357                mti->mti_svname, logname);
2358
2359         if (mgs_log_is_empty(env, mgs, logname)) {
2360                 CERROR("log is empty! Logical error\n");
2361                 RETURN (-EINVAL);
2362         }
2363
2364         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2365         if (rc)
2366                 RETURN(rc);
2367         rc = name_create(&svname, mti->mti_svname, "-osc");
2368         if (rc)
2369                 GOTO(out_free, rc);
2370
2371         /* for the system upgraded from old 1.8, keep using the old osc naming
2372          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2373         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2374                 rc = name_create(&oscname, svname, "");
2375         else
2376                 rc = name_create(&oscname, svname, suffix);
2377         if (rc)
2378                 GOTO(out_free, rc);
2379
2380         rc = name_create(&oscuuid, oscname, "_UUID");
2381         if (rc)
2382                 GOTO(out_free, rc);
2383         rc = name_create(&lovuuid, lovname, "_UUID");
2384         if (rc)
2385                 GOTO(out_free, rc);
2386
2387
2388         /*
2389         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2390         multihomed (#4)
2391         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2392         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2393         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2394         failover (#6,7)
2395         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2396         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2397         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2398         */
2399
2400         rc = record_start_log(env, mgs, &llh, logname);
2401         if (rc)
2402                 GOTO(out_free, rc);
2403
2404         /* FIXME these should be a single journal transaction */
2405         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2406                            "add osc");
2407         if (rc)
2408                 GOTO(out_end, rc);
2409
2410         /* NB: don't change record order, because upon MDT steal OSC config
2411          * from client, it treats all nids before LCFG_SETUP as target nids
2412          * (multiple interfaces), while nids after as failover node nids.
2413          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2414          */
2415         for (i = 0; i < mti->mti_nid_count; i++) {
2416                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2417                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2418                 if (rc)
2419                         GOTO(out_end, rc);
2420         }
2421         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2422         if (rc)
2423                 GOTO(out_end, rc);
2424         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2425         if (rc)
2426                 GOTO(out_end, rc);
2427         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2428         if (rc)
2429                 GOTO(out_end, rc);
2430
2431         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2432
2433         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2434         if (rc)
2435                 GOTO(out_end, rc);
2436         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2437                            "add osc");
2438         if (rc)
2439                 GOTO(out_end, rc);
2440 out_end:
2441         record_end_log(env, &llh);
2442 out_free:
2443         name_destroy(&lovuuid);
2444         name_destroy(&oscuuid);
2445         name_destroy(&oscname);
2446         name_destroy(&svname);
2447         name_destroy(&nodeuuid);
2448         RETURN(rc);
2449 }
2450
2451 static int mgs_write_log_ost(const struct lu_env *env,
2452                              struct mgs_device *mgs, struct fs_db *fsdb,
2453                              struct mgs_target_info *mti)
2454 {
2455         struct llog_handle *llh = NULL;
2456         char *logname, *lovname;
2457         char *ptr = mti->mti_params;
2458         int rc, flags = 0, failout = 0, i;
2459         ENTRY;
2460
2461         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2462
2463         /* The ost startup log */
2464
2465         /* If the ost log already exists, that means that someone reformatted
2466            the ost and it called target_add again. */
2467         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2468                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2469                                    "exists, yet the server claims it never "
2470                                    "registered. It may have been reformatted, "
2471                                    "or the index changed. writeconf the MDT to "
2472                                    "regenerate all logs.\n", mti->mti_svname);
2473                 RETURN(-EALREADY);
2474         }
2475
2476         /*
2477         attach obdfilter ost1 ost1_UUID
2478         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2479         */
2480         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2481                 failout = (strncmp(ptr, "failout", 7) == 0);
2482         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2483         if (rc)
2484                 RETURN(rc);
2485         /* FIXME these should be a single journal transaction */
2486         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2487         if (rc)
2488                 GOTO(out_end, rc);
2489         if (*mti->mti_uuid == '\0')
2490                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2491                          "%s_UUID", mti->mti_svname);
2492         rc = record_attach(env, llh, mti->mti_svname,
2493                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2494         if (rc)
2495                 GOTO(out_end, rc);
2496         rc = record_setup(env, llh, mti->mti_svname,
2497                           "dev"/*ignored*/, "type"/*ignored*/,
2498                           failout ? "n" : "f", 0/*options*/);
2499         if (rc)
2500                 GOTO(out_end, rc);
2501         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2502         if (rc)
2503                 GOTO(out_end, rc);
2504 out_end:
2505         record_end_log(env, &llh);
2506         if (rc)
2507                 RETURN(rc);
2508         /* We also have to update the other logs where this osc is part of
2509            the lov */
2510
2511         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2512                 /* If we're upgrading, the old mdt log already has our
2513                    entry. Let's do a fake one for fun. */
2514                 /* Note that we can't add any new failnids, since we don't
2515                    know the old osc names. */
2516                 flags = CM_SKIP | CM_UPGRADE146;
2517
2518         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2519                 /* If the update flag isn't set, don't update client/mdt
2520                    logs. */
2521                 flags |= CM_SKIP;
2522                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2523                               "the MDT first to regenerate it.\n",
2524                               mti->mti_svname);
2525         }
2526
2527         /* Add ost to all MDT lov defs */
2528         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2529                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2530                         char mdt_index[9];
2531
2532                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2533                                                      i);
2534                         if (rc)
2535                                 RETURN(rc);
2536                         sprintf(mdt_index, "-MDT%04x", i);
2537                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2538                                                       logname, mdt_index,
2539                                                       lovname, LUSTRE_SP_MDT,
2540                                                       flags);
2541                         name_destroy(&logname);
2542                         name_destroy(&lovname);
2543                         if (rc)
2544                                 RETURN(rc);
2545                 }
2546         }
2547
2548         /* Append ost info to the client log */
2549         rc = name_create(&logname, mti->mti_fsname, "-client");
2550         if (rc)
2551                 RETURN(rc);
2552         if (mgs_log_is_empty(env, mgs, logname)) {
2553                 /* Start client log */
2554                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2555                                        fsdb->fsdb_clilov);
2556                 if (rc)
2557                         GOTO(out_free, rc);
2558                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2559                                        fsdb->fsdb_clilmv);
2560                 if (rc)
2561                         GOTO(out_free, rc);
2562         }
2563         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2564                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2565 out_free:
2566         name_destroy(&logname);
2567         RETURN(rc);
2568 }
2569
2570 static __inline__ int mgs_param_empty(char *ptr)
2571 {
2572         char *tmp;
2573
2574         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2575                 return 1;
2576         return 0;
2577 }
2578
2579 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2580                                           struct mgs_device *mgs,
2581                                           struct fs_db *fsdb,
2582                                           struct mgs_target_info *mti,
2583                                           char *logname, char *cliname)
2584 {
2585         int rc;
2586         struct llog_handle *llh = NULL;
2587
2588         if (mgs_param_empty(mti->mti_params)) {
2589                 /* Remove _all_ failnids */
2590                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2591                                 mti->mti_svname, "add failnid", CM_SKIP);
2592                 return rc < 0 ? rc : 0;
2593         }
2594
2595         /* Otherwise failover nids are additive */
2596         rc = record_start_log(env, mgs, &llh, logname);
2597         if (rc)
2598                 return rc;
2599                 /* FIXME this should be a single journal transaction */
2600         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2601                            "add failnid");
2602         if (rc)
2603                 goto out_end;
2604         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2605         if (rc)
2606                 goto out_end;
2607         rc = record_marker(env, llh, fsdb, CM_END,
2608                            mti->mti_svname, "add failnid");
2609 out_end:
2610         record_end_log(env, &llh);
2611         return rc;
2612 }
2613
2614
2615 /* Add additional failnids to an existing log.
2616    The mdc/osc must have been added to logs first */
2617 /* tcp nids must be in dotted-quad ascii -
2618    we can't resolve hostnames from the kernel. */
2619 static int mgs_write_log_add_failnid(const struct lu_env *env,
2620                                      struct mgs_device *mgs,
2621                                      struct fs_db *fsdb,
2622                                      struct mgs_target_info *mti)
2623 {
2624         char *logname, *cliname;
2625         int rc;
2626         ENTRY;
2627
2628         /* FIXME we currently can't erase the failnids
2629          * given when a target first registers, since they aren't part of
2630          * an "add uuid" stanza */
2631
2632         /* Verify that we know about this target */
2633         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2634                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2635                                    "yet. It must be started before failnids "
2636                                    "can be added.\n", mti->mti_svname);
2637                 RETURN(-ENOENT);
2638         }
2639
2640         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2641         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2642                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2643         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2644                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2645         } else {
2646                 RETURN(-EINVAL);
2647         }
2648         if (rc)
2649                 RETURN(rc);
2650         /* Add failover nids to the client log */
2651         rc = name_create(&logname, mti->mti_fsname, "-client");
2652         if (rc) {
2653                 name_destroy(&cliname);
2654                 RETURN(rc);
2655         }
2656         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2657         name_destroy(&logname);
2658         name_destroy(&cliname);
2659         if (rc)
2660                 RETURN(rc);
2661
2662         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2663                 /* Add OST failover nids to the MDT logs as well */
2664                 int i;
2665
2666                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2667                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2668                                 continue;
2669                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2670                         if (rc)
2671                                 RETURN(rc);
2672                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2673                                                  fsdb, i);
2674                         if (rc) {
2675                                 name_destroy(&logname);
2676                                 RETURN(rc);
2677                         }
2678                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2679                                                             mti, logname,
2680                                                             cliname);
2681                         name_destroy(&cliname);
2682                         name_destroy(&logname);
2683                         if (rc)
2684                                 RETURN(rc);
2685                 }
2686         }
2687
2688         RETURN(rc);
2689 }
2690
2691 static int mgs_wlp_lcfg(const struct lu_env *env,
2692                         struct mgs_device *mgs, struct fs_db *fsdb,
2693                         struct mgs_target_info *mti,
2694                         char *logname, struct lustre_cfg_bufs *bufs,
2695                         char *tgtname, char *ptr)
2696 {
2697         char comment[MTI_NAME_MAXLEN];
2698         char *tmp;
2699         struct lustre_cfg *lcfg;
2700         int rc, del;
2701
2702         /* Erase any old settings of this same parameter */
2703         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2704         comment[MTI_NAME_MAXLEN - 1] = 0;
2705         /* But don't try to match the value. */
2706         if ((tmp = strchr(comment, '=')))
2707             *tmp = 0;
2708         /* FIXME we should skip settings that are the same as old values */
2709         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2710         if (rc < 0)
2711                 return rc;
2712         del = mgs_param_empty(ptr);
2713
2714         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2715                       "Sett" : "Modify", tgtname, comment, logname);
2716         if (del)
2717                 return rc;
2718
2719         lustre_cfg_bufs_reset(bufs, tgtname);
2720         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2721         if (mti->mti_flags & LDD_F_PARAM2)
2722                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2723
2724         lcfg = lustre_cfg_new((mti->mti_flags & LDD_F_PARAM2) ?
2725                               LCFG_SET_PARAM : LCFG_PARAM, bufs);
2726
2727         if (!lcfg)
2728                 return -ENOMEM;
2729         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2730         lustre_cfg_free(lcfg);
2731         return rc;
2732 }
2733
2734 static int mgs_write_log_param2(const struct lu_env *env,
2735                                 struct mgs_device *mgs,
2736                                 struct fs_db *fsdb,
2737                                 struct mgs_target_info *mti, char *ptr)
2738 {
2739         struct lustre_cfg_bufs  bufs;
2740         int                     rc = 0;
2741         ENTRY;
2742
2743         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2744         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2745                           mti->mti_svname, ptr);
2746
2747         RETURN(rc);
2748 }
2749
2750 /* write global variable settings into log */
2751 static int mgs_write_log_sys(const struct lu_env *env,
2752                              struct mgs_device *mgs, struct fs_db *fsdb,
2753                              struct mgs_target_info *mti, char *sys, char *ptr)
2754 {
2755         struct mgs_thread_info *mgi = mgs_env_info(env);
2756         struct lustre_cfg *lcfg;
2757         char *tmp, sep;
2758         int rc, cmd, convert = 1;
2759
2760         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2761                 cmd = LCFG_SET_TIMEOUT;
2762         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2763                 cmd = LCFG_SET_LDLM_TIMEOUT;
2764         /* Check for known params here so we can return error to lctl */
2765         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2766                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2767                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2768                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2769                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2770                 cmd = LCFG_PARAM;
2771         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2772                 convert = 0; /* Don't convert string value to integer */
2773                 cmd = LCFG_PARAM;
2774         } else {
2775                 return -EINVAL;
2776         }
2777
2778         if (mgs_param_empty(ptr))
2779                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2780         else
2781                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2782
2783         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2784         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2785         if (!convert && *tmp != '\0')
2786                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2787         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2788         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2789         /* truncate the comment to the parameter name */
2790         ptr = tmp - 1;
2791         sep = *ptr;
2792         *ptr = '\0';
2793         /* modify all servers and clients */
2794         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2795                                       *tmp == '\0' ? NULL : lcfg,
2796                                       mti->mti_fsname, sys, 0);
2797         if (rc == 0 && *tmp != '\0') {
2798                 switch (cmd) {
2799                 case LCFG_SET_TIMEOUT:
2800                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2801                                 class_process_config(lcfg);
2802                         break;
2803                 case LCFG_SET_LDLM_TIMEOUT:
2804                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2805                                 class_process_config(lcfg);
2806                         break;
2807                 default:
2808                         break;
2809                 }
2810         }
2811         *ptr = sep;
2812         lustre_cfg_free(lcfg);
2813         return rc;
2814 }
2815
2816 /* write quota settings into log */
2817 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2818                                struct fs_db *fsdb, struct mgs_target_info *mti,
2819                                char *quota, char *ptr)
2820 {
2821         struct mgs_thread_info  *mgi = mgs_env_info(env);
2822         struct lustre_cfg       *lcfg;
2823         char                    *tmp;
2824         char                     sep;
2825         int                      rc, cmd = LCFG_PARAM;
2826
2827         /* support only 'meta' and 'data' pools so far */
2828         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2829             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2830                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2831                        "& quota.ost are)\n", ptr);
2832                 return -EINVAL;
2833         }
2834
2835         if (*tmp == '\0') {
2836                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2837         } else {
2838                 CDEBUG(D_MGS, "global '%s'\n", quota);
2839
2840                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2841                     strcmp(tmp, "none") != 0) {
2842                         CERROR("enable option(%s) isn't supported\n", tmp);
2843                         return -EINVAL;
2844                 }
2845         }
2846
2847         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2848         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2849         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2850         /* truncate the comment to the parameter name */
2851         ptr = tmp - 1;
2852         sep = *ptr;
2853         *ptr = '\0';
2854
2855         /* XXX we duplicated quota enable information in all server
2856          *     config logs, it should be moved to a separate config
2857          *     log once we cleanup the config log for global param. */
2858         /* modify all servers */
2859         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2860                                       *tmp == '\0' ? NULL : lcfg,
2861                                       mti->mti_fsname, quota, 1);
2862         *ptr = sep;
2863         lustre_cfg_free(lcfg);
2864         return rc < 0 ? rc : 0;
2865 }
2866
2867 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2868                                    struct mgs_device *mgs,
2869                                    struct fs_db *fsdb,
2870                                    struct mgs_target_info *mti,
2871                                    char *param)
2872 {
2873         struct mgs_thread_info *mgi = mgs_env_info(env);
2874         struct llog_handle     *llh = NULL;
2875         char                   *logname;
2876         char                   *comment, *ptr;
2877         struct lustre_cfg      *lcfg;
2878         int                     rc, len;
2879         ENTRY;
2880
2881         /* get comment */
2882         ptr = strchr(param, '=');
2883         LASSERT(ptr);
2884         len = ptr - param;
2885
2886         OBD_ALLOC(comment, len + 1);
2887         if (comment == NULL)
2888                 RETURN(-ENOMEM);
2889         strncpy(comment, param, len);
2890         comment[len] = '\0';
2891
2892         /* prepare lcfg */
2893         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2894         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2895         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2896         if (lcfg == NULL)
2897                 GOTO(out_comment, rc = -ENOMEM);
2898
2899         /* construct log name */
2900         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2901         if (rc)
2902                 GOTO(out_lcfg, rc);
2903
2904         if (mgs_log_is_empty(env, mgs, logname)) {
2905                 rc = record_start_log(env, mgs, &llh, logname);
2906                 if (rc)
2907                         GOTO(out, rc);
2908                 record_end_log(env, &llh);
2909         }
2910
2911         /* obsolete old one */
2912         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2913                         comment, CM_SKIP);
2914         if (rc < 0)
2915                 GOTO(out, rc);
2916         /* write the new one */
2917         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2918                                   mti->mti_svname, comment);
2919         if (rc)
2920                 CERROR("err %d writing log %s\n", rc, logname);
2921 out:
2922         name_destroy(&logname);
2923 out_lcfg:
2924         lustre_cfg_free(lcfg);
2925 out_comment:
2926         OBD_FREE(comment, len + 1);
2927         RETURN(rc);
2928 }
2929
2930 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2931                                         char *param)
2932 {
2933         char    *ptr;
2934
2935         /* disable the adjustable udesc parameter for now, i.e. use default
2936          * setting that client always ship udesc to MDT if possible. to enable
2937          * it simply remove the following line */
2938         goto error_out;
2939
2940         ptr = strchr(param, '=');
2941         if (ptr == NULL)
2942                 goto error_out;
2943         *ptr++ = '\0';
2944
2945         if (strcmp(param, PARAM_SRPC_UDESC))
2946                 goto error_out;
2947
2948         if (strcmp(ptr, "yes") == 0) {
2949                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2950                 CWARN("Enable user descriptor shipping from client to MDT\n");
2951         } else if (strcmp(ptr, "no") == 0) {
2952                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2953                 CWARN("Disable user descriptor shipping from client to MDT\n");
2954         } else {
2955                 *(ptr - 1) = '=';
2956                 goto error_out;
2957         }
2958         return 0;
2959
2960 error_out:
2961         CERROR("Invalid param: %s\n", param);
2962         return -EINVAL;
2963 }
2964
2965 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2966                                   const char *svname,
2967                                   char *param)
2968 {
2969         struct sptlrpc_rule      rule;
2970         struct sptlrpc_rule_set *rset;
2971         int                      rc;
2972         ENTRY;
2973
2974         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2975                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2976                 RETURN(-EINVAL);
2977         }
2978
2979         if (strncmp(param, PARAM_SRPC_UDESC,
2980                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2981                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2982         }
2983
2984         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2985                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2986                 RETURN(-EINVAL);
2987         }
2988
2989         param += sizeof(PARAM_SRPC_FLVR) - 1;
2990
2991         rc = sptlrpc_parse_rule(param, &rule);
2992         if (rc)
2993                 RETURN(rc);
2994
2995         /* mgs rules implies must be mgc->mgs */
2996         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2997                 if ((rule.sr_from != LUSTRE_SP_MGC &&
2998                      rule.sr_from != LUSTRE_SP_ANY) ||
2999                     (rule.sr_to != LUSTRE_SP_MGS &&
3000                      rule.sr_to != LUSTRE_SP_ANY))
3001                         RETURN(-EINVAL);
3002         }
3003
3004         /* preapre room for this coming rule. svcname format should be:
3005          * - fsname: general rule
3006          * - fsname-tgtname: target-specific rule
3007          */
3008         if (strchr(svname, '-')) {
3009                 struct mgs_tgt_srpc_conf *tgtconf;
3010                 int                       found = 0;
3011
3012                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3013                      tgtconf = tgtconf->mtsc_next) {
3014                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3015                                 found = 1;
3016                                 break;
3017                         }
3018                 }
3019
3020                 if (!found) {
3021                         int name_len;
3022
3023                         OBD_ALLOC_PTR(tgtconf);
3024                         if (tgtconf == NULL)
3025                                 RETURN(-ENOMEM);
3026
3027                         name_len = strlen(svname);
3028
3029                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3030                         if (tgtconf->mtsc_tgt == NULL) {
3031                                 OBD_FREE_PTR(tgtconf);
3032                                 RETURN(-ENOMEM);
3033                         }
3034                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3035
3036                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3037                         fsdb->fsdb_srpc_tgt = tgtconf;
3038                 }
3039
3040                 rset = &tgtconf->mtsc_rset;
3041         } else {
3042                 rset = &fsdb->fsdb_srpc_gen;
3043         }
3044
3045         rc = sptlrpc_rule_set_merge(rset, &rule);
3046
3047         RETURN(rc);
3048 }
3049
3050 static int mgs_srpc_set_param(const struct lu_env *env,
3051                               struct mgs_device *mgs,
3052                               struct fs_db *fsdb,
3053                               struct mgs_target_info *mti,
3054                               char *param)
3055 {
3056         char                   *copy;
3057         int                     rc, copy_size;
3058         ENTRY;
3059
3060 #ifndef HAVE_GSS
3061         RETURN(-EINVAL);
3062 #endif
3063         /* keep a copy of original param, which could be destroied
3064          * during parsing */
3065         copy_size = strlen(param) + 1;
3066         OBD_ALLOC(copy, copy_size);
3067         if (copy == NULL)
3068                 return -ENOMEM;
3069         memcpy(copy, param, copy_size);
3070
3071         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3072         if (rc)
3073                 goto out_free;
3074
3075         /* previous steps guaranteed the syntax is correct */
3076         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3077         if (rc)
3078                 goto out_free;
3079
3080         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3081                 /*
3082                  * for mgs rules, make them effective immediately.
3083                  */
3084                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3085                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3086                                                  &fsdb->fsdb_srpc_gen);
3087         }
3088
3089 out_free:
3090         OBD_FREE(copy, copy_size);
3091         RETURN(rc);
3092 }
3093
3094 struct mgs_srpc_read_data {
3095         struct fs_db   *msrd_fsdb;
3096         int             msrd_skip;
3097 };
3098
3099 static int mgs_srpc_read_handler(const struct lu_env *env,
3100                                  struct llog_handle *llh,
3101                                  struct llog_rec_hdr *rec, void *data)
3102 {
3103         struct mgs_srpc_read_data *msrd = data;
3104         struct cfg_marker         *marker;
3105         struct lustre_cfg         *lcfg = REC_DATA(rec);
3106         char                      *svname, *param;
3107         int                        cfg_len, rc;
3108         ENTRY;
3109
3110         if (rec->lrh_type != OBD_CFG_REC) {
3111                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3112                 RETURN(-EINVAL);
3113         }
3114
3115         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3116                   sizeof(struct llog_rec_tail);
3117
3118         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3119         if (rc) {
3120                 CERROR("Insane cfg\n");
3121                 RETURN(rc);
3122         }
3123
3124         if (lcfg->lcfg_command == LCFG_MARKER) {
3125                 marker = lustre_cfg_buf(lcfg, 1);
3126
3127                 if (marker->cm_flags & CM_START &&
3128                     marker->cm_flags & CM_SKIP)
3129                         msrd->msrd_skip = 1;
3130                 if (marker->cm_flags & CM_END)
3131                         msrd->msrd_skip = 0;
3132
3133                 RETURN(0);
3134         }
3135
3136         if (msrd->msrd_skip)
3137                 RETURN(0);
3138
3139         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3140                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3141                 RETURN(0);
3142         }
3143
3144         svname = lustre_cfg_string(lcfg, 0);
3145         if (svname == NULL) {
3146                 CERROR("svname is empty\n");
3147                 RETURN(0);
3148         }
3149
3150         param = lustre_cfg_string(lcfg, 1);
3151         if (param == NULL) {
3152                 CERROR("param is empty\n");
3153                 RETURN(0);
3154         }
3155
3156         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3157         if (rc)
3158                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3159
3160         RETURN(0);
3161 }
3162
3163 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3164                                 struct mgs_device *mgs,
3165                                 struct fs_db *fsdb)
3166 {
3167         struct llog_handle        *llh = NULL;
3168         struct llog_ctxt          *ctxt;
3169         char                      *logname;
3170         struct mgs_srpc_read_data  msrd;
3171         int                        rc;
3172         ENTRY;
3173
3174         /* construct log name */
3175         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3176         if (rc)
3177                 RETURN(rc);
3178
3179         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3180         LASSERT(ctxt != NULL);
3181
3182         if (mgs_log_is_empty(env, mgs, logname))
3183                 GOTO(out, rc = 0);
3184
3185         rc = llog_open(env, ctxt, &llh, NULL, logname,
3186                        LLOG_OPEN_EXISTS);
3187         if (rc < 0) {
3188                 if (rc == -ENOENT)
3189                         rc = 0;
3190                 GOTO(out, rc);
3191         }
3192
3193         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3194         if (rc)
3195                 GOTO(out_close, rc);
3196
3197         if (llog_get_size(llh) <= 1)
3198                 GOTO(out_close, rc = 0);
3199
3200         msrd.msrd_fsdb = fsdb;
3201         msrd.msrd_skip = 0;
3202
3203         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3204                           NULL);
3205
3206 out_close:
3207         llog_close(env, llh);
3208 out:
3209         llog_ctxt_put(ctxt);
3210         name_destroy(&logname);
3211
3212         if (rc)
3213                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3214         RETURN(rc);
3215 }
3216
3217 /* Permanent settings of all parameters by writing into the appropriate
3218  * configuration logs.
3219  * A parameter with null value ("<param>='\0'") means to erase it out of
3220  * the logs.
3221  */
3222 static int mgs_write_log_param(const struct lu_env *env,
3223                                struct mgs_device *mgs, struct fs_db *fsdb,
3224                                struct mgs_target_info *mti, char *ptr)
3225 {
3226         struct mgs_thread_info *mgi = mgs_env_info(env);
3227         char *logname;
3228         char *tmp;
3229         int rc = 0, rc2 = 0;
3230         ENTRY;
3231
3232         /* For various parameter settings, we have to figure out which logs
3233            care about them (e.g. both mdt and client for lov settings) */
3234         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3235
3236         /* The params are stored in MOUNT_DATA_FILE and modified via
3237            tunefs.lustre, or set using lctl conf_param */
3238
3239         /* Processed in lustre_start_mgc */
3240         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3241                 GOTO(end, rc);
3242
3243         /* Processed in ost/mdt */
3244         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3245                 GOTO(end, rc);
3246
3247         /* Processed in mgs_write_log_ost */
3248         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3249                 if (mti->mti_flags & LDD_F_PARAM) {
3250                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3251                                            "changed with tunefs.lustre"
3252                                            "and --writeconf\n", ptr);
3253                         rc = -EPERM;
3254                 }
3255                 GOTO(end, rc);
3256         }
3257
3258         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3259                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3260                 GOTO(end, rc);
3261         }
3262
3263         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3264                 /* Add a failover nidlist */
3265                 rc = 0;
3266                 /* We already processed failovers params for new
3267                    targets in mgs_write_log_target */
3268                 if (mti->mti_flags & LDD_F_PARAM) {
3269                         CDEBUG(D_MGS, "Adding failnode\n");
3270                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3271                 }
3272                 GOTO(end, rc);
3273         }
3274
3275         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3276                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3277                 GOTO(end, rc);
3278         }
3279
3280         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3281                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3282                 GOTO(end, rc);
3283         }
3284
3285         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3286                 /* active=0 means off, anything else means on */
3287                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3288                 int i;
3289
3290                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3291                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3292                                            "be (de)activated.\n",
3293                                            mti->mti_svname);
3294                         GOTO(end, rc = -EINVAL);
3295                 }
3296                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3297                               flag ? "de": "re", mti->mti_svname);
3298                 /* Modify clilov */
3299                 rc = name_create(&logname, mti->mti_fsname, "-client");
3300                 if (rc)
3301                         GOTO(end, rc);
3302                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3303                                 mti->mti_svname, "add osc", flag);
3304                 name_destroy(&logname);
3305                 if (rc)
3306                         goto active_err;
3307                 /* Modify mdtlov */
3308                 /* Add to all MDT logs for CMD */
3309                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3310                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3311                                 continue;
3312                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3313                         if (rc)
3314                                 GOTO(end, rc);
3315                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3316                                         mti->mti_svname, "add osc", flag);
3317                         name_destroy(&logname);
3318                         if (rc)
3319                                 goto active_err;
3320                 }
3321         active_err:
3322                 if (rc) {
3323                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3324                                            "log (%d). No permanent "
3325                                            "changes were made to the "
3326                                            "config log.\n",
3327                                            mti->mti_svname, rc);
3328                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3329                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3330                                                    " because the log"
3331                                                    "is in the old 1.4"
3332                                                    "style. Consider "
3333                                                    " --writeconf to "
3334                                                    "update the logs.\n");
3335                         GOTO(end, rc);
3336                 }
3337                 /* Fall through to osc proc for deactivating live OSC
3338                    on running MDT / clients. */
3339         }
3340         /* Below here, let obd's XXX_process_config methods handle it */
3341
3342         /* All lov. in proc */
3343         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3344                 char *mdtlovname;
3345
3346                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3347                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3348                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3349                                            "set on the MDT, not %s. "
3350                                            "Ignoring.\n",
3351                                            mti->mti_svname);
3352                         GOTO(end, rc = 0);
3353                 }
3354
3355                 /* Modify mdtlov */
3356                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3357                         GOTO(end, rc = -ENODEV);
3358
3359                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3360                                              mti->mti_stripe_index);
3361                 if (rc)
3362                         GOTO(end, rc);
3363                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3364                                   &mgi->mgi_bufs, mdtlovname, ptr);
3365                 name_destroy(&logname);
3366                 name_destroy(&mdtlovname);
3367                 if (rc)
3368                         GOTO(end, rc);
3369
3370                 /* Modify clilov */
3371                 rc = name_create(&logname, mti->mti_fsname, "-client");
3372                 if (rc)
3373                         GOTO(end, rc);
3374                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3375                                   fsdb->fsdb_clilov, ptr);
3376                 name_destroy(&logname);
3377                 GOTO(end, rc);
3378         }
3379
3380         /* All osc., mdc., llite. params in proc */
3381         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3382             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3383             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3384                 char *cname;
3385
3386                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3387                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3388                                            " cannot be modified. Consider"
3389                                            " updating the configuration with"
3390                                            " --writeconf\n",
3391                                            mti->mti_svname);
3392                         GOTO(end, rc = -EINVAL);
3393                 }
3394                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3395                         rc = name_create(&cname, mti->mti_fsname, "-client");
3396                         /* Add the client type to match the obdname in
3397                            class_config_llog_handler */
3398                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3399                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3400                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3401                         rc = name_create(&cname, mti->mti_svname, "-osc");
3402                 } else {
3403                         GOTO(end, rc = -EINVAL);
3404                 }
3405                 if (rc)
3406                         GOTO(end, rc);
3407
3408                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3409
3410                 /* Modify client */
3411                 rc = name_create(&logname, mti->mti_fsname, "-client");
3412                 if (rc) {
3413                         name_destroy(&cname);
3414                         GOTO(end, rc);
3415                 }
3416                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3417                                   cname, ptr);
3418
3419                 /* osc params affect the MDT as well */
3420                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3421                         int i;
3422
3423                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3424                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3425                                         continue;
3426                                 name_destroy(&cname);
3427                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3428                                                          fsdb, i);
3429                                 name_destroy(&logname);
3430                                 if (rc)
3431                                         break;
3432                                 rc = name_create_mdt(&logname,
3433                                                      mti->mti_fsname, i);
3434                                 if (rc)
3435                                         break;
3436                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3437                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3438                                                           mti, logname,
3439                                                           &mgi->mgi_bufs,
3440                                                           cname, ptr);
3441                                         if (rc)
3442                                                 break;
3443                                 }
3444                         }
3445                 }
3446                 name_destroy(&logname);
3447                 name_destroy(&cname);
3448                 GOTO(end, rc);
3449         }
3450
3451         /* All mdt. params in proc */
3452         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
3453                 int i;
3454                 __u32 idx;
3455
3456                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3457                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3458                             MTI_NAME_MAXLEN) == 0)
3459                         /* device is unspecified completely? */
3460                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3461                 else
3462                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3463                 if (rc < 0)
3464                         goto active_err;
3465                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3466                         goto active_err;
3467                 if (rc & LDD_F_SV_ALL) {
3468                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3469                                 if (!test_bit(i,
3470                                                   fsdb->fsdb_mdt_index_map))
3471                                         continue;
3472                                 rc = name_create_mdt(&logname,
3473                                                 mti->mti_fsname, i);
3474                                 if (rc)
3475                                         goto active_err;
3476                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3477                                                   logname, &mgi->mgi_bufs,
3478                                                   logname, ptr);
3479                                 name_destroy(&logname);
3480                                 if (rc)
3481                                         goto active_err;
3482                         }
3483                 } else {
3484                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3485                                           mti->mti_svname, &mgi->mgi_bufs,
3486                                           mti->mti_svname, ptr);
3487                         if (rc)
3488                                 goto active_err;
3489                 }
3490                 GOTO(end, rc);
3491         }
3492
3493         /* All mdd., ost. and osd. params in proc */
3494         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3495             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3496             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3497                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3498                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3499                         GOTO(end, rc = -ENODEV);
3500
3501                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3502                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3503                 GOTO(end, rc);
3504         }
3505
3506         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3507         rc2 = -ENOSYS;
3508
3509 end:
3510         if (rc)
3511                 CERROR("err %d on param '%s'\n", rc, ptr);
3512
3513         RETURN(rc ?: rc2);
3514 }
3515
3516 /* Not implementing automatic failover nid addition at this time. */
3517 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3518                       struct mgs_target_info *mti)
3519 {
3520 #if 0
3521         struct fs_db *fsdb;
3522         int rc;
3523         ENTRY;
3524
3525         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3526         if (rc)
3527                 RETURN(rc);
3528
3529         if (mgs_log_is_empty(obd, mti->mti_svname))
3530                 /* should never happen */
3531                 RETURN(-ENOENT);
3532
3533         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3534
3535         /* FIXME We can just check mti->params to see if we're already in
3536            the failover list.  Modify mti->params for rewriting back at
3537            server_register_target(). */
3538
3539         mutex_lock(&fsdb->fsdb_mutex);
3540         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3541         mutex_unlock(&fsdb->fsdb_mutex);
3542
3543         RETURN(rc);
3544 #endif
3545         return 0;
3546 }
3547
3548 int mgs_write_log_target(const struct lu_env *env,
3549                          struct mgs_device *mgs,
3550                          struct mgs_target_info *mti,
3551                          struct fs_db *fsdb)
3552 {
3553         int rc = -EINVAL;
3554         char *buf, *params;
3555         ENTRY;
3556
3557         /* set/check the new target index */
3558         rc = mgs_set_index(env, mgs, mti);
3559         if (rc < 0) {
3560                 CERROR("Can't get index (%d)\n", rc);
3561                 RETURN(rc);
3562         }
3563
3564         if (rc == EALREADY) {
3565                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3566                               mti->mti_stripe_index, mti->mti_svname);
3567                 /* We would like to mark old log sections as invalid
3568                    and add new log sections in the client and mdt logs.
3569                    But if we add new sections, then live clients will
3570                    get repeat setup instructions for already running
3571                    osc's. So don't update the client/mdt logs. */
3572                 mti->mti_flags &= ~LDD_F_UPDATE;
3573                 rc = 0;
3574         }
3575
3576         mutex_lock(&fsdb->fsdb_mutex);
3577
3578         if (mti->mti_flags &
3579             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3580                 /* Generate a log from scratch */
3581                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3582                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3583                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3584                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3585                 } else {
3586                         CERROR("Unknown target type %#x, can't create log for "
3587                                "%s\n", mti->mti_flags, mti->mti_svname);
3588                 }
3589                 if (rc) {
3590                         CERROR("Can't write logs for %s (%d)\n",
3591                                mti->mti_svname, rc);
3592                         GOTO(out_up, rc);
3593                 }
3594         } else {
3595                 /* Just update the params from tunefs in mgs_write_log_params */
3596                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3597                 mti->mti_flags |= LDD_F_PARAM;
3598         }
3599
3600         /* allocate temporary buffer, where class_get_next_param will
3601            make copy of a current  parameter */
3602         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3603         if (buf == NULL)
3604                 GOTO(out_up, rc = -ENOMEM);
3605         params = mti->mti_params;
3606         while (params != NULL) {
3607                 rc = class_get_next_param(&params, buf);
3608                 if (rc) {
3609                         if (rc == 1)
3610                                 /* there is no next parameter, that is
3611                                    not an error */
3612                                 rc = 0;
3613                         break;
3614                 }
3615                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3616                        params, buf);
3617                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3618                 if (rc)
3619                         break;
3620         }
3621
3622         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3623
3624 out_up:
3625         mutex_unlock(&fsdb->fsdb_mutex);
3626         RETURN(rc);
3627 }
3628
3629 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3630 {
3631         struct llog_ctxt        *ctxt;
3632         int                      rc = 0;
3633
3634         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3635         if (ctxt == NULL) {
3636                 CERROR("%s: MGS config context doesn't exist\n",
3637                        mgs->mgs_obd->obd_name);
3638                 rc = -ENODEV;
3639         } else {
3640                 rc = llog_erase(env, ctxt, NULL, name);
3641                 /* llog may not exist */
3642                 if (rc == -ENOENT)
3643                         rc = 0;
3644                 llog_ctxt_put(ctxt);
3645         }
3646
3647         if (rc)
3648                 CERROR("%s: failed to clear log %s: %d\n",
3649                        mgs->mgs_obd->obd_name, name, rc);
3650
3651         return rc;
3652 }
3653
3654 /* erase all logs for the given fs */
3655 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3656 {
3657         struct fs_db *fsdb;
3658         cfs_list_t list;
3659         struct mgs_direntry *dirent, *n;
3660         int rc, len = strlen(fsname);
3661         char *suffix;
3662         ENTRY;
3663
3664         /* Find all the logs in the CONFIGS directory */
3665         rc = class_dentry_readdir(env, mgs, &list);
3666         if (rc)
3667                 RETURN(rc);
3668
3669         mutex_lock(&mgs->mgs_mutex);
3670
3671         /* Delete the fs db */
3672         fsdb = mgs_find_fsdb(mgs, fsname);
3673         if (fsdb)
3674                 mgs_free_fsdb(mgs, fsdb);
3675
3676         mutex_unlock(&mgs->mgs_mutex);
3677
3678         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3679                 cfs_list_del(&dirent->list);
3680                 suffix = strrchr(dirent->name, '-');
3681                 if (suffix != NULL) {
3682                         if ((len == suffix - dirent->name) &&
3683                             (strncmp(fsname, dirent->name, len) == 0)) {
3684                                 CDEBUG(D_MGS, "Removing log %s\n",
3685                                        dirent->name);
3686                                 mgs_erase_log(env, mgs, dirent->name);
3687                         }
3688                 }
3689                 mgs_direntry_free(dirent);
3690         }
3691
3692         RETURN(rc);
3693 }
3694
3695 /* list all logs for the given fs */
3696 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
3697                   struct obd_ioctl_data *data)
3698 {
3699         cfs_list_t               list;
3700         struct mgs_direntry     *dirent, *n;
3701         char                    *out, *suffix;
3702         int                      l, remains, rc;
3703
3704         ENTRY;
3705
3706         /* Find all the logs in the CONFIGS directory */
3707         rc = class_dentry_readdir(env, mgs, &list);
3708         if (rc) {
3709                 CERROR("%s: can't read %s dir = %d\n",
3710                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
3711                 RETURN(rc);
3712         }
3713
3714         out = data->ioc_bulk;
3715         remains = data->ioc_inllen1;
3716         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3717                 cfs_list_del(&dirent->list);
3718                 suffix = strrchr(dirent->name, '-');
3719                 if (suffix != NULL) {
3720                         l = snprintf(out, remains, "config log: $%s\n",
3721                                      dirent->name);
3722                         out += l;
3723                         remains -= l;
3724                 }
3725                 mgs_direntry_free(dirent);
3726                 if (remains <= 0)
3727                         break;
3728         }
3729         RETURN(rc);
3730 }
3731
3732 /* from llog_swab */
3733 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3734 {
3735         int i;
3736         ENTRY;
3737
3738         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3739         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3740
3741         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3742         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3743         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3744         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3745
3746         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3747         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3748                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3749                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3750                                i, lcfg->lcfg_buflens[i],
3751                                lustre_cfg_string(lcfg, i));
3752                 }
3753         EXIT;
3754 }
3755
3756 /* Set a permanent (config log) param for a target or fs
3757  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3758  *             buf1 contains the single parameter
3759  */
3760 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3761                  struct lustre_cfg *lcfg, char *fsname)
3762 {
3763         struct fs_db *fsdb;
3764         struct mgs_target_info *mti;
3765         char *devname, *param;
3766         char *ptr;
3767         const char *tmp;
3768         __u32 index;
3769         int rc = 0;
3770         ENTRY;
3771
3772         print_lustre_cfg(lcfg);
3773
3774         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3775         devname = lustre_cfg_string(lcfg, 0);
3776         param = lustre_cfg_string(lcfg, 1);
3777         if (!devname) {
3778                 /* Assume device name embedded in param:
3779                    lustre-OST0000.osc.max_dirty_mb=32 */
3780                 ptr = strchr(param, '.');
3781                 if (ptr) {
3782                         devname = param;
3783                         *ptr = 0;
3784                         param = ptr + 1;
3785                 }
3786         }
3787         if (!devname) {
3788                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3789                 RETURN(-ENOSYS);
3790         }
3791
3792         rc = mgs_parse_devname(devname, fsname, NULL);
3793         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3794                 /* param related to llite isn't allowed to set by OST or MDT */
3795                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3796                                        sizeof(PARAM_LLITE) - 1) == 0)
3797                         RETURN(-EINVAL);
3798         } else {
3799                 /* assume devname is the fsname */
3800                 memset(fsname, 0, MTI_NAME_MAXLEN);
3801                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3802                 fsname[MTI_NAME_MAXLEN - 1] = 0;
3803         }
3804         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3805
3806         rc = mgs_find_or_make_fsdb(env, mgs,
3807                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
3808                                    PARAMS_FILENAME : fsname, &fsdb);
3809         if (rc)
3810                 RETURN(rc);
3811
3812         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
3813             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3814             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3815                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3816                        "is '%s'\n", fsname, devname);
3817                 mgs_free_fsdb(mgs, fsdb);
3818                 RETURN(-EINVAL);
3819         }
3820
3821         /* Create a fake mti to hold everything */
3822         OBD_ALLOC_PTR(mti);
3823         if (!mti)
3824                 GOTO(out, rc = -ENOMEM);
3825         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3826             >= sizeof(mti->mti_fsname))
3827                 GOTO(out, rc = -E2BIG);
3828         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3829             >= sizeof(mti->mti_svname))
3830                 GOTO(out, rc = -E2BIG);
3831         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3832             >= sizeof(mti->mti_params))
3833                 GOTO(out, rc = -E2BIG);
3834         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3835         if (rc < 0)
3836                 /* Not a valid server; may be only fsname */
3837                 rc = 0;
3838         else
3839                 /* Strip -osc or -mdc suffix from svname */
3840                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3841                                      mti->mti_svname))
3842                         GOTO(out, rc = -EINVAL);
3843         /*
3844          * Revoke lock so everyone updates.  Should be alright if
3845          * someone was already reading while we were updating the logs,
3846          * so we don't really need to hold the lock while we're
3847          * writing (above).
3848          */
3849         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
3850                 mti->mti_flags = rc | LDD_F_PARAM2;
3851                 mutex_lock(&fsdb->fsdb_mutex);
3852                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
3853                 mutex_unlock(&fsdb->fsdb_mutex);
3854                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
3855         } else {
3856                 mti->mti_flags = rc | LDD_F_PARAM;
3857                 mutex_lock(&fsdb->fsdb_mutex);
3858                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3859                 mutex_unlock(&fsdb->fsdb_mutex);
3860                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3861         }
3862
3863 out:
3864         OBD_FREE_PTR(mti);
3865         RETURN(rc);
3866 }
3867
3868 static int mgs_write_log_pool(const struct lu_env *env,
3869                               struct mgs_device *mgs, char *logname,
3870                               struct fs_db *fsdb, char *tgtname,
3871                               enum lcfg_command_type cmd,
3872                               char *fsname, char *poolname,
3873                               char *ostname, char *comment)
3874 {
3875         struct llog_handle *llh = NULL;
3876         int rc;
3877
3878         rc = record_start_log(env, mgs, &llh, logname);
3879         if (rc)
3880                 return rc;
3881         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
3882         if (rc)
3883                 goto out;
3884         rc = record_base(env, llh, tgtname, 0, cmd,
3885                          fsname, poolname, ostname, 0);
3886         if (rc)
3887                 goto out;
3888         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
3889 out:
3890         record_end_log(env, &llh);
3891         return rc;
3892 }
3893
3894 int mgs_nodemap_cmd(const struct lu_env *env, struct mgs_device *mgs,
3895                     enum lcfg_command_type cmd, const char *nodemap_name,
3896                     const char *param)
3897 {
3898         lnet_nid_t      nid[2];
3899         __u32           idmap[2];
3900         bool            bool_switch;
3901         __u32           int_id;
3902         int             rc = 0;
3903         ENTRY;
3904
3905         switch (cmd) {
3906         case LCFG_NODEMAP_ADD:
3907                 rc = nodemap_add(nodemap_name);
3908                 break;
3909         case LCFG_NODEMAP_DEL:
3910                 rc = nodemap_del(nodemap_name);
3911                 break;
3912         case LCFG_NODEMAP_ADD_RANGE:
3913                 rc = nodemap_parse_range(param, nid);
3914                 if (rc != 0)
3915                         break;
3916                 rc = nodemap_add_range(nodemap_name, nid);
3917                 break;
3918         case LCFG_NODEMAP_DEL_RANGE:
3919                 rc = nodemap_parse_range(param, nid);
3920                 if (rc != 0)
3921                         break;
3922                 rc = nodemap_del_range(nodemap_name, nid);
3923                 break;
3924         case LCFG_NODEMAP_ADMIN:
3925                 bool_switch = simple_strtoul(param, NULL, 10);
3926                 rc = nodemap_set_allow_root(nodemap_name, bool_switch);
3927                 break;
3928         case LCFG_NODEMAP_TRUSTED:
3929                 bool_switch = simple_strtoul(param, NULL, 10);
3930                 rc = nodemap_set_trust_client_ids(nodemap_name, bool_switch);
3931                 break;
3932         case LCFG_NODEMAP_SQUASH_UID:
3933                 int_id = simple_strtoul(param, NULL, 10);
3934                 rc = nodemap_set_squash_uid(nodemap_name, int_id);
3935                 break;
3936         case LCFG_NODEMAP_SQUASH_GID:
3937                 int_id = simple_strtoul(param, NULL, 10);
3938                 rc = nodemap_set_squash_gid(nodemap_name, int_id);
3939                 break;
3940         case LCFG_NODEMAP_ADD_UIDMAP:
3941         case LCFG_NODEMAP_ADD_GIDMAP:
3942                 rc = nodemap_parse_idmap(param, idmap);
3943                 if (rc != 0)
3944                         break;
3945                 if (cmd == LCFG_NODEMAP_ADD_UIDMAP)
3946                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_UID,
3947                                                idmap);
3948                 else
3949                         rc = nodemap_add_idmap(nodemap_name, NODEMAP_GID,
3950                                                idmap);
3951                 break;
3952         case LCFG_NODEMAP_DEL_UIDMAP:
3953         case LCFG_NODEMAP_DEL_GIDMAP:
3954                 rc = nodemap_parse_idmap(param, idmap);
3955                 if (rc != 0)
3956                         break;
3957                 if (cmd == LCFG_NODEMAP_DEL_UIDMAP)
3958                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_UID,
3959                                                idmap);
3960                 else
3961                         rc = nodemap_del_idmap(nodemap_name, NODEMAP_GID,
3962                                                idmap);
3963                 break;
3964         default:
3965                 rc = -EINVAL;
3966         }
3967
3968         RETURN(rc);
3969 }
3970
3971 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3972                  enum lcfg_command_type cmd, char *fsname,
3973                  char *poolname, char *ostname)
3974 {
3975         struct fs_db *fsdb;
3976         char *lovname;
3977         char *logname;
3978         char *label = NULL, *canceled_label = NULL;
3979         int label_sz;
3980         struct mgs_target_info *mti = NULL;
3981         int rc, i;
3982         ENTRY;
3983
3984         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3985         if (rc) {
3986                 CERROR("Can't get db for %s\n", fsname);
3987                 RETURN(rc);
3988         }
3989         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3990                 CERROR("%s is not defined\n", fsname);
3991                 mgs_free_fsdb(mgs, fsdb);
3992                 RETURN(-EINVAL);
3993         }
3994
3995         label_sz = 10 + strlen(fsname) + strlen(poolname);
3996
3997         /* check if ostname match fsname */
3998         if (ostname != NULL) {
3999                 char *ptr;
4000
4001                 ptr = strrchr(ostname, '-');
4002                 if ((ptr == NULL) ||
4003                     (strncmp(fsname, ostname, ptr-ostname) != 0))
4004                         RETURN(-EINVAL);
4005                 label_sz += strlen(ostname);
4006         }
4007
4008         OBD_ALLOC(label, label_sz);
4009         if (label == NULL)
4010                 RETURN(-ENOMEM);
4011
4012         switch(cmd) {
4013         case LCFG_POOL_NEW:
4014                 sprintf(label,
4015                         "new %s.%s", fsname, poolname);
4016                 break;
4017         case LCFG_POOL_ADD:
4018                 sprintf(label,
4019                         "add %s.%s.%s", fsname, poolname, ostname);
4020                 break;
4021         case LCFG_POOL_REM:
4022                 OBD_ALLOC(canceled_label, label_sz);
4023                 if (canceled_label == NULL)
4024                         GOTO(out_label, rc = -ENOMEM);
4025                 sprintf(label,
4026                         "rem %s.%s.%s", fsname, poolname, ostname);
4027                 sprintf(canceled_label,
4028                         "add %s.%s.%s", fsname, poolname, ostname);
4029                 break;
4030         case LCFG_POOL_DEL:
4031                 OBD_ALLOC(canceled_label, label_sz);
4032                 if (canceled_label == NULL)
4033                         GOTO(out_label, rc = -ENOMEM);
4034                 sprintf(label,
4035                         "del %s.%s", fsname, poolname);
4036                 sprintf(canceled_label,
4037                         "new %s.%s", fsname, poolname);
4038                 break;
4039         default:
4040                 break;
4041         }
4042
4043         if (canceled_label != NULL) {
4044                 OBD_ALLOC_PTR(mti);
4045                 if (mti == NULL)
4046                         GOTO(out_cancel, rc = -ENOMEM);
4047         }
4048
4049         mutex_lock(&fsdb->fsdb_mutex);
4050         /* write pool def to all MDT logs */
4051         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
4052                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
4053                         rc = name_create_mdt_and_lov(&logname, &lovname,
4054                                                      fsdb, i);
4055                         if (rc) {
4056                                 mutex_unlock(&fsdb->fsdb_mutex);
4057                                 GOTO(out_mti, rc);
4058                         }
4059                         if (canceled_label != NULL) {
4060                                 strcpy(mti->mti_svname, "lov pool");
4061                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4062                                                 lovname, canceled_label,
4063                                                 CM_SKIP);
4064                         }
4065
4066                         if (rc >= 0)
4067                                 rc = mgs_write_log_pool(env, mgs, logname,
4068                                                         fsdb, lovname, cmd,
4069                                                         fsname, poolname,
4070                                                         ostname, label);
4071                         name_destroy(&logname);
4072                         name_destroy(&lovname);
4073                         if (rc) {
4074                                 mutex_unlock(&fsdb->fsdb_mutex);
4075                                 GOTO(out_mti, rc);
4076                         }
4077                 }
4078         }
4079
4080         rc = name_create(&logname, fsname, "-client");
4081         if (rc) {
4082                 mutex_unlock(&fsdb->fsdb_mutex);
4083                 GOTO(out_mti, rc);
4084         }
4085         if (canceled_label != NULL) {
4086                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4087                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4088                 if (rc < 0) {
4089                         mutex_unlock(&fsdb->fsdb_mutex);
4090                         name_destroy(&logname);
4091                         GOTO(out_mti, rc);
4092                 }
4093         }
4094
4095         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4096                                 cmd, fsname, poolname, ostname, label);
4097         mutex_unlock(&fsdb->fsdb_mutex);
4098         name_destroy(&logname);
4099         /* request for update */
4100         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4101
4102         EXIT;
4103 out_mti:
4104         if (mti != NULL)
4105                 OBD_FREE_PTR(mti);
4106 out_cancel:
4107         if (canceled_label != NULL)
4108                 OBD_FREE(canceled_label, label_sz);
4109 out_label:
4110         OBD_FREE(label, label_sz);
4111         return rc;
4112 }