Whamcloud - gitweb
LU-3677 mdt: Set HSM dirty open-for-write file when evicted.
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         if (!dt_try_as_dir(env, dir))
71                 GOTO(out, rc = -ENOTDIR);
72
73         LASSERT(dir);
74         LASSERT(dir->do_index_ops);
75
76         iops = &dir->do_index_ops->dio_it;
77         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
78         if (IS_ERR(it))
79                 RETURN(PTR_ERR(it));
80
81         rc = iops->load(env, it, 0);
82         if (rc <= 0)
83                 GOTO(fini, rc = 0);
84
85         /* main cycle */
86         do {
87                 key = (void *)iops->key(env, it);
88                 if (IS_ERR(key)) {
89                         CERROR("%s: key failed when listing %s: rc = %d\n",
90                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
91                                (int) PTR_ERR(key));
92                         goto next;
93                 }
94                 key_sz = iops->key_size(env, it);
95                 LASSERT(key_sz > 0);
96
97                 /* filter out "." and ".." entries */
98                 if (key[0] == '.') {
99                         if (key_sz == 1)
100                                 goto next;
101                         if (key_sz == 2 && key[1] == '.')
102                                 goto next;
103                 }
104
105                 de = mgs_direntry_alloc(key_sz + 1);
106                 if (de == NULL) {
107                         rc = -ENOMEM;
108                         break;
109                 }
110
111                 memcpy(de->name, key, key_sz);
112                 de->name[key_sz] = 0;
113
114                 cfs_list_add(&de->list, list);
115
116 next:
117                 rc = iops->next(env, it);
118         } while (rc == 0);
119         rc = 0;
120
121         iops->put(env, it);
122
123 fini:
124         iops->fini(env, it);
125 out:
126         if (rc)
127                 CERROR("%s: key failed when listing %s: rc = %d\n",
128                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
129         RETURN(rc);
130 }
131
132 /******************** DB functions *********************/
133
134 static inline int name_create(char **newname, char *prefix, char *suffix)
135 {
136         LASSERT(newname);
137         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
138         if (!*newname)
139                 return -ENOMEM;
140         sprintf(*newname, "%s%s", prefix, suffix);
141         return 0;
142 }
143
144 static inline void name_destroy(char **name)
145 {
146         if (*name)
147                 OBD_FREE(*name, strlen(*name) + 1);
148         *name = NULL;
149 }
150
151 struct mgs_fsdb_handler_data
152 {
153         struct fs_db   *fsdb;
154         __u32           ver;
155 };
156
157 /* from the (client) config log, figure out:
158         1. which ost's/mdt's are configured (by index)
159         2. what the last config step is
160         3. COMPAT_18 osc name
161 */
162 /* It might be better to have a separate db file, instead of parsing the info
163    out of the client log.  This is slow and potentially error-prone. */
164 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
165                             struct llog_rec_hdr *rec, void *data)
166 {
167         struct mgs_fsdb_handler_data *d = data;
168         struct fs_db *fsdb = d->fsdb;
169         int cfg_len = rec->lrh_len;
170         char *cfg_buf = (char*) (rec + 1);
171         struct lustre_cfg *lcfg;
172         __u32 index;
173         int rc = 0;
174         ENTRY;
175
176         if (rec->lrh_type != OBD_CFG_REC) {
177                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
178                 RETURN(-EINVAL);
179         }
180
181         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
182         if (rc) {
183                 CERROR("Insane cfg\n");
184                 RETURN(rc);
185         }
186
187         lcfg = (struct lustre_cfg *)cfg_buf;
188
189         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
190                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
191
192         /* Figure out ost indicies */
193         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
194         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
195             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
196                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
197                                        NULL, 10);
198                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
199                        lustre_cfg_string(lcfg, 1), index,
200                        lustre_cfg_string(lcfg, 2));
201                 set_bit(index, fsdb->fsdb_ost_index_map);
202         }
203
204         /* Figure out mdt indicies */
205         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
206         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
207             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
208                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
209                                        &index, NULL);
210                 if (rc != LDD_F_SV_TYPE_MDT) {
211                         CWARN("Unparsable MDC name %s, assuming index 0\n",
212                               lustre_cfg_string(lcfg, 0));
213                         index = 0;
214                 }
215                 rc = 0;
216                 CDEBUG(D_MGS, "MDT index is %u\n", index);
217                 set_bit(index, fsdb->fsdb_mdt_index_map);
218                 fsdb->fsdb_mdt_count ++;
219         }
220
221         /**
222          * figure out the old config. fsdb_gen = 0 means old log
223          * It is obsoleted and not supported anymore
224          */
225         if (fsdb->fsdb_gen == 0) {
226                 CERROR("Old config format is not supported\n");
227                 RETURN(-EINVAL);
228         }
229
230         /*
231          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
232          */
233         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
234             lcfg->lcfg_command == LCFG_ATTACH &&
235             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
236                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
237                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
238                         CWARN("MDT using 1.8 OSC name scheme\n");
239                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
240                 }
241         }
242
243         if (lcfg->lcfg_command == LCFG_MARKER) {
244                 struct cfg_marker *marker;
245                 marker = lustre_cfg_buf(lcfg, 1);
246
247                 d->ver = marker->cm_vers;
248
249                 /* Keep track of the latest marker step */
250                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
251         }
252
253         RETURN(rc);
254 }
255
256 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
257 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
258                                   struct mgs_device *mgs,
259                                   struct fs_db *fsdb)
260 {
261         char                            *logname;
262         struct llog_handle              *loghandle;
263         struct llog_ctxt                *ctxt;
264         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
265         int rc;
266
267         ENTRY;
268
269         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
270         LASSERT(ctxt != NULL);
271         rc = name_create(&logname, fsdb->fsdb_name, "-client");
272         if (rc)
273                 GOTO(out_put, rc);
274         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
275         if (rc)
276                 GOTO(out_pop, rc);
277
278         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
279         if (rc)
280                 GOTO(out_close, rc);
281
282         if (llog_get_size(loghandle) <= 1)
283                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
284
285         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
286         CDEBUG(D_INFO, "get_db = %d\n", rc);
287 out_close:
288         llog_close(env, loghandle);
289 out_pop:
290         name_destroy(&logname);
291 out_put:
292         llog_ctxt_put(ctxt);
293
294         RETURN(rc);
295 }
296
297 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
298 {
299         struct mgs_tgt_srpc_conf *tgtconf;
300
301         /* free target-specific rules */
302         while (fsdb->fsdb_srpc_tgt) {
303                 tgtconf = fsdb->fsdb_srpc_tgt;
304                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
305
306                 LASSERT(tgtconf->mtsc_tgt);
307
308                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
309                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
310                 OBD_FREE_PTR(tgtconf);
311         }
312
313         /* free general rules */
314         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
315 }
316
317 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
318 {
319         struct fs_db *fsdb;
320         cfs_list_t *tmp;
321
322         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
323                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
324                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
325                         return fsdb;
326         }
327         return NULL;
328 }
329
330 /* caller must hold the mgs->mgs_fs_db_lock */
331 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
332                                   struct mgs_device *mgs, char *fsname)
333 {
334         struct fs_db *fsdb;
335         int rc;
336         ENTRY;
337
338         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
339                 CERROR("fsname %s is too long\n", fsname);
340                 RETURN(NULL);
341         }
342
343         OBD_ALLOC_PTR(fsdb);
344         if (!fsdb)
345                 RETURN(NULL);
346
347         strcpy(fsdb->fsdb_name, fsname);
348         mutex_init(&fsdb->fsdb_mutex);
349         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
350         fsdb->fsdb_gen = 1;
351
352         if (strcmp(fsname, MGSSELF_NAME) == 0) {
353                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
354         } else {
355                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
356                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
357                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
358                         CERROR("No memory for index maps\n");
359                         GOTO(err, rc = -ENOMEM);
360                 }
361
362                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
363                 if (rc)
364                         GOTO(err, rc);
365                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
366                 if (rc)
367                         GOTO(err, rc);
368
369                 /* initialise data for NID table */
370                 mgs_ir_init_fs(env, mgs, fsdb);
371
372                 lproc_mgs_add_live(mgs, fsdb);
373         }
374
375         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
376
377         RETURN(fsdb);
378 err:
379         if (fsdb->fsdb_ost_index_map)
380                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
381         if (fsdb->fsdb_mdt_index_map)
382                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
383         name_destroy(&fsdb->fsdb_clilov);
384         name_destroy(&fsdb->fsdb_clilmv);
385         OBD_FREE_PTR(fsdb);
386         RETURN(NULL);
387 }
388
389 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
390 {
391         /* wait for anyone with the sem */
392         mutex_lock(&fsdb->fsdb_mutex);
393         lproc_mgs_del_live(mgs, fsdb);
394         cfs_list_del(&fsdb->fsdb_list);
395
396         /* deinitialize fsr */
397         mgs_ir_fini_fs(mgs, fsdb);
398
399         if (fsdb->fsdb_ost_index_map)
400                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
401         if (fsdb->fsdb_mdt_index_map)
402                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
403         name_destroy(&fsdb->fsdb_clilov);
404         name_destroy(&fsdb->fsdb_clilmv);
405         mgs_free_fsdb_srpc(fsdb);
406         mutex_unlock(&fsdb->fsdb_mutex);
407         OBD_FREE_PTR(fsdb);
408 }
409
410 int mgs_init_fsdb_list(struct mgs_device *mgs)
411 {
412         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
413         return 0;
414 }
415
416 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
417 {
418         struct fs_db *fsdb;
419         cfs_list_t *tmp, *tmp2;
420         mutex_lock(&mgs->mgs_mutex);
421         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
422                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
423                 mgs_free_fsdb(mgs, fsdb);
424         }
425         mutex_unlock(&mgs->mgs_mutex);
426         return 0;
427 }
428
429 int mgs_find_or_make_fsdb(const struct lu_env *env,
430                           struct mgs_device *mgs, char *name,
431                           struct fs_db **dbh)
432 {
433         struct fs_db *fsdb;
434         int rc = 0;
435
436         ENTRY;
437         mutex_lock(&mgs->mgs_mutex);
438         fsdb = mgs_find_fsdb(mgs, name);
439         if (fsdb) {
440                 mutex_unlock(&mgs->mgs_mutex);
441                 *dbh = fsdb;
442                 RETURN(0);
443         }
444
445         CDEBUG(D_MGS, "Creating new db\n");
446         fsdb = mgs_new_fsdb(env, mgs, name);
447         /* lock fsdb_mutex until the db is loaded from llogs */
448         if (fsdb)
449                 mutex_lock(&fsdb->fsdb_mutex);
450         mutex_unlock(&mgs->mgs_mutex);
451         if (!fsdb)
452                 RETURN(-ENOMEM);
453
454         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
455                 /* populate the db from the client llog */
456                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
457                 if (rc) {
458                         CERROR("Can't get db from client log %d\n", rc);
459                         GOTO(out_free, rc);
460                 }
461         }
462
463         /* populate srpc rules from params llog */
464         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
465         if (rc) {
466                 CERROR("Can't get db from params log %d\n", rc);
467                 GOTO(out_free, rc);
468         }
469
470         mutex_unlock(&fsdb->fsdb_mutex);
471         *dbh = fsdb;
472
473         RETURN(0);
474
475 out_free:
476         mutex_unlock(&fsdb->fsdb_mutex);
477         mgs_free_fsdb(mgs, fsdb);
478         return rc;
479 }
480
481 /* 1 = index in use
482    0 = index unused
483    -1= empty client log */
484 int mgs_check_index(const struct lu_env *env,
485                     struct mgs_device *mgs,
486                     struct mgs_target_info *mti)
487 {
488         struct fs_db *fsdb;
489         void *imap;
490         int rc = 0;
491         ENTRY;
492
493         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
494
495         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
496         if (rc) {
497                 CERROR("Can't get db for %s\n", mti->mti_fsname);
498                 RETURN(rc);
499         }
500
501         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
502                 RETURN(-1);
503
504         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
505                 imap = fsdb->fsdb_ost_index_map;
506         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
507                 imap = fsdb->fsdb_mdt_index_map;
508         else
509                 RETURN(-EINVAL);
510
511         if (test_bit(mti->mti_stripe_index, imap))
512                 RETURN(1);
513         RETURN(0);
514 }
515
516 static __inline__ int next_index(void *index_map, int map_len)
517 {
518         int i;
519         for (i = 0; i < map_len * 8; i++)
520                 if (!test_bit(i, index_map)) {
521                          return i;
522                  }
523         CERROR("max index %d exceeded.\n", i);
524         return -1;
525 }
526
527 /* Return codes:
528         0  newly marked as in use
529         <0 err
530         +EALREADY for update of an old index */
531 static int mgs_set_index(const struct lu_env *env,
532                          struct mgs_device *mgs,
533                          struct mgs_target_info *mti)
534 {
535         struct fs_db *fsdb;
536         void *imap;
537         int rc = 0;
538         ENTRY;
539
540         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
541         if (rc) {
542                 CERROR("Can't get db for %s\n", mti->mti_fsname);
543                 RETURN(rc);
544         }
545
546         mutex_lock(&fsdb->fsdb_mutex);
547         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
548                 imap = fsdb->fsdb_ost_index_map;
549         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
550                 imap = fsdb->fsdb_mdt_index_map;
551         } else {
552                 GOTO(out_up, rc = -EINVAL);
553         }
554
555         if (mti->mti_flags & LDD_F_NEED_INDEX) {
556                 rc = next_index(imap, INDEX_MAP_SIZE);
557                 if (rc == -1)
558                         GOTO(out_up, rc = -ERANGE);
559                 mti->mti_stripe_index = rc;
560                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
561                         fsdb->fsdb_mdt_count ++;
562         }
563
564         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
565                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
566                                    "but the max index is %d.\n",
567                                    mti->mti_svname, mti->mti_stripe_index,
568                                    INDEX_MAP_SIZE * 8);
569                 GOTO(out_up, rc = -ERANGE);
570         }
571
572         if (test_bit(mti->mti_stripe_index, imap)) {
573                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
574                     !(mti->mti_flags & LDD_F_WRITECONF)) {
575                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
576                                            "%d, but that index is already in "
577                                            "use. Use --writeconf to force\n",
578                                            mti->mti_svname,
579                                            mti->mti_stripe_index);
580                         GOTO(out_up, rc = -EADDRINUSE);
581                 } else {
582                         CDEBUG(D_MGS, "Server %s updating index %d\n",
583                                mti->mti_svname, mti->mti_stripe_index);
584                         GOTO(out_up, rc = EALREADY);
585                 }
586         }
587
588         set_bit(mti->mti_stripe_index, imap);
589         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
590         mutex_unlock(&fsdb->fsdb_mutex);
591         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
592                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
593
594         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
595                mti->mti_stripe_index);
596
597         RETURN(0);
598 out_up:
599         mutex_unlock(&fsdb->fsdb_mutex);
600         return rc;
601 }
602
603 struct mgs_modify_lookup {
604         struct cfg_marker mml_marker;
605         int               mml_modified;
606 };
607
608 static int mgs_modify_handler(const struct lu_env *env,
609                               struct llog_handle *llh,
610                               struct llog_rec_hdr *rec, void *data)
611 {
612         struct mgs_modify_lookup *mml = data;
613         struct cfg_marker *marker;
614         struct lustre_cfg *lcfg = REC_DATA(rec);
615         int cfg_len = REC_DATA_LEN(rec);
616         int rc;
617         ENTRY;
618
619         if (rec->lrh_type != OBD_CFG_REC) {
620                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
621                 RETURN(-EINVAL);
622         }
623
624         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
625         if (rc) {
626                 CERROR("Insane cfg\n");
627                 RETURN(rc);
628         }
629
630         /* We only care about markers */
631         if (lcfg->lcfg_command != LCFG_MARKER)
632                 RETURN(0);
633
634         marker = lustre_cfg_buf(lcfg, 1);
635         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
636             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
637             !(marker->cm_flags & CM_SKIP)) {
638                 /* Found a non-skipped marker match */
639                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
640                        rec->lrh_index, marker->cm_step,
641                        marker->cm_flags, mml->mml_marker.cm_flags,
642                        marker->cm_tgtname, marker->cm_comment);
643                 /* Overwrite the old marker llog entry */
644                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
645                 marker->cm_flags |= mml->mml_marker.cm_flags;
646                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
647                 /* Header and tail are added back to lrh_len in
648                    llog_lvfs_write_rec */
649                 rec->lrh_len = cfg_len;
650                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
651                                 rec->lrh_index);
652                 if (!rc)
653                          mml->mml_modified++;
654         }
655
656         RETURN(rc);
657 }
658
659 /**
660  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
661  * Return code:
662  * 0 - modified successfully,
663  * 1 - no modification was done
664  * negative - error
665  */
666 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
667                       struct fs_db *fsdb, struct mgs_target_info *mti,
668                       char *logname, char *devname, char *comment, int flags)
669 {
670         struct llog_handle *loghandle;
671         struct llog_ctxt *ctxt;
672         struct mgs_modify_lookup *mml;
673         int rc;
674
675         ENTRY;
676
677         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
678         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
679                flags);
680
681         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
682         LASSERT(ctxt != NULL);
683         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
684         if (rc < 0) {
685                 if (rc == -ENOENT)
686                         rc = 0;
687                 GOTO(out_pop, rc);
688         }
689
690         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
691         if (rc)
692                 GOTO(out_close, rc);
693
694         if (llog_get_size(loghandle) <= 1)
695                 GOTO(out_close, rc = 0);
696
697         OBD_ALLOC_PTR(mml);
698         if (!mml)
699                 GOTO(out_close, rc = -ENOMEM);
700         strcpy(mml->mml_marker.cm_comment, comment);
701         strcpy(mml->mml_marker.cm_tgtname, devname);
702         /* Modify mostly means cancel */
703         mml->mml_marker.cm_flags = flags;
704         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
705         mml->mml_modified = 0;
706         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
707                           NULL);
708         if (!rc && !mml->mml_modified)
709                 rc = 1;
710         OBD_FREE_PTR(mml);
711
712 out_close:
713         llog_close(env, loghandle);
714 out_pop:
715         if (rc < 0)
716                 CERROR("%s: modify %s/%s failed: rc = %d\n",
717                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
718         llog_ctxt_put(ctxt);
719         RETURN(rc);
720 }
721
722 /** This structure is passed to mgs_replace_handler */
723 struct mgs_replace_uuid_lookup {
724         /* Nids are replaced for this target device */
725         struct mgs_target_info target;
726         /* Temporary modified llog */
727         struct llog_handle *temp_llh;
728         /* Flag is set if in target block*/
729         int in_target_device;
730         /* Nids already added. Just skip (multiple nids) */
731         int device_nids_added;
732         /* Flag is set if this block should not be copied */
733         int skip_it;
734 };
735
736 /**
737  * Check: a) if block should be skipped
738  * b) is it target block
739  *
740  * \param[in] lcfg
741  * \param[in] mrul
742  *
743  * \retval 0 should not to be skipped
744  * \retval 1 should to be skipped
745  */
746 static int check_markers(struct lustre_cfg *lcfg,
747                          struct mgs_replace_uuid_lookup *mrul)
748 {
749          struct cfg_marker *marker;
750
751         /* Track markers. Find given device */
752         if (lcfg->lcfg_command == LCFG_MARKER) {
753                 marker = lustre_cfg_buf(lcfg, 1);
754                 /* Clean llog from records marked as CM_EXCLUDE.
755                    CM_SKIP records are used for "active" command
756                    and can be restored if needed */
757                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
758                     (CM_EXCLUDE | CM_START)) {
759                         mrul->skip_it = 1;
760                         return 1;
761                 }
762
763                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
764                     (CM_EXCLUDE | CM_END)) {
765                         mrul->skip_it = 0;
766                         return 1;
767                 }
768
769                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
770                         LASSERT(!(marker->cm_flags & CM_START) ||
771                                 !(marker->cm_flags & CM_END));
772                         if (marker->cm_flags & CM_START) {
773                                 mrul->in_target_device = 1;
774                                 mrul->device_nids_added = 0;
775                         } else if (marker->cm_flags & CM_END)
776                                 mrul->in_target_device = 0;
777                 }
778         }
779
780         return 0;
781 }
782
783 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
784                        struct lustre_cfg *lcfg)
785 {
786         struct llog_rec_hdr      rec;
787         int                      buflen, rc;
788
789         if (!lcfg || !llh)
790                 return -ENOMEM;
791
792         LASSERT(llh->lgh_ctxt);
793
794         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
795                                 lcfg->lcfg_buflens);
796         rec.lrh_len = llog_data_len(buflen);
797         rec.lrh_type = OBD_CFG_REC;
798
799         /* idx = -1 means append */
800         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
801         if (rc)
802                 CERROR("failed %d\n", rc);
803         return rc;
804 }
805
806 static int record_base(const struct lu_env *env, struct llog_handle *llh,
807                      char *cfgname, lnet_nid_t nid, int cmd,
808                      char *s1, char *s2, char *s3, char *s4)
809 {
810         struct mgs_thread_info *mgi = mgs_env_info(env);
811         struct lustre_cfg     *lcfg;
812         int rc;
813
814         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
815                cmd, s1, s2, s3, s4);
816
817         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
818         if (s1)
819                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
820         if (s2)
821                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
822         if (s3)
823                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
824         if (s4)
825                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
826
827         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
828         if (!lcfg)
829                 return -ENOMEM;
830         lcfg->lcfg_nid = nid;
831
832         rc = record_lcfg(env, llh, lcfg);
833
834         lustre_cfg_free(lcfg);
835
836         if (rc) {
837                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
838                        cmd, s1, s2, s3, s4);
839         }
840         return rc;
841 }
842
843 static inline int record_add_uuid(const struct lu_env *env,
844                                   struct llog_handle *llh,
845                                   uint64_t nid, char *uuid)
846 {
847         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
848 }
849
850 static inline int record_add_conn(const struct lu_env *env,
851                                   struct llog_handle *llh,
852                                   char *devname, char *uuid)
853 {
854         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
855 }
856
857 static inline int record_attach(const struct lu_env *env,
858                                 struct llog_handle *llh, char *devname,
859                                 char *type, char *uuid)
860 {
861         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
862 }
863
864 static inline int record_setup(const struct lu_env *env,
865                                struct llog_handle *llh, char *devname,
866                                char *s1, char *s2, char *s3, char *s4)
867 {
868         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
869 }
870
871 /**
872  * \retval <0 record processing error
873  * \retval n record is processed. No need copy original one.
874  * \retval 0 record is not processed.
875  */
876 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
877                            struct mgs_replace_uuid_lookup *mrul)
878 {
879         int nids_added = 0;
880         lnet_nid_t nid;
881         char *ptr;
882         int rc;
883
884         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
885                 /* LCFG_ADD_UUID command found. Let's skip original command
886                    and add passed nids */
887                 ptr = mrul->target.mti_params;
888                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
889                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
890                                "device %s\n", libcfs_nid2str(nid),
891                                 mrul->target.mti_params,
892                                 mrul->target.mti_svname);
893                         rc = record_add_uuid(env,
894                                              mrul->temp_llh, nid,
895                                              mrul->target.mti_params);
896                         if (!rc)
897                                 nids_added++;
898                 }
899
900                 if (nids_added == 0) {
901                         CERROR("No new nids were added, nid %s with uuid %s, "
902                                "device %s\n", libcfs_nid2str(nid),
903                                mrul->target.mti_params,
904                                mrul->target.mti_svname);
905                         RETURN(-ENXIO);
906                 } else {
907                         mrul->device_nids_added = 1;
908                 }
909
910                 return nids_added;
911         }
912
913         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
914                 /* LCFG_SETUP command found. UUID should be changed */
915                 rc = record_setup(env,
916                                   mrul->temp_llh,
917                                   /* devname the same */
918                                   lustre_cfg_string(lcfg, 0),
919                                   /* s1 is not changed */
920                                   lustre_cfg_string(lcfg, 1),
921                                   /* new uuid should be
922                                   the full nidlist */
923                                   mrul->target.mti_params,
924                                   /* s3 is not changed */
925                                   lustre_cfg_string(lcfg, 3),
926                                   /* s4 is not changed */
927                                   lustre_cfg_string(lcfg, 4));
928                 return rc ? rc : 1;
929         }
930
931         /* Another commands in target device block */
932         return 0;
933 }
934
935 /**
936  * Handler that called for every record in llog.
937  * Records are processed in order they placed in llog.
938  *
939  * \param[in] llh       log to be processed
940  * \param[in] rec       current record
941  * \param[in] data      mgs_replace_uuid_lookup structure
942  *
943  * \retval 0    success
944  */
945 static int mgs_replace_handler(const struct lu_env *env,
946                                struct llog_handle *llh,
947                                struct llog_rec_hdr *rec,
948                                void *data)
949 {
950         struct mgs_replace_uuid_lookup *mrul;
951         struct lustre_cfg *lcfg = REC_DATA(rec);
952         int cfg_len = REC_DATA_LEN(rec);
953         int rc;
954         ENTRY;
955
956         mrul = (struct mgs_replace_uuid_lookup *)data;
957
958         if (rec->lrh_type != OBD_CFG_REC) {
959                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
960                        rec->lrh_type, lcfg->lcfg_command,
961                        lustre_cfg_string(lcfg, 0),
962                        lustre_cfg_string(lcfg, 1));
963                 RETURN(-EINVAL);
964         }
965
966         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
967         if (rc) {
968                 /* Do not copy any invalidated records */
969                 GOTO(skip_out, rc = 0);
970         }
971
972         rc = check_markers(lcfg, mrul);
973         if (rc || mrul->skip_it)
974                 GOTO(skip_out, rc = 0);
975
976         /* Write to new log all commands outside target device block */
977         if (!mrul->in_target_device)
978                 GOTO(copy_out, rc = 0);
979
980         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
981            (failover nids) for this target, assuming that if then
982            primary is changing then so is the failover */
983         if (mrul->device_nids_added &&
984             (lcfg->lcfg_command == LCFG_ADD_UUID ||
985              lcfg->lcfg_command == LCFG_ADD_CONN))
986                 GOTO(skip_out, rc = 0);
987
988         rc = process_command(env, lcfg, mrul);
989         if (rc < 0)
990                 RETURN(rc);
991
992         if (rc)
993                 RETURN(0);
994 copy_out:
995         /* Record is placed in temporary llog as is */
996         rc = llog_write(env, mrul->temp_llh, rec, NULL, 0, NULL, -1);
997
998         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
999                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1000                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1001         RETURN(rc);
1002
1003 skip_out:
1004         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1005                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1006                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1007         RETURN(rc);
1008 }
1009
1010 static int mgs_log_is_empty(const struct lu_env *env,
1011                             struct mgs_device *mgs, char *name)
1012 {
1013         struct llog_ctxt        *ctxt;
1014         int                      rc;
1015
1016         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1017         LASSERT(ctxt != NULL);
1018
1019         rc = llog_is_empty(env, ctxt, name);
1020         llog_ctxt_put(ctxt);
1021         return rc;
1022 }
1023
1024 static int mgs_replace_nids_log(const struct lu_env *env,
1025                                 struct obd_device *mgs, struct fs_db *fsdb,
1026                                 char *logname, char *devname, char *nids)
1027 {
1028         struct llog_handle *orig_llh, *backup_llh;
1029         struct llog_ctxt *ctxt;
1030         struct mgs_replace_uuid_lookup *mrul;
1031         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1032         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1033         char *backup;
1034         int rc, rc2;
1035         ENTRY;
1036
1037         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1038
1039         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1040         LASSERT(ctxt != NULL);
1041
1042         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1043                 /* Log is empty. Nothing to replace */
1044                 GOTO(out_put, rc = 0);
1045         }
1046
1047         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1048         if (backup == NULL)
1049                 GOTO(out_put, rc = -ENOMEM);
1050
1051         sprintf(backup, "%s.bak", logname);
1052
1053         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1054         if (rc == 0) {
1055                 /* Now erase original log file. Connections are not allowed.
1056                    Backup is already saved */
1057                 rc = llog_erase(env, ctxt, NULL, logname);
1058                 if (rc < 0)
1059                         GOTO(out_free, rc);
1060         } else if (rc != -ENOENT) {
1061                 CERROR("%s: can't make backup for %s: rc = %d\n",
1062                        mgs->obd_name, logname, rc);
1063                 GOTO(out_free,rc);
1064         }
1065
1066         /* open local log */
1067         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1068         if (rc)
1069                 GOTO(out_restore, rc);
1070
1071         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1072         if (rc)
1073                 GOTO(out_closel, rc);
1074
1075         /* open backup llog */
1076         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1077                        LLOG_OPEN_EXISTS);
1078         if (rc)
1079                 GOTO(out_closel, rc);
1080
1081         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1082         if (rc)
1083                 GOTO(out_close, rc);
1084
1085         if (llog_get_size(backup_llh) <= 1)
1086                 GOTO(out_close, rc = 0);
1087
1088         OBD_ALLOC_PTR(mrul);
1089         if (!mrul)
1090                 GOTO(out_close, rc = -ENOMEM);
1091         /* devname is only needed information to replace UUID records */
1092         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1093         /* parse nids later */
1094         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1095         /* Copy records to this temporary llog */
1096         mrul->temp_llh = orig_llh;
1097
1098         rc = llog_process(env, backup_llh, mgs_replace_handler,
1099                           (void *)mrul, NULL);
1100         OBD_FREE_PTR(mrul);
1101 out_close:
1102         rc2 = llog_close(NULL, backup_llh);
1103         if (!rc)
1104                 rc = rc2;
1105 out_closel:
1106         rc2 = llog_close(NULL, orig_llh);
1107         if (!rc)
1108                 rc = rc2;
1109
1110 out_restore:
1111         if (rc) {
1112                 CERROR("%s: llog should be restored: rc = %d\n",
1113                        mgs->obd_name, rc);
1114                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1115                                   logname);
1116                 if (rc2 < 0)
1117                         CERROR("%s: can't restore backup %s: rc = %d\n",
1118                                mgs->obd_name, logname, rc2);
1119         }
1120
1121 out_free:
1122         OBD_FREE(backup, strlen(backup) + 1);
1123
1124 out_put:
1125         llog_ctxt_put(ctxt);
1126
1127         if (rc)
1128                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1129                        mgs->obd_name, logname, rc);
1130
1131         RETURN(rc);
1132 }
1133
1134 /**
1135  * Parse device name and get file system name and/or device index
1136  *
1137  * \param[in]   devname device name (ex. lustre-MDT0000)
1138  * \param[out]  fsname  file system name(optional)
1139  * \param[out]  index   device index(optional)
1140  *
1141  * \retval 0    success
1142  */
1143 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1144 {
1145         int rc;
1146         ENTRY;
1147
1148         /* Extract fsname */
1149         if (fsname) {
1150                 rc = server_name2fsname(devname, fsname, NULL);
1151                 if (rc < 0) {
1152                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1153                                devname);
1154                         RETURN(-EINVAL);
1155                 }
1156         }
1157
1158         if (index) {
1159                 rc = server_name2index(devname, index, NULL);
1160                 if (rc < 0) {
1161                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1162                                devname);
1163                         RETURN(-EINVAL);
1164                 }
1165         }
1166
1167         RETURN(0);
1168 }
1169
1170 static int only_mgs_is_running(struct obd_device *mgs_obd)
1171 {
1172         /* TDB: Is global variable with devices count exists? */
1173         int num_devices = get_devices_count();
1174         /* osd, MGS and MGC + self_export
1175            (wc -l /proc/fs/lustre/devices <= 2) && (num_exports <= 2) */
1176         return (num_devices <= 3) && (mgs_obd->obd_num_exports <= 2);
1177 }
1178
1179 static int name_create_mdt(char **logname, char *fsname, int i)
1180 {
1181         char mdt_index[9];
1182
1183         sprintf(mdt_index, "-MDT%04x", i);
1184         return name_create(logname, fsname, mdt_index);
1185 }
1186
1187 /**
1188  * Replace nids for \a device to \a nids values
1189  *
1190  * \param obd           MGS obd device
1191  * \param devname       nids need to be replaced for this device
1192  * (ex. lustre-OST0000)
1193  * \param nids          nids list (ex. nid1,nid2,nid3)
1194  *
1195  * \retval 0    success
1196  */
1197 int mgs_replace_nids(const struct lu_env *env,
1198                      struct mgs_device *mgs,
1199                      char *devname, char *nids)
1200 {
1201         /* Assume fsname is part of device name */
1202         char fsname[MTI_NAME_MAXLEN];
1203         int rc;
1204         __u32 index;
1205         char *logname;
1206         struct fs_db *fsdb;
1207         unsigned int i;
1208         int conn_state;
1209         struct obd_device *mgs_obd = mgs->mgs_obd;
1210         ENTRY;
1211
1212         /* We can only change NIDs if no other nodes are connected */
1213         spin_lock(&mgs_obd->obd_dev_lock);
1214         conn_state = mgs_obd->obd_no_conn;
1215         mgs_obd->obd_no_conn = 1;
1216         spin_unlock(&mgs_obd->obd_dev_lock);
1217
1218         /* We can not change nids if not only MGS is started */
1219         if (!only_mgs_is_running(mgs_obd)) {
1220                 CERROR("Only MGS is allowed to be started\n");
1221                 GOTO(out, rc = -EINPROGRESS);
1222         }
1223
1224         /* Get fsname and index*/
1225         rc = mgs_parse_devname(devname, fsname, &index);
1226         if (rc)
1227                 GOTO(out, rc);
1228
1229         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1230         if (rc) {
1231                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1232                 GOTO(out, rc);
1233         }
1234
1235         /* Process client llogs */
1236         name_create(&logname, fsname, "-client");
1237         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1238         name_destroy(&logname);
1239         if (rc) {
1240                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1241                        fsname, devname, rc);
1242                 GOTO(out, rc);
1243         }
1244
1245         /* Process MDT llogs */
1246         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1247                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1248                         continue;
1249                 name_create_mdt(&logname, fsname, i);
1250                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1251                 name_destroy(&logname);
1252                 if (rc)
1253                         GOTO(out, rc);
1254         }
1255
1256 out:
1257         spin_lock(&mgs_obd->obd_dev_lock);
1258         mgs_obd->obd_no_conn = conn_state;
1259         spin_unlock(&mgs_obd->obd_dev_lock);
1260
1261         RETURN(rc);
1262 }
1263
1264 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1265                             char *devname, struct lov_desc *desc)
1266 {
1267         struct mgs_thread_info *mgi = mgs_env_info(env);
1268         struct lustre_cfg *lcfg;
1269         int rc;
1270
1271         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1272         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1273         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1274         if (!lcfg)
1275                 return -ENOMEM;
1276         rc = record_lcfg(env, llh, lcfg);
1277
1278         lustre_cfg_free(lcfg);
1279         return rc;
1280 }
1281
1282 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1283                             char *devname, struct lmv_desc *desc)
1284 {
1285         struct mgs_thread_info *mgi = mgs_env_info(env);
1286         struct lustre_cfg *lcfg;
1287         int rc;
1288
1289         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1290         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1291         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1292
1293         rc = record_lcfg(env, llh, lcfg);
1294
1295         lustre_cfg_free(lcfg);
1296         return rc;
1297 }
1298
1299 static inline int record_mdc_add(const struct lu_env *env,
1300                                  struct llog_handle *llh,
1301                                  char *logname, char *mdcuuid,
1302                                  char *mdtuuid, char *index,
1303                                  char *gen)
1304 {
1305         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1306                            mdtuuid,index,gen,mdcuuid);
1307 }
1308
1309 static inline int record_lov_add(const struct lu_env *env,
1310                                  struct llog_handle *llh,
1311                                  char *lov_name, char *ost_uuid,
1312                                  char *index, char *gen)
1313 {
1314         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1315                            ost_uuid, index, gen, 0);
1316 }
1317
1318 static inline int record_mount_opt(const struct lu_env *env,
1319                                    struct llog_handle *llh,
1320                                    char *profile, char *lov_name,
1321                                    char *mdc_name)
1322 {
1323         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1324                            profile,lov_name,mdc_name,0);
1325 }
1326
1327 static int record_marker(const struct lu_env *env,
1328                          struct llog_handle *llh,
1329                          struct fs_db *fsdb, __u32 flags,
1330                          char *tgtname, char *comment)
1331 {
1332         struct mgs_thread_info *mgi = mgs_env_info(env);
1333         struct lustre_cfg *lcfg;
1334         int rc;
1335         int cplen = 0;
1336
1337         if (flags & CM_START)
1338                 fsdb->fsdb_gen++;
1339         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1340         mgi->mgi_marker.cm_flags = flags;
1341         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1342         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1343                         sizeof(mgi->mgi_marker.cm_tgtname));
1344         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1345                 return -E2BIG;
1346         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1347                         sizeof(mgi->mgi_marker.cm_comment));
1348         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1349                 return -E2BIG;
1350         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1351         mgi->mgi_marker.cm_canceltime = 0;
1352         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1353         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1354                             sizeof(mgi->mgi_marker));
1355         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1356         if (!lcfg)
1357                 return -ENOMEM;
1358         rc = record_lcfg(env, llh, lcfg);
1359
1360         lustre_cfg_free(lcfg);
1361         return rc;
1362 }
1363
1364 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1365                             struct llog_handle **llh, char *name)
1366 {
1367         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1368         struct llog_ctxt        *ctxt;
1369         int                      rc = 0;
1370
1371         if (*llh)
1372                 GOTO(out, rc = -EBUSY);
1373
1374         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1375         if (!ctxt)
1376                 GOTO(out, rc = -ENODEV);
1377         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1378
1379         rc = llog_open_create(env, ctxt, llh, NULL, name);
1380         if (rc)
1381                 GOTO(out_ctxt, rc);
1382         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1383         if (rc)
1384                 llog_close(env, *llh);
1385 out_ctxt:
1386         llog_ctxt_put(ctxt);
1387 out:
1388         if (rc) {
1389                 CERROR("%s: can't start log %s: rc = %d\n",
1390                        mgs->mgs_obd->obd_name, name, rc);
1391                 *llh = NULL;
1392         }
1393         RETURN(rc);
1394 }
1395
1396 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1397 {
1398         int rc;
1399
1400         rc = llog_close(env, *llh);
1401         *llh = NULL;
1402
1403         return rc;
1404 }
1405
1406 /******************** config "macros" *********************/
1407
1408 /* write an lcfg directly into a log (with markers) */
1409 static int mgs_write_log_direct(const struct lu_env *env,
1410                                 struct mgs_device *mgs, struct fs_db *fsdb,
1411                                 char *logname, struct lustre_cfg *lcfg,
1412                                 char *devname, char *comment)
1413 {
1414         struct llog_handle *llh = NULL;
1415         int rc;
1416         ENTRY;
1417
1418         if (!lcfg)
1419                 RETURN(-ENOMEM);
1420
1421         rc = record_start_log(env, mgs, &llh, logname);
1422         if (rc)
1423                 RETURN(rc);
1424
1425         /* FIXME These should be a single journal transaction */
1426         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1427         if (rc)
1428                 GOTO(out_end, rc);
1429         rc = record_lcfg(env, llh, lcfg);
1430         if (rc)
1431                 GOTO(out_end, rc);
1432         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1433         if (rc)
1434                 GOTO(out_end, rc);
1435 out_end:
1436         record_end_log(env, &llh);
1437         RETURN(rc);
1438 }
1439
1440 /* write the lcfg in all logs for the given fs */
1441 int mgs_write_log_direct_all(const struct lu_env *env,
1442                              struct mgs_device *mgs,
1443                              struct fs_db *fsdb,
1444                              struct mgs_target_info *mti,
1445                              struct lustre_cfg *lcfg,
1446                              char *devname, char *comment,
1447                              int server_only)
1448 {
1449         cfs_list_t list;
1450         struct mgs_direntry *dirent, *n;
1451         char *fsname = mti->mti_fsname;
1452         char *logname;
1453         int rc = 0, len = strlen(fsname);
1454         ENTRY;
1455
1456         /* We need to set params for any future logs
1457            as well. FIXME Append this file to every new log.
1458            Actually, we should store as params (text), not llogs.  Or
1459            in a database. */
1460         rc = name_create(&logname, fsname, "-params");
1461         if (rc)
1462                 RETURN(rc);
1463         if (mgs_log_is_empty(env, mgs, logname)) {
1464                 struct llog_handle *llh = NULL;
1465                 rc = record_start_log(env, mgs, &llh, logname);
1466                 record_end_log(env, &llh);
1467         }
1468         name_destroy(&logname);
1469         if (rc)
1470                 RETURN(rc);
1471
1472         /* Find all the logs in the CONFIGS directory */
1473         rc = class_dentry_readdir(env, mgs, &list);
1474         if (rc)
1475                 RETURN(rc);
1476
1477         /* Could use fsdb index maps instead of directory listing */
1478         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1479                 cfs_list_del(&dirent->list);
1480                 /* don't write to sptlrpc rule log */
1481                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1482                         goto next;
1483
1484                 /* caller wants write server logs only */
1485                 if (server_only && strstr(dirent->name, "-client") != NULL)
1486                         goto next;
1487
1488                 if (strncmp(fsname, dirent->name, len) == 0) {
1489                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1490                         /* Erase any old settings of this same parameter */
1491                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1492                                         devname, comment, CM_SKIP);
1493                         if (rc < 0)
1494                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1495                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1496                         /* Write the new one */
1497                         if (lcfg) {
1498                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1499                                                           dirent->name,
1500                                                           lcfg, devname,
1501                                                           comment);
1502                                 if (rc)
1503                                         CERROR("%s: writing log %s: rc = %d\n",
1504                                                mgs->mgs_obd->obd_name,
1505                                                dirent->name, rc);
1506                         }
1507                 }
1508 next:
1509                 mgs_direntry_free(dirent);
1510         }
1511
1512         RETURN(rc);
1513 }
1514
1515 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1516                                     struct mgs_device *mgs,
1517                                     struct fs_db *fsdb,
1518                                     struct mgs_target_info *mti,
1519                                     int index, char *logname);
1520 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1521                                     struct mgs_device *mgs,
1522                                     struct fs_db *fsdb,
1523                                     struct mgs_target_info *mti,
1524                                     char *logname, char *suffix, char *lovname,
1525                                     enum lustre_sec_part sec_part, int flags);
1526 static int name_create_mdt_and_lov(char **logname, char **lovname,
1527                                    struct fs_db *fsdb, int i);
1528
1529 static int add_param(char *params, char *key, char *val)
1530 {
1531         char *start = params + strlen(params);
1532         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1533         int keylen = 0;
1534
1535         if (key != NULL)
1536                 keylen = strlen(key);
1537         if (start + 1 + keylen + strlen(val) >= end) {
1538                 CERROR("params are too long: %s %s%s\n",
1539                        params, key != NULL ? key : "", val);
1540                 return -EINVAL;
1541         }
1542
1543         sprintf(start, " %s%s", key != NULL ? key : "", val);
1544         return 0;
1545 }
1546
1547 /**
1548  * Walk through client config log record and convert the related records
1549  * into the target.
1550  **/
1551 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1552                                          struct llog_handle *llh,
1553                                          struct llog_rec_hdr *rec, void *data)
1554 {
1555         struct mgs_device *mgs;
1556         struct obd_device *obd;
1557         struct mgs_target_info *mti, *tmti;
1558         struct fs_db *fsdb;
1559         int cfg_len = rec->lrh_len;
1560         char *cfg_buf = (char*) (rec + 1);
1561         struct lustre_cfg *lcfg;
1562         int rc = 0;
1563         struct llog_handle *mdt_llh = NULL;
1564         static int got_an_osc_or_mdc = 0;
1565         /* 0: not found any osc/mdc;
1566            1: found osc;
1567            2: found mdc;
1568         */
1569         static int last_step = -1;
1570         int cplen = 0;
1571
1572         ENTRY;
1573
1574         mti = ((struct temp_comp*)data)->comp_mti;
1575         tmti = ((struct temp_comp*)data)->comp_tmti;
1576         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1577         obd = ((struct temp_comp *)data)->comp_obd;
1578         mgs = lu2mgs_dev(obd->obd_lu_dev);
1579         LASSERT(mgs);
1580
1581         if (rec->lrh_type != OBD_CFG_REC) {
1582                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1583                 RETURN(-EINVAL);
1584         }
1585
1586         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1587         if (rc) {
1588                 CERROR("Insane cfg\n");
1589                 RETURN(rc);
1590         }
1591
1592         lcfg = (struct lustre_cfg *)cfg_buf;
1593
1594         if (lcfg->lcfg_command == LCFG_MARKER) {
1595                 struct cfg_marker *marker;
1596                 marker = lustre_cfg_buf(lcfg, 1);
1597                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1598                     (marker->cm_flags & CM_START) &&
1599                      !(marker->cm_flags & CM_SKIP)) {
1600                         got_an_osc_or_mdc = 1;
1601                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1602                                         sizeof(tmti->mti_svname));
1603                         if (cplen >= sizeof(tmti->mti_svname))
1604                                 RETURN(-E2BIG);
1605                         rc = record_start_log(env, mgs, &mdt_llh,
1606                                               mti->mti_svname);
1607                         if (rc)
1608                                 RETURN(rc);
1609                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1610                                            mti->mti_svname, "add osc(copied)");
1611                         record_end_log(env, &mdt_llh);
1612                         last_step = marker->cm_step;
1613                         RETURN(rc);
1614                 }
1615                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1616                     (marker->cm_flags & CM_END) &&
1617                      !(marker->cm_flags & CM_SKIP)) {
1618                         LASSERT(last_step == marker->cm_step);
1619                         last_step = -1;
1620                         got_an_osc_or_mdc = 0;
1621                         memset(tmti, 0, sizeof(*tmti));
1622                         rc = record_start_log(env, mgs, &mdt_llh,
1623                                               mti->mti_svname);
1624                         if (rc)
1625                                 RETURN(rc);
1626                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1627                                            mti->mti_svname, "add osc(copied)");
1628                         record_end_log(env, &mdt_llh);
1629                         RETURN(rc);
1630                 }
1631                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1632                     (marker->cm_flags & CM_START) &&
1633                      !(marker->cm_flags & CM_SKIP)) {
1634                         got_an_osc_or_mdc = 2;
1635                         last_step = marker->cm_step;
1636                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1637                                strlen(marker->cm_tgtname));
1638
1639                         RETURN(rc);
1640                 }
1641                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1642                     (marker->cm_flags & CM_END) &&
1643                      !(marker->cm_flags & CM_SKIP)) {
1644                         LASSERT(last_step == marker->cm_step);
1645                         last_step = -1;
1646                         got_an_osc_or_mdc = 0;
1647                         memset(tmti, 0, sizeof(*tmti));
1648                         RETURN(rc);
1649                 }
1650         }
1651
1652         if (got_an_osc_or_mdc == 0 || last_step < 0)
1653                 RETURN(rc);
1654
1655         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1656                 uint64_t nodenid = lcfg->lcfg_nid;
1657
1658                 if (strlen(tmti->mti_uuid) == 0) {
1659                         /* target uuid not set, this config record is before
1660                          * LCFG_SETUP, this nid is one of target node nid.
1661                          */
1662                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1663                         tmti->mti_nid_count++;
1664                 } else {
1665                         /* failover node nid */
1666                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1667                                        libcfs_nid2str(nodenid));
1668                 }
1669
1670                 RETURN(rc);
1671         }
1672
1673         if (lcfg->lcfg_command == LCFG_SETUP) {
1674                 char *target;
1675
1676                 target = lustre_cfg_string(lcfg, 1);
1677                 memcpy(tmti->mti_uuid, target, strlen(target));
1678                 RETURN(rc);
1679         }
1680
1681         /* ignore client side sptlrpc_conf_log */
1682         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1683                 RETURN(rc);
1684
1685         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1686                 int index;
1687
1688                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1689                         RETURN (-EINVAL);
1690
1691                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1692                        strlen(mti->mti_fsname));
1693                 tmti->mti_stripe_index = index;
1694
1695                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1696                                               mti->mti_stripe_index,
1697                                               mti->mti_svname);
1698                 memset(tmti, 0, sizeof(*tmti));
1699                 RETURN(rc);
1700         }
1701
1702         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1703                 int index;
1704                 char mdt_index[9];
1705                 char *logname, *lovname;
1706
1707                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1708                                              mti->mti_stripe_index);
1709                 if (rc)
1710                         RETURN(rc);
1711                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1712
1713                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1714                         name_destroy(&logname);
1715                         name_destroy(&lovname);
1716                         RETURN(-EINVAL);
1717                 }
1718
1719                 tmti->mti_stripe_index = index;
1720                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1721                                          mdt_index, lovname,
1722                                          LUSTRE_SP_MDT, 0);
1723                 name_destroy(&logname);
1724                 name_destroy(&lovname);
1725                 RETURN(rc);
1726         }
1727         RETURN(rc);
1728 }
1729
1730 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1731 /* stealed from mgs_get_fsdb_from_llog*/
1732 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1733                                               struct mgs_device *mgs,
1734                                               char *client_name,
1735                                               struct temp_comp* comp)
1736 {
1737         struct llog_handle *loghandle;
1738         struct mgs_target_info *tmti;
1739         struct llog_ctxt *ctxt;
1740         int rc;
1741
1742         ENTRY;
1743
1744         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1745         LASSERT(ctxt != NULL);
1746
1747         OBD_ALLOC_PTR(tmti);
1748         if (tmti == NULL)
1749                 GOTO(out_ctxt, rc = -ENOMEM);
1750
1751         comp->comp_tmti = tmti;
1752         comp->comp_obd = mgs->mgs_obd;
1753
1754         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1755                        LLOG_OPEN_EXISTS);
1756         if (rc < 0) {
1757                 if (rc == -ENOENT)
1758                         rc = 0;
1759                 GOTO(out_pop, rc);
1760         }
1761
1762         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1763         if (rc)
1764                 GOTO(out_close, rc);
1765
1766         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1767                                   (void *)comp, NULL, false);
1768         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1769 out_close:
1770         llog_close(env, loghandle);
1771 out_pop:
1772         OBD_FREE_PTR(tmti);
1773 out_ctxt:
1774         llog_ctxt_put(ctxt);
1775         RETURN(rc);
1776 }
1777
1778 /* lmv is the second thing for client logs */
1779 /* copied from mgs_write_log_lov. Please refer to that.  */
1780 static int mgs_write_log_lmv(const struct lu_env *env,
1781                              struct mgs_device *mgs,
1782                              struct fs_db *fsdb,
1783                              struct mgs_target_info *mti,
1784                              char *logname, char *lmvname)
1785 {
1786         struct llog_handle *llh = NULL;
1787         struct lmv_desc *lmvdesc;
1788         char *uuid;
1789         int rc = 0;
1790         ENTRY;
1791
1792         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1793
1794         OBD_ALLOC_PTR(lmvdesc);
1795         if (lmvdesc == NULL)
1796                 RETURN(-ENOMEM);
1797         lmvdesc->ld_active_tgt_count = 0;
1798         lmvdesc->ld_tgt_count = 0;
1799         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1800         uuid = (char *)lmvdesc->ld_uuid.uuid;
1801
1802         rc = record_start_log(env, mgs, &llh, logname);
1803         if (rc)
1804                 GOTO(out_free, rc);
1805         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1806         if (rc)
1807                 GOTO(out_end, rc);
1808         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1809         if (rc)
1810                 GOTO(out_end, rc);
1811         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1812         if (rc)
1813                 GOTO(out_end, rc);
1814         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1815         if (rc)
1816                 GOTO(out_end, rc);
1817 out_end:
1818         record_end_log(env, &llh);
1819 out_free:
1820         OBD_FREE_PTR(lmvdesc);
1821         RETURN(rc);
1822 }
1823
1824 /* lov is the first thing in the mdt and client logs */
1825 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1826                              struct fs_db *fsdb, struct mgs_target_info *mti,
1827                              char *logname, char *lovname)
1828 {
1829         struct llog_handle *llh = NULL;
1830         struct lov_desc *lovdesc;
1831         char *uuid;
1832         int rc = 0;
1833         ENTRY;
1834
1835         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1836
1837         /*
1838         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1839         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1840               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1841         */
1842
1843         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1844         OBD_ALLOC_PTR(lovdesc);
1845         if (lovdesc == NULL)
1846                 RETURN(-ENOMEM);
1847         lovdesc->ld_magic = LOV_DESC_MAGIC;
1848         lovdesc->ld_tgt_count = 0;
1849         /* Defaults.  Can be changed later by lcfg config_param */
1850         lovdesc->ld_default_stripe_count = 1;
1851         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1852         lovdesc->ld_default_stripe_size = 1024 * 1024;
1853         lovdesc->ld_default_stripe_offset = -1;
1854         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1855         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1856         /* can these be the same? */
1857         uuid = (char *)lovdesc->ld_uuid.uuid;
1858
1859         /* This should always be the first entry in a log.
1860         rc = mgs_clear_log(obd, logname); */
1861         rc = record_start_log(env, mgs, &llh, logname);
1862         if (rc)
1863                 GOTO(out_free, rc);
1864         /* FIXME these should be a single journal transaction */
1865         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1866         if (rc)
1867                 GOTO(out_end, rc);
1868         rc = record_attach(env, llh, lovname, "lov", uuid);
1869         if (rc)
1870                 GOTO(out_end, rc);
1871         rc = record_lov_setup(env, llh, lovname, lovdesc);
1872         if (rc)
1873                 GOTO(out_end, rc);
1874         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1875         if (rc)
1876                 GOTO(out_end, rc);
1877         EXIT;
1878 out_end:
1879         record_end_log(env, &llh);
1880 out_free:
1881         OBD_FREE_PTR(lovdesc);
1882         return rc;
1883 }
1884
1885 /* add failnids to open log */
1886 static int mgs_write_log_failnids(const struct lu_env *env,
1887                                   struct mgs_target_info *mti,
1888                                   struct llog_handle *llh,
1889                                   char *cliname)
1890 {
1891         char *failnodeuuid = NULL;
1892         char *ptr = mti->mti_params;
1893         lnet_nid_t nid;
1894         int rc = 0;
1895
1896         /*
1897         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1898         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1899         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1900         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1901         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1902         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1903         */
1904
1905         /* Pull failnid info out of params string */
1906         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1907                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1908                         if (failnodeuuid == NULL) {
1909                                 /* We don't know the failover node name,
1910                                    so just use the first nid as the uuid */
1911                                 rc = name_create(&failnodeuuid,
1912                                                  libcfs_nid2str(nid), "");
1913                                 if (rc)
1914                                         return rc;
1915                         }
1916                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1917                                "client %s\n", libcfs_nid2str(nid),
1918                                failnodeuuid, cliname);
1919                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1920                 }
1921                 if (failnodeuuid)
1922                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1923         }
1924
1925         name_destroy(&failnodeuuid);
1926         return rc;
1927 }
1928
1929 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1930                                     struct mgs_device *mgs,
1931                                     struct fs_db *fsdb,
1932                                     struct mgs_target_info *mti,
1933                                     char *logname, char *lmvname)
1934 {
1935         struct llog_handle *llh = NULL;
1936         char *mdcname = NULL;
1937         char *nodeuuid = NULL;
1938         char *mdcuuid = NULL;
1939         char *lmvuuid = NULL;
1940         char index[6];
1941         int i, rc;
1942         ENTRY;
1943
1944         if (mgs_log_is_empty(env, mgs, logname)) {
1945                 CERROR("log is empty! Logical error\n");
1946                 RETURN(-EINVAL);
1947         }
1948
1949         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1950                mti->mti_svname, logname, lmvname);
1951
1952         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1953         if (rc)
1954                 RETURN(rc);
1955         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1956         if (rc)
1957                 GOTO(out_free, rc);
1958         rc = name_create(&mdcuuid, mdcname, "_UUID");
1959         if (rc)
1960                 GOTO(out_free, rc);
1961         rc = name_create(&lmvuuid, lmvname, "_UUID");
1962         if (rc)
1963                 GOTO(out_free, rc);
1964
1965         rc = record_start_log(env, mgs, &llh, logname);
1966         if (rc)
1967                 GOTO(out_free, rc);
1968         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1969                            "add mdc");
1970         if (rc)
1971                 GOTO(out_end, rc);
1972         for (i = 0; i < mti->mti_nid_count; i++) {
1973                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1974                        libcfs_nid2str(mti->mti_nids[i]));
1975
1976                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1977                 if (rc)
1978                         GOTO(out_end, rc);
1979         }
1980
1981         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1982         if (rc)
1983                 GOTO(out_end, rc);
1984         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1985         if (rc)
1986                 GOTO(out_end, rc);
1987         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1988         if (rc)
1989                 GOTO(out_end, rc);
1990         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1991         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1992                             index, "1");
1993         if (rc)
1994                 GOTO(out_end, rc);
1995         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1996                            "add mdc");
1997         if (rc)
1998                 GOTO(out_end, rc);
1999 out_end:
2000         record_end_log(env, &llh);
2001 out_free:
2002         name_destroy(&lmvuuid);
2003         name_destroy(&mdcuuid);
2004         name_destroy(&mdcname);
2005         name_destroy(&nodeuuid);
2006         RETURN(rc);
2007 }
2008
2009 static inline int name_create_lov(char **lovname, char *mdtname,
2010                                   struct fs_db *fsdb, int index)
2011 {
2012         /* COMPAT_180 */
2013         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2014                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2015         else
2016                 return name_create(lovname, mdtname, "-mdtlov");
2017 }
2018
2019 static int name_create_mdt_and_lov(char **logname, char **lovname,
2020                                    struct fs_db *fsdb, int i)
2021 {
2022         int rc;
2023
2024         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2025         if (rc)
2026                 return rc;
2027         /* COMPAT_180 */
2028         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2029                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2030         else
2031                 rc = name_create(lovname, *logname, "-mdtlov");
2032         if (rc) {
2033                 name_destroy(logname);
2034                 *logname = NULL;
2035         }
2036         return rc;
2037 }
2038
2039 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2040                                       struct fs_db *fsdb, int i)
2041 {
2042         char suffix[16];
2043
2044         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2045                 sprintf(suffix, "-osc");
2046         else
2047                 sprintf(suffix, "-osc-MDT%04x", i);
2048         return name_create(oscname, ostname, suffix);
2049 }
2050
2051 /* add new mdc to already existent MDS */
2052 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2053                                     struct mgs_device *mgs,
2054                                     struct fs_db *fsdb,
2055                                     struct mgs_target_info *mti,
2056                                     int mdt_index, char *logname)
2057 {
2058         struct llog_handle      *llh = NULL;
2059         char    *nodeuuid = NULL;
2060         char    *ospname = NULL;
2061         char    *lovuuid = NULL;
2062         char    *mdtuuid = NULL;
2063         char    *svname = NULL;
2064         char    *mdtname = NULL;
2065         char    *lovname = NULL;
2066         char    index_str[16];
2067         int     i, rc;
2068
2069         ENTRY;
2070         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2071                 CERROR("log is empty! Logical error\n");
2072                 RETURN (-EINVAL);
2073         }
2074
2075         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2076                logname);
2077
2078         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2079         if (rc)
2080                 RETURN(rc);
2081
2082         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2083         if (rc)
2084                 GOTO(out_destory, rc);
2085
2086         rc = name_create(&svname, mdtname, "-osp");
2087         if (rc)
2088                 GOTO(out_destory, rc);
2089
2090         sprintf(index_str, "-MDT%04x", mdt_index);
2091         rc = name_create(&ospname, svname, index_str);
2092         if (rc)
2093                 GOTO(out_destory, rc);
2094
2095         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2096         if (rc)
2097                 GOTO(out_destory, rc);
2098
2099         rc = name_create(&lovuuid, lovname, "_UUID");
2100         if (rc)
2101                 GOTO(out_destory, rc);
2102
2103         rc = name_create(&mdtuuid, mdtname, "_UUID");
2104         if (rc)
2105                 GOTO(out_destory, rc);
2106
2107         rc = record_start_log(env, mgs, &llh, logname);
2108         if (rc)
2109                 GOTO(out_destory, rc);
2110
2111         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2112                            "add osp");
2113         if (rc)
2114                 GOTO(out_destory, rc);
2115
2116         for (i = 0; i < mti->mti_nid_count; i++) {
2117                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2118                        libcfs_nid2str(mti->mti_nids[i]));
2119                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2120                 if (rc)
2121                         GOTO(out_end, rc);
2122         }
2123
2124         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2125         if (rc)
2126                 GOTO(out_end, rc);
2127
2128         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2129                           NULL, NULL);
2130         if (rc)
2131                 GOTO(out_end, rc);
2132
2133         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2134         if (rc)
2135                 GOTO(out_end, rc);
2136
2137         /* Add mdc(osp) to lod */
2138         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2139                  mti->mti_stripe_index);
2140         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2141                          index_str, "1", NULL);
2142         if (rc)
2143                 GOTO(out_end, rc);
2144
2145         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2146         if (rc)
2147                 GOTO(out_end, rc);
2148
2149 out_end:
2150         record_end_log(env, &llh);
2151
2152 out_destory:
2153         name_destroy(&mdtuuid);
2154         name_destroy(&lovuuid);
2155         name_destroy(&lovname);
2156         name_destroy(&ospname);
2157         name_destroy(&svname);
2158         name_destroy(&nodeuuid);
2159         name_destroy(&mdtname);
2160         RETURN(rc);
2161 }
2162
2163 static int mgs_write_log_mdt0(const struct lu_env *env,
2164                               struct mgs_device *mgs,
2165                               struct fs_db *fsdb,
2166                               struct mgs_target_info *mti)
2167 {
2168         char *log = mti->mti_svname;
2169         struct llog_handle *llh = NULL;
2170         char *uuid, *lovname;
2171         char mdt_index[6];
2172         char *ptr = mti->mti_params;
2173         int rc = 0, failout = 0;
2174         ENTRY;
2175
2176         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2177         if (uuid == NULL)
2178                 RETURN(-ENOMEM);
2179
2180         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2181                 failout = (strncmp(ptr, "failout", 7) == 0);
2182
2183         rc = name_create(&lovname, log, "-mdtlov");
2184         if (rc)
2185                 GOTO(out_free, rc);
2186         if (mgs_log_is_empty(env, mgs, log)) {
2187                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2188                 if (rc)
2189                         GOTO(out_lod, rc);
2190         }
2191
2192         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2193
2194         rc = record_start_log(env, mgs, &llh, log);
2195         if (rc)
2196                 GOTO(out_lod, rc);
2197
2198         /* add MDT itself */
2199
2200         /* FIXME this whole fn should be a single journal transaction */
2201         sprintf(uuid, "%s_UUID", log);
2202         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2203         if (rc)
2204                 GOTO(out_lod, rc);
2205         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2206         if (rc)
2207                 GOTO(out_end, rc);
2208         rc = record_mount_opt(env, llh, log, lovname, NULL);
2209         if (rc)
2210                 GOTO(out_end, rc);
2211         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2212                         failout ? "n" : "f");
2213         if (rc)
2214                 GOTO(out_end, rc);
2215         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2216         if (rc)
2217                 GOTO(out_end, rc);
2218 out_end:
2219         record_end_log(env, &llh);
2220 out_lod:
2221         name_destroy(&lovname);
2222 out_free:
2223         OBD_FREE(uuid, sizeof(struct obd_uuid));
2224         RETURN(rc);
2225 }
2226
2227 /* envelope method for all layers log */
2228 static int mgs_write_log_mdt(const struct lu_env *env,
2229                              struct mgs_device *mgs,
2230                              struct fs_db *fsdb,
2231                              struct mgs_target_info *mti)
2232 {
2233         struct mgs_thread_info *mgi = mgs_env_info(env);
2234         struct llog_handle *llh = NULL;
2235         char *cliname;
2236         int rc, i = 0;
2237         ENTRY;
2238
2239         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2240
2241         if (mti->mti_uuid[0] == '\0') {
2242                 /* Make up our own uuid */
2243                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2244                          "%s_UUID", mti->mti_svname);
2245         }
2246
2247         /* add mdt */
2248         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2249         if (rc)
2250                 RETURN(rc);
2251         /* Append the mdt info to the client log */
2252         rc = name_create(&cliname, mti->mti_fsname, "-client");
2253         if (rc)
2254                 RETURN(rc);
2255
2256         if (mgs_log_is_empty(env, mgs, cliname)) {
2257                 /* Start client log */
2258                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2259                                        fsdb->fsdb_clilov);
2260                 if (rc)
2261                         GOTO(out_free, rc);
2262                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2263                                        fsdb->fsdb_clilmv);
2264                 if (rc)
2265                         GOTO(out_free, rc);
2266         }
2267
2268         /*
2269         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2270         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2271         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2272         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2273         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2274         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2275         */
2276
2277                 /* copy client info about lov/lmv */
2278                 mgi->mgi_comp.comp_mti = mti;
2279                 mgi->mgi_comp.comp_fsdb = fsdb;
2280
2281                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2282                                                         &mgi->mgi_comp);
2283                 if (rc)
2284                         GOTO(out_free, rc);
2285                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2286                                               fsdb->fsdb_clilmv);
2287                 if (rc)
2288                         GOTO(out_free, rc);
2289
2290                 /* add mountopts */
2291                 rc = record_start_log(env, mgs, &llh, cliname);
2292                 if (rc)
2293                         GOTO(out_free, rc);
2294
2295                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2296                                    "mount opts");
2297                 if (rc)
2298                         GOTO(out_end, rc);
2299                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2300                                       fsdb->fsdb_clilmv);
2301                 if (rc)
2302                         GOTO(out_end, rc);
2303                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2304                                    "mount opts");
2305
2306         if (rc)
2307                 GOTO(out_end, rc);
2308
2309         /* for_all_existing_mdt except current one */
2310         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2311                 if (i !=  mti->mti_stripe_index &&
2312                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2313                         char *logname;
2314
2315                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2316                         if (rc)
2317                                 GOTO(out_end, rc);
2318
2319                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2320                                                       i, logname);
2321                         name_destroy(&logname);
2322                         if (rc)
2323                                 GOTO(out_end, rc);
2324                 }
2325         }
2326 out_end:
2327         record_end_log(env, &llh);
2328 out_free:
2329         name_destroy(&cliname);
2330         RETURN(rc);
2331 }
2332
2333 /* Add the ost info to the client/mdt lov */
2334 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2335                                     struct mgs_device *mgs, struct fs_db *fsdb,
2336                                     struct mgs_target_info *mti,
2337                                     char *logname, char *suffix, char *lovname,
2338                                     enum lustre_sec_part sec_part, int flags)
2339 {
2340         struct llog_handle *llh = NULL;
2341         char *nodeuuid = NULL;
2342         char *oscname = NULL;
2343         char *oscuuid = NULL;
2344         char *lovuuid = NULL;
2345         char *svname = NULL;
2346         char index[6];
2347         int i, rc;
2348
2349         ENTRY;
2350         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2351                mti->mti_svname, logname);
2352
2353         if (mgs_log_is_empty(env, mgs, logname)) {
2354                 CERROR("log is empty! Logical error\n");
2355                 RETURN (-EINVAL);
2356         }
2357
2358         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2359         if (rc)
2360                 RETURN(rc);
2361         rc = name_create(&svname, mti->mti_svname, "-osc");
2362         if (rc)
2363                 GOTO(out_free, rc);
2364
2365         /* for the system upgraded from old 1.8, keep using the old osc naming
2366          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2367         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2368                 rc = name_create(&oscname, svname, "");
2369         else
2370                 rc = name_create(&oscname, svname, suffix);
2371         if (rc)
2372                 GOTO(out_free, rc);
2373
2374         rc = name_create(&oscuuid, oscname, "_UUID");
2375         if (rc)
2376                 GOTO(out_free, rc);
2377         rc = name_create(&lovuuid, lovname, "_UUID");
2378         if (rc)
2379                 GOTO(out_free, rc);
2380
2381
2382         /*
2383         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2384         multihomed (#4)
2385         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2386         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2387         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2388         failover (#6,7)
2389         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2390         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2391         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2392         */
2393
2394         rc = record_start_log(env, mgs, &llh, logname);
2395         if (rc)
2396                 GOTO(out_free, rc);
2397
2398         /* FIXME these should be a single journal transaction */
2399         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2400                            "add osc");
2401         if (rc)
2402                 GOTO(out_end, rc);
2403
2404         /* NB: don't change record order, because upon MDT steal OSC config
2405          * from client, it treats all nids before LCFG_SETUP as target nids
2406          * (multiple interfaces), while nids after as failover node nids.
2407          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2408          */
2409         for (i = 0; i < mti->mti_nid_count; i++) {
2410                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2411                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2412                 if (rc)
2413                         GOTO(out_end, rc);
2414         }
2415         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2416         if (rc)
2417                 GOTO(out_end, rc);
2418         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2419         if (rc)
2420                 GOTO(out_end, rc);
2421         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2422         if (rc)
2423                 GOTO(out_end, rc);
2424
2425         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2426
2427         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2428         if (rc)
2429                 GOTO(out_end, rc);
2430         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2431                            "add osc");
2432         if (rc)
2433                 GOTO(out_end, rc);
2434 out_end:
2435         record_end_log(env, &llh);
2436 out_free:
2437         name_destroy(&lovuuid);
2438         name_destroy(&oscuuid);
2439         name_destroy(&oscname);
2440         name_destroy(&svname);
2441         name_destroy(&nodeuuid);
2442         RETURN(rc);
2443 }
2444
2445 static int mgs_write_log_ost(const struct lu_env *env,
2446                              struct mgs_device *mgs, struct fs_db *fsdb,
2447                              struct mgs_target_info *mti)
2448 {
2449         struct llog_handle *llh = NULL;
2450         char *logname, *lovname;
2451         char *ptr = mti->mti_params;
2452         int rc, flags = 0, failout = 0, i;
2453         ENTRY;
2454
2455         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2456
2457         /* The ost startup log */
2458
2459         /* If the ost log already exists, that means that someone reformatted
2460            the ost and it called target_add again. */
2461         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2462                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2463                                    "exists, yet the server claims it never "
2464                                    "registered. It may have been reformatted, "
2465                                    "or the index changed. writeconf the MDT to "
2466                                    "regenerate all logs.\n", mti->mti_svname);
2467                 RETURN(-EALREADY);
2468         }
2469
2470         /*
2471         attach obdfilter ost1 ost1_UUID
2472         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2473         */
2474         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2475                 failout = (strncmp(ptr, "failout", 7) == 0);
2476         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2477         if (rc)
2478                 RETURN(rc);
2479         /* FIXME these should be a single journal transaction */
2480         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2481         if (rc)
2482                 GOTO(out_end, rc);
2483         if (*mti->mti_uuid == '\0')
2484                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2485                          "%s_UUID", mti->mti_svname);
2486         rc = record_attach(env, llh, mti->mti_svname,
2487                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2488         if (rc)
2489                 GOTO(out_end, rc);
2490         rc = record_setup(env, llh, mti->mti_svname,
2491                           "dev"/*ignored*/, "type"/*ignored*/,
2492                           failout ? "n" : "f", 0/*options*/);
2493         if (rc)
2494                 GOTO(out_end, rc);
2495         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2496         if (rc)
2497                 GOTO(out_end, rc);
2498 out_end:
2499         record_end_log(env, &llh);
2500         if (rc)
2501                 RETURN(rc);
2502         /* We also have to update the other logs where this osc is part of
2503            the lov */
2504
2505         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2506                 /* If we're upgrading, the old mdt log already has our
2507                    entry. Let's do a fake one for fun. */
2508                 /* Note that we can't add any new failnids, since we don't
2509                    know the old osc names. */
2510                 flags = CM_SKIP | CM_UPGRADE146;
2511
2512         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2513                 /* If the update flag isn't set, don't update client/mdt
2514                    logs. */
2515                 flags |= CM_SKIP;
2516                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2517                               "the MDT first to regenerate it.\n",
2518                               mti->mti_svname);
2519         }
2520
2521         /* Add ost to all MDT lov defs */
2522         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2523                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2524                         char mdt_index[9];
2525
2526                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2527                                                      i);
2528                         if (rc)
2529                                 RETURN(rc);
2530                         sprintf(mdt_index, "-MDT%04x", i);
2531                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2532                                                       logname, mdt_index,
2533                                                       lovname, LUSTRE_SP_MDT,
2534                                                       flags);
2535                         name_destroy(&logname);
2536                         name_destroy(&lovname);
2537                         if (rc)
2538                                 RETURN(rc);
2539                 }
2540         }
2541
2542         /* Append ost info to the client log */
2543         rc = name_create(&logname, mti->mti_fsname, "-client");
2544         if (rc)
2545                 RETURN(rc);
2546         if (mgs_log_is_empty(env, mgs, logname)) {
2547                 /* Start client log */
2548                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2549                                        fsdb->fsdb_clilov);
2550                 if (rc)
2551                         GOTO(out_free, rc);
2552                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2553                                        fsdb->fsdb_clilmv);
2554                 if (rc)
2555                         GOTO(out_free, rc);
2556         }
2557         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2558                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2559 out_free:
2560         name_destroy(&logname);
2561         RETURN(rc);
2562 }
2563
2564 static __inline__ int mgs_param_empty(char *ptr)
2565 {
2566         char *tmp;
2567
2568         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2569                 return 1;
2570         return 0;
2571 }
2572
2573 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2574                                           struct mgs_device *mgs,
2575                                           struct fs_db *fsdb,
2576                                           struct mgs_target_info *mti,
2577                                           char *logname, char *cliname)
2578 {
2579         int rc;
2580         struct llog_handle *llh = NULL;
2581
2582         if (mgs_param_empty(mti->mti_params)) {
2583                 /* Remove _all_ failnids */
2584                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2585                                 mti->mti_svname, "add failnid", CM_SKIP);
2586                 return rc < 0 ? rc : 0;
2587         }
2588
2589         /* Otherwise failover nids are additive */
2590         rc = record_start_log(env, mgs, &llh, logname);
2591         if (rc)
2592                 return rc;
2593                 /* FIXME this should be a single journal transaction */
2594         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2595                            "add failnid");
2596         if (rc)
2597                 goto out_end;
2598         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2599         if (rc)
2600                 goto out_end;
2601         rc = record_marker(env, llh, fsdb, CM_END,
2602                            mti->mti_svname, "add failnid");
2603 out_end:
2604         record_end_log(env, &llh);
2605         return rc;
2606 }
2607
2608
2609 /* Add additional failnids to an existing log.
2610    The mdc/osc must have been added to logs first */
2611 /* tcp nids must be in dotted-quad ascii -
2612    we can't resolve hostnames from the kernel. */
2613 static int mgs_write_log_add_failnid(const struct lu_env *env,
2614                                      struct mgs_device *mgs,
2615                                      struct fs_db *fsdb,
2616                                      struct mgs_target_info *mti)
2617 {
2618         char *logname, *cliname;
2619         int rc;
2620         ENTRY;
2621
2622         /* FIXME we currently can't erase the failnids
2623          * given when a target first registers, since they aren't part of
2624          * an "add uuid" stanza */
2625
2626         /* Verify that we know about this target */
2627         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2628                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2629                                    "yet. It must be started before failnids "
2630                                    "can be added.\n", mti->mti_svname);
2631                 RETURN(-ENOENT);
2632         }
2633
2634         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2635         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2636                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2637         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2638                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2639         } else {
2640                 RETURN(-EINVAL);
2641         }
2642         if (rc)
2643                 RETURN(rc);
2644         /* Add failover nids to the client log */
2645         rc = name_create(&logname, mti->mti_fsname, "-client");
2646         if (rc) {
2647                 name_destroy(&cliname);
2648                 RETURN(rc);
2649         }
2650         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2651         name_destroy(&logname);
2652         name_destroy(&cliname);
2653         if (rc)
2654                 RETURN(rc);
2655
2656         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2657                 /* Add OST failover nids to the MDT logs as well */
2658                 int i;
2659
2660                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2661                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2662                                 continue;
2663                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2664                         if (rc)
2665                                 RETURN(rc);
2666                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2667                                                  fsdb, i);
2668                         if (rc) {
2669                                 name_destroy(&logname);
2670                                 RETURN(rc);
2671                         }
2672                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2673                                                             mti, logname,
2674                                                             cliname);
2675                         name_destroy(&cliname);
2676                         name_destroy(&logname);
2677                         if (rc)
2678                                 RETURN(rc);
2679                 }
2680         }
2681
2682         RETURN(rc);
2683 }
2684
2685 static int mgs_wlp_lcfg(const struct lu_env *env,
2686                         struct mgs_device *mgs, struct fs_db *fsdb,
2687                         struct mgs_target_info *mti,
2688                         char *logname, struct lustre_cfg_bufs *bufs,
2689                         char *tgtname, char *ptr)
2690 {
2691         char comment[MTI_NAME_MAXLEN];
2692         char *tmp;
2693         struct lustre_cfg *lcfg;
2694         int rc, del;
2695
2696         /* Erase any old settings of this same parameter */
2697         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2698         comment[MTI_NAME_MAXLEN - 1] = 0;
2699         /* But don't try to match the value. */
2700         if ((tmp = strchr(comment, '=')))
2701             *tmp = 0;
2702         /* FIXME we should skip settings that are the same as old values */
2703         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2704         if (rc < 0)
2705                 return rc;
2706         del = mgs_param_empty(ptr);
2707
2708         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2709                       "Sett" : "Modify", tgtname, comment, logname);
2710         if (del)
2711                 return rc;
2712
2713         lustre_cfg_bufs_reset(bufs, tgtname);
2714         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2715         if (mti->mti_flags & LDD_F_PARAM2)
2716                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2717
2718         lcfg = lustre_cfg_new((mti->mti_flags & LDD_F_PARAM2) ?
2719                               LCFG_SET_PARAM : LCFG_PARAM, bufs);
2720
2721         if (!lcfg)
2722                 return -ENOMEM;
2723         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2724         lustre_cfg_free(lcfg);
2725         return rc;
2726 }
2727
2728 static int mgs_write_log_param2(const struct lu_env *env,
2729                                 struct mgs_device *mgs,
2730                                 struct fs_db *fsdb,
2731                                 struct mgs_target_info *mti, char *ptr)
2732 {
2733         struct lustre_cfg_bufs  bufs;
2734         int                     rc = 0;
2735         ENTRY;
2736
2737         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2738         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2739                           mti->mti_svname, ptr);
2740
2741         RETURN(rc);
2742 }
2743
2744 /* write global variable settings into log */
2745 static int mgs_write_log_sys(const struct lu_env *env,
2746                              struct mgs_device *mgs, struct fs_db *fsdb,
2747                              struct mgs_target_info *mti, char *sys, char *ptr)
2748 {
2749         struct mgs_thread_info *mgi = mgs_env_info(env);
2750         struct lustre_cfg *lcfg;
2751         char *tmp, sep;
2752         int rc, cmd, convert = 1;
2753
2754         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2755                 cmd = LCFG_SET_TIMEOUT;
2756         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2757                 cmd = LCFG_SET_LDLM_TIMEOUT;
2758         /* Check for known params here so we can return error to lctl */
2759         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2760                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2761                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2762                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2763                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2764                 cmd = LCFG_PARAM;
2765         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2766                 convert = 0; /* Don't convert string value to integer */
2767                 cmd = LCFG_PARAM;
2768         } else {
2769                 return -EINVAL;
2770         }
2771
2772         if (mgs_param_empty(ptr))
2773                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2774         else
2775                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2776
2777         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2778         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2779         if (!convert && *tmp != '\0')
2780                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2781         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2782         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2783         /* truncate the comment to the parameter name */
2784         ptr = tmp - 1;
2785         sep = *ptr;
2786         *ptr = '\0';
2787         /* modify all servers and clients */
2788         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2789                                       *tmp == '\0' ? NULL : lcfg,
2790                                       mti->mti_fsname, sys, 0);
2791         if (rc == 0 && *tmp != '\0') {
2792                 switch (cmd) {
2793                 case LCFG_SET_TIMEOUT:
2794                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2795                                 class_process_config(lcfg);
2796                         break;
2797                 case LCFG_SET_LDLM_TIMEOUT:
2798                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2799                                 class_process_config(lcfg);
2800                         break;
2801                 default:
2802                         break;
2803                 }
2804         }
2805         *ptr = sep;
2806         lustre_cfg_free(lcfg);
2807         return rc;
2808 }
2809
2810 /* write quota settings into log */
2811 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2812                                struct fs_db *fsdb, struct mgs_target_info *mti,
2813                                char *quota, char *ptr)
2814 {
2815         struct mgs_thread_info  *mgi = mgs_env_info(env);
2816         struct lustre_cfg       *lcfg;
2817         char                    *tmp;
2818         char                     sep;
2819         int                      rc, cmd = LCFG_PARAM;
2820
2821         /* support only 'meta' and 'data' pools so far */
2822         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2823             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2824                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2825                        "& quota.ost are)\n", ptr);
2826                 return -EINVAL;
2827         }
2828
2829         if (*tmp == '\0') {
2830                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2831         } else {
2832                 CDEBUG(D_MGS, "global '%s'\n", quota);
2833
2834                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2835                     strcmp(tmp, "none") != 0) {
2836                         CERROR("enable option(%s) isn't supported\n", tmp);
2837                         return -EINVAL;
2838                 }
2839         }
2840
2841         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2842         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2843         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2844         /* truncate the comment to the parameter name */
2845         ptr = tmp - 1;
2846         sep = *ptr;
2847         *ptr = '\0';
2848
2849         /* XXX we duplicated quota enable information in all server
2850          *     config logs, it should be moved to a separate config
2851          *     log once we cleanup the config log for global param. */
2852         /* modify all servers */
2853         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2854                                       *tmp == '\0' ? NULL : lcfg,
2855                                       mti->mti_fsname, quota, 1);
2856         *ptr = sep;
2857         lustre_cfg_free(lcfg);
2858         return rc < 0 ? rc : 0;
2859 }
2860
2861 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2862                                    struct mgs_device *mgs,
2863                                    struct fs_db *fsdb,
2864                                    struct mgs_target_info *mti,
2865                                    char *param)
2866 {
2867         struct mgs_thread_info *mgi = mgs_env_info(env);
2868         struct llog_handle     *llh = NULL;
2869         char                   *logname;
2870         char                   *comment, *ptr;
2871         struct lustre_cfg      *lcfg;
2872         int                     rc, len;
2873         ENTRY;
2874
2875         /* get comment */
2876         ptr = strchr(param, '=');
2877         LASSERT(ptr);
2878         len = ptr - param;
2879
2880         OBD_ALLOC(comment, len + 1);
2881         if (comment == NULL)
2882                 RETURN(-ENOMEM);
2883         strncpy(comment, param, len);
2884         comment[len] = '\0';
2885
2886         /* prepare lcfg */
2887         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2888         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2889         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2890         if (lcfg == NULL)
2891                 GOTO(out_comment, rc = -ENOMEM);
2892
2893         /* construct log name */
2894         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2895         if (rc)
2896                 GOTO(out_lcfg, rc);
2897
2898         if (mgs_log_is_empty(env, mgs, logname)) {
2899                 rc = record_start_log(env, mgs, &llh, logname);
2900                 if (rc)
2901                         GOTO(out, rc);
2902                 record_end_log(env, &llh);
2903         }
2904
2905         /* obsolete old one */
2906         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2907                         comment, CM_SKIP);
2908         if (rc < 0)
2909                 GOTO(out, rc);
2910         /* write the new one */
2911         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2912                                   mti->mti_svname, comment);
2913         if (rc)
2914                 CERROR("err %d writing log %s\n", rc, logname);
2915 out:
2916         name_destroy(&logname);
2917 out_lcfg:
2918         lustre_cfg_free(lcfg);
2919 out_comment:
2920         OBD_FREE(comment, len + 1);
2921         RETURN(rc);
2922 }
2923
2924 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2925                                         char *param)
2926 {
2927         char    *ptr;
2928
2929         /* disable the adjustable udesc parameter for now, i.e. use default
2930          * setting that client always ship udesc to MDT if possible. to enable
2931          * it simply remove the following line */
2932         goto error_out;
2933
2934         ptr = strchr(param, '=');
2935         if (ptr == NULL)
2936                 goto error_out;
2937         *ptr++ = '\0';
2938
2939         if (strcmp(param, PARAM_SRPC_UDESC))
2940                 goto error_out;
2941
2942         if (strcmp(ptr, "yes") == 0) {
2943                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2944                 CWARN("Enable user descriptor shipping from client to MDT\n");
2945         } else if (strcmp(ptr, "no") == 0) {
2946                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2947                 CWARN("Disable user descriptor shipping from client to MDT\n");
2948         } else {
2949                 *(ptr - 1) = '=';
2950                 goto error_out;
2951         }
2952         return 0;
2953
2954 error_out:
2955         CERROR("Invalid param: %s\n", param);
2956         return -EINVAL;
2957 }
2958
2959 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2960                                   const char *svname,
2961                                   char *param)
2962 {
2963         struct sptlrpc_rule      rule;
2964         struct sptlrpc_rule_set *rset;
2965         int                      rc;
2966         ENTRY;
2967
2968         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2969                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2970                 RETURN(-EINVAL);
2971         }
2972
2973         if (strncmp(param, PARAM_SRPC_UDESC,
2974                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2975                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2976         }
2977
2978         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2979                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2980                 RETURN(-EINVAL);
2981         }
2982
2983         param += sizeof(PARAM_SRPC_FLVR) - 1;
2984
2985         rc = sptlrpc_parse_rule(param, &rule);
2986         if (rc)
2987                 RETURN(rc);
2988
2989         /* mgs rules implies must be mgc->mgs */
2990         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2991                 if ((rule.sr_from != LUSTRE_SP_MGC &&
2992                      rule.sr_from != LUSTRE_SP_ANY) ||
2993                     (rule.sr_to != LUSTRE_SP_MGS &&
2994                      rule.sr_to != LUSTRE_SP_ANY))
2995                         RETURN(-EINVAL);
2996         }
2997
2998         /* preapre room for this coming rule. svcname format should be:
2999          * - fsname: general rule
3000          * - fsname-tgtname: target-specific rule
3001          */
3002         if (strchr(svname, '-')) {
3003                 struct mgs_tgt_srpc_conf *tgtconf;
3004                 int                       found = 0;
3005
3006                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3007                      tgtconf = tgtconf->mtsc_next) {
3008                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3009                                 found = 1;
3010                                 break;
3011                         }
3012                 }
3013
3014                 if (!found) {
3015                         int name_len;
3016
3017                         OBD_ALLOC_PTR(tgtconf);
3018                         if (tgtconf == NULL)
3019                                 RETURN(-ENOMEM);
3020
3021                         name_len = strlen(svname);
3022
3023                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3024                         if (tgtconf->mtsc_tgt == NULL) {
3025                                 OBD_FREE_PTR(tgtconf);
3026                                 RETURN(-ENOMEM);
3027                         }
3028                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3029
3030                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3031                         fsdb->fsdb_srpc_tgt = tgtconf;
3032                 }
3033
3034                 rset = &tgtconf->mtsc_rset;
3035         } else {
3036                 rset = &fsdb->fsdb_srpc_gen;
3037         }
3038
3039         rc = sptlrpc_rule_set_merge(rset, &rule);
3040
3041         RETURN(rc);
3042 }
3043
3044 static int mgs_srpc_set_param(const struct lu_env *env,
3045                               struct mgs_device *mgs,
3046                               struct fs_db *fsdb,
3047                               struct mgs_target_info *mti,
3048                               char *param)
3049 {
3050         char                   *copy;
3051         int                     rc, copy_size;
3052         ENTRY;
3053
3054 #ifndef HAVE_GSS
3055         RETURN(-EINVAL);
3056 #endif
3057         /* keep a copy of original param, which could be destroied
3058          * during parsing */
3059         copy_size = strlen(param) + 1;
3060         OBD_ALLOC(copy, copy_size);
3061         if (copy == NULL)
3062                 return -ENOMEM;
3063         memcpy(copy, param, copy_size);
3064
3065         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3066         if (rc)
3067                 goto out_free;
3068
3069         /* previous steps guaranteed the syntax is correct */
3070         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3071         if (rc)
3072                 goto out_free;
3073
3074         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3075                 /*
3076                  * for mgs rules, make them effective immediately.
3077                  */
3078                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3079                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3080                                                  &fsdb->fsdb_srpc_gen);
3081         }
3082
3083 out_free:
3084         OBD_FREE(copy, copy_size);
3085         RETURN(rc);
3086 }
3087
3088 struct mgs_srpc_read_data {
3089         struct fs_db   *msrd_fsdb;
3090         int             msrd_skip;
3091 };
3092
3093 static int mgs_srpc_read_handler(const struct lu_env *env,
3094                                  struct llog_handle *llh,
3095                                  struct llog_rec_hdr *rec, void *data)
3096 {
3097         struct mgs_srpc_read_data *msrd = data;
3098         struct cfg_marker         *marker;
3099         struct lustre_cfg         *lcfg = REC_DATA(rec);
3100         char                      *svname, *param;
3101         int                        cfg_len, rc;
3102         ENTRY;
3103
3104         if (rec->lrh_type != OBD_CFG_REC) {
3105                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3106                 RETURN(-EINVAL);
3107         }
3108
3109         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3110                   sizeof(struct llog_rec_tail);
3111
3112         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3113         if (rc) {
3114                 CERROR("Insane cfg\n");
3115                 RETURN(rc);
3116         }
3117
3118         if (lcfg->lcfg_command == LCFG_MARKER) {
3119                 marker = lustre_cfg_buf(lcfg, 1);
3120
3121                 if (marker->cm_flags & CM_START &&
3122                     marker->cm_flags & CM_SKIP)
3123                         msrd->msrd_skip = 1;
3124                 if (marker->cm_flags & CM_END)
3125                         msrd->msrd_skip = 0;
3126
3127                 RETURN(0);
3128         }
3129
3130         if (msrd->msrd_skip)
3131                 RETURN(0);
3132
3133         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3134                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3135                 RETURN(0);
3136         }
3137
3138         svname = lustre_cfg_string(lcfg, 0);
3139         if (svname == NULL) {
3140                 CERROR("svname is empty\n");
3141                 RETURN(0);
3142         }
3143
3144         param = lustre_cfg_string(lcfg, 1);
3145         if (param == NULL) {
3146                 CERROR("param is empty\n");
3147                 RETURN(0);
3148         }
3149
3150         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3151         if (rc)
3152                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3153
3154         RETURN(0);
3155 }
3156
3157 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3158                                 struct mgs_device *mgs,
3159                                 struct fs_db *fsdb)
3160 {
3161         struct llog_handle        *llh = NULL;
3162         struct llog_ctxt          *ctxt;
3163         char                      *logname;
3164         struct mgs_srpc_read_data  msrd;
3165         int                        rc;
3166         ENTRY;
3167
3168         /* construct log name */
3169         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3170         if (rc)
3171                 RETURN(rc);
3172
3173         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3174         LASSERT(ctxt != NULL);
3175
3176         if (mgs_log_is_empty(env, mgs, logname))
3177                 GOTO(out, rc = 0);
3178
3179         rc = llog_open(env, ctxt, &llh, NULL, logname,
3180                        LLOG_OPEN_EXISTS);
3181         if (rc < 0) {
3182                 if (rc == -ENOENT)
3183                         rc = 0;
3184                 GOTO(out, rc);
3185         }
3186
3187         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3188         if (rc)
3189                 GOTO(out_close, rc);
3190
3191         if (llog_get_size(llh) <= 1)
3192                 GOTO(out_close, rc = 0);
3193
3194         msrd.msrd_fsdb = fsdb;
3195         msrd.msrd_skip = 0;
3196
3197         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3198                           NULL);
3199
3200 out_close:
3201         llog_close(env, llh);
3202 out:
3203         llog_ctxt_put(ctxt);
3204         name_destroy(&logname);
3205
3206         if (rc)
3207                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3208         RETURN(rc);
3209 }
3210
3211 /* Permanent settings of all parameters by writing into the appropriate
3212  * configuration logs.
3213  * A parameter with null value ("<param>='\0'") means to erase it out of
3214  * the logs.
3215  */
3216 static int mgs_write_log_param(const struct lu_env *env,
3217                                struct mgs_device *mgs, struct fs_db *fsdb,
3218                                struct mgs_target_info *mti, char *ptr)
3219 {
3220         struct mgs_thread_info *mgi = mgs_env_info(env);
3221         char *logname;
3222         char *tmp;
3223         int rc = 0, rc2 = 0;
3224         ENTRY;
3225
3226         /* For various parameter settings, we have to figure out which logs
3227            care about them (e.g. both mdt and client for lov settings) */
3228         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3229
3230         /* The params are stored in MOUNT_DATA_FILE and modified via
3231            tunefs.lustre, or set using lctl conf_param */
3232
3233         /* Processed in lustre_start_mgc */
3234         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3235                 GOTO(end, rc);
3236
3237         /* Processed in ost/mdt */
3238         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3239                 GOTO(end, rc);
3240
3241         /* Processed in mgs_write_log_ost */
3242         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3243                 if (mti->mti_flags & LDD_F_PARAM) {
3244                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3245                                            "changed with tunefs.lustre"
3246                                            "and --writeconf\n", ptr);
3247                         rc = -EPERM;
3248                 }
3249                 GOTO(end, rc);
3250         }
3251
3252         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3253                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3254                 GOTO(end, rc);
3255         }
3256
3257         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3258                 /* Add a failover nidlist */
3259                 rc = 0;
3260                 /* We already processed failovers params for new
3261                    targets in mgs_write_log_target */
3262                 if (mti->mti_flags & LDD_F_PARAM) {
3263                         CDEBUG(D_MGS, "Adding failnode\n");
3264                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3265                 }
3266                 GOTO(end, rc);
3267         }
3268
3269         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3270                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3271                 GOTO(end, rc);
3272         }
3273
3274         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3275                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3276                 GOTO(end, rc);
3277         }
3278
3279         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3280                 /* active=0 means off, anything else means on */
3281                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3282                 int i;
3283
3284                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3285                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3286                                            "be (de)activated.\n",
3287                                            mti->mti_svname);
3288                         GOTO(end, rc = -EINVAL);
3289                 }
3290                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3291                               flag ? "de": "re", mti->mti_svname);
3292                 /* Modify clilov */
3293                 rc = name_create(&logname, mti->mti_fsname, "-client");
3294                 if (rc)
3295                         GOTO(end, rc);
3296                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3297                                 mti->mti_svname, "add osc", flag);
3298                 name_destroy(&logname);
3299                 if (rc)
3300                         goto active_err;
3301                 /* Modify mdtlov */
3302                 /* Add to all MDT logs for CMD */
3303                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3304                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3305                                 continue;
3306                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3307                         if (rc)
3308                                 GOTO(end, rc);
3309                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3310                                         mti->mti_svname, "add osc", flag);
3311                         name_destroy(&logname);
3312                         if (rc)
3313                                 goto active_err;
3314                 }
3315         active_err:
3316                 if (rc) {
3317                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3318                                            "log (%d). No permanent "
3319                                            "changes were made to the "
3320                                            "config log.\n",
3321                                            mti->mti_svname, rc);
3322                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3323                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3324                                                    " because the log"
3325                                                    "is in the old 1.4"
3326                                                    "style. Consider "
3327                                                    " --writeconf to "
3328                                                    "update the logs.\n");
3329                         GOTO(end, rc);
3330                 }
3331                 /* Fall through to osc proc for deactivating live OSC
3332                    on running MDT / clients. */
3333         }
3334         /* Below here, let obd's XXX_process_config methods handle it */
3335
3336         /* All lov. in proc */
3337         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3338                 char *mdtlovname;
3339
3340                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3341                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3342                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3343                                            "set on the MDT, not %s. "
3344                                            "Ignoring.\n",
3345                                            mti->mti_svname);
3346                         GOTO(end, rc = 0);
3347                 }
3348
3349                 /* Modify mdtlov */
3350                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3351                         GOTO(end, rc = -ENODEV);
3352
3353                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3354                                              mti->mti_stripe_index);
3355                 if (rc)
3356                         GOTO(end, rc);
3357                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3358                                   &mgi->mgi_bufs, mdtlovname, ptr);
3359                 name_destroy(&logname);
3360                 name_destroy(&mdtlovname);
3361                 if (rc)
3362                         GOTO(end, rc);
3363
3364                 /* Modify clilov */
3365                 rc = name_create(&logname, mti->mti_fsname, "-client");
3366                 if (rc)
3367                         GOTO(end, rc);
3368                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3369                                   fsdb->fsdb_clilov, ptr);
3370                 name_destroy(&logname);
3371                 GOTO(end, rc);
3372         }
3373
3374         /* All osc., mdc., llite. params in proc */
3375         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3376             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3377             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3378                 char *cname;
3379
3380                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3381                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3382                                            " cannot be modified. Consider"
3383                                            " updating the configuration with"
3384                                            " --writeconf\n",
3385                                            mti->mti_svname);
3386                         GOTO(end, rc = -EINVAL);
3387                 }
3388                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3389                         rc = name_create(&cname, mti->mti_fsname, "-client");
3390                         /* Add the client type to match the obdname in
3391                            class_config_llog_handler */
3392                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3393                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3394                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3395                         rc = name_create(&cname, mti->mti_svname, "-osc");
3396                 } else {
3397                         GOTO(end, rc = -EINVAL);
3398                 }
3399                 if (rc)
3400                         GOTO(end, rc);
3401
3402                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3403
3404                 /* Modify client */
3405                 rc = name_create(&logname, mti->mti_fsname, "-client");
3406                 if (rc) {
3407                         name_destroy(&cname);
3408                         GOTO(end, rc);
3409                 }
3410                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3411                                   cname, ptr);
3412
3413                 /* osc params affect the MDT as well */
3414                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3415                         int i;
3416
3417                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3418                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3419                                         continue;
3420                                 name_destroy(&cname);
3421                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3422                                                          fsdb, i);
3423                                 name_destroy(&logname);
3424                                 if (rc)
3425                                         break;
3426                                 rc = name_create_mdt(&logname,
3427                                                      mti->mti_fsname, i);
3428                                 if (rc)
3429                                         break;
3430                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3431                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3432                                                           mti, logname,
3433                                                           &mgi->mgi_bufs,
3434                                                           cname, ptr);
3435                                         if (rc)
3436                                                 break;
3437                                 }
3438                         }
3439                 }
3440                 name_destroy(&logname);
3441                 name_destroy(&cname);
3442                 GOTO(end, rc);
3443         }
3444
3445         /* All mdt. params in proc */
3446         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
3447                 int i;
3448                 __u32 idx;
3449
3450                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3451                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3452                             MTI_NAME_MAXLEN) == 0)
3453                         /* device is unspecified completely? */
3454                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3455                 else
3456                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3457                 if (rc < 0)
3458                         goto active_err;
3459                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3460                         goto active_err;
3461                 if (rc & LDD_F_SV_ALL) {
3462                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3463                                 if (!test_bit(i,
3464                                                   fsdb->fsdb_mdt_index_map))
3465                                         continue;
3466                                 rc = name_create_mdt(&logname,
3467                                                 mti->mti_fsname, i);
3468                                 if (rc)
3469                                         goto active_err;
3470                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3471                                                   logname, &mgi->mgi_bufs,
3472                                                   logname, ptr);
3473                                 name_destroy(&logname);
3474                                 if (rc)
3475                                         goto active_err;
3476                         }
3477                 } else {
3478                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3479                                           mti->mti_svname, &mgi->mgi_bufs,
3480                                           mti->mti_svname, ptr);
3481                         if (rc)
3482                                 goto active_err;
3483                 }
3484                 GOTO(end, rc);
3485         }
3486
3487         /* All mdd., ost. params in proc */
3488         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3489             (class_match_param(ptr, PARAM_OST, NULL) == 0)) {
3490                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3491                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3492                         GOTO(end, rc = -ENODEV);
3493
3494                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3495                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3496                 GOTO(end, rc);
3497         }
3498
3499         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3500         rc2 = -ENOSYS;
3501
3502 end:
3503         if (rc)
3504                 CERROR("err %d on param '%s'\n", rc, ptr);
3505
3506         RETURN(rc ?: rc2);
3507 }
3508
3509 /* Not implementing automatic failover nid addition at this time. */
3510 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3511                       struct mgs_target_info *mti)
3512 {
3513 #if 0
3514         struct fs_db *fsdb;
3515         int rc;
3516         ENTRY;
3517
3518         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3519         if (rc)
3520                 RETURN(rc);
3521
3522         if (mgs_log_is_empty(obd, mti->mti_svname))
3523                 /* should never happen */
3524                 RETURN(-ENOENT);
3525
3526         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3527
3528         /* FIXME We can just check mti->params to see if we're already in
3529            the failover list.  Modify mti->params for rewriting back at
3530            server_register_target(). */
3531
3532         mutex_lock(&fsdb->fsdb_mutex);
3533         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3534         mutex_unlock(&fsdb->fsdb_mutex);
3535
3536         RETURN(rc);
3537 #endif
3538         return 0;
3539 }
3540
3541 int mgs_write_log_target(const struct lu_env *env,
3542                          struct mgs_device *mgs,
3543                          struct mgs_target_info *mti,
3544                          struct fs_db *fsdb)
3545 {
3546         int rc = -EINVAL;
3547         char *buf, *params;
3548         ENTRY;
3549
3550         /* set/check the new target index */
3551         rc = mgs_set_index(env, mgs, mti);
3552         if (rc < 0) {
3553                 CERROR("Can't get index (%d)\n", rc);
3554                 RETURN(rc);
3555         }
3556
3557         if (rc == EALREADY) {
3558                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3559                               mti->mti_stripe_index, mti->mti_svname);
3560                 /* We would like to mark old log sections as invalid
3561                    and add new log sections in the client and mdt logs.
3562                    But if we add new sections, then live clients will
3563                    get repeat setup instructions for already running
3564                    osc's. So don't update the client/mdt logs. */
3565                 mti->mti_flags &= ~LDD_F_UPDATE;
3566                 rc = 0;
3567         }
3568
3569         mutex_lock(&fsdb->fsdb_mutex);
3570
3571         if (mti->mti_flags &
3572             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3573                 /* Generate a log from scratch */
3574                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3575                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3576                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3577                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3578                 } else {
3579                         CERROR("Unknown target type %#x, can't create log for "
3580                                "%s\n", mti->mti_flags, mti->mti_svname);
3581                 }
3582                 if (rc) {
3583                         CERROR("Can't write logs for %s (%d)\n",
3584                                mti->mti_svname, rc);
3585                         GOTO(out_up, rc);
3586                 }
3587         } else {
3588                 /* Just update the params from tunefs in mgs_write_log_params */
3589                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3590                 mti->mti_flags |= LDD_F_PARAM;
3591         }
3592
3593         /* allocate temporary buffer, where class_get_next_param will
3594            make copy of a current  parameter */
3595         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3596         if (buf == NULL)
3597                 GOTO(out_up, rc = -ENOMEM);
3598         params = mti->mti_params;
3599         while (params != NULL) {
3600                 rc = class_get_next_param(&params, buf);
3601                 if (rc) {
3602                         if (rc == 1)
3603                                 /* there is no next parameter, that is
3604                                    not an error */
3605                                 rc = 0;
3606                         break;
3607                 }
3608                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3609                        params, buf);
3610                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3611                 if (rc)
3612                         break;
3613         }
3614
3615         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3616
3617 out_up:
3618         mutex_unlock(&fsdb->fsdb_mutex);
3619         RETURN(rc);
3620 }
3621
3622 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3623 {
3624         struct llog_ctxt        *ctxt;
3625         int                      rc = 0;
3626
3627         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3628         if (ctxt == NULL) {
3629                 CERROR("%s: MGS config context doesn't exist\n",
3630                        mgs->mgs_obd->obd_name);
3631                 rc = -ENODEV;
3632         } else {
3633                 rc = llog_erase(env, ctxt, NULL, name);
3634                 /* llog may not exist */
3635                 if (rc == -ENOENT)
3636                         rc = 0;
3637                 llog_ctxt_put(ctxt);
3638         }
3639
3640         if (rc)
3641                 CERROR("%s: failed to clear log %s: %d\n",
3642                        mgs->mgs_obd->obd_name, name, rc);
3643
3644         return rc;
3645 }
3646
3647 /* erase all logs for the given fs */
3648 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3649 {
3650         struct fs_db *fsdb;
3651         cfs_list_t list;
3652         struct mgs_direntry *dirent, *n;
3653         int rc, len = strlen(fsname);
3654         char *suffix;
3655         ENTRY;
3656
3657         /* Find all the logs in the CONFIGS directory */
3658         rc = class_dentry_readdir(env, mgs, &list);
3659         if (rc)
3660                 RETURN(rc);
3661
3662         mutex_lock(&mgs->mgs_mutex);
3663
3664         /* Delete the fs db */
3665         fsdb = mgs_find_fsdb(mgs, fsname);
3666         if (fsdb)
3667                 mgs_free_fsdb(mgs, fsdb);
3668
3669         mutex_unlock(&mgs->mgs_mutex);
3670
3671         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3672                 cfs_list_del(&dirent->list);
3673                 suffix = strrchr(dirent->name, '-');
3674                 if (suffix != NULL) {
3675                         if ((len == suffix - dirent->name) &&
3676                             (strncmp(fsname, dirent->name, len) == 0)) {
3677                                 CDEBUG(D_MGS, "Removing log %s\n",
3678                                        dirent->name);
3679                                 mgs_erase_log(env, mgs, dirent->name);
3680                         }
3681                 }
3682                 mgs_direntry_free(dirent);
3683         }
3684
3685         RETURN(rc);
3686 }
3687
3688 /* from llog_swab */
3689 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3690 {
3691         int i;
3692         ENTRY;
3693
3694         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3695         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3696
3697         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3698         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3699         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3700         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3701
3702         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3703         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3704                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3705                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3706                                i, lcfg->lcfg_buflens[i],
3707                                lustre_cfg_string(lcfg, i));
3708                 }
3709         EXIT;
3710 }
3711
3712 /* Set a permanent (config log) param for a target or fs
3713  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3714  *             buf1 contains the single parameter
3715  */
3716 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3717                  struct lustre_cfg *lcfg, char *fsname)
3718 {
3719         struct fs_db *fsdb;
3720         struct mgs_target_info *mti;
3721         char *devname, *param;
3722         char *ptr;
3723         const char *tmp;
3724         __u32 index;
3725         int rc = 0;
3726         ENTRY;
3727
3728         print_lustre_cfg(lcfg);
3729
3730         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3731         devname = lustre_cfg_string(lcfg, 0);
3732         param = lustre_cfg_string(lcfg, 1);
3733         if (!devname) {
3734                 /* Assume device name embedded in param:
3735                    lustre-OST0000.osc.max_dirty_mb=32 */
3736                 ptr = strchr(param, '.');
3737                 if (ptr) {
3738                         devname = param;
3739                         *ptr = 0;
3740                         param = ptr + 1;
3741                 }
3742         }
3743         if (!devname) {
3744                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3745                 RETURN(-ENOSYS);
3746         }
3747
3748         rc = mgs_parse_devname(devname, fsname, NULL);
3749         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3750                 /* param related to llite isn't allowed to set by OST or MDT */
3751                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3752                                    sizeof(PARAM_LLITE)) == 0)
3753                         RETURN(-EINVAL);
3754         } else {
3755                 /* assume devname is the fsname */
3756                 memset(fsname, 0, MTI_NAME_MAXLEN);
3757                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3758                 fsname[MTI_NAME_MAXLEN - 1] = 0;
3759         }
3760         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3761
3762         rc = mgs_find_or_make_fsdb(env, mgs,
3763                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
3764                                    PARAMS_FILENAME : fsname, &fsdb);
3765         if (rc)
3766                 RETURN(rc);
3767
3768         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
3769             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3770             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3771                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3772                        "is '%s'\n", fsname, devname);
3773                 mgs_free_fsdb(mgs, fsdb);
3774                 RETURN(-EINVAL);
3775         }
3776
3777         /* Create a fake mti to hold everything */
3778         OBD_ALLOC_PTR(mti);
3779         if (!mti)
3780                 GOTO(out, rc = -ENOMEM);
3781         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3782             >= sizeof(mti->mti_fsname))
3783                 GOTO(out, rc = -E2BIG);
3784         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3785             >= sizeof(mti->mti_svname))
3786                 GOTO(out, rc = -E2BIG);
3787         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3788             >= sizeof(mti->mti_params))
3789                 GOTO(out, rc = -E2BIG);
3790         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3791         if (rc < 0)
3792                 /* Not a valid server; may be only fsname */
3793                 rc = 0;
3794         else
3795                 /* Strip -osc or -mdc suffix from svname */
3796                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3797                                      mti->mti_svname))
3798                         GOTO(out, rc = -EINVAL);
3799         /*
3800          * Revoke lock so everyone updates.  Should be alright if
3801          * someone was already reading while we were updating the logs,
3802          * so we don't really need to hold the lock while we're
3803          * writing (above).
3804          */
3805         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
3806                 mti->mti_flags = rc | LDD_F_PARAM2;
3807                 mutex_lock(&fsdb->fsdb_mutex);
3808                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
3809                 mutex_unlock(&fsdb->fsdb_mutex);
3810                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
3811         } else {
3812                 mti->mti_flags = rc | LDD_F_PARAM;
3813                 mutex_lock(&fsdb->fsdb_mutex);
3814                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3815                 mutex_unlock(&fsdb->fsdb_mutex);
3816                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3817         }
3818
3819 out:
3820         OBD_FREE_PTR(mti);
3821         RETURN(rc);
3822 }
3823
3824 static int mgs_write_log_pool(const struct lu_env *env,
3825                               struct mgs_device *mgs, char *logname,
3826                               struct fs_db *fsdb, char *lovname,
3827                               enum lcfg_command_type cmd,
3828                               char *poolname, char *fsname,
3829                               char *ostname, char *comment)
3830 {
3831         struct llog_handle *llh = NULL;
3832         int rc;
3833
3834         rc = record_start_log(env, mgs, &llh, logname);
3835         if (rc)
3836                 return rc;
3837         rc = record_marker(env, llh, fsdb, CM_START, lovname, comment);
3838         if (rc)
3839                 goto out;
3840         rc = record_base(env, llh, lovname, 0, cmd, poolname, fsname, ostname, 0);
3841         if (rc)
3842                 goto out;
3843         rc = record_marker(env, llh, fsdb, CM_END, lovname, comment);
3844 out:
3845         record_end_log(env, &llh);
3846         return rc;
3847 }
3848
3849 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3850                  enum lcfg_command_type cmd, char *fsname,
3851                  char *poolname, char *ostname)
3852 {
3853         struct fs_db *fsdb;
3854         char *lovname;
3855         char *logname;
3856         char *label = NULL, *canceled_label = NULL;
3857         int label_sz;
3858         struct mgs_target_info *mti = NULL;
3859         int rc, i;
3860         ENTRY;
3861
3862         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3863         if (rc) {
3864                 CERROR("Can't get db for %s\n", fsname);
3865                 RETURN(rc);
3866         }
3867         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3868                 CERROR("%s is not defined\n", fsname);
3869                 mgs_free_fsdb(mgs, fsdb);
3870                 RETURN(-EINVAL);
3871         }
3872
3873         label_sz = 10 + strlen(fsname) + strlen(poolname);
3874
3875         /* check if ostname match fsname */
3876         if (ostname != NULL) {
3877                 char *ptr;
3878
3879                 ptr = strrchr(ostname, '-');
3880                 if ((ptr == NULL) ||
3881                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3882                         RETURN(-EINVAL);
3883                 label_sz += strlen(ostname);
3884         }
3885
3886         OBD_ALLOC(label, label_sz);
3887         if (label == NULL)
3888                 RETURN(-ENOMEM);
3889
3890         switch(cmd) {
3891         case LCFG_POOL_NEW:
3892                 sprintf(label,
3893                         "new %s.%s", fsname, poolname);
3894                 break;
3895         case LCFG_POOL_ADD:
3896                 sprintf(label,
3897                         "add %s.%s.%s", fsname, poolname, ostname);
3898                 break;
3899         case LCFG_POOL_REM:
3900                 OBD_ALLOC(canceled_label, label_sz);
3901                 if (canceled_label == NULL)
3902                         GOTO(out_label, rc = -ENOMEM);
3903                 sprintf(label,
3904                         "rem %s.%s.%s", fsname, poolname, ostname);
3905                 sprintf(canceled_label,
3906                         "add %s.%s.%s", fsname, poolname, ostname);
3907                 break;
3908         case LCFG_POOL_DEL:
3909                 OBD_ALLOC(canceled_label, label_sz);
3910                 if (canceled_label == NULL)
3911                         GOTO(out_label, rc = -ENOMEM);
3912                 sprintf(label,
3913                         "del %s.%s", fsname, poolname);
3914                 sprintf(canceled_label,
3915                         "new %s.%s", fsname, poolname);
3916                 break;
3917         default:
3918                 break;
3919         }
3920
3921         if (canceled_label != NULL) {
3922                 OBD_ALLOC_PTR(mti);
3923                 if (mti == NULL)
3924                         GOTO(out_cancel, rc = -ENOMEM);
3925         }
3926
3927         mutex_lock(&fsdb->fsdb_mutex);
3928         /* write pool def to all MDT logs */
3929         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3930                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
3931                         rc = name_create_mdt_and_lov(&logname, &lovname,
3932                                                      fsdb, i);
3933                         if (rc) {
3934                                 mutex_unlock(&fsdb->fsdb_mutex);
3935                                 GOTO(out_mti, rc);
3936                         }
3937                         if (canceled_label != NULL) {
3938                                 strcpy(mti->mti_svname, "lov pool");
3939                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3940                                                 lovname, canceled_label,
3941                                                 CM_SKIP);
3942                         }
3943
3944                         if (rc >= 0)
3945                                 rc = mgs_write_log_pool(env, mgs, logname,
3946                                                         fsdb, lovname, cmd,
3947                                                         fsname, poolname,
3948                                                         ostname, label);
3949                         name_destroy(&logname);
3950                         name_destroy(&lovname);
3951                         if (rc) {
3952                                 mutex_unlock(&fsdb->fsdb_mutex);
3953                                 GOTO(out_mti, rc);
3954                         }
3955                 }
3956         }
3957
3958         rc = name_create(&logname, fsname, "-client");
3959         if (rc) {
3960                 mutex_unlock(&fsdb->fsdb_mutex);
3961                 GOTO(out_mti, rc);
3962         }
3963         if (canceled_label != NULL) {
3964                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3965                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
3966                 if (rc < 0) {
3967                         mutex_unlock(&fsdb->fsdb_mutex);
3968                         name_destroy(&logname);
3969                         GOTO(out_mti, rc);
3970                 }
3971         }
3972
3973         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
3974                                 cmd, fsname, poolname, ostname, label);
3975         mutex_unlock(&fsdb->fsdb_mutex);
3976         name_destroy(&logname);
3977         /* request for update */
3978         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3979
3980         EXIT;
3981 out_mti:
3982         if (mti != NULL)
3983                 OBD_FREE_PTR(mti);
3984 out_cancel:
3985         if (canceled_label != NULL)
3986                 OBD_FREE(canceled_label, label_sz);
3987 out_label:
3988         OBD_FREE(label, label_sz);
3989         return rc;
3990 }