Whamcloud - gitweb
9b0201fa58c4104d96250d0fd850141b664c5386
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         LASSERT(dir);
71         LASSERT(dir->do_index_ops);
72
73         iops = &dir->do_index_ops->dio_it;
74         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
75         if (IS_ERR(it))
76                 RETURN(PTR_ERR(it));
77
78         rc = iops->load(env, it, 0);
79         if (rc <= 0)
80                 GOTO(fini, rc = 0);
81
82         /* main cycle */
83         do {
84                 key = (void *)iops->key(env, it);
85                 if (IS_ERR(key)) {
86                         CERROR("%s: key failed when listing %s: rc = %d\n",
87                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
88                                (int) PTR_ERR(key));
89                         goto next;
90                 }
91                 key_sz = iops->key_size(env, it);
92                 LASSERT(key_sz > 0);
93
94                 /* filter out "." and ".." entries */
95                 if (key[0] == '.') {
96                         if (key_sz == 1)
97                                 goto next;
98                         if (key_sz == 2 && key[1] == '.')
99                                 goto next;
100                 }
101
102                 de = mgs_direntry_alloc(key_sz + 1);
103                 if (de == NULL) {
104                         rc = -ENOMEM;
105                         break;
106                 }
107
108                 memcpy(de->name, key, key_sz);
109                 de->name[key_sz] = 0;
110
111                 cfs_list_add(&de->list, list);
112
113 next:
114                 rc = iops->next(env, it);
115         } while (rc == 0);
116         rc = 0;
117
118         iops->put(env, it);
119
120 fini:
121         iops->fini(env, it);
122         if (rc)
123                 CERROR("%s: key failed when listing %s: rc = %d\n",
124                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
125         RETURN(rc);
126 }
127
128 /******************** DB functions *********************/
129
130 static inline int name_create(char **newname, char *prefix, char *suffix)
131 {
132         LASSERT(newname);
133         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
134         if (!*newname)
135                 return -ENOMEM;
136         sprintf(*newname, "%s%s", prefix, suffix);
137         return 0;
138 }
139
140 static inline void name_destroy(char **name)
141 {
142         if (*name)
143                 OBD_FREE(*name, strlen(*name) + 1);
144         *name = NULL;
145 }
146
147 struct mgs_fsdb_handler_data
148 {
149         struct fs_db   *fsdb;
150         __u32           ver;
151 };
152
153 /* from the (client) config log, figure out:
154         1. which ost's/mdt's are configured (by index)
155         2. what the last config step is
156         3. COMPAT_18 osc name
157 */
158 /* It might be better to have a separate db file, instead of parsing the info
159    out of the client log.  This is slow and potentially error-prone. */
160 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
161                             struct llog_rec_hdr *rec, void *data)
162 {
163         struct mgs_fsdb_handler_data *d = data;
164         struct fs_db *fsdb = d->fsdb;
165         int cfg_len = rec->lrh_len;
166         char *cfg_buf = (char*) (rec + 1);
167         struct lustre_cfg *lcfg;
168         __u32 index;
169         int rc = 0;
170         ENTRY;
171
172         if (rec->lrh_type != OBD_CFG_REC) {
173                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
174                 RETURN(-EINVAL);
175         }
176
177         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
178         if (rc) {
179                 CERROR("Insane cfg\n");
180                 RETURN(rc);
181         }
182
183         lcfg = (struct lustre_cfg *)cfg_buf;
184
185         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
186                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
187
188         /* Figure out ost indicies */
189         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
190         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
191             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
192                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
193                                        NULL, 10);
194                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
195                        lustre_cfg_string(lcfg, 1), index,
196                        lustre_cfg_string(lcfg, 2));
197                 set_bit(index, fsdb->fsdb_ost_index_map);
198         }
199
200         /* Figure out mdt indicies */
201         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
202         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
203             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
204                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
205                                        &index, NULL);
206                 if (rc != LDD_F_SV_TYPE_MDT) {
207                         CWARN("Unparsable MDC name %s, assuming index 0\n",
208                               lustre_cfg_string(lcfg, 0));
209                         index = 0;
210                 }
211                 rc = 0;
212                 CDEBUG(D_MGS, "MDT index is %u\n", index);
213                 set_bit(index, fsdb->fsdb_mdt_index_map);
214                 fsdb->fsdb_mdt_count ++;
215         }
216
217         /**
218          * figure out the old config. fsdb_gen = 0 means old log
219          * It is obsoleted and not supported anymore
220          */
221         if (fsdb->fsdb_gen == 0) {
222                 CERROR("Old config format is not supported\n");
223                 RETURN(-EINVAL);
224         }
225
226         /*
227          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
228          */
229         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
230             lcfg->lcfg_command == LCFG_ATTACH &&
231             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
232                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
233                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
234                         CWARN("MDT using 1.8 OSC name scheme\n");
235                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
236                 }
237         }
238
239         if (lcfg->lcfg_command == LCFG_MARKER) {
240                 struct cfg_marker *marker;
241                 marker = lustre_cfg_buf(lcfg, 1);
242
243                 d->ver = marker->cm_vers;
244
245                 /* Keep track of the latest marker step */
246                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
247         }
248
249         RETURN(rc);
250 }
251
252 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
253 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
254                                   struct mgs_device *mgs,
255                                   struct fs_db *fsdb)
256 {
257         char                            *logname;
258         struct llog_handle              *loghandle;
259         struct llog_ctxt                *ctxt;
260         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
261         int rc;
262
263         ENTRY;
264
265         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
266         LASSERT(ctxt != NULL);
267         rc = name_create(&logname, fsdb->fsdb_name, "-client");
268         if (rc)
269                 GOTO(out_put, rc);
270         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
271         if (rc)
272                 GOTO(out_pop, rc);
273
274         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
275         if (rc)
276                 GOTO(out_close, rc);
277
278         if (llog_get_size(loghandle) <= 1)
279                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
280
281         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
282         CDEBUG(D_INFO, "get_db = %d\n", rc);
283 out_close:
284         llog_close(env, loghandle);
285 out_pop:
286         name_destroy(&logname);
287 out_put:
288         llog_ctxt_put(ctxt);
289
290         RETURN(rc);
291 }
292
293 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
294 {
295         struct mgs_tgt_srpc_conf *tgtconf;
296
297         /* free target-specific rules */
298         while (fsdb->fsdb_srpc_tgt) {
299                 tgtconf = fsdb->fsdb_srpc_tgt;
300                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
301
302                 LASSERT(tgtconf->mtsc_tgt);
303
304                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
305                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
306                 OBD_FREE_PTR(tgtconf);
307         }
308
309         /* free general rules */
310         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
311 }
312
313 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
314 {
315         struct fs_db *fsdb;
316         cfs_list_t *tmp;
317
318         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
319                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
320                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
321                         return fsdb;
322         }
323         return NULL;
324 }
325
326 /* caller must hold the mgs->mgs_fs_db_lock */
327 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
328                                   struct mgs_device *mgs, char *fsname)
329 {
330         struct fs_db *fsdb;
331         int rc;
332         ENTRY;
333
334         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
335                 CERROR("fsname %s is too long\n", fsname);
336                 RETURN(NULL);
337         }
338
339         OBD_ALLOC_PTR(fsdb);
340         if (!fsdb)
341                 RETURN(NULL);
342
343         strcpy(fsdb->fsdb_name, fsname);
344         mutex_init(&fsdb->fsdb_mutex);
345         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
346         fsdb->fsdb_gen = 1;
347
348         if (strcmp(fsname, MGSSELF_NAME) == 0) {
349                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
350         } else {
351                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
352                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
353                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
354                         CERROR("No memory for index maps\n");
355                         GOTO(err, rc = -ENOMEM);
356                 }
357
358                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
359                 if (rc)
360                         GOTO(err, rc);
361                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
362                 if (rc)
363                         GOTO(err, rc);
364
365                 /* initialise data for NID table */
366                 mgs_ir_init_fs(env, mgs, fsdb);
367
368                 lproc_mgs_add_live(mgs, fsdb);
369         }
370
371         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
372
373         RETURN(fsdb);
374 err:
375         if (fsdb->fsdb_ost_index_map)
376                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
377         if (fsdb->fsdb_mdt_index_map)
378                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
379         name_destroy(&fsdb->fsdb_clilov);
380         name_destroy(&fsdb->fsdb_clilmv);
381         OBD_FREE_PTR(fsdb);
382         RETURN(NULL);
383 }
384
385 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
386 {
387         /* wait for anyone with the sem */
388         mutex_lock(&fsdb->fsdb_mutex);
389         lproc_mgs_del_live(mgs, fsdb);
390         cfs_list_del(&fsdb->fsdb_list);
391
392         /* deinitialize fsr */
393         mgs_ir_fini_fs(mgs, fsdb);
394
395         if (fsdb->fsdb_ost_index_map)
396                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
397         if (fsdb->fsdb_mdt_index_map)
398                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
399         name_destroy(&fsdb->fsdb_clilov);
400         name_destroy(&fsdb->fsdb_clilmv);
401         mgs_free_fsdb_srpc(fsdb);
402         mutex_unlock(&fsdb->fsdb_mutex);
403         OBD_FREE_PTR(fsdb);
404 }
405
406 int mgs_init_fsdb_list(struct mgs_device *mgs)
407 {
408         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
409         return 0;
410 }
411
412 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
413 {
414         struct fs_db *fsdb;
415         cfs_list_t *tmp, *tmp2;
416         mutex_lock(&mgs->mgs_mutex);
417         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
418                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
419                 mgs_free_fsdb(mgs, fsdb);
420         }
421         mutex_unlock(&mgs->mgs_mutex);
422         return 0;
423 }
424
425 int mgs_find_or_make_fsdb(const struct lu_env *env,
426                           struct mgs_device *mgs, char *name,
427                           struct fs_db **dbh)
428 {
429         struct fs_db *fsdb;
430         int rc = 0;
431
432         ENTRY;
433         mutex_lock(&mgs->mgs_mutex);
434         fsdb = mgs_find_fsdb(mgs, name);
435         if (fsdb) {
436                 mutex_unlock(&mgs->mgs_mutex);
437                 *dbh = fsdb;
438                 RETURN(0);
439         }
440
441         CDEBUG(D_MGS, "Creating new db\n");
442         fsdb = mgs_new_fsdb(env, mgs, name);
443         /* lock fsdb_mutex until the db is loaded from llogs */
444         if (fsdb)
445                 mutex_lock(&fsdb->fsdb_mutex);
446         mutex_unlock(&mgs->mgs_mutex);
447         if (!fsdb)
448                 RETURN(-ENOMEM);
449
450         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
451                 /* populate the db from the client llog */
452                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
453                 if (rc) {
454                         CERROR("Can't get db from client log %d\n", rc);
455                         GOTO(out_free, rc);
456                 }
457         }
458
459         /* populate srpc rules from params llog */
460         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
461         if (rc) {
462                 CERROR("Can't get db from params log %d\n", rc);
463                 GOTO(out_free, rc);
464         }
465
466         mutex_unlock(&fsdb->fsdb_mutex);
467         *dbh = fsdb;
468
469         RETURN(0);
470
471 out_free:
472         mutex_unlock(&fsdb->fsdb_mutex);
473         mgs_free_fsdb(mgs, fsdb);
474         return rc;
475 }
476
477 /* 1 = index in use
478    0 = index unused
479    -1= empty client log */
480 int mgs_check_index(const struct lu_env *env,
481                     struct mgs_device *mgs,
482                     struct mgs_target_info *mti)
483 {
484         struct fs_db *fsdb;
485         void *imap;
486         int rc = 0;
487         ENTRY;
488
489         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
490
491         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
492         if (rc) {
493                 CERROR("Can't get db for %s\n", mti->mti_fsname);
494                 RETURN(rc);
495         }
496
497         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
498                 RETURN(-1);
499
500         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
501                 imap = fsdb->fsdb_ost_index_map;
502         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
503                 imap = fsdb->fsdb_mdt_index_map;
504         else
505                 RETURN(-EINVAL);
506
507         if (test_bit(mti->mti_stripe_index, imap))
508                 RETURN(1);
509         RETURN(0);
510 }
511
512 static __inline__ int next_index(void *index_map, int map_len)
513 {
514         int i;
515         for (i = 0; i < map_len * 8; i++)
516                 if (!test_bit(i, index_map)) {
517                          return i;
518                  }
519         CERROR("max index %d exceeded.\n", i);
520         return -1;
521 }
522
523 /* Return codes:
524         0  newly marked as in use
525         <0 err
526         +EALREADY for update of an old index */
527 static int mgs_set_index(const struct lu_env *env,
528                          struct mgs_device *mgs,
529                          struct mgs_target_info *mti)
530 {
531         struct fs_db *fsdb;
532         void *imap;
533         int rc = 0;
534         ENTRY;
535
536         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
537         if (rc) {
538                 CERROR("Can't get db for %s\n", mti->mti_fsname);
539                 RETURN(rc);
540         }
541
542         mutex_lock(&fsdb->fsdb_mutex);
543         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
544                 imap = fsdb->fsdb_ost_index_map;
545         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
546                 imap = fsdb->fsdb_mdt_index_map;
547         } else {
548                 GOTO(out_up, rc = -EINVAL);
549         }
550
551         if (mti->mti_flags & LDD_F_NEED_INDEX) {
552                 rc = next_index(imap, INDEX_MAP_SIZE);
553                 if (rc == -1)
554                         GOTO(out_up, rc = -ERANGE);
555                 mti->mti_stripe_index = rc;
556                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
557                         fsdb->fsdb_mdt_count ++;
558         }
559
560         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
561                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
562                                    "but the max index is %d.\n",
563                                    mti->mti_svname, mti->mti_stripe_index,
564                                    INDEX_MAP_SIZE * 8);
565                 GOTO(out_up, rc = -ERANGE);
566         }
567
568         if (test_bit(mti->mti_stripe_index, imap)) {
569                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
570                     !(mti->mti_flags & LDD_F_WRITECONF)) {
571                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
572                                            "%d, but that index is already in "
573                                            "use. Use --writeconf to force\n",
574                                            mti->mti_svname,
575                                            mti->mti_stripe_index);
576                         GOTO(out_up, rc = -EADDRINUSE);
577                 } else {
578                         CDEBUG(D_MGS, "Server %s updating index %d\n",
579                                mti->mti_svname, mti->mti_stripe_index);
580                         GOTO(out_up, rc = EALREADY);
581                 }
582         }
583
584         set_bit(mti->mti_stripe_index, imap);
585         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
586         mutex_unlock(&fsdb->fsdb_mutex);
587         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
588                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
589
590         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
591                mti->mti_stripe_index);
592
593         RETURN(0);
594 out_up:
595         mutex_unlock(&fsdb->fsdb_mutex);
596         return rc;
597 }
598
599 struct mgs_modify_lookup {
600         struct cfg_marker mml_marker;
601         int               mml_modified;
602 };
603
604 static int mgs_modify_handler(const struct lu_env *env,
605                               struct llog_handle *llh,
606                               struct llog_rec_hdr *rec, void *data)
607 {
608         struct mgs_modify_lookup *mml = data;
609         struct cfg_marker *marker;
610         struct lustre_cfg *lcfg = REC_DATA(rec);
611         int cfg_len = REC_DATA_LEN(rec);
612         int rc;
613         ENTRY;
614
615         if (rec->lrh_type != OBD_CFG_REC) {
616                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
617                 RETURN(-EINVAL);
618         }
619
620         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
621         if (rc) {
622                 CERROR("Insane cfg\n");
623                 RETURN(rc);
624         }
625
626         /* We only care about markers */
627         if (lcfg->lcfg_command != LCFG_MARKER)
628                 RETURN(0);
629
630         marker = lustre_cfg_buf(lcfg, 1);
631         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
632             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
633             !(marker->cm_flags & CM_SKIP)) {
634                 /* Found a non-skipped marker match */
635                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
636                        rec->lrh_index, marker->cm_step,
637                        marker->cm_flags, mml->mml_marker.cm_flags,
638                        marker->cm_tgtname, marker->cm_comment);
639                 /* Overwrite the old marker llog entry */
640                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
641                 marker->cm_flags |= mml->mml_marker.cm_flags;
642                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
643                 /* Header and tail are added back to lrh_len in
644                    llog_lvfs_write_rec */
645                 rec->lrh_len = cfg_len;
646                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
647                                 rec->lrh_index);
648                 if (!rc)
649                          mml->mml_modified++;
650         }
651
652         RETURN(rc);
653 }
654
655 /**
656  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
657  * Return code:
658  * 0 - modified successfully,
659  * 1 - no modification was done
660  * negative - error
661  */
662 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
663                       struct fs_db *fsdb, struct mgs_target_info *mti,
664                       char *logname, char *devname, char *comment, int flags)
665 {
666         struct llog_handle *loghandle;
667         struct llog_ctxt *ctxt;
668         struct mgs_modify_lookup *mml;
669         int rc;
670
671         ENTRY;
672
673         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
674         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
675                flags);
676
677         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
678         LASSERT(ctxt != NULL);
679         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
680         if (rc < 0) {
681                 if (rc == -ENOENT)
682                         rc = 0;
683                 GOTO(out_pop, rc);
684         }
685
686         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
687         if (rc)
688                 GOTO(out_close, rc);
689
690         if (llog_get_size(loghandle) <= 1)
691                 GOTO(out_close, rc = 0);
692
693         OBD_ALLOC_PTR(mml);
694         if (!mml)
695                 GOTO(out_close, rc = -ENOMEM);
696         if (strlcpy(mml->mml_marker.cm_comment, comment,
697                     sizeof(mml->mml_marker.cm_comment)) >=
698             sizeof(mml->mml_marker.cm_comment))
699                 GOTO(out_free, rc = -E2BIG);
700         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
701                     sizeof(mml->mml_marker.cm_tgtname)) >=
702             sizeof(mml->mml_marker.cm_tgtname))
703                 GOTO(out_free, rc = -E2BIG);
704         /* Modify mostly means cancel */
705         mml->mml_marker.cm_flags = flags;
706         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
707         mml->mml_modified = 0;
708         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
709                           NULL);
710         if (!rc && !mml->mml_modified)
711                 rc = 1;
712
713 out_free:
714         OBD_FREE_PTR(mml);
715
716 out_close:
717         llog_close(env, loghandle);
718 out_pop:
719         if (rc < 0)
720                 CERROR("%s: modify %s/%s failed: rc = %d\n",
721                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
722         llog_ctxt_put(ctxt);
723         RETURN(rc);
724 }
725
726 /** This structure is passed to mgs_replace_handler */
727 struct mgs_replace_uuid_lookup {
728         /* Nids are replaced for this target device */
729         struct mgs_target_info target;
730         /* Temporary modified llog */
731         struct llog_handle *temp_llh;
732         /* Flag is set if in target block*/
733         int in_target_device;
734         /* Nids already added. Just skip (multiple nids) */
735         int device_nids_added;
736         /* Flag is set if this block should not be copied */
737         int skip_it;
738 };
739
740 /**
741  * Check: a) if block should be skipped
742  * b) is it target block
743  *
744  * \param[in] lcfg
745  * \param[in] mrul
746  *
747  * \retval 0 should not to be skipped
748  * \retval 1 should to be skipped
749  */
750 static int check_markers(struct lustre_cfg *lcfg,
751                          struct mgs_replace_uuid_lookup *mrul)
752 {
753          struct cfg_marker *marker;
754
755         /* Track markers. Find given device */
756         if (lcfg->lcfg_command == LCFG_MARKER) {
757                 marker = lustre_cfg_buf(lcfg, 1);
758                 /* Clean llog from records marked as CM_EXCLUDE.
759                    CM_SKIP records are used for "active" command
760                    and can be restored if needed */
761                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
762                     (CM_EXCLUDE | CM_START)) {
763                         mrul->skip_it = 1;
764                         return 1;
765                 }
766
767                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
768                     (CM_EXCLUDE | CM_END)) {
769                         mrul->skip_it = 0;
770                         return 1;
771                 }
772
773                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
774                         LASSERT(!(marker->cm_flags & CM_START) ||
775                                 !(marker->cm_flags & CM_END));
776                         if (marker->cm_flags & CM_START) {
777                                 mrul->in_target_device = 1;
778                                 mrul->device_nids_added = 0;
779                         } else if (marker->cm_flags & CM_END)
780                                 mrul->in_target_device = 0;
781                 }
782         }
783
784         return 0;
785 }
786
787 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
788                        struct lustre_cfg *lcfg)
789 {
790         struct llog_rec_hdr      rec;
791         int                      buflen, rc;
792
793         if (!lcfg || !llh)
794                 return -ENOMEM;
795
796         LASSERT(llh->lgh_ctxt);
797
798         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
799                                 lcfg->lcfg_buflens);
800         rec.lrh_len = llog_data_len(buflen);
801         rec.lrh_type = OBD_CFG_REC;
802
803         /* idx = -1 means append */
804         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
805         if (rc)
806                 CERROR("failed %d\n", rc);
807         return rc;
808 }
809
810 static int record_base(const struct lu_env *env, struct llog_handle *llh,
811                      char *cfgname, lnet_nid_t nid, int cmd,
812                      char *s1, char *s2, char *s3, char *s4)
813 {
814         struct mgs_thread_info *mgi = mgs_env_info(env);
815         struct lustre_cfg     *lcfg;
816         int rc;
817
818         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
819                cmd, s1, s2, s3, s4);
820
821         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
822         if (s1)
823                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
824         if (s2)
825                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
826         if (s3)
827                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
828         if (s4)
829                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
830
831         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
832         if (!lcfg)
833                 return -ENOMEM;
834         lcfg->lcfg_nid = nid;
835
836         rc = record_lcfg(env, llh, lcfg);
837
838         lustre_cfg_free(lcfg);
839
840         if (rc) {
841                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
842                        cmd, s1, s2, s3, s4);
843         }
844         return rc;
845 }
846
847 static inline int record_add_uuid(const struct lu_env *env,
848                                   struct llog_handle *llh,
849                                   uint64_t nid, char *uuid)
850 {
851         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
852 }
853
854 static inline int record_add_conn(const struct lu_env *env,
855                                   struct llog_handle *llh,
856                                   char *devname, char *uuid)
857 {
858         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
859 }
860
861 static inline int record_attach(const struct lu_env *env,
862                                 struct llog_handle *llh, char *devname,
863                                 char *type, char *uuid)
864 {
865         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
866 }
867
868 static inline int record_setup(const struct lu_env *env,
869                                struct llog_handle *llh, char *devname,
870                                char *s1, char *s2, char *s3, char *s4)
871 {
872         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
873 }
874
875 /**
876  * \retval <0 record processing error
877  * \retval n record is processed. No need copy original one.
878  * \retval 0 record is not processed.
879  */
880 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
881                            struct mgs_replace_uuid_lookup *mrul)
882 {
883         int nids_added = 0;
884         lnet_nid_t nid;
885         char *ptr;
886         int rc;
887
888         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
889                 /* LCFG_ADD_UUID command found. Let's skip original command
890                    and add passed nids */
891                 ptr = mrul->target.mti_params;
892                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
893                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
894                                "device %s\n", libcfs_nid2str(nid),
895                                 mrul->target.mti_params,
896                                 mrul->target.mti_svname);
897                         rc = record_add_uuid(env,
898                                              mrul->temp_llh, nid,
899                                              mrul->target.mti_params);
900                         if (!rc)
901                                 nids_added++;
902                 }
903
904                 if (nids_added == 0) {
905                         CERROR("No new nids were added, nid %s with uuid %s, "
906                                "device %s\n", libcfs_nid2str(nid),
907                                mrul->target.mti_params,
908                                mrul->target.mti_svname);
909                         RETURN(-ENXIO);
910                 } else {
911                         mrul->device_nids_added = 1;
912                 }
913
914                 return nids_added;
915         }
916
917         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
918                 /* LCFG_SETUP command found. UUID should be changed */
919                 rc = record_setup(env,
920                                   mrul->temp_llh,
921                                   /* devname the same */
922                                   lustre_cfg_string(lcfg, 0),
923                                   /* s1 is not changed */
924                                   lustre_cfg_string(lcfg, 1),
925                                   /* new uuid should be
926                                   the full nidlist */
927                                   mrul->target.mti_params,
928                                   /* s3 is not changed */
929                                   lustre_cfg_string(lcfg, 3),
930                                   /* s4 is not changed */
931                                   lustre_cfg_string(lcfg, 4));
932                 return rc ? rc : 1;
933         }
934
935         /* Another commands in target device block */
936         return 0;
937 }
938
939 /**
940  * Handler that called for every record in llog.
941  * Records are processed in order they placed in llog.
942  *
943  * \param[in] llh       log to be processed
944  * \param[in] rec       current record
945  * \param[in] data      mgs_replace_uuid_lookup structure
946  *
947  * \retval 0    success
948  */
949 static int mgs_replace_handler(const struct lu_env *env,
950                                struct llog_handle *llh,
951                                struct llog_rec_hdr *rec,
952                                void *data)
953 {
954         struct mgs_replace_uuid_lookup *mrul;
955         struct lustre_cfg *lcfg = REC_DATA(rec);
956         int cfg_len = REC_DATA_LEN(rec);
957         int rc;
958         ENTRY;
959
960         mrul = (struct mgs_replace_uuid_lookup *)data;
961
962         if (rec->lrh_type != OBD_CFG_REC) {
963                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
964                        rec->lrh_type, lcfg->lcfg_command,
965                        lustre_cfg_string(lcfg, 0),
966                        lustre_cfg_string(lcfg, 1));
967                 RETURN(-EINVAL);
968         }
969
970         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
971         if (rc) {
972                 /* Do not copy any invalidated records */
973                 GOTO(skip_out, rc = 0);
974         }
975
976         rc = check_markers(lcfg, mrul);
977         if (rc || mrul->skip_it)
978                 GOTO(skip_out, rc = 0);
979
980         /* Write to new log all commands outside target device block */
981         if (!mrul->in_target_device)
982                 GOTO(copy_out, rc = 0);
983
984         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
985            (failover nids) for this target, assuming that if then
986            primary is changing then so is the failover */
987         if (mrul->device_nids_added &&
988             (lcfg->lcfg_command == LCFG_ADD_UUID ||
989              lcfg->lcfg_command == LCFG_ADD_CONN))
990                 GOTO(skip_out, rc = 0);
991
992         rc = process_command(env, lcfg, mrul);
993         if (rc < 0)
994                 RETURN(rc);
995
996         if (rc)
997                 RETURN(0);
998 copy_out:
999         /* Record is placed in temporary llog as is */
1000         rc = llog_write(env, mrul->temp_llh, rec, NULL, 0, NULL, -1);
1001
1002         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1003                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1004                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1005         RETURN(rc);
1006
1007 skip_out:
1008         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1009                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1010                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1011         RETURN(rc);
1012 }
1013
1014 static int mgs_log_is_empty(const struct lu_env *env,
1015                             struct mgs_device *mgs, char *name)
1016 {
1017         struct llog_ctxt        *ctxt;
1018         int                      rc;
1019
1020         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1021         LASSERT(ctxt != NULL);
1022
1023         rc = llog_is_empty(env, ctxt, name);
1024         llog_ctxt_put(ctxt);
1025         return rc;
1026 }
1027
1028 static int mgs_replace_nids_log(const struct lu_env *env,
1029                                 struct obd_device *mgs, struct fs_db *fsdb,
1030                                 char *logname, char *devname, char *nids)
1031 {
1032         struct llog_handle *orig_llh, *backup_llh;
1033         struct llog_ctxt *ctxt;
1034         struct mgs_replace_uuid_lookup *mrul;
1035         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1036         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1037         char *backup;
1038         int rc, rc2;
1039         ENTRY;
1040
1041         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1042
1043         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1044         LASSERT(ctxt != NULL);
1045
1046         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1047                 /* Log is empty. Nothing to replace */
1048                 GOTO(out_put, rc = 0);
1049         }
1050
1051         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1052         if (backup == NULL)
1053                 GOTO(out_put, rc = -ENOMEM);
1054
1055         sprintf(backup, "%s.bak", logname);
1056
1057         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1058         if (rc == 0) {
1059                 /* Now erase original log file. Connections are not allowed.
1060                    Backup is already saved */
1061                 rc = llog_erase(env, ctxt, NULL, logname);
1062                 if (rc < 0)
1063                         GOTO(out_free, rc);
1064         } else if (rc != -ENOENT) {
1065                 CERROR("%s: can't make backup for %s: rc = %d\n",
1066                        mgs->obd_name, logname, rc);
1067                 GOTO(out_free,rc);
1068         }
1069
1070         /* open local log */
1071         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1072         if (rc)
1073                 GOTO(out_restore, rc);
1074
1075         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1076         if (rc)
1077                 GOTO(out_closel, rc);
1078
1079         /* open backup llog */
1080         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1081                        LLOG_OPEN_EXISTS);
1082         if (rc)
1083                 GOTO(out_closel, rc);
1084
1085         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1086         if (rc)
1087                 GOTO(out_close, rc);
1088
1089         if (llog_get_size(backup_llh) <= 1)
1090                 GOTO(out_close, rc = 0);
1091
1092         OBD_ALLOC_PTR(mrul);
1093         if (!mrul)
1094                 GOTO(out_close, rc = -ENOMEM);
1095         /* devname is only needed information to replace UUID records */
1096         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1097         /* parse nids later */
1098         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1099         /* Copy records to this temporary llog */
1100         mrul->temp_llh = orig_llh;
1101
1102         rc = llog_process(env, backup_llh, mgs_replace_handler,
1103                           (void *)mrul, NULL);
1104         OBD_FREE_PTR(mrul);
1105 out_close:
1106         rc2 = llog_close(NULL, backup_llh);
1107         if (!rc)
1108                 rc = rc2;
1109 out_closel:
1110         rc2 = llog_close(NULL, orig_llh);
1111         if (!rc)
1112                 rc = rc2;
1113
1114 out_restore:
1115         if (rc) {
1116                 CERROR("%s: llog should be restored: rc = %d\n",
1117                        mgs->obd_name, rc);
1118                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1119                                   logname);
1120                 if (rc2 < 0)
1121                         CERROR("%s: can't restore backup %s: rc = %d\n",
1122                                mgs->obd_name, logname, rc2);
1123         }
1124
1125 out_free:
1126         OBD_FREE(backup, strlen(backup) + 1);
1127
1128 out_put:
1129         llog_ctxt_put(ctxt);
1130
1131         if (rc)
1132                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1133                        mgs->obd_name, logname, rc);
1134
1135         RETURN(rc);
1136 }
1137
1138 /**
1139  * Parse device name and get file system name and/or device index
1140  *
1141  * \param[in]   devname device name (ex. lustre-MDT0000)
1142  * \param[out]  fsname  file system name(optional)
1143  * \param[out]  index   device index(optional)
1144  *
1145  * \retval 0    success
1146  */
1147 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1148 {
1149         int rc;
1150         ENTRY;
1151
1152         /* Extract fsname */
1153         if (fsname) {
1154                 rc = server_name2fsname(devname, fsname, NULL);
1155                 if (rc < 0) {
1156                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1157                                devname);
1158                         RETURN(-EINVAL);
1159                 }
1160         }
1161
1162         if (index) {
1163                 rc = server_name2index(devname, index, NULL);
1164                 if (rc < 0) {
1165                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1166                                devname);
1167                         RETURN(-EINVAL);
1168                 }
1169         }
1170
1171         RETURN(0);
1172 }
1173
1174 static int only_mgs_is_running(struct obd_device *mgs_obd)
1175 {
1176         /* TDB: Is global variable with devices count exists? */
1177         int num_devices = get_devices_count();
1178         /* osd, MGS and MGC + self_export
1179            (wc -l /proc/fs/lustre/devices <= 2) && (num_exports <= 2) */
1180         return (num_devices <= 3) && (mgs_obd->obd_num_exports <= 2);
1181 }
1182
1183 static int name_create_mdt(char **logname, char *fsname, int i)
1184 {
1185         char mdt_index[9];
1186
1187         sprintf(mdt_index, "-MDT%04x", i);
1188         return name_create(logname, fsname, mdt_index);
1189 }
1190
1191 /**
1192  * Replace nids for \a device to \a nids values
1193  *
1194  * \param obd           MGS obd device
1195  * \param devname       nids need to be replaced for this device
1196  * (ex. lustre-OST0000)
1197  * \param nids          nids list (ex. nid1,nid2,nid3)
1198  *
1199  * \retval 0    success
1200  */
1201 int mgs_replace_nids(const struct lu_env *env,
1202                      struct mgs_device *mgs,
1203                      char *devname, char *nids)
1204 {
1205         /* Assume fsname is part of device name */
1206         char fsname[MTI_NAME_MAXLEN];
1207         int rc;
1208         __u32 index;
1209         char *logname;
1210         struct fs_db *fsdb;
1211         unsigned int i;
1212         int conn_state;
1213         struct obd_device *mgs_obd = mgs->mgs_obd;
1214         ENTRY;
1215
1216         /* We can only change NIDs if no other nodes are connected */
1217         spin_lock(&mgs_obd->obd_dev_lock);
1218         conn_state = mgs_obd->obd_no_conn;
1219         mgs_obd->obd_no_conn = 1;
1220         spin_unlock(&mgs_obd->obd_dev_lock);
1221
1222         /* We can not change nids if not only MGS is started */
1223         if (!only_mgs_is_running(mgs_obd)) {
1224                 CERROR("Only MGS is allowed to be started\n");
1225                 GOTO(out, rc = -EINPROGRESS);
1226         }
1227
1228         /* Get fsname and index*/
1229         rc = mgs_parse_devname(devname, fsname, &index);
1230         if (rc)
1231                 GOTO(out, rc);
1232
1233         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1234         if (rc) {
1235                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1236                 GOTO(out, rc);
1237         }
1238
1239         /* Process client llogs */
1240         name_create(&logname, fsname, "-client");
1241         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1242         name_destroy(&logname);
1243         if (rc) {
1244                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1245                        fsname, devname, rc);
1246                 GOTO(out, rc);
1247         }
1248
1249         /* Process MDT llogs */
1250         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1251                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1252                         continue;
1253                 name_create_mdt(&logname, fsname, i);
1254                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1255                 name_destroy(&logname);
1256                 if (rc)
1257                         GOTO(out, rc);
1258         }
1259
1260 out:
1261         spin_lock(&mgs_obd->obd_dev_lock);
1262         mgs_obd->obd_no_conn = conn_state;
1263         spin_unlock(&mgs_obd->obd_dev_lock);
1264
1265         RETURN(rc);
1266 }
1267
1268 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1269                             char *devname, struct lov_desc *desc)
1270 {
1271         struct mgs_thread_info *mgi = mgs_env_info(env);
1272         struct lustre_cfg *lcfg;
1273         int rc;
1274
1275         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1276         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1277         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1278         if (!lcfg)
1279                 return -ENOMEM;
1280         rc = record_lcfg(env, llh, lcfg);
1281
1282         lustre_cfg_free(lcfg);
1283         return rc;
1284 }
1285
1286 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1287                             char *devname, struct lmv_desc *desc)
1288 {
1289         struct mgs_thread_info *mgi = mgs_env_info(env);
1290         struct lustre_cfg *lcfg;
1291         int rc;
1292
1293         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1294         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1295         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1296
1297         rc = record_lcfg(env, llh, lcfg);
1298
1299         lustre_cfg_free(lcfg);
1300         return rc;
1301 }
1302
1303 static inline int record_mdc_add(const struct lu_env *env,
1304                                  struct llog_handle *llh,
1305                                  char *logname, char *mdcuuid,
1306                                  char *mdtuuid, char *index,
1307                                  char *gen)
1308 {
1309         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1310                            mdtuuid,index,gen,mdcuuid);
1311 }
1312
1313 static inline int record_lov_add(const struct lu_env *env,
1314                                  struct llog_handle *llh,
1315                                  char *lov_name, char *ost_uuid,
1316                                  char *index, char *gen)
1317 {
1318         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1319                            ost_uuid, index, gen, 0);
1320 }
1321
1322 static inline int record_mount_opt(const struct lu_env *env,
1323                                    struct llog_handle *llh,
1324                                    char *profile, char *lov_name,
1325                                    char *mdc_name)
1326 {
1327         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1328                            profile,lov_name,mdc_name,0);
1329 }
1330
1331 static int record_marker(const struct lu_env *env,
1332                          struct llog_handle *llh,
1333                          struct fs_db *fsdb, __u32 flags,
1334                          char *tgtname, char *comment)
1335 {
1336         struct mgs_thread_info *mgi = mgs_env_info(env);
1337         struct lustre_cfg *lcfg;
1338         int rc;
1339         int cplen = 0;
1340
1341         if (flags & CM_START)
1342                 fsdb->fsdb_gen++;
1343         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1344         mgi->mgi_marker.cm_flags = flags;
1345         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1346         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1347                         sizeof(mgi->mgi_marker.cm_tgtname));
1348         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1349                 return -E2BIG;
1350         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1351                         sizeof(mgi->mgi_marker.cm_comment));
1352         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1353                 return -E2BIG;
1354         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1355         mgi->mgi_marker.cm_canceltime = 0;
1356         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1357         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1358                             sizeof(mgi->mgi_marker));
1359         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1360         if (!lcfg)
1361                 return -ENOMEM;
1362         rc = record_lcfg(env, llh, lcfg);
1363
1364         lustre_cfg_free(lcfg);
1365         return rc;
1366 }
1367
1368 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1369                             struct llog_handle **llh, char *name)
1370 {
1371         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1372         struct llog_ctxt        *ctxt;
1373         int                      rc = 0;
1374
1375         if (*llh)
1376                 GOTO(out, rc = -EBUSY);
1377
1378         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1379         if (!ctxt)
1380                 GOTO(out, rc = -ENODEV);
1381         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1382
1383         rc = llog_open_create(env, ctxt, llh, NULL, name);
1384         if (rc)
1385                 GOTO(out_ctxt, rc);
1386         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1387         if (rc)
1388                 llog_close(env, *llh);
1389 out_ctxt:
1390         llog_ctxt_put(ctxt);
1391 out:
1392         if (rc) {
1393                 CERROR("%s: can't start log %s: rc = %d\n",
1394                        mgs->mgs_obd->obd_name, name, rc);
1395                 *llh = NULL;
1396         }
1397         RETURN(rc);
1398 }
1399
1400 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1401 {
1402         int rc;
1403
1404         rc = llog_close(env, *llh);
1405         *llh = NULL;
1406
1407         return rc;
1408 }
1409
1410 /******************** config "macros" *********************/
1411
1412 /* write an lcfg directly into a log (with markers) */
1413 static int mgs_write_log_direct(const struct lu_env *env,
1414                                 struct mgs_device *mgs, struct fs_db *fsdb,
1415                                 char *logname, struct lustre_cfg *lcfg,
1416                                 char *devname, char *comment)
1417 {
1418         struct llog_handle *llh = NULL;
1419         int rc;
1420         ENTRY;
1421
1422         if (!lcfg)
1423                 RETURN(-ENOMEM);
1424
1425         rc = record_start_log(env, mgs, &llh, logname);
1426         if (rc)
1427                 RETURN(rc);
1428
1429         /* FIXME These should be a single journal transaction */
1430         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1431         if (rc)
1432                 GOTO(out_end, rc);
1433         rc = record_lcfg(env, llh, lcfg);
1434         if (rc)
1435                 GOTO(out_end, rc);
1436         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1437         if (rc)
1438                 GOTO(out_end, rc);
1439 out_end:
1440         record_end_log(env, &llh);
1441         RETURN(rc);
1442 }
1443
1444 /* write the lcfg in all logs for the given fs */
1445 int mgs_write_log_direct_all(const struct lu_env *env,
1446                              struct mgs_device *mgs,
1447                              struct fs_db *fsdb,
1448                              struct mgs_target_info *mti,
1449                              struct lustre_cfg *lcfg,
1450                              char *devname, char *comment,
1451                              int server_only)
1452 {
1453         cfs_list_t list;
1454         struct mgs_direntry *dirent, *n;
1455         char *fsname = mti->mti_fsname;
1456         char *logname;
1457         int rc = 0, len = strlen(fsname);
1458         ENTRY;
1459
1460         /* We need to set params for any future logs
1461            as well. FIXME Append this file to every new log.
1462            Actually, we should store as params (text), not llogs.  Or
1463            in a database. */
1464         rc = name_create(&logname, fsname, "-params");
1465         if (rc)
1466                 RETURN(rc);
1467         if (mgs_log_is_empty(env, mgs, logname)) {
1468                 struct llog_handle *llh = NULL;
1469                 rc = record_start_log(env, mgs, &llh, logname);
1470                 if (rc == 0)
1471                         record_end_log(env, &llh);
1472         }
1473         name_destroy(&logname);
1474         if (rc)
1475                 RETURN(rc);
1476
1477         /* Find all the logs in the CONFIGS directory */
1478         rc = class_dentry_readdir(env, mgs, &list);
1479         if (rc)
1480                 RETURN(rc);
1481
1482         /* Could use fsdb index maps instead of directory listing */
1483         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1484                 cfs_list_del(&dirent->list);
1485                 /* don't write to sptlrpc rule log */
1486                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1487                         goto next;
1488
1489                 /* caller wants write server logs only */
1490                 if (server_only && strstr(dirent->name, "-client") != NULL)
1491                         goto next;
1492
1493                 if (strncmp(fsname, dirent->name, len) == 0) {
1494                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1495                         /* Erase any old settings of this same parameter */
1496                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1497                                         devname, comment, CM_SKIP);
1498                         if (rc < 0)
1499                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1500                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1501                         /* Write the new one */
1502                         if (lcfg) {
1503                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1504                                                           dirent->name,
1505                                                           lcfg, devname,
1506                                                           comment);
1507                                 if (rc)
1508                                         CERROR("%s: writing log %s: rc = %d\n",
1509                                                mgs->mgs_obd->obd_name,
1510                                                dirent->name, rc);
1511                         }
1512                 }
1513 next:
1514                 mgs_direntry_free(dirent);
1515         }
1516
1517         RETURN(rc);
1518 }
1519
1520 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1521                                     struct mgs_device *mgs,
1522                                     struct fs_db *fsdb,
1523                                     struct mgs_target_info *mti,
1524                                     int index, char *logname);
1525 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1526                                     struct mgs_device *mgs,
1527                                     struct fs_db *fsdb,
1528                                     struct mgs_target_info *mti,
1529                                     char *logname, char *suffix, char *lovname,
1530                                     enum lustre_sec_part sec_part, int flags);
1531 static int name_create_mdt_and_lov(char **logname, char **lovname,
1532                                    struct fs_db *fsdb, int i);
1533
1534 static int add_param(char *params, char *key, char *val)
1535 {
1536         char *start = params + strlen(params);
1537         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1538         int keylen = 0;
1539
1540         if (key != NULL)
1541                 keylen = strlen(key);
1542         if (start + 1 + keylen + strlen(val) >= end) {
1543                 CERROR("params are too long: %s %s%s\n",
1544                        params, key != NULL ? key : "", val);
1545                 return -EINVAL;
1546         }
1547
1548         sprintf(start, " %s%s", key != NULL ? key : "", val);
1549         return 0;
1550 }
1551
1552 /**
1553  * Walk through client config log record and convert the related records
1554  * into the target.
1555  **/
1556 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1557                                          struct llog_handle *llh,
1558                                          struct llog_rec_hdr *rec, void *data)
1559 {
1560         struct mgs_device *mgs;
1561         struct obd_device *obd;
1562         struct mgs_target_info *mti, *tmti;
1563         struct fs_db *fsdb;
1564         int cfg_len = rec->lrh_len;
1565         char *cfg_buf = (char*) (rec + 1);
1566         struct lustre_cfg *lcfg;
1567         int rc = 0;
1568         struct llog_handle *mdt_llh = NULL;
1569         static int got_an_osc_or_mdc = 0;
1570         /* 0: not found any osc/mdc;
1571            1: found osc;
1572            2: found mdc;
1573         */
1574         static int last_step = -1;
1575         int cplen = 0;
1576
1577         ENTRY;
1578
1579         mti = ((struct temp_comp*)data)->comp_mti;
1580         tmti = ((struct temp_comp*)data)->comp_tmti;
1581         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1582         obd = ((struct temp_comp *)data)->comp_obd;
1583         mgs = lu2mgs_dev(obd->obd_lu_dev);
1584         LASSERT(mgs);
1585
1586         if (rec->lrh_type != OBD_CFG_REC) {
1587                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1588                 RETURN(-EINVAL);
1589         }
1590
1591         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1592         if (rc) {
1593                 CERROR("Insane cfg\n");
1594                 RETURN(rc);
1595         }
1596
1597         lcfg = (struct lustre_cfg *)cfg_buf;
1598
1599         if (lcfg->lcfg_command == LCFG_MARKER) {
1600                 struct cfg_marker *marker;
1601                 marker = lustre_cfg_buf(lcfg, 1);
1602                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1603                     (marker->cm_flags & CM_START) &&
1604                      !(marker->cm_flags & CM_SKIP)) {
1605                         got_an_osc_or_mdc = 1;
1606                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1607                                         sizeof(tmti->mti_svname));
1608                         if (cplen >= sizeof(tmti->mti_svname))
1609                                 RETURN(-E2BIG);
1610                         rc = record_start_log(env, mgs, &mdt_llh,
1611                                               mti->mti_svname);
1612                         if (rc)
1613                                 RETURN(rc);
1614                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1615                                            mti->mti_svname, "add osc(copied)");
1616                         record_end_log(env, &mdt_llh);
1617                         last_step = marker->cm_step;
1618                         RETURN(rc);
1619                 }
1620                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1621                     (marker->cm_flags & CM_END) &&
1622                      !(marker->cm_flags & CM_SKIP)) {
1623                         LASSERT(last_step == marker->cm_step);
1624                         last_step = -1;
1625                         got_an_osc_or_mdc = 0;
1626                         memset(tmti, 0, sizeof(*tmti));
1627                         rc = record_start_log(env, mgs, &mdt_llh,
1628                                               mti->mti_svname);
1629                         if (rc)
1630                                 RETURN(rc);
1631                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1632                                            mti->mti_svname, "add osc(copied)");
1633                         record_end_log(env, &mdt_llh);
1634                         RETURN(rc);
1635                 }
1636                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1637                     (marker->cm_flags & CM_START) &&
1638                      !(marker->cm_flags & CM_SKIP)) {
1639                         got_an_osc_or_mdc = 2;
1640                         last_step = marker->cm_step;
1641                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1642                                strlen(marker->cm_tgtname));
1643
1644                         RETURN(rc);
1645                 }
1646                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1647                     (marker->cm_flags & CM_END) &&
1648                      !(marker->cm_flags & CM_SKIP)) {
1649                         LASSERT(last_step == marker->cm_step);
1650                         last_step = -1;
1651                         got_an_osc_or_mdc = 0;
1652                         memset(tmti, 0, sizeof(*tmti));
1653                         RETURN(rc);
1654                 }
1655         }
1656
1657         if (got_an_osc_or_mdc == 0 || last_step < 0)
1658                 RETURN(rc);
1659
1660         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1661                 uint64_t nodenid = lcfg->lcfg_nid;
1662
1663                 if (strlen(tmti->mti_uuid) == 0) {
1664                         /* target uuid not set, this config record is before
1665                          * LCFG_SETUP, this nid is one of target node nid.
1666                          */
1667                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1668                         tmti->mti_nid_count++;
1669                 } else {
1670                         /* failover node nid */
1671                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1672                                        libcfs_nid2str(nodenid));
1673                 }
1674
1675                 RETURN(rc);
1676         }
1677
1678         if (lcfg->lcfg_command == LCFG_SETUP) {
1679                 char *target;
1680
1681                 target = lustre_cfg_string(lcfg, 1);
1682                 memcpy(tmti->mti_uuid, target, strlen(target));
1683                 RETURN(rc);
1684         }
1685
1686         /* ignore client side sptlrpc_conf_log */
1687         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1688                 RETURN(rc);
1689
1690         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1691                 int index;
1692
1693                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1694                         RETURN (-EINVAL);
1695
1696                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1697                        strlen(mti->mti_fsname));
1698                 tmti->mti_stripe_index = index;
1699
1700                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1701                                               mti->mti_stripe_index,
1702                                               mti->mti_svname);
1703                 memset(tmti, 0, sizeof(*tmti));
1704                 RETURN(rc);
1705         }
1706
1707         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1708                 int index;
1709                 char mdt_index[9];
1710                 char *logname, *lovname;
1711
1712                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1713                                              mti->mti_stripe_index);
1714                 if (rc)
1715                         RETURN(rc);
1716                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1717
1718                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1719                         name_destroy(&logname);
1720                         name_destroy(&lovname);
1721                         RETURN(-EINVAL);
1722                 }
1723
1724                 tmti->mti_stripe_index = index;
1725                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1726                                          mdt_index, lovname,
1727                                          LUSTRE_SP_MDT, 0);
1728                 name_destroy(&logname);
1729                 name_destroy(&lovname);
1730                 RETURN(rc);
1731         }
1732         RETURN(rc);
1733 }
1734
1735 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1736 /* stealed from mgs_get_fsdb_from_llog*/
1737 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1738                                               struct mgs_device *mgs,
1739                                               char *client_name,
1740                                               struct temp_comp* comp)
1741 {
1742         struct llog_handle *loghandle;
1743         struct mgs_target_info *tmti;
1744         struct llog_ctxt *ctxt;
1745         int rc;
1746
1747         ENTRY;
1748
1749         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1750         LASSERT(ctxt != NULL);
1751
1752         OBD_ALLOC_PTR(tmti);
1753         if (tmti == NULL)
1754                 GOTO(out_ctxt, rc = -ENOMEM);
1755
1756         comp->comp_tmti = tmti;
1757         comp->comp_obd = mgs->mgs_obd;
1758
1759         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1760                        LLOG_OPEN_EXISTS);
1761         if (rc < 0) {
1762                 if (rc == -ENOENT)
1763                         rc = 0;
1764                 GOTO(out_pop, rc);
1765         }
1766
1767         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1768         if (rc)
1769                 GOTO(out_close, rc);
1770
1771         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1772                                   (void *)comp, NULL, false);
1773         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1774 out_close:
1775         llog_close(env, loghandle);
1776 out_pop:
1777         OBD_FREE_PTR(tmti);
1778 out_ctxt:
1779         llog_ctxt_put(ctxt);
1780         RETURN(rc);
1781 }
1782
1783 /* lmv is the second thing for client logs */
1784 /* copied from mgs_write_log_lov. Please refer to that.  */
1785 static int mgs_write_log_lmv(const struct lu_env *env,
1786                              struct mgs_device *mgs,
1787                              struct fs_db *fsdb,
1788                              struct mgs_target_info *mti,
1789                              char *logname, char *lmvname)
1790 {
1791         struct llog_handle *llh = NULL;
1792         struct lmv_desc *lmvdesc;
1793         char *uuid;
1794         int rc = 0;
1795         ENTRY;
1796
1797         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1798
1799         OBD_ALLOC_PTR(lmvdesc);
1800         if (lmvdesc == NULL)
1801                 RETURN(-ENOMEM);
1802         lmvdesc->ld_active_tgt_count = 0;
1803         lmvdesc->ld_tgt_count = 0;
1804         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1805         uuid = (char *)lmvdesc->ld_uuid.uuid;
1806
1807         rc = record_start_log(env, mgs, &llh, logname);
1808         if (rc)
1809                 GOTO(out_free, rc);
1810         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1811         if (rc)
1812                 GOTO(out_end, rc);
1813         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1814         if (rc)
1815                 GOTO(out_end, rc);
1816         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1817         if (rc)
1818                 GOTO(out_end, rc);
1819         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1820         if (rc)
1821                 GOTO(out_end, rc);
1822 out_end:
1823         record_end_log(env, &llh);
1824 out_free:
1825         OBD_FREE_PTR(lmvdesc);
1826         RETURN(rc);
1827 }
1828
1829 /* lov is the first thing in the mdt and client logs */
1830 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1831                              struct fs_db *fsdb, struct mgs_target_info *mti,
1832                              char *logname, char *lovname)
1833 {
1834         struct llog_handle *llh = NULL;
1835         struct lov_desc *lovdesc;
1836         char *uuid;
1837         int rc = 0;
1838         ENTRY;
1839
1840         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1841
1842         /*
1843         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1844         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1845               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1846         */
1847
1848         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1849         OBD_ALLOC_PTR(lovdesc);
1850         if (lovdesc == NULL)
1851                 RETURN(-ENOMEM);
1852         lovdesc->ld_magic = LOV_DESC_MAGIC;
1853         lovdesc->ld_tgt_count = 0;
1854         /* Defaults.  Can be changed later by lcfg config_param */
1855         lovdesc->ld_default_stripe_count = 1;
1856         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1857         lovdesc->ld_default_stripe_size = 1024 * 1024;
1858         lovdesc->ld_default_stripe_offset = -1;
1859         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1860         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1861         /* can these be the same? */
1862         uuid = (char *)lovdesc->ld_uuid.uuid;
1863
1864         /* This should always be the first entry in a log.
1865         rc = mgs_clear_log(obd, logname); */
1866         rc = record_start_log(env, mgs, &llh, logname);
1867         if (rc)
1868                 GOTO(out_free, rc);
1869         /* FIXME these should be a single journal transaction */
1870         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1871         if (rc)
1872                 GOTO(out_end, rc);
1873         rc = record_attach(env, llh, lovname, "lov", uuid);
1874         if (rc)
1875                 GOTO(out_end, rc);
1876         rc = record_lov_setup(env, llh, lovname, lovdesc);
1877         if (rc)
1878                 GOTO(out_end, rc);
1879         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1880         if (rc)
1881                 GOTO(out_end, rc);
1882         EXIT;
1883 out_end:
1884         record_end_log(env, &llh);
1885 out_free:
1886         OBD_FREE_PTR(lovdesc);
1887         return rc;
1888 }
1889
1890 /* add failnids to open log */
1891 static int mgs_write_log_failnids(const struct lu_env *env,
1892                                   struct mgs_target_info *mti,
1893                                   struct llog_handle *llh,
1894                                   char *cliname)
1895 {
1896         char *failnodeuuid = NULL;
1897         char *ptr = mti->mti_params;
1898         lnet_nid_t nid;
1899         int rc = 0;
1900
1901         /*
1902         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1903         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1904         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1905         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1906         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1907         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1908         */
1909
1910         /* Pull failnid info out of params string */
1911         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1912                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1913                         if (failnodeuuid == NULL) {
1914                                 /* We don't know the failover node name,
1915                                    so just use the first nid as the uuid */
1916                                 rc = name_create(&failnodeuuid,
1917                                                  libcfs_nid2str(nid), "");
1918                                 if (rc)
1919                                         return rc;
1920                         }
1921                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1922                                "client %s\n", libcfs_nid2str(nid),
1923                                failnodeuuid, cliname);
1924                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1925                 }
1926                 if (failnodeuuid) {
1927                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1928                         name_destroy(&failnodeuuid);
1929                         failnodeuuid = NULL;
1930                 }
1931         }
1932
1933         return rc;
1934 }
1935
1936 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1937                                     struct mgs_device *mgs,
1938                                     struct fs_db *fsdb,
1939                                     struct mgs_target_info *mti,
1940                                     char *logname, char *lmvname)
1941 {
1942         struct llog_handle *llh = NULL;
1943         char *mdcname = NULL;
1944         char *nodeuuid = NULL;
1945         char *mdcuuid = NULL;
1946         char *lmvuuid = NULL;
1947         char index[6];
1948         int i, rc;
1949         ENTRY;
1950
1951         if (mgs_log_is_empty(env, mgs, logname)) {
1952                 CERROR("log is empty! Logical error\n");
1953                 RETURN(-EINVAL);
1954         }
1955
1956         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1957                mti->mti_svname, logname, lmvname);
1958
1959         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1960         if (rc)
1961                 RETURN(rc);
1962         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1963         if (rc)
1964                 GOTO(out_free, rc);
1965         rc = name_create(&mdcuuid, mdcname, "_UUID");
1966         if (rc)
1967                 GOTO(out_free, rc);
1968         rc = name_create(&lmvuuid, lmvname, "_UUID");
1969         if (rc)
1970                 GOTO(out_free, rc);
1971
1972         rc = record_start_log(env, mgs, &llh, logname);
1973         if (rc)
1974                 GOTO(out_free, rc);
1975         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1976                            "add mdc");
1977         if (rc)
1978                 GOTO(out_end, rc);
1979         for (i = 0; i < mti->mti_nid_count; i++) {
1980                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1981                        libcfs_nid2str(mti->mti_nids[i]));
1982
1983                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1984                 if (rc)
1985                         GOTO(out_end, rc);
1986         }
1987
1988         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1989         if (rc)
1990                 GOTO(out_end, rc);
1991         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1992         if (rc)
1993                 GOTO(out_end, rc);
1994         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1995         if (rc)
1996                 GOTO(out_end, rc);
1997         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1998         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1999                             index, "1");
2000         if (rc)
2001                 GOTO(out_end, rc);
2002         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2003                            "add mdc");
2004         if (rc)
2005                 GOTO(out_end, rc);
2006 out_end:
2007         record_end_log(env, &llh);
2008 out_free:
2009         name_destroy(&lmvuuid);
2010         name_destroy(&mdcuuid);
2011         name_destroy(&mdcname);
2012         name_destroy(&nodeuuid);
2013         RETURN(rc);
2014 }
2015
2016 static inline int name_create_lov(char **lovname, char *mdtname,
2017                                   struct fs_db *fsdb, int index)
2018 {
2019         /* COMPAT_180 */
2020         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2021                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2022         else
2023                 return name_create(lovname, mdtname, "-mdtlov");
2024 }
2025
2026 static int name_create_mdt_and_lov(char **logname, char **lovname,
2027                                    struct fs_db *fsdb, int i)
2028 {
2029         int rc;
2030
2031         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2032         if (rc)
2033                 return rc;
2034         /* COMPAT_180 */
2035         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2036                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2037         else
2038                 rc = name_create(lovname, *logname, "-mdtlov");
2039         if (rc) {
2040                 name_destroy(logname);
2041                 *logname = NULL;
2042         }
2043         return rc;
2044 }
2045
2046 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2047                                       struct fs_db *fsdb, int i)
2048 {
2049         char suffix[16];
2050
2051         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2052                 sprintf(suffix, "-osc");
2053         else
2054                 sprintf(suffix, "-osc-MDT%04x", i);
2055         return name_create(oscname, ostname, suffix);
2056 }
2057
2058 /* add new mdc to already existent MDS */
2059 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2060                                     struct mgs_device *mgs,
2061                                     struct fs_db *fsdb,
2062                                     struct mgs_target_info *mti,
2063                                     int mdt_index, char *logname)
2064 {
2065         struct llog_handle      *llh = NULL;
2066         char    *nodeuuid = NULL;
2067         char    *ospname = NULL;
2068         char    *lovuuid = NULL;
2069         char    *mdtuuid = NULL;
2070         char    *svname = NULL;
2071         char    *mdtname = NULL;
2072         char    *lovname = NULL;
2073         char    index_str[16];
2074         int     i, rc;
2075
2076         ENTRY;
2077         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2078                 CERROR("log is empty! Logical error\n");
2079                 RETURN (-EINVAL);
2080         }
2081
2082         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2083                logname);
2084
2085         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2086         if (rc)
2087                 RETURN(rc);
2088
2089         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2090         if (rc)
2091                 GOTO(out_destory, rc);
2092
2093         rc = name_create(&svname, mdtname, "-osp");
2094         if (rc)
2095                 GOTO(out_destory, rc);
2096
2097         sprintf(index_str, "-MDT%04x", mdt_index);
2098         rc = name_create(&ospname, svname, index_str);
2099         if (rc)
2100                 GOTO(out_destory, rc);
2101
2102         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2103         if (rc)
2104                 GOTO(out_destory, rc);
2105
2106         rc = name_create(&lovuuid, lovname, "_UUID");
2107         if (rc)
2108                 GOTO(out_destory, rc);
2109
2110         rc = name_create(&mdtuuid, mdtname, "_UUID");
2111         if (rc)
2112                 GOTO(out_destory, rc);
2113
2114         rc = record_start_log(env, mgs, &llh, logname);
2115         if (rc)
2116                 GOTO(out_destory, rc);
2117
2118         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2119                            "add osp");
2120         if (rc)
2121                 GOTO(out_destory, rc);
2122
2123         for (i = 0; i < mti->mti_nid_count; i++) {
2124                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2125                        libcfs_nid2str(mti->mti_nids[i]));
2126                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2127                 if (rc)
2128                         GOTO(out_end, rc);
2129         }
2130
2131         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2132         if (rc)
2133                 GOTO(out_end, rc);
2134
2135         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2136                           NULL, NULL);
2137         if (rc)
2138                 GOTO(out_end, rc);
2139
2140         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2141         if (rc)
2142                 GOTO(out_end, rc);
2143
2144         /* Add mdc(osp) to lod */
2145         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2146                  mti->mti_stripe_index);
2147         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2148                          index_str, "1", NULL);
2149         if (rc)
2150                 GOTO(out_end, rc);
2151
2152         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2153         if (rc)
2154                 GOTO(out_end, rc);
2155
2156 out_end:
2157         record_end_log(env, &llh);
2158
2159 out_destory:
2160         name_destroy(&mdtuuid);
2161         name_destroy(&lovuuid);
2162         name_destroy(&lovname);
2163         name_destroy(&ospname);
2164         name_destroy(&svname);
2165         name_destroy(&nodeuuid);
2166         name_destroy(&mdtname);
2167         RETURN(rc);
2168 }
2169
2170 static int mgs_write_log_mdt0(const struct lu_env *env,
2171                               struct mgs_device *mgs,
2172                               struct fs_db *fsdb,
2173                               struct mgs_target_info *mti)
2174 {
2175         char *log = mti->mti_svname;
2176         struct llog_handle *llh = NULL;
2177         char *uuid, *lovname;
2178         char mdt_index[6];
2179         char *ptr = mti->mti_params;
2180         int rc = 0, failout = 0;
2181         ENTRY;
2182
2183         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2184         if (uuid == NULL)
2185                 RETURN(-ENOMEM);
2186
2187         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2188                 failout = (strncmp(ptr, "failout", 7) == 0);
2189
2190         rc = name_create(&lovname, log, "-mdtlov");
2191         if (rc)
2192                 GOTO(out_free, rc);
2193         if (mgs_log_is_empty(env, mgs, log)) {
2194                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2195                 if (rc)
2196                         GOTO(out_lod, rc);
2197         }
2198
2199         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2200
2201         rc = record_start_log(env, mgs, &llh, log);
2202         if (rc)
2203                 GOTO(out_lod, rc);
2204
2205         /* add MDT itself */
2206
2207         /* FIXME this whole fn should be a single journal transaction */
2208         sprintf(uuid, "%s_UUID", log);
2209         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2210         if (rc)
2211                 GOTO(out_lod, rc);
2212         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2213         if (rc)
2214                 GOTO(out_end, rc);
2215         rc = record_mount_opt(env, llh, log, lovname, NULL);
2216         if (rc)
2217                 GOTO(out_end, rc);
2218         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2219                         failout ? "n" : "f");
2220         if (rc)
2221                 GOTO(out_end, rc);
2222         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2223         if (rc)
2224                 GOTO(out_end, rc);
2225 out_end:
2226         record_end_log(env, &llh);
2227 out_lod:
2228         name_destroy(&lovname);
2229 out_free:
2230         OBD_FREE(uuid, sizeof(struct obd_uuid));
2231         RETURN(rc);
2232 }
2233
2234 /* envelope method for all layers log */
2235 static int mgs_write_log_mdt(const struct lu_env *env,
2236                              struct mgs_device *mgs,
2237                              struct fs_db *fsdb,
2238                              struct mgs_target_info *mti)
2239 {
2240         struct mgs_thread_info *mgi = mgs_env_info(env);
2241         struct llog_handle *llh = NULL;
2242         char *cliname;
2243         int rc, i = 0;
2244         ENTRY;
2245
2246         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2247
2248         if (mti->mti_uuid[0] == '\0') {
2249                 /* Make up our own uuid */
2250                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2251                          "%s_UUID", mti->mti_svname);
2252         }
2253
2254         /* add mdt */
2255         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2256         if (rc)
2257                 RETURN(rc);
2258         /* Append the mdt info to the client log */
2259         rc = name_create(&cliname, mti->mti_fsname, "-client");
2260         if (rc)
2261                 RETURN(rc);
2262
2263         if (mgs_log_is_empty(env, mgs, cliname)) {
2264                 /* Start client log */
2265                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2266                                        fsdb->fsdb_clilov);
2267                 if (rc)
2268                         GOTO(out_free, rc);
2269                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2270                                        fsdb->fsdb_clilmv);
2271                 if (rc)
2272                         GOTO(out_free, rc);
2273         }
2274
2275         /*
2276         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2277         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2278         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2279         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2280         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2281         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2282         */
2283
2284                 /* copy client info about lov/lmv */
2285                 mgi->mgi_comp.comp_mti = mti;
2286                 mgi->mgi_comp.comp_fsdb = fsdb;
2287
2288                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2289                                                         &mgi->mgi_comp);
2290                 if (rc)
2291                         GOTO(out_free, rc);
2292                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2293                                               fsdb->fsdb_clilmv);
2294                 if (rc)
2295                         GOTO(out_free, rc);
2296
2297                 /* add mountopts */
2298                 rc = record_start_log(env, mgs, &llh, cliname);
2299                 if (rc)
2300                         GOTO(out_free, rc);
2301
2302                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2303                                    "mount opts");
2304                 if (rc)
2305                         GOTO(out_end, rc);
2306                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2307                                       fsdb->fsdb_clilmv);
2308                 if (rc)
2309                         GOTO(out_end, rc);
2310                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2311                                    "mount opts");
2312
2313         if (rc)
2314                 GOTO(out_end, rc);
2315
2316         /* for_all_existing_mdt except current one */
2317         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2318                 if (i !=  mti->mti_stripe_index &&
2319                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2320                         char *logname;
2321
2322                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2323                         if (rc)
2324                                 GOTO(out_end, rc);
2325
2326                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2327                                                       i, logname);
2328                         name_destroy(&logname);
2329                         if (rc)
2330                                 GOTO(out_end, rc);
2331                 }
2332         }
2333 out_end:
2334         record_end_log(env, &llh);
2335 out_free:
2336         name_destroy(&cliname);
2337         RETURN(rc);
2338 }
2339
2340 /* Add the ost info to the client/mdt lov */
2341 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2342                                     struct mgs_device *mgs, struct fs_db *fsdb,
2343                                     struct mgs_target_info *mti,
2344                                     char *logname, char *suffix, char *lovname,
2345                                     enum lustre_sec_part sec_part, int flags)
2346 {
2347         struct llog_handle *llh = NULL;
2348         char *nodeuuid = NULL;
2349         char *oscname = NULL;
2350         char *oscuuid = NULL;
2351         char *lovuuid = NULL;
2352         char *svname = NULL;
2353         char index[6];
2354         int i, rc;
2355
2356         ENTRY;
2357         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2358                mti->mti_svname, logname);
2359
2360         if (mgs_log_is_empty(env, mgs, logname)) {
2361                 CERROR("log is empty! Logical error\n");
2362                 RETURN (-EINVAL);
2363         }
2364
2365         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2366         if (rc)
2367                 RETURN(rc);
2368         rc = name_create(&svname, mti->mti_svname, "-osc");
2369         if (rc)
2370                 GOTO(out_free, rc);
2371
2372         /* for the system upgraded from old 1.8, keep using the old osc naming
2373          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2374         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2375                 rc = name_create(&oscname, svname, "");
2376         else
2377                 rc = name_create(&oscname, svname, suffix);
2378         if (rc)
2379                 GOTO(out_free, rc);
2380
2381         rc = name_create(&oscuuid, oscname, "_UUID");
2382         if (rc)
2383                 GOTO(out_free, rc);
2384         rc = name_create(&lovuuid, lovname, "_UUID");
2385         if (rc)
2386                 GOTO(out_free, rc);
2387
2388
2389         /*
2390         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2391         multihomed (#4)
2392         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2393         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2394         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2395         failover (#6,7)
2396         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2397         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2398         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2399         */
2400
2401         rc = record_start_log(env, mgs, &llh, logname);
2402         if (rc)
2403                 GOTO(out_free, rc);
2404
2405         /* FIXME these should be a single journal transaction */
2406         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2407                            "add osc");
2408         if (rc)
2409                 GOTO(out_end, rc);
2410
2411         /* NB: don't change record order, because upon MDT steal OSC config
2412          * from client, it treats all nids before LCFG_SETUP as target nids
2413          * (multiple interfaces), while nids after as failover node nids.
2414          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2415          */
2416         for (i = 0; i < mti->mti_nid_count; i++) {
2417                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2418                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2419                 if (rc)
2420                         GOTO(out_end, rc);
2421         }
2422         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2423         if (rc)
2424                 GOTO(out_end, rc);
2425         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2426         if (rc)
2427                 GOTO(out_end, rc);
2428         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2429         if (rc)
2430                 GOTO(out_end, rc);
2431
2432         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2433
2434         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2435         if (rc)
2436                 GOTO(out_end, rc);
2437         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2438                            "add osc");
2439         if (rc)
2440                 GOTO(out_end, rc);
2441 out_end:
2442         record_end_log(env, &llh);
2443 out_free:
2444         name_destroy(&lovuuid);
2445         name_destroy(&oscuuid);
2446         name_destroy(&oscname);
2447         name_destroy(&svname);
2448         name_destroy(&nodeuuid);
2449         RETURN(rc);
2450 }
2451
2452 static int mgs_write_log_ost(const struct lu_env *env,
2453                              struct mgs_device *mgs, struct fs_db *fsdb,
2454                              struct mgs_target_info *mti)
2455 {
2456         struct llog_handle *llh = NULL;
2457         char *logname, *lovname;
2458         char *ptr = mti->mti_params;
2459         int rc, flags = 0, failout = 0, i;
2460         ENTRY;
2461
2462         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2463
2464         /* The ost startup log */
2465
2466         /* If the ost log already exists, that means that someone reformatted
2467            the ost and it called target_add again. */
2468         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2469                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2470                                    "exists, yet the server claims it never "
2471                                    "registered. It may have been reformatted, "
2472                                    "or the index changed. writeconf the MDT to "
2473                                    "regenerate all logs.\n", mti->mti_svname);
2474                 RETURN(-EALREADY);
2475         }
2476
2477         /*
2478         attach obdfilter ost1 ost1_UUID
2479         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2480         */
2481         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2482                 failout = (strncmp(ptr, "failout", 7) == 0);
2483         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2484         if (rc)
2485                 RETURN(rc);
2486         /* FIXME these should be a single journal transaction */
2487         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2488         if (rc)
2489                 GOTO(out_end, rc);
2490         if (*mti->mti_uuid == '\0')
2491                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2492                          "%s_UUID", mti->mti_svname);
2493         rc = record_attach(env, llh, mti->mti_svname,
2494                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2495         if (rc)
2496                 GOTO(out_end, rc);
2497         rc = record_setup(env, llh, mti->mti_svname,
2498                           "dev"/*ignored*/, "type"/*ignored*/,
2499                           failout ? "n" : "f", 0/*options*/);
2500         if (rc)
2501                 GOTO(out_end, rc);
2502         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2503         if (rc)
2504                 GOTO(out_end, rc);
2505 out_end:
2506         record_end_log(env, &llh);
2507         if (rc)
2508                 RETURN(rc);
2509         /* We also have to update the other logs where this osc is part of
2510            the lov */
2511
2512         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2513                 /* If we're upgrading, the old mdt log already has our
2514                    entry. Let's do a fake one for fun. */
2515                 /* Note that we can't add any new failnids, since we don't
2516                    know the old osc names. */
2517                 flags = CM_SKIP | CM_UPGRADE146;
2518
2519         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2520                 /* If the update flag isn't set, don't update client/mdt
2521                    logs. */
2522                 flags |= CM_SKIP;
2523                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2524                               "the MDT first to regenerate it.\n",
2525                               mti->mti_svname);
2526         }
2527
2528         /* Add ost to all MDT lov defs */
2529         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2530                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2531                         char mdt_index[9];
2532
2533                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2534                                                      i);
2535                         if (rc)
2536                                 RETURN(rc);
2537                         sprintf(mdt_index, "-MDT%04x", i);
2538                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2539                                                       logname, mdt_index,
2540                                                       lovname, LUSTRE_SP_MDT,
2541                                                       flags);
2542                         name_destroy(&logname);
2543                         name_destroy(&lovname);
2544                         if (rc)
2545                                 RETURN(rc);
2546                 }
2547         }
2548
2549         /* Append ost info to the client log */
2550         rc = name_create(&logname, mti->mti_fsname, "-client");
2551         if (rc)
2552                 RETURN(rc);
2553         if (mgs_log_is_empty(env, mgs, logname)) {
2554                 /* Start client log */
2555                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2556                                        fsdb->fsdb_clilov);
2557                 if (rc)
2558                         GOTO(out_free, rc);
2559                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2560                                        fsdb->fsdb_clilmv);
2561                 if (rc)
2562                         GOTO(out_free, rc);
2563         }
2564         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2565                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2566 out_free:
2567         name_destroy(&logname);
2568         RETURN(rc);
2569 }
2570
2571 static __inline__ int mgs_param_empty(char *ptr)
2572 {
2573         char *tmp;
2574
2575         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2576                 return 1;
2577         return 0;
2578 }
2579
2580 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2581                                           struct mgs_device *mgs,
2582                                           struct fs_db *fsdb,
2583                                           struct mgs_target_info *mti,
2584                                           char *logname, char *cliname)
2585 {
2586         int rc;
2587         struct llog_handle *llh = NULL;
2588
2589         if (mgs_param_empty(mti->mti_params)) {
2590                 /* Remove _all_ failnids */
2591                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2592                                 mti->mti_svname, "add failnid", CM_SKIP);
2593                 return rc < 0 ? rc : 0;
2594         }
2595
2596         /* Otherwise failover nids are additive */
2597         rc = record_start_log(env, mgs, &llh, logname);
2598         if (rc)
2599                 return rc;
2600                 /* FIXME this should be a single journal transaction */
2601         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2602                            "add failnid");
2603         if (rc)
2604                 goto out_end;
2605         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2606         if (rc)
2607                 goto out_end;
2608         rc = record_marker(env, llh, fsdb, CM_END,
2609                            mti->mti_svname, "add failnid");
2610 out_end:
2611         record_end_log(env, &llh);
2612         return rc;
2613 }
2614
2615
2616 /* Add additional failnids to an existing log.
2617    The mdc/osc must have been added to logs first */
2618 /* tcp nids must be in dotted-quad ascii -
2619    we can't resolve hostnames from the kernel. */
2620 static int mgs_write_log_add_failnid(const struct lu_env *env,
2621                                      struct mgs_device *mgs,
2622                                      struct fs_db *fsdb,
2623                                      struct mgs_target_info *mti)
2624 {
2625         char *logname, *cliname;
2626         int rc;
2627         ENTRY;
2628
2629         /* FIXME we currently can't erase the failnids
2630          * given when a target first registers, since they aren't part of
2631          * an "add uuid" stanza */
2632
2633         /* Verify that we know about this target */
2634         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2635                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2636                                    "yet. It must be started before failnids "
2637                                    "can be added.\n", mti->mti_svname);
2638                 RETURN(-ENOENT);
2639         }
2640
2641         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2642         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2643                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2644         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2645                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2646         } else {
2647                 RETURN(-EINVAL);
2648         }
2649         if (rc)
2650                 RETURN(rc);
2651         /* Add failover nids to the client log */
2652         rc = name_create(&logname, mti->mti_fsname, "-client");
2653         if (rc) {
2654                 name_destroy(&cliname);
2655                 RETURN(rc);
2656         }
2657         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2658         name_destroy(&logname);
2659         name_destroy(&cliname);
2660         if (rc)
2661                 RETURN(rc);
2662
2663         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2664                 /* Add OST failover nids to the MDT logs as well */
2665                 int i;
2666
2667                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2668                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2669                                 continue;
2670                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2671                         if (rc)
2672                                 RETURN(rc);
2673                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2674                                                  fsdb, i);
2675                         if (rc) {
2676                                 name_destroy(&logname);
2677                                 RETURN(rc);
2678                         }
2679                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2680                                                             mti, logname,
2681                                                             cliname);
2682                         name_destroy(&cliname);
2683                         name_destroy(&logname);
2684                         if (rc)
2685                                 RETURN(rc);
2686                 }
2687         }
2688
2689         RETURN(rc);
2690 }
2691
2692 static int mgs_wlp_lcfg(const struct lu_env *env,
2693                         struct mgs_device *mgs, struct fs_db *fsdb,
2694                         struct mgs_target_info *mti,
2695                         char *logname, struct lustre_cfg_bufs *bufs,
2696                         char *tgtname, char *ptr)
2697 {
2698         char comment[MTI_NAME_MAXLEN];
2699         char *tmp;
2700         struct lustre_cfg *lcfg;
2701         int rc, del;
2702
2703         /* Erase any old settings of this same parameter */
2704         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2705         comment[MTI_NAME_MAXLEN - 1] = 0;
2706         /* But don't try to match the value. */
2707         if ((tmp = strchr(comment, '=')))
2708             *tmp = 0;
2709         /* FIXME we should skip settings that are the same as old values */
2710         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2711         if (rc < 0)
2712                 return rc;
2713         del = mgs_param_empty(ptr);
2714
2715         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2716                       "Sett" : "Modify", tgtname, comment, logname);
2717         if (del)
2718                 return rc;
2719
2720         lustre_cfg_bufs_reset(bufs, tgtname);
2721         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2722         if (mti->mti_flags & LDD_F_PARAM2)
2723                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2724
2725         lcfg = lustre_cfg_new((mti->mti_flags & LDD_F_PARAM2) ?
2726                               LCFG_SET_PARAM : LCFG_PARAM, bufs);
2727
2728         if (!lcfg)
2729                 return -ENOMEM;
2730         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2731         lustre_cfg_free(lcfg);
2732         return rc;
2733 }
2734
2735 static int mgs_write_log_param2(const struct lu_env *env,
2736                                 struct mgs_device *mgs,
2737                                 struct fs_db *fsdb,
2738                                 struct mgs_target_info *mti, char *ptr)
2739 {
2740         struct lustre_cfg_bufs  bufs;
2741         int                     rc = 0;
2742         ENTRY;
2743
2744         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2745         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2746                           mti->mti_svname, ptr);
2747
2748         RETURN(rc);
2749 }
2750
2751 /* write global variable settings into log */
2752 static int mgs_write_log_sys(const struct lu_env *env,
2753                              struct mgs_device *mgs, struct fs_db *fsdb,
2754                              struct mgs_target_info *mti, char *sys, char *ptr)
2755 {
2756         struct mgs_thread_info *mgi = mgs_env_info(env);
2757         struct lustre_cfg *lcfg;
2758         char *tmp, sep;
2759         int rc, cmd, convert = 1;
2760
2761         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2762                 cmd = LCFG_SET_TIMEOUT;
2763         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2764                 cmd = LCFG_SET_LDLM_TIMEOUT;
2765         /* Check for known params here so we can return error to lctl */
2766         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2767                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2768                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2769                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2770                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2771                 cmd = LCFG_PARAM;
2772         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2773                 convert = 0; /* Don't convert string value to integer */
2774                 cmd = LCFG_PARAM;
2775         } else {
2776                 return -EINVAL;
2777         }
2778
2779         if (mgs_param_empty(ptr))
2780                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2781         else
2782                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2783
2784         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2785         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2786         if (!convert && *tmp != '\0')
2787                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2788         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2789         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2790         /* truncate the comment to the parameter name */
2791         ptr = tmp - 1;
2792         sep = *ptr;
2793         *ptr = '\0';
2794         /* modify all servers and clients */
2795         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2796                                       *tmp == '\0' ? NULL : lcfg,
2797                                       mti->mti_fsname, sys, 0);
2798         if (rc == 0 && *tmp != '\0') {
2799                 switch (cmd) {
2800                 case LCFG_SET_TIMEOUT:
2801                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2802                                 class_process_config(lcfg);
2803                         break;
2804                 case LCFG_SET_LDLM_TIMEOUT:
2805                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2806                                 class_process_config(lcfg);
2807                         break;
2808                 default:
2809                         break;
2810                 }
2811         }
2812         *ptr = sep;
2813         lustre_cfg_free(lcfg);
2814         return rc;
2815 }
2816
2817 /* write quota settings into log */
2818 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2819                                struct fs_db *fsdb, struct mgs_target_info *mti,
2820                                char *quota, char *ptr)
2821 {
2822         struct mgs_thread_info  *mgi = mgs_env_info(env);
2823         struct lustre_cfg       *lcfg;
2824         char                    *tmp;
2825         char                     sep;
2826         int                      rc, cmd = LCFG_PARAM;
2827
2828         /* support only 'meta' and 'data' pools so far */
2829         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2830             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2831                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2832                        "& quota.ost are)\n", ptr);
2833                 return -EINVAL;
2834         }
2835
2836         if (*tmp == '\0') {
2837                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2838         } else {
2839                 CDEBUG(D_MGS, "global '%s'\n", quota);
2840
2841                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2842                     strcmp(tmp, "none") != 0) {
2843                         CERROR("enable option(%s) isn't supported\n", tmp);
2844                         return -EINVAL;
2845                 }
2846         }
2847
2848         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2849         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2850         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2851         /* truncate the comment to the parameter name */
2852         ptr = tmp - 1;
2853         sep = *ptr;
2854         *ptr = '\0';
2855
2856         /* XXX we duplicated quota enable information in all server
2857          *     config logs, it should be moved to a separate config
2858          *     log once we cleanup the config log for global param. */
2859         /* modify all servers */
2860         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2861                                       *tmp == '\0' ? NULL : lcfg,
2862                                       mti->mti_fsname, quota, 1);
2863         *ptr = sep;
2864         lustre_cfg_free(lcfg);
2865         return rc < 0 ? rc : 0;
2866 }
2867
2868 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2869                                    struct mgs_device *mgs,
2870                                    struct fs_db *fsdb,
2871                                    struct mgs_target_info *mti,
2872                                    char *param)
2873 {
2874         struct mgs_thread_info *mgi = mgs_env_info(env);
2875         struct llog_handle     *llh = NULL;
2876         char                   *logname;
2877         char                   *comment, *ptr;
2878         struct lustre_cfg      *lcfg;
2879         int                     rc, len;
2880         ENTRY;
2881
2882         /* get comment */
2883         ptr = strchr(param, '=');
2884         LASSERT(ptr);
2885         len = ptr - param;
2886
2887         OBD_ALLOC(comment, len + 1);
2888         if (comment == NULL)
2889                 RETURN(-ENOMEM);
2890         strncpy(comment, param, len);
2891         comment[len] = '\0';
2892
2893         /* prepare lcfg */
2894         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2895         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2896         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2897         if (lcfg == NULL)
2898                 GOTO(out_comment, rc = -ENOMEM);
2899
2900         /* construct log name */
2901         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2902         if (rc)
2903                 GOTO(out_lcfg, rc);
2904
2905         if (mgs_log_is_empty(env, mgs, logname)) {
2906                 rc = record_start_log(env, mgs, &llh, logname);
2907                 if (rc)
2908                         GOTO(out, rc);
2909                 record_end_log(env, &llh);
2910         }
2911
2912         /* obsolete old one */
2913         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2914                         comment, CM_SKIP);
2915         if (rc < 0)
2916                 GOTO(out, rc);
2917         /* write the new one */
2918         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2919                                   mti->mti_svname, comment);
2920         if (rc)
2921                 CERROR("err %d writing log %s\n", rc, logname);
2922 out:
2923         name_destroy(&logname);
2924 out_lcfg:
2925         lustre_cfg_free(lcfg);
2926 out_comment:
2927         OBD_FREE(comment, len + 1);
2928         RETURN(rc);
2929 }
2930
2931 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2932                                         char *param)
2933 {
2934         char    *ptr;
2935
2936         /* disable the adjustable udesc parameter for now, i.e. use default
2937          * setting that client always ship udesc to MDT if possible. to enable
2938          * it simply remove the following line */
2939         goto error_out;
2940
2941         ptr = strchr(param, '=');
2942         if (ptr == NULL)
2943                 goto error_out;
2944         *ptr++ = '\0';
2945
2946         if (strcmp(param, PARAM_SRPC_UDESC))
2947                 goto error_out;
2948
2949         if (strcmp(ptr, "yes") == 0) {
2950                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2951                 CWARN("Enable user descriptor shipping from client to MDT\n");
2952         } else if (strcmp(ptr, "no") == 0) {
2953                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2954                 CWARN("Disable user descriptor shipping from client to MDT\n");
2955         } else {
2956                 *(ptr - 1) = '=';
2957                 goto error_out;
2958         }
2959         return 0;
2960
2961 error_out:
2962         CERROR("Invalid param: %s\n", param);
2963         return -EINVAL;
2964 }
2965
2966 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2967                                   const char *svname,
2968                                   char *param)
2969 {
2970         struct sptlrpc_rule      rule;
2971         struct sptlrpc_rule_set *rset;
2972         int                      rc;
2973         ENTRY;
2974
2975         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2976                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2977                 RETURN(-EINVAL);
2978         }
2979
2980         if (strncmp(param, PARAM_SRPC_UDESC,
2981                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2982                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2983         }
2984
2985         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2986                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2987                 RETURN(-EINVAL);
2988         }
2989
2990         param += sizeof(PARAM_SRPC_FLVR) - 1;
2991
2992         rc = sptlrpc_parse_rule(param, &rule);
2993         if (rc)
2994                 RETURN(rc);
2995
2996         /* mgs rules implies must be mgc->mgs */
2997         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2998                 if ((rule.sr_from != LUSTRE_SP_MGC &&
2999                      rule.sr_from != LUSTRE_SP_ANY) ||
3000                     (rule.sr_to != LUSTRE_SP_MGS &&
3001                      rule.sr_to != LUSTRE_SP_ANY))
3002                         RETURN(-EINVAL);
3003         }
3004
3005         /* preapre room for this coming rule. svcname format should be:
3006          * - fsname: general rule
3007          * - fsname-tgtname: target-specific rule
3008          */
3009         if (strchr(svname, '-')) {
3010                 struct mgs_tgt_srpc_conf *tgtconf;
3011                 int                       found = 0;
3012
3013                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3014                      tgtconf = tgtconf->mtsc_next) {
3015                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3016                                 found = 1;
3017                                 break;
3018                         }
3019                 }
3020
3021                 if (!found) {
3022                         int name_len;
3023
3024                         OBD_ALLOC_PTR(tgtconf);
3025                         if (tgtconf == NULL)
3026                                 RETURN(-ENOMEM);
3027
3028                         name_len = strlen(svname);
3029
3030                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3031                         if (tgtconf->mtsc_tgt == NULL) {
3032                                 OBD_FREE_PTR(tgtconf);
3033                                 RETURN(-ENOMEM);
3034                         }
3035                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3036
3037                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3038                         fsdb->fsdb_srpc_tgt = tgtconf;
3039                 }
3040
3041                 rset = &tgtconf->mtsc_rset;
3042         } else {
3043                 rset = &fsdb->fsdb_srpc_gen;
3044         }
3045
3046         rc = sptlrpc_rule_set_merge(rset, &rule);
3047
3048         RETURN(rc);
3049 }
3050
3051 static int mgs_srpc_set_param(const struct lu_env *env,
3052                               struct mgs_device *mgs,
3053                               struct fs_db *fsdb,
3054                               struct mgs_target_info *mti,
3055                               char *param)
3056 {
3057         char                   *copy;
3058         int                     rc, copy_size;
3059         ENTRY;
3060
3061 #ifndef HAVE_GSS
3062         RETURN(-EINVAL);
3063 #endif
3064         /* keep a copy of original param, which could be destroied
3065          * during parsing */
3066         copy_size = strlen(param) + 1;
3067         OBD_ALLOC(copy, copy_size);
3068         if (copy == NULL)
3069                 return -ENOMEM;
3070         memcpy(copy, param, copy_size);
3071
3072         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3073         if (rc)
3074                 goto out_free;
3075
3076         /* previous steps guaranteed the syntax is correct */
3077         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3078         if (rc)
3079                 goto out_free;
3080
3081         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3082                 /*
3083                  * for mgs rules, make them effective immediately.
3084                  */
3085                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3086                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3087                                                  &fsdb->fsdb_srpc_gen);
3088         }
3089
3090 out_free:
3091         OBD_FREE(copy, copy_size);
3092         RETURN(rc);
3093 }
3094
3095 struct mgs_srpc_read_data {
3096         struct fs_db   *msrd_fsdb;
3097         int             msrd_skip;
3098 };
3099
3100 static int mgs_srpc_read_handler(const struct lu_env *env,
3101                                  struct llog_handle *llh,
3102                                  struct llog_rec_hdr *rec, void *data)
3103 {
3104         struct mgs_srpc_read_data *msrd = data;
3105         struct cfg_marker         *marker;
3106         struct lustre_cfg         *lcfg = REC_DATA(rec);
3107         char                      *svname, *param;
3108         int                        cfg_len, rc;
3109         ENTRY;
3110
3111         if (rec->lrh_type != OBD_CFG_REC) {
3112                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3113                 RETURN(-EINVAL);
3114         }
3115
3116         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3117                   sizeof(struct llog_rec_tail);
3118
3119         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3120         if (rc) {
3121                 CERROR("Insane cfg\n");
3122                 RETURN(rc);
3123         }
3124
3125         if (lcfg->lcfg_command == LCFG_MARKER) {
3126                 marker = lustre_cfg_buf(lcfg, 1);
3127
3128                 if (marker->cm_flags & CM_START &&
3129                     marker->cm_flags & CM_SKIP)
3130                         msrd->msrd_skip = 1;
3131                 if (marker->cm_flags & CM_END)
3132                         msrd->msrd_skip = 0;
3133
3134                 RETURN(0);
3135         }
3136
3137         if (msrd->msrd_skip)
3138                 RETURN(0);
3139
3140         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3141                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3142                 RETURN(0);
3143         }
3144
3145         svname = lustre_cfg_string(lcfg, 0);
3146         if (svname == NULL) {
3147                 CERROR("svname is empty\n");
3148                 RETURN(0);
3149         }
3150
3151         param = lustre_cfg_string(lcfg, 1);
3152         if (param == NULL) {
3153                 CERROR("param is empty\n");
3154                 RETURN(0);
3155         }
3156
3157         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3158         if (rc)
3159                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3160
3161         RETURN(0);
3162 }
3163
3164 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3165                                 struct mgs_device *mgs,
3166                                 struct fs_db *fsdb)
3167 {
3168         struct llog_handle        *llh = NULL;
3169         struct llog_ctxt          *ctxt;
3170         char                      *logname;
3171         struct mgs_srpc_read_data  msrd;
3172         int                        rc;
3173         ENTRY;
3174
3175         /* construct log name */
3176         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3177         if (rc)
3178                 RETURN(rc);
3179
3180         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3181         LASSERT(ctxt != NULL);
3182
3183         if (mgs_log_is_empty(env, mgs, logname))
3184                 GOTO(out, rc = 0);
3185
3186         rc = llog_open(env, ctxt, &llh, NULL, logname,
3187                        LLOG_OPEN_EXISTS);
3188         if (rc < 0) {
3189                 if (rc == -ENOENT)
3190                         rc = 0;
3191                 GOTO(out, rc);
3192         }
3193
3194         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3195         if (rc)
3196                 GOTO(out_close, rc);
3197
3198         if (llog_get_size(llh) <= 1)
3199                 GOTO(out_close, rc = 0);
3200
3201         msrd.msrd_fsdb = fsdb;
3202         msrd.msrd_skip = 0;
3203
3204         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3205                           NULL);
3206
3207 out_close:
3208         llog_close(env, llh);
3209 out:
3210         llog_ctxt_put(ctxt);
3211         name_destroy(&logname);
3212
3213         if (rc)
3214                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3215         RETURN(rc);
3216 }
3217
3218 /* Permanent settings of all parameters by writing into the appropriate
3219  * configuration logs.
3220  * A parameter with null value ("<param>='\0'") means to erase it out of
3221  * the logs.
3222  */
3223 static int mgs_write_log_param(const struct lu_env *env,
3224                                struct mgs_device *mgs, struct fs_db *fsdb,
3225                                struct mgs_target_info *mti, char *ptr)
3226 {
3227         struct mgs_thread_info *mgi = mgs_env_info(env);
3228         char *logname;
3229         char *tmp;
3230         int rc = 0, rc2 = 0;
3231         ENTRY;
3232
3233         /* For various parameter settings, we have to figure out which logs
3234            care about them (e.g. both mdt and client for lov settings) */
3235         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3236
3237         /* The params are stored in MOUNT_DATA_FILE and modified via
3238            tunefs.lustre, or set using lctl conf_param */
3239
3240         /* Processed in lustre_start_mgc */
3241         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3242                 GOTO(end, rc);
3243
3244         /* Processed in ost/mdt */
3245         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3246                 GOTO(end, rc);
3247
3248         /* Processed in mgs_write_log_ost */
3249         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3250                 if (mti->mti_flags & LDD_F_PARAM) {
3251                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3252                                            "changed with tunefs.lustre"
3253                                            "and --writeconf\n", ptr);
3254                         rc = -EPERM;
3255                 }
3256                 GOTO(end, rc);
3257         }
3258
3259         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3260                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3261                 GOTO(end, rc);
3262         }
3263
3264         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3265                 /* Add a failover nidlist */
3266                 rc = 0;
3267                 /* We already processed failovers params for new
3268                    targets in mgs_write_log_target */
3269                 if (mti->mti_flags & LDD_F_PARAM) {
3270                         CDEBUG(D_MGS, "Adding failnode\n");
3271                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3272                 }
3273                 GOTO(end, rc);
3274         }
3275
3276         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3277                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3278                 GOTO(end, rc);
3279         }
3280
3281         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3282                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3283                 GOTO(end, rc);
3284         }
3285
3286         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3287                 /* active=0 means off, anything else means on */
3288                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3289                 int i;
3290
3291                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3292                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3293                                            "be (de)activated.\n",
3294                                            mti->mti_svname);
3295                         GOTO(end, rc = -EINVAL);
3296                 }
3297                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3298                               flag ? "de": "re", mti->mti_svname);
3299                 /* Modify clilov */
3300                 rc = name_create(&logname, mti->mti_fsname, "-client");
3301                 if (rc)
3302                         GOTO(end, rc);
3303                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3304                                 mti->mti_svname, "add osc", flag);
3305                 name_destroy(&logname);
3306                 if (rc)
3307                         goto active_err;
3308                 /* Modify mdtlov */
3309                 /* Add to all MDT logs for CMD */
3310                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3311                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3312                                 continue;
3313                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3314                         if (rc)
3315                                 GOTO(end, rc);
3316                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3317                                         mti->mti_svname, "add osc", flag);
3318                         name_destroy(&logname);
3319                         if (rc)
3320                                 goto active_err;
3321                 }
3322         active_err:
3323                 if (rc) {
3324                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3325                                            "log (%d). No permanent "
3326                                            "changes were made to the "
3327                                            "config log.\n",
3328                                            mti->mti_svname, rc);
3329                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3330                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3331                                                    " because the log"
3332                                                    "is in the old 1.4"
3333                                                    "style. Consider "
3334                                                    " --writeconf to "
3335                                                    "update the logs.\n");
3336                         GOTO(end, rc);
3337                 }
3338                 /* Fall through to osc proc for deactivating live OSC
3339                    on running MDT / clients. */
3340         }
3341         /* Below here, let obd's XXX_process_config methods handle it */
3342
3343         /* All lov. in proc */
3344         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3345                 char *mdtlovname;
3346
3347                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3348                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3349                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3350                                            "set on the MDT, not %s. "
3351                                            "Ignoring.\n",
3352                                            mti->mti_svname);
3353                         GOTO(end, rc = 0);
3354                 }
3355
3356                 /* Modify mdtlov */
3357                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3358                         GOTO(end, rc = -ENODEV);
3359
3360                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3361                                              mti->mti_stripe_index);
3362                 if (rc)
3363                         GOTO(end, rc);
3364                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3365                                   &mgi->mgi_bufs, mdtlovname, ptr);
3366                 name_destroy(&logname);
3367                 name_destroy(&mdtlovname);
3368                 if (rc)
3369                         GOTO(end, rc);
3370
3371                 /* Modify clilov */
3372                 rc = name_create(&logname, mti->mti_fsname, "-client");
3373                 if (rc)
3374                         GOTO(end, rc);
3375                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3376                                   fsdb->fsdb_clilov, ptr);
3377                 name_destroy(&logname);
3378                 GOTO(end, rc);
3379         }
3380
3381         /* All osc., mdc., llite. params in proc */
3382         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3383             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3384             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3385                 char *cname;
3386
3387                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3388                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3389                                            " cannot be modified. Consider"
3390                                            " updating the configuration with"
3391                                            " --writeconf\n",
3392                                            mti->mti_svname);
3393                         GOTO(end, rc = -EINVAL);
3394                 }
3395                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3396                         rc = name_create(&cname, mti->mti_fsname, "-client");
3397                         /* Add the client type to match the obdname in
3398                            class_config_llog_handler */
3399                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3400                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3401                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3402                         rc = name_create(&cname, mti->mti_svname, "-osc");
3403                 } else {
3404                         GOTO(end, rc = -EINVAL);
3405                 }
3406                 if (rc)
3407                         GOTO(end, rc);
3408
3409                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3410
3411                 /* Modify client */
3412                 rc = name_create(&logname, mti->mti_fsname, "-client");
3413                 if (rc) {
3414                         name_destroy(&cname);
3415                         GOTO(end, rc);
3416                 }
3417                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3418                                   cname, ptr);
3419
3420                 /* osc params affect the MDT as well */
3421                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3422                         int i;
3423
3424                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3425                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3426                                         continue;
3427                                 name_destroy(&cname);
3428                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3429                                                          fsdb, i);
3430                                 name_destroy(&logname);
3431                                 if (rc)
3432                                         break;
3433                                 rc = name_create_mdt(&logname,
3434                                                      mti->mti_fsname, i);
3435                                 if (rc)
3436                                         break;
3437                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3438                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3439                                                           mti, logname,
3440                                                           &mgi->mgi_bufs,
3441                                                           cname, ptr);
3442                                         if (rc)
3443                                                 break;
3444                                 }
3445                         }
3446                 }
3447                 name_destroy(&logname);
3448                 name_destroy(&cname);
3449                 GOTO(end, rc);
3450         }
3451
3452         /* All mdt. params in proc */
3453         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
3454                 int i;
3455                 __u32 idx;
3456
3457                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3458                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3459                             MTI_NAME_MAXLEN) == 0)
3460                         /* device is unspecified completely? */
3461                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3462                 else
3463                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3464                 if (rc < 0)
3465                         goto active_err;
3466                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3467                         goto active_err;
3468                 if (rc & LDD_F_SV_ALL) {
3469                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3470                                 if (!test_bit(i,
3471                                                   fsdb->fsdb_mdt_index_map))
3472                                         continue;
3473                                 rc = name_create_mdt(&logname,
3474                                                 mti->mti_fsname, i);
3475                                 if (rc)
3476                                         goto active_err;
3477                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3478                                                   logname, &mgi->mgi_bufs,
3479                                                   logname, ptr);
3480                                 name_destroy(&logname);
3481                                 if (rc)
3482                                         goto active_err;
3483                         }
3484                 } else {
3485                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3486                                           mti->mti_svname, &mgi->mgi_bufs,
3487                                           mti->mti_svname, ptr);
3488                         if (rc)
3489                                 goto active_err;
3490                 }
3491                 GOTO(end, rc);
3492         }
3493
3494         /* All mdd., ost. and osd. params in proc */
3495         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3496             (class_match_param(ptr, PARAM_OST, NULL) == 0) ||
3497             (class_match_param(ptr, PARAM_OSD, NULL) == 0)) {
3498                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3499                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3500                         GOTO(end, rc = -ENODEV);
3501
3502                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3503                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3504                 GOTO(end, rc);
3505         }
3506
3507         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3508         rc2 = -ENOSYS;
3509
3510 end:
3511         if (rc)
3512                 CERROR("err %d on param '%s'\n", rc, ptr);
3513
3514         RETURN(rc ?: rc2);
3515 }
3516
3517 /* Not implementing automatic failover nid addition at this time. */
3518 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3519                       struct mgs_target_info *mti)
3520 {
3521 #if 0
3522         struct fs_db *fsdb;
3523         int rc;
3524         ENTRY;
3525
3526         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3527         if (rc)
3528                 RETURN(rc);
3529
3530         if (mgs_log_is_empty(obd, mti->mti_svname))
3531                 /* should never happen */
3532                 RETURN(-ENOENT);
3533
3534         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3535
3536         /* FIXME We can just check mti->params to see if we're already in
3537            the failover list.  Modify mti->params for rewriting back at
3538            server_register_target(). */
3539
3540         mutex_lock(&fsdb->fsdb_mutex);
3541         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3542         mutex_unlock(&fsdb->fsdb_mutex);
3543
3544         RETURN(rc);
3545 #endif
3546         return 0;
3547 }
3548
3549 int mgs_write_log_target(const struct lu_env *env,
3550                          struct mgs_device *mgs,
3551                          struct mgs_target_info *mti,
3552                          struct fs_db *fsdb)
3553 {
3554         int rc = -EINVAL;
3555         char *buf, *params;
3556         ENTRY;
3557
3558         /* set/check the new target index */
3559         rc = mgs_set_index(env, mgs, mti);
3560         if (rc < 0) {
3561                 CERROR("Can't get index (%d)\n", rc);
3562                 RETURN(rc);
3563         }
3564
3565         if (rc == EALREADY) {
3566                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3567                               mti->mti_stripe_index, mti->mti_svname);
3568                 /* We would like to mark old log sections as invalid
3569                    and add new log sections in the client and mdt logs.
3570                    But if we add new sections, then live clients will
3571                    get repeat setup instructions for already running
3572                    osc's. So don't update the client/mdt logs. */
3573                 mti->mti_flags &= ~LDD_F_UPDATE;
3574                 rc = 0;
3575         }
3576
3577         mutex_lock(&fsdb->fsdb_mutex);
3578
3579         if (mti->mti_flags &
3580             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3581                 /* Generate a log from scratch */
3582                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3583                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3584                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3585                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3586                 } else {
3587                         CERROR("Unknown target type %#x, can't create log for "
3588                                "%s\n", mti->mti_flags, mti->mti_svname);
3589                 }
3590                 if (rc) {
3591                         CERROR("Can't write logs for %s (%d)\n",
3592                                mti->mti_svname, rc);
3593                         GOTO(out_up, rc);
3594                 }
3595         } else {
3596                 /* Just update the params from tunefs in mgs_write_log_params */
3597                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3598                 mti->mti_flags |= LDD_F_PARAM;
3599         }
3600
3601         /* allocate temporary buffer, where class_get_next_param will
3602            make copy of a current  parameter */
3603         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3604         if (buf == NULL)
3605                 GOTO(out_up, rc = -ENOMEM);
3606         params = mti->mti_params;
3607         while (params != NULL) {
3608                 rc = class_get_next_param(&params, buf);
3609                 if (rc) {
3610                         if (rc == 1)
3611                                 /* there is no next parameter, that is
3612                                    not an error */
3613                                 rc = 0;
3614                         break;
3615                 }
3616                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3617                        params, buf);
3618                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3619                 if (rc)
3620                         break;
3621         }
3622
3623         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3624
3625 out_up:
3626         mutex_unlock(&fsdb->fsdb_mutex);
3627         RETURN(rc);
3628 }
3629
3630 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3631 {
3632         struct llog_ctxt        *ctxt;
3633         int                      rc = 0;
3634
3635         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3636         if (ctxt == NULL) {
3637                 CERROR("%s: MGS config context doesn't exist\n",
3638                        mgs->mgs_obd->obd_name);
3639                 rc = -ENODEV;
3640         } else {
3641                 rc = llog_erase(env, ctxt, NULL, name);
3642                 /* llog may not exist */
3643                 if (rc == -ENOENT)
3644                         rc = 0;
3645                 llog_ctxt_put(ctxt);
3646         }
3647
3648         if (rc)
3649                 CERROR("%s: failed to clear log %s: %d\n",
3650                        mgs->mgs_obd->obd_name, name, rc);
3651
3652         return rc;
3653 }
3654
3655 /* erase all logs for the given fs */
3656 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3657 {
3658         struct fs_db *fsdb;
3659         cfs_list_t list;
3660         struct mgs_direntry *dirent, *n;
3661         int rc, len = strlen(fsname);
3662         char *suffix;
3663         ENTRY;
3664
3665         /* Find all the logs in the CONFIGS directory */
3666         rc = class_dentry_readdir(env, mgs, &list);
3667         if (rc)
3668                 RETURN(rc);
3669
3670         mutex_lock(&mgs->mgs_mutex);
3671
3672         /* Delete the fs db */
3673         fsdb = mgs_find_fsdb(mgs, fsname);
3674         if (fsdb)
3675                 mgs_free_fsdb(mgs, fsdb);
3676
3677         mutex_unlock(&mgs->mgs_mutex);
3678
3679         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3680                 cfs_list_del(&dirent->list);
3681                 suffix = strrchr(dirent->name, '-');
3682                 if (suffix != NULL) {
3683                         if ((len == suffix - dirent->name) &&
3684                             (strncmp(fsname, dirent->name, len) == 0)) {
3685                                 CDEBUG(D_MGS, "Removing log %s\n",
3686                                        dirent->name);
3687                                 mgs_erase_log(env, mgs, dirent->name);
3688                         }
3689                 }
3690                 mgs_direntry_free(dirent);
3691         }
3692
3693         RETURN(rc);
3694 }
3695
3696 /* list all logs for the given fs */
3697 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
3698                   struct obd_ioctl_data *data)
3699 {
3700         cfs_list_t               list;
3701         struct mgs_direntry     *dirent, *n;
3702         char                    *out, *suffix;
3703         int                      l, remains, rc;
3704
3705         ENTRY;
3706
3707         /* Find all the logs in the CONFIGS directory */
3708         rc = class_dentry_readdir(env, mgs, &list);
3709         if (rc) {
3710                 CERROR("%s: can't read %s dir = %d\n",
3711                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
3712                 RETURN(rc);
3713         }
3714
3715         out = data->ioc_bulk;
3716         remains = data->ioc_inllen1;
3717         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3718                 cfs_list_del(&dirent->list);
3719                 suffix = strrchr(dirent->name, '-');
3720                 if (suffix != NULL) {
3721                         l = snprintf(out, remains, "config log: $%s\n",
3722                                      dirent->name);
3723                         out += l;
3724                         remains -= l;
3725                 }
3726                 mgs_direntry_free(dirent);
3727                 if (remains <= 0)
3728                         break;
3729         }
3730         RETURN(rc);
3731 }
3732
3733 /* from llog_swab */
3734 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3735 {
3736         int i;
3737         ENTRY;
3738
3739         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3740         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3741
3742         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3743         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3744         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3745         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3746
3747         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3748         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3749                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3750                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3751                                i, lcfg->lcfg_buflens[i],
3752                                lustre_cfg_string(lcfg, i));
3753                 }
3754         EXIT;
3755 }
3756
3757 /* Set a permanent (config log) param for a target or fs
3758  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3759  *             buf1 contains the single parameter
3760  */
3761 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3762                  struct lustre_cfg *lcfg, char *fsname)
3763 {
3764         struct fs_db *fsdb;
3765         struct mgs_target_info *mti;
3766         char *devname, *param;
3767         char *ptr;
3768         const char *tmp;
3769         __u32 index;
3770         int rc = 0;
3771         ENTRY;
3772
3773         print_lustre_cfg(lcfg);
3774
3775         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3776         devname = lustre_cfg_string(lcfg, 0);
3777         param = lustre_cfg_string(lcfg, 1);
3778         if (!devname) {
3779                 /* Assume device name embedded in param:
3780                    lustre-OST0000.osc.max_dirty_mb=32 */
3781                 ptr = strchr(param, '.');
3782                 if (ptr) {
3783                         devname = param;
3784                         *ptr = 0;
3785                         param = ptr + 1;
3786                 }
3787         }
3788         if (!devname) {
3789                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3790                 RETURN(-ENOSYS);
3791         }
3792
3793         rc = mgs_parse_devname(devname, fsname, NULL);
3794         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3795                 /* param related to llite isn't allowed to set by OST or MDT */
3796                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3797                                    sizeof(PARAM_LLITE)) == 0)
3798                         RETURN(-EINVAL);
3799         } else {
3800                 /* assume devname is the fsname */
3801                 memset(fsname, 0, MTI_NAME_MAXLEN);
3802                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3803                 fsname[MTI_NAME_MAXLEN - 1] = 0;
3804         }
3805         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3806
3807         rc = mgs_find_or_make_fsdb(env, mgs,
3808                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
3809                                    PARAMS_FILENAME : fsname, &fsdb);
3810         if (rc)
3811                 RETURN(rc);
3812
3813         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
3814             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3815             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3816                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3817                        "is '%s'\n", fsname, devname);
3818                 mgs_free_fsdb(mgs, fsdb);
3819                 RETURN(-EINVAL);
3820         }
3821
3822         /* Create a fake mti to hold everything */
3823         OBD_ALLOC_PTR(mti);
3824         if (!mti)
3825                 GOTO(out, rc = -ENOMEM);
3826         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3827             >= sizeof(mti->mti_fsname))
3828                 GOTO(out, rc = -E2BIG);
3829         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3830             >= sizeof(mti->mti_svname))
3831                 GOTO(out, rc = -E2BIG);
3832         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3833             >= sizeof(mti->mti_params))
3834                 GOTO(out, rc = -E2BIG);
3835         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3836         if (rc < 0)
3837                 /* Not a valid server; may be only fsname */
3838                 rc = 0;
3839         else
3840                 /* Strip -osc or -mdc suffix from svname */
3841                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3842                                      mti->mti_svname))
3843                         GOTO(out, rc = -EINVAL);
3844         /*
3845          * Revoke lock so everyone updates.  Should be alright if
3846          * someone was already reading while we were updating the logs,
3847          * so we don't really need to hold the lock while we're
3848          * writing (above).
3849          */
3850         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
3851                 mti->mti_flags = rc | LDD_F_PARAM2;
3852                 mutex_lock(&fsdb->fsdb_mutex);
3853                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
3854                 mutex_unlock(&fsdb->fsdb_mutex);
3855                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
3856         } else {
3857                 mti->mti_flags = rc | LDD_F_PARAM;
3858                 mutex_lock(&fsdb->fsdb_mutex);
3859                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3860                 mutex_unlock(&fsdb->fsdb_mutex);
3861                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3862         }
3863
3864 out:
3865         OBD_FREE_PTR(mti);
3866         RETURN(rc);
3867 }
3868
3869 static int mgs_write_log_pool(const struct lu_env *env,
3870                               struct mgs_device *mgs, char *logname,
3871                               struct fs_db *fsdb, char *tgtname,
3872                               enum lcfg_command_type cmd,
3873                               char *fsname, char *poolname,
3874                               char *ostname, char *comment)
3875 {
3876         struct llog_handle *llh = NULL;
3877         int rc;
3878
3879         rc = record_start_log(env, mgs, &llh, logname);
3880         if (rc)
3881                 return rc;
3882         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
3883         if (rc)
3884                 goto out;
3885         rc = record_base(env, llh, tgtname, 0, cmd,
3886                          fsname, poolname, ostname, 0);
3887         if (rc)
3888                 goto out;
3889         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
3890 out:
3891         record_end_log(env, &llh);
3892         return rc;
3893 }
3894
3895 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3896                  enum lcfg_command_type cmd, char *fsname,
3897                  char *poolname, char *ostname)
3898 {
3899         struct fs_db *fsdb;
3900         char *lovname;
3901         char *logname;
3902         char *label = NULL, *canceled_label = NULL;
3903         int label_sz;
3904         struct mgs_target_info *mti = NULL;
3905         int rc, i;
3906         ENTRY;
3907
3908         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3909         if (rc) {
3910                 CERROR("Can't get db for %s\n", fsname);
3911                 RETURN(rc);
3912         }
3913         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3914                 CERROR("%s is not defined\n", fsname);
3915                 mgs_free_fsdb(mgs, fsdb);
3916                 RETURN(-EINVAL);
3917         }
3918
3919         label_sz = 10 + strlen(fsname) + strlen(poolname);
3920
3921         /* check if ostname match fsname */
3922         if (ostname != NULL) {
3923                 char *ptr;
3924
3925                 ptr = strrchr(ostname, '-');
3926                 if ((ptr == NULL) ||
3927                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3928                         RETURN(-EINVAL);
3929                 label_sz += strlen(ostname);
3930         }
3931
3932         OBD_ALLOC(label, label_sz);
3933         if (label == NULL)
3934                 RETURN(-ENOMEM);
3935
3936         switch(cmd) {
3937         case LCFG_POOL_NEW:
3938                 sprintf(label,
3939                         "new %s.%s", fsname, poolname);
3940                 break;
3941         case LCFG_POOL_ADD:
3942                 sprintf(label,
3943                         "add %s.%s.%s", fsname, poolname, ostname);
3944                 break;
3945         case LCFG_POOL_REM:
3946                 OBD_ALLOC(canceled_label, label_sz);
3947                 if (canceled_label == NULL)
3948                         GOTO(out_label, rc = -ENOMEM);
3949                 sprintf(label,
3950                         "rem %s.%s.%s", fsname, poolname, ostname);
3951                 sprintf(canceled_label,
3952                         "add %s.%s.%s", fsname, poolname, ostname);
3953                 break;
3954         case LCFG_POOL_DEL:
3955                 OBD_ALLOC(canceled_label, label_sz);
3956                 if (canceled_label == NULL)
3957                         GOTO(out_label, rc = -ENOMEM);
3958                 sprintf(label,
3959                         "del %s.%s", fsname, poolname);
3960                 sprintf(canceled_label,
3961                         "new %s.%s", fsname, poolname);
3962                 break;
3963         default:
3964                 break;
3965         }
3966
3967         if (canceled_label != NULL) {
3968                 OBD_ALLOC_PTR(mti);
3969                 if (mti == NULL)
3970                         GOTO(out_cancel, rc = -ENOMEM);
3971         }
3972
3973         mutex_lock(&fsdb->fsdb_mutex);
3974         /* write pool def to all MDT logs */
3975         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3976                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
3977                         rc = name_create_mdt_and_lov(&logname, &lovname,
3978                                                      fsdb, i);
3979                         if (rc) {
3980                                 mutex_unlock(&fsdb->fsdb_mutex);
3981                                 GOTO(out_mti, rc);
3982                         }
3983                         if (canceled_label != NULL) {
3984                                 strcpy(mti->mti_svname, "lov pool");
3985                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3986                                                 lovname, canceled_label,
3987                                                 CM_SKIP);
3988                         }
3989
3990                         if (rc >= 0)
3991                                 rc = mgs_write_log_pool(env, mgs, logname,
3992                                                         fsdb, lovname, cmd,
3993                                                         fsname, poolname,
3994                                                         ostname, label);
3995                         name_destroy(&logname);
3996                         name_destroy(&lovname);
3997                         if (rc) {
3998                                 mutex_unlock(&fsdb->fsdb_mutex);
3999                                 GOTO(out_mti, rc);
4000                         }
4001                 }
4002         }
4003
4004         rc = name_create(&logname, fsname, "-client");
4005         if (rc) {
4006                 mutex_unlock(&fsdb->fsdb_mutex);
4007                 GOTO(out_mti, rc);
4008         }
4009         if (canceled_label != NULL) {
4010                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4011                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4012                 if (rc < 0) {
4013                         mutex_unlock(&fsdb->fsdb_mutex);
4014                         name_destroy(&logname);
4015                         GOTO(out_mti, rc);
4016                 }
4017         }
4018
4019         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4020                                 cmd, fsname, poolname, ostname, label);
4021         mutex_unlock(&fsdb->fsdb_mutex);
4022         name_destroy(&logname);
4023         /* request for update */
4024         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4025
4026         EXIT;
4027 out_mti:
4028         if (mti != NULL)
4029                 OBD_FREE_PTR(mti);
4030 out_cancel:
4031         if (canceled_label != NULL)
4032                 OBD_FREE(canceled_label, label_sz);
4033 out_label:
4034         OBD_FREE(label, label_sz);
4035         return rc;
4036 }