Whamcloud - gitweb
LU-2074 build: fix 'copy into fixed size buffer' errors
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         LASSERT(dir);
71         LASSERT(dir->do_index_ops);
72
73         iops = &dir->do_index_ops->dio_it;
74         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
75         if (IS_ERR(it))
76                 RETURN(PTR_ERR(it));
77
78         rc = iops->load(env, it, 0);
79         if (rc <= 0)
80                 GOTO(fini, rc = 0);
81
82         /* main cycle */
83         do {
84                 key = (void *)iops->key(env, it);
85                 if (IS_ERR(key)) {
86                         CERROR("%s: key failed when listing %s: rc = %d\n",
87                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
88                                (int) PTR_ERR(key));
89                         goto next;
90                 }
91                 key_sz = iops->key_size(env, it);
92                 LASSERT(key_sz > 0);
93
94                 /* filter out "." and ".." entries */
95                 if (key[0] == '.') {
96                         if (key_sz == 1)
97                                 goto next;
98                         if (key_sz == 2 && key[1] == '.')
99                                 goto next;
100                 }
101
102                 de = mgs_direntry_alloc(key_sz + 1);
103                 if (de == NULL) {
104                         rc = -ENOMEM;
105                         break;
106                 }
107
108                 memcpy(de->name, key, key_sz);
109                 de->name[key_sz] = 0;
110
111                 cfs_list_add(&de->list, list);
112
113 next:
114                 rc = iops->next(env, it);
115         } while (rc == 0);
116         rc = 0;
117
118         iops->put(env, it);
119
120 fini:
121         iops->fini(env, it);
122         if (rc)
123                 CERROR("%s: key failed when listing %s: rc = %d\n",
124                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
125         RETURN(rc);
126 }
127
128 /******************** DB functions *********************/
129
130 static inline int name_create(char **newname, char *prefix, char *suffix)
131 {
132         LASSERT(newname);
133         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
134         if (!*newname)
135                 return -ENOMEM;
136         sprintf(*newname, "%s%s", prefix, suffix);
137         return 0;
138 }
139
140 static inline void name_destroy(char **name)
141 {
142         if (*name)
143                 OBD_FREE(*name, strlen(*name) + 1);
144         *name = NULL;
145 }
146
147 struct mgs_fsdb_handler_data
148 {
149         struct fs_db   *fsdb;
150         __u32           ver;
151 };
152
153 /* from the (client) config log, figure out:
154         1. which ost's/mdt's are configured (by index)
155         2. what the last config step is
156         3. COMPAT_18 osc name
157 */
158 /* It might be better to have a separate db file, instead of parsing the info
159    out of the client log.  This is slow and potentially error-prone. */
160 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
161                             struct llog_rec_hdr *rec, void *data)
162 {
163         struct mgs_fsdb_handler_data *d = data;
164         struct fs_db *fsdb = d->fsdb;
165         int cfg_len = rec->lrh_len;
166         char *cfg_buf = (char*) (rec + 1);
167         struct lustre_cfg *lcfg;
168         __u32 index;
169         int rc = 0;
170         ENTRY;
171
172         if (rec->lrh_type != OBD_CFG_REC) {
173                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
174                 RETURN(-EINVAL);
175         }
176
177         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
178         if (rc) {
179                 CERROR("Insane cfg\n");
180                 RETURN(rc);
181         }
182
183         lcfg = (struct lustre_cfg *)cfg_buf;
184
185         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
186                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
187
188         /* Figure out ost indicies */
189         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
190         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
191             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
192                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
193                                        NULL, 10);
194                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
195                        lustre_cfg_string(lcfg, 1), index,
196                        lustre_cfg_string(lcfg, 2));
197                 set_bit(index, fsdb->fsdb_ost_index_map);
198         }
199
200         /* Figure out mdt indicies */
201         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
202         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
203             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
204                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
205                                        &index, NULL);
206                 if (rc != LDD_F_SV_TYPE_MDT) {
207                         CWARN("Unparsable MDC name %s, assuming index 0\n",
208                               lustre_cfg_string(lcfg, 0));
209                         index = 0;
210                 }
211                 rc = 0;
212                 CDEBUG(D_MGS, "MDT index is %u\n", index);
213                 set_bit(index, fsdb->fsdb_mdt_index_map);
214                 fsdb->fsdb_mdt_count ++;
215         }
216
217         /**
218          * figure out the old config. fsdb_gen = 0 means old log
219          * It is obsoleted and not supported anymore
220          */
221         if (fsdb->fsdb_gen == 0) {
222                 CERROR("Old config format is not supported\n");
223                 RETURN(-EINVAL);
224         }
225
226         /*
227          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
228          */
229         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
230             lcfg->lcfg_command == LCFG_ATTACH &&
231             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
232                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
233                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
234                         CWARN("MDT using 1.8 OSC name scheme\n");
235                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
236                 }
237         }
238
239         if (lcfg->lcfg_command == LCFG_MARKER) {
240                 struct cfg_marker *marker;
241                 marker = lustre_cfg_buf(lcfg, 1);
242
243                 d->ver = marker->cm_vers;
244
245                 /* Keep track of the latest marker step */
246                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
247         }
248
249         RETURN(rc);
250 }
251
252 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
253 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
254                                   struct mgs_device *mgs,
255                                   struct fs_db *fsdb)
256 {
257         char                            *logname;
258         struct llog_handle              *loghandle;
259         struct llog_ctxt                *ctxt;
260         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
261         int rc;
262
263         ENTRY;
264
265         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
266         LASSERT(ctxt != NULL);
267         rc = name_create(&logname, fsdb->fsdb_name, "-client");
268         if (rc)
269                 GOTO(out_put, rc);
270         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
271         if (rc)
272                 GOTO(out_pop, rc);
273
274         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
275         if (rc)
276                 GOTO(out_close, rc);
277
278         if (llog_get_size(loghandle) <= 1)
279                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
280
281         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
282         CDEBUG(D_INFO, "get_db = %d\n", rc);
283 out_close:
284         llog_close(env, loghandle);
285 out_pop:
286         name_destroy(&logname);
287 out_put:
288         llog_ctxt_put(ctxt);
289
290         RETURN(rc);
291 }
292
293 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
294 {
295         struct mgs_tgt_srpc_conf *tgtconf;
296
297         /* free target-specific rules */
298         while (fsdb->fsdb_srpc_tgt) {
299                 tgtconf = fsdb->fsdb_srpc_tgt;
300                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
301
302                 LASSERT(tgtconf->mtsc_tgt);
303
304                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
305                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
306                 OBD_FREE_PTR(tgtconf);
307         }
308
309         /* free general rules */
310         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
311 }
312
313 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
314 {
315         struct fs_db *fsdb;
316         cfs_list_t *tmp;
317
318         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
319                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
320                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
321                         return fsdb;
322         }
323         return NULL;
324 }
325
326 /* caller must hold the mgs->mgs_fs_db_lock */
327 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
328                                   struct mgs_device *mgs, char *fsname)
329 {
330         struct fs_db *fsdb;
331         int rc;
332         ENTRY;
333
334         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
335                 CERROR("fsname %s is too long\n", fsname);
336                 RETURN(NULL);
337         }
338
339         OBD_ALLOC_PTR(fsdb);
340         if (!fsdb)
341                 RETURN(NULL);
342
343         strcpy(fsdb->fsdb_name, fsname);
344         mutex_init(&fsdb->fsdb_mutex);
345         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
346         fsdb->fsdb_gen = 1;
347
348         if (strcmp(fsname, MGSSELF_NAME) == 0) {
349                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
350         } else {
351                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
352                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
353                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
354                         CERROR("No memory for index maps\n");
355                         GOTO(err, rc = -ENOMEM);
356                 }
357
358                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
359                 if (rc)
360                         GOTO(err, rc);
361                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
362                 if (rc)
363                         GOTO(err, rc);
364
365                 /* initialise data for NID table */
366                 mgs_ir_init_fs(env, mgs, fsdb);
367
368                 lproc_mgs_add_live(mgs, fsdb);
369         }
370
371         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
372
373         RETURN(fsdb);
374 err:
375         if (fsdb->fsdb_ost_index_map)
376                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
377         if (fsdb->fsdb_mdt_index_map)
378                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
379         name_destroy(&fsdb->fsdb_clilov);
380         name_destroy(&fsdb->fsdb_clilmv);
381         OBD_FREE_PTR(fsdb);
382         RETURN(NULL);
383 }
384
385 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
386 {
387         /* wait for anyone with the sem */
388         mutex_lock(&fsdb->fsdb_mutex);
389         lproc_mgs_del_live(mgs, fsdb);
390         cfs_list_del(&fsdb->fsdb_list);
391
392         /* deinitialize fsr */
393         mgs_ir_fini_fs(mgs, fsdb);
394
395         if (fsdb->fsdb_ost_index_map)
396                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
397         if (fsdb->fsdb_mdt_index_map)
398                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
399         name_destroy(&fsdb->fsdb_clilov);
400         name_destroy(&fsdb->fsdb_clilmv);
401         mgs_free_fsdb_srpc(fsdb);
402         mutex_unlock(&fsdb->fsdb_mutex);
403         OBD_FREE_PTR(fsdb);
404 }
405
406 int mgs_init_fsdb_list(struct mgs_device *mgs)
407 {
408         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
409         return 0;
410 }
411
412 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
413 {
414         struct fs_db *fsdb;
415         cfs_list_t *tmp, *tmp2;
416         mutex_lock(&mgs->mgs_mutex);
417         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
418                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
419                 mgs_free_fsdb(mgs, fsdb);
420         }
421         mutex_unlock(&mgs->mgs_mutex);
422         return 0;
423 }
424
425 int mgs_find_or_make_fsdb(const struct lu_env *env,
426                           struct mgs_device *mgs, char *name,
427                           struct fs_db **dbh)
428 {
429         struct fs_db *fsdb;
430         int rc = 0;
431
432         ENTRY;
433         mutex_lock(&mgs->mgs_mutex);
434         fsdb = mgs_find_fsdb(mgs, name);
435         if (fsdb) {
436                 mutex_unlock(&mgs->mgs_mutex);
437                 *dbh = fsdb;
438                 RETURN(0);
439         }
440
441         CDEBUG(D_MGS, "Creating new db\n");
442         fsdb = mgs_new_fsdb(env, mgs, name);
443         /* lock fsdb_mutex until the db is loaded from llogs */
444         if (fsdb)
445                 mutex_lock(&fsdb->fsdb_mutex);
446         mutex_unlock(&mgs->mgs_mutex);
447         if (!fsdb)
448                 RETURN(-ENOMEM);
449
450         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
451                 /* populate the db from the client llog */
452                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
453                 if (rc) {
454                         CERROR("Can't get db from client log %d\n", rc);
455                         GOTO(out_free, rc);
456                 }
457         }
458
459         /* populate srpc rules from params llog */
460         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
461         if (rc) {
462                 CERROR("Can't get db from params log %d\n", rc);
463                 GOTO(out_free, rc);
464         }
465
466         mutex_unlock(&fsdb->fsdb_mutex);
467         *dbh = fsdb;
468
469         RETURN(0);
470
471 out_free:
472         mutex_unlock(&fsdb->fsdb_mutex);
473         mgs_free_fsdb(mgs, fsdb);
474         return rc;
475 }
476
477 /* 1 = index in use
478    0 = index unused
479    -1= empty client log */
480 int mgs_check_index(const struct lu_env *env,
481                     struct mgs_device *mgs,
482                     struct mgs_target_info *mti)
483 {
484         struct fs_db *fsdb;
485         void *imap;
486         int rc = 0;
487         ENTRY;
488
489         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
490
491         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
492         if (rc) {
493                 CERROR("Can't get db for %s\n", mti->mti_fsname);
494                 RETURN(rc);
495         }
496
497         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
498                 RETURN(-1);
499
500         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
501                 imap = fsdb->fsdb_ost_index_map;
502         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
503                 imap = fsdb->fsdb_mdt_index_map;
504         else
505                 RETURN(-EINVAL);
506
507         if (test_bit(mti->mti_stripe_index, imap))
508                 RETURN(1);
509         RETURN(0);
510 }
511
512 static __inline__ int next_index(void *index_map, int map_len)
513 {
514         int i;
515         for (i = 0; i < map_len * 8; i++)
516                 if (!test_bit(i, index_map)) {
517                          return i;
518                  }
519         CERROR("max index %d exceeded.\n", i);
520         return -1;
521 }
522
523 /* Return codes:
524         0  newly marked as in use
525         <0 err
526         +EALREADY for update of an old index */
527 static int mgs_set_index(const struct lu_env *env,
528                          struct mgs_device *mgs,
529                          struct mgs_target_info *mti)
530 {
531         struct fs_db *fsdb;
532         void *imap;
533         int rc = 0;
534         ENTRY;
535
536         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
537         if (rc) {
538                 CERROR("Can't get db for %s\n", mti->mti_fsname);
539                 RETURN(rc);
540         }
541
542         mutex_lock(&fsdb->fsdb_mutex);
543         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
544                 imap = fsdb->fsdb_ost_index_map;
545         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
546                 imap = fsdb->fsdb_mdt_index_map;
547         } else {
548                 GOTO(out_up, rc = -EINVAL);
549         }
550
551         if (mti->mti_flags & LDD_F_NEED_INDEX) {
552                 rc = next_index(imap, INDEX_MAP_SIZE);
553                 if (rc == -1)
554                         GOTO(out_up, rc = -ERANGE);
555                 mti->mti_stripe_index = rc;
556                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
557                         fsdb->fsdb_mdt_count ++;
558         }
559
560         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
561                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
562                                    "but the max index is %d.\n",
563                                    mti->mti_svname, mti->mti_stripe_index,
564                                    INDEX_MAP_SIZE * 8);
565                 GOTO(out_up, rc = -ERANGE);
566         }
567
568         if (test_bit(mti->mti_stripe_index, imap)) {
569                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
570                     !(mti->mti_flags & LDD_F_WRITECONF)) {
571                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
572                                            "%d, but that index is already in "
573                                            "use. Use --writeconf to force\n",
574                                            mti->mti_svname,
575                                            mti->mti_stripe_index);
576                         GOTO(out_up, rc = -EADDRINUSE);
577                 } else {
578                         CDEBUG(D_MGS, "Server %s updating index %d\n",
579                                mti->mti_svname, mti->mti_stripe_index);
580                         GOTO(out_up, rc = EALREADY);
581                 }
582         }
583
584         set_bit(mti->mti_stripe_index, imap);
585         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
586         mutex_unlock(&fsdb->fsdb_mutex);
587         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
588                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
589
590         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
591                mti->mti_stripe_index);
592
593         RETURN(0);
594 out_up:
595         mutex_unlock(&fsdb->fsdb_mutex);
596         return rc;
597 }
598
599 struct mgs_modify_lookup {
600         struct cfg_marker mml_marker;
601         int               mml_modified;
602 };
603
604 static int mgs_modify_handler(const struct lu_env *env,
605                               struct llog_handle *llh,
606                               struct llog_rec_hdr *rec, void *data)
607 {
608         struct mgs_modify_lookup *mml = data;
609         struct cfg_marker *marker;
610         struct lustre_cfg *lcfg = REC_DATA(rec);
611         int cfg_len = REC_DATA_LEN(rec);
612         int rc;
613         ENTRY;
614
615         if (rec->lrh_type != OBD_CFG_REC) {
616                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
617                 RETURN(-EINVAL);
618         }
619
620         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
621         if (rc) {
622                 CERROR("Insane cfg\n");
623                 RETURN(rc);
624         }
625
626         /* We only care about markers */
627         if (lcfg->lcfg_command != LCFG_MARKER)
628                 RETURN(0);
629
630         marker = lustre_cfg_buf(lcfg, 1);
631         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
632             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
633             !(marker->cm_flags & CM_SKIP)) {
634                 /* Found a non-skipped marker match */
635                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
636                        rec->lrh_index, marker->cm_step,
637                        marker->cm_flags, mml->mml_marker.cm_flags,
638                        marker->cm_tgtname, marker->cm_comment);
639                 /* Overwrite the old marker llog entry */
640                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
641                 marker->cm_flags |= mml->mml_marker.cm_flags;
642                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
643                 /* Header and tail are added back to lrh_len in
644                    llog_lvfs_write_rec */
645                 rec->lrh_len = cfg_len;
646                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
647                                 rec->lrh_index);
648                 if (!rc)
649                          mml->mml_modified++;
650         }
651
652         RETURN(rc);
653 }
654
655 /**
656  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
657  * Return code:
658  * 0 - modified successfully,
659  * 1 - no modification was done
660  * negative - error
661  */
662 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
663                       struct fs_db *fsdb, struct mgs_target_info *mti,
664                       char *logname, char *devname, char *comment, int flags)
665 {
666         struct llog_handle *loghandle;
667         struct llog_ctxt *ctxt;
668         struct mgs_modify_lookup *mml;
669         int rc;
670
671         ENTRY;
672
673         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
674         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
675                flags);
676
677         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
678         LASSERT(ctxt != NULL);
679         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
680         if (rc < 0) {
681                 if (rc == -ENOENT)
682                         rc = 0;
683                 GOTO(out_pop, rc);
684         }
685
686         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
687         if (rc)
688                 GOTO(out_close, rc);
689
690         if (llog_get_size(loghandle) <= 1)
691                 GOTO(out_close, rc = 0);
692
693         OBD_ALLOC_PTR(mml);
694         if (!mml)
695                 GOTO(out_close, rc = -ENOMEM);
696         if (strlcpy(mml->mml_marker.cm_comment, comment,
697                     sizeof(mml->mml_marker.cm_comment)) >=
698             sizeof(mml->mml_marker.cm_comment))
699                 GOTO(out_close, rc = -E2BIG);
700         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
701                     sizeof(mml->mml_marker.cm_tgtname)) >=
702             sizeof(mml->mml_marker.cm_tgtname))
703                 GOTO(out_close, rc = -E2BIG);
704         /* Modify mostly means cancel */
705         mml->mml_marker.cm_flags = flags;
706         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
707         mml->mml_modified = 0;
708         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
709                           NULL);
710         if (!rc && !mml->mml_modified)
711                 rc = 1;
712         OBD_FREE_PTR(mml);
713
714 out_close:
715         llog_close(env, loghandle);
716 out_pop:
717         if (rc < 0)
718                 CERROR("%s: modify %s/%s failed: rc = %d\n",
719                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
720         llog_ctxt_put(ctxt);
721         RETURN(rc);
722 }
723
724 /** This structure is passed to mgs_replace_handler */
725 struct mgs_replace_uuid_lookup {
726         /* Nids are replaced for this target device */
727         struct mgs_target_info target;
728         /* Temporary modified llog */
729         struct llog_handle *temp_llh;
730         /* Flag is set if in target block*/
731         int in_target_device;
732         /* Nids already added. Just skip (multiple nids) */
733         int device_nids_added;
734         /* Flag is set if this block should not be copied */
735         int skip_it;
736 };
737
738 /**
739  * Check: a) if block should be skipped
740  * b) is it target block
741  *
742  * \param[in] lcfg
743  * \param[in] mrul
744  *
745  * \retval 0 should not to be skipped
746  * \retval 1 should to be skipped
747  */
748 static int check_markers(struct lustre_cfg *lcfg,
749                          struct mgs_replace_uuid_lookup *mrul)
750 {
751          struct cfg_marker *marker;
752
753         /* Track markers. Find given device */
754         if (lcfg->lcfg_command == LCFG_MARKER) {
755                 marker = lustre_cfg_buf(lcfg, 1);
756                 /* Clean llog from records marked as CM_EXCLUDE.
757                    CM_SKIP records are used for "active" command
758                    and can be restored if needed */
759                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
760                     (CM_EXCLUDE | CM_START)) {
761                         mrul->skip_it = 1;
762                         return 1;
763                 }
764
765                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
766                     (CM_EXCLUDE | CM_END)) {
767                         mrul->skip_it = 0;
768                         return 1;
769                 }
770
771                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
772                         LASSERT(!(marker->cm_flags & CM_START) ||
773                                 !(marker->cm_flags & CM_END));
774                         if (marker->cm_flags & CM_START) {
775                                 mrul->in_target_device = 1;
776                                 mrul->device_nids_added = 0;
777                         } else if (marker->cm_flags & CM_END)
778                                 mrul->in_target_device = 0;
779                 }
780         }
781
782         return 0;
783 }
784
785 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
786                        struct lustre_cfg *lcfg)
787 {
788         struct llog_rec_hdr      rec;
789         int                      buflen, rc;
790
791         if (!lcfg || !llh)
792                 return -ENOMEM;
793
794         LASSERT(llh->lgh_ctxt);
795
796         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
797                                 lcfg->lcfg_buflens);
798         rec.lrh_len = llog_data_len(buflen);
799         rec.lrh_type = OBD_CFG_REC;
800
801         /* idx = -1 means append */
802         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
803         if (rc)
804                 CERROR("failed %d\n", rc);
805         return rc;
806 }
807
808 static int record_base(const struct lu_env *env, struct llog_handle *llh,
809                      char *cfgname, lnet_nid_t nid, int cmd,
810                      char *s1, char *s2, char *s3, char *s4)
811 {
812         struct mgs_thread_info *mgi = mgs_env_info(env);
813         struct lustre_cfg     *lcfg;
814         int rc;
815
816         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
817                cmd, s1, s2, s3, s4);
818
819         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
820         if (s1)
821                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
822         if (s2)
823                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
824         if (s3)
825                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
826         if (s4)
827                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
828
829         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
830         if (!lcfg)
831                 return -ENOMEM;
832         lcfg->lcfg_nid = nid;
833
834         rc = record_lcfg(env, llh, lcfg);
835
836         lustre_cfg_free(lcfg);
837
838         if (rc) {
839                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
840                        cmd, s1, s2, s3, s4);
841         }
842         return rc;
843 }
844
845 static inline int record_add_uuid(const struct lu_env *env,
846                                   struct llog_handle *llh,
847                                   uint64_t nid, char *uuid)
848 {
849         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
850 }
851
852 static inline int record_add_conn(const struct lu_env *env,
853                                   struct llog_handle *llh,
854                                   char *devname, char *uuid)
855 {
856         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
857 }
858
859 static inline int record_attach(const struct lu_env *env,
860                                 struct llog_handle *llh, char *devname,
861                                 char *type, char *uuid)
862 {
863         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
864 }
865
866 static inline int record_setup(const struct lu_env *env,
867                                struct llog_handle *llh, char *devname,
868                                char *s1, char *s2, char *s3, char *s4)
869 {
870         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
871 }
872
873 /**
874  * \retval <0 record processing error
875  * \retval n record is processed. No need copy original one.
876  * \retval 0 record is not processed.
877  */
878 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
879                            struct mgs_replace_uuid_lookup *mrul)
880 {
881         int nids_added = 0;
882         lnet_nid_t nid;
883         char *ptr;
884         int rc;
885
886         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
887                 /* LCFG_ADD_UUID command found. Let's skip original command
888                    and add passed nids */
889                 ptr = mrul->target.mti_params;
890                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
891                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
892                                "device %s\n", libcfs_nid2str(nid),
893                                 mrul->target.mti_params,
894                                 mrul->target.mti_svname);
895                         rc = record_add_uuid(env,
896                                              mrul->temp_llh, nid,
897                                              mrul->target.mti_params);
898                         if (!rc)
899                                 nids_added++;
900                 }
901
902                 if (nids_added == 0) {
903                         CERROR("No new nids were added, nid %s with uuid %s, "
904                                "device %s\n", libcfs_nid2str(nid),
905                                mrul->target.mti_params,
906                                mrul->target.mti_svname);
907                         RETURN(-ENXIO);
908                 } else {
909                         mrul->device_nids_added = 1;
910                 }
911
912                 return nids_added;
913         }
914
915         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
916                 /* LCFG_SETUP command found. UUID should be changed */
917                 rc = record_setup(env,
918                                   mrul->temp_llh,
919                                   /* devname the same */
920                                   lustre_cfg_string(lcfg, 0),
921                                   /* s1 is not changed */
922                                   lustre_cfg_string(lcfg, 1),
923                                   /* new uuid should be
924                                   the full nidlist */
925                                   mrul->target.mti_params,
926                                   /* s3 is not changed */
927                                   lustre_cfg_string(lcfg, 3),
928                                   /* s4 is not changed */
929                                   lustre_cfg_string(lcfg, 4));
930                 return rc ? rc : 1;
931         }
932
933         /* Another commands in target device block */
934         return 0;
935 }
936
937 /**
938  * Handler that called for every record in llog.
939  * Records are processed in order they placed in llog.
940  *
941  * \param[in] llh       log to be processed
942  * \param[in] rec       current record
943  * \param[in] data      mgs_replace_uuid_lookup structure
944  *
945  * \retval 0    success
946  */
947 static int mgs_replace_handler(const struct lu_env *env,
948                                struct llog_handle *llh,
949                                struct llog_rec_hdr *rec,
950                                void *data)
951 {
952         struct mgs_replace_uuid_lookup *mrul;
953         struct lustre_cfg *lcfg = REC_DATA(rec);
954         int cfg_len = REC_DATA_LEN(rec);
955         int rc;
956         ENTRY;
957
958         mrul = (struct mgs_replace_uuid_lookup *)data;
959
960         if (rec->lrh_type != OBD_CFG_REC) {
961                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
962                        rec->lrh_type, lcfg->lcfg_command,
963                        lustre_cfg_string(lcfg, 0),
964                        lustre_cfg_string(lcfg, 1));
965                 RETURN(-EINVAL);
966         }
967
968         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
969         if (rc) {
970                 /* Do not copy any invalidated records */
971                 GOTO(skip_out, rc = 0);
972         }
973
974         rc = check_markers(lcfg, mrul);
975         if (rc || mrul->skip_it)
976                 GOTO(skip_out, rc = 0);
977
978         /* Write to new log all commands outside target device block */
979         if (!mrul->in_target_device)
980                 GOTO(copy_out, rc = 0);
981
982         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
983            (failover nids) for this target, assuming that if then
984            primary is changing then so is the failover */
985         if (mrul->device_nids_added &&
986             (lcfg->lcfg_command == LCFG_ADD_UUID ||
987              lcfg->lcfg_command == LCFG_ADD_CONN))
988                 GOTO(skip_out, rc = 0);
989
990         rc = process_command(env, lcfg, mrul);
991         if (rc < 0)
992                 RETURN(rc);
993
994         if (rc)
995                 RETURN(0);
996 copy_out:
997         /* Record is placed in temporary llog as is */
998         rc = llog_write(env, mrul->temp_llh, rec, NULL, 0, NULL, -1);
999
1000         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1001                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1002                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1003         RETURN(rc);
1004
1005 skip_out:
1006         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1007                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1008                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1009         RETURN(rc);
1010 }
1011
1012 static int mgs_log_is_empty(const struct lu_env *env,
1013                             struct mgs_device *mgs, char *name)
1014 {
1015         struct llog_ctxt        *ctxt;
1016         int                      rc;
1017
1018         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1019         LASSERT(ctxt != NULL);
1020
1021         rc = llog_is_empty(env, ctxt, name);
1022         llog_ctxt_put(ctxt);
1023         return rc;
1024 }
1025
1026 static int mgs_replace_nids_log(const struct lu_env *env,
1027                                 struct obd_device *mgs, struct fs_db *fsdb,
1028                                 char *logname, char *devname, char *nids)
1029 {
1030         struct llog_handle *orig_llh, *backup_llh;
1031         struct llog_ctxt *ctxt;
1032         struct mgs_replace_uuid_lookup *mrul;
1033         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1034         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1035         char *backup;
1036         int rc, rc2;
1037         ENTRY;
1038
1039         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1040
1041         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1042         LASSERT(ctxt != NULL);
1043
1044         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1045                 /* Log is empty. Nothing to replace */
1046                 GOTO(out_put, rc = 0);
1047         }
1048
1049         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1050         if (backup == NULL)
1051                 GOTO(out_put, rc = -ENOMEM);
1052
1053         sprintf(backup, "%s.bak", logname);
1054
1055         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1056         if (rc == 0) {
1057                 /* Now erase original log file. Connections are not allowed.
1058                    Backup is already saved */
1059                 rc = llog_erase(env, ctxt, NULL, logname);
1060                 if (rc < 0)
1061                         GOTO(out_free, rc);
1062         } else if (rc != -ENOENT) {
1063                 CERROR("%s: can't make backup for %s: rc = %d\n",
1064                        mgs->obd_name, logname, rc);
1065                 GOTO(out_free,rc);
1066         }
1067
1068         /* open local log */
1069         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1070         if (rc)
1071                 GOTO(out_restore, rc);
1072
1073         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1074         if (rc)
1075                 GOTO(out_closel, rc);
1076
1077         /* open backup llog */
1078         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1079                        LLOG_OPEN_EXISTS);
1080         if (rc)
1081                 GOTO(out_closel, rc);
1082
1083         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1084         if (rc)
1085                 GOTO(out_close, rc);
1086
1087         if (llog_get_size(backup_llh) <= 1)
1088                 GOTO(out_close, rc = 0);
1089
1090         OBD_ALLOC_PTR(mrul);
1091         if (!mrul)
1092                 GOTO(out_close, rc = -ENOMEM);
1093         /* devname is only needed information to replace UUID records */
1094         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1095         /* parse nids later */
1096         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1097         /* Copy records to this temporary llog */
1098         mrul->temp_llh = orig_llh;
1099
1100         rc = llog_process(env, backup_llh, mgs_replace_handler,
1101                           (void *)mrul, NULL);
1102         OBD_FREE_PTR(mrul);
1103 out_close:
1104         rc2 = llog_close(NULL, backup_llh);
1105         if (!rc)
1106                 rc = rc2;
1107 out_closel:
1108         rc2 = llog_close(NULL, orig_llh);
1109         if (!rc)
1110                 rc = rc2;
1111
1112 out_restore:
1113         if (rc) {
1114                 CERROR("%s: llog should be restored: rc = %d\n",
1115                        mgs->obd_name, rc);
1116                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1117                                   logname);
1118                 if (rc2 < 0)
1119                         CERROR("%s: can't restore backup %s: rc = %d\n",
1120                                mgs->obd_name, logname, rc2);
1121         }
1122
1123 out_free:
1124         OBD_FREE(backup, strlen(backup) + 1);
1125
1126 out_put:
1127         llog_ctxt_put(ctxt);
1128
1129         if (rc)
1130                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1131                        mgs->obd_name, logname, rc);
1132
1133         RETURN(rc);
1134 }
1135
1136 /**
1137  * Parse device name and get file system name and/or device index
1138  *
1139  * \param[in]   devname device name (ex. lustre-MDT0000)
1140  * \param[out]  fsname  file system name(optional)
1141  * \param[out]  index   device index(optional)
1142  *
1143  * \retval 0    success
1144  */
1145 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1146 {
1147         int rc;
1148         ENTRY;
1149
1150         /* Extract fsname */
1151         if (fsname) {
1152                 rc = server_name2fsname(devname, fsname, NULL);
1153                 if (rc < 0) {
1154                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1155                                devname);
1156                         RETURN(-EINVAL);
1157                 }
1158         }
1159
1160         if (index) {
1161                 rc = server_name2index(devname, index, NULL);
1162                 if (rc < 0) {
1163                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1164                                devname);
1165                         RETURN(-EINVAL);
1166                 }
1167         }
1168
1169         RETURN(0);
1170 }
1171
1172 static int only_mgs_is_running(struct obd_device *mgs_obd)
1173 {
1174         /* TDB: Is global variable with devices count exists? */
1175         int num_devices = get_devices_count();
1176         /* osd, MGS and MGC + self_export
1177            (wc -l /proc/fs/lustre/devices <= 2) && (num_exports <= 2) */
1178         return (num_devices <= 3) && (mgs_obd->obd_num_exports <= 2);
1179 }
1180
1181 static int name_create_mdt(char **logname, char *fsname, int i)
1182 {
1183         char mdt_index[9];
1184
1185         sprintf(mdt_index, "-MDT%04x", i);
1186         return name_create(logname, fsname, mdt_index);
1187 }
1188
1189 /**
1190  * Replace nids for \a device to \a nids values
1191  *
1192  * \param obd           MGS obd device
1193  * \param devname       nids need to be replaced for this device
1194  * (ex. lustre-OST0000)
1195  * \param nids          nids list (ex. nid1,nid2,nid3)
1196  *
1197  * \retval 0    success
1198  */
1199 int mgs_replace_nids(const struct lu_env *env,
1200                      struct mgs_device *mgs,
1201                      char *devname, char *nids)
1202 {
1203         /* Assume fsname is part of device name */
1204         char fsname[MTI_NAME_MAXLEN];
1205         int rc;
1206         __u32 index;
1207         char *logname;
1208         struct fs_db *fsdb;
1209         unsigned int i;
1210         int conn_state;
1211         struct obd_device *mgs_obd = mgs->mgs_obd;
1212         ENTRY;
1213
1214         /* We can only change NIDs if no other nodes are connected */
1215         spin_lock(&mgs_obd->obd_dev_lock);
1216         conn_state = mgs_obd->obd_no_conn;
1217         mgs_obd->obd_no_conn = 1;
1218         spin_unlock(&mgs_obd->obd_dev_lock);
1219
1220         /* We can not change nids if not only MGS is started */
1221         if (!only_mgs_is_running(mgs_obd)) {
1222                 CERROR("Only MGS is allowed to be started\n");
1223                 GOTO(out, rc = -EINPROGRESS);
1224         }
1225
1226         /* Get fsname and index*/
1227         rc = mgs_parse_devname(devname, fsname, &index);
1228         if (rc)
1229                 GOTO(out, rc);
1230
1231         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1232         if (rc) {
1233                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1234                 GOTO(out, rc);
1235         }
1236
1237         /* Process client llogs */
1238         name_create(&logname, fsname, "-client");
1239         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1240         name_destroy(&logname);
1241         if (rc) {
1242                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1243                        fsname, devname, rc);
1244                 GOTO(out, rc);
1245         }
1246
1247         /* Process MDT llogs */
1248         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1249                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1250                         continue;
1251                 name_create_mdt(&logname, fsname, i);
1252                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1253                 name_destroy(&logname);
1254                 if (rc)
1255                         GOTO(out, rc);
1256         }
1257
1258 out:
1259         spin_lock(&mgs_obd->obd_dev_lock);
1260         mgs_obd->obd_no_conn = conn_state;
1261         spin_unlock(&mgs_obd->obd_dev_lock);
1262
1263         RETURN(rc);
1264 }
1265
1266 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1267                             char *devname, struct lov_desc *desc)
1268 {
1269         struct mgs_thread_info *mgi = mgs_env_info(env);
1270         struct lustre_cfg *lcfg;
1271         int rc;
1272
1273         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1274         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1275         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1276         if (!lcfg)
1277                 return -ENOMEM;
1278         rc = record_lcfg(env, llh, lcfg);
1279
1280         lustre_cfg_free(lcfg);
1281         return rc;
1282 }
1283
1284 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1285                             char *devname, struct lmv_desc *desc)
1286 {
1287         struct mgs_thread_info *mgi = mgs_env_info(env);
1288         struct lustre_cfg *lcfg;
1289         int rc;
1290
1291         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1292         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1293         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1294
1295         rc = record_lcfg(env, llh, lcfg);
1296
1297         lustre_cfg_free(lcfg);
1298         return rc;
1299 }
1300
1301 static inline int record_mdc_add(const struct lu_env *env,
1302                                  struct llog_handle *llh,
1303                                  char *logname, char *mdcuuid,
1304                                  char *mdtuuid, char *index,
1305                                  char *gen)
1306 {
1307         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1308                            mdtuuid,index,gen,mdcuuid);
1309 }
1310
1311 static inline int record_lov_add(const struct lu_env *env,
1312                                  struct llog_handle *llh,
1313                                  char *lov_name, char *ost_uuid,
1314                                  char *index, char *gen)
1315 {
1316         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1317                            ost_uuid, index, gen, 0);
1318 }
1319
1320 static inline int record_mount_opt(const struct lu_env *env,
1321                                    struct llog_handle *llh,
1322                                    char *profile, char *lov_name,
1323                                    char *mdc_name)
1324 {
1325         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1326                            profile,lov_name,mdc_name,0);
1327 }
1328
1329 static int record_marker(const struct lu_env *env,
1330                          struct llog_handle *llh,
1331                          struct fs_db *fsdb, __u32 flags,
1332                          char *tgtname, char *comment)
1333 {
1334         struct mgs_thread_info *mgi = mgs_env_info(env);
1335         struct lustre_cfg *lcfg;
1336         int rc;
1337         int cplen = 0;
1338
1339         if (flags & CM_START)
1340                 fsdb->fsdb_gen++;
1341         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1342         mgi->mgi_marker.cm_flags = flags;
1343         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1344         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1345                         sizeof(mgi->mgi_marker.cm_tgtname));
1346         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1347                 return -E2BIG;
1348         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1349                         sizeof(mgi->mgi_marker.cm_comment));
1350         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1351                 return -E2BIG;
1352         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1353         mgi->mgi_marker.cm_canceltime = 0;
1354         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1355         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1356                             sizeof(mgi->mgi_marker));
1357         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1358         if (!lcfg)
1359                 return -ENOMEM;
1360         rc = record_lcfg(env, llh, lcfg);
1361
1362         lustre_cfg_free(lcfg);
1363         return rc;
1364 }
1365
1366 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1367                             struct llog_handle **llh, char *name)
1368 {
1369         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1370         struct llog_ctxt        *ctxt;
1371         int                      rc = 0;
1372
1373         if (*llh)
1374                 GOTO(out, rc = -EBUSY);
1375
1376         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1377         if (!ctxt)
1378                 GOTO(out, rc = -ENODEV);
1379         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1380
1381         rc = llog_open_create(env, ctxt, llh, NULL, name);
1382         if (rc)
1383                 GOTO(out_ctxt, rc);
1384         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1385         if (rc)
1386                 llog_close(env, *llh);
1387 out_ctxt:
1388         llog_ctxt_put(ctxt);
1389 out:
1390         if (rc) {
1391                 CERROR("%s: can't start log %s: rc = %d\n",
1392                        mgs->mgs_obd->obd_name, name, rc);
1393                 *llh = NULL;
1394         }
1395         RETURN(rc);
1396 }
1397
1398 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1399 {
1400         int rc;
1401
1402         rc = llog_close(env, *llh);
1403         *llh = NULL;
1404
1405         return rc;
1406 }
1407
1408 /******************** config "macros" *********************/
1409
1410 /* write an lcfg directly into a log (with markers) */
1411 static int mgs_write_log_direct(const struct lu_env *env,
1412                                 struct mgs_device *mgs, struct fs_db *fsdb,
1413                                 char *logname, struct lustre_cfg *lcfg,
1414                                 char *devname, char *comment)
1415 {
1416         struct llog_handle *llh = NULL;
1417         int rc;
1418         ENTRY;
1419
1420         if (!lcfg)
1421                 RETURN(-ENOMEM);
1422
1423         rc = record_start_log(env, mgs, &llh, logname);
1424         if (rc)
1425                 RETURN(rc);
1426
1427         /* FIXME These should be a single journal transaction */
1428         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1429         if (rc)
1430                 GOTO(out_end, rc);
1431         rc = record_lcfg(env, llh, lcfg);
1432         if (rc)
1433                 GOTO(out_end, rc);
1434         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1435         if (rc)
1436                 GOTO(out_end, rc);
1437 out_end:
1438         record_end_log(env, &llh);
1439         RETURN(rc);
1440 }
1441
1442 /* write the lcfg in all logs for the given fs */
1443 int mgs_write_log_direct_all(const struct lu_env *env,
1444                              struct mgs_device *mgs,
1445                              struct fs_db *fsdb,
1446                              struct mgs_target_info *mti,
1447                              struct lustre_cfg *lcfg,
1448                              char *devname, char *comment,
1449                              int server_only)
1450 {
1451         cfs_list_t list;
1452         struct mgs_direntry *dirent, *n;
1453         char *fsname = mti->mti_fsname;
1454         char *logname;
1455         int rc = 0, len = strlen(fsname);
1456         ENTRY;
1457
1458         /* We need to set params for any future logs
1459            as well. FIXME Append this file to every new log.
1460            Actually, we should store as params (text), not llogs.  Or
1461            in a database. */
1462         rc = name_create(&logname, fsname, "-params");
1463         if (rc)
1464                 RETURN(rc);
1465         if (mgs_log_is_empty(env, mgs, logname)) {
1466                 struct llog_handle *llh = NULL;
1467                 rc = record_start_log(env, mgs, &llh, logname);
1468                 record_end_log(env, &llh);
1469         }
1470         name_destroy(&logname);
1471         if (rc)
1472                 RETURN(rc);
1473
1474         /* Find all the logs in the CONFIGS directory */
1475         rc = class_dentry_readdir(env, mgs, &list);
1476         if (rc)
1477                 RETURN(rc);
1478
1479         /* Could use fsdb index maps instead of directory listing */
1480         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1481                 cfs_list_del(&dirent->list);
1482                 /* don't write to sptlrpc rule log */
1483                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1484                         goto next;
1485
1486                 /* caller wants write server logs only */
1487                 if (server_only && strstr(dirent->name, "-client") != NULL)
1488                         goto next;
1489
1490                 if (strncmp(fsname, dirent->name, len) == 0) {
1491                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1492                         /* Erase any old settings of this same parameter */
1493                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1494                                         devname, comment, CM_SKIP);
1495                         if (rc < 0)
1496                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1497                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1498                         /* Write the new one */
1499                         if (lcfg) {
1500                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1501                                                           dirent->name,
1502                                                           lcfg, devname,
1503                                                           comment);
1504                                 if (rc)
1505                                         CERROR("%s: writing log %s: rc = %d\n",
1506                                                mgs->mgs_obd->obd_name,
1507                                                dirent->name, rc);
1508                         }
1509                 }
1510 next:
1511                 mgs_direntry_free(dirent);
1512         }
1513
1514         RETURN(rc);
1515 }
1516
1517 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1518                                     struct mgs_device *mgs,
1519                                     struct fs_db *fsdb,
1520                                     struct mgs_target_info *mti,
1521                                     int index, char *logname);
1522 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1523                                     struct mgs_device *mgs,
1524                                     struct fs_db *fsdb,
1525                                     struct mgs_target_info *mti,
1526                                     char *logname, char *suffix, char *lovname,
1527                                     enum lustre_sec_part sec_part, int flags);
1528 static int name_create_mdt_and_lov(char **logname, char **lovname,
1529                                    struct fs_db *fsdb, int i);
1530
1531 static int add_param(char *params, char *key, char *val)
1532 {
1533         char *start = params + strlen(params);
1534         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1535         int keylen = 0;
1536
1537         if (key != NULL)
1538                 keylen = strlen(key);
1539         if (start + 1 + keylen + strlen(val) >= end) {
1540                 CERROR("params are too long: %s %s%s\n",
1541                        params, key != NULL ? key : "", val);
1542                 return -EINVAL;
1543         }
1544
1545         sprintf(start, " %s%s", key != NULL ? key : "", val);
1546         return 0;
1547 }
1548
1549 /**
1550  * Walk through client config log record and convert the related records
1551  * into the target.
1552  **/
1553 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1554                                          struct llog_handle *llh,
1555                                          struct llog_rec_hdr *rec, void *data)
1556 {
1557         struct mgs_device *mgs;
1558         struct obd_device *obd;
1559         struct mgs_target_info *mti, *tmti;
1560         struct fs_db *fsdb;
1561         int cfg_len = rec->lrh_len;
1562         char *cfg_buf = (char*) (rec + 1);
1563         struct lustre_cfg *lcfg;
1564         int rc = 0;
1565         struct llog_handle *mdt_llh = NULL;
1566         static int got_an_osc_or_mdc = 0;
1567         /* 0: not found any osc/mdc;
1568            1: found osc;
1569            2: found mdc;
1570         */
1571         static int last_step = -1;
1572         int cplen = 0;
1573
1574         ENTRY;
1575
1576         mti = ((struct temp_comp*)data)->comp_mti;
1577         tmti = ((struct temp_comp*)data)->comp_tmti;
1578         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1579         obd = ((struct temp_comp *)data)->comp_obd;
1580         mgs = lu2mgs_dev(obd->obd_lu_dev);
1581         LASSERT(mgs);
1582
1583         if (rec->lrh_type != OBD_CFG_REC) {
1584                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1585                 RETURN(-EINVAL);
1586         }
1587
1588         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1589         if (rc) {
1590                 CERROR("Insane cfg\n");
1591                 RETURN(rc);
1592         }
1593
1594         lcfg = (struct lustre_cfg *)cfg_buf;
1595
1596         if (lcfg->lcfg_command == LCFG_MARKER) {
1597                 struct cfg_marker *marker;
1598                 marker = lustre_cfg_buf(lcfg, 1);
1599                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1600                     (marker->cm_flags & CM_START) &&
1601                      !(marker->cm_flags & CM_SKIP)) {
1602                         got_an_osc_or_mdc = 1;
1603                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1604                                         sizeof(tmti->mti_svname));
1605                         if (cplen >= sizeof(tmti->mti_svname))
1606                                 RETURN(-E2BIG);
1607                         rc = record_start_log(env, mgs, &mdt_llh,
1608                                               mti->mti_svname);
1609                         if (rc)
1610                                 RETURN(rc);
1611                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1612                                            mti->mti_svname, "add osc(copied)");
1613                         record_end_log(env, &mdt_llh);
1614                         last_step = marker->cm_step;
1615                         RETURN(rc);
1616                 }
1617                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1618                     (marker->cm_flags & CM_END) &&
1619                      !(marker->cm_flags & CM_SKIP)) {
1620                         LASSERT(last_step == marker->cm_step);
1621                         last_step = -1;
1622                         got_an_osc_or_mdc = 0;
1623                         memset(tmti, 0, sizeof(*tmti));
1624                         rc = record_start_log(env, mgs, &mdt_llh,
1625                                               mti->mti_svname);
1626                         if (rc)
1627                                 RETURN(rc);
1628                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1629                                            mti->mti_svname, "add osc(copied)");
1630                         record_end_log(env, &mdt_llh);
1631                         RETURN(rc);
1632                 }
1633                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1634                     (marker->cm_flags & CM_START) &&
1635                      !(marker->cm_flags & CM_SKIP)) {
1636                         got_an_osc_or_mdc = 2;
1637                         last_step = marker->cm_step;
1638                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1639                                strlen(marker->cm_tgtname));
1640
1641                         RETURN(rc);
1642                 }
1643                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1644                     (marker->cm_flags & CM_END) &&
1645                      !(marker->cm_flags & CM_SKIP)) {
1646                         LASSERT(last_step == marker->cm_step);
1647                         last_step = -1;
1648                         got_an_osc_or_mdc = 0;
1649                         memset(tmti, 0, sizeof(*tmti));
1650                         RETURN(rc);
1651                 }
1652         }
1653
1654         if (got_an_osc_or_mdc == 0 || last_step < 0)
1655                 RETURN(rc);
1656
1657         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1658                 uint64_t nodenid = lcfg->lcfg_nid;
1659
1660                 if (strlen(tmti->mti_uuid) == 0) {
1661                         /* target uuid not set, this config record is before
1662                          * LCFG_SETUP, this nid is one of target node nid.
1663                          */
1664                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1665                         tmti->mti_nid_count++;
1666                 } else {
1667                         /* failover node nid */
1668                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1669                                        libcfs_nid2str(nodenid));
1670                 }
1671
1672                 RETURN(rc);
1673         }
1674
1675         if (lcfg->lcfg_command == LCFG_SETUP) {
1676                 char *target;
1677
1678                 target = lustre_cfg_string(lcfg, 1);
1679                 memcpy(tmti->mti_uuid, target, strlen(target));
1680                 RETURN(rc);
1681         }
1682
1683         /* ignore client side sptlrpc_conf_log */
1684         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1685                 RETURN(rc);
1686
1687         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1688                 int index;
1689
1690                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1691                         RETURN (-EINVAL);
1692
1693                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1694                        strlen(mti->mti_fsname));
1695                 tmti->mti_stripe_index = index;
1696
1697                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1698                                               mti->mti_stripe_index,
1699                                               mti->mti_svname);
1700                 memset(tmti, 0, sizeof(*tmti));
1701                 RETURN(rc);
1702         }
1703
1704         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1705                 int index;
1706                 char mdt_index[9];
1707                 char *logname, *lovname;
1708
1709                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1710                                              mti->mti_stripe_index);
1711                 if (rc)
1712                         RETURN(rc);
1713                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1714
1715                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1716                         name_destroy(&logname);
1717                         name_destroy(&lovname);
1718                         RETURN(-EINVAL);
1719                 }
1720
1721                 tmti->mti_stripe_index = index;
1722                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1723                                          mdt_index, lovname,
1724                                          LUSTRE_SP_MDT, 0);
1725                 name_destroy(&logname);
1726                 name_destroy(&lovname);
1727                 RETURN(rc);
1728         }
1729         RETURN(rc);
1730 }
1731
1732 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1733 /* stealed from mgs_get_fsdb_from_llog*/
1734 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1735                                               struct mgs_device *mgs,
1736                                               char *client_name,
1737                                               struct temp_comp* comp)
1738 {
1739         struct llog_handle *loghandle;
1740         struct mgs_target_info *tmti;
1741         struct llog_ctxt *ctxt;
1742         int rc;
1743
1744         ENTRY;
1745
1746         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1747         LASSERT(ctxt != NULL);
1748
1749         OBD_ALLOC_PTR(tmti);
1750         if (tmti == NULL)
1751                 GOTO(out_ctxt, rc = -ENOMEM);
1752
1753         comp->comp_tmti = tmti;
1754         comp->comp_obd = mgs->mgs_obd;
1755
1756         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1757                        LLOG_OPEN_EXISTS);
1758         if (rc < 0) {
1759                 if (rc == -ENOENT)
1760                         rc = 0;
1761                 GOTO(out_pop, rc);
1762         }
1763
1764         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1765         if (rc)
1766                 GOTO(out_close, rc);
1767
1768         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1769                                   (void *)comp, NULL, false);
1770         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1771 out_close:
1772         llog_close(env, loghandle);
1773 out_pop:
1774         OBD_FREE_PTR(tmti);
1775 out_ctxt:
1776         llog_ctxt_put(ctxt);
1777         RETURN(rc);
1778 }
1779
1780 /* lmv is the second thing for client logs */
1781 /* copied from mgs_write_log_lov. Please refer to that.  */
1782 static int mgs_write_log_lmv(const struct lu_env *env,
1783                              struct mgs_device *mgs,
1784                              struct fs_db *fsdb,
1785                              struct mgs_target_info *mti,
1786                              char *logname, char *lmvname)
1787 {
1788         struct llog_handle *llh = NULL;
1789         struct lmv_desc *lmvdesc;
1790         char *uuid;
1791         int rc = 0;
1792         ENTRY;
1793
1794         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1795
1796         OBD_ALLOC_PTR(lmvdesc);
1797         if (lmvdesc == NULL)
1798                 RETURN(-ENOMEM);
1799         lmvdesc->ld_active_tgt_count = 0;
1800         lmvdesc->ld_tgt_count = 0;
1801         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1802         uuid = (char *)lmvdesc->ld_uuid.uuid;
1803
1804         rc = record_start_log(env, mgs, &llh, logname);
1805         if (rc)
1806                 GOTO(out_free, rc);
1807         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1808         if (rc)
1809                 GOTO(out_end, rc);
1810         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1811         if (rc)
1812                 GOTO(out_end, rc);
1813         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1814         if (rc)
1815                 GOTO(out_end, rc);
1816         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1817         if (rc)
1818                 GOTO(out_end, rc);
1819 out_end:
1820         record_end_log(env, &llh);
1821 out_free:
1822         OBD_FREE_PTR(lmvdesc);
1823         RETURN(rc);
1824 }
1825
1826 /* lov is the first thing in the mdt and client logs */
1827 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1828                              struct fs_db *fsdb, struct mgs_target_info *mti,
1829                              char *logname, char *lovname)
1830 {
1831         struct llog_handle *llh = NULL;
1832         struct lov_desc *lovdesc;
1833         char *uuid;
1834         int rc = 0;
1835         ENTRY;
1836
1837         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1838
1839         /*
1840         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1841         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1842               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1843         */
1844
1845         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1846         OBD_ALLOC_PTR(lovdesc);
1847         if (lovdesc == NULL)
1848                 RETURN(-ENOMEM);
1849         lovdesc->ld_magic = LOV_DESC_MAGIC;
1850         lovdesc->ld_tgt_count = 0;
1851         /* Defaults.  Can be changed later by lcfg config_param */
1852         lovdesc->ld_default_stripe_count = 1;
1853         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1854         lovdesc->ld_default_stripe_size = 1024 * 1024;
1855         lovdesc->ld_default_stripe_offset = -1;
1856         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1857         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1858         /* can these be the same? */
1859         uuid = (char *)lovdesc->ld_uuid.uuid;
1860
1861         /* This should always be the first entry in a log.
1862         rc = mgs_clear_log(obd, logname); */
1863         rc = record_start_log(env, mgs, &llh, logname);
1864         if (rc)
1865                 GOTO(out_free, rc);
1866         /* FIXME these should be a single journal transaction */
1867         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1868         if (rc)
1869                 GOTO(out_end, rc);
1870         rc = record_attach(env, llh, lovname, "lov", uuid);
1871         if (rc)
1872                 GOTO(out_end, rc);
1873         rc = record_lov_setup(env, llh, lovname, lovdesc);
1874         if (rc)
1875                 GOTO(out_end, rc);
1876         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1877         if (rc)
1878                 GOTO(out_end, rc);
1879         EXIT;
1880 out_end:
1881         record_end_log(env, &llh);
1882 out_free:
1883         OBD_FREE_PTR(lovdesc);
1884         return rc;
1885 }
1886
1887 /* add failnids to open log */
1888 static int mgs_write_log_failnids(const struct lu_env *env,
1889                                   struct mgs_target_info *mti,
1890                                   struct llog_handle *llh,
1891                                   char *cliname)
1892 {
1893         char *failnodeuuid = NULL;
1894         char *ptr = mti->mti_params;
1895         lnet_nid_t nid;
1896         int rc = 0;
1897
1898         /*
1899         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1900         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1901         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1902         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1903         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1904         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1905         */
1906
1907         /* Pull failnid info out of params string */
1908         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1909                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1910                         if (failnodeuuid == NULL) {
1911                                 /* We don't know the failover node name,
1912                                    so just use the first nid as the uuid */
1913                                 rc = name_create(&failnodeuuid,
1914                                                  libcfs_nid2str(nid), "");
1915                                 if (rc)
1916                                         return rc;
1917                         }
1918                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1919                                "client %s\n", libcfs_nid2str(nid),
1920                                failnodeuuid, cliname);
1921                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1922                 }
1923                 if (failnodeuuid)
1924                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1925         }
1926
1927         name_destroy(&failnodeuuid);
1928         return rc;
1929 }
1930
1931 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1932                                     struct mgs_device *mgs,
1933                                     struct fs_db *fsdb,
1934                                     struct mgs_target_info *mti,
1935                                     char *logname, char *lmvname)
1936 {
1937         struct llog_handle *llh = NULL;
1938         char *mdcname = NULL;
1939         char *nodeuuid = NULL;
1940         char *mdcuuid = NULL;
1941         char *lmvuuid = NULL;
1942         char index[6];
1943         int i, rc;
1944         ENTRY;
1945
1946         if (mgs_log_is_empty(env, mgs, logname)) {
1947                 CERROR("log is empty! Logical error\n");
1948                 RETURN(-EINVAL);
1949         }
1950
1951         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1952                mti->mti_svname, logname, lmvname);
1953
1954         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1955         if (rc)
1956                 RETURN(rc);
1957         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1958         if (rc)
1959                 GOTO(out_free, rc);
1960         rc = name_create(&mdcuuid, mdcname, "_UUID");
1961         if (rc)
1962                 GOTO(out_free, rc);
1963         rc = name_create(&lmvuuid, lmvname, "_UUID");
1964         if (rc)
1965                 GOTO(out_free, rc);
1966
1967         rc = record_start_log(env, mgs, &llh, logname);
1968         if (rc)
1969                 GOTO(out_free, rc);
1970         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1971                            "add mdc");
1972         if (rc)
1973                 GOTO(out_end, rc);
1974         for (i = 0; i < mti->mti_nid_count; i++) {
1975                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1976                        libcfs_nid2str(mti->mti_nids[i]));
1977
1978                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1979                 if (rc)
1980                         GOTO(out_end, rc);
1981         }
1982
1983         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1984         if (rc)
1985                 GOTO(out_end, rc);
1986         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1987         if (rc)
1988                 GOTO(out_end, rc);
1989         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1990         if (rc)
1991                 GOTO(out_end, rc);
1992         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1993         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1994                             index, "1");
1995         if (rc)
1996                 GOTO(out_end, rc);
1997         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
1998                            "add mdc");
1999         if (rc)
2000                 GOTO(out_end, rc);
2001 out_end:
2002         record_end_log(env, &llh);
2003 out_free:
2004         name_destroy(&lmvuuid);
2005         name_destroy(&mdcuuid);
2006         name_destroy(&mdcname);
2007         name_destroy(&nodeuuid);
2008         RETURN(rc);
2009 }
2010
2011 static inline int name_create_lov(char **lovname, char *mdtname,
2012                                   struct fs_db *fsdb, int index)
2013 {
2014         /* COMPAT_180 */
2015         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2016                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2017         else
2018                 return name_create(lovname, mdtname, "-mdtlov");
2019 }
2020
2021 static int name_create_mdt_and_lov(char **logname, char **lovname,
2022                                    struct fs_db *fsdb, int i)
2023 {
2024         int rc;
2025
2026         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2027         if (rc)
2028                 return rc;
2029         /* COMPAT_180 */
2030         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2031                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2032         else
2033                 rc = name_create(lovname, *logname, "-mdtlov");
2034         if (rc) {
2035                 name_destroy(logname);
2036                 *logname = NULL;
2037         }
2038         return rc;
2039 }
2040
2041 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2042                                       struct fs_db *fsdb, int i)
2043 {
2044         char suffix[16];
2045
2046         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2047                 sprintf(suffix, "-osc");
2048         else
2049                 sprintf(suffix, "-osc-MDT%04x", i);
2050         return name_create(oscname, ostname, suffix);
2051 }
2052
2053 /* add new mdc to already existent MDS */
2054 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2055                                     struct mgs_device *mgs,
2056                                     struct fs_db *fsdb,
2057                                     struct mgs_target_info *mti,
2058                                     int mdt_index, char *logname)
2059 {
2060         struct llog_handle      *llh = NULL;
2061         char    *nodeuuid = NULL;
2062         char    *ospname = NULL;
2063         char    *lovuuid = NULL;
2064         char    *mdtuuid = NULL;
2065         char    *svname = NULL;
2066         char    *mdtname = NULL;
2067         char    *lovname = NULL;
2068         char    index_str[16];
2069         int     i, rc;
2070
2071         ENTRY;
2072         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2073                 CERROR("log is empty! Logical error\n");
2074                 RETURN (-EINVAL);
2075         }
2076
2077         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2078                logname);
2079
2080         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2081         if (rc)
2082                 RETURN(rc);
2083
2084         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2085         if (rc)
2086                 GOTO(out_destory, rc);
2087
2088         rc = name_create(&svname, mdtname, "-osp");
2089         if (rc)
2090                 GOTO(out_destory, rc);
2091
2092         sprintf(index_str, "-MDT%04x", mdt_index);
2093         rc = name_create(&ospname, svname, index_str);
2094         if (rc)
2095                 GOTO(out_destory, rc);
2096
2097         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2098         if (rc)
2099                 GOTO(out_destory, rc);
2100
2101         rc = name_create(&lovuuid, lovname, "_UUID");
2102         if (rc)
2103                 GOTO(out_destory, rc);
2104
2105         rc = name_create(&mdtuuid, mdtname, "_UUID");
2106         if (rc)
2107                 GOTO(out_destory, rc);
2108
2109         rc = record_start_log(env, mgs, &llh, logname);
2110         if (rc)
2111                 GOTO(out_destory, rc);
2112
2113         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2114                            "add osp");
2115         if (rc)
2116                 GOTO(out_destory, rc);
2117
2118         for (i = 0; i < mti->mti_nid_count; i++) {
2119                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2120                        libcfs_nid2str(mti->mti_nids[i]));
2121                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2122                 if (rc)
2123                         GOTO(out_end, rc);
2124         }
2125
2126         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2127         if (rc)
2128                 GOTO(out_end, rc);
2129
2130         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2131                           NULL, NULL);
2132         if (rc)
2133                 GOTO(out_end, rc);
2134
2135         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2136         if (rc)
2137                 GOTO(out_end, rc);
2138
2139         /* Add mdc(osp) to lod */
2140         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2141                  mti->mti_stripe_index);
2142         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2143                          index_str, "1", NULL);
2144         if (rc)
2145                 GOTO(out_end, rc);
2146
2147         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2148         if (rc)
2149                 GOTO(out_end, rc);
2150
2151 out_end:
2152         record_end_log(env, &llh);
2153
2154 out_destory:
2155         name_destroy(&mdtuuid);
2156         name_destroy(&lovuuid);
2157         name_destroy(&lovname);
2158         name_destroy(&ospname);
2159         name_destroy(&svname);
2160         name_destroy(&nodeuuid);
2161         name_destroy(&mdtname);
2162         RETURN(rc);
2163 }
2164
2165 static int mgs_write_log_mdt0(const struct lu_env *env,
2166                               struct mgs_device *mgs,
2167                               struct fs_db *fsdb,
2168                               struct mgs_target_info *mti)
2169 {
2170         char *log = mti->mti_svname;
2171         struct llog_handle *llh = NULL;
2172         char *uuid, *lovname;
2173         char mdt_index[6];
2174         char *ptr = mti->mti_params;
2175         int rc = 0, failout = 0;
2176         ENTRY;
2177
2178         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2179         if (uuid == NULL)
2180                 RETURN(-ENOMEM);
2181
2182         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2183                 failout = (strncmp(ptr, "failout", 7) == 0);
2184
2185         rc = name_create(&lovname, log, "-mdtlov");
2186         if (rc)
2187                 GOTO(out_free, rc);
2188         if (mgs_log_is_empty(env, mgs, log)) {
2189                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2190                 if (rc)
2191                         GOTO(out_lod, rc);
2192         }
2193
2194         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2195
2196         rc = record_start_log(env, mgs, &llh, log);
2197         if (rc)
2198                 GOTO(out_lod, rc);
2199
2200         /* add MDT itself */
2201
2202         /* FIXME this whole fn should be a single journal transaction */
2203         sprintf(uuid, "%s_UUID", log);
2204         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2205         if (rc)
2206                 GOTO(out_lod, rc);
2207         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2208         if (rc)
2209                 GOTO(out_end, rc);
2210         rc = record_mount_opt(env, llh, log, lovname, NULL);
2211         if (rc)
2212                 GOTO(out_end, rc);
2213         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2214                         failout ? "n" : "f");
2215         if (rc)
2216                 GOTO(out_end, rc);
2217         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2218         if (rc)
2219                 GOTO(out_end, rc);
2220 out_end:
2221         record_end_log(env, &llh);
2222 out_lod:
2223         name_destroy(&lovname);
2224 out_free:
2225         OBD_FREE(uuid, sizeof(struct obd_uuid));
2226         RETURN(rc);
2227 }
2228
2229 /* envelope method for all layers log */
2230 static int mgs_write_log_mdt(const struct lu_env *env,
2231                              struct mgs_device *mgs,
2232                              struct fs_db *fsdb,
2233                              struct mgs_target_info *mti)
2234 {
2235         struct mgs_thread_info *mgi = mgs_env_info(env);
2236         struct llog_handle *llh = NULL;
2237         char *cliname;
2238         int rc, i = 0;
2239         ENTRY;
2240
2241         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2242
2243         if (mti->mti_uuid[0] == '\0') {
2244                 /* Make up our own uuid */
2245                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2246                          "%s_UUID", mti->mti_svname);
2247         }
2248
2249         /* add mdt */
2250         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2251         if (rc)
2252                 RETURN(rc);
2253         /* Append the mdt info to the client log */
2254         rc = name_create(&cliname, mti->mti_fsname, "-client");
2255         if (rc)
2256                 RETURN(rc);
2257
2258         if (mgs_log_is_empty(env, mgs, cliname)) {
2259                 /* Start client log */
2260                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2261                                        fsdb->fsdb_clilov);
2262                 if (rc)
2263                         GOTO(out_free, rc);
2264                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2265                                        fsdb->fsdb_clilmv);
2266                 if (rc)
2267                         GOTO(out_free, rc);
2268         }
2269
2270         /*
2271         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2272         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2273         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2274         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2275         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2276         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2277         */
2278
2279                 /* copy client info about lov/lmv */
2280                 mgi->mgi_comp.comp_mti = mti;
2281                 mgi->mgi_comp.comp_fsdb = fsdb;
2282
2283                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2284                                                         &mgi->mgi_comp);
2285                 if (rc)
2286                         GOTO(out_free, rc);
2287                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2288                                               fsdb->fsdb_clilmv);
2289                 if (rc)
2290                         GOTO(out_free, rc);
2291
2292                 /* add mountopts */
2293                 rc = record_start_log(env, mgs, &llh, cliname);
2294                 if (rc)
2295                         GOTO(out_free, rc);
2296
2297                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2298                                    "mount opts");
2299                 if (rc)
2300                         GOTO(out_end, rc);
2301                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2302                                       fsdb->fsdb_clilmv);
2303                 if (rc)
2304                         GOTO(out_end, rc);
2305                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2306                                    "mount opts");
2307
2308         if (rc)
2309                 GOTO(out_end, rc);
2310
2311         /* for_all_existing_mdt except current one */
2312         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2313                 if (i !=  mti->mti_stripe_index &&
2314                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2315                         char *logname;
2316
2317                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2318                         if (rc)
2319                                 GOTO(out_end, rc);
2320
2321                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2322                                                       i, logname);
2323                         name_destroy(&logname);
2324                         if (rc)
2325                                 GOTO(out_end, rc);
2326                 }
2327         }
2328 out_end:
2329         record_end_log(env, &llh);
2330 out_free:
2331         name_destroy(&cliname);
2332         RETURN(rc);
2333 }
2334
2335 /* Add the ost info to the client/mdt lov */
2336 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2337                                     struct mgs_device *mgs, struct fs_db *fsdb,
2338                                     struct mgs_target_info *mti,
2339                                     char *logname, char *suffix, char *lovname,
2340                                     enum lustre_sec_part sec_part, int flags)
2341 {
2342         struct llog_handle *llh = NULL;
2343         char *nodeuuid = NULL;
2344         char *oscname = NULL;
2345         char *oscuuid = NULL;
2346         char *lovuuid = NULL;
2347         char *svname = NULL;
2348         char index[6];
2349         int i, rc;
2350
2351         ENTRY;
2352         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2353                mti->mti_svname, logname);
2354
2355         if (mgs_log_is_empty(env, mgs, logname)) {
2356                 CERROR("log is empty! Logical error\n");
2357                 RETURN (-EINVAL);
2358         }
2359
2360         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2361         if (rc)
2362                 RETURN(rc);
2363         rc = name_create(&svname, mti->mti_svname, "-osc");
2364         if (rc)
2365                 GOTO(out_free, rc);
2366
2367         /* for the system upgraded from old 1.8, keep using the old osc naming
2368          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2369         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2370                 rc = name_create(&oscname, svname, "");
2371         else
2372                 rc = name_create(&oscname, svname, suffix);
2373         if (rc)
2374                 GOTO(out_free, rc);
2375
2376         rc = name_create(&oscuuid, oscname, "_UUID");
2377         if (rc)
2378                 GOTO(out_free, rc);
2379         rc = name_create(&lovuuid, lovname, "_UUID");
2380         if (rc)
2381                 GOTO(out_free, rc);
2382
2383
2384         /*
2385         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2386         multihomed (#4)
2387         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2388         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2389         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2390         failover (#6,7)
2391         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2392         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2393         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2394         */
2395
2396         rc = record_start_log(env, mgs, &llh, logname);
2397         if (rc)
2398                 GOTO(out_free, rc);
2399
2400         /* FIXME these should be a single journal transaction */
2401         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2402                            "add osc");
2403         if (rc)
2404                 GOTO(out_end, rc);
2405
2406         /* NB: don't change record order, because upon MDT steal OSC config
2407          * from client, it treats all nids before LCFG_SETUP as target nids
2408          * (multiple interfaces), while nids after as failover node nids.
2409          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2410          */
2411         for (i = 0; i < mti->mti_nid_count; i++) {
2412                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2413                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2414                 if (rc)
2415                         GOTO(out_end, rc);
2416         }
2417         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2418         if (rc)
2419                 GOTO(out_end, rc);
2420         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2421         if (rc)
2422                 GOTO(out_end, rc);
2423         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2424         if (rc)
2425                 GOTO(out_end, rc);
2426
2427         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2428
2429         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2430         if (rc)
2431                 GOTO(out_end, rc);
2432         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2433                            "add osc");
2434         if (rc)
2435                 GOTO(out_end, rc);
2436 out_end:
2437         record_end_log(env, &llh);
2438 out_free:
2439         name_destroy(&lovuuid);
2440         name_destroy(&oscuuid);
2441         name_destroy(&oscname);
2442         name_destroy(&svname);
2443         name_destroy(&nodeuuid);
2444         RETURN(rc);
2445 }
2446
2447 static int mgs_write_log_ost(const struct lu_env *env,
2448                              struct mgs_device *mgs, struct fs_db *fsdb,
2449                              struct mgs_target_info *mti)
2450 {
2451         struct llog_handle *llh = NULL;
2452         char *logname, *lovname;
2453         char *ptr = mti->mti_params;
2454         int rc, flags = 0, failout = 0, i;
2455         ENTRY;
2456
2457         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2458
2459         /* The ost startup log */
2460
2461         /* If the ost log already exists, that means that someone reformatted
2462            the ost and it called target_add again. */
2463         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2464                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2465                                    "exists, yet the server claims it never "
2466                                    "registered. It may have been reformatted, "
2467                                    "or the index changed. writeconf the MDT to "
2468                                    "regenerate all logs.\n", mti->mti_svname);
2469                 RETURN(-EALREADY);
2470         }
2471
2472         /*
2473         attach obdfilter ost1 ost1_UUID
2474         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2475         */
2476         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2477                 failout = (strncmp(ptr, "failout", 7) == 0);
2478         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2479         if (rc)
2480                 RETURN(rc);
2481         /* FIXME these should be a single journal transaction */
2482         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2483         if (rc)
2484                 GOTO(out_end, rc);
2485         if (*mti->mti_uuid == '\0')
2486                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2487                          "%s_UUID", mti->mti_svname);
2488         rc = record_attach(env, llh, mti->mti_svname,
2489                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2490         if (rc)
2491                 GOTO(out_end, rc);
2492         rc = record_setup(env, llh, mti->mti_svname,
2493                           "dev"/*ignored*/, "type"/*ignored*/,
2494                           failout ? "n" : "f", 0/*options*/);
2495         if (rc)
2496                 GOTO(out_end, rc);
2497         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2498         if (rc)
2499                 GOTO(out_end, rc);
2500 out_end:
2501         record_end_log(env, &llh);
2502         if (rc)
2503                 RETURN(rc);
2504         /* We also have to update the other logs where this osc is part of
2505            the lov */
2506
2507         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2508                 /* If we're upgrading, the old mdt log already has our
2509                    entry. Let's do a fake one for fun. */
2510                 /* Note that we can't add any new failnids, since we don't
2511                    know the old osc names. */
2512                 flags = CM_SKIP | CM_UPGRADE146;
2513
2514         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2515                 /* If the update flag isn't set, don't update client/mdt
2516                    logs. */
2517                 flags |= CM_SKIP;
2518                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2519                               "the MDT first to regenerate it.\n",
2520                               mti->mti_svname);
2521         }
2522
2523         /* Add ost to all MDT lov defs */
2524         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2525                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2526                         char mdt_index[9];
2527
2528                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2529                                                      i);
2530                         if (rc)
2531                                 RETURN(rc);
2532                         sprintf(mdt_index, "-MDT%04x", i);
2533                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2534                                                       logname, mdt_index,
2535                                                       lovname, LUSTRE_SP_MDT,
2536                                                       flags);
2537                         name_destroy(&logname);
2538                         name_destroy(&lovname);
2539                         if (rc)
2540                                 RETURN(rc);
2541                 }
2542         }
2543
2544         /* Append ost info to the client log */
2545         rc = name_create(&logname, mti->mti_fsname, "-client");
2546         if (rc)
2547                 RETURN(rc);
2548         if (mgs_log_is_empty(env, mgs, logname)) {
2549                 /* Start client log */
2550                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2551                                        fsdb->fsdb_clilov);
2552                 if (rc)
2553                         GOTO(out_free, rc);
2554                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2555                                        fsdb->fsdb_clilmv);
2556                 if (rc)
2557                         GOTO(out_free, rc);
2558         }
2559         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2560                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2561 out_free:
2562         name_destroy(&logname);
2563         RETURN(rc);
2564 }
2565
2566 static __inline__ int mgs_param_empty(char *ptr)
2567 {
2568         char *tmp;
2569
2570         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2571                 return 1;
2572         return 0;
2573 }
2574
2575 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2576                                           struct mgs_device *mgs,
2577                                           struct fs_db *fsdb,
2578                                           struct mgs_target_info *mti,
2579                                           char *logname, char *cliname)
2580 {
2581         int rc;
2582         struct llog_handle *llh = NULL;
2583
2584         if (mgs_param_empty(mti->mti_params)) {
2585                 /* Remove _all_ failnids */
2586                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2587                                 mti->mti_svname, "add failnid", CM_SKIP);
2588                 return rc < 0 ? rc : 0;
2589         }
2590
2591         /* Otherwise failover nids are additive */
2592         rc = record_start_log(env, mgs, &llh, logname);
2593         if (rc)
2594                 return rc;
2595                 /* FIXME this should be a single journal transaction */
2596         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2597                            "add failnid");
2598         if (rc)
2599                 goto out_end;
2600         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2601         if (rc)
2602                 goto out_end;
2603         rc = record_marker(env, llh, fsdb, CM_END,
2604                            mti->mti_svname, "add failnid");
2605 out_end:
2606         record_end_log(env, &llh);
2607         return rc;
2608 }
2609
2610
2611 /* Add additional failnids to an existing log.
2612    The mdc/osc must have been added to logs first */
2613 /* tcp nids must be in dotted-quad ascii -
2614    we can't resolve hostnames from the kernel. */
2615 static int mgs_write_log_add_failnid(const struct lu_env *env,
2616                                      struct mgs_device *mgs,
2617                                      struct fs_db *fsdb,
2618                                      struct mgs_target_info *mti)
2619 {
2620         char *logname, *cliname;
2621         int rc;
2622         ENTRY;
2623
2624         /* FIXME we currently can't erase the failnids
2625          * given when a target first registers, since they aren't part of
2626          * an "add uuid" stanza */
2627
2628         /* Verify that we know about this target */
2629         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2630                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2631                                    "yet. It must be started before failnids "
2632                                    "can be added.\n", mti->mti_svname);
2633                 RETURN(-ENOENT);
2634         }
2635
2636         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2637         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2638                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2639         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2640                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2641         } else {
2642                 RETURN(-EINVAL);
2643         }
2644         if (rc)
2645                 RETURN(rc);
2646         /* Add failover nids to the client log */
2647         rc = name_create(&logname, mti->mti_fsname, "-client");
2648         if (rc) {
2649                 name_destroy(&cliname);
2650                 RETURN(rc);
2651         }
2652         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2653         name_destroy(&logname);
2654         name_destroy(&cliname);
2655         if (rc)
2656                 RETURN(rc);
2657
2658         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2659                 /* Add OST failover nids to the MDT logs as well */
2660                 int i;
2661
2662                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2663                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2664                                 continue;
2665                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2666                         if (rc)
2667                                 RETURN(rc);
2668                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2669                                                  fsdb, i);
2670                         if (rc) {
2671                                 name_destroy(&logname);
2672                                 RETURN(rc);
2673                         }
2674                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2675                                                             mti, logname,
2676                                                             cliname);
2677                         name_destroy(&cliname);
2678                         name_destroy(&logname);
2679                         if (rc)
2680                                 RETURN(rc);
2681                 }
2682         }
2683
2684         RETURN(rc);
2685 }
2686
2687 static int mgs_wlp_lcfg(const struct lu_env *env,
2688                         struct mgs_device *mgs, struct fs_db *fsdb,
2689                         struct mgs_target_info *mti,
2690                         char *logname, struct lustre_cfg_bufs *bufs,
2691                         char *tgtname, char *ptr)
2692 {
2693         char comment[MTI_NAME_MAXLEN];
2694         char *tmp;
2695         struct lustre_cfg *lcfg;
2696         int rc, del;
2697
2698         /* Erase any old settings of this same parameter */
2699         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2700         comment[MTI_NAME_MAXLEN - 1] = 0;
2701         /* But don't try to match the value. */
2702         if ((tmp = strchr(comment, '=')))
2703             *tmp = 0;
2704         /* FIXME we should skip settings that are the same as old values */
2705         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2706         if (rc < 0)
2707                 return rc;
2708         del = mgs_param_empty(ptr);
2709
2710         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2711                       "Sett" : "Modify", tgtname, comment, logname);
2712         if (del)
2713                 return rc;
2714
2715         lustre_cfg_bufs_reset(bufs, tgtname);
2716         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2717         if (mti->mti_flags & LDD_F_PARAM2)
2718                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2719
2720         lcfg = lustre_cfg_new((mti->mti_flags & LDD_F_PARAM2) ?
2721                               LCFG_SET_PARAM : LCFG_PARAM, bufs);
2722
2723         if (!lcfg)
2724                 return -ENOMEM;
2725         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2726         lustre_cfg_free(lcfg);
2727         return rc;
2728 }
2729
2730 static int mgs_write_log_param2(const struct lu_env *env,
2731                                 struct mgs_device *mgs,
2732                                 struct fs_db *fsdb,
2733                                 struct mgs_target_info *mti, char *ptr)
2734 {
2735         struct lustre_cfg_bufs  bufs;
2736         int                     rc = 0;
2737         ENTRY;
2738
2739         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2740         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2741                           mti->mti_svname, ptr);
2742
2743         RETURN(rc);
2744 }
2745
2746 /* write global variable settings into log */
2747 static int mgs_write_log_sys(const struct lu_env *env,
2748                              struct mgs_device *mgs, struct fs_db *fsdb,
2749                              struct mgs_target_info *mti, char *sys, char *ptr)
2750 {
2751         struct mgs_thread_info *mgi = mgs_env_info(env);
2752         struct lustre_cfg *lcfg;
2753         char *tmp, sep;
2754         int rc, cmd, convert = 1;
2755
2756         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2757                 cmd = LCFG_SET_TIMEOUT;
2758         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2759                 cmd = LCFG_SET_LDLM_TIMEOUT;
2760         /* Check for known params here so we can return error to lctl */
2761         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2762                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2763                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2764                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2765                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2766                 cmd = LCFG_PARAM;
2767         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2768                 convert = 0; /* Don't convert string value to integer */
2769                 cmd = LCFG_PARAM;
2770         } else {
2771                 return -EINVAL;
2772         }
2773
2774         if (mgs_param_empty(ptr))
2775                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2776         else
2777                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2778
2779         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2780         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2781         if (!convert && *tmp != '\0')
2782                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2783         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2784         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2785         /* truncate the comment to the parameter name */
2786         ptr = tmp - 1;
2787         sep = *ptr;
2788         *ptr = '\0';
2789         /* modify all servers and clients */
2790         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2791                                       *tmp == '\0' ? NULL : lcfg,
2792                                       mti->mti_fsname, sys, 0);
2793         if (rc == 0 && *tmp != '\0') {
2794                 switch (cmd) {
2795                 case LCFG_SET_TIMEOUT:
2796                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2797                                 class_process_config(lcfg);
2798                         break;
2799                 case LCFG_SET_LDLM_TIMEOUT:
2800                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2801                                 class_process_config(lcfg);
2802                         break;
2803                 default:
2804                         break;
2805                 }
2806         }
2807         *ptr = sep;
2808         lustre_cfg_free(lcfg);
2809         return rc;
2810 }
2811
2812 /* write quota settings into log */
2813 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2814                                struct fs_db *fsdb, struct mgs_target_info *mti,
2815                                char *quota, char *ptr)
2816 {
2817         struct mgs_thread_info  *mgi = mgs_env_info(env);
2818         struct lustre_cfg       *lcfg;
2819         char                    *tmp;
2820         char                     sep;
2821         int                      rc, cmd = LCFG_PARAM;
2822
2823         /* support only 'meta' and 'data' pools so far */
2824         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2825             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2826                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2827                        "& quota.ost are)\n", ptr);
2828                 return -EINVAL;
2829         }
2830
2831         if (*tmp == '\0') {
2832                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2833         } else {
2834                 CDEBUG(D_MGS, "global '%s'\n", quota);
2835
2836                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2837                     strcmp(tmp, "none") != 0) {
2838                         CERROR("enable option(%s) isn't supported\n", tmp);
2839                         return -EINVAL;
2840                 }
2841         }
2842
2843         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2844         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2845         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2846         /* truncate the comment to the parameter name */
2847         ptr = tmp - 1;
2848         sep = *ptr;
2849         *ptr = '\0';
2850
2851         /* XXX we duplicated quota enable information in all server
2852          *     config logs, it should be moved to a separate config
2853          *     log once we cleanup the config log for global param. */
2854         /* modify all servers */
2855         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2856                                       *tmp == '\0' ? NULL : lcfg,
2857                                       mti->mti_fsname, quota, 1);
2858         *ptr = sep;
2859         lustre_cfg_free(lcfg);
2860         return rc < 0 ? rc : 0;
2861 }
2862
2863 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2864                                    struct mgs_device *mgs,
2865                                    struct fs_db *fsdb,
2866                                    struct mgs_target_info *mti,
2867                                    char *param)
2868 {
2869         struct mgs_thread_info *mgi = mgs_env_info(env);
2870         struct llog_handle     *llh = NULL;
2871         char                   *logname;
2872         char                   *comment, *ptr;
2873         struct lustre_cfg      *lcfg;
2874         int                     rc, len;
2875         ENTRY;
2876
2877         /* get comment */
2878         ptr = strchr(param, '=');
2879         LASSERT(ptr);
2880         len = ptr - param;
2881
2882         OBD_ALLOC(comment, len + 1);
2883         if (comment == NULL)
2884                 RETURN(-ENOMEM);
2885         strncpy(comment, param, len);
2886         comment[len] = '\0';
2887
2888         /* prepare lcfg */
2889         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2890         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2891         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2892         if (lcfg == NULL)
2893                 GOTO(out_comment, rc = -ENOMEM);
2894
2895         /* construct log name */
2896         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2897         if (rc)
2898                 GOTO(out_lcfg, rc);
2899
2900         if (mgs_log_is_empty(env, mgs, logname)) {
2901                 rc = record_start_log(env, mgs, &llh, logname);
2902                 if (rc)
2903                         GOTO(out, rc);
2904                 record_end_log(env, &llh);
2905         }
2906
2907         /* obsolete old one */
2908         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2909                         comment, CM_SKIP);
2910         if (rc < 0)
2911                 GOTO(out, rc);
2912         /* write the new one */
2913         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2914                                   mti->mti_svname, comment);
2915         if (rc)
2916                 CERROR("err %d writing log %s\n", rc, logname);
2917 out:
2918         name_destroy(&logname);
2919 out_lcfg:
2920         lustre_cfg_free(lcfg);
2921 out_comment:
2922         OBD_FREE(comment, len + 1);
2923         RETURN(rc);
2924 }
2925
2926 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2927                                         char *param)
2928 {
2929         char    *ptr;
2930
2931         /* disable the adjustable udesc parameter for now, i.e. use default
2932          * setting that client always ship udesc to MDT if possible. to enable
2933          * it simply remove the following line */
2934         goto error_out;
2935
2936         ptr = strchr(param, '=');
2937         if (ptr == NULL)
2938                 goto error_out;
2939         *ptr++ = '\0';
2940
2941         if (strcmp(param, PARAM_SRPC_UDESC))
2942                 goto error_out;
2943
2944         if (strcmp(ptr, "yes") == 0) {
2945                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2946                 CWARN("Enable user descriptor shipping from client to MDT\n");
2947         } else if (strcmp(ptr, "no") == 0) {
2948                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2949                 CWARN("Disable user descriptor shipping from client to MDT\n");
2950         } else {
2951                 *(ptr - 1) = '=';
2952                 goto error_out;
2953         }
2954         return 0;
2955
2956 error_out:
2957         CERROR("Invalid param: %s\n", param);
2958         return -EINVAL;
2959 }
2960
2961 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2962                                   const char *svname,
2963                                   char *param)
2964 {
2965         struct sptlrpc_rule      rule;
2966         struct sptlrpc_rule_set *rset;
2967         int                      rc;
2968         ENTRY;
2969
2970         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2971                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2972                 RETURN(-EINVAL);
2973         }
2974
2975         if (strncmp(param, PARAM_SRPC_UDESC,
2976                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2977                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2978         }
2979
2980         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2981                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2982                 RETURN(-EINVAL);
2983         }
2984
2985         param += sizeof(PARAM_SRPC_FLVR) - 1;
2986
2987         rc = sptlrpc_parse_rule(param, &rule);
2988         if (rc)
2989                 RETURN(rc);
2990
2991         /* mgs rules implies must be mgc->mgs */
2992         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2993                 if ((rule.sr_from != LUSTRE_SP_MGC &&
2994                      rule.sr_from != LUSTRE_SP_ANY) ||
2995                     (rule.sr_to != LUSTRE_SP_MGS &&
2996                      rule.sr_to != LUSTRE_SP_ANY))
2997                         RETURN(-EINVAL);
2998         }
2999
3000         /* preapre room for this coming rule. svcname format should be:
3001          * - fsname: general rule
3002          * - fsname-tgtname: target-specific rule
3003          */
3004         if (strchr(svname, '-')) {
3005                 struct mgs_tgt_srpc_conf *tgtconf;
3006                 int                       found = 0;
3007
3008                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3009                      tgtconf = tgtconf->mtsc_next) {
3010                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3011                                 found = 1;
3012                                 break;
3013                         }
3014                 }
3015
3016                 if (!found) {
3017                         int name_len;
3018
3019                         OBD_ALLOC_PTR(tgtconf);
3020                         if (tgtconf == NULL)
3021                                 RETURN(-ENOMEM);
3022
3023                         name_len = strlen(svname);
3024
3025                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3026                         if (tgtconf->mtsc_tgt == NULL) {
3027                                 OBD_FREE_PTR(tgtconf);
3028                                 RETURN(-ENOMEM);
3029                         }
3030                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3031
3032                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3033                         fsdb->fsdb_srpc_tgt = tgtconf;
3034                 }
3035
3036                 rset = &tgtconf->mtsc_rset;
3037         } else {
3038                 rset = &fsdb->fsdb_srpc_gen;
3039         }
3040
3041         rc = sptlrpc_rule_set_merge(rset, &rule);
3042
3043         RETURN(rc);
3044 }
3045
3046 static int mgs_srpc_set_param(const struct lu_env *env,
3047                               struct mgs_device *mgs,
3048                               struct fs_db *fsdb,
3049                               struct mgs_target_info *mti,
3050                               char *param)
3051 {
3052         char                   *copy;
3053         int                     rc, copy_size;
3054         ENTRY;
3055
3056 #ifndef HAVE_GSS
3057         RETURN(-EINVAL);
3058 #endif
3059         /* keep a copy of original param, which could be destroied
3060          * during parsing */
3061         copy_size = strlen(param) + 1;
3062         OBD_ALLOC(copy, copy_size);
3063         if (copy == NULL)
3064                 return -ENOMEM;
3065         memcpy(copy, param, copy_size);
3066
3067         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3068         if (rc)
3069                 goto out_free;
3070
3071         /* previous steps guaranteed the syntax is correct */
3072         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3073         if (rc)
3074                 goto out_free;
3075
3076         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3077                 /*
3078                  * for mgs rules, make them effective immediately.
3079                  */
3080                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3081                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3082                                                  &fsdb->fsdb_srpc_gen);
3083         }
3084
3085 out_free:
3086         OBD_FREE(copy, copy_size);
3087         RETURN(rc);
3088 }
3089
3090 struct mgs_srpc_read_data {
3091         struct fs_db   *msrd_fsdb;
3092         int             msrd_skip;
3093 };
3094
3095 static int mgs_srpc_read_handler(const struct lu_env *env,
3096                                  struct llog_handle *llh,
3097                                  struct llog_rec_hdr *rec, void *data)
3098 {
3099         struct mgs_srpc_read_data *msrd = data;
3100         struct cfg_marker         *marker;
3101         struct lustre_cfg         *lcfg = REC_DATA(rec);
3102         char                      *svname, *param;
3103         int                        cfg_len, rc;
3104         ENTRY;
3105
3106         if (rec->lrh_type != OBD_CFG_REC) {
3107                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3108                 RETURN(-EINVAL);
3109         }
3110
3111         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3112                   sizeof(struct llog_rec_tail);
3113
3114         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3115         if (rc) {
3116                 CERROR("Insane cfg\n");
3117                 RETURN(rc);
3118         }
3119
3120         if (lcfg->lcfg_command == LCFG_MARKER) {
3121                 marker = lustre_cfg_buf(lcfg, 1);
3122
3123                 if (marker->cm_flags & CM_START &&
3124                     marker->cm_flags & CM_SKIP)
3125                         msrd->msrd_skip = 1;
3126                 if (marker->cm_flags & CM_END)
3127                         msrd->msrd_skip = 0;
3128
3129                 RETURN(0);
3130         }
3131
3132         if (msrd->msrd_skip)
3133                 RETURN(0);
3134
3135         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3136                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3137                 RETURN(0);
3138         }
3139
3140         svname = lustre_cfg_string(lcfg, 0);
3141         if (svname == NULL) {
3142                 CERROR("svname is empty\n");
3143                 RETURN(0);
3144         }
3145
3146         param = lustre_cfg_string(lcfg, 1);
3147         if (param == NULL) {
3148                 CERROR("param is empty\n");
3149                 RETURN(0);
3150         }
3151
3152         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3153         if (rc)
3154                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3155
3156         RETURN(0);
3157 }
3158
3159 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3160                                 struct mgs_device *mgs,
3161                                 struct fs_db *fsdb)
3162 {
3163         struct llog_handle        *llh = NULL;
3164         struct llog_ctxt          *ctxt;
3165         char                      *logname;
3166         struct mgs_srpc_read_data  msrd;
3167         int                        rc;
3168         ENTRY;
3169
3170         /* construct log name */
3171         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3172         if (rc)
3173                 RETURN(rc);
3174
3175         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3176         LASSERT(ctxt != NULL);
3177
3178         if (mgs_log_is_empty(env, mgs, logname))
3179                 GOTO(out, rc = 0);
3180
3181         rc = llog_open(env, ctxt, &llh, NULL, logname,
3182                        LLOG_OPEN_EXISTS);
3183         if (rc < 0) {
3184                 if (rc == -ENOENT)
3185                         rc = 0;
3186                 GOTO(out, rc);
3187         }
3188
3189         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3190         if (rc)
3191                 GOTO(out_close, rc);
3192
3193         if (llog_get_size(llh) <= 1)
3194                 GOTO(out_close, rc = 0);
3195
3196         msrd.msrd_fsdb = fsdb;
3197         msrd.msrd_skip = 0;
3198
3199         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3200                           NULL);
3201
3202 out_close:
3203         llog_close(env, llh);
3204 out:
3205         llog_ctxt_put(ctxt);
3206         name_destroy(&logname);
3207
3208         if (rc)
3209                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3210         RETURN(rc);
3211 }
3212
3213 /* Permanent settings of all parameters by writing into the appropriate
3214  * configuration logs.
3215  * A parameter with null value ("<param>='\0'") means to erase it out of
3216  * the logs.
3217  */
3218 static int mgs_write_log_param(const struct lu_env *env,
3219                                struct mgs_device *mgs, struct fs_db *fsdb,
3220                                struct mgs_target_info *mti, char *ptr)
3221 {
3222         struct mgs_thread_info *mgi = mgs_env_info(env);
3223         char *logname;
3224         char *tmp;
3225         int rc = 0, rc2 = 0;
3226         ENTRY;
3227
3228         /* For various parameter settings, we have to figure out which logs
3229            care about them (e.g. both mdt and client for lov settings) */
3230         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3231
3232         /* The params are stored in MOUNT_DATA_FILE and modified via
3233            tunefs.lustre, or set using lctl conf_param */
3234
3235         /* Processed in lustre_start_mgc */
3236         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3237                 GOTO(end, rc);
3238
3239         /* Processed in ost/mdt */
3240         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3241                 GOTO(end, rc);
3242
3243         /* Processed in mgs_write_log_ost */
3244         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3245                 if (mti->mti_flags & LDD_F_PARAM) {
3246                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3247                                            "changed with tunefs.lustre"
3248                                            "and --writeconf\n", ptr);
3249                         rc = -EPERM;
3250                 }
3251                 GOTO(end, rc);
3252         }
3253
3254         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3255                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3256                 GOTO(end, rc);
3257         }
3258
3259         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3260                 /* Add a failover nidlist */
3261                 rc = 0;
3262                 /* We already processed failovers params for new
3263                    targets in mgs_write_log_target */
3264                 if (mti->mti_flags & LDD_F_PARAM) {
3265                         CDEBUG(D_MGS, "Adding failnode\n");
3266                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3267                 }
3268                 GOTO(end, rc);
3269         }
3270
3271         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3272                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3273                 GOTO(end, rc);
3274         }
3275
3276         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3277                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3278                 GOTO(end, rc);
3279         }
3280
3281         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3282                 /* active=0 means off, anything else means on */
3283                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3284                 int i;
3285
3286                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3287                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3288                                            "be (de)activated.\n",
3289                                            mti->mti_svname);
3290                         GOTO(end, rc = -EINVAL);
3291                 }
3292                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3293                               flag ? "de": "re", mti->mti_svname);
3294                 /* Modify clilov */
3295                 rc = name_create(&logname, mti->mti_fsname, "-client");
3296                 if (rc)
3297                         GOTO(end, rc);
3298                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3299                                 mti->mti_svname, "add osc", flag);
3300                 name_destroy(&logname);
3301                 if (rc)
3302                         goto active_err;
3303                 /* Modify mdtlov */
3304                 /* Add to all MDT logs for CMD */
3305                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3306                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3307                                 continue;
3308                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3309                         if (rc)
3310                                 GOTO(end, rc);
3311                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3312                                         mti->mti_svname, "add osc", flag);
3313                         name_destroy(&logname);
3314                         if (rc)
3315                                 goto active_err;
3316                 }
3317         active_err:
3318                 if (rc) {
3319                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3320                                            "log (%d). No permanent "
3321                                            "changes were made to the "
3322                                            "config log.\n",
3323                                            mti->mti_svname, rc);
3324                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3325                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3326                                                    " because the log"
3327                                                    "is in the old 1.4"
3328                                                    "style. Consider "
3329                                                    " --writeconf to "
3330                                                    "update the logs.\n");
3331                         GOTO(end, rc);
3332                 }
3333                 /* Fall through to osc proc for deactivating live OSC
3334                    on running MDT / clients. */
3335         }
3336         /* Below here, let obd's XXX_process_config methods handle it */
3337
3338         /* All lov. in proc */
3339         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3340                 char *mdtlovname;
3341
3342                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3343                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3344                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3345                                            "set on the MDT, not %s. "
3346                                            "Ignoring.\n",
3347                                            mti->mti_svname);
3348                         GOTO(end, rc = 0);
3349                 }
3350
3351                 /* Modify mdtlov */
3352                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3353                         GOTO(end, rc = -ENODEV);
3354
3355                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3356                                              mti->mti_stripe_index);
3357                 if (rc)
3358                         GOTO(end, rc);
3359                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3360                                   &mgi->mgi_bufs, mdtlovname, ptr);
3361                 name_destroy(&logname);
3362                 name_destroy(&mdtlovname);
3363                 if (rc)
3364                         GOTO(end, rc);
3365
3366                 /* Modify clilov */
3367                 rc = name_create(&logname, mti->mti_fsname, "-client");
3368                 if (rc)
3369                         GOTO(end, rc);
3370                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3371                                   fsdb->fsdb_clilov, ptr);
3372                 name_destroy(&logname);
3373                 GOTO(end, rc);
3374         }
3375
3376         /* All osc., mdc., llite. params in proc */
3377         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3378             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3379             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3380                 char *cname;
3381
3382                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3383                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3384                                            " cannot be modified. Consider"
3385                                            " updating the configuration with"
3386                                            " --writeconf\n",
3387                                            mti->mti_svname);
3388                         GOTO(end, rc = -EINVAL);
3389                 }
3390                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3391                         rc = name_create(&cname, mti->mti_fsname, "-client");
3392                         /* Add the client type to match the obdname in
3393                            class_config_llog_handler */
3394                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3395                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3396                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3397                         rc = name_create(&cname, mti->mti_svname, "-osc");
3398                 } else {
3399                         GOTO(end, rc = -EINVAL);
3400                 }
3401                 if (rc)
3402                         GOTO(end, rc);
3403
3404                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3405
3406                 /* Modify client */
3407                 rc = name_create(&logname, mti->mti_fsname, "-client");
3408                 if (rc) {
3409                         name_destroy(&cname);
3410                         GOTO(end, rc);
3411                 }
3412                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3413                                   cname, ptr);
3414
3415                 /* osc params affect the MDT as well */
3416                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3417                         int i;
3418
3419                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3420                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3421                                         continue;
3422                                 name_destroy(&cname);
3423                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3424                                                          fsdb, i);
3425                                 name_destroy(&logname);
3426                                 if (rc)
3427                                         break;
3428                                 rc = name_create_mdt(&logname,
3429                                                      mti->mti_fsname, i);
3430                                 if (rc)
3431                                         break;
3432                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3433                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3434                                                           mti, logname,
3435                                                           &mgi->mgi_bufs,
3436                                                           cname, ptr);
3437                                         if (rc)
3438                                                 break;
3439                                 }
3440                         }
3441                 }
3442                 name_destroy(&logname);
3443                 name_destroy(&cname);
3444                 GOTO(end, rc);
3445         }
3446
3447         /* All mdt. params in proc */
3448         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
3449                 int i;
3450                 __u32 idx;
3451
3452                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3453                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3454                             MTI_NAME_MAXLEN) == 0)
3455                         /* device is unspecified completely? */
3456                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3457                 else
3458                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3459                 if (rc < 0)
3460                         goto active_err;
3461                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3462                         goto active_err;
3463                 if (rc & LDD_F_SV_ALL) {
3464                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3465                                 if (!test_bit(i,
3466                                                   fsdb->fsdb_mdt_index_map))
3467                                         continue;
3468                                 rc = name_create_mdt(&logname,
3469                                                 mti->mti_fsname, i);
3470                                 if (rc)
3471                                         goto active_err;
3472                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3473                                                   logname, &mgi->mgi_bufs,
3474                                                   logname, ptr);
3475                                 name_destroy(&logname);
3476                                 if (rc)
3477                                         goto active_err;
3478                         }
3479                 } else {
3480                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3481                                           mti->mti_svname, &mgi->mgi_bufs,
3482                                           mti->mti_svname, ptr);
3483                         if (rc)
3484                                 goto active_err;
3485                 }
3486                 GOTO(end, rc);
3487         }
3488
3489         /* All mdd., ost. params in proc */
3490         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3491             (class_match_param(ptr, PARAM_OST, NULL) == 0)) {
3492                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3493                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3494                         GOTO(end, rc = -ENODEV);
3495
3496                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3497                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3498                 GOTO(end, rc);
3499         }
3500
3501         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3502         rc2 = -ENOSYS;
3503
3504 end:
3505         if (rc)
3506                 CERROR("err %d on param '%s'\n", rc, ptr);
3507
3508         RETURN(rc ?: rc2);
3509 }
3510
3511 /* Not implementing automatic failover nid addition at this time. */
3512 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3513                       struct mgs_target_info *mti)
3514 {
3515 #if 0
3516         struct fs_db *fsdb;
3517         int rc;
3518         ENTRY;
3519
3520         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3521         if (rc)
3522                 RETURN(rc);
3523
3524         if (mgs_log_is_empty(obd, mti->mti_svname))
3525                 /* should never happen */
3526                 RETURN(-ENOENT);
3527
3528         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3529
3530         /* FIXME We can just check mti->params to see if we're already in
3531            the failover list.  Modify mti->params for rewriting back at
3532            server_register_target(). */
3533
3534         mutex_lock(&fsdb->fsdb_mutex);
3535         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3536         mutex_unlock(&fsdb->fsdb_mutex);
3537
3538         RETURN(rc);
3539 #endif
3540         return 0;
3541 }
3542
3543 int mgs_write_log_target(const struct lu_env *env,
3544                          struct mgs_device *mgs,
3545                          struct mgs_target_info *mti,
3546                          struct fs_db *fsdb)
3547 {
3548         int rc = -EINVAL;
3549         char *buf, *params;
3550         ENTRY;
3551
3552         /* set/check the new target index */
3553         rc = mgs_set_index(env, mgs, mti);
3554         if (rc < 0) {
3555                 CERROR("Can't get index (%d)\n", rc);
3556                 RETURN(rc);
3557         }
3558
3559         if (rc == EALREADY) {
3560                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3561                               mti->mti_stripe_index, mti->mti_svname);
3562                 /* We would like to mark old log sections as invalid
3563                    and add new log sections in the client and mdt logs.
3564                    But if we add new sections, then live clients will
3565                    get repeat setup instructions for already running
3566                    osc's. So don't update the client/mdt logs. */
3567                 mti->mti_flags &= ~LDD_F_UPDATE;
3568                 rc = 0;
3569         }
3570
3571         mutex_lock(&fsdb->fsdb_mutex);
3572
3573         if (mti->mti_flags &
3574             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3575                 /* Generate a log from scratch */
3576                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3577                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3578                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3579                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3580                 } else {
3581                         CERROR("Unknown target type %#x, can't create log for "
3582                                "%s\n", mti->mti_flags, mti->mti_svname);
3583                 }
3584                 if (rc) {
3585                         CERROR("Can't write logs for %s (%d)\n",
3586                                mti->mti_svname, rc);
3587                         GOTO(out_up, rc);
3588                 }
3589         } else {
3590                 /* Just update the params from tunefs in mgs_write_log_params */
3591                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3592                 mti->mti_flags |= LDD_F_PARAM;
3593         }
3594
3595         /* allocate temporary buffer, where class_get_next_param will
3596            make copy of a current  parameter */
3597         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3598         if (buf == NULL)
3599                 GOTO(out_up, rc = -ENOMEM);
3600         params = mti->mti_params;
3601         while (params != NULL) {
3602                 rc = class_get_next_param(&params, buf);
3603                 if (rc) {
3604                         if (rc == 1)
3605                                 /* there is no next parameter, that is
3606                                    not an error */
3607                                 rc = 0;
3608                         break;
3609                 }
3610                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3611                        params, buf);
3612                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3613                 if (rc)
3614                         break;
3615         }
3616
3617         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3618
3619 out_up:
3620         mutex_unlock(&fsdb->fsdb_mutex);
3621         RETURN(rc);
3622 }
3623
3624 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3625 {
3626         struct llog_ctxt        *ctxt;
3627         int                      rc = 0;
3628
3629         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3630         if (ctxt == NULL) {
3631                 CERROR("%s: MGS config context doesn't exist\n",
3632                        mgs->mgs_obd->obd_name);
3633                 rc = -ENODEV;
3634         } else {
3635                 rc = llog_erase(env, ctxt, NULL, name);
3636                 /* llog may not exist */
3637                 if (rc == -ENOENT)
3638                         rc = 0;
3639                 llog_ctxt_put(ctxt);
3640         }
3641
3642         if (rc)
3643                 CERROR("%s: failed to clear log %s: %d\n",
3644                        mgs->mgs_obd->obd_name, name, rc);
3645
3646         return rc;
3647 }
3648
3649 /* erase all logs for the given fs */
3650 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3651 {
3652         struct fs_db *fsdb;
3653         cfs_list_t list;
3654         struct mgs_direntry *dirent, *n;
3655         int rc, len = strlen(fsname);
3656         char *suffix;
3657         ENTRY;
3658
3659         /* Find all the logs in the CONFIGS directory */
3660         rc = class_dentry_readdir(env, mgs, &list);
3661         if (rc)
3662                 RETURN(rc);
3663
3664         mutex_lock(&mgs->mgs_mutex);
3665
3666         /* Delete the fs db */
3667         fsdb = mgs_find_fsdb(mgs, fsname);
3668         if (fsdb)
3669                 mgs_free_fsdb(mgs, fsdb);
3670
3671         mutex_unlock(&mgs->mgs_mutex);
3672
3673         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3674                 cfs_list_del(&dirent->list);
3675                 suffix = strrchr(dirent->name, '-');
3676                 if (suffix != NULL) {
3677                         if ((len == suffix - dirent->name) &&
3678                             (strncmp(fsname, dirent->name, len) == 0)) {
3679                                 CDEBUG(D_MGS, "Removing log %s\n",
3680                                        dirent->name);
3681                                 mgs_erase_log(env, mgs, dirent->name);
3682                         }
3683                 }
3684                 mgs_direntry_free(dirent);
3685         }
3686
3687         RETURN(rc);
3688 }
3689
3690 /* from llog_swab */
3691 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3692 {
3693         int i;
3694         ENTRY;
3695
3696         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3697         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3698
3699         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3700         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3701         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3702         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3703
3704         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3705         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3706                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3707                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3708                                i, lcfg->lcfg_buflens[i],
3709                                lustre_cfg_string(lcfg, i));
3710                 }
3711         EXIT;
3712 }
3713
3714 /* Set a permanent (config log) param for a target or fs
3715  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3716  *             buf1 contains the single parameter
3717  */
3718 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3719                  struct lustre_cfg *lcfg, char *fsname)
3720 {
3721         struct fs_db *fsdb;
3722         struct mgs_target_info *mti;
3723         char *devname, *param;
3724         char *ptr;
3725         const char *tmp;
3726         __u32 index;
3727         int rc = 0;
3728         ENTRY;
3729
3730         print_lustre_cfg(lcfg);
3731
3732         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3733         devname = lustre_cfg_string(lcfg, 0);
3734         param = lustre_cfg_string(lcfg, 1);
3735         if (!devname) {
3736                 /* Assume device name embedded in param:
3737                    lustre-OST0000.osc.max_dirty_mb=32 */
3738                 ptr = strchr(param, '.');
3739                 if (ptr) {
3740                         devname = param;
3741                         *ptr = 0;
3742                         param = ptr + 1;
3743                 }
3744         }
3745         if (!devname) {
3746                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3747                 RETURN(-ENOSYS);
3748         }
3749
3750         rc = mgs_parse_devname(devname, fsname, NULL);
3751         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3752                 /* param related to llite isn't allowed to set by OST or MDT */
3753                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3754                                    sizeof(PARAM_LLITE)) == 0)
3755                         RETURN(-EINVAL);
3756         } else {
3757                 /* assume devname is the fsname */
3758                 memset(fsname, 0, MTI_NAME_MAXLEN);
3759                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3760                 fsname[MTI_NAME_MAXLEN - 1] = 0;
3761         }
3762         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3763
3764         rc = mgs_find_or_make_fsdb(env, mgs,
3765                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
3766                                    PARAMS_FILENAME : fsname, &fsdb);
3767         if (rc)
3768                 RETURN(rc);
3769
3770         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
3771             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3772             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3773                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3774                        "is '%s'\n", fsname, devname);
3775                 mgs_free_fsdb(mgs, fsdb);
3776                 RETURN(-EINVAL);
3777         }
3778
3779         /* Create a fake mti to hold everything */
3780         OBD_ALLOC_PTR(mti);
3781         if (!mti)
3782                 GOTO(out, rc = -ENOMEM);
3783         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3784             >= sizeof(mti->mti_fsname))
3785                 GOTO(out, rc = -E2BIG);
3786         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3787             >= sizeof(mti->mti_svname))
3788                 GOTO(out, rc = -E2BIG);
3789         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3790             >= sizeof(mti->mti_params))
3791                 GOTO(out, rc = -E2BIG);
3792         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3793         if (rc < 0)
3794                 /* Not a valid server; may be only fsname */
3795                 rc = 0;
3796         else
3797                 /* Strip -osc or -mdc suffix from svname */
3798                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3799                                      mti->mti_svname))
3800                         GOTO(out, rc = -EINVAL);
3801         /*
3802          * Revoke lock so everyone updates.  Should be alright if
3803          * someone was already reading while we were updating the logs,
3804          * so we don't really need to hold the lock while we're
3805          * writing (above).
3806          */
3807         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
3808                 mti->mti_flags = rc | LDD_F_PARAM2;
3809                 mutex_lock(&fsdb->fsdb_mutex);
3810                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
3811                 mutex_unlock(&fsdb->fsdb_mutex);
3812                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
3813         } else {
3814                 mti->mti_flags = rc | LDD_F_PARAM;
3815                 mutex_lock(&fsdb->fsdb_mutex);
3816                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3817                 mutex_unlock(&fsdb->fsdb_mutex);
3818                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3819         }
3820
3821 out:
3822         OBD_FREE_PTR(mti);
3823         RETURN(rc);
3824 }
3825
3826 static int mgs_write_log_pool(const struct lu_env *env,
3827                               struct mgs_device *mgs, char *logname,
3828                               struct fs_db *fsdb, char *tgtname,
3829                               enum lcfg_command_type cmd,
3830                               char *fsname, char *poolname,
3831                               char *ostname, char *comment)
3832 {
3833         struct llog_handle *llh = NULL;
3834         int rc;
3835
3836         rc = record_start_log(env, mgs, &llh, logname);
3837         if (rc)
3838                 return rc;
3839         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
3840         if (rc)
3841                 goto out;
3842         rc = record_base(env, llh, tgtname, 0, cmd,
3843                          fsname, poolname, ostname, 0);
3844         if (rc)
3845                 goto out;
3846         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
3847 out:
3848         record_end_log(env, &llh);
3849         return rc;
3850 }
3851
3852 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3853                  enum lcfg_command_type cmd, char *fsname,
3854                  char *poolname, char *ostname)
3855 {
3856         struct fs_db *fsdb;
3857         char *lovname;
3858         char *logname;
3859         char *label = NULL, *canceled_label = NULL;
3860         int label_sz;
3861         struct mgs_target_info *mti = NULL;
3862         int rc, i;
3863         ENTRY;
3864
3865         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3866         if (rc) {
3867                 CERROR("Can't get db for %s\n", fsname);
3868                 RETURN(rc);
3869         }
3870         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3871                 CERROR("%s is not defined\n", fsname);
3872                 mgs_free_fsdb(mgs, fsdb);
3873                 RETURN(-EINVAL);
3874         }
3875
3876         label_sz = 10 + strlen(fsname) + strlen(poolname);
3877
3878         /* check if ostname match fsname */
3879         if (ostname != NULL) {
3880                 char *ptr;
3881
3882                 ptr = strrchr(ostname, '-');
3883                 if ((ptr == NULL) ||
3884                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3885                         RETURN(-EINVAL);
3886                 label_sz += strlen(ostname);
3887         }
3888
3889         OBD_ALLOC(label, label_sz);
3890         if (label == NULL)
3891                 RETURN(-ENOMEM);
3892
3893         switch(cmd) {
3894         case LCFG_POOL_NEW:
3895                 sprintf(label,
3896                         "new %s.%s", fsname, poolname);
3897                 break;
3898         case LCFG_POOL_ADD:
3899                 sprintf(label,
3900                         "add %s.%s.%s", fsname, poolname, ostname);
3901                 break;
3902         case LCFG_POOL_REM:
3903                 OBD_ALLOC(canceled_label, label_sz);
3904                 if (canceled_label == NULL)
3905                         GOTO(out_label, rc = -ENOMEM);
3906                 sprintf(label,
3907                         "rem %s.%s.%s", fsname, poolname, ostname);
3908                 sprintf(canceled_label,
3909                         "add %s.%s.%s", fsname, poolname, ostname);
3910                 break;
3911         case LCFG_POOL_DEL:
3912                 OBD_ALLOC(canceled_label, label_sz);
3913                 if (canceled_label == NULL)
3914                         GOTO(out_label, rc = -ENOMEM);
3915                 sprintf(label,
3916                         "del %s.%s", fsname, poolname);
3917                 sprintf(canceled_label,
3918                         "new %s.%s", fsname, poolname);
3919                 break;
3920         default:
3921                 break;
3922         }
3923
3924         if (canceled_label != NULL) {
3925                 OBD_ALLOC_PTR(mti);
3926                 if (mti == NULL)
3927                         GOTO(out_cancel, rc = -ENOMEM);
3928         }
3929
3930         mutex_lock(&fsdb->fsdb_mutex);
3931         /* write pool def to all MDT logs */
3932         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3933                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
3934                         rc = name_create_mdt_and_lov(&logname, &lovname,
3935                                                      fsdb, i);
3936                         if (rc) {
3937                                 mutex_unlock(&fsdb->fsdb_mutex);
3938                                 GOTO(out_mti, rc);
3939                         }
3940                         if (canceled_label != NULL) {
3941                                 strcpy(mti->mti_svname, "lov pool");
3942                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3943                                                 lovname, canceled_label,
3944                                                 CM_SKIP);
3945                         }
3946
3947                         if (rc >= 0)
3948                                 rc = mgs_write_log_pool(env, mgs, logname,
3949                                                         fsdb, lovname, cmd,
3950                                                         fsname, poolname,
3951                                                         ostname, label);
3952                         name_destroy(&logname);
3953                         name_destroy(&lovname);
3954                         if (rc) {
3955                                 mutex_unlock(&fsdb->fsdb_mutex);
3956                                 GOTO(out_mti, rc);
3957                         }
3958                 }
3959         }
3960
3961         rc = name_create(&logname, fsname, "-client");
3962         if (rc) {
3963                 mutex_unlock(&fsdb->fsdb_mutex);
3964                 GOTO(out_mti, rc);
3965         }
3966         if (canceled_label != NULL) {
3967                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3968                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
3969                 if (rc < 0) {
3970                         mutex_unlock(&fsdb->fsdb_mutex);
3971                         name_destroy(&logname);
3972                         GOTO(out_mti, rc);
3973                 }
3974         }
3975
3976         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
3977                                 cmd, fsname, poolname, ostname, label);
3978         mutex_unlock(&fsdb->fsdb_mutex);
3979         name_destroy(&logname);
3980         /* request for update */
3981         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3982
3983         EXIT;
3984 out_mti:
3985         if (mti != NULL)
3986                 OBD_FREE_PTR(mti);
3987 out_cancel:
3988         if (canceled_label != NULL)
3989                 OBD_FREE(canceled_label, label_sz);
3990 out_label:
3991         OBD_FREE(label, label_sz);
3992         return rc;
3993 }