Whamcloud - gitweb
f6b69ba7d4cf8a5e8dc02b9116ad9c4c9ac7fcaa
[fs/lustre-release.git] / lustre / mgs / mgs_llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mgs/mgs_llog.c
37  *
38  * Lustre Management Server (mgs) config llog creation
39  *
40  * Author: Nathan Rutman <nathan@clusterfs.com>
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mikhail Pershin <tappro@whamcloud.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MGS
46 #define D_MGS D_CONFIG
47
48 #include <obd.h>
49 #include <obd_lov.h>
50 #include <lustre_param.h>
51 #include <lustre_sec.h>
52 #include <lustre_quota.h>
53
54 #include "mgs_internal.h"
55
56 /********************** Class functions ********************/
57
58 int class_dentry_readdir(const struct lu_env *env,
59                          struct mgs_device *mgs, cfs_list_t *list)
60 {
61         struct dt_object    *dir = mgs->mgs_configs_dir;
62         const struct dt_it_ops *iops;
63         struct dt_it        *it;
64         struct mgs_direntry *de;
65         char                *key;
66         int                  rc, key_sz;
67
68         CFS_INIT_LIST_HEAD(list);
69
70         LASSERT(dir);
71         LASSERT(dir->do_index_ops);
72
73         iops = &dir->do_index_ops->dio_it;
74         it = iops->init(env, dir, LUDA_64BITHASH, BYPASS_CAPA);
75         if (IS_ERR(it))
76                 RETURN(PTR_ERR(it));
77
78         rc = iops->load(env, it, 0);
79         if (rc <= 0)
80                 GOTO(fini, rc = 0);
81
82         /* main cycle */
83         do {
84                 key = (void *)iops->key(env, it);
85                 if (IS_ERR(key)) {
86                         CERROR("%s: key failed when listing %s: rc = %d\n",
87                                mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR,
88                                (int) PTR_ERR(key));
89                         goto next;
90                 }
91                 key_sz = iops->key_size(env, it);
92                 LASSERT(key_sz > 0);
93
94                 /* filter out "." and ".." entries */
95                 if (key[0] == '.') {
96                         if (key_sz == 1)
97                                 goto next;
98                         if (key_sz == 2 && key[1] == '.')
99                                 goto next;
100                 }
101
102                 de = mgs_direntry_alloc(key_sz + 1);
103                 if (de == NULL) {
104                         rc = -ENOMEM;
105                         break;
106                 }
107
108                 memcpy(de->name, key, key_sz);
109                 de->name[key_sz] = 0;
110
111                 cfs_list_add(&de->list, list);
112
113 next:
114                 rc = iops->next(env, it);
115         } while (rc == 0);
116         rc = 0;
117
118         iops->put(env, it);
119
120 fini:
121         iops->fini(env, it);
122         if (rc)
123                 CERROR("%s: key failed when listing %s: rc = %d\n",
124                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
125         RETURN(rc);
126 }
127
128 /******************** DB functions *********************/
129
130 static inline int name_create(char **newname, char *prefix, char *suffix)
131 {
132         LASSERT(newname);
133         OBD_ALLOC(*newname, strlen(prefix) + strlen(suffix) + 1);
134         if (!*newname)
135                 return -ENOMEM;
136         sprintf(*newname, "%s%s", prefix, suffix);
137         return 0;
138 }
139
140 static inline void name_destroy(char **name)
141 {
142         if (*name)
143                 OBD_FREE(*name, strlen(*name) + 1);
144         *name = NULL;
145 }
146
147 struct mgs_fsdb_handler_data
148 {
149         struct fs_db   *fsdb;
150         __u32           ver;
151 };
152
153 /* from the (client) config log, figure out:
154         1. which ost's/mdt's are configured (by index)
155         2. what the last config step is
156         3. COMPAT_18 osc name
157 */
158 /* It might be better to have a separate db file, instead of parsing the info
159    out of the client log.  This is slow and potentially error-prone. */
160 static int mgs_fsdb_handler(const struct lu_env *env, struct llog_handle *llh,
161                             struct llog_rec_hdr *rec, void *data)
162 {
163         struct mgs_fsdb_handler_data *d = data;
164         struct fs_db *fsdb = d->fsdb;
165         int cfg_len = rec->lrh_len;
166         char *cfg_buf = (char*) (rec + 1);
167         struct lustre_cfg *lcfg;
168         __u32 index;
169         int rc = 0;
170         ENTRY;
171
172         if (rec->lrh_type != OBD_CFG_REC) {
173                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
174                 RETURN(-EINVAL);
175         }
176
177         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
178         if (rc) {
179                 CERROR("Insane cfg\n");
180                 RETURN(rc);
181         }
182
183         lcfg = (struct lustre_cfg *)cfg_buf;
184
185         CDEBUG(D_INFO, "cmd %x %s %s\n", lcfg->lcfg_command,
186                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
187
188         /* Figure out ost indicies */
189         /* lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1 */
190         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD ||
191             lcfg->lcfg_command == LCFG_LOV_DEL_OBD) {
192                 index = simple_strtoul(lustre_cfg_string(lcfg, 2),
193                                        NULL, 10);
194                 CDEBUG(D_MGS, "OST index for %s is %u (%s)\n",
195                        lustre_cfg_string(lcfg, 1), index,
196                        lustre_cfg_string(lcfg, 2));
197                 set_bit(index, fsdb->fsdb_ost_index_map);
198         }
199
200         /* Figure out mdt indicies */
201         /* attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f */
202         if ((lcfg->lcfg_command == LCFG_ATTACH) &&
203             (strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_MDC_NAME) == 0)) {
204                 rc = server_name2index(lustre_cfg_string(lcfg, 0),
205                                        &index, NULL);
206                 if (rc != LDD_F_SV_TYPE_MDT) {
207                         CWARN("Unparsable MDC name %s, assuming index 0\n",
208                               lustre_cfg_string(lcfg, 0));
209                         index = 0;
210                 }
211                 rc = 0;
212                 CDEBUG(D_MGS, "MDT index is %u\n", index);
213                 set_bit(index, fsdb->fsdb_mdt_index_map);
214                 fsdb->fsdb_mdt_count ++;
215         }
216
217         /**
218          * figure out the old config. fsdb_gen = 0 means old log
219          * It is obsoleted and not supported anymore
220          */
221         if (fsdb->fsdb_gen == 0) {
222                 CERROR("Old config format is not supported\n");
223                 RETURN(-EINVAL);
224         }
225
226         /*
227          * compat to 1.8, check osc name used by MDT0 to OSTs, bz18548.
228          */
229         if (!test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags) &&
230             lcfg->lcfg_command == LCFG_ATTACH &&
231             strcmp(lustre_cfg_string(lcfg, 1), LUSTRE_OSC_NAME) == 0) {
232                 if (OBD_OCD_VERSION_MAJOR(d->ver) == 1 &&
233                     OBD_OCD_VERSION_MINOR(d->ver) <= 8) {
234                         CWARN("MDT using 1.8 OSC name scheme\n");
235                         set_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags);
236                 }
237         }
238
239         if (lcfg->lcfg_command == LCFG_MARKER) {
240                 struct cfg_marker *marker;
241                 marker = lustre_cfg_buf(lcfg, 1);
242
243                 d->ver = marker->cm_vers;
244
245                 /* Keep track of the latest marker step */
246                 fsdb->fsdb_gen = max(fsdb->fsdb_gen, marker->cm_step);
247         }
248
249         RETURN(rc);
250 }
251
252 /* fsdb->fsdb_mutex is already held  in mgs_find_or_make_fsdb*/
253 static int mgs_get_fsdb_from_llog(const struct lu_env *env,
254                                   struct mgs_device *mgs,
255                                   struct fs_db *fsdb)
256 {
257         char                            *logname;
258         struct llog_handle              *loghandle;
259         struct llog_ctxt                *ctxt;
260         struct mgs_fsdb_handler_data     d = { fsdb, 0 };
261         int rc;
262
263         ENTRY;
264
265         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
266         LASSERT(ctxt != NULL);
267         rc = name_create(&logname, fsdb->fsdb_name, "-client");
268         if (rc)
269                 GOTO(out_put, rc);
270         rc = llog_open_create(env, ctxt, &loghandle, NULL, logname);
271         if (rc)
272                 GOTO(out_pop, rc);
273
274         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
275         if (rc)
276                 GOTO(out_close, rc);
277
278         if (llog_get_size(loghandle) <= 1)
279                 set_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
280
281         rc = llog_process(env, loghandle, mgs_fsdb_handler, (void *)&d, NULL);
282         CDEBUG(D_INFO, "get_db = %d\n", rc);
283 out_close:
284         llog_close(env, loghandle);
285 out_pop:
286         name_destroy(&logname);
287 out_put:
288         llog_ctxt_put(ctxt);
289
290         RETURN(rc);
291 }
292
293 static void mgs_free_fsdb_srpc(struct fs_db *fsdb)
294 {
295         struct mgs_tgt_srpc_conf *tgtconf;
296
297         /* free target-specific rules */
298         while (fsdb->fsdb_srpc_tgt) {
299                 tgtconf = fsdb->fsdb_srpc_tgt;
300                 fsdb->fsdb_srpc_tgt = tgtconf->mtsc_next;
301
302                 LASSERT(tgtconf->mtsc_tgt);
303
304                 sptlrpc_rule_set_free(&tgtconf->mtsc_rset);
305                 OBD_FREE(tgtconf->mtsc_tgt, strlen(tgtconf->mtsc_tgt) + 1);
306                 OBD_FREE_PTR(tgtconf);
307         }
308
309         /* free general rules */
310         sptlrpc_rule_set_free(&fsdb->fsdb_srpc_gen);
311 }
312
313 struct fs_db *mgs_find_fsdb(struct mgs_device *mgs, char *fsname)
314 {
315         struct fs_db *fsdb;
316         cfs_list_t *tmp;
317
318         cfs_list_for_each(tmp, &mgs->mgs_fs_db_list) {
319                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
320                 if (strcmp(fsdb->fsdb_name, fsname) == 0)
321                         return fsdb;
322         }
323         return NULL;
324 }
325
326 /* caller must hold the mgs->mgs_fs_db_lock */
327 static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
328                                   struct mgs_device *mgs, char *fsname)
329 {
330         struct fs_db *fsdb;
331         int rc;
332         ENTRY;
333
334         if (strlen(fsname) >= sizeof(fsdb->fsdb_name)) {
335                 CERROR("fsname %s is too long\n", fsname);
336                 RETURN(NULL);
337         }
338
339         OBD_ALLOC_PTR(fsdb);
340         if (!fsdb)
341                 RETURN(NULL);
342
343         strcpy(fsdb->fsdb_name, fsname);
344         mutex_init(&fsdb->fsdb_mutex);
345         set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
346         fsdb->fsdb_gen = 1;
347
348         if (strcmp(fsname, MGSSELF_NAME) == 0) {
349                 set_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags);
350         } else {
351                 OBD_ALLOC(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
352                 OBD_ALLOC(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
353                 if (!fsdb->fsdb_ost_index_map || !fsdb->fsdb_mdt_index_map) {
354                         CERROR("No memory for index maps\n");
355                         GOTO(err, rc = -ENOMEM);
356                 }
357
358                 rc = name_create(&fsdb->fsdb_clilov, fsname, "-clilov");
359                 if (rc)
360                         GOTO(err, rc);
361                 rc = name_create(&fsdb->fsdb_clilmv, fsname, "-clilmv");
362                 if (rc)
363                         GOTO(err, rc);
364
365                 /* initialise data for NID table */
366                 mgs_ir_init_fs(env, mgs, fsdb);
367
368                 lproc_mgs_add_live(mgs, fsdb);
369         }
370
371         cfs_list_add(&fsdb->fsdb_list, &mgs->mgs_fs_db_list);
372
373         RETURN(fsdb);
374 err:
375         if (fsdb->fsdb_ost_index_map)
376                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
377         if (fsdb->fsdb_mdt_index_map)
378                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
379         name_destroy(&fsdb->fsdb_clilov);
380         name_destroy(&fsdb->fsdb_clilmv);
381         OBD_FREE_PTR(fsdb);
382         RETURN(NULL);
383 }
384
385 static void mgs_free_fsdb(struct mgs_device *mgs, struct fs_db *fsdb)
386 {
387         /* wait for anyone with the sem */
388         mutex_lock(&fsdb->fsdb_mutex);
389         lproc_mgs_del_live(mgs, fsdb);
390         cfs_list_del(&fsdb->fsdb_list);
391
392         /* deinitialize fsr */
393         mgs_ir_fini_fs(mgs, fsdb);
394
395         if (fsdb->fsdb_ost_index_map)
396                 OBD_FREE(fsdb->fsdb_ost_index_map, INDEX_MAP_SIZE);
397         if (fsdb->fsdb_mdt_index_map)
398                 OBD_FREE(fsdb->fsdb_mdt_index_map, INDEX_MAP_SIZE);
399         name_destroy(&fsdb->fsdb_clilov);
400         name_destroy(&fsdb->fsdb_clilmv);
401         mgs_free_fsdb_srpc(fsdb);
402         mutex_unlock(&fsdb->fsdb_mutex);
403         OBD_FREE_PTR(fsdb);
404 }
405
406 int mgs_init_fsdb_list(struct mgs_device *mgs)
407 {
408         CFS_INIT_LIST_HEAD(&mgs->mgs_fs_db_list);
409         return 0;
410 }
411
412 int mgs_cleanup_fsdb_list(struct mgs_device *mgs)
413 {
414         struct fs_db *fsdb;
415         cfs_list_t *tmp, *tmp2;
416         mutex_lock(&mgs->mgs_mutex);
417         cfs_list_for_each_safe(tmp, tmp2, &mgs->mgs_fs_db_list) {
418                 fsdb = cfs_list_entry(tmp, struct fs_db, fsdb_list);
419                 mgs_free_fsdb(mgs, fsdb);
420         }
421         mutex_unlock(&mgs->mgs_mutex);
422         return 0;
423 }
424
425 int mgs_find_or_make_fsdb(const struct lu_env *env,
426                           struct mgs_device *mgs, char *name,
427                           struct fs_db **dbh)
428 {
429         struct fs_db *fsdb;
430         int rc = 0;
431
432         ENTRY;
433         mutex_lock(&mgs->mgs_mutex);
434         fsdb = mgs_find_fsdb(mgs, name);
435         if (fsdb) {
436                 mutex_unlock(&mgs->mgs_mutex);
437                 *dbh = fsdb;
438                 RETURN(0);
439         }
440
441         CDEBUG(D_MGS, "Creating new db\n");
442         fsdb = mgs_new_fsdb(env, mgs, name);
443         /* lock fsdb_mutex until the db is loaded from llogs */
444         if (fsdb)
445                 mutex_lock(&fsdb->fsdb_mutex);
446         mutex_unlock(&mgs->mgs_mutex);
447         if (!fsdb)
448                 RETURN(-ENOMEM);
449
450         if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
451                 /* populate the db from the client llog */
452                 rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
453                 if (rc) {
454                         CERROR("Can't get db from client log %d\n", rc);
455                         GOTO(out_free, rc);
456                 }
457         }
458
459         /* populate srpc rules from params llog */
460         rc = mgs_get_fsdb_srpc_from_llog(env, mgs, fsdb);
461         if (rc) {
462                 CERROR("Can't get db from params log %d\n", rc);
463                 GOTO(out_free, rc);
464         }
465
466         mutex_unlock(&fsdb->fsdb_mutex);
467         *dbh = fsdb;
468
469         RETURN(0);
470
471 out_free:
472         mutex_unlock(&fsdb->fsdb_mutex);
473         mgs_free_fsdb(mgs, fsdb);
474         return rc;
475 }
476
477 /* 1 = index in use
478    0 = index unused
479    -1= empty client log */
480 int mgs_check_index(const struct lu_env *env,
481                     struct mgs_device *mgs,
482                     struct mgs_target_info *mti)
483 {
484         struct fs_db *fsdb;
485         void *imap;
486         int rc = 0;
487         ENTRY;
488
489         LASSERT(!(mti->mti_flags & LDD_F_NEED_INDEX));
490
491         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
492         if (rc) {
493                 CERROR("Can't get db for %s\n", mti->mti_fsname);
494                 RETURN(rc);
495         }
496
497         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags))
498                 RETURN(-1);
499
500         if (mti->mti_flags & LDD_F_SV_TYPE_OST)
501                 imap = fsdb->fsdb_ost_index_map;
502         else if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
503                 imap = fsdb->fsdb_mdt_index_map;
504         else
505                 RETURN(-EINVAL);
506
507         if (test_bit(mti->mti_stripe_index, imap))
508                 RETURN(1);
509         RETURN(0);
510 }
511
512 static __inline__ int next_index(void *index_map, int map_len)
513 {
514         int i;
515         for (i = 0; i < map_len * 8; i++)
516                 if (!test_bit(i, index_map)) {
517                          return i;
518                  }
519         CERROR("max index %d exceeded.\n", i);
520         return -1;
521 }
522
523 /* Return codes:
524         0  newly marked as in use
525         <0 err
526         +EALREADY for update of an old index */
527 static int mgs_set_index(const struct lu_env *env,
528                          struct mgs_device *mgs,
529                          struct mgs_target_info *mti)
530 {
531         struct fs_db *fsdb;
532         void *imap;
533         int rc = 0;
534         ENTRY;
535
536         rc = mgs_find_or_make_fsdb(env, mgs, mti->mti_fsname, &fsdb);
537         if (rc) {
538                 CERROR("Can't get db for %s\n", mti->mti_fsname);
539                 RETURN(rc);
540         }
541
542         mutex_lock(&fsdb->fsdb_mutex);
543         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
544                 imap = fsdb->fsdb_ost_index_map;
545         } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
546                 imap = fsdb->fsdb_mdt_index_map;
547         } else {
548                 GOTO(out_up, rc = -EINVAL);
549         }
550
551         if (mti->mti_flags & LDD_F_NEED_INDEX) {
552                 rc = next_index(imap, INDEX_MAP_SIZE);
553                 if (rc == -1)
554                         GOTO(out_up, rc = -ERANGE);
555                 mti->mti_stripe_index = rc;
556                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT)
557                         fsdb->fsdb_mdt_count ++;
558         }
559
560         if (mti->mti_stripe_index >= INDEX_MAP_SIZE * 8) {
561                 LCONSOLE_ERROR_MSG(0x13f, "Server %s requested index %d, "
562                                    "but the max index is %d.\n",
563                                    mti->mti_svname, mti->mti_stripe_index,
564                                    INDEX_MAP_SIZE * 8);
565                 GOTO(out_up, rc = -ERANGE);
566         }
567
568         if (test_bit(mti->mti_stripe_index, imap)) {
569                 if ((mti->mti_flags & LDD_F_VIRGIN) &&
570                     !(mti->mti_flags & LDD_F_WRITECONF)) {
571                         LCONSOLE_ERROR_MSG(0x140, "Server %s requested index "
572                                            "%d, but that index is already in "
573                                            "use. Use --writeconf to force\n",
574                                            mti->mti_svname,
575                                            mti->mti_stripe_index);
576                         GOTO(out_up, rc = -EADDRINUSE);
577                 } else {
578                         CDEBUG(D_MGS, "Server %s updating index %d\n",
579                                mti->mti_svname, mti->mti_stripe_index);
580                         GOTO(out_up, rc = EALREADY);
581                 }
582         }
583
584         set_bit(mti->mti_stripe_index, imap);
585         clear_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags);
586         mutex_unlock(&fsdb->fsdb_mutex);
587         server_make_name(mti->mti_flags & ~(LDD_F_VIRGIN | LDD_F_WRITECONF),
588                          mti->mti_stripe_index, mti->mti_fsname, mti->mti_svname);
589
590         CDEBUG(D_MGS, "Set index for %s to %d\n", mti->mti_svname,
591                mti->mti_stripe_index);
592
593         RETURN(0);
594 out_up:
595         mutex_unlock(&fsdb->fsdb_mutex);
596         return rc;
597 }
598
599 struct mgs_modify_lookup {
600         struct cfg_marker mml_marker;
601         int               mml_modified;
602 };
603
604 static int mgs_modify_handler(const struct lu_env *env,
605                               struct llog_handle *llh,
606                               struct llog_rec_hdr *rec, void *data)
607 {
608         struct mgs_modify_lookup *mml = data;
609         struct cfg_marker *marker;
610         struct lustre_cfg *lcfg = REC_DATA(rec);
611         int cfg_len = REC_DATA_LEN(rec);
612         int rc;
613         ENTRY;
614
615         if (rec->lrh_type != OBD_CFG_REC) {
616                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
617                 RETURN(-EINVAL);
618         }
619
620         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
621         if (rc) {
622                 CERROR("Insane cfg\n");
623                 RETURN(rc);
624         }
625
626         /* We only care about markers */
627         if (lcfg->lcfg_command != LCFG_MARKER)
628                 RETURN(0);
629
630         marker = lustre_cfg_buf(lcfg, 1);
631         if ((strcmp(mml->mml_marker.cm_comment, marker->cm_comment) == 0) &&
632             (strcmp(mml->mml_marker.cm_tgtname, marker->cm_tgtname) == 0) &&
633             !(marker->cm_flags & CM_SKIP)) {
634                 /* Found a non-skipped marker match */
635                 CDEBUG(D_MGS, "Changing rec %u marker %d %x->%x: %s %s\n",
636                        rec->lrh_index, marker->cm_step,
637                        marker->cm_flags, mml->mml_marker.cm_flags,
638                        marker->cm_tgtname, marker->cm_comment);
639                 /* Overwrite the old marker llog entry */
640                 marker->cm_flags &= ~CM_EXCLUDE; /* in case we're unexcluding */
641                 marker->cm_flags |= mml->mml_marker.cm_flags;
642                 marker->cm_canceltime = mml->mml_marker.cm_canceltime;
643                 /* Header and tail are added back to lrh_len in
644                    llog_lvfs_write_rec */
645                 rec->lrh_len = cfg_len;
646                 rc = llog_write(env, llh, rec, NULL, 0, (void *)lcfg,
647                                 rec->lrh_index);
648                 if (!rc)
649                          mml->mml_modified++;
650         }
651
652         RETURN(rc);
653 }
654
655 /**
656  * Modify an existing config log record (for CM_SKIP or CM_EXCLUDE)
657  * Return code:
658  * 0 - modified successfully,
659  * 1 - no modification was done
660  * negative - error
661  */
662 static int mgs_modify(const struct lu_env *env, struct mgs_device *mgs,
663                       struct fs_db *fsdb, struct mgs_target_info *mti,
664                       char *logname, char *devname, char *comment, int flags)
665 {
666         struct llog_handle *loghandle;
667         struct llog_ctxt *ctxt;
668         struct mgs_modify_lookup *mml;
669         int rc;
670
671         ENTRY;
672
673         LASSERT(mutex_is_locked(&fsdb->fsdb_mutex));
674         CDEBUG(D_MGS, "modify %s/%s/%s fl=%x\n", logname, devname, comment,
675                flags);
676
677         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
678         LASSERT(ctxt != NULL);
679         rc = llog_open(env, ctxt, &loghandle, NULL, logname, LLOG_OPEN_EXISTS);
680         if (rc < 0) {
681                 if (rc == -ENOENT)
682                         rc = 0;
683                 GOTO(out_pop, rc);
684         }
685
686         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
687         if (rc)
688                 GOTO(out_close, rc);
689
690         if (llog_get_size(loghandle) <= 1)
691                 GOTO(out_close, rc = 0);
692
693         OBD_ALLOC_PTR(mml);
694         if (!mml)
695                 GOTO(out_close, rc = -ENOMEM);
696         if (strlcpy(mml->mml_marker.cm_comment, comment,
697                     sizeof(mml->mml_marker.cm_comment)) >=
698             sizeof(mml->mml_marker.cm_comment))
699                 GOTO(out_free, rc = -E2BIG);
700         if (strlcpy(mml->mml_marker.cm_tgtname, devname,
701                     sizeof(mml->mml_marker.cm_tgtname)) >=
702             sizeof(mml->mml_marker.cm_tgtname))
703                 GOTO(out_free, rc = -E2BIG);
704         /* Modify mostly means cancel */
705         mml->mml_marker.cm_flags = flags;
706         mml->mml_marker.cm_canceltime = flags ? cfs_time_current_sec() : 0;
707         mml->mml_modified = 0;
708         rc = llog_process(env, loghandle, mgs_modify_handler, (void *)mml,
709                           NULL);
710         if (!rc && !mml->mml_modified)
711                 rc = 1;
712
713 out_free:
714         OBD_FREE_PTR(mml);
715
716 out_close:
717         llog_close(env, loghandle);
718 out_pop:
719         if (rc < 0)
720                 CERROR("%s: modify %s/%s failed: rc = %d\n",
721                        mgs->mgs_obd->obd_name, mti->mti_svname, comment, rc);
722         llog_ctxt_put(ctxt);
723         RETURN(rc);
724 }
725
726 /** This structure is passed to mgs_replace_handler */
727 struct mgs_replace_uuid_lookup {
728         /* Nids are replaced for this target device */
729         struct mgs_target_info target;
730         /* Temporary modified llog */
731         struct llog_handle *temp_llh;
732         /* Flag is set if in target block*/
733         int in_target_device;
734         /* Nids already added. Just skip (multiple nids) */
735         int device_nids_added;
736         /* Flag is set if this block should not be copied */
737         int skip_it;
738 };
739
740 /**
741  * Check: a) if block should be skipped
742  * b) is it target block
743  *
744  * \param[in] lcfg
745  * \param[in] mrul
746  *
747  * \retval 0 should not to be skipped
748  * \retval 1 should to be skipped
749  */
750 static int check_markers(struct lustre_cfg *lcfg,
751                          struct mgs_replace_uuid_lookup *mrul)
752 {
753          struct cfg_marker *marker;
754
755         /* Track markers. Find given device */
756         if (lcfg->lcfg_command == LCFG_MARKER) {
757                 marker = lustre_cfg_buf(lcfg, 1);
758                 /* Clean llog from records marked as CM_EXCLUDE.
759                    CM_SKIP records are used for "active" command
760                    and can be restored if needed */
761                 if ((marker->cm_flags & (CM_EXCLUDE | CM_START)) ==
762                     (CM_EXCLUDE | CM_START)) {
763                         mrul->skip_it = 1;
764                         return 1;
765                 }
766
767                 if ((marker->cm_flags & (CM_EXCLUDE | CM_END)) ==
768                     (CM_EXCLUDE | CM_END)) {
769                         mrul->skip_it = 0;
770                         return 1;
771                 }
772
773                 if (strcmp(mrul->target.mti_svname, marker->cm_tgtname) == 0) {
774                         LASSERT(!(marker->cm_flags & CM_START) ||
775                                 !(marker->cm_flags & CM_END));
776                         if (marker->cm_flags & CM_START) {
777                                 mrul->in_target_device = 1;
778                                 mrul->device_nids_added = 0;
779                         } else if (marker->cm_flags & CM_END)
780                                 mrul->in_target_device = 0;
781                 }
782         }
783
784         return 0;
785 }
786
787 static int record_lcfg(const struct lu_env *env, struct llog_handle *llh,
788                        struct lustre_cfg *lcfg)
789 {
790         struct llog_rec_hdr      rec;
791         int                      buflen, rc;
792
793         if (!lcfg || !llh)
794                 return -ENOMEM;
795
796         LASSERT(llh->lgh_ctxt);
797
798         buflen = lustre_cfg_len(lcfg->lcfg_bufcount,
799                                 lcfg->lcfg_buflens);
800         rec.lrh_len = llog_data_len(buflen);
801         rec.lrh_type = OBD_CFG_REC;
802
803         /* idx = -1 means append */
804         rc = llog_write(env, llh, &rec, NULL, 0, (void *)lcfg, -1);
805         if (rc)
806                 CERROR("failed %d\n", rc);
807         return rc;
808 }
809
810 static int record_base(const struct lu_env *env, struct llog_handle *llh,
811                      char *cfgname, lnet_nid_t nid, int cmd,
812                      char *s1, char *s2, char *s3, char *s4)
813 {
814         struct mgs_thread_info *mgi = mgs_env_info(env);
815         struct lustre_cfg     *lcfg;
816         int rc;
817
818         CDEBUG(D_MGS, "lcfg %s %#x %s %s %s %s\n", cfgname,
819                cmd, s1, s2, s3, s4);
820
821         lustre_cfg_bufs_reset(&mgi->mgi_bufs, cfgname);
822         if (s1)
823                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, s1);
824         if (s2)
825                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, s2);
826         if (s3)
827                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 3, s3);
828         if (s4)
829                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 4, s4);
830
831         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
832         if (!lcfg)
833                 return -ENOMEM;
834         lcfg->lcfg_nid = nid;
835
836         rc = record_lcfg(env, llh, lcfg);
837
838         lustre_cfg_free(lcfg);
839
840         if (rc) {
841                 CERROR("error %d: lcfg %s %#x %s %s %s %s\n", rc, cfgname,
842                        cmd, s1, s2, s3, s4);
843         }
844         return rc;
845 }
846
847 static inline int record_add_uuid(const struct lu_env *env,
848                                   struct llog_handle *llh,
849                                   uint64_t nid, char *uuid)
850 {
851         return record_base(env, llh, NULL, nid, LCFG_ADD_UUID, uuid, 0, 0, 0);
852 }
853
854 static inline int record_add_conn(const struct lu_env *env,
855                                   struct llog_handle *llh,
856                                   char *devname, char *uuid)
857 {
858         return record_base(env, llh, devname, 0, LCFG_ADD_CONN, uuid, 0, 0, 0);
859 }
860
861 static inline int record_attach(const struct lu_env *env,
862                                 struct llog_handle *llh, char *devname,
863                                 char *type, char *uuid)
864 {
865         return record_base(env, llh,devname, 0, LCFG_ATTACH, type, uuid, 0, 0);
866 }
867
868 static inline int record_setup(const struct lu_env *env,
869                                struct llog_handle *llh, char *devname,
870                                char *s1, char *s2, char *s3, char *s4)
871 {
872         return record_base(env, llh, devname, 0, LCFG_SETUP, s1, s2, s3, s4);
873 }
874
875 /**
876  * \retval <0 record processing error
877  * \retval n record is processed. No need copy original one.
878  * \retval 0 record is not processed.
879  */
880 static int process_command(const struct lu_env *env, struct lustre_cfg *lcfg,
881                            struct mgs_replace_uuid_lookup *mrul)
882 {
883         int nids_added = 0;
884         lnet_nid_t nid;
885         char *ptr;
886         int rc;
887
888         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
889                 /* LCFG_ADD_UUID command found. Let's skip original command
890                    and add passed nids */
891                 ptr = mrul->target.mti_params;
892                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
893                         CDEBUG(D_MGS, "add nid %s with uuid %s, "
894                                "device %s\n", libcfs_nid2str(nid),
895                                 mrul->target.mti_params,
896                                 mrul->target.mti_svname);
897                         rc = record_add_uuid(env,
898                                              mrul->temp_llh, nid,
899                                              mrul->target.mti_params);
900                         if (!rc)
901                                 nids_added++;
902                 }
903
904                 if (nids_added == 0) {
905                         CERROR("No new nids were added, nid %s with uuid %s, "
906                                "device %s\n", libcfs_nid2str(nid),
907                                mrul->target.mti_params,
908                                mrul->target.mti_svname);
909                         RETURN(-ENXIO);
910                 } else {
911                         mrul->device_nids_added = 1;
912                 }
913
914                 return nids_added;
915         }
916
917         if (mrul->device_nids_added && lcfg->lcfg_command == LCFG_SETUP) {
918                 /* LCFG_SETUP command found. UUID should be changed */
919                 rc = record_setup(env,
920                                   mrul->temp_llh,
921                                   /* devname the same */
922                                   lustre_cfg_string(lcfg, 0),
923                                   /* s1 is not changed */
924                                   lustre_cfg_string(lcfg, 1),
925                                   /* new uuid should be
926                                   the full nidlist */
927                                   mrul->target.mti_params,
928                                   /* s3 is not changed */
929                                   lustre_cfg_string(lcfg, 3),
930                                   /* s4 is not changed */
931                                   lustre_cfg_string(lcfg, 4));
932                 return rc ? rc : 1;
933         }
934
935         /* Another commands in target device block */
936         return 0;
937 }
938
939 /**
940  * Handler that called for every record in llog.
941  * Records are processed in order they placed in llog.
942  *
943  * \param[in] llh       log to be processed
944  * \param[in] rec       current record
945  * \param[in] data      mgs_replace_uuid_lookup structure
946  *
947  * \retval 0    success
948  */
949 static int mgs_replace_handler(const struct lu_env *env,
950                                struct llog_handle *llh,
951                                struct llog_rec_hdr *rec,
952                                void *data)
953 {
954         struct mgs_replace_uuid_lookup *mrul;
955         struct lustre_cfg *lcfg = REC_DATA(rec);
956         int cfg_len = REC_DATA_LEN(rec);
957         int rc;
958         ENTRY;
959
960         mrul = (struct mgs_replace_uuid_lookup *)data;
961
962         if (rec->lrh_type != OBD_CFG_REC) {
963                 CERROR("unhandled lrh_type: %#x, cmd %x %s %s\n",
964                        rec->lrh_type, lcfg->lcfg_command,
965                        lustre_cfg_string(lcfg, 0),
966                        lustre_cfg_string(lcfg, 1));
967                 RETURN(-EINVAL);
968         }
969
970         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
971         if (rc) {
972                 /* Do not copy any invalidated records */
973                 GOTO(skip_out, rc = 0);
974         }
975
976         rc = check_markers(lcfg, mrul);
977         if (rc || mrul->skip_it)
978                 GOTO(skip_out, rc = 0);
979
980         /* Write to new log all commands outside target device block */
981         if (!mrul->in_target_device)
982                 GOTO(copy_out, rc = 0);
983
984         /* Skip all other LCFG_ADD_UUID and LCFG_ADD_CONN records
985            (failover nids) for this target, assuming that if then
986            primary is changing then so is the failover */
987         if (mrul->device_nids_added &&
988             (lcfg->lcfg_command == LCFG_ADD_UUID ||
989              lcfg->lcfg_command == LCFG_ADD_CONN))
990                 GOTO(skip_out, rc = 0);
991
992         rc = process_command(env, lcfg, mrul);
993         if (rc < 0)
994                 RETURN(rc);
995
996         if (rc)
997                 RETURN(0);
998 copy_out:
999         /* Record is placed in temporary llog as is */
1000         rc = llog_write(env, mrul->temp_llh, rec, NULL, 0, NULL, -1);
1001
1002         CDEBUG(D_MGS, "Copied idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1003                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1004                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1005         RETURN(rc);
1006
1007 skip_out:
1008         CDEBUG(D_MGS, "Skipped idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
1009                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
1010                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
1011         RETURN(rc);
1012 }
1013
1014 static int mgs_log_is_empty(const struct lu_env *env,
1015                             struct mgs_device *mgs, char *name)
1016 {
1017         struct llog_ctxt        *ctxt;
1018         int                      rc;
1019
1020         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1021         LASSERT(ctxt != NULL);
1022
1023         rc = llog_is_empty(env, ctxt, name);
1024         llog_ctxt_put(ctxt);
1025         return rc;
1026 }
1027
1028 static int mgs_replace_nids_log(const struct lu_env *env,
1029                                 struct obd_device *mgs, struct fs_db *fsdb,
1030                                 char *logname, char *devname, char *nids)
1031 {
1032         struct llog_handle *orig_llh, *backup_llh;
1033         struct llog_ctxt *ctxt;
1034         struct mgs_replace_uuid_lookup *mrul;
1035         struct mgs_device *mgs_dev = lu2mgs_dev(mgs->obd_lu_dev);
1036         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1037         char *backup;
1038         int rc, rc2;
1039         ENTRY;
1040
1041         CDEBUG(D_MGS, "Replace nids for %s in %s\n", devname, logname);
1042
1043         ctxt = llog_get_context(mgs, LLOG_CONFIG_ORIG_CTXT);
1044         LASSERT(ctxt != NULL);
1045
1046         if (mgs_log_is_empty(env, mgs_dev, logname)) {
1047                 /* Log is empty. Nothing to replace */
1048                 GOTO(out_put, rc = 0);
1049         }
1050
1051         OBD_ALLOC(backup, strlen(logname) + strlen(".bak") + 1);
1052         if (backup == NULL)
1053                 GOTO(out_put, rc = -ENOMEM);
1054
1055         sprintf(backup, "%s.bak", logname);
1056
1057         rc = llog_backup(env, mgs, ctxt, ctxt, logname, backup);
1058         if (rc == 0) {
1059                 /* Now erase original log file. Connections are not allowed.
1060                    Backup is already saved */
1061                 rc = llog_erase(env, ctxt, NULL, logname);
1062                 if (rc < 0)
1063                         GOTO(out_free, rc);
1064         } else if (rc != -ENOENT) {
1065                 CERROR("%s: can't make backup for %s: rc = %d\n",
1066                        mgs->obd_name, logname, rc);
1067                 GOTO(out_free,rc);
1068         }
1069
1070         /* open local log */
1071         rc = llog_open_create(env, ctxt, &orig_llh, NULL, logname);
1072         if (rc)
1073                 GOTO(out_restore, rc);
1074
1075         rc = llog_init_handle(env, orig_llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1076         if (rc)
1077                 GOTO(out_closel, rc);
1078
1079         /* open backup llog */
1080         rc = llog_open(env, ctxt, &backup_llh, NULL, backup,
1081                        LLOG_OPEN_EXISTS);
1082         if (rc)
1083                 GOTO(out_closel, rc);
1084
1085         rc = llog_init_handle(env, backup_llh, LLOG_F_IS_PLAIN, NULL);
1086         if (rc)
1087                 GOTO(out_close, rc);
1088
1089         if (llog_get_size(backup_llh) <= 1)
1090                 GOTO(out_close, rc = 0);
1091
1092         OBD_ALLOC_PTR(mrul);
1093         if (!mrul)
1094                 GOTO(out_close, rc = -ENOMEM);
1095         /* devname is only needed information to replace UUID records */
1096         strncpy(mrul->target.mti_svname, devname, MTI_NAME_MAXLEN);
1097         /* parse nids later */
1098         strncpy(mrul->target.mti_params, nids, MTI_PARAM_MAXLEN);
1099         /* Copy records to this temporary llog */
1100         mrul->temp_llh = orig_llh;
1101
1102         rc = llog_process(env, backup_llh, mgs_replace_handler,
1103                           (void *)mrul, NULL);
1104         OBD_FREE_PTR(mrul);
1105 out_close:
1106         rc2 = llog_close(NULL, backup_llh);
1107         if (!rc)
1108                 rc = rc2;
1109 out_closel:
1110         rc2 = llog_close(NULL, orig_llh);
1111         if (!rc)
1112                 rc = rc2;
1113
1114 out_restore:
1115         if (rc) {
1116                 CERROR("%s: llog should be restored: rc = %d\n",
1117                        mgs->obd_name, rc);
1118                 rc2 = llog_backup(env, mgs, ctxt, ctxt, backup,
1119                                   logname);
1120                 if (rc2 < 0)
1121                         CERROR("%s: can't restore backup %s: rc = %d\n",
1122                                mgs->obd_name, logname, rc2);
1123         }
1124
1125 out_free:
1126         OBD_FREE(backup, strlen(backup) + 1);
1127
1128 out_put:
1129         llog_ctxt_put(ctxt);
1130
1131         if (rc)
1132                 CERROR("%s: failed to replace nids in log %s: rc = %d\n",
1133                        mgs->obd_name, logname, rc);
1134
1135         RETURN(rc);
1136 }
1137
1138 /**
1139  * Parse device name and get file system name and/or device index
1140  *
1141  * \param[in]   devname device name (ex. lustre-MDT0000)
1142  * \param[out]  fsname  file system name(optional)
1143  * \param[out]  index   device index(optional)
1144  *
1145  * \retval 0    success
1146  */
1147 static int mgs_parse_devname(char *devname, char *fsname, __u32 *index)
1148 {
1149         int rc;
1150         ENTRY;
1151
1152         /* Extract fsname */
1153         if (fsname) {
1154                 rc = server_name2fsname(devname, fsname, NULL);
1155                 if (rc < 0) {
1156                         CDEBUG(D_MGS, "Device name %s without fsname\n",
1157                                devname);
1158                         RETURN(-EINVAL);
1159                 }
1160         }
1161
1162         if (index) {
1163                 rc = server_name2index(devname, index, NULL);
1164                 if (rc < 0) {
1165                         CDEBUG(D_MGS, "Device name %s with wrong index\n",
1166                                devname);
1167                         RETURN(-EINVAL);
1168                 }
1169         }
1170
1171         RETURN(0);
1172 }
1173
1174 static int only_mgs_is_running(struct obd_device *mgs_obd)
1175 {
1176         /* TDB: Is global variable with devices count exists? */
1177         int num_devices = get_devices_count();
1178         /* osd, MGS and MGC + self_export
1179            (wc -l /proc/fs/lustre/devices <= 2) && (num_exports <= 2) */
1180         return (num_devices <= 3) && (mgs_obd->obd_num_exports <= 2);
1181 }
1182
1183 static int name_create_mdt(char **logname, char *fsname, int i)
1184 {
1185         char mdt_index[9];
1186
1187         sprintf(mdt_index, "-MDT%04x", i);
1188         return name_create(logname, fsname, mdt_index);
1189 }
1190
1191 /**
1192  * Replace nids for \a device to \a nids values
1193  *
1194  * \param obd           MGS obd device
1195  * \param devname       nids need to be replaced for this device
1196  * (ex. lustre-OST0000)
1197  * \param nids          nids list (ex. nid1,nid2,nid3)
1198  *
1199  * \retval 0    success
1200  */
1201 int mgs_replace_nids(const struct lu_env *env,
1202                      struct mgs_device *mgs,
1203                      char *devname, char *nids)
1204 {
1205         /* Assume fsname is part of device name */
1206         char fsname[MTI_NAME_MAXLEN];
1207         int rc;
1208         __u32 index;
1209         char *logname;
1210         struct fs_db *fsdb;
1211         unsigned int i;
1212         int conn_state;
1213         struct obd_device *mgs_obd = mgs->mgs_obd;
1214         ENTRY;
1215
1216         /* We can only change NIDs if no other nodes are connected */
1217         spin_lock(&mgs_obd->obd_dev_lock);
1218         conn_state = mgs_obd->obd_no_conn;
1219         mgs_obd->obd_no_conn = 1;
1220         spin_unlock(&mgs_obd->obd_dev_lock);
1221
1222         /* We can not change nids if not only MGS is started */
1223         if (!only_mgs_is_running(mgs_obd)) {
1224                 CERROR("Only MGS is allowed to be started\n");
1225                 GOTO(out, rc = -EINPROGRESS);
1226         }
1227
1228         /* Get fsname and index*/
1229         rc = mgs_parse_devname(devname, fsname, &index);
1230         if (rc)
1231                 GOTO(out, rc);
1232
1233         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
1234         if (rc) {
1235                 CERROR("%s: can't find fsdb: rc = %d\n", fsname, rc);
1236                 GOTO(out, rc);
1237         }
1238
1239         /* Process client llogs */
1240         name_create(&logname, fsname, "-client");
1241         rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1242         name_destroy(&logname);
1243         if (rc) {
1244                 CERROR("%s: error while replacing NIDs for %s: rc = %d\n",
1245                        fsname, devname, rc);
1246                 GOTO(out, rc);
1247         }
1248
1249         /* Process MDT llogs */
1250         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
1251                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
1252                         continue;
1253                 name_create_mdt(&logname, fsname, i);
1254                 rc = mgs_replace_nids_log(env, mgs_obd, fsdb, logname, devname, nids);
1255                 name_destroy(&logname);
1256                 if (rc)
1257                         GOTO(out, rc);
1258         }
1259
1260 out:
1261         spin_lock(&mgs_obd->obd_dev_lock);
1262         mgs_obd->obd_no_conn = conn_state;
1263         spin_unlock(&mgs_obd->obd_dev_lock);
1264
1265         RETURN(rc);
1266 }
1267
1268 static int record_lov_setup(const struct lu_env *env, struct llog_handle *llh,
1269                             char *devname, struct lov_desc *desc)
1270 {
1271         struct mgs_thread_info *mgi = mgs_env_info(env);
1272         struct lustre_cfg *lcfg;
1273         int rc;
1274
1275         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1276         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1277         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1278         if (!lcfg)
1279                 return -ENOMEM;
1280         rc = record_lcfg(env, llh, lcfg);
1281
1282         lustre_cfg_free(lcfg);
1283         return rc;
1284 }
1285
1286 static int record_lmv_setup(const struct lu_env *env, struct llog_handle *llh,
1287                             char *devname, struct lmv_desc *desc)
1288 {
1289         struct mgs_thread_info *mgi = mgs_env_info(env);
1290         struct lustre_cfg *lcfg;
1291         int rc;
1292
1293         lustre_cfg_bufs_reset(&mgi->mgi_bufs, devname);
1294         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, desc, sizeof(*desc));
1295         lcfg = lustre_cfg_new(LCFG_SETUP, &mgi->mgi_bufs);
1296
1297         rc = record_lcfg(env, llh, lcfg);
1298
1299         lustre_cfg_free(lcfg);
1300         return rc;
1301 }
1302
1303 static inline int record_mdc_add(const struct lu_env *env,
1304                                  struct llog_handle *llh,
1305                                  char *logname, char *mdcuuid,
1306                                  char *mdtuuid, char *index,
1307                                  char *gen)
1308 {
1309         return record_base(env,llh,logname,0,LCFG_ADD_MDC,
1310                            mdtuuid,index,gen,mdcuuid);
1311 }
1312
1313 static inline int record_lov_add(const struct lu_env *env,
1314                                  struct llog_handle *llh,
1315                                  char *lov_name, char *ost_uuid,
1316                                  char *index, char *gen)
1317 {
1318         return record_base(env,llh,lov_name,0,LCFG_LOV_ADD_OBD,
1319                            ost_uuid, index, gen, 0);
1320 }
1321
1322 static inline int record_mount_opt(const struct lu_env *env,
1323                                    struct llog_handle *llh,
1324                                    char *profile, char *lov_name,
1325                                    char *mdc_name)
1326 {
1327         return record_base(env,llh,NULL,0,LCFG_MOUNTOPT,
1328                            profile,lov_name,mdc_name,0);
1329 }
1330
1331 static int record_marker(const struct lu_env *env,
1332                          struct llog_handle *llh,
1333                          struct fs_db *fsdb, __u32 flags,
1334                          char *tgtname, char *comment)
1335 {
1336         struct mgs_thread_info *mgi = mgs_env_info(env);
1337         struct lustre_cfg *lcfg;
1338         int rc;
1339         int cplen = 0;
1340
1341         if (flags & CM_START)
1342                 fsdb->fsdb_gen++;
1343         mgi->mgi_marker.cm_step = fsdb->fsdb_gen;
1344         mgi->mgi_marker.cm_flags = flags;
1345         mgi->mgi_marker.cm_vers = LUSTRE_VERSION_CODE;
1346         cplen = strlcpy(mgi->mgi_marker.cm_tgtname, tgtname,
1347                         sizeof(mgi->mgi_marker.cm_tgtname));
1348         if (cplen >= sizeof(mgi->mgi_marker.cm_tgtname))
1349                 return -E2BIG;
1350         cplen = strlcpy(mgi->mgi_marker.cm_comment, comment,
1351                         sizeof(mgi->mgi_marker.cm_comment));
1352         if (cplen >= sizeof(mgi->mgi_marker.cm_comment))
1353                 return -E2BIG;
1354         mgi->mgi_marker.cm_createtime = cfs_time_current_sec();
1355         mgi->mgi_marker.cm_canceltime = 0;
1356         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
1357         lustre_cfg_bufs_set(&mgi->mgi_bufs, 1, &mgi->mgi_marker,
1358                             sizeof(mgi->mgi_marker));
1359         lcfg = lustre_cfg_new(LCFG_MARKER, &mgi->mgi_bufs);
1360         if (!lcfg)
1361                 return -ENOMEM;
1362         rc = record_lcfg(env, llh, lcfg);
1363
1364         lustre_cfg_free(lcfg);
1365         return rc;
1366 }
1367
1368 static int record_start_log(const struct lu_env *env, struct mgs_device *mgs,
1369                             struct llog_handle **llh, char *name)
1370 {
1371         static struct obd_uuid   cfg_uuid = { .uuid = "config_uuid" };
1372         struct llog_ctxt        *ctxt;
1373         int                      rc = 0;
1374
1375         if (*llh)
1376                 GOTO(out, rc = -EBUSY);
1377
1378         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1379         if (!ctxt)
1380                 GOTO(out, rc = -ENODEV);
1381         LASSERT(ctxt->loc_obd == mgs->mgs_obd);
1382
1383         rc = llog_open_create(env, ctxt, llh, NULL, name);
1384         if (rc)
1385                 GOTO(out_ctxt, rc);
1386         rc = llog_init_handle(env, *llh, LLOG_F_IS_PLAIN, &cfg_uuid);
1387         if (rc)
1388                 llog_close(env, *llh);
1389 out_ctxt:
1390         llog_ctxt_put(ctxt);
1391 out:
1392         if (rc) {
1393                 CERROR("%s: can't start log %s: rc = %d\n",
1394                        mgs->mgs_obd->obd_name, name, rc);
1395                 *llh = NULL;
1396         }
1397         RETURN(rc);
1398 }
1399
1400 static int record_end_log(const struct lu_env *env, struct llog_handle **llh)
1401 {
1402         int rc;
1403
1404         rc = llog_close(env, *llh);
1405         *llh = NULL;
1406
1407         return rc;
1408 }
1409
1410 /******************** config "macros" *********************/
1411
1412 /* write an lcfg directly into a log (with markers) */
1413 static int mgs_write_log_direct(const struct lu_env *env,
1414                                 struct mgs_device *mgs, struct fs_db *fsdb,
1415                                 char *logname, struct lustre_cfg *lcfg,
1416                                 char *devname, char *comment)
1417 {
1418         struct llog_handle *llh = NULL;
1419         int rc;
1420         ENTRY;
1421
1422         if (!lcfg)
1423                 RETURN(-ENOMEM);
1424
1425         rc = record_start_log(env, mgs, &llh, logname);
1426         if (rc)
1427                 RETURN(rc);
1428
1429         /* FIXME These should be a single journal transaction */
1430         rc = record_marker(env, llh, fsdb, CM_START, devname, comment);
1431         if (rc)
1432                 GOTO(out_end, rc);
1433         rc = record_lcfg(env, llh, lcfg);
1434         if (rc)
1435                 GOTO(out_end, rc);
1436         rc = record_marker(env, llh, fsdb, CM_END, devname, comment);
1437         if (rc)
1438                 GOTO(out_end, rc);
1439 out_end:
1440         record_end_log(env, &llh);
1441         RETURN(rc);
1442 }
1443
1444 /* write the lcfg in all logs for the given fs */
1445 int mgs_write_log_direct_all(const struct lu_env *env,
1446                              struct mgs_device *mgs,
1447                              struct fs_db *fsdb,
1448                              struct mgs_target_info *mti,
1449                              struct lustre_cfg *lcfg,
1450                              char *devname, char *comment,
1451                              int server_only)
1452 {
1453         cfs_list_t list;
1454         struct mgs_direntry *dirent, *n;
1455         char *fsname = mti->mti_fsname;
1456         char *logname;
1457         int rc = 0, len = strlen(fsname);
1458         ENTRY;
1459
1460         /* We need to set params for any future logs
1461            as well. FIXME Append this file to every new log.
1462            Actually, we should store as params (text), not llogs.  Or
1463            in a database. */
1464         rc = name_create(&logname, fsname, "-params");
1465         if (rc)
1466                 RETURN(rc);
1467         if (mgs_log_is_empty(env, mgs, logname)) {
1468                 struct llog_handle *llh = NULL;
1469                 rc = record_start_log(env, mgs, &llh, logname);
1470                 if (rc == 0)
1471                         record_end_log(env, &llh);
1472         }
1473         name_destroy(&logname);
1474         if (rc)
1475                 RETURN(rc);
1476
1477         /* Find all the logs in the CONFIGS directory */
1478         rc = class_dentry_readdir(env, mgs, &list);
1479         if (rc)
1480                 RETURN(rc);
1481
1482         /* Could use fsdb index maps instead of directory listing */
1483         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
1484                 cfs_list_del(&dirent->list);
1485                 /* don't write to sptlrpc rule log */
1486                 if (strstr(dirent->name, "-sptlrpc") != NULL)
1487                         goto next;
1488
1489                 /* caller wants write server logs only */
1490                 if (server_only && strstr(dirent->name, "-client") != NULL)
1491                         goto next;
1492
1493                 if (strncmp(fsname, dirent->name, len) == 0) {
1494                         CDEBUG(D_MGS, "Changing log %s\n", dirent->name);
1495                         /* Erase any old settings of this same parameter */
1496                         rc = mgs_modify(env, mgs, fsdb, mti, dirent->name,
1497                                         devname, comment, CM_SKIP);
1498                         if (rc < 0)
1499                                 CERROR("%s: Can't modify llog %s: rc = %d\n",
1500                                        mgs->mgs_obd->obd_name, dirent->name,rc);
1501                         /* Write the new one */
1502                         if (lcfg) {
1503                                 rc = mgs_write_log_direct(env, mgs, fsdb,
1504                                                           dirent->name,
1505                                                           lcfg, devname,
1506                                                           comment);
1507                                 if (rc)
1508                                         CERROR("%s: writing log %s: rc = %d\n",
1509                                                mgs->mgs_obd->obd_name,
1510                                                dirent->name, rc);
1511                         }
1512                 }
1513 next:
1514                 mgs_direntry_free(dirent);
1515         }
1516
1517         RETURN(rc);
1518 }
1519
1520 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
1521                                     struct mgs_device *mgs,
1522                                     struct fs_db *fsdb,
1523                                     struct mgs_target_info *mti,
1524                                     int index, char *logname);
1525 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
1526                                     struct mgs_device *mgs,
1527                                     struct fs_db *fsdb,
1528                                     struct mgs_target_info *mti,
1529                                     char *logname, char *suffix, char *lovname,
1530                                     enum lustre_sec_part sec_part, int flags);
1531 static int name_create_mdt_and_lov(char **logname, char **lovname,
1532                                    struct fs_db *fsdb, int i);
1533
1534 static int add_param(char *params, char *key, char *val)
1535 {
1536         char *start = params + strlen(params);
1537         char *end = params + sizeof(((struct mgs_target_info *)0)->mti_params);
1538         int keylen = 0;
1539
1540         if (key != NULL)
1541                 keylen = strlen(key);
1542         if (start + 1 + keylen + strlen(val) >= end) {
1543                 CERROR("params are too long: %s %s%s\n",
1544                        params, key != NULL ? key : "", val);
1545                 return -EINVAL;
1546         }
1547
1548         sprintf(start, " %s%s", key != NULL ? key : "", val);
1549         return 0;
1550 }
1551
1552 /**
1553  * Walk through client config log record and convert the related records
1554  * into the target.
1555  **/
1556 static int mgs_steal_client_llog_handler(const struct lu_env *env,
1557                                          struct llog_handle *llh,
1558                                          struct llog_rec_hdr *rec, void *data)
1559 {
1560         struct mgs_device *mgs;
1561         struct obd_device *obd;
1562         struct mgs_target_info *mti, *tmti;
1563         struct fs_db *fsdb;
1564         int cfg_len = rec->lrh_len;
1565         char *cfg_buf = (char*) (rec + 1);
1566         struct lustre_cfg *lcfg;
1567         int rc = 0;
1568         struct llog_handle *mdt_llh = NULL;
1569         static int got_an_osc_or_mdc = 0;
1570         /* 0: not found any osc/mdc;
1571            1: found osc;
1572            2: found mdc;
1573         */
1574         static int last_step = -1;
1575         int cplen = 0;
1576
1577         ENTRY;
1578
1579         mti = ((struct temp_comp*)data)->comp_mti;
1580         tmti = ((struct temp_comp*)data)->comp_tmti;
1581         fsdb = ((struct temp_comp*)data)->comp_fsdb;
1582         obd = ((struct temp_comp *)data)->comp_obd;
1583         mgs = lu2mgs_dev(obd->obd_lu_dev);
1584         LASSERT(mgs);
1585
1586         if (rec->lrh_type != OBD_CFG_REC) {
1587                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
1588                 RETURN(-EINVAL);
1589         }
1590
1591         rc = lustre_cfg_sanity_check(cfg_buf, cfg_len);
1592         if (rc) {
1593                 CERROR("Insane cfg\n");
1594                 RETURN(rc);
1595         }
1596
1597         lcfg = (struct lustre_cfg *)cfg_buf;
1598
1599         if (lcfg->lcfg_command == LCFG_MARKER) {
1600                 struct cfg_marker *marker;
1601                 marker = lustre_cfg_buf(lcfg, 1);
1602                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1603                     (marker->cm_flags & CM_START) &&
1604                      !(marker->cm_flags & CM_SKIP)) {
1605                         got_an_osc_or_mdc = 1;
1606                         cplen = strlcpy(tmti->mti_svname, marker->cm_tgtname,
1607                                         sizeof(tmti->mti_svname));
1608                         if (cplen >= sizeof(tmti->mti_svname))
1609                                 RETURN(-E2BIG);
1610                         rc = record_start_log(env, mgs, &mdt_llh,
1611                                               mti->mti_svname);
1612                         if (rc)
1613                                 RETURN(rc);
1614                         rc = record_marker(env, mdt_llh, fsdb, CM_START,
1615                                            mti->mti_svname, "add osc(copied)");
1616                         record_end_log(env, &mdt_llh);
1617                         last_step = marker->cm_step;
1618                         RETURN(rc);
1619                 }
1620                 if (!strncmp(marker->cm_comment, "add osc", 7) &&
1621                     (marker->cm_flags & CM_END) &&
1622                      !(marker->cm_flags & CM_SKIP)) {
1623                         LASSERT(last_step == marker->cm_step);
1624                         last_step = -1;
1625                         got_an_osc_or_mdc = 0;
1626                         memset(tmti, 0, sizeof(*tmti));
1627                         rc = record_start_log(env, mgs, &mdt_llh,
1628                                               mti->mti_svname);
1629                         if (rc)
1630                                 RETURN(rc);
1631                         rc = record_marker(env, mdt_llh, fsdb, CM_END,
1632                                            mti->mti_svname, "add osc(copied)");
1633                         record_end_log(env, &mdt_llh);
1634                         RETURN(rc);
1635                 }
1636                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1637                     (marker->cm_flags & CM_START) &&
1638                      !(marker->cm_flags & CM_SKIP)) {
1639                         got_an_osc_or_mdc = 2;
1640                         last_step = marker->cm_step;
1641                         memcpy(tmti->mti_svname, marker->cm_tgtname,
1642                                strlen(marker->cm_tgtname));
1643
1644                         RETURN(rc);
1645                 }
1646                 if (!strncmp(marker->cm_comment, "add mdc", 7) &&
1647                     (marker->cm_flags & CM_END) &&
1648                      !(marker->cm_flags & CM_SKIP)) {
1649                         LASSERT(last_step == marker->cm_step);
1650                         last_step = -1;
1651                         got_an_osc_or_mdc = 0;
1652                         memset(tmti, 0, sizeof(*tmti));
1653                         RETURN(rc);
1654                 }
1655         }
1656
1657         if (got_an_osc_or_mdc == 0 || last_step < 0)
1658                 RETURN(rc);
1659
1660         if (lcfg->lcfg_command == LCFG_ADD_UUID) {
1661                 uint64_t nodenid = lcfg->lcfg_nid;
1662
1663                 if (strlen(tmti->mti_uuid) == 0) {
1664                         /* target uuid not set, this config record is before
1665                          * LCFG_SETUP, this nid is one of target node nid.
1666                          */
1667                         tmti->mti_nids[tmti->mti_nid_count] = nodenid;
1668                         tmti->mti_nid_count++;
1669                 } else {
1670                         /* failover node nid */
1671                         rc = add_param(tmti->mti_params, PARAM_FAILNODE,
1672                                        libcfs_nid2str(nodenid));
1673                 }
1674
1675                 RETURN(rc);
1676         }
1677
1678         if (lcfg->lcfg_command == LCFG_SETUP) {
1679                 char *target;
1680
1681                 target = lustre_cfg_string(lcfg, 1);
1682                 memcpy(tmti->mti_uuid, target, strlen(target));
1683                 RETURN(rc);
1684         }
1685
1686         /* ignore client side sptlrpc_conf_log */
1687         if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF)
1688                 RETURN(rc);
1689
1690         if (lcfg->lcfg_command == LCFG_ADD_MDC) {
1691                 int index;
1692
1693                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1694                         RETURN (-EINVAL);
1695
1696                 memcpy(tmti->mti_fsname, mti->mti_fsname,
1697                        strlen(mti->mti_fsname));
1698                 tmti->mti_stripe_index = index;
1699
1700                 rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, tmti,
1701                                               mti->mti_stripe_index,
1702                                               mti->mti_svname);
1703                 memset(tmti, 0, sizeof(*tmti));
1704                 RETURN(rc);
1705         }
1706
1707         if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
1708                 int index;
1709                 char mdt_index[9];
1710                 char *logname, *lovname;
1711
1712                 rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
1713                                              mti->mti_stripe_index);
1714                 if (rc)
1715                         RETURN(rc);
1716                 sprintf(mdt_index, "-MDT%04x", mti->mti_stripe_index);
1717
1718                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) {
1719                         name_destroy(&logname);
1720                         name_destroy(&lovname);
1721                         RETURN(-EINVAL);
1722                 }
1723
1724                 tmti->mti_stripe_index = index;
1725                 rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, tmti, logname,
1726                                          mdt_index, lovname,
1727                                          LUSTRE_SP_MDT, 0);
1728                 name_destroy(&logname);
1729                 name_destroy(&lovname);
1730                 RETURN(rc);
1731         }
1732         RETURN(rc);
1733 }
1734
1735 /* fsdb->fsdb_mutex is already held  in mgs_write_log_target*/
1736 /* stealed from mgs_get_fsdb_from_llog*/
1737 static int mgs_steal_llog_for_mdt_from_client(const struct lu_env *env,
1738                                               struct mgs_device *mgs,
1739                                               char *client_name,
1740                                               struct temp_comp* comp)
1741 {
1742         struct llog_handle *loghandle;
1743         struct mgs_target_info *tmti;
1744         struct llog_ctxt *ctxt;
1745         int rc;
1746
1747         ENTRY;
1748
1749         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
1750         LASSERT(ctxt != NULL);
1751
1752         OBD_ALLOC_PTR(tmti);
1753         if (tmti == NULL)
1754                 GOTO(out_ctxt, rc = -ENOMEM);
1755
1756         comp->comp_tmti = tmti;
1757         comp->comp_obd = mgs->mgs_obd;
1758
1759         rc = llog_open(env, ctxt, &loghandle, NULL, client_name,
1760                        LLOG_OPEN_EXISTS);
1761         if (rc < 0) {
1762                 if (rc == -ENOENT)
1763                         rc = 0;
1764                 GOTO(out_pop, rc);
1765         }
1766
1767         rc = llog_init_handle(env, loghandle, LLOG_F_IS_PLAIN, NULL);
1768         if (rc)
1769                 GOTO(out_close, rc);
1770
1771         rc = llog_process_or_fork(env, loghandle, mgs_steal_client_llog_handler,
1772                                   (void *)comp, NULL, false);
1773         CDEBUG(D_MGS, "steal llog re = %d\n", rc);
1774 out_close:
1775         llog_close(env, loghandle);
1776 out_pop:
1777         OBD_FREE_PTR(tmti);
1778 out_ctxt:
1779         llog_ctxt_put(ctxt);
1780         RETURN(rc);
1781 }
1782
1783 /* lmv is the second thing for client logs */
1784 /* copied from mgs_write_log_lov. Please refer to that.  */
1785 static int mgs_write_log_lmv(const struct lu_env *env,
1786                              struct mgs_device *mgs,
1787                              struct fs_db *fsdb,
1788                              struct mgs_target_info *mti,
1789                              char *logname, char *lmvname)
1790 {
1791         struct llog_handle *llh = NULL;
1792         struct lmv_desc *lmvdesc;
1793         char *uuid;
1794         int rc = 0;
1795         ENTRY;
1796
1797         CDEBUG(D_MGS, "Writing lmv(%s) log for %s\n", lmvname,logname);
1798
1799         OBD_ALLOC_PTR(lmvdesc);
1800         if (lmvdesc == NULL)
1801                 RETURN(-ENOMEM);
1802         lmvdesc->ld_active_tgt_count = 0;
1803         lmvdesc->ld_tgt_count = 0;
1804         sprintf((char*)lmvdesc->ld_uuid.uuid, "%s_UUID", lmvname);
1805         uuid = (char *)lmvdesc->ld_uuid.uuid;
1806
1807         rc = record_start_log(env, mgs, &llh, logname);
1808         if (rc)
1809                 GOTO(out_free, rc);
1810         rc = record_marker(env, llh, fsdb, CM_START, lmvname, "lmv setup");
1811         if (rc)
1812                 GOTO(out_end, rc);
1813         rc = record_attach(env, llh, lmvname, "lmv", uuid);
1814         if (rc)
1815                 GOTO(out_end, rc);
1816         rc = record_lmv_setup(env, llh, lmvname, lmvdesc);
1817         if (rc)
1818                 GOTO(out_end, rc);
1819         rc = record_marker(env, llh, fsdb, CM_END, lmvname, "lmv setup");
1820         if (rc)
1821                 GOTO(out_end, rc);
1822 out_end:
1823         record_end_log(env, &llh);
1824 out_free:
1825         OBD_FREE_PTR(lmvdesc);
1826         RETURN(rc);
1827 }
1828
1829 /* lov is the first thing in the mdt and client logs */
1830 static int mgs_write_log_lov(const struct lu_env *env, struct mgs_device *mgs,
1831                              struct fs_db *fsdb, struct mgs_target_info *mti,
1832                              char *logname, char *lovname)
1833 {
1834         struct llog_handle *llh = NULL;
1835         struct lov_desc *lovdesc;
1836         char *uuid;
1837         int rc = 0;
1838         ENTRY;
1839
1840         CDEBUG(D_MGS, "Writing lov(%s) log for %s\n", lovname, logname);
1841
1842         /*
1843         #01 L attach   0:lov_mdsA  1:lov  2:71ccb_lov_mdsA_19f961a9e1
1844         #02 L lov_setup 0:lov_mdsA 1:(struct lov_desc)
1845               uuid=lov1_UUID, stripe count=1, size=1048576, offset=0, pattern=0
1846         */
1847
1848         /* FIXME just make lov_setup accept empty desc (put uuid in buf 2) */
1849         OBD_ALLOC_PTR(lovdesc);
1850         if (lovdesc == NULL)
1851                 RETURN(-ENOMEM);
1852         lovdesc->ld_magic = LOV_DESC_MAGIC;
1853         lovdesc->ld_tgt_count = 0;
1854         /* Defaults.  Can be changed later by lcfg config_param */
1855         lovdesc->ld_default_stripe_count = 1;
1856         lovdesc->ld_pattern = LOV_PATTERN_RAID0;
1857         lovdesc->ld_default_stripe_size = 1024 * 1024;
1858         lovdesc->ld_default_stripe_offset = -1;
1859         lovdesc->ld_qos_maxage = QOS_DEFAULT_MAXAGE;
1860         sprintf((char*)lovdesc->ld_uuid.uuid, "%s_UUID", lovname);
1861         /* can these be the same? */
1862         uuid = (char *)lovdesc->ld_uuid.uuid;
1863
1864         /* This should always be the first entry in a log.
1865         rc = mgs_clear_log(obd, logname); */
1866         rc = record_start_log(env, mgs, &llh, logname);
1867         if (rc)
1868                 GOTO(out_free, rc);
1869         /* FIXME these should be a single journal transaction */
1870         rc = record_marker(env, llh, fsdb, CM_START, lovname, "lov setup");
1871         if (rc)
1872                 GOTO(out_end, rc);
1873         rc = record_attach(env, llh, lovname, "lov", uuid);
1874         if (rc)
1875                 GOTO(out_end, rc);
1876         rc = record_lov_setup(env, llh, lovname, lovdesc);
1877         if (rc)
1878                 GOTO(out_end, rc);
1879         rc = record_marker(env, llh, fsdb, CM_END, lovname, "lov setup");
1880         if (rc)
1881                 GOTO(out_end, rc);
1882         EXIT;
1883 out_end:
1884         record_end_log(env, &llh);
1885 out_free:
1886         OBD_FREE_PTR(lovdesc);
1887         return rc;
1888 }
1889
1890 /* add failnids to open log */
1891 static int mgs_write_log_failnids(const struct lu_env *env,
1892                                   struct mgs_target_info *mti,
1893                                   struct llog_handle *llh,
1894                                   char *cliname)
1895 {
1896         char *failnodeuuid = NULL;
1897         char *ptr = mti->mti_params;
1898         lnet_nid_t nid;
1899         int rc = 0;
1900
1901         /*
1902         #03 L add_uuid  nid=uml1@tcp(0x20000c0a80201) nal=90 0:  1:uml1_UUID
1903         #04 L add_uuid  nid=1@elan(0x1000000000001)   nal=90 0:  1:uml1_UUID
1904         #05 L setup    0:OSC_uml1_ost1_mdsA  1:ost1_UUID  2:uml1_UUID
1905         #06 L add_uuid  nid=uml2@tcp(0x20000c0a80202) nal=90 0:  1:uml2_UUID
1906         #0x L add_uuid  nid=2@elan(0x1000000000002)   nal=90 0:  1:uml2_UUID
1907         #07 L add_conn 0:OSC_uml1_ost1_mdsA  1:uml2_UUID
1908         */
1909
1910         /* Pull failnid info out of params string */
1911         while (class_find_param(ptr, PARAM_FAILNODE, &ptr) == 0) {
1912                 while (class_parse_nid(ptr, &nid, &ptr) == 0) {
1913                         if (failnodeuuid == NULL) {
1914                                 /* We don't know the failover node name,
1915                                    so just use the first nid as the uuid */
1916                                 rc = name_create(&failnodeuuid,
1917                                                  libcfs_nid2str(nid), "");
1918                                 if (rc)
1919                                         return rc;
1920                         }
1921                         CDEBUG(D_MGS, "add nid %s for failover uuid %s, "
1922                                "client %s\n", libcfs_nid2str(nid),
1923                                failnodeuuid, cliname);
1924                         rc = record_add_uuid(env, llh, nid, failnodeuuid);
1925                 }
1926                 if (failnodeuuid)
1927                         rc = record_add_conn(env, llh, cliname, failnodeuuid);
1928         }
1929
1930         name_destroy(&failnodeuuid);
1931         return rc;
1932 }
1933
1934 static int mgs_write_log_mdc_to_lmv(const struct lu_env *env,
1935                                     struct mgs_device *mgs,
1936                                     struct fs_db *fsdb,
1937                                     struct mgs_target_info *mti,
1938                                     char *logname, char *lmvname)
1939 {
1940         struct llog_handle *llh = NULL;
1941         char *mdcname = NULL;
1942         char *nodeuuid = NULL;
1943         char *mdcuuid = NULL;
1944         char *lmvuuid = NULL;
1945         char index[6];
1946         int i, rc;
1947         ENTRY;
1948
1949         if (mgs_log_is_empty(env, mgs, logname)) {
1950                 CERROR("log is empty! Logical error\n");
1951                 RETURN(-EINVAL);
1952         }
1953
1954         CDEBUG(D_MGS, "adding mdc for %s to log %s:lmv(%s)\n",
1955                mti->mti_svname, logname, lmvname);
1956
1957         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
1958         if (rc)
1959                 RETURN(rc);
1960         rc = name_create(&mdcname, mti->mti_svname, "-mdc");
1961         if (rc)
1962                 GOTO(out_free, rc);
1963         rc = name_create(&mdcuuid, mdcname, "_UUID");
1964         if (rc)
1965                 GOTO(out_free, rc);
1966         rc = name_create(&lmvuuid, lmvname, "_UUID");
1967         if (rc)
1968                 GOTO(out_free, rc);
1969
1970         rc = record_start_log(env, mgs, &llh, logname);
1971         if (rc)
1972                 GOTO(out_free, rc);
1973         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
1974                            "add mdc");
1975         if (rc)
1976                 GOTO(out_end, rc);
1977         for (i = 0; i < mti->mti_nid_count; i++) {
1978                 CDEBUG(D_MGS, "add nid %s for mdt\n",
1979                        libcfs_nid2str(mti->mti_nids[i]));
1980
1981                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
1982                 if (rc)
1983                         GOTO(out_end, rc);
1984         }
1985
1986         rc = record_attach(env, llh, mdcname, LUSTRE_MDC_NAME, lmvuuid);
1987         if (rc)
1988                 GOTO(out_end, rc);
1989         rc = record_setup(env, llh, mdcname, mti->mti_uuid, nodeuuid, 0, 0);
1990         if (rc)
1991                 GOTO(out_end, rc);
1992         rc = mgs_write_log_failnids(env, mti, llh, mdcname);
1993         if (rc)
1994                 GOTO(out_end, rc);
1995         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
1996         rc = record_mdc_add(env, llh, lmvname, mdcuuid, mti->mti_uuid,
1997                             index, "1");
1998         if (rc)
1999                 GOTO(out_end, rc);
2000         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname,
2001                            "add mdc");
2002         if (rc)
2003                 GOTO(out_end, rc);
2004 out_end:
2005         record_end_log(env, &llh);
2006 out_free:
2007         name_destroy(&lmvuuid);
2008         name_destroy(&mdcuuid);
2009         name_destroy(&mdcname);
2010         name_destroy(&nodeuuid);
2011         RETURN(rc);
2012 }
2013
2014 static inline int name_create_lov(char **lovname, char *mdtname,
2015                                   struct fs_db *fsdb, int index)
2016 {
2017         /* COMPAT_180 */
2018         if (index == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2019                 return name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2020         else
2021                 return name_create(lovname, mdtname, "-mdtlov");
2022 }
2023
2024 static int name_create_mdt_and_lov(char **logname, char **lovname,
2025                                    struct fs_db *fsdb, int i)
2026 {
2027         int rc;
2028
2029         rc = name_create_mdt(logname, fsdb->fsdb_name, i);
2030         if (rc)
2031                 return rc;
2032         /* COMPAT_180 */
2033         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2034                 rc = name_create(lovname, fsdb->fsdb_name, "-mdtlov");
2035         else
2036                 rc = name_create(lovname, *logname, "-mdtlov");
2037         if (rc) {
2038                 name_destroy(logname);
2039                 *logname = NULL;
2040         }
2041         return rc;
2042 }
2043
2044 static inline int name_create_mdt_osc(char **oscname, char *ostname,
2045                                       struct fs_db *fsdb, int i)
2046 {
2047         char suffix[16];
2048
2049         if (i == 0 && test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2050                 sprintf(suffix, "-osc");
2051         else
2052                 sprintf(suffix, "-osc-MDT%04x", i);
2053         return name_create(oscname, ostname, suffix);
2054 }
2055
2056 /* add new mdc to already existent MDS */
2057 static int mgs_write_log_osp_to_mdt(const struct lu_env *env,
2058                                     struct mgs_device *mgs,
2059                                     struct fs_db *fsdb,
2060                                     struct mgs_target_info *mti,
2061                                     int mdt_index, char *logname)
2062 {
2063         struct llog_handle      *llh = NULL;
2064         char    *nodeuuid = NULL;
2065         char    *ospname = NULL;
2066         char    *lovuuid = NULL;
2067         char    *mdtuuid = NULL;
2068         char    *svname = NULL;
2069         char    *mdtname = NULL;
2070         char    *lovname = NULL;
2071         char    index_str[16];
2072         int     i, rc;
2073
2074         ENTRY;
2075         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2076                 CERROR("log is empty! Logical error\n");
2077                 RETURN (-EINVAL);
2078         }
2079
2080         CDEBUG(D_MGS, "adding osp index %d to %s\n", mti->mti_stripe_index,
2081                logname);
2082
2083         rc = name_create_mdt(&mdtname, fsdb->fsdb_name, mti->mti_stripe_index);
2084         if (rc)
2085                 RETURN(rc);
2086
2087         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2088         if (rc)
2089                 GOTO(out_destory, rc);
2090
2091         rc = name_create(&svname, mdtname, "-osp");
2092         if (rc)
2093                 GOTO(out_destory, rc);
2094
2095         sprintf(index_str, "-MDT%04x", mdt_index);
2096         rc = name_create(&ospname, svname, index_str);
2097         if (rc)
2098                 GOTO(out_destory, rc);
2099
2100         rc = name_create_lov(&lovname, logname, fsdb, mdt_index);
2101         if (rc)
2102                 GOTO(out_destory, rc);
2103
2104         rc = name_create(&lovuuid, lovname, "_UUID");
2105         if (rc)
2106                 GOTO(out_destory, rc);
2107
2108         rc = name_create(&mdtuuid, mdtname, "_UUID");
2109         if (rc)
2110                 GOTO(out_destory, rc);
2111
2112         rc = record_start_log(env, mgs, &llh, logname);
2113         if (rc)
2114                 GOTO(out_destory, rc);
2115
2116         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2117                            "add osp");
2118         if (rc)
2119                 GOTO(out_destory, rc);
2120
2121         for (i = 0; i < mti->mti_nid_count; i++) {
2122                 CDEBUG(D_MGS, "add nid %s for mdt\n",
2123                        libcfs_nid2str(mti->mti_nids[i]));
2124                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2125                 if (rc)
2126                         GOTO(out_end, rc);
2127         }
2128
2129         rc = record_attach(env, llh, ospname, LUSTRE_OSP_NAME, lovuuid);
2130         if (rc)
2131                 GOTO(out_end, rc);
2132
2133         rc = record_setup(env, llh, ospname, mti->mti_uuid, nodeuuid,
2134                           NULL, NULL);
2135         if (rc)
2136                 GOTO(out_end, rc);
2137
2138         rc = mgs_write_log_failnids(env, mti, llh, ospname);
2139         if (rc)
2140                 GOTO(out_end, rc);
2141
2142         /* Add mdc(osp) to lod */
2143         snprintf(index_str, sizeof(mti->mti_stripe_index), "%d",
2144                  mti->mti_stripe_index);
2145         rc = record_base(env, llh, lovname, 0, LCFG_ADD_MDC, mti->mti_uuid,
2146                          index_str, "1", NULL);
2147         if (rc)
2148                 GOTO(out_end, rc);
2149
2150         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add osp");
2151         if (rc)
2152                 GOTO(out_end, rc);
2153
2154 out_end:
2155         record_end_log(env, &llh);
2156
2157 out_destory:
2158         name_destroy(&mdtuuid);
2159         name_destroy(&lovuuid);
2160         name_destroy(&lovname);
2161         name_destroy(&ospname);
2162         name_destroy(&svname);
2163         name_destroy(&nodeuuid);
2164         name_destroy(&mdtname);
2165         RETURN(rc);
2166 }
2167
2168 static int mgs_write_log_mdt0(const struct lu_env *env,
2169                               struct mgs_device *mgs,
2170                               struct fs_db *fsdb,
2171                               struct mgs_target_info *mti)
2172 {
2173         char *log = mti->mti_svname;
2174         struct llog_handle *llh = NULL;
2175         char *uuid, *lovname;
2176         char mdt_index[6];
2177         char *ptr = mti->mti_params;
2178         int rc = 0, failout = 0;
2179         ENTRY;
2180
2181         OBD_ALLOC(uuid, sizeof(struct obd_uuid));
2182         if (uuid == NULL)
2183                 RETURN(-ENOMEM);
2184
2185         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2186                 failout = (strncmp(ptr, "failout", 7) == 0);
2187
2188         rc = name_create(&lovname, log, "-mdtlov");
2189         if (rc)
2190                 GOTO(out_free, rc);
2191         if (mgs_log_is_empty(env, mgs, log)) {
2192                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, log, lovname);
2193                 if (rc)
2194                         GOTO(out_lod, rc);
2195         }
2196
2197         sprintf(mdt_index, "%d", mti->mti_stripe_index);
2198
2199         rc = record_start_log(env, mgs, &llh, log);
2200         if (rc)
2201                 GOTO(out_lod, rc);
2202
2203         /* add MDT itself */
2204
2205         /* FIXME this whole fn should be a single journal transaction */
2206         sprintf(uuid, "%s_UUID", log);
2207         rc = record_marker(env, llh, fsdb, CM_START, log, "add mdt");
2208         if (rc)
2209                 GOTO(out_lod, rc);
2210         rc = record_attach(env, llh, log, LUSTRE_MDT_NAME, uuid);
2211         if (rc)
2212                 GOTO(out_end, rc);
2213         rc = record_mount_opt(env, llh, log, lovname, NULL);
2214         if (rc)
2215                 GOTO(out_end, rc);
2216         rc = record_setup(env, llh, log, uuid, mdt_index, lovname,
2217                         failout ? "n" : "f");
2218         if (rc)
2219                 GOTO(out_end, rc);
2220         rc = record_marker(env, llh, fsdb, CM_END, log, "add mdt");
2221         if (rc)
2222                 GOTO(out_end, rc);
2223 out_end:
2224         record_end_log(env, &llh);
2225 out_lod:
2226         name_destroy(&lovname);
2227 out_free:
2228         OBD_FREE(uuid, sizeof(struct obd_uuid));
2229         RETURN(rc);
2230 }
2231
2232 /* envelope method for all layers log */
2233 static int mgs_write_log_mdt(const struct lu_env *env,
2234                              struct mgs_device *mgs,
2235                              struct fs_db *fsdb,
2236                              struct mgs_target_info *mti)
2237 {
2238         struct mgs_thread_info *mgi = mgs_env_info(env);
2239         struct llog_handle *llh = NULL;
2240         char *cliname;
2241         int rc, i = 0;
2242         ENTRY;
2243
2244         CDEBUG(D_MGS, "writing new mdt %s\n", mti->mti_svname);
2245
2246         if (mti->mti_uuid[0] == '\0') {
2247                 /* Make up our own uuid */
2248                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2249                          "%s_UUID", mti->mti_svname);
2250         }
2251
2252         /* add mdt */
2253         rc = mgs_write_log_mdt0(env, mgs, fsdb, mti);
2254         if (rc)
2255                 RETURN(rc);
2256         /* Append the mdt info to the client log */
2257         rc = name_create(&cliname, mti->mti_fsname, "-client");
2258         if (rc)
2259                 RETURN(rc);
2260
2261         if (mgs_log_is_empty(env, mgs, cliname)) {
2262                 /* Start client log */
2263                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, cliname,
2264                                        fsdb->fsdb_clilov);
2265                 if (rc)
2266                         GOTO(out_free, rc);
2267                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, cliname,
2268                                        fsdb->fsdb_clilmv);
2269                 if (rc)
2270                         GOTO(out_free, rc);
2271         }
2272
2273         /*
2274         #09 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2275         #10 L attach   0:MDC_uml1_mdsA_MNT_client  1:mdc  2:1d834_MNT_client_03f
2276         #11 L setup    0:MDC_uml1_mdsA_MNT_client  1:mdsA_UUID  2:uml1_UUID
2277         #12 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2278         #13 L add_conn 0:MDC_uml1_mdsA_MNT_client  1:uml2_UUID
2279         #14 L mount_option 0:  1:client  2:lov1  3:MDC_uml1_mdsA_MNT_client
2280         */
2281
2282                 /* copy client info about lov/lmv */
2283                 mgi->mgi_comp.comp_mti = mti;
2284                 mgi->mgi_comp.comp_fsdb = fsdb;
2285
2286                 rc = mgs_steal_llog_for_mdt_from_client(env, mgs, cliname,
2287                                                         &mgi->mgi_comp);
2288                 if (rc)
2289                         GOTO(out_free, rc);
2290                 rc = mgs_write_log_mdc_to_lmv(env, mgs, fsdb, mti, cliname,
2291                                               fsdb->fsdb_clilmv);
2292                 if (rc)
2293                         GOTO(out_free, rc);
2294
2295                 /* add mountopts */
2296                 rc = record_start_log(env, mgs, &llh, cliname);
2297                 if (rc)
2298                         GOTO(out_free, rc);
2299
2300                 rc = record_marker(env, llh, fsdb, CM_START, cliname,
2301                                    "mount opts");
2302                 if (rc)
2303                         GOTO(out_end, rc);
2304                 rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov,
2305                                       fsdb->fsdb_clilmv);
2306                 if (rc)
2307                         GOTO(out_end, rc);
2308                 rc = record_marker(env, llh, fsdb, CM_END, cliname,
2309                                    "mount opts");
2310
2311         if (rc)
2312                 GOTO(out_end, rc);
2313
2314         /* for_all_existing_mdt except current one */
2315         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2316                 if (i !=  mti->mti_stripe_index &&
2317                     test_bit(i, fsdb->fsdb_mdt_index_map)) {
2318                         char *logname;
2319
2320                         rc = name_create_mdt(&logname, fsdb->fsdb_name, i);
2321                         if (rc)
2322                                 GOTO(out_end, rc);
2323
2324                         rc = mgs_write_log_osp_to_mdt(env, mgs, fsdb, mti,
2325                                                       i, logname);
2326                         name_destroy(&logname);
2327                         if (rc)
2328                                 GOTO(out_end, rc);
2329                 }
2330         }
2331 out_end:
2332         record_end_log(env, &llh);
2333 out_free:
2334         name_destroy(&cliname);
2335         RETURN(rc);
2336 }
2337
2338 /* Add the ost info to the client/mdt lov */
2339 static int mgs_write_log_osc_to_lov(const struct lu_env *env,
2340                                     struct mgs_device *mgs, struct fs_db *fsdb,
2341                                     struct mgs_target_info *mti,
2342                                     char *logname, char *suffix, char *lovname,
2343                                     enum lustre_sec_part sec_part, int flags)
2344 {
2345         struct llog_handle *llh = NULL;
2346         char *nodeuuid = NULL;
2347         char *oscname = NULL;
2348         char *oscuuid = NULL;
2349         char *lovuuid = NULL;
2350         char *svname = NULL;
2351         char index[6];
2352         int i, rc;
2353
2354         ENTRY;
2355         CDEBUG(D_INFO, "adding osc for %s to log %s\n",
2356                mti->mti_svname, logname);
2357
2358         if (mgs_log_is_empty(env, mgs, logname)) {
2359                 CERROR("log is empty! Logical error\n");
2360                 RETURN (-EINVAL);
2361         }
2362
2363         rc = name_create(&nodeuuid, libcfs_nid2str(mti->mti_nids[0]), "");
2364         if (rc)
2365                 RETURN(rc);
2366         rc = name_create(&svname, mti->mti_svname, "-osc");
2367         if (rc)
2368                 GOTO(out_free, rc);
2369
2370         /* for the system upgraded from old 1.8, keep using the old osc naming
2371          * style for mdt, see name_create_mdt_osc(). LU-1257 */
2372         if (test_bit(FSDB_OSCNAME18, &fsdb->fsdb_flags))
2373                 rc = name_create(&oscname, svname, "");
2374         else
2375                 rc = name_create(&oscname, svname, suffix);
2376         if (rc)
2377                 GOTO(out_free, rc);
2378
2379         rc = name_create(&oscuuid, oscname, "_UUID");
2380         if (rc)
2381                 GOTO(out_free, rc);
2382         rc = name_create(&lovuuid, lovname, "_UUID");
2383         if (rc)
2384                 GOTO(out_free, rc);
2385
2386
2387         /*
2388         #03 L add_uuid nid=uml1@tcp(0x20000c0a80201) 0:  1:uml1_UUID
2389         multihomed (#4)
2390         #04 L add_uuid  nid=1@elan(0x1000000000001)  nal=90 0:  1:uml1_UUID
2391         #04 L attach   0:OSC_uml1_ost1_MNT_client  1:osc  2:89070_lov1_a41dff51a
2392         #05 L setup    0:OSC_uml1_ost1_MNT_client  1:ost1_UUID  2:uml1_UUID
2393         failover (#6,7)
2394         #06 L add_uuid nid=uml2@tcp(0x20000c0a80202) 0:  1:uml2_UUID
2395         #07 L add_conn 0:OSC_uml1_ost1_MNT_client  1:uml2_UUID
2396         #08 L lov_modify_tgts add 0:lov1  1:ost1_UUID  2(index):0  3(gen):1
2397         */
2398
2399         rc = record_start_log(env, mgs, &llh, logname);
2400         if (rc)
2401                 GOTO(out_free, rc);
2402
2403         /* FIXME these should be a single journal transaction */
2404         rc = record_marker(env, llh, fsdb, CM_START | flags, mti->mti_svname,
2405                            "add osc");
2406         if (rc)
2407                 GOTO(out_end, rc);
2408
2409         /* NB: don't change record order, because upon MDT steal OSC config
2410          * from client, it treats all nids before LCFG_SETUP as target nids
2411          * (multiple interfaces), while nids after as failover node nids.
2412          * See mgs_steal_client_llog_handler() LCFG_ADD_UUID.
2413          */
2414         for (i = 0; i < mti->mti_nid_count; i++) {
2415                 CDEBUG(D_MGS, "add nid %s\n", libcfs_nid2str(mti->mti_nids[i]));
2416                 rc = record_add_uuid(env, llh, mti->mti_nids[i], nodeuuid);
2417                 if (rc)
2418                         GOTO(out_end, rc);
2419         }
2420         rc = record_attach(env, llh, oscname, LUSTRE_OSC_NAME, lovuuid);
2421         if (rc)
2422                 GOTO(out_end, rc);
2423         rc = record_setup(env, llh, oscname, mti->mti_uuid, nodeuuid, 0, 0);
2424         if (rc)
2425                 GOTO(out_end, rc);
2426         rc = mgs_write_log_failnids(env, mti, llh, oscname);
2427         if (rc)
2428                 GOTO(out_end, rc);
2429
2430         snprintf(index, sizeof(index), "%d", mti->mti_stripe_index);
2431
2432         rc = record_lov_add(env, llh, lovname, mti->mti_uuid, index, "1");
2433         if (rc)
2434                 GOTO(out_end, rc);
2435         rc = record_marker(env, llh, fsdb, CM_END | flags, mti->mti_svname,
2436                            "add osc");
2437         if (rc)
2438                 GOTO(out_end, rc);
2439 out_end:
2440         record_end_log(env, &llh);
2441 out_free:
2442         name_destroy(&lovuuid);
2443         name_destroy(&oscuuid);
2444         name_destroy(&oscname);
2445         name_destroy(&svname);
2446         name_destroy(&nodeuuid);
2447         RETURN(rc);
2448 }
2449
2450 static int mgs_write_log_ost(const struct lu_env *env,
2451                              struct mgs_device *mgs, struct fs_db *fsdb,
2452                              struct mgs_target_info *mti)
2453 {
2454         struct llog_handle *llh = NULL;
2455         char *logname, *lovname;
2456         char *ptr = mti->mti_params;
2457         int rc, flags = 0, failout = 0, i;
2458         ENTRY;
2459
2460         CDEBUG(D_MGS, "writing new ost %s\n", mti->mti_svname);
2461
2462         /* The ost startup log */
2463
2464         /* If the ost log already exists, that means that someone reformatted
2465            the ost and it called target_add again. */
2466         if (!mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2467                 LCONSOLE_ERROR_MSG(0x141, "The config log for %s already "
2468                                    "exists, yet the server claims it never "
2469                                    "registered. It may have been reformatted, "
2470                                    "or the index changed. writeconf the MDT to "
2471                                    "regenerate all logs.\n", mti->mti_svname);
2472                 RETURN(-EALREADY);
2473         }
2474
2475         /*
2476         attach obdfilter ost1 ost1_UUID
2477         setup /dev/loop2 ldiskfs f|n errors=remount-ro,user_xattr
2478         */
2479         if (class_find_param(ptr, PARAM_FAILMODE, &ptr) == 0)
2480                 failout = (strncmp(ptr, "failout", 7) == 0);
2481         rc = record_start_log(env, mgs, &llh, mti->mti_svname);
2482         if (rc)
2483                 RETURN(rc);
2484         /* FIXME these should be a single journal transaction */
2485         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,"add ost");
2486         if (rc)
2487                 GOTO(out_end, rc);
2488         if (*mti->mti_uuid == '\0')
2489                 snprintf(mti->mti_uuid, sizeof(mti->mti_uuid),
2490                          "%s_UUID", mti->mti_svname);
2491         rc = record_attach(env, llh, mti->mti_svname,
2492                            "obdfilter"/*LUSTRE_OST_NAME*/, mti->mti_uuid);
2493         if (rc)
2494                 GOTO(out_end, rc);
2495         rc = record_setup(env, llh, mti->mti_svname,
2496                           "dev"/*ignored*/, "type"/*ignored*/,
2497                           failout ? "n" : "f", 0/*options*/);
2498         if (rc)
2499                 GOTO(out_end, rc);
2500         rc = record_marker(env, llh, fsdb, CM_END, mti->mti_svname, "add ost");
2501         if (rc)
2502                 GOTO(out_end, rc);
2503 out_end:
2504         record_end_log(env, &llh);
2505         if (rc)
2506                 RETURN(rc);
2507         /* We also have to update the other logs where this osc is part of
2508            the lov */
2509
2510         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
2511                 /* If we're upgrading, the old mdt log already has our
2512                    entry. Let's do a fake one for fun. */
2513                 /* Note that we can't add any new failnids, since we don't
2514                    know the old osc names. */
2515                 flags = CM_SKIP | CM_UPGRADE146;
2516
2517         } else if ((mti->mti_flags & LDD_F_UPDATE) != LDD_F_UPDATE) {
2518                 /* If the update flag isn't set, don't update client/mdt
2519                    logs. */
2520                 flags |= CM_SKIP;
2521                 LCONSOLE_WARN("Client log for %s was not updated; writeconf "
2522                               "the MDT first to regenerate it.\n",
2523                               mti->mti_svname);
2524         }
2525
2526         /* Add ost to all MDT lov defs */
2527         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
2528                 if (test_bit(i, fsdb->fsdb_mdt_index_map)) {
2529                         char mdt_index[9];
2530
2531                         rc = name_create_mdt_and_lov(&logname, &lovname, fsdb,
2532                                                      i);
2533                         if (rc)
2534                                 RETURN(rc);
2535                         sprintf(mdt_index, "-MDT%04x", i);
2536                         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti,
2537                                                       logname, mdt_index,
2538                                                       lovname, LUSTRE_SP_MDT,
2539                                                       flags);
2540                         name_destroy(&logname);
2541                         name_destroy(&lovname);
2542                         if (rc)
2543                                 RETURN(rc);
2544                 }
2545         }
2546
2547         /* Append ost info to the client log */
2548         rc = name_create(&logname, mti->mti_fsname, "-client");
2549         if (rc)
2550                 RETURN(rc);
2551         if (mgs_log_is_empty(env, mgs, logname)) {
2552                 /* Start client log */
2553                 rc = mgs_write_log_lov(env, mgs, fsdb, mti, logname,
2554                                        fsdb->fsdb_clilov);
2555                 if (rc)
2556                         GOTO(out_free, rc);
2557                 rc = mgs_write_log_lmv(env, mgs, fsdb, mti, logname,
2558                                        fsdb->fsdb_clilmv);
2559                 if (rc)
2560                         GOTO(out_free, rc);
2561         }
2562         rc = mgs_write_log_osc_to_lov(env, mgs, fsdb, mti, logname, "",
2563                                       fsdb->fsdb_clilov, LUSTRE_SP_CLI, 0);
2564 out_free:
2565         name_destroy(&logname);
2566         RETURN(rc);
2567 }
2568
2569 static __inline__ int mgs_param_empty(char *ptr)
2570 {
2571         char *tmp;
2572
2573         if ((tmp = strchr(ptr, '=')) && (*(++tmp) == '\0'))
2574                 return 1;
2575         return 0;
2576 }
2577
2578 static int mgs_write_log_failnid_internal(const struct lu_env *env,
2579                                           struct mgs_device *mgs,
2580                                           struct fs_db *fsdb,
2581                                           struct mgs_target_info *mti,
2582                                           char *logname, char *cliname)
2583 {
2584         int rc;
2585         struct llog_handle *llh = NULL;
2586
2587         if (mgs_param_empty(mti->mti_params)) {
2588                 /* Remove _all_ failnids */
2589                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
2590                                 mti->mti_svname, "add failnid", CM_SKIP);
2591                 return rc < 0 ? rc : 0;
2592         }
2593
2594         /* Otherwise failover nids are additive */
2595         rc = record_start_log(env, mgs, &llh, logname);
2596         if (rc)
2597                 return rc;
2598                 /* FIXME this should be a single journal transaction */
2599         rc = record_marker(env, llh, fsdb, CM_START, mti->mti_svname,
2600                            "add failnid");
2601         if (rc)
2602                 goto out_end;
2603         rc = mgs_write_log_failnids(env, mti, llh, cliname);
2604         if (rc)
2605                 goto out_end;
2606         rc = record_marker(env, llh, fsdb, CM_END,
2607                            mti->mti_svname, "add failnid");
2608 out_end:
2609         record_end_log(env, &llh);
2610         return rc;
2611 }
2612
2613
2614 /* Add additional failnids to an existing log.
2615    The mdc/osc must have been added to logs first */
2616 /* tcp nids must be in dotted-quad ascii -
2617    we can't resolve hostnames from the kernel. */
2618 static int mgs_write_log_add_failnid(const struct lu_env *env,
2619                                      struct mgs_device *mgs,
2620                                      struct fs_db *fsdb,
2621                                      struct mgs_target_info *mti)
2622 {
2623         char *logname, *cliname;
2624         int rc;
2625         ENTRY;
2626
2627         /* FIXME we currently can't erase the failnids
2628          * given when a target first registers, since they aren't part of
2629          * an "add uuid" stanza */
2630
2631         /* Verify that we know about this target */
2632         if (mgs_log_is_empty(env, mgs, mti->mti_svname)) {
2633                 LCONSOLE_ERROR_MSG(0x142, "The target %s has not registered "
2634                                    "yet. It must be started before failnids "
2635                                    "can be added.\n", mti->mti_svname);
2636                 RETURN(-ENOENT);
2637         }
2638
2639         /* Create mdc/osc client name (e.g. lustre-OST0001-osc) */
2640         if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
2641                 rc = name_create(&cliname, mti->mti_svname, "-mdc");
2642         } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2643                 rc = name_create(&cliname, mti->mti_svname, "-osc");
2644         } else {
2645                 RETURN(-EINVAL);
2646         }
2647         if (rc)
2648                 RETURN(rc);
2649         /* Add failover nids to the client log */
2650         rc = name_create(&logname, mti->mti_fsname, "-client");
2651         if (rc) {
2652                 name_destroy(&cliname);
2653                 RETURN(rc);
2654         }
2655         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,mti,logname,cliname);
2656         name_destroy(&logname);
2657         name_destroy(&cliname);
2658         if (rc)
2659                 RETURN(rc);
2660
2661         if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
2662                 /* Add OST failover nids to the MDT logs as well */
2663                 int i;
2664
2665                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
2666                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
2667                                 continue;
2668                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
2669                         if (rc)
2670                                 RETURN(rc);
2671                         rc = name_create_mdt_osc(&cliname, mti->mti_svname,
2672                                                  fsdb, i);
2673                         if (rc) {
2674                                 name_destroy(&logname);
2675                                 RETURN(rc);
2676                         }
2677                         rc = mgs_write_log_failnid_internal(env, mgs, fsdb,
2678                                                             mti, logname,
2679                                                             cliname);
2680                         name_destroy(&cliname);
2681                         name_destroy(&logname);
2682                         if (rc)
2683                                 RETURN(rc);
2684                 }
2685         }
2686
2687         RETURN(rc);
2688 }
2689
2690 static int mgs_wlp_lcfg(const struct lu_env *env,
2691                         struct mgs_device *mgs, struct fs_db *fsdb,
2692                         struct mgs_target_info *mti,
2693                         char *logname, struct lustre_cfg_bufs *bufs,
2694                         char *tgtname, char *ptr)
2695 {
2696         char comment[MTI_NAME_MAXLEN];
2697         char *tmp;
2698         struct lustre_cfg *lcfg;
2699         int rc, del;
2700
2701         /* Erase any old settings of this same parameter */
2702         memcpy(comment, ptr, MTI_NAME_MAXLEN);
2703         comment[MTI_NAME_MAXLEN - 1] = 0;
2704         /* But don't try to match the value. */
2705         if ((tmp = strchr(comment, '=')))
2706             *tmp = 0;
2707         /* FIXME we should skip settings that are the same as old values */
2708         rc = mgs_modify(env, mgs, fsdb, mti, logname, tgtname, comment,CM_SKIP);
2709         if (rc < 0)
2710                 return rc;
2711         del = mgs_param_empty(ptr);
2712
2713         LCONSOLE_INFO("%sing parameter %s.%s in log %s\n", del ? "Disabl" : rc ?
2714                       "Sett" : "Modify", tgtname, comment, logname);
2715         if (del)
2716                 return rc;
2717
2718         lustre_cfg_bufs_reset(bufs, tgtname);
2719         lustre_cfg_bufs_set_string(bufs, 1, ptr);
2720         if (mti->mti_flags & LDD_F_PARAM2)
2721                 lustre_cfg_bufs_set_string(bufs, 2, LCTL_UPCALL);
2722
2723         lcfg = lustre_cfg_new((mti->mti_flags & LDD_F_PARAM2) ?
2724                               LCFG_SET_PARAM : LCFG_PARAM, bufs);
2725
2726         if (!lcfg)
2727                 return -ENOMEM;
2728         rc = mgs_write_log_direct(env, mgs, fsdb, logname,lcfg,tgtname,comment);
2729         lustre_cfg_free(lcfg);
2730         return rc;
2731 }
2732
2733 static int mgs_write_log_param2(const struct lu_env *env,
2734                                 struct mgs_device *mgs,
2735                                 struct fs_db *fsdb,
2736                                 struct mgs_target_info *mti, char *ptr)
2737 {
2738         struct lustre_cfg_bufs  bufs;
2739         int                     rc = 0;
2740         ENTRY;
2741
2742         CDEBUG(D_MGS, "next param '%s'\n", ptr);
2743         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, PARAMS_FILENAME, &bufs,
2744                           mti->mti_svname, ptr);
2745
2746         RETURN(rc);
2747 }
2748
2749 /* write global variable settings into log */
2750 static int mgs_write_log_sys(const struct lu_env *env,
2751                              struct mgs_device *mgs, struct fs_db *fsdb,
2752                              struct mgs_target_info *mti, char *sys, char *ptr)
2753 {
2754         struct mgs_thread_info *mgi = mgs_env_info(env);
2755         struct lustre_cfg *lcfg;
2756         char *tmp, sep;
2757         int rc, cmd, convert = 1;
2758
2759         if (class_match_param(ptr, PARAM_TIMEOUT, &tmp) == 0) {
2760                 cmd = LCFG_SET_TIMEOUT;
2761         } else if (class_match_param(ptr, PARAM_LDLM_TIMEOUT, &tmp) == 0) {
2762                 cmd = LCFG_SET_LDLM_TIMEOUT;
2763         /* Check for known params here so we can return error to lctl */
2764         } else if ((class_match_param(ptr, PARAM_AT_MIN, &tmp) == 0) ||
2765                 (class_match_param(ptr, PARAM_AT_MAX, &tmp) == 0) ||
2766                 (class_match_param(ptr, PARAM_AT_EXTRA, &tmp) == 0) ||
2767                 (class_match_param(ptr, PARAM_AT_EARLY_MARGIN, &tmp) == 0) ||
2768                 (class_match_param(ptr, PARAM_AT_HISTORY, &tmp) == 0)) {
2769                 cmd = LCFG_PARAM;
2770         } else if (class_match_param(ptr, PARAM_JOBID_VAR, &tmp) == 0) {
2771                 convert = 0; /* Don't convert string value to integer */
2772                 cmd = LCFG_PARAM;
2773         } else {
2774                 return -EINVAL;
2775         }
2776
2777         if (mgs_param_empty(ptr))
2778                 CDEBUG(D_MGS, "global '%s' removed\n", sys);
2779         else
2780                 CDEBUG(D_MGS, "global '%s' val=%s\n", sys, tmp);
2781
2782         lustre_cfg_bufs_reset(&mgi->mgi_bufs, NULL);
2783         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, sys);
2784         if (!convert && *tmp != '\0')
2785                 lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 2, tmp);
2786         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2787         lcfg->lcfg_num = convert ? simple_strtoul(tmp, NULL, 0) : 0;
2788         /* truncate the comment to the parameter name */
2789         ptr = tmp - 1;
2790         sep = *ptr;
2791         *ptr = '\0';
2792         /* modify all servers and clients */
2793         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2794                                       *tmp == '\0' ? NULL : lcfg,
2795                                       mti->mti_fsname, sys, 0);
2796         if (rc == 0 && *tmp != '\0') {
2797                 switch (cmd) {
2798                 case LCFG_SET_TIMEOUT:
2799                         if (!obd_timeout_set || lcfg->lcfg_num > obd_timeout)
2800                                 class_process_config(lcfg);
2801                         break;
2802                 case LCFG_SET_LDLM_TIMEOUT:
2803                         if (!ldlm_timeout_set || lcfg->lcfg_num > ldlm_timeout)
2804                                 class_process_config(lcfg);
2805                         break;
2806                 default:
2807                         break;
2808                 }
2809         }
2810         *ptr = sep;
2811         lustre_cfg_free(lcfg);
2812         return rc;
2813 }
2814
2815 /* write quota settings into log */
2816 static int mgs_write_log_quota(const struct lu_env *env, struct mgs_device *mgs,
2817                                struct fs_db *fsdb, struct mgs_target_info *mti,
2818                                char *quota, char *ptr)
2819 {
2820         struct mgs_thread_info  *mgi = mgs_env_info(env);
2821         struct lustre_cfg       *lcfg;
2822         char                    *tmp;
2823         char                     sep;
2824         int                      rc, cmd = LCFG_PARAM;
2825
2826         /* support only 'meta' and 'data' pools so far */
2827         if (class_match_param(ptr, QUOTA_METAPOOL_NAME, &tmp) != 0 &&
2828             class_match_param(ptr, QUOTA_DATAPOOL_NAME, &tmp) != 0) {
2829                 CERROR("parameter quota.%s isn't supported (only quota.mdt "
2830                        "& quota.ost are)\n", ptr);
2831                 return -EINVAL;
2832         }
2833
2834         if (*tmp == '\0') {
2835                 CDEBUG(D_MGS, "global '%s' removed\n", quota);
2836         } else {
2837                 CDEBUG(D_MGS, "global '%s'\n", quota);
2838
2839                 if (strchr(tmp, 'u') == NULL && strchr(tmp, 'g') == NULL &&
2840                     strcmp(tmp, "none") != 0) {
2841                         CERROR("enable option(%s) isn't supported\n", tmp);
2842                         return -EINVAL;
2843                 }
2844         }
2845
2846         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_fsname);
2847         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, quota);
2848         lcfg = lustre_cfg_new(cmd, &mgi->mgi_bufs);
2849         /* truncate the comment to the parameter name */
2850         ptr = tmp - 1;
2851         sep = *ptr;
2852         *ptr = '\0';
2853
2854         /* XXX we duplicated quota enable information in all server
2855          *     config logs, it should be moved to a separate config
2856          *     log once we cleanup the config log for global param. */
2857         /* modify all servers */
2858         rc = mgs_write_log_direct_all(env, mgs, fsdb, mti,
2859                                       *tmp == '\0' ? NULL : lcfg,
2860                                       mti->mti_fsname, quota, 1);
2861         *ptr = sep;
2862         lustre_cfg_free(lcfg);
2863         return rc < 0 ? rc : 0;
2864 }
2865
2866 static int mgs_srpc_set_param_disk(const struct lu_env *env,
2867                                    struct mgs_device *mgs,
2868                                    struct fs_db *fsdb,
2869                                    struct mgs_target_info *mti,
2870                                    char *param)
2871 {
2872         struct mgs_thread_info *mgi = mgs_env_info(env);
2873         struct llog_handle     *llh = NULL;
2874         char                   *logname;
2875         char                   *comment, *ptr;
2876         struct lustre_cfg      *lcfg;
2877         int                     rc, len;
2878         ENTRY;
2879
2880         /* get comment */
2881         ptr = strchr(param, '=');
2882         LASSERT(ptr);
2883         len = ptr - param;
2884
2885         OBD_ALLOC(comment, len + 1);
2886         if (comment == NULL)
2887                 RETURN(-ENOMEM);
2888         strncpy(comment, param, len);
2889         comment[len] = '\0';
2890
2891         /* prepare lcfg */
2892         lustre_cfg_bufs_reset(&mgi->mgi_bufs, mti->mti_svname);
2893         lustre_cfg_bufs_set_string(&mgi->mgi_bufs, 1, param);
2894         lcfg = lustre_cfg_new(LCFG_SPTLRPC_CONF, &mgi->mgi_bufs);
2895         if (lcfg == NULL)
2896                 GOTO(out_comment, rc = -ENOMEM);
2897
2898         /* construct log name */
2899         rc = name_create(&logname, mti->mti_fsname, "-sptlrpc");
2900         if (rc)
2901                 GOTO(out_lcfg, rc);
2902
2903         if (mgs_log_is_empty(env, mgs, logname)) {
2904                 rc = record_start_log(env, mgs, &llh, logname);
2905                 if (rc)
2906                         GOTO(out, rc);
2907                 record_end_log(env, &llh);
2908         }
2909
2910         /* obsolete old one */
2911         rc = mgs_modify(env, mgs, fsdb, mti, logname, mti->mti_svname,
2912                         comment, CM_SKIP);
2913         if (rc < 0)
2914                 GOTO(out, rc);
2915         /* write the new one */
2916         rc = mgs_write_log_direct(env, mgs, fsdb, logname, lcfg,
2917                                   mti->mti_svname, comment);
2918         if (rc)
2919                 CERROR("err %d writing log %s\n", rc, logname);
2920 out:
2921         name_destroy(&logname);
2922 out_lcfg:
2923         lustre_cfg_free(lcfg);
2924 out_comment:
2925         OBD_FREE(comment, len + 1);
2926         RETURN(rc);
2927 }
2928
2929 static int mgs_srpc_set_param_udesc_mem(struct fs_db *fsdb,
2930                                         char *param)
2931 {
2932         char    *ptr;
2933
2934         /* disable the adjustable udesc parameter for now, i.e. use default
2935          * setting that client always ship udesc to MDT if possible. to enable
2936          * it simply remove the following line */
2937         goto error_out;
2938
2939         ptr = strchr(param, '=');
2940         if (ptr == NULL)
2941                 goto error_out;
2942         *ptr++ = '\0';
2943
2944         if (strcmp(param, PARAM_SRPC_UDESC))
2945                 goto error_out;
2946
2947         if (strcmp(ptr, "yes") == 0) {
2948                 set_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2949                 CWARN("Enable user descriptor shipping from client to MDT\n");
2950         } else if (strcmp(ptr, "no") == 0) {
2951                 clear_bit(FSDB_UDESC, &fsdb->fsdb_flags);
2952                 CWARN("Disable user descriptor shipping from client to MDT\n");
2953         } else {
2954                 *(ptr - 1) = '=';
2955                 goto error_out;
2956         }
2957         return 0;
2958
2959 error_out:
2960         CERROR("Invalid param: %s\n", param);
2961         return -EINVAL;
2962 }
2963
2964 static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
2965                                   const char *svname,
2966                                   char *param)
2967 {
2968         struct sptlrpc_rule      rule;
2969         struct sptlrpc_rule_set *rset;
2970         int                      rc;
2971         ENTRY;
2972
2973         if (strncmp(param, PARAM_SRPC, sizeof(PARAM_SRPC) - 1) != 0) {
2974                 CERROR("Invalid sptlrpc parameter: %s\n", param);
2975                 RETURN(-EINVAL);
2976         }
2977
2978         if (strncmp(param, PARAM_SRPC_UDESC,
2979                     sizeof(PARAM_SRPC_UDESC) - 1) == 0) {
2980                 RETURN(mgs_srpc_set_param_udesc_mem(fsdb, param));
2981         }
2982
2983         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
2984                 CERROR("Invalid sptlrpc flavor parameter: %s\n", param);
2985                 RETURN(-EINVAL);
2986         }
2987
2988         param += sizeof(PARAM_SRPC_FLVR) - 1;
2989
2990         rc = sptlrpc_parse_rule(param, &rule);
2991         if (rc)
2992                 RETURN(rc);
2993
2994         /* mgs rules implies must be mgc->mgs */
2995         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
2996                 if ((rule.sr_from != LUSTRE_SP_MGC &&
2997                      rule.sr_from != LUSTRE_SP_ANY) ||
2998                     (rule.sr_to != LUSTRE_SP_MGS &&
2999                      rule.sr_to != LUSTRE_SP_ANY))
3000                         RETURN(-EINVAL);
3001         }
3002
3003         /* preapre room for this coming rule. svcname format should be:
3004          * - fsname: general rule
3005          * - fsname-tgtname: target-specific rule
3006          */
3007         if (strchr(svname, '-')) {
3008                 struct mgs_tgt_srpc_conf *tgtconf;
3009                 int                       found = 0;
3010
3011                 for (tgtconf = fsdb->fsdb_srpc_tgt; tgtconf != NULL;
3012                      tgtconf = tgtconf->mtsc_next) {
3013                         if (!strcmp(tgtconf->mtsc_tgt, svname)) {
3014                                 found = 1;
3015                                 break;
3016                         }
3017                 }
3018
3019                 if (!found) {
3020                         int name_len;
3021
3022                         OBD_ALLOC_PTR(tgtconf);
3023                         if (tgtconf == NULL)
3024                                 RETURN(-ENOMEM);
3025
3026                         name_len = strlen(svname);
3027
3028                         OBD_ALLOC(tgtconf->mtsc_tgt, name_len + 1);
3029                         if (tgtconf->mtsc_tgt == NULL) {
3030                                 OBD_FREE_PTR(tgtconf);
3031                                 RETURN(-ENOMEM);
3032                         }
3033                         memcpy(tgtconf->mtsc_tgt, svname, name_len);
3034
3035                         tgtconf->mtsc_next = fsdb->fsdb_srpc_tgt;
3036                         fsdb->fsdb_srpc_tgt = tgtconf;
3037                 }
3038
3039                 rset = &tgtconf->mtsc_rset;
3040         } else {
3041                 rset = &fsdb->fsdb_srpc_gen;
3042         }
3043
3044         rc = sptlrpc_rule_set_merge(rset, &rule);
3045
3046         RETURN(rc);
3047 }
3048
3049 static int mgs_srpc_set_param(const struct lu_env *env,
3050                               struct mgs_device *mgs,
3051                               struct fs_db *fsdb,
3052                               struct mgs_target_info *mti,
3053                               char *param)
3054 {
3055         char                   *copy;
3056         int                     rc, copy_size;
3057         ENTRY;
3058
3059 #ifndef HAVE_GSS
3060         RETURN(-EINVAL);
3061 #endif
3062         /* keep a copy of original param, which could be destroied
3063          * during parsing */
3064         copy_size = strlen(param) + 1;
3065         OBD_ALLOC(copy, copy_size);
3066         if (copy == NULL)
3067                 return -ENOMEM;
3068         memcpy(copy, param, copy_size);
3069
3070         rc = mgs_srpc_set_param_mem(fsdb, mti->mti_svname, param);
3071         if (rc)
3072                 goto out_free;
3073
3074         /* previous steps guaranteed the syntax is correct */
3075         rc = mgs_srpc_set_param_disk(env, mgs, fsdb, mti, copy);
3076         if (rc)
3077                 goto out_free;
3078
3079         if (test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
3080                 /*
3081                  * for mgs rules, make them effective immediately.
3082                  */
3083                 LASSERT(fsdb->fsdb_srpc_tgt == NULL);
3084                 sptlrpc_target_update_exp_flavor(mgs->mgs_obd,
3085                                                  &fsdb->fsdb_srpc_gen);
3086         }
3087
3088 out_free:
3089         OBD_FREE(copy, copy_size);
3090         RETURN(rc);
3091 }
3092
3093 struct mgs_srpc_read_data {
3094         struct fs_db   *msrd_fsdb;
3095         int             msrd_skip;
3096 };
3097
3098 static int mgs_srpc_read_handler(const struct lu_env *env,
3099                                  struct llog_handle *llh,
3100                                  struct llog_rec_hdr *rec, void *data)
3101 {
3102         struct mgs_srpc_read_data *msrd = data;
3103         struct cfg_marker         *marker;
3104         struct lustre_cfg         *lcfg = REC_DATA(rec);
3105         char                      *svname, *param;
3106         int                        cfg_len, rc;
3107         ENTRY;
3108
3109         if (rec->lrh_type != OBD_CFG_REC) {
3110                 CERROR("unhandled lrh_type: %#x\n", rec->lrh_type);
3111                 RETURN(-EINVAL);
3112         }
3113
3114         cfg_len = rec->lrh_len - sizeof(struct llog_rec_hdr) -
3115                   sizeof(struct llog_rec_tail);
3116
3117         rc = lustre_cfg_sanity_check(lcfg, cfg_len);
3118         if (rc) {
3119                 CERROR("Insane cfg\n");
3120                 RETURN(rc);
3121         }
3122
3123         if (lcfg->lcfg_command == LCFG_MARKER) {
3124                 marker = lustre_cfg_buf(lcfg, 1);
3125
3126                 if (marker->cm_flags & CM_START &&
3127                     marker->cm_flags & CM_SKIP)
3128                         msrd->msrd_skip = 1;
3129                 if (marker->cm_flags & CM_END)
3130                         msrd->msrd_skip = 0;
3131
3132                 RETURN(0);
3133         }
3134
3135         if (msrd->msrd_skip)
3136                 RETURN(0);
3137
3138         if (lcfg->lcfg_command != LCFG_SPTLRPC_CONF) {
3139                 CERROR("invalid command (%x)\n", lcfg->lcfg_command);
3140                 RETURN(0);
3141         }
3142
3143         svname = lustre_cfg_string(lcfg, 0);
3144         if (svname == NULL) {
3145                 CERROR("svname is empty\n");
3146                 RETURN(0);
3147         }
3148
3149         param = lustre_cfg_string(lcfg, 1);
3150         if (param == NULL) {
3151                 CERROR("param is empty\n");
3152                 RETURN(0);
3153         }
3154
3155         rc = mgs_srpc_set_param_mem(msrd->msrd_fsdb, svname, param);
3156         if (rc)
3157                 CERROR("read sptlrpc record error (%d): %s\n", rc, param);
3158
3159         RETURN(0);
3160 }
3161
3162 int mgs_get_fsdb_srpc_from_llog(const struct lu_env *env,
3163                                 struct mgs_device *mgs,
3164                                 struct fs_db *fsdb)
3165 {
3166         struct llog_handle        *llh = NULL;
3167         struct llog_ctxt          *ctxt;
3168         char                      *logname;
3169         struct mgs_srpc_read_data  msrd;
3170         int                        rc;
3171         ENTRY;
3172
3173         /* construct log name */
3174         rc = name_create(&logname, fsdb->fsdb_name, "-sptlrpc");
3175         if (rc)
3176                 RETURN(rc);
3177
3178         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3179         LASSERT(ctxt != NULL);
3180
3181         if (mgs_log_is_empty(env, mgs, logname))
3182                 GOTO(out, rc = 0);
3183
3184         rc = llog_open(env, ctxt, &llh, NULL, logname,
3185                        LLOG_OPEN_EXISTS);
3186         if (rc < 0) {
3187                 if (rc == -ENOENT)
3188                         rc = 0;
3189                 GOTO(out, rc);
3190         }
3191
3192         rc = llog_init_handle(env, llh, LLOG_F_IS_PLAIN, NULL);
3193         if (rc)
3194                 GOTO(out_close, rc);
3195
3196         if (llog_get_size(llh) <= 1)
3197                 GOTO(out_close, rc = 0);
3198
3199         msrd.msrd_fsdb = fsdb;
3200         msrd.msrd_skip = 0;
3201
3202         rc = llog_process(env, llh, mgs_srpc_read_handler, (void *)&msrd,
3203                           NULL);
3204
3205 out_close:
3206         llog_close(env, llh);
3207 out:
3208         llog_ctxt_put(ctxt);
3209         name_destroy(&logname);
3210
3211         if (rc)
3212                 CERROR("failed to read sptlrpc config database: %d\n", rc);
3213         RETURN(rc);
3214 }
3215
3216 /* Permanent settings of all parameters by writing into the appropriate
3217  * configuration logs.
3218  * A parameter with null value ("<param>='\0'") means to erase it out of
3219  * the logs.
3220  */
3221 static int mgs_write_log_param(const struct lu_env *env,
3222                                struct mgs_device *mgs, struct fs_db *fsdb,
3223                                struct mgs_target_info *mti, char *ptr)
3224 {
3225         struct mgs_thread_info *mgi = mgs_env_info(env);
3226         char *logname;
3227         char *tmp;
3228         int rc = 0, rc2 = 0;
3229         ENTRY;
3230
3231         /* For various parameter settings, we have to figure out which logs
3232            care about them (e.g. both mdt and client for lov settings) */
3233         CDEBUG(D_MGS, "next param '%s'\n", ptr);
3234
3235         /* The params are stored in MOUNT_DATA_FILE and modified via
3236            tunefs.lustre, or set using lctl conf_param */
3237
3238         /* Processed in lustre_start_mgc */
3239         if (class_match_param(ptr, PARAM_MGSNODE, NULL) == 0)
3240                 GOTO(end, rc);
3241
3242         /* Processed in ost/mdt */
3243         if (class_match_param(ptr, PARAM_NETWORK, NULL) == 0)
3244                 GOTO(end, rc);
3245
3246         /* Processed in mgs_write_log_ost */
3247         if (class_match_param(ptr, PARAM_FAILMODE, NULL) == 0) {
3248                 if (mti->mti_flags & LDD_F_PARAM) {
3249                         LCONSOLE_ERROR_MSG(0x169, "%s can only be "
3250                                            "changed with tunefs.lustre"
3251                                            "and --writeconf\n", ptr);
3252                         rc = -EPERM;
3253                 }
3254                 GOTO(end, rc);
3255         }
3256
3257         if (class_match_param(ptr, PARAM_SRPC, NULL) == 0) {
3258                 rc = mgs_srpc_set_param(env, mgs, fsdb, mti, ptr);
3259                 GOTO(end, rc);
3260         }
3261
3262         if (class_match_param(ptr, PARAM_FAILNODE, NULL) == 0) {
3263                 /* Add a failover nidlist */
3264                 rc = 0;
3265                 /* We already processed failovers params for new
3266                    targets in mgs_write_log_target */
3267                 if (mti->mti_flags & LDD_F_PARAM) {
3268                         CDEBUG(D_MGS, "Adding failnode\n");
3269                         rc = mgs_write_log_add_failnid(env, mgs, fsdb, mti);
3270                 }
3271                 GOTO(end, rc);
3272         }
3273
3274         if (class_match_param(ptr, PARAM_SYS, &tmp) == 0) {
3275                 rc = mgs_write_log_sys(env, mgs, fsdb, mti, ptr, tmp);
3276                 GOTO(end, rc);
3277         }
3278
3279         if (class_match_param(ptr, PARAM_QUOTA, &tmp) == 0) {
3280                 rc = mgs_write_log_quota(env, mgs, fsdb, mti, ptr, tmp);
3281                 GOTO(end, rc);
3282         }
3283
3284         if (class_match_param(ptr, PARAM_OSC""PARAM_ACTIVE, &tmp) == 0) {
3285                 /* active=0 means off, anything else means on */
3286                 int flag = (*tmp == '0') ? CM_EXCLUDE : 0;
3287                 int i;
3288
3289                 if (!(mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3290                         LCONSOLE_ERROR_MSG(0x144, "%s: Only OSCs can "
3291                                            "be (de)activated.\n",
3292                                            mti->mti_svname);
3293                         GOTO(end, rc = -EINVAL);
3294                 }
3295                 LCONSOLE_WARN("Permanently %sactivating %s\n",
3296                               flag ? "de": "re", mti->mti_svname);
3297                 /* Modify clilov */
3298                 rc = name_create(&logname, mti->mti_fsname, "-client");
3299                 if (rc)
3300                         GOTO(end, rc);
3301                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3302                                 mti->mti_svname, "add osc", flag);
3303                 name_destroy(&logname);
3304                 if (rc)
3305                         goto active_err;
3306                 /* Modify mdtlov */
3307                 /* Add to all MDT logs for CMD */
3308                 for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3309                         if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3310                                 continue;
3311                         rc = name_create_mdt(&logname, mti->mti_fsname, i);
3312                         if (rc)
3313                                 GOTO(end, rc);
3314                         rc = mgs_modify(env, mgs, fsdb, mti, logname,
3315                                         mti->mti_svname, "add osc", flag);
3316                         name_destroy(&logname);
3317                         if (rc)
3318                                 goto active_err;
3319                 }
3320         active_err:
3321                 if (rc) {
3322                         LCONSOLE_ERROR_MSG(0x145, "Couldn't find %s in"
3323                                            "log (%d). No permanent "
3324                                            "changes were made to the "
3325                                            "config log.\n",
3326                                            mti->mti_svname, rc);
3327                         if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags))
3328                                 LCONSOLE_ERROR_MSG(0x146, "This may be"
3329                                                    " because the log"
3330                                                    "is in the old 1.4"
3331                                                    "style. Consider "
3332                                                    " --writeconf to "
3333                                                    "update the logs.\n");
3334                         GOTO(end, rc);
3335                 }
3336                 /* Fall through to osc proc for deactivating live OSC
3337                    on running MDT / clients. */
3338         }
3339         /* Below here, let obd's XXX_process_config methods handle it */
3340
3341         /* All lov. in proc */
3342         if (class_match_param(ptr, PARAM_LOV, NULL) == 0) {
3343                 char *mdtlovname;
3344
3345                 CDEBUG(D_MGS, "lov param %s\n", ptr);
3346                 if (!(mti->mti_flags & LDD_F_SV_TYPE_MDT)) {
3347                         LCONSOLE_ERROR_MSG(0x147, "LOV params must be "
3348                                            "set on the MDT, not %s. "
3349                                            "Ignoring.\n",
3350                                            mti->mti_svname);
3351                         GOTO(end, rc = 0);
3352                 }
3353
3354                 /* Modify mdtlov */
3355                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3356                         GOTO(end, rc = -ENODEV);
3357
3358                 rc = name_create_mdt_and_lov(&logname, &mdtlovname, fsdb,
3359                                              mti->mti_stripe_index);
3360                 if (rc)
3361                         GOTO(end, rc);
3362                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3363                                   &mgi->mgi_bufs, mdtlovname, ptr);
3364                 name_destroy(&logname);
3365                 name_destroy(&mdtlovname);
3366                 if (rc)
3367                         GOTO(end, rc);
3368
3369                 /* Modify clilov */
3370                 rc = name_create(&logname, mti->mti_fsname, "-client");
3371                 if (rc)
3372                         GOTO(end, rc);
3373                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3374                                   fsdb->fsdb_clilov, ptr);
3375                 name_destroy(&logname);
3376                 GOTO(end, rc);
3377         }
3378
3379         /* All osc., mdc., llite. params in proc */
3380         if ((class_match_param(ptr, PARAM_OSC, NULL) == 0) ||
3381             (class_match_param(ptr, PARAM_MDC, NULL) == 0) ||
3382             (class_match_param(ptr, PARAM_LLITE, NULL) == 0)) {
3383                 char *cname;
3384
3385                 if (test_bit(FSDB_OLDLOG14, &fsdb->fsdb_flags)) {
3386                         LCONSOLE_ERROR_MSG(0x148, "Upgraded client logs for %s"
3387                                            " cannot be modified. Consider"
3388                                            " updating the configuration with"
3389                                            " --writeconf\n",
3390                                            mti->mti_svname);
3391                         GOTO(end, rc = -EINVAL);
3392                 }
3393                 if (memcmp(ptr, PARAM_LLITE, strlen(PARAM_LLITE)) == 0) {
3394                         rc = name_create(&cname, mti->mti_fsname, "-client");
3395                         /* Add the client type to match the obdname in
3396                            class_config_llog_handler */
3397                 } else if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3398                         rc = name_create(&cname, mti->mti_svname, "-mdc");
3399                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3400                         rc = name_create(&cname, mti->mti_svname, "-osc");
3401                 } else {
3402                         GOTO(end, rc = -EINVAL);
3403                 }
3404                 if (rc)
3405                         GOTO(end, rc);
3406
3407                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3408
3409                 /* Modify client */
3410                 rc = name_create(&logname, mti->mti_fsname, "-client");
3411                 if (rc) {
3412                         name_destroy(&cname);
3413                         GOTO(end, rc);
3414                 }
3415                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, logname, &mgi->mgi_bufs,
3416                                   cname, ptr);
3417
3418                 /* osc params affect the MDT as well */
3419                 if (!rc && (mti->mti_flags & LDD_F_SV_TYPE_OST)) {
3420                         int i;
3421
3422                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
3423                                 if (!test_bit(i, fsdb->fsdb_mdt_index_map))
3424                                         continue;
3425                                 name_destroy(&cname);
3426                                 rc = name_create_mdt_osc(&cname, mti->mti_svname,
3427                                                          fsdb, i);
3428                                 name_destroy(&logname);
3429                                 if (rc)
3430                                         break;
3431                                 rc = name_create_mdt(&logname,
3432                                                      mti->mti_fsname, i);
3433                                 if (rc)
3434                                         break;
3435                                 if (!mgs_log_is_empty(env, mgs, logname)) {
3436                                         rc = mgs_wlp_lcfg(env, mgs, fsdb,
3437                                                           mti, logname,
3438                                                           &mgi->mgi_bufs,
3439                                                           cname, ptr);
3440                                         if (rc)
3441                                                 break;
3442                                 }
3443                         }
3444                 }
3445                 name_destroy(&logname);
3446                 name_destroy(&cname);
3447                 GOTO(end, rc);
3448         }
3449
3450         /* All mdt. params in proc */
3451         if (class_match_param(ptr, PARAM_MDT, NULL) == 0) {
3452                 int i;
3453                 __u32 idx;
3454
3455                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3456                 if (strncmp(mti->mti_svname, mti->mti_fsname,
3457                             MTI_NAME_MAXLEN) == 0)
3458                         /* device is unspecified completely? */
3459                         rc = LDD_F_SV_TYPE_MDT | LDD_F_SV_ALL;
3460                 else
3461                         rc = server_name2index(mti->mti_svname, &idx, NULL);
3462                 if (rc < 0)
3463                         goto active_err;
3464                 if ((rc & LDD_F_SV_TYPE_MDT) == 0)
3465                         goto active_err;
3466                 if (rc & LDD_F_SV_ALL) {
3467                         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3468                                 if (!test_bit(i,
3469                                                   fsdb->fsdb_mdt_index_map))
3470                                         continue;
3471                                 rc = name_create_mdt(&logname,
3472                                                 mti->mti_fsname, i);
3473                                 if (rc)
3474                                         goto active_err;
3475                                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3476                                                   logname, &mgi->mgi_bufs,
3477                                                   logname, ptr);
3478                                 name_destroy(&logname);
3479                                 if (rc)
3480                                         goto active_err;
3481                         }
3482                 } else {
3483                         rc = mgs_wlp_lcfg(env, mgs, fsdb, mti,
3484                                           mti->mti_svname, &mgi->mgi_bufs,
3485                                           mti->mti_svname, ptr);
3486                         if (rc)
3487                                 goto active_err;
3488                 }
3489                 GOTO(end, rc);
3490         }
3491
3492         /* All mdd., ost. params in proc */
3493         if ((class_match_param(ptr, PARAM_MDD, NULL) == 0) ||
3494             (class_match_param(ptr, PARAM_OST, NULL) == 0)) {
3495                 CDEBUG(D_MGS, "%.3s param %s\n", ptr, ptr + 4);
3496                 if (mgs_log_is_empty(env, mgs, mti->mti_svname))
3497                         GOTO(end, rc = -ENODEV);
3498
3499                 rc = mgs_wlp_lcfg(env, mgs, fsdb, mti, mti->mti_svname,
3500                                   &mgi->mgi_bufs, mti->mti_svname, ptr);
3501                 GOTO(end, rc);
3502         }
3503
3504         LCONSOLE_WARN("Ignoring unrecognized param '%s'\n", ptr);
3505         rc2 = -ENOSYS;
3506
3507 end:
3508         if (rc)
3509                 CERROR("err %d on param '%s'\n", rc, ptr);
3510
3511         RETURN(rc ?: rc2);
3512 }
3513
3514 /* Not implementing automatic failover nid addition at this time. */
3515 int mgs_check_failnid(const struct lu_env *env, struct mgs_device *mgs,
3516                       struct mgs_target_info *mti)
3517 {
3518 #if 0
3519         struct fs_db *fsdb;
3520         int rc;
3521         ENTRY;
3522
3523         rc = mgs_find_or_make_fsdb(obd, fsname, &fsdb);
3524         if (rc)
3525                 RETURN(rc);
3526
3527         if (mgs_log_is_empty(obd, mti->mti_svname))
3528                 /* should never happen */
3529                 RETURN(-ENOENT);
3530
3531         CDEBUG(D_MGS, "Checking for new failnids for %s\n", mti->mti_svname);
3532
3533         /* FIXME We can just check mti->params to see if we're already in
3534            the failover list.  Modify mti->params for rewriting back at
3535            server_register_target(). */
3536
3537         mutex_lock(&fsdb->fsdb_mutex);
3538         rc = mgs_write_log_add_failnid(obd, fsdb, mti);
3539         mutex_unlock(&fsdb->fsdb_mutex);
3540
3541         RETURN(rc);
3542 #endif
3543         return 0;
3544 }
3545
3546 int mgs_write_log_target(const struct lu_env *env,
3547                          struct mgs_device *mgs,
3548                          struct mgs_target_info *mti,
3549                          struct fs_db *fsdb)
3550 {
3551         int rc = -EINVAL;
3552         char *buf, *params;
3553         ENTRY;
3554
3555         /* set/check the new target index */
3556         rc = mgs_set_index(env, mgs, mti);
3557         if (rc < 0) {
3558                 CERROR("Can't get index (%d)\n", rc);
3559                 RETURN(rc);
3560         }
3561
3562         if (rc == EALREADY) {
3563                 LCONSOLE_WARN("Found index %d for %s, updating log\n",
3564                               mti->mti_stripe_index, mti->mti_svname);
3565                 /* We would like to mark old log sections as invalid
3566                    and add new log sections in the client and mdt logs.
3567                    But if we add new sections, then live clients will
3568                    get repeat setup instructions for already running
3569                    osc's. So don't update the client/mdt logs. */
3570                 mti->mti_flags &= ~LDD_F_UPDATE;
3571                 rc = 0;
3572         }
3573
3574         mutex_lock(&fsdb->fsdb_mutex);
3575
3576         if (mti->mti_flags &
3577             (LDD_F_VIRGIN | LDD_F_UPGRADE14 | LDD_F_WRITECONF)) {
3578                 /* Generate a log from scratch */
3579                 if (mti->mti_flags & LDD_F_SV_TYPE_MDT) {
3580                         rc = mgs_write_log_mdt(env, mgs, fsdb, mti);
3581                 } else if (mti->mti_flags & LDD_F_SV_TYPE_OST) {
3582                         rc = mgs_write_log_ost(env, mgs, fsdb, mti);
3583                 } else {
3584                         CERROR("Unknown target type %#x, can't create log for "
3585                                "%s\n", mti->mti_flags, mti->mti_svname);
3586                 }
3587                 if (rc) {
3588                         CERROR("Can't write logs for %s (%d)\n",
3589                                mti->mti_svname, rc);
3590                         GOTO(out_up, rc);
3591                 }
3592         } else {
3593                 /* Just update the params from tunefs in mgs_write_log_params */
3594                 CDEBUG(D_MGS, "Update params for %s\n", mti->mti_svname);
3595                 mti->mti_flags |= LDD_F_PARAM;
3596         }
3597
3598         /* allocate temporary buffer, where class_get_next_param will
3599            make copy of a current  parameter */
3600         OBD_ALLOC(buf, strlen(mti->mti_params) + 1);
3601         if (buf == NULL)
3602                 GOTO(out_up, rc = -ENOMEM);
3603         params = mti->mti_params;
3604         while (params != NULL) {
3605                 rc = class_get_next_param(&params, buf);
3606                 if (rc) {
3607                         if (rc == 1)
3608                                 /* there is no next parameter, that is
3609                                    not an error */
3610                                 rc = 0;
3611                         break;
3612                 }
3613                 CDEBUG(D_MGS, "remaining string: '%s', param: '%s'\n",
3614                        params, buf);
3615                 rc = mgs_write_log_param(env, mgs, fsdb, mti, buf);
3616                 if (rc)
3617                         break;
3618         }
3619
3620         OBD_FREE(buf, strlen(mti->mti_params) + 1);
3621
3622 out_up:
3623         mutex_unlock(&fsdb->fsdb_mutex);
3624         RETURN(rc);
3625 }
3626
3627 int mgs_erase_log(const struct lu_env *env, struct mgs_device *mgs, char *name)
3628 {
3629         struct llog_ctxt        *ctxt;
3630         int                      rc = 0;
3631
3632         ctxt = llog_get_context(mgs->mgs_obd, LLOG_CONFIG_ORIG_CTXT);
3633         if (ctxt == NULL) {
3634                 CERROR("%s: MGS config context doesn't exist\n",
3635                        mgs->mgs_obd->obd_name);
3636                 rc = -ENODEV;
3637         } else {
3638                 rc = llog_erase(env, ctxt, NULL, name);
3639                 /* llog may not exist */
3640                 if (rc == -ENOENT)
3641                         rc = 0;
3642                 llog_ctxt_put(ctxt);
3643         }
3644
3645         if (rc)
3646                 CERROR("%s: failed to clear log %s: %d\n",
3647                        mgs->mgs_obd->obd_name, name, rc);
3648
3649         return rc;
3650 }
3651
3652 /* erase all logs for the given fs */
3653 int mgs_erase_logs(const struct lu_env *env, struct mgs_device *mgs, char *fsname)
3654 {
3655         struct fs_db *fsdb;
3656         cfs_list_t list;
3657         struct mgs_direntry *dirent, *n;
3658         int rc, len = strlen(fsname);
3659         char *suffix;
3660         ENTRY;
3661
3662         /* Find all the logs in the CONFIGS directory */
3663         rc = class_dentry_readdir(env, mgs, &list);
3664         if (rc)
3665                 RETURN(rc);
3666
3667         mutex_lock(&mgs->mgs_mutex);
3668
3669         /* Delete the fs db */
3670         fsdb = mgs_find_fsdb(mgs, fsname);
3671         if (fsdb)
3672                 mgs_free_fsdb(mgs, fsdb);
3673
3674         mutex_unlock(&mgs->mgs_mutex);
3675
3676         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3677                 cfs_list_del(&dirent->list);
3678                 suffix = strrchr(dirent->name, '-');
3679                 if (suffix != NULL) {
3680                         if ((len == suffix - dirent->name) &&
3681                             (strncmp(fsname, dirent->name, len) == 0)) {
3682                                 CDEBUG(D_MGS, "Removing log %s\n",
3683                                        dirent->name);
3684                                 mgs_erase_log(env, mgs, dirent->name);
3685                         }
3686                 }
3687                 mgs_direntry_free(dirent);
3688         }
3689
3690         RETURN(rc);
3691 }
3692
3693 /* list all logs for the given fs */
3694 int mgs_list_logs(const struct lu_env *env, struct mgs_device *mgs,
3695                   struct obd_ioctl_data *data)
3696 {
3697         cfs_list_t               list;
3698         struct mgs_direntry     *dirent, *n;
3699         char                    *out, *suffix;
3700         int                      l, remains, rc;
3701
3702         ENTRY;
3703
3704         /* Find all the logs in the CONFIGS directory */
3705         rc = class_dentry_readdir(env, mgs, &list);
3706         if (rc) {
3707                 CERROR("%s: can't read %s dir = %d\n",
3708                        mgs->mgs_obd->obd_name, MOUNT_CONFIGS_DIR, rc);
3709                 RETURN(rc);
3710         }
3711
3712         out = data->ioc_bulk;
3713         remains = data->ioc_inllen1;
3714         cfs_list_for_each_entry_safe(dirent, n, &list, list) {
3715                 cfs_list_del(&dirent->list);
3716                 suffix = strrchr(dirent->name, '-');
3717                 if (suffix != NULL) {
3718                         l = snprintf(out, remains, "config log: $%s\n",
3719                                      dirent->name);
3720                         out += l;
3721                         remains -= l;
3722                 }
3723                 mgs_direntry_free(dirent);
3724                 if (remains <= 0)
3725                         break;
3726         }
3727         RETURN(rc);
3728 }
3729
3730 /* from llog_swab */
3731 static void print_lustre_cfg(struct lustre_cfg *lcfg)
3732 {
3733         int i;
3734         ENTRY;
3735
3736         CDEBUG(D_MGS, "lustre_cfg: %p\n", lcfg);
3737         CDEBUG(D_MGS, "\tlcfg->lcfg_version: %#x\n", lcfg->lcfg_version);
3738
3739         CDEBUG(D_MGS, "\tlcfg->lcfg_command: %#x\n", lcfg->lcfg_command);
3740         CDEBUG(D_MGS, "\tlcfg->lcfg_num: %#x\n", lcfg->lcfg_num);
3741         CDEBUG(D_MGS, "\tlcfg->lcfg_flags: %#x\n", lcfg->lcfg_flags);
3742         CDEBUG(D_MGS, "\tlcfg->lcfg_nid: %s\n", libcfs_nid2str(lcfg->lcfg_nid));
3743
3744         CDEBUG(D_MGS, "\tlcfg->lcfg_bufcount: %d\n", lcfg->lcfg_bufcount);
3745         if (lcfg->lcfg_bufcount < LUSTRE_CFG_MAX_BUFCOUNT)
3746                 for (i = 0; i < lcfg->lcfg_bufcount; i++) {
3747                         CDEBUG(D_MGS, "\tlcfg->lcfg_buflens[%d]: %d %s\n",
3748                                i, lcfg->lcfg_buflens[i],
3749                                lustre_cfg_string(lcfg, i));
3750                 }
3751         EXIT;
3752 }
3753
3754 /* Set a permanent (config log) param for a target or fs
3755  * \param lcfg buf0 may contain the device (testfs-MDT0000) name
3756  *             buf1 contains the single parameter
3757  */
3758 int mgs_setparam(const struct lu_env *env, struct mgs_device *mgs,
3759                  struct lustre_cfg *lcfg, char *fsname)
3760 {
3761         struct fs_db *fsdb;
3762         struct mgs_target_info *mti;
3763         char *devname, *param;
3764         char *ptr;
3765         const char *tmp;
3766         __u32 index;
3767         int rc = 0;
3768         ENTRY;
3769
3770         print_lustre_cfg(lcfg);
3771
3772         /* lustre, lustre-mdtlov, lustre-client, lustre-MDT0000 */
3773         devname = lustre_cfg_string(lcfg, 0);
3774         param = lustre_cfg_string(lcfg, 1);
3775         if (!devname) {
3776                 /* Assume device name embedded in param:
3777                    lustre-OST0000.osc.max_dirty_mb=32 */
3778                 ptr = strchr(param, '.');
3779                 if (ptr) {
3780                         devname = param;
3781                         *ptr = 0;
3782                         param = ptr + 1;
3783                 }
3784         }
3785         if (!devname) {
3786                 LCONSOLE_ERROR_MSG(0x14d, "No target specified: %s\n", param);
3787                 RETURN(-ENOSYS);
3788         }
3789
3790         rc = mgs_parse_devname(devname, fsname, NULL);
3791         if (rc == 0 && !mgs_parse_devname(devname, NULL, &index)) {
3792                 /* param related to llite isn't allowed to set by OST or MDT */
3793                 if (rc == 0 && strncmp(param, PARAM_LLITE,
3794                                    sizeof(PARAM_LLITE)) == 0)
3795                         RETURN(-EINVAL);
3796         } else {
3797                 /* assume devname is the fsname */
3798                 memset(fsname, 0, MTI_NAME_MAXLEN);
3799                 strncpy(fsname, devname, MTI_NAME_MAXLEN);
3800                 fsname[MTI_NAME_MAXLEN - 1] = 0;
3801         }
3802         CDEBUG(D_MGS, "setparam fs='%s' device='%s'\n", fsname, devname);
3803
3804         rc = mgs_find_or_make_fsdb(env, mgs,
3805                                    lcfg->lcfg_command == LCFG_SET_PARAM ?
3806                                    PARAMS_FILENAME : fsname, &fsdb);
3807         if (rc)
3808                 RETURN(rc);
3809
3810         if (lcfg->lcfg_command != LCFG_SET_PARAM &&
3811             !test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
3812             test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3813                 CERROR("No filesystem targets for %s.  cfg_device from lctl "
3814                        "is '%s'\n", fsname, devname);
3815                 mgs_free_fsdb(mgs, fsdb);
3816                 RETURN(-EINVAL);
3817         }
3818
3819         /* Create a fake mti to hold everything */
3820         OBD_ALLOC_PTR(mti);
3821         if (!mti)
3822                 GOTO(out, rc = -ENOMEM);
3823         if (strlcpy(mti->mti_fsname, fsname, sizeof(mti->mti_fsname))
3824             >= sizeof(mti->mti_fsname))
3825                 GOTO(out, rc = -E2BIG);
3826         if (strlcpy(mti->mti_svname, devname, sizeof(mti->mti_svname))
3827             >= sizeof(mti->mti_svname))
3828                 GOTO(out, rc = -E2BIG);
3829         if (strlcpy(mti->mti_params, param, sizeof(mti->mti_params))
3830             >= sizeof(mti->mti_params))
3831                 GOTO(out, rc = -E2BIG);
3832         rc = server_name2index(mti->mti_svname, &mti->mti_stripe_index, &tmp);
3833         if (rc < 0)
3834                 /* Not a valid server; may be only fsname */
3835                 rc = 0;
3836         else
3837                 /* Strip -osc or -mdc suffix from svname */
3838                 if (server_make_name(rc, mti->mti_stripe_index, mti->mti_fsname,
3839                                      mti->mti_svname))
3840                         GOTO(out, rc = -EINVAL);
3841         /*
3842          * Revoke lock so everyone updates.  Should be alright if
3843          * someone was already reading while we were updating the logs,
3844          * so we don't really need to hold the lock while we're
3845          * writing (above).
3846          */
3847         if (lcfg->lcfg_command == LCFG_SET_PARAM) {
3848                 mti->mti_flags = rc | LDD_F_PARAM2;
3849                 mutex_lock(&fsdb->fsdb_mutex);
3850                 rc = mgs_write_log_param2(env, mgs, fsdb, mti, mti->mti_params);
3851                 mutex_unlock(&fsdb->fsdb_mutex);
3852                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_PARAMS);
3853         } else {
3854                 mti->mti_flags = rc | LDD_F_PARAM;
3855                 mutex_lock(&fsdb->fsdb_mutex);
3856                 rc = mgs_write_log_param(env, mgs, fsdb, mti, mti->mti_params);
3857                 mutex_unlock(&fsdb->fsdb_mutex);
3858                 mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
3859         }
3860
3861 out:
3862         OBD_FREE_PTR(mti);
3863         RETURN(rc);
3864 }
3865
3866 static int mgs_write_log_pool(const struct lu_env *env,
3867                               struct mgs_device *mgs, char *logname,
3868                               struct fs_db *fsdb, char *tgtname,
3869                               enum lcfg_command_type cmd,
3870                               char *fsname, char *poolname,
3871                               char *ostname, char *comment)
3872 {
3873         struct llog_handle *llh = NULL;
3874         int rc;
3875
3876         rc = record_start_log(env, mgs, &llh, logname);
3877         if (rc)
3878                 return rc;
3879         rc = record_marker(env, llh, fsdb, CM_START, tgtname, comment);
3880         if (rc)
3881                 goto out;
3882         rc = record_base(env, llh, tgtname, 0, cmd,
3883                          fsname, poolname, ostname, 0);
3884         if (rc)
3885                 goto out;
3886         rc = record_marker(env, llh, fsdb, CM_END, tgtname, comment);
3887 out:
3888         record_end_log(env, &llh);
3889         return rc;
3890 }
3891
3892 int mgs_pool_cmd(const struct lu_env *env, struct mgs_device *mgs,
3893                  enum lcfg_command_type cmd, char *fsname,
3894                  char *poolname, char *ostname)
3895 {
3896         struct fs_db *fsdb;
3897         char *lovname;
3898         char *logname;
3899         char *label = NULL, *canceled_label = NULL;
3900         int label_sz;
3901         struct mgs_target_info *mti = NULL;
3902         int rc, i;
3903         ENTRY;
3904
3905         rc = mgs_find_or_make_fsdb(env, mgs, fsname, &fsdb);
3906         if (rc) {
3907                 CERROR("Can't get db for %s\n", fsname);
3908                 RETURN(rc);
3909         }
3910         if (test_bit(FSDB_LOG_EMPTY, &fsdb->fsdb_flags)) {
3911                 CERROR("%s is not defined\n", fsname);
3912                 mgs_free_fsdb(mgs, fsdb);
3913                 RETURN(-EINVAL);
3914         }
3915
3916         label_sz = 10 + strlen(fsname) + strlen(poolname);
3917
3918         /* check if ostname match fsname */
3919         if (ostname != NULL) {
3920                 char *ptr;
3921
3922                 ptr = strrchr(ostname, '-');
3923                 if ((ptr == NULL) ||
3924                     (strncmp(fsname, ostname, ptr-ostname) != 0))
3925                         RETURN(-EINVAL);
3926                 label_sz += strlen(ostname);
3927         }
3928
3929         OBD_ALLOC(label, label_sz);
3930         if (label == NULL)
3931                 RETURN(-ENOMEM);
3932
3933         switch(cmd) {
3934         case LCFG_POOL_NEW:
3935                 sprintf(label,
3936                         "new %s.%s", fsname, poolname);
3937                 break;
3938         case LCFG_POOL_ADD:
3939                 sprintf(label,
3940                         "add %s.%s.%s", fsname, poolname, ostname);
3941                 break;
3942         case LCFG_POOL_REM:
3943                 OBD_ALLOC(canceled_label, label_sz);
3944                 if (canceled_label == NULL)
3945                         GOTO(out_label, rc = -ENOMEM);
3946                 sprintf(label,
3947                         "rem %s.%s.%s", fsname, poolname, ostname);
3948                 sprintf(canceled_label,
3949                         "add %s.%s.%s", fsname, poolname, ostname);
3950                 break;
3951         case LCFG_POOL_DEL:
3952                 OBD_ALLOC(canceled_label, label_sz);
3953                 if (canceled_label == NULL)
3954                         GOTO(out_label, rc = -ENOMEM);
3955                 sprintf(label,
3956                         "del %s.%s", fsname, poolname);
3957                 sprintf(canceled_label,
3958                         "new %s.%s", fsname, poolname);
3959                 break;
3960         default:
3961                 break;
3962         }
3963
3964         if (canceled_label != NULL) {
3965                 OBD_ALLOC_PTR(mti);
3966                 if (mti == NULL)
3967                         GOTO(out_cancel, rc = -ENOMEM);
3968         }
3969
3970         mutex_lock(&fsdb->fsdb_mutex);
3971         /* write pool def to all MDT logs */
3972         for (i = 0; i < INDEX_MAP_SIZE * 8; i++) {
3973                 if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
3974                         rc = name_create_mdt_and_lov(&logname, &lovname,
3975                                                      fsdb, i);
3976                         if (rc) {
3977                                 mutex_unlock(&fsdb->fsdb_mutex);
3978                                 GOTO(out_mti, rc);
3979                         }
3980                         if (canceled_label != NULL) {
3981                                 strcpy(mti->mti_svname, "lov pool");
3982                                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
3983                                                 lovname, canceled_label,
3984                                                 CM_SKIP);
3985                         }
3986
3987                         if (rc >= 0)
3988                                 rc = mgs_write_log_pool(env, mgs, logname,
3989                                                         fsdb, lovname, cmd,
3990                                                         fsname, poolname,
3991                                                         ostname, label);
3992                         name_destroy(&logname);
3993                         name_destroy(&lovname);
3994                         if (rc) {
3995                                 mutex_unlock(&fsdb->fsdb_mutex);
3996                                 GOTO(out_mti, rc);
3997                         }
3998                 }
3999         }
4000
4001         rc = name_create(&logname, fsname, "-client");
4002         if (rc) {
4003                 mutex_unlock(&fsdb->fsdb_mutex);
4004                 GOTO(out_mti, rc);
4005         }
4006         if (canceled_label != NULL) {
4007                 rc = mgs_modify(env, mgs, fsdb, mti, logname,
4008                                 fsdb->fsdb_clilov, canceled_label, CM_SKIP);
4009                 if (rc < 0) {
4010                         mutex_unlock(&fsdb->fsdb_mutex);
4011                         name_destroy(&logname);
4012                         GOTO(out_mti, rc);
4013                 }
4014         }
4015
4016         rc = mgs_write_log_pool(env, mgs, logname, fsdb, fsdb->fsdb_clilov,
4017                                 cmd, fsname, poolname, ostname, label);
4018         mutex_unlock(&fsdb->fsdb_mutex);
4019         name_destroy(&logname);
4020         /* request for update */
4021         mgs_revoke_lock(mgs, fsdb, CONFIG_T_CONFIG);
4022
4023         EXIT;
4024 out_mti:
4025         if (mti != NULL)
4026                 OBD_FREE_PTR(mti);
4027 out_cancel:
4028         if (canceled_label != NULL)
4029                 OBD_FREE(canceled_label, label_sz);
4030 out_label:
4031         OBD_FREE(label, label_sz);
4032         return rc;
4033 }